import unittest
from mox3.mox import MoxTestBase, IsA

from slimta.queue.proxy import ProxyQueue
from slimta.smtp.reply import Reply
from slimta.relay import Relay, TransientRelayError, PermanentRelayError
from slimta.envelope import Envelope


class TestProxyQueue(MoxTestBase, unittest.TestCase):
    """Tests for ProxyQueue driven by a mocked Relay.

    Each test records the calls it expects on the mock relay, switches the
    mocks to replay mode with ``ReplayAll()``, and then exercises a
    ProxyQueue built around that relay.
    """

    def setUp(self):
        # Fresh mock relay and a minimal one-recipient envelope per test.
        super(TestProxyQueue, self).setUp()
        self.relay = self.mox.CreateMock(Relay)
        self.env = Envelope('sender@example.com', ['rcpt@example.com'])

    def test_enqueue(self):
        # Expect exactly one delivery attempt, numbered 0, that succeeds.
        self.relay._attempt(self.env, 0)
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        ret = q.enqueue(self.env)
        # The result is a one-item list holding an (envelope, id) pair.
        self.assertEqual(1, len(ret))
        self.assertEqual(2, len(ret[0]))
        self.assertEqual(self.env, ret[0][0])
        # assertRegexpMatches is a deprecated alias that was removed in
        # Python 3.12; prefer assertRegex and fall back to the old name only
        # on interpreters (Python 2.7) that do not provide the new one.
        assert_regex = (getattr(self, 'assertRegex', None) or
                        self.assertRegexpMatches)
        assert_regex(ret[0][1], r'[0-9a-fA-F]{32}')

    def test_enqueue_relayerror(self):
        # The relay raises on the attempt; enqueue() is expected to hand the
        # error object back in place of an id rather than propagate it.
        err = PermanentRelayError('msg failure', Reply('550', 'Not Ok'))
        self.relay._attempt(self.env, 0).AndRaise(err)
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        ret = q.enqueue(self.env)
        self.assertEqual(1, len(ret))
        self.assertEqual(2, len(ret[0]))
        self.assertEqual(self.env, ret[0][0])
        self.assertEqual(err, ret[0][1])

    def test_start_noop(self):
        # No relay calls are recorded, so start() must not touch the relay.
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.start()

    def test_kill_noop(self):
        # No relay calls are recorded, so kill() must not touch the relay.
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.kill()

    def test_flush_noop(self):
        # No relay calls are recorded, so flush() must not touch the relay.
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.flush()

    def test_add_policy_error(self):
        # add_policy() is expected to raise NotImplementedError.
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        with self.assertRaises(NotImplementedError):
            q.add_policy('test')


# vim:et:fdm=marker:sts=4:sw=4:ts=4