1import unittest
2from mox3.mox import MoxTestBase, IsA
3from gevent.socket import socket
4from gevent.ssl import SSLContext, SSLSocket
5
6from slimta.smtp.auth import InvalidMechanismError
7from slimta.smtp.client import Client, LmtpClient
8from slimta.smtp.reply import Reply
9
10
11class TestSmtpClient(MoxTestBase, unittest.TestCase):
12
13    def setUp(self):
14        super(TestSmtpClient, self).setUp()
15        self.sock = self.mox.CreateMock(socket)
16        self.sock.fileno = lambda: -1
17        self.sock.getpeername = lambda: ('test', 0)
18        self.context = self.mox.CreateMock(SSLContext)
19        self.context.session_stats = lambda: {}
20
21    def test_get_reply(self):
22        self.sock.recv(IsA(int)).AndReturn(b'421 Test\r\n')
23        self.mox.ReplayAll()
24        client = Client(self.sock)
25        reply = client.get_reply(b'[TEST]')
26        self.assertEqual('421', reply.code)
27        self.assertEqual('4.0.0 Test', reply.message)
28        self.assertEqual(b'[TEST]', reply.command)
29
30    def test_get_banner(self):
31        self.sock.recv(IsA(int)).AndReturn(b'220 Go\r\n')
32        self.mox.ReplayAll()
33        client = Client(self.sock)
34        reply = client.get_banner()
35        self.assertEqual('220', reply.code)
36        self.assertEqual('Go', reply.message)
37        self.assertEqual(b'[BANNER]', reply.command)
38
39    def test_custom_command(self):
40        self.sock.sendall(b'cmd arg\r\n')
41        self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
42        self.mox.ReplayAll()
43        client = Client(self.sock)
44        reply = client.custom_command(b'cmd', b'arg')
45        self.assertEqual('250', reply.code)
46        self.assertEqual('2.0.0 Ok', reply.message)
47        self.assertEqual(b'CMD', reply.command)
48
49    def test_ehlo(self):
50        self.sock.sendall(b'EHLO there\r\n')
51        self.sock.recv(IsA(int)).AndReturn(b'250-Hello there\r\n250-TEST arg\r\n')
52        self.sock.recv(IsA(int)).AndReturn(b'250 EXTEN\r\n')
53        self.mox.ReplayAll()
54        client = Client(self.sock)
55        reply = client.ehlo('there')
56        self.assertEqual('250', reply.code)
57        self.assertEqual('Hello there', reply.message)
58        self.assertEqual(b'EHLO', reply.command)
59        self.assertTrue('TEST' in client.extensions)
60        self.assertTrue('EXTEN' in client.extensions)
61        self.assertEqual('arg', client.extensions.getparam('TEST'))
62
63    def test_helo(self):
64        self.sock.sendall(b'HELO there\r\n')
65        self.sock.recv(IsA(int)).AndReturn(b'250 Hello\r\n')
66        self.mox.ReplayAll()
67        client = Client(self.sock)
68        reply = client.helo('there')
69        self.assertEqual('250', reply.code)
70        self.assertEqual('Hello', reply.message)
71        self.assertEqual(b'HELO', reply.command)
72
73    def test_starttls(self):
74        sock = self.mox.CreateMock(SSLSocket)
75        sock.fileno = lambda: -1
76        sock.getpeername = lambda: ('test', 0)
77        sock.sendall(b'STARTTLS\r\n')
78        sock.recv(IsA(int)).AndReturn(b'220 Go ahead\r\n')
79        self.context.wrap_socket(sock, server_hostname='test').AndReturn(sock)
80        self.mox.ReplayAll()
81        client = Client(sock)
82        reply = client.starttls(self.context)
83        self.assertEqual('220', reply.code)
84        self.assertEqual('2.0.0 Go ahead', reply.message)
85        self.assertEqual(b'STARTTLS', reply.command)
86
87    def test_starttls_noencrypt(self):
88        self.sock.sendall(b'STARTTLS\r\n')
89        self.sock.recv(IsA(int)).AndReturn(b'420 Nope\r\n')
90        self.mox.ReplayAll()
91        client = Client(self.sock)
92        reply = client.starttls({})
93        self.assertEqual('420', reply.code)
94        self.assertEqual('4.0.0 Nope', reply.message)
95        self.assertEqual(b'STARTTLS', reply.command)
96
97    def test_auth(self):
98        sock = self.mox.CreateMock(SSLSocket)
99        sock.fileno = lambda: -1
100        sock.getpeername = lambda: ('test', 0)
101        sock.sendall(b'AUTH PLAIN AHRlc3RAZXhhbXBsZS5jb20AYXNkZg==\r\n')
102        sock.recv(IsA(int)).AndReturn(b'235 Ok\r\n')
103        self.mox.ReplayAll()
104        client = Client(sock)
105        client.extensions.add('AUTH', 'PLAIN')
106        reply = client.auth('test@example.com', 'asdf')
107        self.assertEqual('235', reply.code)
108        self.assertEqual('2.0.0 Ok', reply.message)
109        self.assertEqual(b'AUTH', reply.command)
110
111    def test_auth_insecure(self):
112        self.mox.ReplayAll()
113        client = Client(self.sock)
114        client.extensions.add('AUTH', 'PLAIN')
115        self.assertRaises(InvalidMechanismError, client.auth,
116                          'test@example.com', 'asdf')
117
118    def test_auth_force_mechanism(self):
119        self.sock.sendall(b'AUTH PLAIN AHRlc3RAZXhhbXBsZS5jb20AYXNkZg==\r\n')
120        self.sock.recv(IsA(int)).AndReturn(b'535 Nope!\r\n')
121        self.mox.ReplayAll()
122        client = Client(self.sock)
123        client.extensions.add('AUTH', 'PLAIN')
124        reply = client.auth('test@example.com', 'asdf', mechanism=b'PLAIN')
125        self.assertEqual('535', reply.code)
126        self.assertEqual('5.0.0 Nope!', reply.message)
127        self.assertEqual(b'AUTH', reply.command)
128
129    def test_mailfrom(self):
130        self.sock.sendall(b'MAIL FROM:<test>\r\n')
131        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
132        self.mox.ReplayAll()
133        client = Client(self.sock)
134        reply = client.mailfrom('test')
135        self.assertEqual('250', reply.code)
136        self.assertEqual('2.0.0 Ok', reply.message)
137        self.assertEqual(b'MAIL', reply.command)
138
139    def test_mailfrom_pipelining(self):
140        self.sock.sendall(b'MAIL FROM:<test>\r\n')
141        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
142        self.mox.ReplayAll()
143        client = Client(self.sock)
144        client.extensions.add('PIPELINING')
145        reply = client.mailfrom('test')
146        self.assertEqual(None, reply.code)
147        self.assertEqual(None, reply.message)
148        self.assertEqual(b'MAIL', reply.command)
149        client._flush_pipeline()
150        self.assertEqual('250', reply.code)
151        self.assertEqual('2.0.0 Ok', reply.message)
152
153    def test_mailfrom_size(self):
154        self.sock.sendall(b'MAIL FROM:<test> SIZE=10\r\n')
155        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
156        self.mox.ReplayAll()
157        client = Client(self.sock)
158        client.extensions.add('SIZE', 100)
159        client.mailfrom('test', data_size=10)
160
161    def test_mailfrom_auth(self):
162        self.sock.sendall(b'MAIL FROM:<test> AUTH=<>\r\n')
163        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
164        self.sock.sendall(b'MAIL FROM:<test> AUTH=1+2B1+3D2\r\n')
165        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
166        self.mox.ReplayAll()
167        client = Client(self.sock)
168        client.extensions.add('AUTH', True)
169        client.mailfrom('test', auth=False)
170        client.mailfrom('test', auth='1+1=2')
171
172    def test_rcptto(self):
173        self.sock.sendall(b'RCPT TO:<test>\r\n')
174        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
175        self.mox.ReplayAll()
176        client = Client(self.sock)
177        reply = client.rcptto('test')
178        self.assertEqual('250', reply.code)
179        self.assertEqual('2.0.0 Ok', reply.message)
180        self.assertEqual(b'RCPT', reply.command)
181
182    def test_rcptto_pipelining(self):
183        self.sock.sendall(b'RCPT TO:<test>\r\n')
184        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
185        self.mox.ReplayAll()
186        client = Client(self.sock)
187        client.extensions.add('PIPELINING')
188        reply = client.rcptto('test')
189        self.assertEqual(None, reply.code)
190        self.assertEqual(None, reply.message)
191        self.assertEqual(b'RCPT', reply.command)
192        client._flush_pipeline()
193        self.assertEqual('250', reply.code)
194        self.assertEqual('2.0.0 Ok', reply.message)
195
196    def test_data(self):
197        self.sock.sendall(b'DATA\r\n')
198        self.sock.recv(IsA(int)).AndReturn(b'354 Go ahead\r\n')
199        self.mox.ReplayAll()
200        client = Client(self.sock)
201        reply = client.data()
202        self.assertEqual('354', reply.code)
203        self.assertEqual('Go ahead', reply.message)
204        self.assertEqual(b'DATA', reply.command)
205
206    def test_send_empty_data(self):
207        self.sock.sendall(b'.\r\n')
208        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Done\r\n')
209        self.mox.ReplayAll()
210        client = Client(self.sock)
211        reply = client.send_empty_data()
212        self.assertEqual('250', reply.code)
213        self.assertEqual('2.0.0 Done', reply.message)
214        self.assertEqual(b'[SEND_DATA]', reply.command)
215
216    def test_send_data(self):
217        self.sock.sendall(b'One\r\nTwo\r\n..Three\r\n.\r\n')
218        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Done\r\n')
219        self.mox.ReplayAll()
220        client = Client(self.sock)
221        reply = client.send_data(b'One\r\nTwo\r\n.Three')
222        self.assertEqual('250', reply.code)
223        self.assertEqual('2.0.0 Done', reply.message)
224        self.assertEqual(b'[SEND_DATA]', reply.command)
225
226    def test_rset(self):
227        self.sock.sendall(b'RSET\r\n')
228        self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
229        self.mox.ReplayAll()
230        client = Client(self.sock)
231        reply = client.rset()
232        self.assertEqual('250', reply.code)
233        self.assertEqual('2.0.0 Ok', reply.message)
234        self.assertEqual(b'RSET', reply.command)
235
236    def test_quit(self):
237        self.sock.sendall(b'QUIT\r\n')
238        self.sock.recv(IsA(int)).AndReturn(b'221 Bye\r\n')
239        self.mox.ReplayAll()
240        client = Client(self.sock)
241        reply = client.quit()
242        self.assertEqual('221', reply.code)
243        self.assertEqual('2.0.0 Bye', reply.message)
244        self.assertEqual(b'QUIT', reply.command)
245
246
247class TestLmtpClient(MoxTestBase, unittest.TestCase):
248
249    def setUp(self):
250        super(TestLmtpClient, self).setUp()
251        self.sock = self.mox.CreateMock(socket)
252        self.sock.fileno = lambda: -1
253        self.sock.getpeername = lambda: ('test', 0)
254
255    def test_ehlo_invalid(self):
256        client = LmtpClient(self.sock)
257        self.assertRaises(NotImplementedError, client.ehlo, 'there')
258
259    def test_helo_invalid(self):
260        client = LmtpClient(self.sock)
261        self.assertRaises(NotImplementedError, client.helo, 'there')
262
263    def test_lhlo(self):
264        self.sock.sendall(b'LHLO there\r\n')
265        self.sock.recv(IsA(int)).AndReturn(b'250-Hello there\r\n250-TEST arg\r\n')
266        self.sock.recv(IsA(int)).AndReturn(b'250 EXTEN\r\n')
267        self.mox.ReplayAll()
268        client = LmtpClient(self.sock)
269        reply = client.lhlo('there')
270        self.assertEqual('250', reply.code)
271        self.assertEqual('Hello there', reply.message)
272        self.assertEqual(b'LHLO', reply.command)
273        self.assertTrue('TEST' in client.extensions)
274        self.assertTrue('EXTEN' in client.extensions)
275        self.assertEqual('arg', client.extensions.getparam('TEST'))
276
277    def test_rcptto(self):
278        self.sock.sendall(b'RCPT TO:<test>\r\n')
279        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n')
280        self.mox.ReplayAll()
281        client = LmtpClient(self.sock)
282        reply = client.rcptto('test')
283        self.assertEqual('250', reply.code)
284        self.assertEqual('2.0.0 Ok', reply.message)
285        self.assertEqual(b'RCPT', reply.command)
286        self.assertEqual([('test', reply)], client.rcpttos)
287
288    def test_rset(self):
289        self.sock.sendall(b'RSET\r\n')
290        self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
291        self.mox.ReplayAll()
292        client = LmtpClient(self.sock)
293        client.rcpttos = 'testing'
294        reply = client.rset()
295        self.assertEqual('250', reply.code)
296        self.assertEqual('2.0.0 Ok', reply.message)
297        self.assertEqual(b'RSET', reply.command)
298        self.assertEqual([], client.rcpttos)
299
300    def test_send_data(self):
301        self.sock.sendall(b'One\r\nTwo\r\n..Three\r\n.\r\n')
302        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n'
303                                           b'550 5.0.0 Not Ok\r\n')
304        self.mox.ReplayAll()
305        client = LmtpClient(self.sock)
306        client.rcpttos = [('test1', Reply('250')),
307                          ('test2', Reply('250')),
308                          ('test3', Reply('550'))]
309        replies = client.send_data(b'One\r\nTwo\r\n.Three')
310        self.assertEqual(2, len(replies))
311        self.assertEqual('test1', replies[0][0])
312        self.assertEqual('250', replies[0][1].code)
313        self.assertEqual('2.0.0 Ok', replies[0][1].message)
314        self.assertEqual(b'[SEND_DATA]', replies[0][1].command)
315        self.assertEqual('test2', replies[1][0])
316        self.assertEqual('550', replies[1][1].code)
317        self.assertEqual('5.0.0 Not Ok', replies[1][1].message)
318        self.assertEqual(b'[SEND_DATA]', replies[1][1].command)
319
320    def test_send_empty_data(self):
321        self.sock.sendall(b'.\r\n')
322        self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n'
323                                           b'550 5.0.0 Not Ok\r\n')
324        self.mox.ReplayAll()
325        client = LmtpClient(self.sock)
326        client.rcpttos = [('test1', Reply('250')),
327                          ('test2', Reply('250')),
328                          ('test3', Reply('550'))]
329        replies = client.send_empty_data()
330        self.assertEqual(2, len(replies))
331        self.assertEqual('test1', replies[0][0])
332        self.assertEqual('250', replies[0][1].code)
333        self.assertEqual('2.0.0 Ok', replies[0][1].message)
334        self.assertEqual(b'[SEND_DATA]', replies[0][1].command)
335        self.assertEqual('test2', replies[1][0])
336        self.assertEqual('550', replies[1][1].code)
337        self.assertEqual('5.0.0 Not Ok', replies[1][1].message)
338        self.assertEqual(b'[SEND_DATA]', replies[1][1].command)
339
340
341# vim:et:fdm=marker:sts=4:sw=4:ts=4
342