1
2import unittest
3from mox3.mox import MoxTestBase, IsA
4
5from slimta.queue.proxy import ProxyQueue
6from slimta.smtp.reply import Reply
7from slimta.relay import Relay, TransientRelayError, PermanentRelayError
8from slimta.envelope import Envelope
9
10
class TestProxyQueue(MoxTestBase, unittest.TestCase):
    """Tests for ProxyQueue, using a mock Relay as the delivery backend."""

    def setUp(self):
        """Create a fresh mock relay and a sample envelope for every test."""
        super(TestProxyQueue, self).setUp()
        self.relay = self.mox.CreateMock(Relay)
        self.env = Envelope('sender@example.com', ['rcpt@example.com'])

    def _assert_regex(self, text, pattern):
        """Assert that *pattern* is found in *text* on any unittest version.

        ``assertRegexpMatches`` is a deprecated alias that was removed from
        unittest in Python 3.12, while its replacement ``assertRegex`` is
        not available on Python 2.7. Use whichever one this interpreter
        provides so the suite keeps running on both.
        """
        check = getattr(self, 'assertRegex', None)
        if check is None:
            check = self.assertRegexpMatches
        check(text, pattern)

    def test_enqueue(self):
        """enqueue() attempts the relay once and returns (envelope, id)."""
        # Recorded expectation: a single attempt with attempt count 0.
        self.relay._attempt(self.env, 0)
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        ret = q.enqueue(self.env)
        self.assertEqual(1, len(ret))
        self.assertEqual(2, len(ret[0]))
        self.assertEqual(self.env, ret[0][0])
        # The second element is expected to contain 32 hexadecimal digits.
        self._assert_regex(ret[0][1], r'[0-9a-fA-F]{32}')

    def test_enqueue_relayerror(self):
        """A relay error raised by the attempt is returned, not propagated."""
        err = PermanentRelayError('msg failure', Reply('550', 'Not Ok'))
        self.relay._attempt(self.env, 0).AndRaise(err)
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        ret = q.enqueue(self.env)
        self.assertEqual(1, len(ret))
        self.assertEqual(2, len(ret[0]))
        self.assertEqual(self.env, ret[0][0])
        self.assertEqual(err, ret[0][1])

    def test_start_noop(self):
        """start() completes with no calls recorded on the relay mock."""
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.start()

    def test_kill_noop(self):
        """kill() completes with no calls recorded on the relay mock."""
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.kill()

    def test_flush_noop(self):
        """flush() completes with no calls recorded on the relay mock."""
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        q.flush()

    def test_add_policy_error(self):
        """add_policy() raises NotImplementedError."""
        self.mox.ReplayAll()
        q = ProxyQueue(self.relay)
        with self.assertRaises(NotImplementedError):
            q.add_policy('test')
59
60
61# vim:et:fdm=marker:sts=4:sw=4:ts=4
62