1# test asynchat 2 3from test import support 4from test.support import socket_helper 5 6import asynchat 7import asyncore 8import errno 9import socket 10import sys 11import threading 12import time 13import unittest 14import unittest.mock 15 16HOST = socket_helper.HOST 17SERVER_QUIT = b'QUIT\n' 18 19 20class echo_server(threading.Thread): 21 # parameter to determine the number of bytes passed back to the 22 # client each send 23 chunk_size = 1 24 25 def __init__(self, event): 26 threading.Thread.__init__(self) 27 self.event = event 28 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 29 self.port = socket_helper.bind_port(self.sock) 30 # This will be set if the client wants us to wait before echoing 31 # data back. 32 self.start_resend_event = None 33 34 def run(self): 35 self.sock.listen() 36 self.event.set() 37 conn, client = self.sock.accept() 38 self.buffer = b"" 39 # collect data until quit message is seen 40 while SERVER_QUIT not in self.buffer: 41 data = conn.recv(1) 42 if not data: 43 break 44 self.buffer = self.buffer + data 45 46 # remove the SERVER_QUIT message 47 self.buffer = self.buffer.replace(SERVER_QUIT, b'') 48 49 if self.start_resend_event: 50 self.start_resend_event.wait() 51 52 # re-send entire set of collected data 53 try: 54 # this may fail on some tests, such as test_close_when_done, 55 # since the client closes the channel when it's done sending 56 while self.buffer: 57 n = conn.send(self.buffer[:self.chunk_size]) 58 time.sleep(0.001) 59 self.buffer = self.buffer[n:] 60 except: 61 pass 62 63 conn.close() 64 self.sock.close() 65 66class echo_client(asynchat.async_chat): 67 68 def __init__(self, terminator, server_port): 69 asynchat.async_chat.__init__(self) 70 self.contents = [] 71 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 72 self.connect((HOST, server_port)) 73 self.set_terminator(terminator) 74 self.buffer = b"" 75 76 def handle_connect(self): 77 pass 78 79 if sys.platform == 'darwin': 80 # select.poll returns a select.POLLHUP at the end of the tests 81 # on darwin, so just ignore it 82 def handle_expt(self): 83 pass 84 85 def collect_incoming_data(self, data): 86 self.buffer += data 87 88 def found_terminator(self): 89 self.contents.append(self.buffer) 90 self.buffer = b"" 91 92def start_echo_server(): 93 event = threading.Event() 94 s = echo_server(event) 95 s.start() 96 event.wait() 97 event.clear() 98 time.sleep(0.01) # Give server time to start accepting. 99 return s, event 100 101 102class TestAsynchat(unittest.TestCase): 103 usepoll = False 104 105 def setUp(self): 106 self._threads = support.threading_setup() 107 108 def tearDown(self): 109 support.threading_cleanup(*self._threads) 110 111 def line_terminator_check(self, term, server_chunk): 112 event = threading.Event() 113 s = echo_server(event) 114 s.chunk_size = server_chunk 115 s.start() 116 event.wait() 117 event.clear() 118 time.sleep(0.01) # Give server time to start accepting. 119 c = echo_client(term, s.port) 120 c.push(b"hello ") 121 c.push(b"world" + term) 122 c.push(b"I'm not dead yet!" + term) 123 c.push(SERVER_QUIT) 124 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 125 support.join_thread(s) 126 127 self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) 128 129 # the line terminator tests below check receiving variously-sized 130 # chunks back from the server in order to exercise all branches of 131 # async_chat.handle_read 132 133 def test_line_terminator1(self): 134 # test one-character terminator 135 for l in (1, 2, 3): 136 self.line_terminator_check(b'\n', l) 137 138 def test_line_terminator2(self): 139 # test two-character terminator 140 for l in (1, 2, 3): 141 self.line_terminator_check(b'\r\n', l) 142 143 def test_line_terminator3(self): 144 # test three-character terminator 145 for l in (1, 2, 3): 146 self.line_terminator_check(b'qqq', l) 147 148 def numeric_terminator_check(self, termlen): 149 # Try reading a fixed number of bytes 150 s, event = start_echo_server() 151 c = echo_client(termlen, s.port) 152 data = b"hello world, I'm not dead yet!\n" 153 c.push(data) 154 c.push(SERVER_QUIT) 155 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 156 support.join_thread(s) 157 158 self.assertEqual(c.contents, [data[:termlen]]) 159 160 def test_numeric_terminator1(self): 161 # check that ints & longs both work (since type is 162 # explicitly checked in async_chat.handle_read) 163 self.numeric_terminator_check(1) 164 165 def test_numeric_terminator2(self): 166 self.numeric_terminator_check(6) 167 168 def test_none_terminator(self): 169 # Try reading a fixed number of bytes 170 s, event = start_echo_server() 171 c = echo_client(None, s.port) 172 data = b"hello world, I'm not dead yet!\n" 173 c.push(data) 174 c.push(SERVER_QUIT) 175 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 176 support.join_thread(s) 177 178 self.assertEqual(c.contents, []) 179 self.assertEqual(c.buffer, data) 180 181 def test_simple_producer(self): 182 s, event = start_echo_server() 183 c = echo_client(b'\n', s.port) 184 data = b"hello world\nI'm not dead yet!\n" 185 p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) 186 c.push_with_producer(p) 187 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 188 support.join_thread(s) 189 190 self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) 191 192 def test_string_producer(self): 193 s, event = start_echo_server() 194 c = echo_client(b'\n', s.port) 195 data = b"hello world\nI'm not dead yet!\n" 196 c.push_with_producer(data+SERVER_QUIT) 197 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 198 support.join_thread(s) 199 200 self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) 201 202 def test_empty_line(self): 203 # checks that empty lines are handled correctly 204 s, event = start_echo_server() 205 c = echo_client(b'\n', s.port) 206 c.push(b"hello world\n\nI'm not dead yet!\n") 207 c.push(SERVER_QUIT) 208 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 209 support.join_thread(s) 210 211 self.assertEqual(c.contents, 212 [b"hello world", b"", b"I'm not dead yet!"]) 213 214 def test_close_when_done(self): 215 s, event = start_echo_server() 216 s.start_resend_event = threading.Event() 217 c = echo_client(b'\n', s.port) 218 c.push(b"hello world\nI'm not dead yet!\n") 219 c.push(SERVER_QUIT) 220 c.close_when_done() 221 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 222 223 # Only allow the server to start echoing data back to the client after 224 # the client has closed its connection. This prevents a race condition 225 # where the server echoes all of its data before we can check that it 226 # got any down below. 227 s.start_resend_event.set() 228 support.join_thread(s) 229 230 self.assertEqual(c.contents, []) 231 # the server might have been able to send a byte or two back, but this 232 # at least checks that it received something and didn't just fail 233 # (which could still result in the client not having received anything) 234 self.assertGreater(len(s.buffer), 0) 235 236 def test_push(self): 237 # Issue #12523: push() should raise a TypeError if it doesn't get 238 # a bytes string 239 s, event = start_echo_server() 240 c = echo_client(b'\n', s.port) 241 data = b'bytes\n' 242 c.push(data) 243 c.push(bytearray(data)) 244 c.push(memoryview(data)) 245 self.assertRaises(TypeError, c.push, 10) 246 self.assertRaises(TypeError, c.push, 'unicode') 247 c.push(SERVER_QUIT) 248 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 249 support.join_thread(s) 250 self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes']) 251 252 253class TestAsynchat_WithPoll(TestAsynchat): 254 usepoll = True 255 256 257class TestAsynchatMocked(unittest.TestCase): 258 def test_blockingioerror(self): 259 # Issue #16133: handle_read() must ignore BlockingIOError 260 sock = unittest.mock.Mock() 261 sock.recv.side_effect = BlockingIOError(errno.EAGAIN) 262 263 dispatcher = asynchat.async_chat() 264 dispatcher.set_socket(sock) 265 self.addCleanup(dispatcher.del_channel) 266 267 with unittest.mock.patch.object(dispatcher, 'handle_error') as error: 268 dispatcher.handle_read() 269 self.assertFalse(error.called) 270 271 272class TestHelperFunctions(unittest.TestCase): 273 def test_find_prefix_at_end(self): 274 self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) 275 self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) 276 277 278class TestNotConnected(unittest.TestCase): 279 def test_disallow_negative_terminator(self): 280 # Issue #11259 281 client = asynchat.async_chat() 282 self.assertRaises(ValueError, client.set_terminator, -1) 283 284 285 286if __name__ == "__main__": 287 unittest.main() 288