1# test asynchat
2
3from test import support
4from test.support import socket_helper
5from test.support import threading_helper
6
7import errno
8import socket
9import sys
10import threading
11import time
12import unittest
13import unittest.mock
14
15import warnings
16with warnings.catch_warnings():
17    warnings.simplefilter('ignore', DeprecationWarning)
18    import asynchat
19    import asyncore
20
# Address the test client connects to when reaching the echo server.
HOST = socket_helper.HOST
# Marker the client sends to tell the echo server to stop collecting data.
SERVER_QUIT = b'QUIT\n'
23
24
class echo_server(threading.Thread):
    """Single-connection echo server running in its own thread.

    The server accepts exactly one client, collects everything it sends
    until SERVER_QUIT shows up (or the client disconnects), strips the
    SERVER_QUIT marker and then sends the collected data back,
    ``chunk_size`` bytes per send() call.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Create the server socket and bind it to a free port.

        *event* is set by run() as soon as the socket is listening.
        """
        threading.Thread.__init__(self)
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.sock)
        # Data collected from the client.  Defined here so that the
        # attribute exists even if run() fails before receiving anything.
        self.buffer = b""
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        """Serve one client: collect its data, then echo it back."""
        # The context managers guarantee that both sockets are closed even
        # when accept()/recv() raise, so a failing test cannot leak them.
        with self.sock:
            self.sock.listen()
            self.event.set()
            conn, _ = self.sock.accept()
            with conn:
                self.buffer = b""
                # collect data until quit message is seen
                while SERVER_QUIT not in self.buffer:
                    data = conn.recv(1)
                    if not data:
                        # client closed the connection
                        break
                    self.buffer += data

                # remove the SERVER_QUIT message
                self.buffer = self.buffer.replace(SERVER_QUIT, b'')

                if self.start_resend_event:
                    self.start_resend_event.wait()

                # re-send entire set of collected data
                try:
                    # this may fail on some tests, such as
                    # test_close_when_done, since the client closes the
                    # channel when it's done sending
                    while self.buffer:
                        n = conn.send(self.buffer[:self.chunk_size])
                        time.sleep(0.001)
                        self.buffer = self.buffer[n:]
                except OSError:
                    # Best effort only: the peer is already gone.  Whatever
                    # could not be sent stays in self.buffer.
                    pass
70
class echo_client(asynchat.async_chat):
    """async_chat channel that records what the echo server sends back.

    Incoming bytes accumulate in ``buffer``; whenever the terminator is
    found, the accumulated bytes are appended to ``contents`` and the
    buffer starts over.
    """

    def __init__(self, terminator, server_port):
        super().__init__()
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        # Nothing to do once the connection is established.
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        # Keep everything received since the last terminator.
        self.buffer = self.buffer + data

    def found_terminator(self):
        # Store the completed message and start a fresh one.
        finished, self.buffer = self.buffer, b""
        self.contents.append(finished)
96
def start_echo_server():
    """Start an echo_server thread and wait until it has signalled its event.

    Returns a ``(server, event)`` tuple; the event is cleared again before
    it is handed back.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    ready.wait()
    ready.clear()
    time.sleep(0.01)   # Give server time to start accepting.
    return server, ready
105
106
class TestAsynchat(unittest.TestCase):
    """End-to-end tests of async_chat against a threaded echo server.

    Each test pushes data followed by SERVER_QUIT through an echo_client,
    runs the asyncore loop and checks what the client collected.  The
    ``usepoll`` attribute is passed as ``use_poll`` to asyncore.loop();
    subclasses may override it.
    """
    usepoll = False

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        # asyncore keeps channels in a module-global socket map.  Close
        # whatever a (possibly failed) test left behind so that it cannot
        # leak into the tests that run afterwards.
        asyncore.close_all(ignore_all=True)
        threading_helper.threading_cleanup(*self._threads)

    def _run_loop(self):
        """Run the asyncore loop with the settings shared by all tests."""
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def line_terminator_check(self, term, server_chunk):
        """Check terminator *term* with the server echoing *server_chunk*
        bytes per send."""
        s, _ = start_echo_server()
        # chunk_size is only read once the server starts echoing, which
        # happens after the client created below has sent SERVER_QUIT.
        s.chunk_size = server_chunk
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'\n', chunk_size)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'\r\n', chunk_size)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk_size in (1, 2, 3):
            with self.subTest(chunk_size=chunk_size):
                self.line_terminator_check(b'qqq', chunk_size)

    def numeric_terminator_check(self, termlen):
        """Check that an integer terminator reads exactly *termlen* bytes."""
        # Try reading a fixed number of bytes
        s, _ = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # numeric terminator of a single byte
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        # numeric terminator of several bytes
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message is ever completed: everything
        # received ends up in the client's buffer and contents stays empty.
        s, _ = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, _ = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(s)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
256
257
class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run the inherited TestAsynchat tests with ``usepoll`` enabled."""
    usepoll = True
260
261
class TestAsynchatMocked(unittest.TestCase):
    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_sock)
        # set_socket() was called above; make sure the channel is
        # unregistered again when the test ends.
        self.addCleanup(chat.del_channel)

        with unittest.mock.patch.object(chat, 'handle_error') as mock_error:
            chat.handle_read()
        # The BlockingIOError raised by recv() must not reach handle_error().
        mock_error.assert_not_called()
275
276
class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        # (haystack, expected result) pairs, all searched for "\r\n"
        cases = (
            ("qwerty\r", 1),
            ("qwertydkjf", 0),
        )
        for haystack, expected in cases:
            self.assertEqual(
                asynchat.find_prefix_at_end(haystack, "\r\n"), expected)
281
282
class TestNotConnected(unittest.TestCase):
    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)
288
289
290
if __name__ == "__main__":
    # Run every test in this module when the file is executed directly.
    unittest.main()
293