"""Tests for events.py."""

import collections.abc
import concurrent.futures
import functools
import io
import os
import platform
import re
import signal
import socket
try:
    import ssl
except ImportError:
    ssl = None
import subprocess
import sys
import tempfile
import threading
import time
import errno
import unittest
from unittest import mock
import weakref

if sys.platform != 'win32':
    import tty

import asyncio
from asyncio import base_events
from asyncio import constants
from asyncio import coroutines
from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
from test.test_asyncio import utils as test_utils
from test import support


def broken_unix_getsockname():
    """Return True on AIX, or on Mac OS X 10.4 or older.

    Any other platform (including newer macOS releases) returns False.
    Callers in this file use the result to decide whether the 'sockname'
    extra info of a UNIX socket transport should be checked.
    """
    if sys.platform.startswith("aix"):
        return True
    elif sys.platform != 'darwin':
        return False
    version = platform.mac_ver()[0]
    version = tuple(map(int, version.split('.')))
    return version < (10, 5)


def _test_get_event_loop_new_process__sub_proc():
    """Create and install a new event loop, run a trivial coroutine on it
    and return that coroutine's result ('hello')."""
    async def doit():
        return 'hello'

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(doit())


class CoroLike:
    """Object exposing send/throw/close/__await__; every method is a no-op."""

    def send(self, v):
        pass

    def throw(self, *exc):
        pass

    def close(self):
        pass

    def __await__(self):
        pass


class MyBaseProto(asyncio.Protocol):
    """Stream protocol that records its state and counts received bytes.

    The state moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and every
    callback asserts that it is invoked in a state where it is legal.
    The `connected` and `done` futures are only created when a loop is
    passed to the constructor; otherwise they stay None.
    """
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.connected = asyncio.Future(loop=loop)
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        if self.connected:
            self.connected.set_result(None)

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MyProto(MyBaseProto):
    """MyBaseProto that sends an HTTP/1.0 GET request once connected."""

    def connection_made(self, transport):
        super().connection_made(transport)
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')


class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol tracking INITIAL -> INITIALIZED -> CLOSED.

    `done` is a future resolved in connection_lost(); it is only created
    when a loop is passed to the constructor.
    """
    done = None

    def __init__(self, loop=None):
        # Initialised here for consistency with the other test protocols.
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol that keeps the full history of its states.

    `state` is a list that grows from ['INITIAL'] up to
    ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'].
    """
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == ['INITIAL'], self.state
        self.state.append('CONNECTED')

    def data_received(self, data):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.state.append('EOF')

    def connection_lost(self, exc):
        if 'EOF' not in self.state:
            self.state.append('EOF')  # It is okay if EOF is missed.
        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
        self.state.append('CLOSED')
        if self.done:
            self.done.set_result(None)


class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol tracking INITIAL -> CONNECTED -> CLOSED."""
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that collects pipe output and lifecycle events.

    Unlike the other test protocols the loop argument is mandatory: the
    constructor always creates its futures and events on it.
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.connected = asyncio.Future(loop=loop)
        self.completed = asyncio.Future(loop=loop)
        # One future per pipe file descriptor 0, 1 and 2.
        self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
        # Accumulated output, keyed by file descriptor 1 and 2.
        self.data = {1: b'', 2: b''}
        self.returncode = None
        self.got_data = {1: asyncio.Event(loop=loop),
                         2: asyncio.Event(loop=loop)}

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        assert self.state == 'CONNECTED', self.state
        self.data[fd] += data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        assert self.state == 'CONNECTED', self.state
        if exc:
            self.disconnects[fd].set_exception(exc)
        else:
            self.disconnects[fd].set_result(exc)

    def process_exited(self):
        assert self.state == 'CONNECTED', self.state
        self.returncode = self.transport.get_returncode()


class EventLoopTestsMixin:
    """Event loop tests shared by the concrete loop test cases.

    setUp() calls self.create_event_loop() and self.set_event_loop(), which
    this mixin does not define -- they are expected to come from the class
    it is mixed into (not visible in this part of the file).
    """

    def setUp(self):
        super().setUp()
        self.loop = self.create_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        # just in case if we have transport close callbacks
        if not self.loop.is_closed():
            test_utils.run_briefly(self.loop)

        self.doCleanups()
        support.gc_collect()
        super().tearDown()

    def test_run_until_complete_nesting(self):
        # Calling run_until_complete() from inside a coroutine that is
        # itself being run by run_until_complete() must raise RuntimeError.
        @asyncio.coroutine
        def coro1():
            yield

        @asyncio.coroutine
        def coro2():
            self.assertTrue(self.loop.is_running())
            self.loop.run_until_complete(coro1())

        self.assertRaises(
            RuntimeError, self.loop.run_until_complete, coro2())

    # Note: because of the default Windows timing granularity of
    # 15.6 msec, we use fairly long sleep times here (~100 msec).

    def test_run_until_complete(self):
        # A 0.1 second sleep must take between 0.08 and 0.8 seconds of
        # loop time.
        t0 = self.loop.time()
        self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
        t1 = self.loop.time()
        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)

    def test_run_until_complete_stopped(self):
        # Stopping the loop before the awaited coroutine finishes makes
        # run_until_complete() raise RuntimeError.

        async def cb():
            self.loop.stop()
            await asyncio.sleep(0.1, loop=self.loop)
        task = cb()
        self.assertRaises(RuntimeError,
                          self.loop.run_until_complete, task)

    def test_call_later(self):
        # The callback scheduled 0.1 seconds ahead runs once, with its
        # argument, within the accepted timing window.
        results = []

        def callback(arg):
            results.append(arg)
            self.loop.stop()

        self.loop.call_later(0.1, callback, 'hello world')
        t0 = time.monotonic()
        self.loop.run_forever()
        t1 = time.monotonic()
        self.assertEqual(results, ['hello world'])
        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)

    def test_call_soon(self):
        # call_soon() passes positional arguments through to the callback.
        results = []

        def callback(arg1, arg2):
            results.append((arg1, arg2))
            self.loop.stop()

        self.loop.call_soon(callback, 'hello', 'world')
        self.loop.run_forever()
        self.assertEqual(results, [('hello', 'world')])
307 308 def test_call_soon_threadsafe(self): 309 results = [] 310 lock = threading.Lock() 311 312 def callback(arg): 313 results.append(arg) 314 if len(results) >= 2: 315 self.loop.stop() 316 317 def run_in_thread(): 318 self.loop.call_soon_threadsafe(callback, 'hello') 319 lock.release() 320 321 lock.acquire() 322 t = threading.Thread(target=run_in_thread) 323 t.start() 324 325 with lock: 326 self.loop.call_soon(callback, 'world') 327 self.loop.run_forever() 328 t.join() 329 self.assertEqual(results, ['hello', 'world']) 330 331 def test_call_soon_threadsafe_same_thread(self): 332 results = [] 333 334 def callback(arg): 335 results.append(arg) 336 if len(results) >= 2: 337 self.loop.stop() 338 339 self.loop.call_soon_threadsafe(callback, 'hello') 340 self.loop.call_soon(callback, 'world') 341 self.loop.run_forever() 342 self.assertEqual(results, ['hello', 'world']) 343 344 def test_run_in_executor(self): 345 def run(arg): 346 return (arg, threading.get_ident()) 347 f2 = self.loop.run_in_executor(None, run, 'yo') 348 res, thread_id = self.loop.run_until_complete(f2) 349 self.assertEqual(res, 'yo') 350 self.assertNotEqual(thread_id, threading.get_ident()) 351 352 def test_run_in_executor_cancel(self): 353 called = False 354 355 def patched_call_soon(*args): 356 nonlocal called 357 called = True 358 359 def run(): 360 time.sleep(0.05) 361 362 f2 = self.loop.run_in_executor(None, run) 363 f2.cancel() 364 self.loop.close() 365 self.loop.call_soon = patched_call_soon 366 self.loop.call_soon_threadsafe = patched_call_soon 367 time.sleep(0.4) 368 self.assertFalse(called) 369 370 def test_reader_callback(self): 371 r, w = socket.socketpair() 372 r.setblocking(False) 373 bytes_read = bytearray() 374 375 def reader(): 376 try: 377 data = r.recv(1024) 378 except BlockingIOError: 379 # Spurious readiness notifications are possible 380 # at least on Linux -- see man select. 
381 return 382 if data: 383 bytes_read.extend(data) 384 else: 385 self.assertTrue(self.loop.remove_reader(r.fileno())) 386 r.close() 387 388 self.loop.add_reader(r.fileno(), reader) 389 self.loop.call_soon(w.send, b'abc') 390 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 391 self.loop.call_soon(w.send, b'def') 392 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 393 self.loop.call_soon(w.close) 394 self.loop.call_soon(self.loop.stop) 395 self.loop.run_forever() 396 self.assertEqual(bytes_read, b'abcdef') 397 398 def test_writer_callback(self): 399 r, w = socket.socketpair() 400 w.setblocking(False) 401 402 def writer(data): 403 w.send(data) 404 self.loop.stop() 405 406 data = b'x' * 1024 407 self.loop.add_writer(w.fileno(), writer, data) 408 self.loop.run_forever() 409 410 self.assertTrue(self.loop.remove_writer(w.fileno())) 411 self.assertFalse(self.loop.remove_writer(w.fileno())) 412 413 w.close() 414 read = r.recv(len(data) * 2) 415 r.close() 416 self.assertEqual(read, data) 417 418 def _basetest_sock_client_ops(self, httpd, sock): 419 if not isinstance(self.loop, proactor_events.BaseProactorEventLoop): 420 # in debug mode, socket operations must fail 421 # if the socket is not in blocking mode 422 self.loop.set_debug(True) 423 sock.setblocking(True) 424 with self.assertRaises(ValueError): 425 self.loop.run_until_complete( 426 self.loop.sock_connect(sock, httpd.address)) 427 with self.assertRaises(ValueError): 428 self.loop.run_until_complete( 429 self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) 430 with self.assertRaises(ValueError): 431 self.loop.run_until_complete( 432 self.loop.sock_recv(sock, 1024)) 433 with self.assertRaises(ValueError): 434 self.loop.run_until_complete( 435 self.loop.sock_recv_into(sock, bytearray())) 436 with self.assertRaises(ValueError): 437 self.loop.run_until_complete( 438 self.loop.sock_accept(sock)) 439 440 # test in non-blocking mode 441 sock.setblocking(False) 442 self.loop.run_until_complete( 
443 self.loop.sock_connect(sock, httpd.address)) 444 self.loop.run_until_complete( 445 self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) 446 data = self.loop.run_until_complete( 447 self.loop.sock_recv(sock, 1024)) 448 # consume data 449 self.loop.run_until_complete( 450 self.loop.sock_recv(sock, 1024)) 451 sock.close() 452 self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) 453 454 def _basetest_sock_recv_into(self, httpd, sock): 455 # same as _basetest_sock_client_ops, but using sock_recv_into 456 sock.setblocking(False) 457 self.loop.run_until_complete( 458 self.loop.sock_connect(sock, httpd.address)) 459 self.loop.run_until_complete( 460 self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) 461 data = bytearray(1024) 462 with memoryview(data) as buf: 463 nbytes = self.loop.run_until_complete( 464 self.loop.sock_recv_into(sock, buf[:1024])) 465 # consume data 466 self.loop.run_until_complete( 467 self.loop.sock_recv_into(sock, buf[nbytes:])) 468 sock.close() 469 self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) 470 471 def test_sock_client_ops(self): 472 with test_utils.run_test_server() as httpd: 473 sock = socket.socket() 474 self._basetest_sock_client_ops(httpd, sock) 475 sock = socket.socket() 476 self._basetest_sock_recv_into(httpd, sock) 477 478 @support.skip_unless_bind_unix_socket 479 def test_unix_sock_client_ops(self): 480 with test_utils.run_test_unix_server() as httpd: 481 sock = socket.socket(socket.AF_UNIX) 482 self._basetest_sock_client_ops(httpd, sock) 483 sock = socket.socket(socket.AF_UNIX) 484 self._basetest_sock_recv_into(httpd, sock) 485 486 def test_sock_client_fail(self): 487 # Make sure that we will get an unused port 488 address = None 489 try: 490 s = socket.socket() 491 s.bind(('127.0.0.1', 0)) 492 address = s.getsockname() 493 finally: 494 s.close() 495 496 sock = socket.socket() 497 sock.setblocking(False) 498 with self.assertRaises(ConnectionRefusedError): 499 self.loop.run_until_complete( 500 
self.loop.sock_connect(sock, address)) 501 sock.close() 502 503 def test_sock_accept(self): 504 listener = socket.socket() 505 listener.setblocking(False) 506 listener.bind(('127.0.0.1', 0)) 507 listener.listen(1) 508 client = socket.socket() 509 client.connect(listener.getsockname()) 510 511 f = self.loop.sock_accept(listener) 512 conn, addr = self.loop.run_until_complete(f) 513 self.assertEqual(conn.gettimeout(), 0) 514 self.assertEqual(addr, client.getsockname()) 515 self.assertEqual(client.getpeername(), listener.getsockname()) 516 client.close() 517 conn.close() 518 listener.close() 519 520 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') 521 def test_add_signal_handler(self): 522 caught = 0 523 524 def my_handler(): 525 nonlocal caught 526 caught += 1 527 528 # Check error behavior first. 529 self.assertRaises( 530 TypeError, self.loop.add_signal_handler, 'boom', my_handler) 531 self.assertRaises( 532 TypeError, self.loop.remove_signal_handler, 'boom') 533 self.assertRaises( 534 ValueError, self.loop.add_signal_handler, signal.NSIG+1, 535 my_handler) 536 self.assertRaises( 537 ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 538 self.assertRaises( 539 ValueError, self.loop.add_signal_handler, 0, my_handler) 540 self.assertRaises( 541 ValueError, self.loop.remove_signal_handler, 0) 542 self.assertRaises( 543 ValueError, self.loop.add_signal_handler, -1, my_handler) 544 self.assertRaises( 545 ValueError, self.loop.remove_signal_handler, -1) 546 self.assertRaises( 547 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 548 my_handler) 549 # Removing SIGKILL doesn't raise, since we don't call signal(). 550 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 551 # Now set a handler and handle it. 552 self.loop.add_signal_handler(signal.SIGINT, my_handler) 553 554 os.kill(os.getpid(), signal.SIGINT) 555 test_utils.run_until(self.loop, lambda: caught) 556 557 # Removing it should restore the default handler. 
558 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 559 self.assertEqual(signal.getsignal(signal.SIGINT), 560 signal.default_int_handler) 561 # Removing again returns False. 562 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 563 564 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 565 def test_signal_handling_while_selecting(self): 566 # Test with a signal actually arriving during a select() call. 567 caught = 0 568 569 def my_handler(): 570 nonlocal caught 571 caught += 1 572 self.loop.stop() 573 574 self.loop.add_signal_handler(signal.SIGALRM, my_handler) 575 576 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 577 self.loop.call_later(60, self.loop.stop) 578 self.loop.run_forever() 579 self.assertEqual(caught, 1) 580 581 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 582 def test_signal_handling_args(self): 583 some_args = (42,) 584 caught = 0 585 586 def my_handler(*args): 587 nonlocal caught 588 caught += 1 589 self.assertEqual(args, some_args) 590 self.loop.stop() 591 592 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 593 594 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 
595 self.loop.call_later(60, self.loop.stop) 596 self.loop.run_forever() 597 self.assertEqual(caught, 1) 598 599 def _basetest_create_connection(self, connection_fut, check_sockname=True): 600 tr, pr = self.loop.run_until_complete(connection_fut) 601 self.assertIsInstance(tr, asyncio.Transport) 602 self.assertIsInstance(pr, asyncio.Protocol) 603 self.assertIs(pr.transport, tr) 604 if check_sockname: 605 self.assertIsNotNone(tr.get_extra_info('sockname')) 606 self.loop.run_until_complete(pr.done) 607 self.assertGreater(pr.nbytes, 0) 608 tr.close() 609 610 def test_create_connection(self): 611 with test_utils.run_test_server() as httpd: 612 conn_fut = self.loop.create_connection( 613 lambda: MyProto(loop=self.loop), *httpd.address) 614 self._basetest_create_connection(conn_fut) 615 616 @support.skip_unless_bind_unix_socket 617 def test_create_unix_connection(self): 618 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 619 # zero-length address for UNIX socket. 620 check_sockname = not broken_unix_getsockname() 621 622 with test_utils.run_test_unix_server() as httpd: 623 conn_fut = self.loop.create_unix_connection( 624 lambda: MyProto(loop=self.loop), httpd.address) 625 self._basetest_create_connection(conn_fut, check_sockname) 626 627 def test_create_connection_sock(self): 628 with test_utils.run_test_server() as httpd: 629 sock = None 630 infos = self.loop.run_until_complete( 631 self.loop.getaddrinfo( 632 *httpd.address, type=socket.SOCK_STREAM)) 633 for family, type, proto, cname, address in infos: 634 try: 635 sock = socket.socket(family=family, type=type, proto=proto) 636 sock.setblocking(False) 637 self.loop.run_until_complete( 638 self.loop.sock_connect(sock, address)) 639 except: 640 pass 641 else: 642 break 643 else: 644 assert False, 'Can not create socket.' 
645 646 f = self.loop.create_connection( 647 lambda: MyProto(loop=self.loop), sock=sock) 648 tr, pr = self.loop.run_until_complete(f) 649 self.assertIsInstance(tr, asyncio.Transport) 650 self.assertIsInstance(pr, asyncio.Protocol) 651 self.loop.run_until_complete(pr.done) 652 self.assertGreater(pr.nbytes, 0) 653 tr.close() 654 655 def check_ssl_extra_info(self, client, check_sockname=True, 656 peername=None, peercert={}): 657 if check_sockname: 658 self.assertIsNotNone(client.get_extra_info('sockname')) 659 if peername: 660 self.assertEqual(peername, 661 client.get_extra_info('peername')) 662 else: 663 self.assertIsNotNone(client.get_extra_info('peername')) 664 self.assertEqual(peercert, 665 client.get_extra_info('peercert')) 666 667 # test SSL cipher 668 cipher = client.get_extra_info('cipher') 669 self.assertIsInstance(cipher, tuple) 670 self.assertEqual(len(cipher), 3, cipher) 671 self.assertIsInstance(cipher[0], str) 672 self.assertIsInstance(cipher[1], str) 673 self.assertIsInstance(cipher[2], int) 674 675 # test SSL object 676 sslobj = client.get_extra_info('ssl_object') 677 self.assertIsNotNone(sslobj) 678 self.assertEqual(sslobj.compression(), 679 client.get_extra_info('compression')) 680 self.assertEqual(sslobj.cipher(), 681 client.get_extra_info('cipher')) 682 self.assertEqual(sslobj.getpeercert(), 683 client.get_extra_info('peercert')) 684 self.assertEqual(sslobj.compression(), 685 client.get_extra_info('compression')) 686 687 def _basetest_create_ssl_connection(self, connection_fut, 688 check_sockname=True, 689 peername=None): 690 tr, pr = self.loop.run_until_complete(connection_fut) 691 self.assertIsInstance(tr, asyncio.Transport) 692 self.assertIsInstance(pr, asyncio.Protocol) 693 self.assertTrue('ssl' in tr.__class__.__name__.lower()) 694 self.check_ssl_extra_info(tr, check_sockname, peername) 695 self.loop.run_until_complete(pr.done) 696 self.assertGreater(pr.nbytes, 0) 697 tr.close() 698 699 def _test_create_ssl_connection(self, httpd, 
create_connection, 700 check_sockname=True, peername=None): 701 conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 702 self._basetest_create_ssl_connection(conn_fut, check_sockname, 703 peername) 704 705 # ssl.Purpose was introduced in Python 3.4 706 if hasattr(ssl, 'Purpose'): 707 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 708 cafile=None, capath=None, 709 cadata=None): 710 """ 711 A ssl.create_default_context() replacement that doesn't enable 712 cert validation. 713 """ 714 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 715 return test_utils.dummy_ssl_context() 716 717 # With ssl=True, ssl.create_default_context() should be called 718 with mock.patch('ssl.create_default_context', 719 side_effect=_dummy_ssl_create_context) as m: 720 conn_fut = create_connection(ssl=True) 721 self._basetest_create_ssl_connection(conn_fut, check_sockname, 722 peername) 723 self.assertEqual(m.call_count, 1) 724 725 # With the real ssl.create_default_context(), certificate 726 # validation will fail 727 with self.assertRaises(ssl.SSLError) as cm: 728 conn_fut = create_connection(ssl=True) 729 # Ignore the "SSL handshake failed" log in debug mode 730 with test_utils.disable_logger(): 731 self._basetest_create_ssl_connection(conn_fut, check_sockname, 732 peername) 733 734 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 735 736 @unittest.skipIf(ssl is None, 'No ssl module') 737 def test_create_ssl_connection(self): 738 with test_utils.run_test_server(use_ssl=True) as httpd: 739 create_connection = functools.partial( 740 self.loop.create_connection, 741 lambda: MyProto(loop=self.loop), 742 *httpd.address) 743 self._test_create_ssl_connection(httpd, create_connection, 744 peername=httpd.address) 745 746 @support.skip_unless_bind_unix_socket 747 @unittest.skipIf(ssl is None, 'No ssl module') 748 def test_create_ssl_unix_connection(self): 749 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 750 # zero-length address for 
UNIX socket. 751 check_sockname = not broken_unix_getsockname() 752 753 with test_utils.run_test_unix_server(use_ssl=True) as httpd: 754 create_connection = functools.partial( 755 self.loop.create_unix_connection, 756 lambda: MyProto(loop=self.loop), httpd.address, 757 server_hostname='127.0.0.1') 758 759 self._test_create_ssl_connection(httpd, create_connection, 760 check_sockname, 761 peername=httpd.address) 762 763 def test_create_connection_local_addr(self): 764 with test_utils.run_test_server() as httpd: 765 port = support.find_unused_port() 766 f = self.loop.create_connection( 767 lambda: MyProto(loop=self.loop), 768 *httpd.address, local_addr=(httpd.address[0], port)) 769 tr, pr = self.loop.run_until_complete(f) 770 expected = pr.transport.get_extra_info('sockname')[1] 771 self.assertEqual(port, expected) 772 tr.close() 773 774 def test_create_connection_local_addr_in_use(self): 775 with test_utils.run_test_server() as httpd: 776 f = self.loop.create_connection( 777 lambda: MyProto(loop=self.loop), 778 *httpd.address, local_addr=httpd.address) 779 with self.assertRaises(OSError) as cm: 780 self.loop.run_until_complete(f) 781 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 782 self.assertIn(str(httpd.address), cm.exception.strerror) 783 784 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 785 loop = self.loop 786 787 class MyProto(MyBaseProto): 788 789 def connection_lost(self, exc): 790 super().connection_lost(exc) 791 loop.call_soon(loop.stop) 792 793 def data_received(self, data): 794 super().data_received(data) 795 self.transport.write(expected_response) 796 797 lsock = socket.socket() 798 lsock.bind(('127.0.0.1', 0)) 799 lsock.listen(1) 800 addr = lsock.getsockname() 801 802 message = b'test data' 803 response = None 804 expected_response = b'roger' 805 806 def client(): 807 nonlocal response 808 try: 809 csock = socket.socket() 810 if client_ssl is not None: 811 csock = client_ssl.wrap_socket(csock) 812 
csock.connect(addr) 813 csock.sendall(message) 814 response = csock.recv(99) 815 csock.close() 816 except Exception as exc: 817 print( 818 "Failure in client thread in test_connect_accepted_socket", 819 exc) 820 821 thread = threading.Thread(target=client, daemon=True) 822 thread.start() 823 824 conn, _ = lsock.accept() 825 proto = MyProto(loop=loop) 826 proto.loop = loop 827 loop.run_until_complete( 828 loop.connect_accepted_socket( 829 (lambda: proto), conn, ssl=server_ssl)) 830 loop.run_forever() 831 proto.transport.close() 832 lsock.close() 833 834 support.join_thread(thread, timeout=1) 835 self.assertFalse(thread.is_alive()) 836 self.assertEqual(proto.state, 'CLOSED') 837 self.assertEqual(proto.nbytes, len(message)) 838 self.assertEqual(response, expected_response) 839 840 @unittest.skipIf(ssl is None, 'No ssl module') 841 def test_ssl_connect_accepted_socket(self): 842 if (sys.platform == 'win32' and 843 sys.version_info < (3, 5) and 844 isinstance(self.loop, proactor_events.BaseProactorEventLoop) 845 ): 846 raise unittest.SkipTest( 847 'SSL not supported with proactor event loops before Python 3.5' 848 ) 849 850 server_context = test_utils.simple_server_sslcontext() 851 client_context = test_utils.simple_client_sslcontext() 852 853 self.test_connect_accepted_socket(server_context, client_context) 854 855 def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self): 856 sock = socket.socket() 857 self.addCleanup(sock.close) 858 coro = self.loop.connect_accepted_socket( 859 MyProto, sock, ssl_handshake_timeout=1) 860 with self.assertRaisesRegex( 861 ValueError, 862 'ssl_handshake_timeout is only meaningful with ssl'): 863 self.loop.run_until_complete(coro) 864 865 @mock.patch('asyncio.base_events.socket') 866 def create_server_multiple_hosts(self, family, hosts, mock_sock): 867 @asyncio.coroutine 868 def getaddrinfo(host, port, *args, **kw): 869 if family == socket.AF_INET: 870 return [(family, socket.SOCK_STREAM, 6, '', (host, port))] 871 else: 872 
return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 873 874 def getaddrinfo_task(*args, **kwds): 875 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 876 877 unique_hosts = set(hosts) 878 879 if family == socket.AF_INET: 880 mock_sock.socket().getsockbyname.side_effect = [ 881 (host, 80) for host in unique_hosts] 882 else: 883 mock_sock.socket().getsockbyname.side_effect = [ 884 (host, 80, 0, 0) for host in unique_hosts] 885 self.loop.getaddrinfo = getaddrinfo_task 886 self.loop._start_serving = mock.Mock() 887 self.loop._stop_serving = mock.Mock() 888 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 889 server = self.loop.run_until_complete(f) 890 self.addCleanup(server.close) 891 server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 892 self.assertEqual(server_hosts, unique_hosts) 893 894 def test_create_server_multiple_hosts_ipv4(self): 895 self.create_server_multiple_hosts(socket.AF_INET, 896 ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 897 898 def test_create_server_multiple_hosts_ipv6(self): 899 self.create_server_multiple_hosts(socket.AF_INET6, 900 ['::1', '::2', '::1']) 901 902 def test_create_server(self): 903 proto = MyProto(self.loop) 904 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 905 server = self.loop.run_until_complete(f) 906 self.assertEqual(len(server.sockets), 1) 907 sock = server.sockets[0] 908 host, port = sock.getsockname() 909 self.assertEqual(host, '0.0.0.0') 910 client = socket.socket() 911 client.connect(('127.0.0.1', port)) 912 client.sendall(b'xxx') 913 914 self.loop.run_until_complete(proto.connected) 915 self.assertEqual('CONNECTED', proto.state) 916 917 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 918 self.assertEqual(3, proto.nbytes) 919 920 # extra info is available 921 self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 922 self.assertEqual('127.0.0.1', 923 proto.transport.get_extra_info('peername')[0]) 924 925 # close connection 926 
proto.transport.close() 927 self.loop.run_until_complete(proto.done) 928 929 self.assertEqual('CLOSED', proto.state) 930 931 # the client socket must be closed after to avoid ECONNRESET upon 932 # recv()/send() on the serving socket 933 client.close() 934 935 # close server 936 server.close() 937 938 @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') 939 def test_create_server_reuse_port(self): 940 proto = MyProto(self.loop) 941 f = self.loop.create_server( 942 lambda: proto, '0.0.0.0', 0) 943 server = self.loop.run_until_complete(f) 944 self.assertEqual(len(server.sockets), 1) 945 sock = server.sockets[0] 946 self.assertFalse( 947 sock.getsockopt( 948 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 949 server.close() 950 951 test_utils.run_briefly(self.loop) 952 953 proto = MyProto(self.loop) 954 f = self.loop.create_server( 955 lambda: proto, '0.0.0.0', 0, reuse_port=True) 956 server = self.loop.run_until_complete(f) 957 self.assertEqual(len(server.sockets), 1) 958 sock = server.sockets[0] 959 self.assertTrue( 960 sock.getsockopt( 961 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 962 server.close() 963 964 def _make_unix_server(self, factory, **kwargs): 965 path = test_utils.gen_unix_socket_path() 966 self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) 967 968 f = self.loop.create_unix_server(factory, path, **kwargs) 969 server = self.loop.run_until_complete(f) 970 971 return server, path 972 973 @support.skip_unless_bind_unix_socket 974 def test_create_unix_server(self): 975 proto = MyProto(loop=self.loop) 976 server, path = self._make_unix_server(lambda: proto) 977 self.assertEqual(len(server.sockets), 1) 978 979 client = socket.socket(socket.AF_UNIX) 980 client.connect(path) 981 client.sendall(b'xxx') 982 983 self.loop.run_until_complete(proto.connected) 984 self.assertEqual('CONNECTED', proto.state) 985 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 986 self.assertEqual(3, proto.nbytes) 987 988 # close connection 989 
proto.transport.close() 990 self.loop.run_until_complete(proto.done) 991 992 self.assertEqual('CLOSED', proto.state) 993 994 # the client socket must be closed after to avoid ECONNRESET upon 995 # recv()/send() on the serving socket 996 client.close() 997 998 # close server 999 server.close() 1000 1001 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1002 def test_create_unix_server_path_socket_error(self): 1003 proto = MyProto(loop=self.loop) 1004 sock = socket.socket() 1005 with sock: 1006 f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock) 1007 with self.assertRaisesRegex(ValueError, 1008 'path and sock can not be specified ' 1009 'at the same time'): 1010 self.loop.run_until_complete(f) 1011 1012 def _create_ssl_context(self, certfile, keyfile=None): 1013 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 1014 sslcontext.options |= ssl.OP_NO_SSLv2 1015 sslcontext.load_cert_chain(certfile, keyfile) 1016 return sslcontext 1017 1018 def _make_ssl_server(self, factory, certfile, keyfile=None): 1019 sslcontext = self._create_ssl_context(certfile, keyfile) 1020 1021 f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) 1022 server = self.loop.run_until_complete(f) 1023 1024 sock = server.sockets[0] 1025 host, port = sock.getsockname() 1026 self.assertEqual(host, '127.0.0.1') 1027 return server, host, port 1028 1029 def _make_ssl_unix_server(self, factory, certfile, keyfile=None): 1030 sslcontext = self._create_ssl_context(certfile, keyfile) 1031 return self._make_unix_server(factory, ssl=sslcontext) 1032 1033 @unittest.skipIf(ssl is None, 'No ssl module') 1034 def test_create_server_ssl(self): 1035 proto = MyProto(loop=self.loop) 1036 server, host, port = self._make_ssl_server( 1037 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 1038 1039 f_c = self.loop.create_connection(MyBaseProto, host, port, 1040 ssl=test_utils.dummy_ssl_context()) 1041 client, pr = self.loop.run_until_complete(f_c) 1042 1043 
@support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl(self):
    # TLS round trip over a UNIX socket using the dummy client context.
    server_proto = MyProto(loop=self.loop)
    server, sock_path = self._make_ssl_unix_server(
        lambda: server_proto, test_utils.ONLYCERT, test_utils.ONLYKEY)

    connect_coro = self.loop.create_unix_connection(
        MyBaseProto, sock_path, ssl=test_utils.dummy_ssl_context(),
        server_hostname='')
    client_transport, _client_proto = self.loop.run_until_complete(
        connect_coro)

    client_transport.write(b'xxx')
    self.loop.run_until_complete(server_proto.connected)
    self.assertEqual('CONNECTED', server_proto.state)
    test_utils.run_until(self.loop, lambda: server_proto.nbytes > 0)
    self.assertEqual(3, server_proto.nbytes)

    # Close the server side of the connection first ...
    server_proto.transport.close()
    self.loop.run_until_complete(server_proto.done)
    self.assertEqual('CLOSED', server_proto.state)

    # ... and the client afterwards, to avoid ECONNRESET upon
    # recv()/send() on the serving socket.
    client_transport.close()

    # stop serving
    server.close()


@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_verify_failed(self):
    # A client that requires a verified certificate but has no CA loaded
    # must fail the handshake.
    server_proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: server_proto, test_utils.SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # no CA loaded
    connect_coro = self.loop.create_connection(
        MyProto, host, port, ssl=client_ctx)
    with mock.patch.object(self.loop, 'call_exception_handler'):
        with test_utils.disable_logger():
            with self.assertRaisesRegex(ssl.SSLError,
                                        '(?i)certificate.verify.failed'):
                self.loop.run_until_complete(connect_coro)

        # execute the loop to log the connection error
        test_utils.run_briefly(self.loop)

    # The server protocol never received a transport.
    self.assertIsNone(server_proto.transport)
    server.close()
@support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verify_failed(self):
    # UNIX-socket variant: a verifying client with no CA loaded must fail
    # the handshake.
    server_proto = MyProto(loop=self.loop)
    server, sock_path = self._make_ssl_unix_server(
        lambda: server_proto, test_utils.SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # no CA loaded
    connect_coro = self.loop.create_unix_connection(
        MyProto, sock_path, ssl=client_ctx, server_hostname='invalid')
    with mock.patch.object(self.loop, 'call_exception_handler'):
        with test_utils.disable_logger():
            with self.assertRaisesRegex(ssl.SSLError,
                                        '(?i)certificate.verify.failed'):
                self.loop.run_until_complete(connect_coro)

        # execute the loop to log the connection error
        test_utils.run_briefly(self.loop)

    # The server protocol never received a transport.
    self.assertIsNone(server_proto.transport)
    server.close()


@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_match_failed(self):
    # The CA is trusted here, but the client connects by IP address and
    # hostname checking is on, so a CertificateError is expected.
    server_proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: server_proto, test_utils.SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=test_utils.SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # incorrect server_hostname
    connect_coro = self.loop.create_connection(
        MyProto, host, port, ssl=client_ctx)
    with mock.patch.object(self.loop, 'call_exception_handler'):
        with test_utils.disable_logger():
            with self.assertRaisesRegex(
                    ssl.CertificateError,
                    "IP address mismatch, certificate is not valid for "
                    "'127.0.0.1'"):
                self.loop.run_until_complete(connect_coro)

    # close connection
    # transport is None because TLS ALERT aborted the handshake
    self.assertIsNone(server_proto.transport)
    server.close()
@support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verified(self):
    # With the signing CA loaded and the matching hostname supplied, the
    # TLS handshake over a UNIX socket must succeed.
    server_proto = MyProto(loop=self.loop)
    server, sock_path = self._make_ssl_unix_server(
        lambda: server_proto, test_utils.SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=test_utils.SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connect_coro = self.loop.create_unix_connection(
        MyProto, sock_path, ssl=client_ctx, server_hostname='localhost')
    client_transport, _client_proto = self.loop.run_until_complete(
        connect_coro)

    # Tear down: server-side transport, client, server, then wait for the
    # server protocol to report connection_lost.
    server_proto.transport.close()
    client_transport.close()
    server.close()
    self.loop.run_until_complete(server_proto.done)
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_verified(self):
    # With the signing CA loaded and the matching hostname supplied, the
    # TLS handshake over TCP must succeed and expose the peer certificate.
    server_proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: server_proto, test_utils.SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=test_utils.SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connect_coro = self.loop.create_connection(
        MyProto, host, port, ssl=client_ctx, server_hostname='localhost')
    client_transport, _client_proto = self.loop.run_until_complete(
        connect_coro)

    # extra info is available
    self.check_ssl_extra_info(client_transport, peername=(host, port),
                              peercert=test_utils.PEERCERT)

    # Tear down: server-side transport, client, server, then wait for the
    # server protocol to report connection_lost.
    server_proto.transport.close()
    client_transport.close()
    server.close()
    self.loop.run_until_complete(server_proto.done)


def test_create_server_sock(self):
    # create_server(sock=...) must serve on exactly the socket object it
    # was given.
    made_fut = asyncio.Future(loop=self.loop)

    class TestMyProto(MyProto):
        def connection_made(self, transport):
            super().connection_made(transport)
            made_fut.set_result(self)

    bound_sock = socket.socket(type=socket.SOCK_STREAM)
    bound_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bound_sock.bind(('0.0.0.0', 0))

    server = self.loop.run_until_complete(
        self.loop.create_server(TestMyProto, sock=bound_sock))
    serving_sock = server.sockets[0]
    self.assertIs(serving_sock, bound_sock)

    host, port = serving_sock.getsockname()
    self.assertEqual(host, '0.0.0.0')

    peer = socket.socket()
    peer.connect(('127.0.0.1', port))
    peer.send(b'xxx')
    peer.close()
    server.close()
def test_create_server_addr_in_use(self):
    # Binding a second server to an address that is already being served
    # must fail with EADDRINUSE.
    bound_sock = socket.socket(type=socket.SOCK_STREAM)
    bound_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bound_sock.bind(('0.0.0.0', 0))

    first_server = self.loop.run_until_complete(
        self.loop.create_server(MyProto, sock=bound_sock))
    host, port = first_server.sockets[0].getsockname()

    second_coro = self.loop.create_server(MyProto, host=host, port=port)
    with self.assertRaises(OSError) as caught:
        self.loop.run_until_complete(second_coro)
    self.assertEqual(caught.exception.errno, errno.EADDRINUSE)

    first_server.close()


@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self):
    # A server created with host=None must accept both IPv4 and IPv6
    # connections on the same port.
    made_fut = asyncio.Future(loop=self.loop)

    class TestMyProto(MyProto):
        def connection_made(self, transport):
            super().connection_made(transport)
            # Looked up at call time, so the future re-created below for
            # the IPv6 half is the one that gets resolved.
            made_fut.set_result(self)

    # find_unused_port() can race with other processes; retry a few times
    # when the chosen port turns out to be taken.
    attempts = 0
    while True:
        try:
            port = support.find_unused_port()
            server_coro = self.loop.create_server(
                TestMyProto, host=None, port=port)
            server = self.loop.run_until_complete(server_coro)
        except OSError as ex:
            if ex.errno != errno.EADDRINUSE:
                raise
            attempts += 1
            self.assertGreaterEqual(5, attempts)
            continue
        else:
            break

    # IPv4 client
    peer = socket.socket()
    peer.connect(('127.0.0.1', port))
    peer.send(b'xxx')
    server_proto = self.loop.run_until_complete(made_fut)
    server_proto.transport.close()
    peer.close()

    # IPv6 client
    made_fut = asyncio.Future(loop=self.loop)
    peer = socket.socket(socket.AF_INET6)
    peer.connect(('::1', port))
    peer.send(b'xxx')
    server_proto = self.loop.run_until_complete(made_fut)
    server_proto.transport.close()
    peer.close()

    server.close()
def test_server_close(self):
    # Once the server is closed, new connections to its port must be
    # refused.
    server = self.loop.run_until_complete(
        self.loop.create_server(MyProto, '0.0.0.0', 0))
    _host, port = server.sockets[0].getsockname()
    address = ('127.0.0.1', port)

    # A connection made while the server is up goes through.
    peer = socket.socket()
    peer.connect(address)
    peer.send(b'xxx')
    peer.close()

    server.close()

    late_peer = socket.socket()
    self.assertRaises(
        ConnectionRefusedError, late_peer.connect, address)
    late_peer.close()


def test_create_datagram_endpoint(self):
    # UDP echo: the server endpoint answers every datagram with
    # b'resp:' + payload.
    class TestMyDatagramProto(MyDatagramProto):
        def __init__(inner_self):
            # 'self' here is the enclosing test case; the protocol
            # instance is 'inner_self'.
            super().__init__(loop=self.loop)

        def datagram_received(self, data, addr):
            super().datagram_received(data, addr)
            self.transport.sendto(b'resp:'+data, addr)

    server_coro = self.loop.create_datagram_endpoint(
        TestMyDatagramProto, local_addr=('127.0.0.1', 0))
    server_transport, server_proto = self.loop.run_until_complete(
        server_coro)
    host, port = server_transport.get_extra_info('sockname')

    self.assertIsInstance(server_transport, asyncio.Transport)
    self.assertIsInstance(server_proto, TestMyDatagramProto)
    self.assertEqual('INITIALIZED', server_proto.state)
    self.assertIs(server_proto.transport, server_transport)

    client_coro = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop),
        remote_addr=(host, port))
    client_transport, client_proto = self.loop.run_until_complete(
        client_coro)

    self.assertIsInstance(client_transport, asyncio.Transport)
    self.assertIsInstance(client_proto, MyDatagramProto)
    self.assertEqual('INITIALIZED', client_proto.state)
    self.assertIs(client_proto.transport, client_transport)

    client_transport.sendto(b'xxx')
    test_utils.run_until(self.loop, lambda: server_proto.nbytes)
    self.assertEqual(3, server_proto.nbytes)
    test_utils.run_until(self.loop, lambda: client_proto.nbytes)

    # The reply is b'resp:' + b'xxx', i.e. 8 bytes.
    self.assertEqual(8, client_proto.nbytes)

    # extra info is available
    self.assertIsNotNone(client_transport.get_extra_info('sockname'))

    # close connection
    client_transport.close()
    self.loop.run_until_complete(client_proto.done)
    self.assertEqual('CLOSED', client_proto.state)
    server_proto.transport.close()
def test_create_datagram_endpoint_sock(self):
    # create_datagram_endpoint(sock=...) must accept an already-bound,
    # non-blocking UDP socket.
    if (sys.platform == 'win32' and
            isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
        raise unittest.SkipTest(
            'UDP is not supported with proactor event loops')

    sock = None
    local_address = ('127.0.0.1', 0)
    infos = self.loop.run_until_complete(
        self.loop.getaddrinfo(
            *local_address, type=socket.SOCK_DGRAM))
    for family, socktype, proto, _cname, address in infos:
        # Only OS-level failures mean "try the next candidate"; anything
        # else (KeyboardInterrupt, programming errors) must propagate.
        try:
            candidate = socket.socket(
                family=family, type=socktype, proto=proto)
        except OSError:
            continue
        try:
            candidate.setblocking(False)
            candidate.bind(address)
        except OSError:
            # Do not leak a socket that was created but could not be
            # configured or bound.
            candidate.close()
            continue
        sock = candidate
        break
    else:
        # self.fail() still fires under "python -O", unlike a bare assert.
        self.fail('Can not create socket.')

    endpoint_coro = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop), sock=sock)
    tr, pr = self.loop.run_until_complete(endpoint_coro)
    self.assertIsInstance(tr, asyncio.Transport)
    self.assertIsInstance(pr, MyDatagramProto)
    tr.close()
    self.loop.run_until_complete(pr.done)


def test_internal_fds(self):
    # A selector event loop reports one internal fd while open and none
    # after close(), and drops its _csock/_ssock references on close.
    loop = self.create_event_loop()
    if not isinstance(loop, selector_events.BaseSelectorEventLoop):
        loop.close()
        self.skipTest('loop is not a BaseSelectorEventLoop')

    self.assertEqual(1, loop._internal_fds)
    loop.close()
    self.assertEqual(0, loop._internal_fds)
    self.assertIsNone(loop._csock)
    self.assertIsNone(loop._ssock)


@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_read_pipe(self):
    # Data written to the write end of an OS pipe must reach the protocol
    # connected to the read end; closing the write end produces EOF.
    proto = MyReadPipeProto(loop=self.loop)

    rpipe, wpipe = os.pipe()
    pipeobj = io.open(rpipe, 'rb', 1024)

    async def connect():
        t, p = await self.loop.connect_read_pipe(
            lambda: proto, pipeobj)
        self.assertIs(p, proto)
        self.assertIs(t, proto.transport)
        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
        self.assertEqual(0, proto.nbytes)

    self.loop.run_until_complete(connect())

    os.write(wpipe, b'1')
    test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
    self.assertEqual(1, proto.nbytes)

    os.write(wpipe, b'2345')
    test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
    self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    self.assertEqual(5, proto.nbytes)

    os.close(wpipe)
    self.loop.run_until_complete(proto.done)
    self.assertEqual(
        ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
    # extra info is available
    self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_unclosed_pipe_transport(self):
    # Regression test for GitHub issue #314: repr() of a pipe transport
    # must still work after its loop was closed underneath it.
    loop = self.create_event_loop()
    reader_proto = MyReadPipeProto(loop=loop)
    writer_proto = MyWritePipeProto(loop=loop)

    read_fd, write_fd = os.pipe()
    read_file = io.open(read_fd, 'rb', 1024)
    write_file = io.open(write_fd, 'w', 1024)

    async def connect():
        reader_tr, _ = await loop.connect_read_pipe(
            lambda: reader_proto, read_file)
        writer_tr, _ = await loop.connect_write_pipe(
            lambda: writer_proto, write_file)
        return reader_tr, writer_tr

    # Run and close the loop without closing the transports
    read_transport, write_transport = loop.run_until_complete(connect())
    loop.close()

    # These 'repr' calls used to raise an AttributeError
    # See Issue #314 on GitHub
    for transport in (read_transport, write_transport):
        self.assertIn('open', repr(transport))

    # Clean up (avoid ResourceWarning)
    read_file.close()
    write_file.close()
    read_transport._pipe = None
    write_transport._pipe = None
@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_read_pty_output(self):
    # Bytes written to the slave side of a PTY must be delivered to the
    # protocol reading from the master side.
    reader_proto = MyReadPipeProto(loop=self.loop)

    master_fd, slave_fd = os.openpty()
    master_file = io.open(master_fd, 'rb', 0)

    async def connect():
        transport, protocol = await self.loop.connect_read_pipe(
            lambda: reader_proto, master_file)
        self.assertIs(protocol, reader_proto)
        self.assertIs(transport, reader_proto.transport)
        self.assertEqual(['INITIAL', 'CONNECTED'], reader_proto.state)
        self.assertEqual(0, reader_proto.nbytes)

    self.loop.run_until_complete(connect())

    os.write(slave_fd, b'1')
    test_utils.run_until(self.loop, lambda: reader_proto.nbytes)
    self.assertEqual(1, reader_proto.nbytes)

    os.write(slave_fd, b'2345')
    test_utils.run_until(self.loop, lambda: reader_proto.nbytes >= 5)
    self.assertEqual(['INITIAL', 'CONNECTED'], reader_proto.state)
    self.assertEqual(5, reader_proto.nbytes)

    os.close(slave_fd)
    reader_proto.transport.close()
    self.loop.run_until_complete(reader_proto.done)
    self.assertEqual(
        ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], reader_proto.state)
    # extra info is available
    self.assertIsNotNone(reader_proto.transport.get_extra_info('pipe'))


@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_write_pipe(self):
    # Bytes written through the write-pipe transport must show up on the
    # read end of the OS pipe.
    read_fd, write_fd = os.pipe()
    write_file = io.open(write_fd, 'wb', 1024)

    writer_proto = MyWritePipeProto(loop=self.loop)
    transport, protocol = self.loop.run_until_complete(
        self.loop.connect_write_pipe(lambda: writer_proto, write_file))
    self.assertIs(protocol, writer_proto)
    self.assertIs(transport, writer_proto.transport)
    self.assertEqual('CONNECTED', writer_proto.state)

    transport.write(b'1')

    received = bytearray()

    def drained():
        # Pull whatever is available from the pipe and report the running
        # total of bytes received so far.
        received.extend(os.read(read_fd, 1024))
        return len(received)

    test_utils.run_until(self.loop, lambda: drained() >= 1)
    self.assertEqual(b'1', received)

    transport.write(b'2345')
    test_utils.run_until(self.loop, lambda: drained() >= 5)
    self.assertEqual(b'12345', received)
    self.assertEqual('CONNECTED', writer_proto.state)

    os.close(read_fd)

    # extra info is available
    self.assertIsNotNone(writer_proto.transport.get_extra_info('pipe'))

    # close connection
    writer_proto.transport.close()
    self.loop.run_until_complete(writer_proto.done)
    self.assertEqual('CLOSED', writer_proto.state)
@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
    # Closing the reading peer must make the write-pipe protocol end up
    # in the CLOSED state without an explicit transport.close().
    read_sock, write_sock = socket.socketpair()
    read_sock.setblocking(False)
    write_file = io.open(write_sock.detach(), 'wb', 1024)

    writer_proto = MyWritePipeProto(loop=self.loop)
    transport, protocol = self.loop.run_until_complete(
        self.loop.connect_write_pipe(lambda: writer_proto, write_file))
    self.assertIs(protocol, writer_proto)
    self.assertIs(transport, writer_proto.transport)
    self.assertEqual('CONNECTED', writer_proto.state)

    transport.write(b'1')
    received = self.loop.run_until_complete(
        self.loop.sock_recv(read_sock, 1024))
    self.assertEqual(b'1', received)

    read_sock.close()

    self.loop.run_until_complete(writer_proto.done)
    self.assertEqual('CLOSED', writer_proto.state)


@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_write_pty(self):
    # Bytes written through a transport attached to the slave side of a
    # PTY must be readable from the master side.
    master_fd, slave_fd = os.openpty()
    slave_file = io.open(slave_fd, 'wb', 0)

    writer_proto = MyWritePipeProto(loop=self.loop)
    transport, protocol = self.loop.run_until_complete(
        self.loop.connect_write_pipe(lambda: writer_proto, slave_file))
    self.assertIs(protocol, writer_proto)
    self.assertIs(transport, writer_proto.transport)
    self.assertEqual('CONNECTED', writer_proto.state)

    transport.write(b'1')

    received = bytearray()

    def drained():
        # Pull whatever is available from the master side and report the
        # running total of bytes received so far.
        received.extend(os.read(master_fd, 1024))
        return len(received)

    test_utils.run_until(self.loop, lambda: drained() >= 1,
                         timeout=10)
    self.assertEqual(b'1', received)

    transport.write(b'2345')
    test_utils.run_until(self.loop, lambda: drained() >= 5,
                         timeout=10)
    self.assertEqual(b'12345', received)
    self.assertEqual('CONNECTED', writer_proto.state)

    os.close(master_fd)

    # extra info is available
    self.assertIsNotNone(writer_proto.transport.get_extra_info('pipe'))

    # close connection
    writer_proto.transport.close()
    self.loop.run_until_complete(writer_proto.done)
    self.assertEqual('CLOSED', writer_proto.state)
@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_bidirectional_pty(self):
    # One PTY slave is used through two descriptors, one attached to a
    # read transport and one to a write transport, while the test drives
    # the master side directly with os.read()/os.write().
    master_fd, slave_rfd = os.openpty()
    slave_wfd = os.dup(slave_rfd)
    tty.setraw(slave_rfd)

    # Reading half.
    slave_rfile = io.open(slave_rfd, 'rb', 0)
    read_proto = MyReadPipeProto(loop=self.loop)
    read_transport, protocol = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: read_proto, slave_rfile))
    self.assertIs(protocol, read_proto)
    self.assertIs(read_transport, read_proto.transport)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(0, read_proto.nbytes)

    # Writing half.
    slave_wfile = io.open(slave_wfd, 'wb', 0)
    write_proto = MyWritePipeProto(loop=self.loop)
    write_transport, protocol = self.loop.run_until_complete(
        self.loop.connect_write_pipe(lambda: write_proto, slave_wfile))
    self.assertIs(protocol, write_proto)
    self.assertIs(write_transport, write_proto.transport)
    self.assertEqual('CONNECTED', write_proto.state)

    received = bytearray()

    def drained():
        # Pull whatever is available from the master side and report the
        # running total of bytes received so far.
        received.extend(os.read(master_fd, 1024))
        return len(received)

    # slave -> master, first byte
    write_transport.write(b'1')
    test_utils.run_until(self.loop, lambda: drained() >= 1, timeout=10)
    self.assertEqual(b'1', received)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual('CONNECTED', write_proto.state)

    # master -> slave, first byte
    os.write(master_fd, b'a')
    test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
                         timeout=10)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(1, read_proto.nbytes)
    self.assertEqual('CONNECTED', write_proto.state)

    # slave -> master, four more bytes
    write_transport.write(b'2345')
    test_utils.run_until(self.loop, lambda: drained() >= 5, timeout=10)
    self.assertEqual(b'12345', received)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual('CONNECTED', write_proto.state)

    # master -> slave, four more bytes
    os.write(master_fd, b'bcde')
    test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
                         timeout=10)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(5, read_proto.nbytes)
    self.assertEqual('CONNECTED', write_proto.state)

    os.close(master_fd)

    read_transport.close()
    self.loop.run_until_complete(read_proto.done)
    self.assertEqual(
        ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)

    write_transport.close()
    self.loop.run_until_complete(write_proto.done)
    self.assertEqual('CLOSED', write_proto.state)
self.assertEqual('CONNECTED', write_proto.state) 1668 1669 data = bytearray() 1670 def reader(data): 1671 chunk = os.read(master, 1024) 1672 data += chunk 1673 return len(data) 1674 1675 write_transport.write(b'1') 1676 test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) 1677 self.assertEqual(b'1', data) 1678 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1679 self.assertEqual('CONNECTED', write_proto.state) 1680 1681 os.write(master, b'a') 1682 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, 1683 timeout=10) 1684 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1685 self.assertEqual(1, read_proto.nbytes) 1686 self.assertEqual('CONNECTED', write_proto.state) 1687 1688 write_transport.write(b'2345') 1689 test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) 1690 self.assertEqual(b'12345', data) 1691 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1692 self.assertEqual('CONNECTED', write_proto.state) 1693 1694 os.write(master, b'bcde') 1695 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, 1696 timeout=10) 1697 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1698 self.assertEqual(5, read_proto.nbytes) 1699 self.assertEqual('CONNECTED', write_proto.state) 1700 1701 os.close(master) 1702 1703 read_transport.close() 1704 self.loop.run_until_complete(read_proto.done) 1705 self.assertEqual( 1706 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) 1707 1708 write_transport.close() 1709 self.loop.run_until_complete(write_proto.done) 1710 self.assertEqual('CLOSED', write_proto.state) 1711 1712 def test_prompt_cancellation(self): 1713 r, w = socket.socketpair() 1714 r.setblocking(False) 1715 f = self.loop.create_task(self.loop.sock_recv(r, 1)) 1716 ov = getattr(f, 'ov', None) 1717 if ov is not None: 1718 self.assertTrue(ov.pending) 1719 1720 async def main(): 1721 try: 1722 self.loop.call_soon(f.cancel) 1723 await f 1724 except asyncio.CancelledError: 
1725 res = 'cancelled' 1726 else: 1727 res = None 1728 finally: 1729 self.loop.stop() 1730 return res 1731 1732 start = time.monotonic() 1733 t = asyncio.Task(main(), loop=self.loop) 1734 self.loop.run_forever() 1735 elapsed = time.monotonic() - start 1736 1737 self.assertLess(elapsed, 0.1) 1738 self.assertEqual(t.result(), 'cancelled') 1739 self.assertRaises(asyncio.CancelledError, f.result) 1740 if ov is not None: 1741 self.assertFalse(ov.pending) 1742 self.loop._stop_serving(r) 1743 1744 r.close() 1745 w.close() 1746 1747 def test_timeout_rounding(self): 1748 def _run_once(): 1749 self.loop._run_once_counter += 1 1750 orig_run_once() 1751 1752 orig_run_once = self.loop._run_once 1753 self.loop._run_once_counter = 0 1754 self.loop._run_once = _run_once 1755 1756 async def wait(): 1757 loop = self.loop 1758 await asyncio.sleep(1e-2, loop=loop) 1759 await asyncio.sleep(1e-4, loop=loop) 1760 await asyncio.sleep(1e-6, loop=loop) 1761 await asyncio.sleep(1e-8, loop=loop) 1762 await asyncio.sleep(1e-10, loop=loop) 1763 1764 self.loop.run_until_complete(wait()) 1765 # The ideal number of call is 12, but on some platforms, the selector 1766 # may sleep at little bit less than timeout depending on the resolution 1767 # of the clock used by the kernel. Tolerate a few useless calls on 1768 # these platforms. 
1769 self.assertLessEqual(self.loop._run_once_counter, 20, 1770 {'clock_resolution': self.loop._clock_resolution, 1771 'selector': self.loop._selector.__class__.__name__}) 1772 1773 def test_remove_fds_after_closing(self): 1774 loop = self.create_event_loop() 1775 callback = lambda: None 1776 r, w = socket.socketpair() 1777 self.addCleanup(r.close) 1778 self.addCleanup(w.close) 1779 loop.add_reader(r, callback) 1780 loop.add_writer(w, callback) 1781 loop.close() 1782 self.assertFalse(loop.remove_reader(r)) 1783 self.assertFalse(loop.remove_writer(w)) 1784 1785 def test_add_fds_after_closing(self): 1786 loop = self.create_event_loop() 1787 callback = lambda: None 1788 r, w = socket.socketpair() 1789 self.addCleanup(r.close) 1790 self.addCleanup(w.close) 1791 loop.close() 1792 with self.assertRaises(RuntimeError): 1793 loop.add_reader(r, callback) 1794 with self.assertRaises(RuntimeError): 1795 loop.add_writer(w, callback) 1796 1797 def test_close_running_event_loop(self): 1798 @asyncio.coroutine 1799 def close_loop(loop): 1800 self.loop.close() 1801 1802 coro = close_loop(self.loop) 1803 with self.assertRaises(RuntimeError): 1804 self.loop.run_until_complete(coro) 1805 1806 def test_close(self): 1807 self.loop.close() 1808 1809 @asyncio.coroutine 1810 def test(): 1811 pass 1812 1813 func = lambda: False 1814 coro = test() 1815 self.addCleanup(coro.close) 1816 1817 # operation blocked when the loop is closed 1818 with self.assertRaises(RuntimeError): 1819 self.loop.run_forever() 1820 with self.assertRaises(RuntimeError): 1821 fut = asyncio.Future(loop=self.loop) 1822 self.loop.run_until_complete(fut) 1823 with self.assertRaises(RuntimeError): 1824 self.loop.call_soon(func) 1825 with self.assertRaises(RuntimeError): 1826 self.loop.call_soon_threadsafe(func) 1827 with self.assertRaises(RuntimeError): 1828 self.loop.call_later(1.0, func) 1829 with self.assertRaises(RuntimeError): 1830 self.loop.call_at(self.loop.time() + .0, func) 1831 with 
class SubprocessTestsMixin:

    def check_terminated(self, returncode):
        # Assert that *returncode* is what a child ended by terminate()
        # reports on this platform.
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGTERM, returncode)
            return
        # On Windows: expect 1 but sometimes get 0, so only the type is
        # checked.
        self.assertIsInstance(returncode, int)

    def check_killed(self, returncode):
        # Assert that *returncode* is what a child ended by kill() reports
        # on this platform.
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGKILL, returncode)
            return
        # On Windows: expect 1 but sometimes get 0, so only the type is
        # checked.
        self.assertIsInstance(returncode, int)

    def test_subprocess_exec(self):
        # Start echo.py, send one message on stdin, wait for data on fd 1,
        # then close the transport and compare what was collected.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        protocol_factory = functools.partial(
            MySubprocessProtocol, self.loop)

        spawn = self.loop.subprocess_exec(
            protocol_factory, sys.executable, script)
        transport, protocol = self.loop.run_until_complete(spawn)
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)
        self.assertEqual('CONNECTED', protocol.state)

        child_stdin = transport.get_pipe_transport(0)
        child_stdin.write(b'Python The Winner')
        self.loop.run_until_complete(protocol.got_data[1].wait())
        with test_utils.disable_logger():
            transport.close()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)
        self.assertEqual(b'Python The Winner', protocol.data[1])

    def test_subprocess_interactive(self):
        # Same as test_subprocess_exec, but the message is sent in two
        # writes with a check after each one.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        protocol_factory = functools.partial(
            MySubprocessProtocol, self.loop)

        spawn = self.loop.subprocess_exec(
            protocol_factory, sys.executable, script)
        transport, protocol = self.loop.run_until_complete(spawn)
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)
        self.assertEqual('CONNECTED', protocol.state)

        child_stdin = transport.get_pipe_transport(0)
        stdout_event = protocol.got_data[1]

        child_stdin.write(b'Python ')
        self.loop.run_until_complete(stdout_event.wait())
        # Reset the event so the next wait() blocks until new data.
        stdout_event.clear()
        self.assertEqual(b'Python ', protocol.data[1])

        child_stdin.write(b'The Winner')
        self.loop.run_until_complete(stdout_event.wait())
        self.assertEqual(b'Python The Winner', protocol.data[1])

        with test_utils.disable_logger():
            transport.close()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)
def test_subprocess_shell(self):
    # Run 'echo Python' through the shell and check exit status, pipe
    # disconnects and the collected stdout/stderr.
    protocol_factory = functools.partial(MySubprocessProtocol, self.loop)
    spawn = self.loop.subprocess_shell(protocol_factory, 'echo Python')
    transport, protocol = self.loop.run_until_complete(spawn)
    self.assertIsInstance(protocol, MySubprocessProtocol)
    self.loop.run_until_complete(protocol.connected)

    transport.get_pipe_transport(0).close()
    self.loop.run_until_complete(protocol.completed)
    self.assertEqual(0, protocol.returncode)
    self.assertTrue(
        all(fut.done() for fut in protocol.disconnects.values()))
    # The line terminator is stripped before comparing.
    self.assertEqual(protocol.data[1].rstrip(b'\r\n'), b'Python')
    self.assertEqual(protocol.data[2], b'')
    transport.close()


def test_subprocess_exitcode(self):
    # The exit status of the shell command must be reported as-is.
    protocol_factory = functools.partial(MySubprocessProtocol, self.loop)
    spawn = self.loop.subprocess_shell(
        protocol_factory, 'exit 7',
        stdin=None, stdout=None, stderr=None)
    transport, protocol = self.loop.run_until_complete(spawn)
    self.assertIsInstance(protocol, MySubprocessProtocol)
    self.loop.run_until_complete(protocol.completed)
    self.assertEqual(7, protocol.returncode)
    transport.close()


def test_subprocess_close_after_finish(self):
    # With no pipes requested there are no pipe transports, and closing
    # the transport after the child exited returns None.
    protocol_factory = functools.partial(MySubprocessProtocol, self.loop)
    spawn = self.loop.subprocess_shell(
        protocol_factory, 'exit 7',
        stdin=None, stdout=None, stderr=None)
    transport, protocol = self.loop.run_until_complete(spawn)
    self.assertIsInstance(protocol, MySubprocessProtocol)
    self.assertIsNone(transport.get_pipe_transport(0))
    self.assertIsNone(transport.get_pipe_transport(1))
    self.assertIsNone(transport.get_pipe_transport(2))
    self.loop.run_until_complete(protocol.completed)
    self.assertEqual(7, protocol.returncode)
    self.assertIsNone(transport.close())
def test_subprocess_kill(self):
    # kill() must end the child; the return code is checked by
    # check_killed().
    script = os.path.join(os.path.dirname(__file__), 'echo.py')
    protocol_factory = functools.partial(MySubprocessProtocol, self.loop)

    spawn = self.loop.subprocess_exec(
        protocol_factory, sys.executable, script)
    transport, protocol = self.loop.run_until_complete(spawn)
    self.assertIsInstance(protocol, MySubprocessProtocol)
    self.loop.run_until_complete(protocol.connected)

    transport.kill()
    self.loop.run_until_complete(protocol.completed)
    self.check_killed(protocol.returncode)
    transport.close()


def test_subprocess_terminate(self):
    # terminate() must end the child; the return code is checked by
    # check_terminated().
    script = os.path.join(os.path.dirname(__file__), 'echo.py')
    protocol_factory = functools.partial(MySubprocessProtocol, self.loop)

    spawn = self.loop.subprocess_exec(
        protocol_factory, sys.executable, script)
    transport, protocol = self.loop.run_until_complete(spawn)
    self.assertIsInstance(protocol, MySubprocessProtocol)
    self.loop.run_until_complete(protocol.connected)

    transport.terminate()
    self.loop.run_until_complete(protocol.completed)
    self.check_terminated(protocol.returncode)
    transport.close()


@unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_subprocess_send_signal(self):
    # bpo-31034: Make sure that we get the default signal handler (killing
    # the process). The parent process may have decided to ignore SIGHUP,
    # and signal handlers are inherited.
    saved_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
    try:
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        protocol_factory = functools.partial(
            MySubprocessProtocol, self.loop)

        spawn = self.loop.subprocess_exec(
            protocol_factory, sys.executable, script)
        transport, protocol = self.loop.run_until_complete(spawn)
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        transport.send_signal(signal.SIGHUP)
        self.loop.run_until_complete(protocol.completed)
        self.assertEqual(-signal.SIGHUP, protocol.returncode)
        transport.close()
    finally:
        # Put back whatever SIGHUP handler was installed before the test.
        signal.signal(signal.SIGHUP, saved_handler)
def test_subprocess_stderr_redirect_to_stdout(self):
    """Run ``echo2.py`` with stderr redirected into stdout.

    With ``stderr=subprocess.STDOUT`` the transport must expose a pipe
    for fd 1 but none for fd 2, and the child's output must be collected
    under ``proto.data[1]`` while ``proto.data[2]`` stays empty.
    """
    script = os.path.join(os.path.dirname(__file__), 'echo2.py')
    run = self.loop.run_until_complete

    transp, proto = run(self.loop.subprocess_exec(
        functools.partial(MySubprocessProtocol, self.loop),
        sys.executable, script, stderr=subprocess.STDOUT))
    self.assertIsInstance(proto, MySubprocessProtocol)
    run(proto.connected)

    stdin = transp.get_pipe_transport(0)
    self.assertIsNotNone(transp.get_pipe_transport(1))
    self.assertIsNone(transp.get_pipe_transport(2))

    stdin.write(b'test')
    run(proto.completed)

    merged_output = proto.data[1]
    # The collected bytes are passed as the failure message for diagnosis.
    self.assertTrue(merged_output.startswith(b'OUT:testERR:test'),
                    merged_output)
    self.assertEqual(b'', proto.data[2])

    transp.close()
    self.assertEqual(0, proto.returncode)
2068 self.assertEqual(b'ERR:OSError', proto.data[2]) 2069 with test_utils.disable_logger(): 2070 transp.close() 2071 self.loop.run_until_complete(proto.completed) 2072 self.check_killed(proto.returncode) 2073 2074 def test_subprocess_wait_no_same_group(self): 2075 # start the new process in a new session 2076 connect = self.loop.subprocess_shell( 2077 functools.partial(MySubprocessProtocol, self.loop), 2078 'exit 7', stdin=None, stdout=None, stderr=None, 2079 start_new_session=True) 2080 _, proto = yield self.loop.run_until_complete(connect) 2081 self.assertIsInstance(proto, MySubprocessProtocol) 2082 self.loop.run_until_complete(proto.completed) 2083 self.assertEqual(7, proto.returncode) 2084 2085 def test_subprocess_exec_invalid_args(self): 2086 2087 async def connect(**kwds): 2088 await self.loop.subprocess_exec( 2089 asyncio.SubprocessProtocol, 2090 'pwd', **kwds) 2091 2092 with self.assertRaises(ValueError): 2093 self.loop.run_until_complete(connect(universal_newlines=True)) 2094 with self.assertRaises(ValueError): 2095 self.loop.run_until_complete(connect(bufsize=4096)) 2096 with self.assertRaises(ValueError): 2097 self.loop.run_until_complete(connect(shell=True)) 2098 2099 def test_subprocess_shell_invalid_args(self): 2100 2101 async def connect(cmd=None, **kwds): 2102 if not cmd: 2103 cmd = 'pwd' 2104 await self.loop.subprocess_shell( 2105 asyncio.SubprocessProtocol, 2106 cmd, **kwds) 2107 2108 with self.assertRaises(ValueError): 2109 self.loop.run_until_complete(connect(['ls', '-l'])) 2110 with self.assertRaises(ValueError): 2111 self.loop.run_until_complete(connect(universal_newlines=True)) 2112 with self.assertRaises(ValueError): 2113 self.loop.run_until_complete(connect(bufsize=4096)) 2114 with self.assertRaises(ValueError): 2115 self.loop.run_until_complete(connect(shell=False)) 2116 2117 2118class SendfileBase: 2119 2120 # 128 KiB plus small unaligned to buffer chunk 2121 DATA = b"SendfileBaseData" * (1024 * 8 + 1) 2122 2123 # Reduce socket buffer 
class MyProto(asyncio.Protocol):
    """Protocol that records the received bytes and its lifecycle flags.

    ``fut`` is a future created from the given loop; it is resolved with
    ``None`` in ``connection_lost`` so callers can await the shutdown
    through ``wait_closed``.
    """

    def __init__(self, loop):
        self.transport = None
        self.data = bytearray()
        self.started = self.closed = False
        self.fut = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and flag the protocol as started."""
        self.transport = transport
        self.started = True

    def data_received(self, data):
        """Append the incoming chunk to ``self.data``."""
        self.data.extend(data)

    def connection_lost(self, exc):
        """Flag the protocol as closed and resolve ``self.fut``."""
        self.closed = True
        self.fut.set_result(None)

    async def wait_closed(self):
        """Wait until ``connection_lost`` has resolved ``self.fut``."""
        await self.fut
def prepare_socksendfile(self):
    """Start a server and connect a client socket for sock_sendfile tests.

    Returns:
        A ``(sock, proto)`` pair: the non-blocking client socket and the
        ``MyProto`` instance that ``create_server`` uses as its protocol.
    """
    server_proto = self.MyProto(self.loop)
    port = support.find_unused_port()

    # The listening socket is not registered for cleanup; it is passed to
    # create_server() and the server is closed in cleanup() below.
    listener = self.make_socket(cleanup=False)
    listener.bind((support.HOST, port))
    server = self.run_loop(
        self.loop.create_server(lambda: server_proto, sock=listener))
    self.reduce_receive_buffer_size(listener)

    client = self.make_socket()
    self.run_loop(self.loop.sock_connect(client, ('127.0.0.1', port)))
    # The send buffer is reduced only after the client has connected.
    self.reduce_send_buffer_size(client)

    def cleanup():
        transport = server_proto.transport
        if transport is not None:
            # can be None if the task was cancelled before
            # connection_made callback
            transport.close()
            self.run_loop(server_proto.wait_closed())

        server.close()
        self.run_loop(server.wait_closed())

    self.addCleanup(cleanup)
    return client, server_proto
def test_sock_sendfile_zero_size(self):
    """Send an empty temporary file and expect that nothing is sent.

    ``sock_sendfile`` is called with offset 0 and ``count=None`` on an
    empty ``tempfile.TemporaryFile``; the call must report 0 bytes, leave
    that file at position 0 and deliver no data to the server protocol.
    """
    sock, proto = self.prepare_socksendfile()
    with tempfile.TemporaryFile() as f:
        ret = self.run_loop(self.loop.sock_sendfile(sock, f,
                                                    0, None))
        # Check the file that was actually sent, while it is still open.
        self.assertEqual(f.tell(), 0)
    sock.close()
    self.run_loop(proto.wait_closed())

    self.assertEqual(ret, 0)
    # No payload may have reached the server side.
    self.assertEqual(proto.data, b'')
    # self.file is not passed to sock_sendfile in this test, so its
    # position must still be at the start.
    self.assertEqual(self.file.tell(), 0)
@unittest.skipIf(sys.platform == 'win32', "UDP sockets are not supported")
def test_sendfile_not_supported(self):
    """``loop.sendfile`` on a datagram transport raises ``RuntimeError``."""
    endpoint = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop),
        family=socket.AF_INET)
    tr, _ = self.run_loop(endpoint)
    try:
        with self.assertRaisesRegex(RuntimeError, "not supported"):
            self.run_loop(self.loop.sendfile(tr, self.file))
        # The rejected call must leave the file position at the start.
        self.assertEqual(0, self.file.tell())
    finally:
        # don't use self.addCleanup because it produces resource warning
        tr.close()
def test_sendfile_force_fallback(self):
    """Send the whole file with the loop's native sendfile hook replaced.

    ``loop._sendfile_native`` is overridden with a wrapper that delegates
    to ``BaseEventLoop._sendfile_native``; the test then expects the
    complete file to arrive at the server protocol.
    """
    srv_proto, cli_proto = self.prepare_sendfile()

    def sendfile_native(transp, file, offset, count):
        # to raise SendfileNotAvailableError
        base_impl = base_events.BaseEventLoop._sendfile_native
        return base_impl(self.loop, transp, file, offset, count)

    self.loop._sendfile_native = sendfile_native

    client_transport = cli_proto.transport
    ret = self.run_loop(self.loop.sendfile(client_transport, self.file))
    client_transport.close()
    self.run_loop(srv_proto.done)

    expected_size = len(self.DATA)
    self.assertEqual(ret, expected_size)
    self.assertEqual(srv_proto.nbytes, expected_size)
    self.assertEqual(srv_proto.data, self.DATA)
    self.assertEqual(self.file.tell(), expected_size)
def test_sendfile_for_closing_transp(self):
    """``loop.sendfile`` must fail once ``close()`` was called on the transport."""
    srv_proto, cli_proto = self.prepare_sendfile()
    client_transport = cli_proto.transport
    client_transport.close()

    with self.assertRaisesRegex(RuntimeError, "is closing"):
        self.run_loop(self.loop.sendfile(client_transport, self.file))

    self.run_loop(srv_proto.done)
    # Nothing was received and the file position did not move.
    self.assertEqual(srv_proto.nbytes, 0)
    self.assertEqual(self.file.tell(), 0)
def test_sendfile_ssl_partial(self):
    """Send a 100-byte slice starting at offset 1000 over an SSL transport."""
    offset, count = 1000, 100
    srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)

    sending = self.loop.sendfile(cli_proto.transport, self.file,
                                 offset, count)
    ret = self.run_loop(sending)
    cli_proto.transport.close()
    self.run_loop(srv_proto.done)

    self.assertEqual(ret, count)
    self.assertEqual(srv_proto.nbytes, count)
    self.assertEqual(srv_proto.data, self.DATA[offset:offset + count])
    # The file position ends right after the slice that was sent.
    self.assertEqual(self.file.tell(), offset + count)
@unittest.skipUnless(hasattr(os, 'sendfile'),
                     "Don't have native sendfile support")
def test_sendfile_prevents_bare_write(self):
    """``transport.write`` must raise while a sendfile call is in progress."""
    _, cli_proto = self.prepare_sendfile()
    started = self.loop.create_future()

    async def send_file():
        # Signal that the task is running before starting the transfer.
        started.set_result(None)
        return await self.loop.sendfile(cli_proto.transport, self.file)

    task = self.loop.create_task(send_file())
    self.run_loop(started)

    with self.assertRaisesRegex(RuntimeError,
                                "sendfile is in progress"):
        cli_proto.transport.write(b'data')

    sent = self.run_loop(task)
    self.assertEqual(sent, len(self.DATA))
class ProactorEventLoopTests(EventLoopTestsMixin,
                             SendfileMixin,
                             SockSendfileMixin,
                             SubprocessTestsMixin,
                             test_utils.TestCase):
    """Run the shared event-loop test mixins on ``ProactorEventLoop``.

    The overrides below skip the inherited tests listed here, each with
    the reason given in its skip message.
    """

    def create_event_loop(self):
        return asyncio.ProactorEventLoop()

    def test_reader_callback(self):
        self.skipTest("IocpEventLoop does not have add_reader()")

    def test_reader_callback_cancel(self):
        self.skipTest("IocpEventLoop does not have add_reader()")

    def test_writer_callback(self):
        self.skipTest("IocpEventLoop does not have add_writer()")

    def test_writer_callback_cancel(self):
        self.skipTest("IocpEventLoop does not have add_writer()")

    def test_create_datagram_endpoint(self):
        self.skipTest(
            "IocpEventLoop does not have create_datagram_endpoint()")

    def test_remove_fds_after_closing(self):
        self.skipTest("IocpEventLoop does not have add_reader()")
def noop(*args, **kwargs):
    """Accept any positional and keyword arguments and do nothing."""
    return None
'message': test_utils.MockPattern('Exception in callback.*'), 2701 'exception': mock.ANY, 2702 'handle': h, 2703 'source_traceback': h._source_traceback, 2704 }) 2705 2706 def test_handle_weakref(self): 2707 wd = weakref.WeakValueDictionary() 2708 h = asyncio.Handle(lambda: None, (), self.loop) 2709 wd['h'] = h # Would fail without __weakref__ slot. 2710 2711 def test_handle_repr(self): 2712 self.loop.get_debug.return_value = False 2713 2714 # simple function 2715 h = asyncio.Handle(noop, (1, 2), self.loop) 2716 filename, lineno = test_utils.get_function_source(noop) 2717 self.assertEqual(repr(h), 2718 '<Handle noop(1, 2) at %s:%s>' 2719 % (filename, lineno)) 2720 2721 # cancelled handle 2722 h.cancel() 2723 self.assertEqual(repr(h), 2724 '<Handle cancelled>') 2725 2726 # decorated function 2727 cb = asyncio.coroutine(noop) 2728 h = asyncio.Handle(cb, (), self.loop) 2729 self.assertEqual(repr(h), 2730 '<Handle noop() at %s:%s>' 2731 % (filename, lineno)) 2732 2733 # partial function 2734 cb = functools.partial(noop, 1, 2) 2735 h = asyncio.Handle(cb, (3,), self.loop) 2736 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2737 % (re.escape(filename), lineno)) 2738 self.assertRegex(repr(h), regex) 2739 2740 # partial function with keyword args 2741 cb = functools.partial(noop, x=1) 2742 h = asyncio.Handle(cb, (2, 3), self.loop) 2743 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2744 % (re.escape(filename), lineno)) 2745 self.assertRegex(repr(h), regex) 2746 2747 # partial method 2748 if sys.version_info >= (3, 4): 2749 method = HandleTests.test_handle_repr 2750 cb = functools.partialmethod(method) 2751 filename, lineno = test_utils.get_function_source(method) 2752 h = asyncio.Handle(cb, (), self.loop) 2753 2754 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2755 cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex) 2756 regex = (r'^<Handle %s at %s:%s>$' 2757 % (cb_regex, re.escape(filename), lineno)) 2758 self.assertRegex(repr(h), regex) 
2759 2760 def test_handle_repr_debug(self): 2761 self.loop.get_debug.return_value = True 2762 2763 # simple function 2764 create_filename = __file__ 2765 create_lineno = sys._getframe().f_lineno + 1 2766 h = asyncio.Handle(noop, (1, 2), self.loop) 2767 filename, lineno = test_utils.get_function_source(noop) 2768 self.assertEqual(repr(h), 2769 '<Handle noop(1, 2) at %s:%s created at %s:%s>' 2770 % (filename, lineno, create_filename, create_lineno)) 2771 2772 # cancelled handle 2773 h.cancel() 2774 self.assertEqual( 2775 repr(h), 2776 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2777 % (filename, lineno, create_filename, create_lineno)) 2778 2779 # double cancellation won't overwrite _repr 2780 h.cancel() 2781 self.assertEqual( 2782 repr(h), 2783 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2784 % (filename, lineno, create_filename, create_lineno)) 2785 2786 def test_handle_source_traceback(self): 2787 loop = asyncio.get_event_loop_policy().new_event_loop() 2788 loop.set_debug(True) 2789 self.set_event_loop(loop) 2790 2791 def check_source_traceback(h): 2792 lineno = sys._getframe(1).f_lineno - 1 2793 self.assertIsInstance(h._source_traceback, list) 2794 self.assertEqual(h._source_traceback[-1][:3], 2795 (__file__, 2796 lineno, 2797 'test_handle_source_traceback')) 2798 2799 # call_soon 2800 h = loop.call_soon(noop) 2801 check_source_traceback(h) 2802 2803 # call_soon_threadsafe 2804 h = loop.call_soon_threadsafe(noop) 2805 check_source_traceback(h) 2806 2807 # call_later 2808 h = loop.call_later(0, noop) 2809 check_source_traceback(h) 2810 2811 # call_at 2812 h = loop.call_later(0, noop) 2813 check_source_traceback(h) 2814 2815 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2816 'No collections.abc.Coroutine') 2817 def test_coroutine_like_object_debug_formatting(self): 2818 # Test that asyncio can format coroutines that are instances of 2819 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2820 # (such as 
ones compiled with Cython). 2821 2822 coro = CoroLike() 2823 coro.__name__ = 'AAA' 2824 self.assertTrue(asyncio.iscoroutine(coro)) 2825 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2826 2827 coro.__qualname__ = 'BBB' 2828 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 2829 2830 coro.cr_running = True 2831 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 2832 2833 coro.__name__ = coro.__qualname__ = None 2834 self.assertEqual(coroutines._format_coroutine(coro), 2835 '<CoroLike without __name__>() running') 2836 2837 coro = CoroLike() 2838 coro.__qualname__ = 'CoroLike' 2839 # Some coroutines might not have '__name__', such as 2840 # built-in async_gen.asend(). 2841 self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()') 2842 2843 coro = CoroLike() 2844 coro.__qualname__ = 'AAA' 2845 coro.cr_code = None 2846 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2847 2848 2849class TimerTests(unittest.TestCase): 2850 2851 def setUp(self): 2852 super().setUp() 2853 self.loop = mock.Mock() 2854 2855 def test_hash(self): 2856 when = time.monotonic() 2857 h = asyncio.TimerHandle(when, lambda: False, (), 2858 mock.Mock()) 2859 self.assertEqual(hash(h), hash(when)) 2860 2861 def test_when(self): 2862 when = time.monotonic() 2863 h = asyncio.TimerHandle(when, lambda: False, (), 2864 mock.Mock()) 2865 self.assertEqual(when, h.when()) 2866 2867 def test_timer(self): 2868 def callback(*args): 2869 return args 2870 2871 args = (1, 2, 3) 2872 when = time.monotonic() 2873 h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 2874 self.assertIs(h._callback, callback) 2875 self.assertIs(h._args, args) 2876 self.assertFalse(h.cancelled()) 2877 2878 # cancel 2879 h.cancel() 2880 self.assertTrue(h.cancelled()) 2881 self.assertIsNone(h._callback) 2882 self.assertIsNone(h._args) 2883 2884 # when cannot be None 2885 self.assertRaises(AssertionError, 2886 asyncio.TimerHandle, None, callback, args, 2887 self.loop) 
def test_timer_repr(self):
    """Check ``repr()`` of a ``TimerHandle`` when the loop is not in debug mode."""
    self.loop.get_debug.return_value = False

    # simple function
    handle = asyncio.TimerHandle(123, noop, (), self.loop)
    # get_function_source() yields the values for the two %s fields.
    source = test_utils.get_function_source(noop)
    expected = '<TimerHandle when=123 noop() at %s:%s>' % source
    self.assertEqual(repr(handle), expected)

    # cancelled handle
    handle.cancel()
    self.assertEqual(repr(handle), '<TimerHandle cancelled when=123>')
2933 self.assertFalse(h1 < h2) 2934 self.assertFalse(h2 < h1) 2935 self.assertTrue(h1 <= h2) 2936 self.assertTrue(h2 <= h1) 2937 self.assertFalse(h1 > h2) 2938 self.assertFalse(h2 > h1) 2939 self.assertTrue(h1 >= h2) 2940 self.assertTrue(h2 >= h1) 2941 self.assertTrue(h1 == h2) 2942 self.assertFalse(h1 != h2) 2943 2944 h2.cancel() 2945 self.assertFalse(h1 == h2) 2946 2947 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2948 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 2949 self.assertTrue(h1 < h2) 2950 self.assertFalse(h2 < h1) 2951 self.assertTrue(h1 <= h2) 2952 self.assertFalse(h2 <= h1) 2953 self.assertFalse(h1 > h2) 2954 self.assertTrue(h2 > h1) 2955 self.assertFalse(h1 >= h2) 2956 self.assertTrue(h2 >= h1) 2957 self.assertFalse(h1 == h2) 2958 self.assertTrue(h1 != h2) 2959 2960 h3 = asyncio.Handle(callback, (), self.loop) 2961 self.assertIs(NotImplemented, h1.__eq__(h3)) 2962 self.assertIs(NotImplemented, h1.__ne__(h3)) 2963 2964 2965class AbstractEventLoopTests(unittest.TestCase): 2966 2967 def test_not_implemented(self): 2968 f = mock.Mock() 2969 loop = asyncio.AbstractEventLoop() 2970 self.assertRaises( 2971 NotImplementedError, loop.run_forever) 2972 self.assertRaises( 2973 NotImplementedError, loop.run_until_complete, None) 2974 self.assertRaises( 2975 NotImplementedError, loop.stop) 2976 self.assertRaises( 2977 NotImplementedError, loop.is_running) 2978 self.assertRaises( 2979 NotImplementedError, loop.is_closed) 2980 self.assertRaises( 2981 NotImplementedError, loop.close) 2982 self.assertRaises( 2983 NotImplementedError, loop.create_task, None) 2984 self.assertRaises( 2985 NotImplementedError, loop.call_later, None, None) 2986 self.assertRaises( 2987 NotImplementedError, loop.call_at, f, f) 2988 self.assertRaises( 2989 NotImplementedError, loop.call_soon, None) 2990 self.assertRaises( 2991 NotImplementedError, loop.time) 2992 self.assertRaises( 2993 NotImplementedError, loop.call_soon_threadsafe, None) 2994 self.assertRaises( 
2995 NotImplementedError, loop.set_default_executor, f) 2996 self.assertRaises( 2997 NotImplementedError, loop.add_reader, 1, f) 2998 self.assertRaises( 2999 NotImplementedError, loop.remove_reader, 1) 3000 self.assertRaises( 3001 NotImplementedError, loop.add_writer, 1, f) 3002 self.assertRaises( 3003 NotImplementedError, loop.remove_writer, 1) 3004 self.assertRaises( 3005 NotImplementedError, loop.add_signal_handler, 1, f) 3006 self.assertRaises( 3007 NotImplementedError, loop.remove_signal_handler, 1) 3008 self.assertRaises( 3009 NotImplementedError, loop.remove_signal_handler, 1) 3010 self.assertRaises( 3011 NotImplementedError, loop.set_exception_handler, f) 3012 self.assertRaises( 3013 NotImplementedError, loop.default_exception_handler, f) 3014 self.assertRaises( 3015 NotImplementedError, loop.call_exception_handler, f) 3016 self.assertRaises( 3017 NotImplementedError, loop.get_debug) 3018 self.assertRaises( 3019 NotImplementedError, loop.set_debug, f) 3020 3021 def test_not_implemented_async(self): 3022 3023 async def inner(): 3024 f = mock.Mock() 3025 loop = asyncio.AbstractEventLoop() 3026 3027 with self.assertRaises(NotImplementedError): 3028 await loop.run_in_executor(f, f) 3029 with self.assertRaises(NotImplementedError): 3030 await loop.getaddrinfo('localhost', 8080) 3031 with self.assertRaises(NotImplementedError): 3032 await loop.getnameinfo(('localhost', 8080)) 3033 with self.assertRaises(NotImplementedError): 3034 await loop.create_connection(f) 3035 with self.assertRaises(NotImplementedError): 3036 await loop.create_server(f) 3037 with self.assertRaises(NotImplementedError): 3038 await loop.create_datagram_endpoint(f) 3039 with self.assertRaises(NotImplementedError): 3040 await loop.sock_recv(f, 10) 3041 with self.assertRaises(NotImplementedError): 3042 await loop.sock_recv_into(f, 10) 3043 with self.assertRaises(NotImplementedError): 3044 await loop.sock_sendall(f, 10) 3045 with self.assertRaises(NotImplementedError): 3046 await 
loop.sock_connect(f, f) 3047 with self.assertRaises(NotImplementedError): 3048 await loop.sock_accept(f) 3049 with self.assertRaises(NotImplementedError): 3050 await loop.sock_sendfile(f, f) 3051 with self.assertRaises(NotImplementedError): 3052 await loop.sendfile(f, f) 3053 with self.assertRaises(NotImplementedError): 3054 await loop.connect_read_pipe(f, mock.sentinel.pipe) 3055 with self.assertRaises(NotImplementedError): 3056 await loop.connect_write_pipe(f, mock.sentinel.pipe) 3057 with self.assertRaises(NotImplementedError): 3058 await loop.subprocess_shell(f, mock.sentinel) 3059 with self.assertRaises(NotImplementedError): 3060 await loop.subprocess_exec(f) 3061 3062 loop = asyncio.new_event_loop() 3063 loop.run_until_complete(inner()) 3064 loop.close() 3065 3066 3067class ProtocolsAbsTests(unittest.TestCase): 3068 3069 def test_empty(self): 3070 f = mock.Mock() 3071 p = asyncio.Protocol() 3072 self.assertIsNone(p.connection_made(f)) 3073 self.assertIsNone(p.connection_lost(f)) 3074 self.assertIsNone(p.data_received(f)) 3075 self.assertIsNone(p.eof_received()) 3076 3077 dp = asyncio.DatagramProtocol() 3078 self.assertIsNone(dp.connection_made(f)) 3079 self.assertIsNone(dp.connection_lost(f)) 3080 self.assertIsNone(dp.error_received(f)) 3081 self.assertIsNone(dp.datagram_received(f, f)) 3082 3083 sp = asyncio.SubprocessProtocol() 3084 self.assertIsNone(sp.connection_made(f)) 3085 self.assertIsNone(sp.connection_lost(f)) 3086 self.assertIsNone(sp.pipe_data_received(1, f)) 3087 self.assertIsNone(sp.pipe_connection_lost(1, f)) 3088 self.assertIsNone(sp.process_exited()) 3089 3090 3091class PolicyTests(unittest.TestCase): 3092 3093 def test_event_loop_policy(self): 3094 policy = asyncio.AbstractEventLoopPolicy() 3095 self.assertRaises(NotImplementedError, policy.get_event_loop) 3096 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 3097 self.assertRaises(NotImplementedError, policy.new_event_loop) 3098 self.assertRaises(NotImplementedError, 
policy.get_child_watcher) 3099 self.assertRaises(NotImplementedError, policy.set_child_watcher, 3100 object()) 3101 3102 def test_get_event_loop(self): 3103 policy = asyncio.DefaultEventLoopPolicy() 3104 self.assertIsNone(policy._local._loop) 3105 3106 loop = policy.get_event_loop() 3107 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 3108 3109 self.assertIs(policy._local._loop, loop) 3110 self.assertIs(loop, policy.get_event_loop()) 3111 loop.close() 3112 3113 def test_get_event_loop_calls_set_event_loop(self): 3114 policy = asyncio.DefaultEventLoopPolicy() 3115 3116 with mock.patch.object( 3117 policy, "set_event_loop", 3118 wraps=policy.set_event_loop) as m_set_event_loop: 3119 3120 loop = policy.get_event_loop() 3121 3122 # policy._local._loop must be set through .set_event_loop() 3123 # (the unix DefaultEventLoopPolicy needs this call to attach 3124 # the child watcher correctly) 3125 m_set_event_loop.assert_called_with(loop) 3126 3127 loop.close() 3128 3129 def test_get_event_loop_after_set_none(self): 3130 policy = asyncio.DefaultEventLoopPolicy() 3131 policy.set_event_loop(None) 3132 self.assertRaises(RuntimeError, policy.get_event_loop) 3133 3134 @mock.patch('asyncio.events.threading.current_thread') 3135 def test_get_event_loop_thread(self, m_current_thread): 3136 3137 def f(): 3138 policy = asyncio.DefaultEventLoopPolicy() 3139 self.assertRaises(RuntimeError, policy.get_event_loop) 3140 3141 th = threading.Thread(target=f) 3142 th.start() 3143 th.join() 3144 3145 def test_new_event_loop(self): 3146 policy = asyncio.DefaultEventLoopPolicy() 3147 3148 loop = policy.new_event_loop() 3149 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 3150 loop.close() 3151 3152 def test_set_event_loop(self): 3153 policy = asyncio.DefaultEventLoopPolicy() 3154 old_loop = policy.get_event_loop() 3155 3156 self.assertRaises(AssertionError, policy.set_event_loop, object()) 3157 3158 loop = policy.new_event_loop() 3159 policy.set_event_loop(loop) 3160 
self.assertIs(loop, policy.get_event_loop()) 3161 self.assertIsNot(old_loop, policy.get_event_loop()) 3162 loop.close() 3163 old_loop.close() 3164 3165 def test_get_event_loop_policy(self): 3166 policy = asyncio.get_event_loop_policy() 3167 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 3168 self.assertIs(policy, asyncio.get_event_loop_policy()) 3169 3170 def test_set_event_loop_policy(self): 3171 self.assertRaises( 3172 AssertionError, asyncio.set_event_loop_policy, object()) 3173 3174 old_policy = asyncio.get_event_loop_policy() 3175 3176 policy = asyncio.DefaultEventLoopPolicy() 3177 asyncio.set_event_loop_policy(policy) 3178 self.assertIs(policy, asyncio.get_event_loop_policy()) 3179 self.assertIsNot(policy, old_policy) 3180 3181 3182class GetEventLoopTestsMixin: 3183 3184 _get_running_loop_impl = None 3185 _set_running_loop_impl = None 3186 get_running_loop_impl = None 3187 get_event_loop_impl = None 3188 3189 def setUp(self): 3190 self._get_running_loop_saved = events._get_running_loop 3191 self._set_running_loop_saved = events._set_running_loop 3192 self.get_running_loop_saved = events.get_running_loop 3193 self.get_event_loop_saved = events.get_event_loop 3194 3195 events._get_running_loop = type(self)._get_running_loop_impl 3196 events._set_running_loop = type(self)._set_running_loop_impl 3197 events.get_running_loop = type(self).get_running_loop_impl 3198 events.get_event_loop = type(self).get_event_loop_impl 3199 3200 asyncio._get_running_loop = type(self)._get_running_loop_impl 3201 asyncio._set_running_loop = type(self)._set_running_loop_impl 3202 asyncio.get_running_loop = type(self).get_running_loop_impl 3203 asyncio.get_event_loop = type(self).get_event_loop_impl 3204 3205 super().setUp() 3206 3207 self.loop = asyncio.new_event_loop() 3208 asyncio.set_event_loop(self.loop) 3209 3210 if sys.platform != 'win32': 3211 watcher = asyncio.SafeChildWatcher() 3212 watcher.attach_loop(self.loop) 3213 asyncio.set_child_watcher(watcher) 3214 
3215 def tearDown(self): 3216 try: 3217 if sys.platform != 'win32': 3218 asyncio.set_child_watcher(None) 3219 3220 super().tearDown() 3221 finally: 3222 self.loop.close() 3223 asyncio.set_event_loop(None) 3224 3225 events._get_running_loop = self._get_running_loop_saved 3226 events._set_running_loop = self._set_running_loop_saved 3227 events.get_running_loop = self.get_running_loop_saved 3228 events.get_event_loop = self.get_event_loop_saved 3229 3230 asyncio._get_running_loop = self._get_running_loop_saved 3231 asyncio._set_running_loop = self._set_running_loop_saved 3232 asyncio.get_running_loop = self.get_running_loop_saved 3233 asyncio.get_event_loop = self.get_event_loop_saved 3234 3235 if sys.platform != 'win32': 3236 3237 def test_get_event_loop_new_process(self): 3238 # Issue bpo-32126: The multiprocessing module used by 3239 # ProcessPoolExecutor is not functional when the 3240 # multiprocessing.synchronize module cannot be imported. 3241 support.import_module('multiprocessing.synchronize') 3242 3243 async def main(): 3244 pool = concurrent.futures.ProcessPoolExecutor() 3245 result = await self.loop.run_in_executor( 3246 pool, _test_get_event_loop_new_process__sub_proc) 3247 pool.shutdown() 3248 return result 3249 3250 self.assertEqual( 3251 self.loop.run_until_complete(main()), 3252 'hello') 3253 3254 def test_get_event_loop_returns_running_loop(self): 3255 class TestError(Exception): 3256 pass 3257 3258 class Policy(asyncio.DefaultEventLoopPolicy): 3259 def get_event_loop(self): 3260 raise TestError 3261 3262 old_policy = asyncio.get_event_loop_policy() 3263 try: 3264 asyncio.set_event_loop_policy(Policy()) 3265 loop = asyncio.new_event_loop() 3266 3267 with self.assertRaises(TestError): 3268 asyncio.get_event_loop() 3269 asyncio.set_event_loop(None) 3270 with self.assertRaises(TestError): 3271 asyncio.get_event_loop() 3272 3273 with self.assertRaisesRegex(RuntimeError, 'no running'): 3274 self.assertIs(asyncio.get_running_loop(), None) 3275 
self.assertIs(asyncio._get_running_loop(), None) 3276 3277 async def func(): 3278 self.assertIs(asyncio.get_event_loop(), loop) 3279 self.assertIs(asyncio.get_running_loop(), loop) 3280 self.assertIs(asyncio._get_running_loop(), loop) 3281 3282 loop.run_until_complete(func()) 3283 3284 asyncio.set_event_loop(loop) 3285 with self.assertRaises(TestError): 3286 asyncio.get_event_loop() 3287 3288 asyncio.set_event_loop(None) 3289 with self.assertRaises(TestError): 3290 asyncio.get_event_loop() 3291 3292 finally: 3293 asyncio.set_event_loop_policy(old_policy) 3294 if loop is not None: 3295 loop.close() 3296 3297 with self.assertRaisesRegex(RuntimeError, 'no running'): 3298 self.assertIs(asyncio.get_running_loop(), None) 3299 3300 self.assertIs(asyncio._get_running_loop(), None) 3301 3302 3303class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 3304 3305 _get_running_loop_impl = events._py__get_running_loop 3306 _set_running_loop_impl = events._py__set_running_loop 3307 get_running_loop_impl = events._py_get_running_loop 3308 get_event_loop_impl = events._py_get_event_loop 3309 3310 3311try: 3312 import _asyncio # NoQA 3313except ImportError: 3314 pass 3315else: 3316 3317 class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 3318 3319 _get_running_loop_impl = events._c__get_running_loop 3320 _set_running_loop_impl = events._c__set_running_loop 3321 get_running_loop_impl = events._c_get_running_loop 3322 get_event_loop_impl = events._c_get_event_loop 3323 3324 3325class TestServer(unittest.TestCase): 3326 3327 def test_get_loop(self): 3328 loop = asyncio.new_event_loop() 3329 self.addCleanup(loop.close) 3330 proto = MyProto(loop) 3331 server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) 3332 self.assertEqual(server.get_loop(), loop) 3333 server.close() 3334 loop.run_until_complete(server.wait_closed()) 3335 3336 3337class TestAbstractServer(unittest.TestCase): 3338 3339 def test_close(self): 3340 with 
self.assertRaises(NotImplementedError): 3341 events.AbstractServer().close() 3342 3343 def test_wait_closed(self): 3344 loop = asyncio.new_event_loop() 3345 self.addCleanup(loop.close) 3346 3347 with self.assertRaises(NotImplementedError): 3348 loop.run_until_complete(events.AbstractServer().wait_closed()) 3349 3350 def test_get_loop(self): 3351 with self.assertRaises(NotImplementedError): 3352 events.AbstractServer().get_loop() 3353 3354 3355if __name__ == '__main__': 3356 unittest.main() 3357