1from __future__ import absolute_import, unicode_literals
2
3import errno
4import socket
5
6import pytest
7from case import Mock, patch, skip
8
9from celery.contrib.rdb import Rdb, debugger, set_trace
10from celery.five import WhateverIO
11
12
13class SockErr(socket.error):
14    errno = None
15
16
17class test_Rdb:
18
19    @patch('celery.contrib.rdb.Rdb')
20    def test_debugger(self, Rdb):
21        x = debugger()
22        assert x
23        assert x is debugger()
24
25    @patch('celery.contrib.rdb.debugger')
26    @patch('celery.contrib.rdb._frame')
27    def test_set_trace(self, _frame, debugger):
28        assert set_trace(Mock())
29        assert set_trace()
30        debugger.return_value.set_trace.assert_called()
31
32    @patch('celery.contrib.rdb.Rdb.get_avail_port')
33    @skip.if_pypy()
34    def test_rdb(self, get_avail_port):
35        sock = Mock()
36        get_avail_port.return_value = (sock, 8000)
37        sock.accept.return_value = (Mock(), ['helu'])
38        out = WhateverIO()
39        with Rdb(out=out) as rdb:
40            get_avail_port.assert_called()
41            assert 'helu' in out.getvalue()
42
43            # set_quit
44            with patch('sys.settrace') as settrace:
45                rdb.set_quit()
46                settrace.assert_called_with(None)
47
48            # set_trace
49            with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
50                with patch('celery.contrib.rdb._frame'):
51                    rdb.set_trace()
52                    rdb.set_trace(Mock())
53                    pset.side_effect = SockErr
54                    pset.side_effect.errno = errno.ENOENT
55                    with pytest.raises(SockErr):
56                        rdb.set_trace()
57
58            # _close_session
59            rdb._close_session()
60            rdb.active = True
61            rdb._handle = None
62            rdb._client = None
63            rdb._sock = None
64            rdb._close_session()
65
66            # do_continue
67            rdb.set_continue = Mock()
68            rdb.do_continue(Mock())
69            rdb.set_continue.assert_called_with()
70
71            # do_quit
72            rdb.set_quit = Mock()
73            rdb.do_quit(Mock())
74            rdb.set_quit.assert_called_with()
75
76    @patch('socket.socket')
77    @skip.if_pypy()
78    def test_get_avail_port(self, sock):
79        out = WhateverIO()
80        sock.return_value.accept.return_value = (Mock(), ['helu'])
81        with Rdb(out=out):
82            pass
83
84        with patch('celery.contrib.rdb.current_process') as curproc:
85            curproc.return_value.name = 'PoolWorker-10'
86            with Rdb(out=out):
87                pass
88
89        err = sock.return_value.bind.side_effect = SockErr()
90        err.errno = errno.ENOENT
91        with pytest.raises(SockErr):
92            with Rdb(out=out):
93                pass
94        err.errno = errno.EADDRINUSE
95        with pytest.raises(Exception):
96            with Rdb(out=out):
97                pass
98        called = [0]
99
100        def effect(*a, **kw):
101            try:
102                if called[0] > 50:
103                    return True
104                raise err
105            finally:
106                called[0] += 1
107        sock.return_value.bind.side_effect = effect
108        with Rdb(out=out):
109            pass
110