1import unittest 2from mox3.mox import MoxTestBase, IsA 3from gevent.socket import socket 4from gevent.ssl import SSLContext, SSLSocket 5 6from slimta.smtp.auth import InvalidMechanismError 7from slimta.smtp.client import Client, LmtpClient 8from slimta.smtp.reply import Reply 9 10 11class TestSmtpClient(MoxTestBase, unittest.TestCase): 12 13 def setUp(self): 14 super(TestSmtpClient, self).setUp() 15 self.sock = self.mox.CreateMock(socket) 16 self.sock.fileno = lambda: -1 17 self.sock.getpeername = lambda: ('test', 0) 18 self.context = self.mox.CreateMock(SSLContext) 19 self.context.session_stats = lambda: {} 20 21 def test_get_reply(self): 22 self.sock.recv(IsA(int)).AndReturn(b'421 Test\r\n') 23 self.mox.ReplayAll() 24 client = Client(self.sock) 25 reply = client.get_reply(b'[TEST]') 26 self.assertEqual('421', reply.code) 27 self.assertEqual('4.0.0 Test', reply.message) 28 self.assertEqual(b'[TEST]', reply.command) 29 30 def test_get_banner(self): 31 self.sock.recv(IsA(int)).AndReturn(b'220 Go\r\n') 32 self.mox.ReplayAll() 33 client = Client(self.sock) 34 reply = client.get_banner() 35 self.assertEqual('220', reply.code) 36 self.assertEqual('Go', reply.message) 37 self.assertEqual(b'[BANNER]', reply.command) 38 39 def test_custom_command(self): 40 self.sock.sendall(b'cmd arg\r\n') 41 self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n') 42 self.mox.ReplayAll() 43 client = Client(self.sock) 44 reply = client.custom_command(b'cmd', b'arg') 45 self.assertEqual('250', reply.code) 46 self.assertEqual('2.0.0 Ok', reply.message) 47 self.assertEqual(b'CMD', reply.command) 48 49 def test_ehlo(self): 50 self.sock.sendall(b'EHLO there\r\n') 51 self.sock.recv(IsA(int)).AndReturn(b'250-Hello there\r\n250-TEST arg\r\n') 52 self.sock.recv(IsA(int)).AndReturn(b'250 EXTEN\r\n') 53 self.mox.ReplayAll() 54 client = Client(self.sock) 55 reply = client.ehlo('there') 56 self.assertEqual('250', reply.code) 57 self.assertEqual('Hello there', reply.message) 58 self.assertEqual(b'EHLO', reply.command) 59 self.assertTrue('TEST' in client.extensions) 60 self.assertTrue('EXTEN' in client.extensions) 61 self.assertEqual('arg', client.extensions.getparam('TEST')) 62 63 def test_helo(self): 64 self.sock.sendall(b'HELO there\r\n') 65 self.sock.recv(IsA(int)).AndReturn(b'250 Hello\r\n') 66 self.mox.ReplayAll() 67 client = Client(self.sock) 68 reply = client.helo('there') 69 self.assertEqual('250', reply.code) 70 self.assertEqual('Hello', reply.message) 71 self.assertEqual(b'HELO', reply.command) 72 73 def test_starttls(self): 74 sock = self.mox.CreateMock(SSLSocket) 75 sock.fileno = lambda: -1 76 sock.getpeername = lambda: ('test', 0) 77 sock.sendall(b'STARTTLS\r\n') 78 sock.recv(IsA(int)).AndReturn(b'220 Go ahead\r\n') 79 self.context.wrap_socket(sock, server_hostname='test').AndReturn(sock) 80 self.mox.ReplayAll() 81 client = Client(sock) 82 reply = client.starttls(self.context) 83 self.assertEqual('220', reply.code) 84 self.assertEqual('2.0.0 Go ahead', reply.message) 85 self.assertEqual(b'STARTTLS', reply.command) 86 87 def test_starttls_noencrypt(self): 88 self.sock.sendall(b'STARTTLS\r\n') 89 self.sock.recv(IsA(int)).AndReturn(b'420 Nope\r\n') 90 self.mox.ReplayAll() 91 client = Client(self.sock) 92 reply = client.starttls({}) 93 self.assertEqual('420', reply.code) 94 self.assertEqual('4.0.0 Nope', reply.message) 95 self.assertEqual(b'STARTTLS', reply.command) 96 97 def test_auth(self): 98 sock = self.mox.CreateMock(SSLSocket) 99 sock.fileno = lambda: -1 100 sock.getpeername = lambda: ('test', 0) 101 sock.sendall(b'AUTH PLAIN AHRlc3RAZXhhbXBsZS5jb20AYXNkZg==\r\n') 102 sock.recv(IsA(int)).AndReturn(b'235 Ok\r\n') 103 self.mox.ReplayAll() 104 client = Client(sock) 105 client.extensions.add('AUTH', 'PLAIN') 106 reply = client.auth('test@example.com', 'asdf') 107 self.assertEqual('235', reply.code) 108 self.assertEqual('2.0.0 Ok', reply.message) 109 self.assertEqual(b'AUTH', reply.command) 110 111 def test_auth_insecure(self): 112 self.mox.ReplayAll() 113 client = Client(self.sock) 114 client.extensions.add('AUTH', 'PLAIN') 115 self.assertRaises(InvalidMechanismError, client.auth, 116 'test@example.com', 'asdf') 117 118 def test_auth_force_mechanism(self): 119 self.sock.sendall(b'AUTH PLAIN AHRlc3RAZXhhbXBsZS5jb20AYXNkZg==\r\n') 120 self.sock.recv(IsA(int)).AndReturn(b'535 Nope!\r\n') 121 self.mox.ReplayAll() 122 client = Client(self.sock) 123 client.extensions.add('AUTH', 'PLAIN') 124 reply = client.auth('test@example.com', 'asdf', mechanism=b'PLAIN') 125 self.assertEqual('535', reply.code) 126 self.assertEqual('5.0.0 Nope!', reply.message) 127 self.assertEqual(b'AUTH', reply.command) 128 129 def test_mailfrom(self): 130 self.sock.sendall(b'MAIL FROM:<test>\r\n') 131 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 132 self.mox.ReplayAll() 133 client = Client(self.sock) 134 reply = client.mailfrom('test') 135 self.assertEqual('250', reply.code) 136 self.assertEqual('2.0.0 Ok', reply.message) 137 self.assertEqual(b'MAIL', reply.command) 138 139 def test_mailfrom_pipelining(self): 140 self.sock.sendall(b'MAIL FROM:<test>\r\n') 141 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 142 self.mox.ReplayAll() 143 client = Client(self.sock) 144 client.extensions.add('PIPELINING') 145 reply = client.mailfrom('test') 146 self.assertEqual(None, reply.code) 147 self.assertEqual(None, reply.message) 148 self.assertEqual(b'MAIL', reply.command) 149 client._flush_pipeline() 150 self.assertEqual('250', reply.code) 151 self.assertEqual('2.0.0 Ok', reply.message) 152 153 def test_mailfrom_size(self): 154 self.sock.sendall(b'MAIL FROM:<test> SIZE=10\r\n') 155 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 156 self.mox.ReplayAll() 157 client = Client(self.sock) 158 client.extensions.add('SIZE', 100) 159 client.mailfrom('test', data_size=10) 160 161 def test_mailfrom_auth(self): 162 self.sock.sendall(b'MAIL FROM:<test> AUTH=<>\r\n') 163 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 164 self.sock.sendall(b'MAIL FROM:<test> AUTH=1+2B1+3D2\r\n') 165 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 166 self.mox.ReplayAll() 167 client = Client(self.sock) 168 client.extensions.add('AUTH', True) 169 client.mailfrom('test', auth=False) 170 client.mailfrom('test', auth='1+1=2') 171 172 def test_rcptto(self): 173 self.sock.sendall(b'RCPT TO:<test>\r\n') 174 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 175 self.mox.ReplayAll() 176 client = Client(self.sock) 177 reply = client.rcptto('test') 178 self.assertEqual('250', reply.code) 179 self.assertEqual('2.0.0 Ok', reply.message) 180 self.assertEqual(b'RCPT', reply.command) 181 182 def test_rcptto_pipelining(self): 183 self.sock.sendall(b'RCPT TO:<test>\r\n') 184 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 185 self.mox.ReplayAll() 186 client = Client(self.sock) 187 client.extensions.add('PIPELINING') 188 reply = client.rcptto('test') 189 self.assertEqual(None, reply.code) 190 self.assertEqual(None, reply.message) 191 self.assertEqual(b'RCPT', reply.command) 192 client._flush_pipeline() 193 self.assertEqual('250', reply.code) 194 self.assertEqual('2.0.0 Ok', reply.message) 195 196 def test_data(self): 197 self.sock.sendall(b'DATA\r\n') 198 self.sock.recv(IsA(int)).AndReturn(b'354 Go ahead\r\n') 199 self.mox.ReplayAll() 200 client = Client(self.sock) 201 reply = client.data() 202 self.assertEqual('354', reply.code) 203 self.assertEqual('Go ahead', reply.message) 204 self.assertEqual(b'DATA', reply.command) 205 206 def test_send_empty_data(self): 207 self.sock.sendall(b'.\r\n') 208 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Done\r\n') 209 self.mox.ReplayAll() 210 client = Client(self.sock) 211 reply = client.send_empty_data() 212 self.assertEqual('250', reply.code) 213 self.assertEqual('2.0.0 Done', reply.message) 214 self.assertEqual(b'[SEND_DATA]', reply.command) 215 216 def test_send_data(self): 217 self.sock.sendall(b'One\r\nTwo\r\n..Three\r\n.\r\n') 218 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Done\r\n') 219 self.mox.ReplayAll() 220 client = Client(self.sock) 221 reply = client.send_data(b'One\r\nTwo\r\n.Three') 222 self.assertEqual('250', reply.code) 223 self.assertEqual('2.0.0 Done', reply.message) 224 self.assertEqual(b'[SEND_DATA]', reply.command) 225 226 def test_rset(self): 227 self.sock.sendall(b'RSET\r\n') 228 self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n') 229 self.mox.ReplayAll() 230 client = Client(self.sock) 231 reply = client.rset() 232 self.assertEqual('250', reply.code) 233 self.assertEqual('2.0.0 Ok', reply.message) 234 self.assertEqual(b'RSET', reply.command) 235 236 def test_quit(self): 237 self.sock.sendall(b'QUIT\r\n') 238 self.sock.recv(IsA(int)).AndReturn(b'221 Bye\r\n') 239 self.mox.ReplayAll() 240 client = Client(self.sock) 241 reply = client.quit() 242 self.assertEqual('221', reply.code) 243 self.assertEqual('2.0.0 Bye', reply.message) 244 self.assertEqual(b'QUIT', reply.command) 245 246 247class TestLmtpClient(MoxTestBase, unittest.TestCase): 248 249 def setUp(self): 250 super(TestLmtpClient, self).setUp() 251 self.sock = self.mox.CreateMock(socket) 252 self.sock.fileno = lambda: -1 253 self.sock.getpeername = lambda: ('test', 0) 254 255 def test_ehlo_invalid(self): 256 client = LmtpClient(self.sock) 257 self.assertRaises(NotImplementedError, client.ehlo, 'there') 258 259 def test_helo_invalid(self): 260 client = LmtpClient(self.sock) 261 self.assertRaises(NotImplementedError, client.helo, 'there') 262 263 def test_lhlo(self): 264 self.sock.sendall(b'LHLO there\r\n') 265 self.sock.recv(IsA(int)).AndReturn(b'250-Hello there\r\n250-TEST arg\r\n') 266 self.sock.recv(IsA(int)).AndReturn(b'250 EXTEN\r\n') 267 self.mox.ReplayAll() 268 client = LmtpClient(self.sock) 269 reply = client.lhlo('there') 270 self.assertEqual('250', reply.code) 271 self.assertEqual('Hello there', reply.message) 272 self.assertEqual(b'LHLO', reply.command) 273 self.assertTrue('TEST' in client.extensions) 274 self.assertTrue('EXTEN' in client.extensions) 275 self.assertEqual('arg', client.extensions.getparam('TEST')) 276 277 def test_rcptto(self): 278 self.sock.sendall(b'RCPT TO:<test>\r\n') 279 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n') 280 self.mox.ReplayAll() 281 client = LmtpClient(self.sock) 282 reply = client.rcptto('test') 283 self.assertEqual('250', reply.code) 284 self.assertEqual('2.0.0 Ok', reply.message) 285 self.assertEqual(b'RCPT', reply.command) 286 self.assertEqual([('test', reply)], client.rcpttos) 287 288 def test_rset(self): 289 self.sock.sendall(b'RSET\r\n') 290 self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n') 291 self.mox.ReplayAll() 292 client = LmtpClient(self.sock) 293 client.rcpttos = 'testing' 294 reply = client.rset() 295 self.assertEqual('250', reply.code) 296 self.assertEqual('2.0.0 Ok', reply.message) 297 self.assertEqual(b'RSET', reply.command) 298 self.assertEqual([], client.rcpttos) 299 300 def test_send_data(self): 301 self.sock.sendall(b'One\r\nTwo\r\n..Three\r\n.\r\n') 302 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n' 303 b'550 5.0.0 Not Ok\r\n') 304 self.mox.ReplayAll() 305 client = LmtpClient(self.sock) 306 client.rcpttos = [('test1', Reply('250')), 307 ('test2', Reply('250')), 308 ('test3', Reply('550'))] 309 replies = client.send_data(b'One\r\nTwo\r\n.Three') 310 self.assertEqual(2, len(replies)) 311 self.assertEqual('test1', replies[0][0]) 312 self.assertEqual('250', replies[0][1].code) 313 self.assertEqual('2.0.0 Ok', replies[0][1].message) 314 self.assertEqual(b'[SEND_DATA]', replies[0][1].command) 315 self.assertEqual('test2', replies[1][0]) 316 self.assertEqual('550', replies[1][1].code) 317 self.assertEqual('5.0.0 Not Ok', replies[1][1].message) 318 self.assertEqual(b'[SEND_DATA]', replies[1][1].command) 319 320 def test_send_empty_data(self): 321 self.sock.sendall(b'.\r\n') 322 self.sock.recv(IsA(int)).AndReturn(b'250 2.0.0 Ok\r\n' 323 b'550 5.0.0 Not Ok\r\n') 324 self.mox.ReplayAll() 325 client = LmtpClient(self.sock) 326 client.rcpttos = [('test1', Reply('250')), 327 ('test2', Reply('250')), 328 ('test3', Reply('550'))] 329 replies = client.send_empty_data() 330 self.assertEqual(2, len(replies)) 331 self.assertEqual('test1', replies[0][0]) 332 self.assertEqual('250', replies[0][1].code) 333 self.assertEqual('2.0.0 Ok', replies[0][1].message) 334 self.assertEqual(b'[SEND_DATA]', replies[0][1].command) 335 self.assertEqual('test2', replies[1][0]) 336 self.assertEqual('550', replies[1][1].code) 337 self.assertEqual('5.0.0 Not Ok', replies[1][1].message) 338 self.assertEqual(b'[SEND_DATA]', replies[1][1].command) 339 340 341# vim:et:fdm=marker:sts=4:sw=4:ts=4 342