1import unittest
2import textwrap
3from test import support, mock_socket
4import socket
5import io
6import smtpd
7import asyncore
8
9
10class DummyServer(smtpd.SMTPServer):
11    def __init__(self, *args, **kwargs):
12        smtpd.SMTPServer.__init__(self, *args, **kwargs)
13        self.messages = []
14        if self._decode_data:
15            self.return_status = 'return status'
16        else:
17            self.return_status = b'return status'
18
19    def process_message(self, peer, mailfrom, rcpttos, data, **kw):
20        self.messages.append((peer, mailfrom, rcpttos, data))
21        if data == self.return_status:
22            return '250 Okish'
23        if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
24            return '250 SMTPUTF8 message okish'
25
26
27class DummyDispatcherBroken(Exception):
28    pass
29
30
31class BrokenDummyServer(DummyServer):
32    def listen(self, num):
33        raise DummyDispatcherBroken()
34
35
36class SMTPDServerTest(unittest.TestCase):
37    def setUp(self):
38        smtpd.socket = asyncore.socket = mock_socket
39
40    def test_process_message_unimplemented(self):
41        server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
42                                  decode_data=True)
43        conn, addr = server.accept()
44        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
45
46        def write_line(line):
47            channel.socket.queue_recv(line)
48            channel.handle_read()
49
50        write_line(b'HELO example')
51        write_line(b'MAIL From:eggs@example')
52        write_line(b'RCPT To:spam@example')
53        write_line(b'DATA')
54        self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
55
56    def test_decode_data_and_enable_SMTPUTF8_raises(self):
57        self.assertRaises(
58            ValueError,
59            smtpd.SMTPServer,
60            (support.HOST, 0),
61            ('b', 0),
62            enable_SMTPUTF8=True,
63            decode_data=True)
64
65    def tearDown(self):
66        asyncore.close_all()
67        asyncore.socket = smtpd.socket = socket
68
69
70class DebuggingServerTest(unittest.TestCase):
71
72    def setUp(self):
73        smtpd.socket = asyncore.socket = mock_socket
74
75    def send_data(self, channel, data, enable_SMTPUTF8=False):
76        def write_line(line):
77            channel.socket.queue_recv(line)
78            channel.handle_read()
79        write_line(b'EHLO example')
80        if enable_SMTPUTF8:
81            write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
82        else:
83            write_line(b'MAIL From:eggs@example')
84        write_line(b'RCPT To:spam@example')
85        write_line(b'DATA')
86        write_line(data)
87        write_line(b'.')
88
89    def test_process_message_with_decode_data_true(self):
90        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
91                                       decode_data=True)
92        conn, addr = server.accept()
93        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
94        with support.captured_stdout() as s:
95            self.send_data(channel, b'From: test\n\nhello\n')
96        stdout = s.getvalue()
97        self.assertEqual(stdout, textwrap.dedent("""\
98             ---------- MESSAGE FOLLOWS ----------
99             From: test
100             X-Peer: peer-address
101
102             hello
103             ------------ END MESSAGE ------------
104             """))
105
106    def test_process_message_with_decode_data_false(self):
107        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0))
108        conn, addr = server.accept()
109        channel = smtpd.SMTPChannel(server, conn, addr)
110        with support.captured_stdout() as s:
111            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
112        stdout = s.getvalue()
113        self.assertEqual(stdout, textwrap.dedent("""\
114             ---------- MESSAGE FOLLOWS ----------
115             b'From: test'
116             b'X-Peer: peer-address'
117             b''
118             b'h\\xc3\\xa9llo\\xff'
119             ------------ END MESSAGE ------------
120             """))
121
122    def test_process_message_with_enable_SMTPUTF8_true(self):
123        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
124                                       enable_SMTPUTF8=True)
125        conn, addr = server.accept()
126        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
127        with support.captured_stdout() as s:
128            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
129        stdout = s.getvalue()
130        self.assertEqual(stdout, textwrap.dedent("""\
131             ---------- MESSAGE FOLLOWS ----------
132             b'From: test'
133             b'X-Peer: peer-address'
134             b''
135             b'h\\xc3\\xa9llo\\xff'
136             ------------ END MESSAGE ------------
137             """))
138
139    def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
140        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
141                                       enable_SMTPUTF8=True)
142        conn, addr = server.accept()
143        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
144        with support.captured_stdout() as s:
145            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
146                           enable_SMTPUTF8=True)
147        stdout = s.getvalue()
148        self.assertEqual(stdout, textwrap.dedent("""\
149             ---------- MESSAGE FOLLOWS ----------
150             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
151             b'From: test'
152             b'X-Peer: peer-address'
153             b''
154             b'h\\xc3\\xa9llo\\xff'
155             ------------ END MESSAGE ------------
156             """))
157
158    def tearDown(self):
159        asyncore.close_all()
160        asyncore.socket = smtpd.socket = socket
161
162
163class TestFamilyDetection(unittest.TestCase):
164    def setUp(self):
165        smtpd.socket = asyncore.socket = mock_socket
166
167    def tearDown(self):
168        asyncore.close_all()
169        asyncore.socket = smtpd.socket = socket
170
171    @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
172    def test_socket_uses_IPv6(self):
173        server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOSTv4, 0))
174        self.assertEqual(server.socket.family, socket.AF_INET6)
175
176    def test_socket_uses_IPv4(self):
177        server = smtpd.SMTPServer((support.HOSTv4, 0), (support.HOSTv6, 0))
178        self.assertEqual(server.socket.family, socket.AF_INET)
179
180
181class TestRcptOptionParsing(unittest.TestCase):
182    error_response = (b'555 RCPT TO parameters not recognized or not '
183                      b'implemented\r\n')
184
185    def setUp(self):
186        smtpd.socket = asyncore.socket = mock_socket
187        self.old_debugstream = smtpd.DEBUGSTREAM
188        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
189
190    def tearDown(self):
191        asyncore.close_all()
192        asyncore.socket = smtpd.socket = socket
193        smtpd.DEBUGSTREAM = self.old_debugstream
194
195    def write_line(self, channel, line):
196        channel.socket.queue_recv(line)
197        channel.handle_read()
198
199    def test_params_rejected(self):
200        server = DummyServer((support.HOST, 0), ('b', 0))
201        conn, addr = server.accept()
202        channel = smtpd.SMTPChannel(server, conn, addr)
203        self.write_line(channel, b'EHLO example')
204        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
205        self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
206        self.assertEqual(channel.socket.last, self.error_response)
207
208    def test_nothing_accepted(self):
209        server = DummyServer((support.HOST, 0), ('b', 0))
210        conn, addr = server.accept()
211        channel = smtpd.SMTPChannel(server, conn, addr)
212        self.write_line(channel, b'EHLO example')
213        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
214        self.write_line(channel, b'RCPT to: <foo@example.com>')
215        self.assertEqual(channel.socket.last, b'250 OK\r\n')
216
217
218class TestMailOptionParsing(unittest.TestCase):
219    error_response = (b'555 MAIL FROM parameters not recognized or not '
220                      b'implemented\r\n')
221
222    def setUp(self):
223        smtpd.socket = asyncore.socket = mock_socket
224        self.old_debugstream = smtpd.DEBUGSTREAM
225        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
226
227    def tearDown(self):
228        asyncore.close_all()
229        asyncore.socket = smtpd.socket = socket
230        smtpd.DEBUGSTREAM = self.old_debugstream
231
232    def write_line(self, channel, line):
233        channel.socket.queue_recv(line)
234        channel.handle_read()
235
236    def test_with_decode_data_true(self):
237        server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True)
238        conn, addr = server.accept()
239        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
240        self.write_line(channel, b'EHLO example')
241        for line in [
242            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
243            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
244            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
245            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
246        ]:
247            self.write_line(channel, line)
248            self.assertEqual(channel.socket.last, self.error_response)
249        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
250        self.assertEqual(channel.socket.last, b'250 OK\r\n')
251
252    def test_with_decode_data_false(self):
253        server = DummyServer((support.HOST, 0), ('b', 0))
254        conn, addr = server.accept()
255        channel = smtpd.SMTPChannel(server, conn, addr)
256        self.write_line(channel, b'EHLO example')
257        for line in [
258            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
259            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
260        ]:
261            self.write_line(channel, line)
262            self.assertEqual(channel.socket.last, self.error_response)
263        self.write_line(
264            channel,
265            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
266        self.assertEqual(
267            channel.socket.last,
268            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
269        self.write_line(
270            channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
271        self.assertEqual(channel.socket.last, b'250 OK\r\n')
272
273    def test_with_enable_smtputf8_true(self):
274        server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
275        conn, addr = server.accept()
276        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
277        self.write_line(channel, b'EHLO example')
278        self.write_line(
279            channel,
280            b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
281        self.assertEqual(channel.socket.last, b'250 OK\r\n')
282
283
284class SMTPDChannelTest(unittest.TestCase):
285    def setUp(self):
286        smtpd.socket = asyncore.socket = mock_socket
287        self.old_debugstream = smtpd.DEBUGSTREAM
288        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
289        self.server = DummyServer((support.HOST, 0), ('b', 0),
290                                  decode_data=True)
291        conn, addr = self.server.accept()
292        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
293                                         decode_data=True)
294
295    def tearDown(self):
296        asyncore.close_all()
297        asyncore.socket = smtpd.socket = socket
298        smtpd.DEBUGSTREAM = self.old_debugstream
299
300    def write_line(self, line):
301        self.channel.socket.queue_recv(line)
302        self.channel.handle_read()
303
304    def test_broken_connect(self):
305        self.assertRaises(
306            DummyDispatcherBroken, BrokenDummyServer,
307            (support.HOST, 0), ('b', 0), decode_data=True)
308
309    def test_decode_data_and_enable_SMTPUTF8_raises(self):
310        self.assertRaises(
311            ValueError, smtpd.SMTPChannel,
312            self.server, self.channel.conn, self.channel.addr,
313            enable_SMTPUTF8=True, decode_data=True)
314
315    def test_server_accept(self):
316        self.server.handle_accept()
317
318    def test_missing_data(self):
319        self.write_line(b'')
320        self.assertEqual(self.channel.socket.last,
321                         b'500 Error: bad syntax\r\n')
322
323    def test_EHLO(self):
324        self.write_line(b'EHLO example')
325        self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')
326
327    def test_EHLO_bad_syntax(self):
328        self.write_line(b'EHLO')
329        self.assertEqual(self.channel.socket.last,
330                         b'501 Syntax: EHLO hostname\r\n')
331
332    def test_EHLO_duplicate(self):
333        self.write_line(b'EHLO example')
334        self.write_line(b'EHLO example')
335        self.assertEqual(self.channel.socket.last,
336                         b'503 Duplicate HELO/EHLO\r\n')
337
338    def test_EHLO_HELO_duplicate(self):
339        self.write_line(b'EHLO example')
340        self.write_line(b'HELO example')
341        self.assertEqual(self.channel.socket.last,
342                         b'503 Duplicate HELO/EHLO\r\n')
343
344    def test_HELO(self):
345        name = smtpd.socket.getfqdn()
346        self.write_line(b'HELO example')
347        self.assertEqual(self.channel.socket.last,
348                         '250 {}\r\n'.format(name).encode('ascii'))
349
350    def test_HELO_EHLO_duplicate(self):
351        self.write_line(b'HELO example')
352        self.write_line(b'EHLO example')
353        self.assertEqual(self.channel.socket.last,
354                         b'503 Duplicate HELO/EHLO\r\n')
355
356    def test_HELP(self):
357        self.write_line(b'HELP')
358        self.assertEqual(self.channel.socket.last,
359                         b'250 Supported commands: EHLO HELO MAIL RCPT ' + \
360                         b'DATA RSET NOOP QUIT VRFY\r\n')
361
362    def test_HELP_command(self):
363        self.write_line(b'HELP MAIL')
364        self.assertEqual(self.channel.socket.last,
365                         b'250 Syntax: MAIL FROM: <address>\r\n')
366
367    def test_HELP_command_unknown(self):
368        self.write_line(b'HELP SPAM')
369        self.assertEqual(self.channel.socket.last,
370                         b'501 Supported commands: EHLO HELO MAIL RCPT ' + \
371                         b'DATA RSET NOOP QUIT VRFY\r\n')
372
373    def test_HELO_bad_syntax(self):
374        self.write_line(b'HELO')
375        self.assertEqual(self.channel.socket.last,
376                         b'501 Syntax: HELO hostname\r\n')
377
378    def test_HELO_duplicate(self):
379        self.write_line(b'HELO example')
380        self.write_line(b'HELO example')
381        self.assertEqual(self.channel.socket.last,
382                         b'503 Duplicate HELO/EHLO\r\n')
383
384    def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
385        self.extended_smtp = False
386        self.write_line(b'HELO example')
387        self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
388        self.assertEqual(self.channel.socket.last,
389                         b'501 Syntax: MAIL FROM: <address>\r\n')
390
391    def test_MAIL_allows_space_after_colon(self):
392        self.write_line(b'HELO example')
393        self.write_line(b'MAIL from:   <foo@example.com>')
394        self.assertEqual(self.channel.socket.last,
395                         b'250 OK\r\n')
396
397    def test_extended_MAIL_allows_space_after_colon(self):
398        self.write_line(b'EHLO example')
399        self.write_line(b'MAIL from:   <foo@example.com> size=20')
400        self.assertEqual(self.channel.socket.last,
401                         b'250 OK\r\n')
402
403    def test_NOOP(self):
404        self.write_line(b'NOOP')
405        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
406
407    def test_HELO_NOOP(self):
408        self.write_line(b'HELO example')
409        self.write_line(b'NOOP')
410        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
411
412    def test_NOOP_bad_syntax(self):
413        self.write_line(b'NOOP hi')
414        self.assertEqual(self.channel.socket.last,
415                         b'501 Syntax: NOOP\r\n')
416
417    def test_QUIT(self):
418        self.write_line(b'QUIT')
419        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
420
421    def test_HELO_QUIT(self):
422        self.write_line(b'HELO example')
423        self.write_line(b'QUIT')
424        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
425
426    def test_QUIT_arg_ignored(self):
427        self.write_line(b'QUIT bye bye')
428        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
429
430    def test_bad_state(self):
431        self.channel.smtp_state = 'BAD STATE'
432        self.write_line(b'HELO example')
433        self.assertEqual(self.channel.socket.last,
434                         b'451 Internal confusion\r\n')
435
436    def test_command_too_long(self):
437        self.write_line(b'HELO example')
438        self.write_line(b'MAIL from: ' +
439                        b'a' * self.channel.command_size_limit +
440                        b'@example')
441        self.assertEqual(self.channel.socket.last,
442                         b'500 Error: line too long\r\n')
443
444    def test_MAIL_command_limit_extended_with_SIZE(self):
445        self.write_line(b'EHLO example')
446        fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')
447        self.write_line(b'MAIL from:<' +
448                        b'a' * fill_len +
449                        b'@example> SIZE=1234')
450        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
451
452        self.write_line(b'MAIL from:<' +
453                        b'a' * (fill_len + 26) +
454                        b'@example> SIZE=1234')
455        self.assertEqual(self.channel.socket.last,
456                         b'500 Error: line too long\r\n')
457
458    def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
459        self.write_line(b'EHLO example')
460        self.write_line(
461            b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
462        self.assertEqual(self.channel.socket.last[0:1], b'5')
463
464    def test_data_longer_than_default_data_size_limit(self):
465        # Hack the default so we don't have to generate so much data.
466        self.channel.data_size_limit = 1048
467        self.write_line(b'HELO example')
468        self.write_line(b'MAIL From:eggs@example')
469        self.write_line(b'RCPT To:spam@example')
470        self.write_line(b'DATA')
471        self.write_line(b'A' * self.channel.data_size_limit +
472                        b'A\r\n.')
473        self.assertEqual(self.channel.socket.last,
474                         b'552 Error: Too much mail data\r\n')
475
476    def test_MAIL_size_parameter(self):
477        self.write_line(b'EHLO example')
478        self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
479        self.assertEqual(self.channel.socket.last,
480                         b'250 OK\r\n')
481
482    def test_MAIL_invalid_size_parameter(self):
483        self.write_line(b'EHLO example')
484        self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
485        self.assertEqual(self.channel.socket.last,
486            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
487
488    def test_MAIL_RCPT_unknown_parameters(self):
489        self.write_line(b'EHLO example')
490        self.write_line(b'MAIL FROM:<eggs@example> ham=green')
491        self.assertEqual(self.channel.socket.last,
492            b'555 MAIL FROM parameters not recognized or not implemented\r\n')
493
494        self.write_line(b'MAIL FROM:<eggs@example>')
495        self.write_line(b'RCPT TO:<eggs@example> ham=green')
496        self.assertEqual(self.channel.socket.last,
497            b'555 RCPT TO parameters not recognized or not implemented\r\n')
498
499    def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
500        self.channel.data_size_limit = 1048
501        self.write_line(b'EHLO example')
502        self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
503        self.assertEqual(self.channel.socket.last,
504            b'552 Error: message size exceeds fixed maximum message size\r\n')
505
506    def test_need_MAIL(self):
507        self.write_line(b'HELO example')
508        self.write_line(b'RCPT to:spam@example')
509        self.assertEqual(self.channel.socket.last,
510            b'503 Error: need MAIL command\r\n')
511
512    def test_MAIL_syntax_HELO(self):
513        self.write_line(b'HELO example')
514        self.write_line(b'MAIL from eggs@example')
515        self.assertEqual(self.channel.socket.last,
516            b'501 Syntax: MAIL FROM: <address>\r\n')
517
518    def test_MAIL_syntax_EHLO(self):
519        self.write_line(b'EHLO example')
520        self.write_line(b'MAIL from eggs@example')
521        self.assertEqual(self.channel.socket.last,
522            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
523
524    def test_MAIL_missing_address(self):
525        self.write_line(b'HELO example')
526        self.write_line(b'MAIL from:')
527        self.assertEqual(self.channel.socket.last,
528            b'501 Syntax: MAIL FROM: <address>\r\n')
529
530    def test_MAIL_chevrons(self):
531        self.write_line(b'HELO example')
532        self.write_line(b'MAIL from:<eggs@example>')
533        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
534
535    def test_MAIL_empty_chevrons(self):
536        self.write_line(b'EHLO example')
537        self.write_line(b'MAIL from:<>')
538        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
539
540    def test_MAIL_quoted_localpart(self):
541        self.write_line(b'EHLO example')
542        self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
543        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
544        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
545
546    def test_MAIL_quoted_localpart_no_angles(self):
547        self.write_line(b'EHLO example')
548        self.write_line(b'MAIL from: "Fred Blogs"@example.com')
549        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
550        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
551
552    def test_MAIL_quoted_localpart_with_size(self):
553        self.write_line(b'EHLO example')
554        self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
555        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
556        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
557
558    def test_MAIL_quoted_localpart_with_size_no_angles(self):
559        self.write_line(b'EHLO example')
560        self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
561        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
562        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
563
564    def test_nested_MAIL(self):
565        self.write_line(b'HELO example')
566        self.write_line(b'MAIL from:eggs@example')
567        self.write_line(b'MAIL from:spam@example')
568        self.assertEqual(self.channel.socket.last,
569            b'503 Error: nested MAIL command\r\n')
570
571    def test_VRFY(self):
572        self.write_line(b'VRFY eggs@example')
573        self.assertEqual(self.channel.socket.last,
574            b'252 Cannot VRFY user, but will accept message and attempt ' + \
575            b'delivery\r\n')
576
577    def test_VRFY_syntax(self):
578        self.write_line(b'VRFY')
579        self.assertEqual(self.channel.socket.last,
580            b'501 Syntax: VRFY <address>\r\n')
581
582    def test_EXPN_not_implemented(self):
583        self.write_line(b'EXPN')
584        self.assertEqual(self.channel.socket.last,
585            b'502 EXPN not implemented\r\n')
586
587    def test_no_HELO_MAIL(self):
588        self.write_line(b'MAIL from:<foo@example.com>')
589        self.assertEqual(self.channel.socket.last,
590                         b'503 Error: send HELO first\r\n')
591
592    def test_need_RCPT(self):
593        self.write_line(b'HELO example')
594        self.write_line(b'MAIL From:eggs@example')
595        self.write_line(b'DATA')
596        self.assertEqual(self.channel.socket.last,
597            b'503 Error: need RCPT command\r\n')
598
599    def test_RCPT_syntax_HELO(self):
600        self.write_line(b'HELO example')
601        self.write_line(b'MAIL From: eggs@example')
602        self.write_line(b'RCPT to eggs@example')
603        self.assertEqual(self.channel.socket.last,
604            b'501 Syntax: RCPT TO: <address>\r\n')
605
606    def test_RCPT_syntax_EHLO(self):
607        self.write_line(b'EHLO example')
608        self.write_line(b'MAIL From: eggs@example')
609        self.write_line(b'RCPT to eggs@example')
610        self.assertEqual(self.channel.socket.last,
611            b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')
612
613    def test_RCPT_lowercase_to_OK(self):
614        self.write_line(b'HELO example')
615        self.write_line(b'MAIL From: eggs@example')
616        self.write_line(b'RCPT to: <eggs@example>')
617        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
618
619    def test_no_HELO_RCPT(self):
620        self.write_line(b'RCPT to eggs@example')
621        self.assertEqual(self.channel.socket.last,
622                         b'503 Error: send HELO first\r\n')
623
624    def test_data_dialog(self):
625        self.write_line(b'HELO example')
626        self.write_line(b'MAIL From:eggs@example')
627        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
628        self.write_line(b'RCPT To:spam@example')
629        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
630
631        self.write_line(b'DATA')
632        self.assertEqual(self.channel.socket.last,
633            b'354 End data with <CR><LF>.<CR><LF>\r\n')
634        self.write_line(b'data\r\nmore\r\n.')
635        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
636        self.assertEqual(self.server.messages,
637            [(('peer-address', 'peer-port'),
638              'eggs@example',
639              ['spam@example'],
640              'data\nmore')])
641
642    def test_DATA_syntax(self):
643        self.write_line(b'HELO example')
644        self.write_line(b'MAIL From:eggs@example')
645        self.write_line(b'RCPT To:spam@example')
646        self.write_line(b'DATA spam')
647        self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')
648
649    def test_no_HELO_DATA(self):
650        self.write_line(b'DATA spam')
651        self.assertEqual(self.channel.socket.last,
652                         b'503 Error: send HELO first\r\n')
653
654    def test_data_transparency_section_4_5_2(self):
655        self.write_line(b'HELO example')
656        self.write_line(b'MAIL From:eggs@example')
657        self.write_line(b'RCPT To:spam@example')
658        self.write_line(b'DATA')
659        self.write_line(b'..\r\n.\r\n')
660        self.assertEqual(self.channel.received_data, '.')
661
662    def test_multiple_RCPT(self):
663        self.write_line(b'HELO example')
664        self.write_line(b'MAIL From:eggs@example')
665        self.write_line(b'RCPT To:spam@example')
666        self.write_line(b'RCPT To:ham@example')
667        self.write_line(b'DATA')
668        self.write_line(b'data\r\n.')
669        self.assertEqual(self.server.messages,
670            [(('peer-address', 'peer-port'),
671              'eggs@example',
672              ['spam@example','ham@example'],
673              'data')])
674
675    def test_manual_status(self):
676        # checks that the Channel is able to return a custom status message
677        self.write_line(b'HELO example')
678        self.write_line(b'MAIL From:eggs@example')
679        self.write_line(b'RCPT To:spam@example')
680        self.write_line(b'DATA')
681        self.write_line(b'return status\r\n.')
682        self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')
683
684    def test_RSET(self):
685        self.write_line(b'HELO example')
686        self.write_line(b'MAIL From:eggs@example')
687        self.write_line(b'RCPT To:spam@example')
688        self.write_line(b'RSET')
689        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
690        self.write_line(b'MAIL From:foo@example')
691        self.write_line(b'RCPT To:eggs@example')
692        self.write_line(b'DATA')
693        self.write_line(b'data\r\n.')
694        self.assertEqual(self.server.messages,
695            [(('peer-address', 'peer-port'),
696               'foo@example',
697               ['eggs@example'],
698               'data')])
699
700    def test_HELO_RSET(self):
701        self.write_line(b'HELO example')
702        self.write_line(b'RSET')
703        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
704
705    def test_RSET_syntax(self):
706        self.write_line(b'RSET hi')
707        self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')
708
709    def test_unknown_command(self):
710        self.write_line(b'UNKNOWN_CMD')
711        self.assertEqual(self.channel.socket.last,
712                         b'500 Error: command "UNKNOWN_CMD" not ' + \
713                         b'recognized\r\n')
714
715    def test_attribute_deprecations(self):
716        with support.check_warnings(('', DeprecationWarning)):
717            spam = self.channel._SMTPChannel__server
718        with support.check_warnings(('', DeprecationWarning)):
719            self.channel._SMTPChannel__server = 'spam'
720        with support.check_warnings(('', DeprecationWarning)):
721            spam = self.channel._SMTPChannel__line
722        with support.check_warnings(('', DeprecationWarning)):
723            self.channel._SMTPChannel__line = 'spam'
724        with support.check_warnings(('', DeprecationWarning)):
725            spam = self.channel._SMTPChannel__state
726        with support.check_warnings(('', DeprecationWarning)):
727            self.channel._SMTPChannel__state = 'spam'
728        with support.check_warnings(('', DeprecationWarning)):
729            spam = self.channel._SMTPChannel__greeting
730        with support.check_warnings(('', DeprecationWarning)):
731            self.channel._SMTPChannel__greeting = 'spam'
732        with support.check_warnings(('', DeprecationWarning)):
733            spam = self.channel._SMTPChannel__mailfrom
734        with support.check_warnings(('', DeprecationWarning)):
735            self.channel._SMTPChannel__mailfrom = 'spam'
736        with support.check_warnings(('', DeprecationWarning)):
737            spam = self.channel._SMTPChannel__rcpttos
738        with support.check_warnings(('', DeprecationWarning)):
739            self.channel._SMTPChannel__rcpttos = 'spam'
740        with support.check_warnings(('', DeprecationWarning)):
741            spam = self.channel._SMTPChannel__data
742        with support.check_warnings(('', DeprecationWarning)):
743            self.channel._SMTPChannel__data = 'spam'
744        with support.check_warnings(('', DeprecationWarning)):
745            spam = self.channel._SMTPChannel__fqdn
746        with support.check_warnings(('', DeprecationWarning)):
747            self.channel._SMTPChannel__fqdn = 'spam'
748        with support.check_warnings(('', DeprecationWarning)):
749            spam = self.channel._SMTPChannel__peer
750        with support.check_warnings(('', DeprecationWarning)):
751            self.channel._SMTPChannel__peer = 'spam'
752        with support.check_warnings(('', DeprecationWarning)):
753            spam = self.channel._SMTPChannel__conn
754        with support.check_warnings(('', DeprecationWarning)):
755            self.channel._SMTPChannel__conn = 'spam'
756        with support.check_warnings(('', DeprecationWarning)):
757            spam = self.channel._SMTPChannel__addr
758        with support.check_warnings(('', DeprecationWarning)):
759            self.channel._SMTPChannel__addr = 'spam'
760
761@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
762class SMTPDChannelIPv6Test(SMTPDChannelTest):
763    def setUp(self):
764        smtpd.socket = asyncore.socket = mock_socket
765        self.old_debugstream = smtpd.DEBUGSTREAM
766        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
767        self.server = DummyServer((support.HOSTv6, 0), ('b', 0),
768                                  decode_data=True)
769        conn, addr = self.server.accept()
770        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
771                                         decode_data=True)
772
773class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
774
775    def setUp(self):
776        smtpd.socket = asyncore.socket = mock_socket
777        self.old_debugstream = smtpd.DEBUGSTREAM
778        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
779        self.server = DummyServer((support.HOST, 0), ('b', 0),
780                                  decode_data=True)
781        conn, addr = self.server.accept()
782        # Set DATA size limit to 32 bytes for easy testing
783        self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
784                                         decode_data=True)
785
786    def tearDown(self):
787        asyncore.close_all()
788        asyncore.socket = smtpd.socket = socket
789        smtpd.DEBUGSTREAM = self.old_debugstream
790
791    def write_line(self, line):
792        self.channel.socket.queue_recv(line)
793        self.channel.handle_read()
794
795    def test_data_limit_dialog(self):
796        self.write_line(b'HELO example')
797        self.write_line(b'MAIL From:eggs@example')
798        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
799        self.write_line(b'RCPT To:spam@example')
800        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
801
802        self.write_line(b'DATA')
803        self.assertEqual(self.channel.socket.last,
804            b'354 End data with <CR><LF>.<CR><LF>\r\n')
805        self.write_line(b'data\r\nmore\r\n.')
806        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
807        self.assertEqual(self.server.messages,
808            [(('peer-address', 'peer-port'),
809              'eggs@example',
810              ['spam@example'],
811              'data\nmore')])
812
813    def test_data_limit_dialog_too_much_data(self):
814        self.write_line(b'HELO example')
815        self.write_line(b'MAIL From:eggs@example')
816        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
817        self.write_line(b'RCPT To:spam@example')
818        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
819
820        self.write_line(b'DATA')
821        self.assertEqual(self.channel.socket.last,
822            b'354 End data with <CR><LF>.<CR><LF>\r\n')
823        self.write_line(b'This message is longer than 32 bytes\r\n.')
824        self.assertEqual(self.channel.socket.last,
825                         b'552 Error: Too much mail data\r\n')
826
827
828class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
829
830    def setUp(self):
831        smtpd.socket = asyncore.socket = mock_socket
832        self.old_debugstream = smtpd.DEBUGSTREAM
833        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
834        self.server = DummyServer((support.HOST, 0), ('b', 0))
835        conn, addr = self.server.accept()
836        self.channel = smtpd.SMTPChannel(self.server, conn, addr)
837
838    def tearDown(self):
839        asyncore.close_all()
840        asyncore.socket = smtpd.socket = socket
841        smtpd.DEBUGSTREAM = self.old_debugstream
842
843    def write_line(self, line):
844        self.channel.socket.queue_recv(line)
845        self.channel.handle_read()
846
847    def test_ascii_data(self):
848        self.write_line(b'HELO example')
849        self.write_line(b'MAIL From:eggs@example')
850        self.write_line(b'RCPT To:spam@example')
851        self.write_line(b'DATA')
852        self.write_line(b'plain ascii text')
853        self.write_line(b'.')
854        self.assertEqual(self.channel.received_data, b'plain ascii text')
855
856    def test_utf8_data(self):
857        self.write_line(b'HELO example')
858        self.write_line(b'MAIL From:eggs@example')
859        self.write_line(b'RCPT To:spam@example')
860        self.write_line(b'DATA')
861        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
862        self.write_line(b'and some plain ascii')
863        self.write_line(b'.')
864        self.assertEqual(
865            self.channel.received_data,
866            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
867                b'and some plain ascii')
868
869
870class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
871
872    def setUp(self):
873        smtpd.socket = asyncore.socket = mock_socket
874        self.old_debugstream = smtpd.DEBUGSTREAM
875        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
876        self.server = DummyServer((support.HOST, 0), ('b', 0),
877                                  decode_data=True)
878        conn, addr = self.server.accept()
879        # Set decode_data to True
880        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
881                decode_data=True)
882
883    def tearDown(self):
884        asyncore.close_all()
885        asyncore.socket = smtpd.socket = socket
886        smtpd.DEBUGSTREAM = self.old_debugstream
887
888    def write_line(self, line):
889        self.channel.socket.queue_recv(line)
890        self.channel.handle_read()
891
892    def test_ascii_data(self):
893        self.write_line(b'HELO example')
894        self.write_line(b'MAIL From:eggs@example')
895        self.write_line(b'RCPT To:spam@example')
896        self.write_line(b'DATA')
897        self.write_line(b'plain ascii text')
898        self.write_line(b'.')
899        self.assertEqual(self.channel.received_data, 'plain ascii text')
900
901    def test_utf8_data(self):
902        self.write_line(b'HELO example')
903        self.write_line(b'MAIL From:eggs@example')
904        self.write_line(b'RCPT To:spam@example')
905        self.write_line(b'DATA')
906        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
907        self.write_line(b'and some plain ascii')
908        self.write_line(b'.')
909        self.assertEqual(
910            self.channel.received_data,
911            'utf8 enriched text: żźć\nand some plain ascii')
912
913
914class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
915    def setUp(self):
916        smtpd.socket = asyncore.socket = mock_socket
917        self.old_debugstream = smtpd.DEBUGSTREAM
918        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
919        self.server = DummyServer((support.HOST, 0), ('b', 0),
920                                  enable_SMTPUTF8=True)
921        conn, addr = self.server.accept()
922        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
923                                         enable_SMTPUTF8=True)
924
925    def tearDown(self):
926        asyncore.close_all()
927        asyncore.socket = smtpd.socket = socket
928        smtpd.DEBUGSTREAM = self.old_debugstream
929
930    def write_line(self, line):
931        self.channel.socket.queue_recv(line)
932        self.channel.handle_read()
933
934    def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
935        self.write_line(b'EHLO example')
936        self.write_line(
937            'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
938                'utf-8')
939        )
940        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
941
942    def test_process_smtputf8_message(self):
943        self.write_line(b'EHLO example')
944        for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
945            self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
946            self.assertEqual(self.channel.socket.last[0:3], b'250')
947            self.write_line(b'rcpt to:<b@example.com>')
948            self.assertEqual(self.channel.socket.last[0:3], b'250')
949            self.write_line(b'data')
950            self.assertEqual(self.channel.socket.last[0:3], b'354')
951            self.write_line(b'c\r\n.')
952            if mail_parameters == b'':
953                self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
954            else:
955                self.assertEqual(self.channel.socket.last,
956                                 b'250 SMTPUTF8 message okish\r\n')
957
958    def test_utf8_data(self):
959        self.write_line(b'EHLO example')
960        self.write_line(
961            'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
962        self.assertEqual(self.channel.socket.last[0:3], b'250')
963        self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
964        self.assertEqual(self.channel.socket.last[0:3], b'250')
965        self.write_line(b'DATA')
966        self.assertEqual(self.channel.socket.last[0:3], b'354')
967        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
968        self.write_line(b'.')
969        self.assertEqual(
970            self.channel.received_data,
971            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
972
973    def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
974        self.write_line(b'ehlo example')
975        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
976        self.write_line(b'MAIL from:<' +
977                        b'a' * (fill_len + 1) +
978                        b'@example>')
979        self.assertEqual(self.channel.socket.last,
980                         b'500 Error: line too long\r\n')
981        self.write_line(b'MAIL from:<' +
982                        b'a' * fill_len +
983                        b'@example>')
984        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
985
986    def test_multiple_emails_with_extended_command_length(self):
987        self.write_line(b'ehlo example')
988        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
989        for char in [b'a', b'b', b'c']:
990            self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
991            self.assertEqual(self.channel.socket.last[0:3], b'500')
992            self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
993            self.assertEqual(self.channel.socket.last[0:3], b'250')
994            self.write_line(b'rcpt to:<hans@example.com>')
995            self.assertEqual(self.channel.socket.last[0:3], b'250')
996            self.write_line(b'data')
997            self.assertEqual(self.channel.socket.last[0:3], b'354')
998            self.write_line(b'test\r\n.')
999            self.assertEqual(self.channel.socket.last[0:3], b'250')
1000
1001
1002class MiscTestCase(unittest.TestCase):
1003    def test__all__(self):
1004        blacklist = {
1005            "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",
1006            "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",
1007
1008        }
1009        support.check__all__(self, smtpd, blacklist=blacklist)
1010
1011
1012if __name__ == "__main__":
1013    unittest.main()
1014