1# test asynchat
2
3from test import support
4from test.support import socket_helper
5
6import asynchat
7import asyncore
8import errno
9import socket
10import sys
11import threading
12import time
13import unittest
14import unittest.mock
15
16HOST = socket_helper.HOST
17SERVER_QUIT = b'QUIT\n'
18
19
20class echo_server(threading.Thread):
21    # parameter to determine the number of bytes passed back to the
22    # client each send
23    chunk_size = 1
24
25    def __init__(self, event):
26        threading.Thread.__init__(self)
27        self.event = event
28        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
29        self.port = socket_helper.bind_port(self.sock)
30        # This will be set if the client wants us to wait before echoing
31        # data back.
32        self.start_resend_event = None
33
34    def run(self):
35        self.sock.listen()
36        self.event.set()
37        conn, client = self.sock.accept()
38        self.buffer = b""
39        # collect data until quit message is seen
40        while SERVER_QUIT not in self.buffer:
41            data = conn.recv(1)
42            if not data:
43                break
44            self.buffer = self.buffer + data
45
46        # remove the SERVER_QUIT message
47        self.buffer = self.buffer.replace(SERVER_QUIT, b'')
48
49        if self.start_resend_event:
50            self.start_resend_event.wait()
51
52        # re-send entire set of collected data
53        try:
54            # this may fail on some tests, such as test_close_when_done,
55            # since the client closes the channel when it's done sending
56            while self.buffer:
57                n = conn.send(self.buffer[:self.chunk_size])
58                time.sleep(0.001)
59                self.buffer = self.buffer[n:]
60        except:
61            pass
62
63        conn.close()
64        self.sock.close()
65
66class echo_client(asynchat.async_chat):
67
68    def __init__(self, terminator, server_port):
69        asynchat.async_chat.__init__(self)
70        self.contents = []
71        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
72        self.connect((HOST, server_port))
73        self.set_terminator(terminator)
74        self.buffer = b""
75
76    def handle_connect(self):
77        pass
78
79    if sys.platform == 'darwin':
80        # select.poll returns a select.POLLHUP at the end of the tests
81        # on darwin, so just ignore it
82        def handle_expt(self):
83            pass
84
85    def collect_incoming_data(self, data):
86        self.buffer += data
87
88    def found_terminator(self):
89        self.contents.append(self.buffer)
90        self.buffer = b""
91
92def start_echo_server():
93    event = threading.Event()
94    s = echo_server(event)
95    s.start()
96    event.wait()
97    event.clear()
98    time.sleep(0.01)   # Give server time to start accepting.
99    return s, event
100
101
102class TestAsynchat(unittest.TestCase):
103    usepoll = False
104
105    def setUp(self):
106        self._threads = support.threading_setup()
107
108    def tearDown(self):
109        support.threading_cleanup(*self._threads)
110
111    def line_terminator_check(self, term, server_chunk):
112        event = threading.Event()
113        s = echo_server(event)
114        s.chunk_size = server_chunk
115        s.start()
116        event.wait()
117        event.clear()
118        time.sleep(0.01)   # Give server time to start accepting.
119        c = echo_client(term, s.port)
120        c.push(b"hello ")
121        c.push(b"world" + term)
122        c.push(b"I'm not dead yet!" + term)
123        c.push(SERVER_QUIT)
124        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
125        support.join_thread(s)
126
127        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
128
129    # the line terminator tests below check receiving variously-sized
130    # chunks back from the server in order to exercise all branches of
131    # async_chat.handle_read
132
133    def test_line_terminator1(self):
134        # test one-character terminator
135        for l in (1, 2, 3):
136            self.line_terminator_check(b'\n', l)
137
138    def test_line_terminator2(self):
139        # test two-character terminator
140        for l in (1, 2, 3):
141            self.line_terminator_check(b'\r\n', l)
142
143    def test_line_terminator3(self):
144        # test three-character terminator
145        for l in (1, 2, 3):
146            self.line_terminator_check(b'qqq', l)
147
148    def numeric_terminator_check(self, termlen):
149        # Try reading a fixed number of bytes
150        s, event = start_echo_server()
151        c = echo_client(termlen, s.port)
152        data = b"hello world, I'm not dead yet!\n"
153        c.push(data)
154        c.push(SERVER_QUIT)
155        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
156        support.join_thread(s)
157
158        self.assertEqual(c.contents, [data[:termlen]])
159
160    def test_numeric_terminator1(self):
161        # check that ints & longs both work (since type is
162        # explicitly checked in async_chat.handle_read)
163        self.numeric_terminator_check(1)
164
165    def test_numeric_terminator2(self):
166        self.numeric_terminator_check(6)
167
168    def test_none_terminator(self):
169        # Try reading a fixed number of bytes
170        s, event = start_echo_server()
171        c = echo_client(None, s.port)
172        data = b"hello world, I'm not dead yet!\n"
173        c.push(data)
174        c.push(SERVER_QUIT)
175        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
176        support.join_thread(s)
177
178        self.assertEqual(c.contents, [])
179        self.assertEqual(c.buffer, data)
180
181    def test_simple_producer(self):
182        s, event = start_echo_server()
183        c = echo_client(b'\n', s.port)
184        data = b"hello world\nI'm not dead yet!\n"
185        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
186        c.push_with_producer(p)
187        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
188        support.join_thread(s)
189
190        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
191
192    def test_string_producer(self):
193        s, event = start_echo_server()
194        c = echo_client(b'\n', s.port)
195        data = b"hello world\nI'm not dead yet!\n"
196        c.push_with_producer(data+SERVER_QUIT)
197        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
198        support.join_thread(s)
199
200        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
201
202    def test_empty_line(self):
203        # checks that empty lines are handled correctly
204        s, event = start_echo_server()
205        c = echo_client(b'\n', s.port)
206        c.push(b"hello world\n\nI'm not dead yet!\n")
207        c.push(SERVER_QUIT)
208        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
209        support.join_thread(s)
210
211        self.assertEqual(c.contents,
212                         [b"hello world", b"", b"I'm not dead yet!"])
213
214    def test_close_when_done(self):
215        s, event = start_echo_server()
216        s.start_resend_event = threading.Event()
217        c = echo_client(b'\n', s.port)
218        c.push(b"hello world\nI'm not dead yet!\n")
219        c.push(SERVER_QUIT)
220        c.close_when_done()
221        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
222
223        # Only allow the server to start echoing data back to the client after
224        # the client has closed its connection.  This prevents a race condition
225        # where the server echoes all of its data before we can check that it
226        # got any down below.
227        s.start_resend_event.set()
228        support.join_thread(s)
229
230        self.assertEqual(c.contents, [])
231        # the server might have been able to send a byte or two back, but this
232        # at least checks that it received something and didn't just fail
233        # (which could still result in the client not having received anything)
234        self.assertGreater(len(s.buffer), 0)
235
236    def test_push(self):
237        # Issue #12523: push() should raise a TypeError if it doesn't get
238        # a bytes string
239        s, event = start_echo_server()
240        c = echo_client(b'\n', s.port)
241        data = b'bytes\n'
242        c.push(data)
243        c.push(bytearray(data))
244        c.push(memoryview(data))
245        self.assertRaises(TypeError, c.push, 10)
246        self.assertRaises(TypeError, c.push, 'unicode')
247        c.push(SERVER_QUIT)
248        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
249        support.join_thread(s)
250        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
251
252
253class TestAsynchat_WithPoll(TestAsynchat):
254    usepoll = True
255
256
257class TestAsynchatMocked(unittest.TestCase):
258    def test_blockingioerror(self):
259        # Issue #16133: handle_read() must ignore BlockingIOError
260        sock = unittest.mock.Mock()
261        sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
262
263        dispatcher = asynchat.async_chat()
264        dispatcher.set_socket(sock)
265        self.addCleanup(dispatcher.del_channel)
266
267        with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
268            dispatcher.handle_read()
269        self.assertFalse(error.called)
270
271
272class TestHelperFunctions(unittest.TestCase):
273    def test_find_prefix_at_end(self):
274        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
275        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
276
277
278class TestNotConnected(unittest.TestCase):
279    def test_disallow_negative_terminator(self):
280        # Issue #11259
281        client = asynchat.async_chat()
282        self.assertRaises(ValueError, client.set_terminator, -1)
283
284
285
286if __name__ == "__main__":
287    unittest.main()
288