1"""Tests for events.py.""" 2 3import collections.abc 4import concurrent.futures 5import functools 6import io 7import os 8import platform 9import re 10import signal 11import socket 12try: 13 import ssl 14except ImportError: 15 ssl = None 16import subprocess 17import sys 18import threading 19import time 20import errno 21import unittest 22from unittest import mock 23import weakref 24 25if sys.platform != 'win32': 26 import tty 27 28import asyncio 29from asyncio import coroutines 30from asyncio import events 31from asyncio import proactor_events 32from asyncio import selector_events 33from test.test_asyncio import utils as test_utils 34from test import support 35from test.support import socket_helper 36from test.support import ALWAYS_EQ, LARGEST, SMALLEST 37 38 39def tearDownModule(): 40 asyncio.set_event_loop_policy(None) 41 42 43def broken_unix_getsockname(): 44 """Return True if the platform is Mac OS 10.4 or older.""" 45 if sys.platform.startswith("aix"): 46 return True 47 elif sys.platform != 'darwin': 48 return False 49 version = platform.mac_ver()[0] 50 version = tuple(map(int, version.split('.'))) 51 return version < (10, 5) 52 53 54def _test_get_event_loop_new_process__sub_proc(): 55 async def doit(): 56 return 'hello' 57 58 loop = asyncio.new_event_loop() 59 asyncio.set_event_loop(loop) 60 return loop.run_until_complete(doit()) 61 62 63class CoroLike: 64 def send(self, v): 65 pass 66 67 def throw(self, *exc): 68 pass 69 70 def close(self): 71 pass 72 73 def __await__(self): 74 pass 75 76 77class MyBaseProto(asyncio.Protocol): 78 connected = None 79 done = None 80 81 def __init__(self, loop=None): 82 self.transport = None 83 self.state = 'INITIAL' 84 self.nbytes = 0 85 if loop is not None: 86 self.connected = loop.create_future() 87 self.done = loop.create_future() 88 89 def connection_made(self, transport): 90 self.transport = transport 91 assert self.state == 'INITIAL', self.state 92 self.state = 'CONNECTED' 93 if self.connected: 94 self.connected.set_result(None) 95 96 def data_received(self, data): 97 assert self.state == 'CONNECTED', self.state 98 self.nbytes += len(data) 99 100 def eof_received(self): 101 assert self.state == 'CONNECTED', self.state 102 self.state = 'EOF' 103 104 def connection_lost(self, exc): 105 assert self.state in ('CONNECTED', 'EOF'), self.state 106 self.state = 'CLOSED' 107 if self.done: 108 self.done.set_result(None) 109 110 111class MyProto(MyBaseProto): 112 def connection_made(self, transport): 113 super().connection_made(transport) 114 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 115 116 117class MyDatagramProto(asyncio.DatagramProtocol): 118 done = None 119 120 def __init__(self, loop=None): 121 self.state = 'INITIAL' 122 self.nbytes = 0 123 if loop is not None: 124 self.done = loop.create_future() 125 126 def connection_made(self, transport): 127 self.transport = transport 128 assert self.state == 'INITIAL', self.state 129 self.state = 'INITIALIZED' 130 131 def datagram_received(self, data, addr): 132 assert self.state == 'INITIALIZED', self.state 133 self.nbytes += len(data) 134 135 def error_received(self, exc): 136 assert self.state == 'INITIALIZED', self.state 137 138 def connection_lost(self, exc): 139 assert self.state == 'INITIALIZED', self.state 140 self.state = 'CLOSED' 141 if self.done: 142 self.done.set_result(None) 143 144 145class MyReadPipeProto(asyncio.Protocol): 146 done = None 147 148 def __init__(self, loop=None): 149 self.state = ['INITIAL'] 150 self.nbytes = 0 151 self.transport = None 152 if loop is not None: 153 self.done = loop.create_future() 154 155 def connection_made(self, transport): 156 self.transport = transport 157 assert self.state == ['INITIAL'], self.state 158 self.state.append('CONNECTED') 159 160 def data_received(self, data): 161 assert self.state == ['INITIAL', 'CONNECTED'], self.state 162 self.nbytes += len(data) 163 164 def eof_received(self): 165 assert self.state == ['INITIAL', 'CONNECTED'], self.state 166 self.state.append('EOF') 167 168 def connection_lost(self, exc): 169 if 'EOF' not in self.state: 170 self.state.append('EOF') # It is okay if EOF is missed. 171 assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state 172 self.state.append('CLOSED') 173 if self.done: 174 self.done.set_result(None) 175 176 177class MyWritePipeProto(asyncio.BaseProtocol): 178 done = None 179 180 def __init__(self, loop=None): 181 self.state = 'INITIAL' 182 self.transport = None 183 if loop is not None: 184 self.done = loop.create_future() 185 186 def connection_made(self, transport): 187 self.transport = transport 188 assert self.state == 'INITIAL', self.state 189 self.state = 'CONNECTED' 190 191 def connection_lost(self, exc): 192 assert self.state == 'CONNECTED', self.state 193 self.state = 'CLOSED' 194 if self.done: 195 self.done.set_result(None) 196 197 198class MySubprocessProtocol(asyncio.SubprocessProtocol): 199 200 def __init__(self, loop): 201 self.state = 'INITIAL' 202 self.transport = None 203 self.connected = loop.create_future() 204 self.completed = loop.create_future() 205 self.disconnects = {fd: loop.create_future() for fd in range(3)} 206 self.data = {1: b'', 2: b''} 207 self.returncode = None 208 self.got_data = {1: asyncio.Event(), 209 2: asyncio.Event()} 210 211 def connection_made(self, transport): 212 self.transport = transport 213 assert self.state == 'INITIAL', self.state 214 self.state = 'CONNECTED' 215 self.connected.set_result(None) 216 217 def connection_lost(self, exc): 218 assert self.state == 'CONNECTED', self.state 219 self.state = 'CLOSED' 220 self.completed.set_result(None) 221 222 def pipe_data_received(self, fd, data): 223 assert self.state == 'CONNECTED', self.state 224 self.data[fd] += data 225 self.got_data[fd].set() 226 227 def pipe_connection_lost(self, fd, exc): 228 assert self.state == 'CONNECTED', self.state 229 if exc: 230 self.disconnects[fd].set_exception(exc) 231 else: 232 self.disconnects[fd].set_result(exc) 233 234 def process_exited(self): 235 assert self.state == 'CONNECTED', self.state 236 self.returncode = self.transport.get_returncode() 237 238 239class EventLoopTestsMixin: 240 241 def setUp(self): 242 super().setUp() 243 self.loop = self.create_event_loop() 244 self.set_event_loop(self.loop) 245 246 def tearDown(self): 247 # just in case if we have transport close callbacks 248 if not self.loop.is_closed(): 249 test_utils.run_briefly(self.loop) 250 251 self.doCleanups() 252 support.gc_collect() 253 super().tearDown() 254 255 def test_run_until_complete_nesting(self): 256 async def coro1(): 257 await asyncio.sleep(0) 258 259 async def coro2(): 260 self.assertTrue(self.loop.is_running()) 261 self.loop.run_until_complete(coro1()) 262 263 with self.assertWarnsRegex( 264 RuntimeWarning, 265 r"coroutine \S+ was never awaited" 266 ): 267 self.assertRaises( 268 RuntimeError, self.loop.run_until_complete, coro2()) 269 270 # Note: because of the default Windows timing granularity of 271 # 15.6 msec, we use fairly long sleep times here (~100 msec). 272 273 def test_run_until_complete(self): 274 t0 = self.loop.time() 275 self.loop.run_until_complete(asyncio.sleep(0.1)) 276 t1 = self.loop.time() 277 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 278 279 def test_run_until_complete_stopped(self): 280 281 async def cb(): 282 self.loop.stop() 283 await asyncio.sleep(0.1) 284 task = cb() 285 self.assertRaises(RuntimeError, 286 self.loop.run_until_complete, task) 287 288 def test_call_later(self): 289 results = [] 290 291 def callback(arg): 292 results.append(arg) 293 self.loop.stop() 294 295 self.loop.call_later(0.1, callback, 'hello world') 296 self.loop.run_forever() 297 self.assertEqual(results, ['hello world']) 298 299 def test_call_soon(self): 300 results = [] 301 302 def callback(arg1, arg2): 303 results.append((arg1, arg2)) 304 self.loop.stop() 305 306 self.loop.call_soon(callback, 'hello', 'world') 307 self.loop.run_forever() 308 self.assertEqual(results, [('hello', 'world')]) 309 310 def test_call_soon_threadsafe(self): 311 results = [] 312 lock = threading.Lock() 313 314 def callback(arg): 315 results.append(arg) 316 if len(results) >= 2: 317 self.loop.stop() 318 319 def run_in_thread(): 320 self.loop.call_soon_threadsafe(callback, 'hello') 321 lock.release() 322 323 lock.acquire() 324 t = threading.Thread(target=run_in_thread) 325 t.start() 326 327 with lock: 328 self.loop.call_soon(callback, 'world') 329 self.loop.run_forever() 330 t.join() 331 self.assertEqual(results, ['hello', 'world']) 332 333 def test_call_soon_threadsafe_same_thread(self): 334 results = [] 335 336 def callback(arg): 337 results.append(arg) 338 if len(results) >= 2: 339 self.loop.stop() 340 341 self.loop.call_soon_threadsafe(callback, 'hello') 342 self.loop.call_soon(callback, 'world') 343 self.loop.run_forever() 344 self.assertEqual(results, ['hello', 'world']) 345 346 def test_run_in_executor(self): 347 def run(arg): 348 return (arg, threading.get_ident()) 349 f2 = self.loop.run_in_executor(None, run, 'yo') 350 res, thread_id = self.loop.run_until_complete(f2) 351 self.assertEqual(res, 'yo') 352 self.assertNotEqual(thread_id, threading.get_ident()) 353 354 def test_run_in_executor_cancel(self): 355 called = False 356 357 def patched_call_soon(*args): 358 nonlocal called 359 called = True 360 361 def run(): 362 time.sleep(0.05) 363 364 f2 = self.loop.run_in_executor(None, run) 365 f2.cancel() 366 self.loop.run_until_complete( 367 self.loop.shutdown_default_executor()) 368 self.loop.close() 369 self.loop.call_soon = patched_call_soon 370 self.loop.call_soon_threadsafe = patched_call_soon 371 time.sleep(0.4) 372 self.assertFalse(called) 373 374 def test_reader_callback(self): 375 r, w = socket.socketpair() 376 r.setblocking(False) 377 bytes_read = bytearray() 378 379 def reader(): 380 try: 381 data = r.recv(1024) 382 except BlockingIOError: 383 # Spurious readiness notifications are possible 384 # at least on Linux -- see man select. 385 return 386 if data: 387 bytes_read.extend(data) 388 else: 389 self.assertTrue(self.loop.remove_reader(r.fileno())) 390 r.close() 391 392 self.loop.add_reader(r.fileno(), reader) 393 self.loop.call_soon(w.send, b'abc') 394 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 395 self.loop.call_soon(w.send, b'def') 396 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 397 self.loop.call_soon(w.close) 398 self.loop.call_soon(self.loop.stop) 399 self.loop.run_forever() 400 self.assertEqual(bytes_read, b'abcdef') 401 402 def test_writer_callback(self): 403 r, w = socket.socketpair() 404 w.setblocking(False) 405 406 def writer(data): 407 w.send(data) 408 self.loop.stop() 409 410 data = b'x' * 1024 411 self.loop.add_writer(w.fileno(), writer, data) 412 self.loop.run_forever() 413 414 self.assertTrue(self.loop.remove_writer(w.fileno())) 415 self.assertFalse(self.loop.remove_writer(w.fileno())) 416 417 w.close() 418 read = r.recv(len(data) * 2) 419 r.close() 420 self.assertEqual(read, data) 421 422 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') 423 def test_add_signal_handler(self): 424 caught = 0 425 426 def my_handler(): 427 nonlocal caught 428 caught += 1 429 430 # Check error behavior first. 431 self.assertRaises( 432 TypeError, self.loop.add_signal_handler, 'boom', my_handler) 433 self.assertRaises( 434 TypeError, self.loop.remove_signal_handler, 'boom') 435 self.assertRaises( 436 ValueError, self.loop.add_signal_handler, signal.NSIG+1, 437 my_handler) 438 self.assertRaises( 439 ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 440 self.assertRaises( 441 ValueError, self.loop.add_signal_handler, 0, my_handler) 442 self.assertRaises( 443 ValueError, self.loop.remove_signal_handler, 0) 444 self.assertRaises( 445 ValueError, self.loop.add_signal_handler, -1, my_handler) 446 self.assertRaises( 447 ValueError, self.loop.remove_signal_handler, -1) 448 self.assertRaises( 449 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 450 my_handler) 451 # Removing SIGKILL doesn't raise, since we don't call signal(). 452 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 453 # Now set a handler and handle it. 454 self.loop.add_signal_handler(signal.SIGINT, my_handler) 455 456 os.kill(os.getpid(), signal.SIGINT) 457 test_utils.run_until(self.loop, lambda: caught) 458 459 # Removing it should restore the default handler. 460 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 461 self.assertEqual(signal.getsignal(signal.SIGINT), 462 signal.default_int_handler) 463 # Removing again returns False. 464 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 465 466 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 467 def test_signal_handling_while_selecting(self): 468 # Test with a signal actually arriving during a select() call. 469 caught = 0 470 471 def my_handler(): 472 nonlocal caught 473 caught += 1 474 self.loop.stop() 475 476 self.loop.add_signal_handler(signal.SIGALRM, my_handler) 477 478 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 479 self.loop.call_later(60, self.loop.stop) 480 self.loop.run_forever() 481 self.assertEqual(caught, 1) 482 483 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 484 def test_signal_handling_args(self): 485 some_args = (42,) 486 caught = 0 487 488 def my_handler(*args): 489 nonlocal caught 490 caught += 1 491 self.assertEqual(args, some_args) 492 self.loop.stop() 493 494 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 495 496 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 497 self.loop.call_later(60, self.loop.stop) 498 self.loop.run_forever() 499 self.assertEqual(caught, 1) 500 501 def _basetest_create_connection(self, connection_fut, check_sockname=True): 502 tr, pr = self.loop.run_until_complete(connection_fut) 503 self.assertIsInstance(tr, asyncio.Transport) 504 self.assertIsInstance(pr, asyncio.Protocol) 505 self.assertIs(pr.transport, tr) 506 if check_sockname: 507 self.assertIsNotNone(tr.get_extra_info('sockname')) 508 self.loop.run_until_complete(pr.done) 509 self.assertGreater(pr.nbytes, 0) 510 tr.close() 511 512 def test_create_connection(self): 513 with test_utils.run_test_server() as httpd: 514 conn_fut = self.loop.create_connection( 515 lambda: MyProto(loop=self.loop), *httpd.address) 516 self._basetest_create_connection(conn_fut) 517 518 @socket_helper.skip_unless_bind_unix_socket 519 def test_create_unix_connection(self): 520 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 521 # zero-length address for UNIX socket. 522 check_sockname = not broken_unix_getsockname() 523 524 with test_utils.run_test_unix_server() as httpd: 525 conn_fut = self.loop.create_unix_connection( 526 lambda: MyProto(loop=self.loop), httpd.address) 527 self._basetest_create_connection(conn_fut, check_sockname) 528 529 def check_ssl_extra_info(self, client, check_sockname=True, 530 peername=None, peercert={}): 531 if check_sockname: 532 self.assertIsNotNone(client.get_extra_info('sockname')) 533 if peername: 534 self.assertEqual(peername, 535 client.get_extra_info('peername')) 536 else: 537 self.assertIsNotNone(client.get_extra_info('peername')) 538 self.assertEqual(peercert, 539 client.get_extra_info('peercert')) 540 541 # test SSL cipher 542 cipher = client.get_extra_info('cipher') 543 self.assertIsInstance(cipher, tuple) 544 self.assertEqual(len(cipher), 3, cipher) 545 self.assertIsInstance(cipher[0], str) 546 self.assertIsInstance(cipher[1], str) 547 self.assertIsInstance(cipher[2], int) 548 549 # test SSL object 550 sslobj = client.get_extra_info('ssl_object') 551 self.assertIsNotNone(sslobj) 552 self.assertEqual(sslobj.compression(), 553 client.get_extra_info('compression')) 554 self.assertEqual(sslobj.cipher(), 555 client.get_extra_info('cipher')) 556 self.assertEqual(sslobj.getpeercert(), 557 client.get_extra_info('peercert')) 558 self.assertEqual(sslobj.compression(), 559 client.get_extra_info('compression')) 560 561 def _basetest_create_ssl_connection(self, connection_fut, 562 check_sockname=True, 563 peername=None): 564 tr, pr = self.loop.run_until_complete(connection_fut) 565 self.assertIsInstance(tr, asyncio.Transport) 566 self.assertIsInstance(pr, asyncio.Protocol) 567 self.assertTrue('ssl' in tr.__class__.__name__.lower()) 568 self.check_ssl_extra_info(tr, check_sockname, peername) 569 self.loop.run_until_complete(pr.done) 570 self.assertGreater(pr.nbytes, 0) 571 tr.close() 572 573 def _test_create_ssl_connection(self, httpd, create_connection, 574 check_sockname=True, peername=None): 575 conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 576 self._basetest_create_ssl_connection(conn_fut, check_sockname, 577 peername) 578 579 # ssl.Purpose was introduced in Python 3.4 580 if hasattr(ssl, 'Purpose'): 581 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 582 cafile=None, capath=None, 583 cadata=None): 584 """ 585 A ssl.create_default_context() replacement that doesn't enable 586 cert validation. 587 """ 588 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 589 return test_utils.dummy_ssl_context() 590 591 # With ssl=True, ssl.create_default_context() should be called 592 with mock.patch('ssl.create_default_context', 593 side_effect=_dummy_ssl_create_context) as m: 594 conn_fut = create_connection(ssl=True) 595 self._basetest_create_ssl_connection(conn_fut, check_sockname, 596 peername) 597 self.assertEqual(m.call_count, 1) 598 599 # With the real ssl.create_default_context(), certificate 600 # validation will fail 601 with self.assertRaises(ssl.SSLError) as cm: 602 conn_fut = create_connection(ssl=True) 603 # Ignore the "SSL handshake failed" log in debug mode 604 with test_utils.disable_logger(): 605 self._basetest_create_ssl_connection(conn_fut, check_sockname, 606 peername) 607 608 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 609 610 @unittest.skipIf(ssl is None, 'No ssl module') 611 def test_create_ssl_connection(self): 612 with test_utils.run_test_server(use_ssl=True) as httpd: 613 create_connection = functools.partial( 614 self.loop.create_connection, 615 lambda: MyProto(loop=self.loop), 616 *httpd.address) 617 self._test_create_ssl_connection(httpd, create_connection, 618 peername=httpd.address) 619 620 @socket_helper.skip_unless_bind_unix_socket 621 @unittest.skipIf(ssl is None, 'No ssl module') 622 def test_create_ssl_unix_connection(self): 623 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 624 # zero-length address for UNIX socket. 625 check_sockname = not broken_unix_getsockname() 626 627 with test_utils.run_test_unix_server(use_ssl=True) as httpd: 628 create_connection = functools.partial( 629 self.loop.create_unix_connection, 630 lambda: MyProto(loop=self.loop), httpd.address, 631 server_hostname='127.0.0.1') 632 633 self._test_create_ssl_connection(httpd, create_connection, 634 check_sockname, 635 peername=httpd.address) 636 637 def test_create_connection_local_addr(self): 638 with test_utils.run_test_server() as httpd: 639 port = socket_helper.find_unused_port() 640 f = self.loop.create_connection( 641 lambda: MyProto(loop=self.loop), 642 *httpd.address, local_addr=(httpd.address[0], port)) 643 tr, pr = self.loop.run_until_complete(f) 644 expected = pr.transport.get_extra_info('sockname')[1] 645 self.assertEqual(port, expected) 646 tr.close() 647 648 def test_create_connection_local_addr_in_use(self): 649 with test_utils.run_test_server() as httpd: 650 f = self.loop.create_connection( 651 lambda: MyProto(loop=self.loop), 652 *httpd.address, local_addr=httpd.address) 653 with self.assertRaises(OSError) as cm: 654 self.loop.run_until_complete(f) 655 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 656 self.assertIn(str(httpd.address), cm.exception.strerror) 657 658 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 659 loop = self.loop 660 661 class MyProto(MyBaseProto): 662 663 def connection_lost(self, exc): 664 super().connection_lost(exc) 665 loop.call_soon(loop.stop) 666 667 def data_received(self, data): 668 super().data_received(data) 669 self.transport.write(expected_response) 670 671 lsock = socket.create_server(('127.0.0.1', 0), backlog=1) 672 addr = lsock.getsockname() 673 674 message = b'test data' 675 response = None 676 expected_response = b'roger' 677 678 def client(): 679 nonlocal response 680 try: 681 csock = socket.socket() 682 if client_ssl is not None: 683 csock = client_ssl.wrap_socket(csock) 684 csock.connect(addr) 685 csock.sendall(message) 686 response = csock.recv(99) 687 csock.close() 688 except Exception as exc: 689 print( 690 "Failure in client thread in test_connect_accepted_socket", 691 exc) 692 693 thread = threading.Thread(target=client, daemon=True) 694 thread.start() 695 696 conn, _ = lsock.accept() 697 proto = MyProto(loop=loop) 698 proto.loop = loop 699 loop.run_until_complete( 700 loop.connect_accepted_socket( 701 (lambda: proto), conn, ssl=server_ssl)) 702 loop.run_forever() 703 proto.transport.close() 704 lsock.close() 705 706 support.join_thread(thread) 707 self.assertFalse(thread.is_alive()) 708 self.assertEqual(proto.state, 'CLOSED') 709 self.assertEqual(proto.nbytes, len(message)) 710 self.assertEqual(response, expected_response) 711 712 @unittest.skipIf(ssl is None, 'No ssl module') 713 def test_ssl_connect_accepted_socket(self): 714 if (sys.platform == 'win32' and 715 sys.version_info < (3, 5) and 716 isinstance(self.loop, proactor_events.BaseProactorEventLoop) 717 ): 718 raise unittest.SkipTest( 719 'SSL not supported with proactor event loops before Python 3.5' 720 ) 721 722 server_context = test_utils.simple_server_sslcontext() 723 client_context = test_utils.simple_client_sslcontext() 724 725 self.test_connect_accepted_socket(server_context, client_context) 726 727 def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self): 728 sock = socket.socket() 729 self.addCleanup(sock.close) 730 coro = self.loop.connect_accepted_socket( 731 MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT) 732 with self.assertRaisesRegex( 733 ValueError, 734 'ssl_handshake_timeout is only meaningful with ssl'): 735 self.loop.run_until_complete(coro) 736 737 @mock.patch('asyncio.base_events.socket') 738 def create_server_multiple_hosts(self, family, hosts, mock_sock): 739 async def getaddrinfo(host, port, *args, **kw): 740 if family == socket.AF_INET: 741 return [(family, socket.SOCK_STREAM, 6, '', (host, port))] 742 else: 743 return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 744 745 def getaddrinfo_task(*args, **kwds): 746 return self.loop.create_task(getaddrinfo(*args, **kwds)) 747 748 unique_hosts = set(hosts) 749 750 if family == socket.AF_INET: 751 mock_sock.socket().getsockbyname.side_effect = [ 752 (host, 80) for host in unique_hosts] 753 else: 754 mock_sock.socket().getsockbyname.side_effect = [ 755 (host, 80, 0, 0) for host in unique_hosts] 756 self.loop.getaddrinfo = getaddrinfo_task 757 self.loop._start_serving = mock.Mock() 758 self.loop._stop_serving = mock.Mock() 759 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 760 server = self.loop.run_until_complete(f) 761 self.addCleanup(server.close) 762 server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 763 self.assertEqual(server_hosts, unique_hosts) 764 765 def test_create_server_multiple_hosts_ipv4(self): 766 self.create_server_multiple_hosts(socket.AF_INET, 767 ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 768 769 def test_create_server_multiple_hosts_ipv6(self): 770 self.create_server_multiple_hosts(socket.AF_INET6, 771 ['::1', '::2', '::1']) 772 773 def test_create_server(self): 774 proto = MyProto(self.loop) 775 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 776 server = self.loop.run_until_complete(f) 777 self.assertEqual(len(server.sockets), 1) 778 sock = server.sockets[0] 779 host, port = sock.getsockname() 780 self.assertEqual(host, '0.0.0.0') 781 client = socket.socket() 782 client.connect(('127.0.0.1', port)) 783 client.sendall(b'xxx') 784 785 self.loop.run_until_complete(proto.connected) 786 self.assertEqual('CONNECTED', proto.state) 787 788 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 789 self.assertEqual(3, proto.nbytes) 790 791 # extra info is available 792 self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 793 self.assertEqual('127.0.0.1', 794 proto.transport.get_extra_info('peername')[0]) 795 796 # close connection 797 proto.transport.close() 798 self.loop.run_until_complete(proto.done) 799 800 self.assertEqual('CLOSED', proto.state) 801 802 # the client socket must be closed after to avoid ECONNRESET upon 803 # recv()/send() on the serving socket 804 client.close() 805 806 # close server 807 server.close() 808 809 @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') 810 def test_create_server_reuse_port(self): 811 proto = MyProto(self.loop) 812 f = self.loop.create_server( 813 lambda: proto, '0.0.0.0', 0) 814 server = self.loop.run_until_complete(f) 815 self.assertEqual(len(server.sockets), 1) 816 sock = server.sockets[0] 817 self.assertFalse( 818 sock.getsockopt( 819 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 820 server.close() 821 822 test_utils.run_briefly(self.loop) 823 824 proto = MyProto(self.loop) 825 f = self.loop.create_server( 826 lambda: proto, '0.0.0.0', 0, reuse_port=True) 827 server = self.loop.run_until_complete(f) 828 self.assertEqual(len(server.sockets), 1) 829 sock = server.sockets[0] 830 self.assertTrue( 831 sock.getsockopt( 832 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 833 server.close() 834 835 def _make_unix_server(self, factory, **kwargs): 836 path = test_utils.gen_unix_socket_path() 837 self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) 838 839 f = self.loop.create_unix_server(factory, path, **kwargs) 840 server = self.loop.run_until_complete(f) 841 842 return server, path 843 844 @socket_helper.skip_unless_bind_unix_socket 845 def test_create_unix_server(self): 846 proto = MyProto(loop=self.loop) 847 server, path = self._make_unix_server(lambda: proto) 848 self.assertEqual(len(server.sockets), 1) 849 850 client = socket.socket(socket.AF_UNIX) 851 client.connect(path) 852 client.sendall(b'xxx') 853 854 self.loop.run_until_complete(proto.connected) 855 self.assertEqual('CONNECTED', proto.state) 856 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 857 self.assertEqual(3, proto.nbytes) 858 859 # close connection 860 proto.transport.close() 861 self.loop.run_until_complete(proto.done) 862 863 self.assertEqual('CLOSED', proto.state) 864 865 # the client socket must be closed after to avoid ECONNRESET upon 866 # recv()/send() on the serving socket 867 client.close() 868 869 # close server 870 server.close() 871 872 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 873 def test_create_unix_server_path_socket_error(self): 874 proto = MyProto(loop=self.loop) 875 sock = socket.socket() 876 with sock: 877 f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock) 878 with self.assertRaisesRegex(ValueError, 879 'path and sock can not be specified ' 880 'at the same time'): 881 self.loop.run_until_complete(f) 882 883 def _create_ssl_context(self, certfile, keyfile=None): 884 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 885 sslcontext.options |= ssl.OP_NO_SSLv2 886 sslcontext.load_cert_chain(certfile, keyfile) 887 return sslcontext 888 889 def _make_ssl_server(self, factory, certfile, keyfile=None): 890 sslcontext = self._create_ssl_context(certfile, keyfile) 891 892 f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) 893 server = self.loop.run_until_complete(f) 894 895 sock = server.sockets[0] 896 host, port = sock.getsockname() 897 self.assertEqual(host, '127.0.0.1') 898 return server, host, port 899 900 def _make_ssl_unix_server(self, factory, certfile, keyfile=None): 901 sslcontext = self._create_ssl_context(certfile, keyfile) 902 return self._make_unix_server(factory, ssl=sslcontext) 903 904 @unittest.skipIf(ssl is None, 'No ssl module') 905 def test_create_server_ssl(self): 906 proto = MyProto(loop=self.loop) 907 server, host, port = self._make_ssl_server( 908 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 909 910 f_c = self.loop.create_connection(MyBaseProto, host, port, 911 ssl=test_utils.dummy_ssl_context()) 912 client, pr = self.loop.run_until_complete(f_c) 913 914 client.write(b'xxx') 915 self.loop.run_until_complete(proto.connected) 916 self.assertEqual('CONNECTED', proto.state) 917 918 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 919 self.assertEqual(3, proto.nbytes) 920 921 # extra info is available 922 self.check_ssl_extra_info(client, peername=(host, port)) 923 924 # close connection 925 proto.transport.close() 926 self.loop.run_until_complete(proto.done) 927 self.assertEqual('CLOSED', proto.state) 928 929 # the client socket must be closed after to avoid ECONNRESET upon 930 # recv()/send() on the serving socket 931 client.close() 932 933 # stop serving 934 server.close() 935 936 @socket_helper.skip_unless_bind_unix_socket 937 @unittest.skipIf(ssl is None, 'No ssl module') 938 def test_create_unix_server_ssl(self): 939 proto = MyProto(loop=self.loop) 940 server, path = self._make_ssl_unix_server( 941 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 942 943 f_c = self.loop.create_unix_connection( 944 MyBaseProto, path, ssl=test_utils.dummy_ssl_context(), 945 server_hostname='') 946 947 client, pr = self.loop.run_until_complete(f_c) 948 949 client.write(b'xxx') 950 self.loop.run_until_complete(proto.connected) 951 self.assertEqual('CONNECTED', proto.state) 952 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 953 self.assertEqual(3, proto.nbytes) 954 955 # close connection 956 proto.transport.close() 957 self.loop.run_until_complete(proto.done) 958 self.assertEqual('CLOSED', proto.state) 959 960 # the client socket must be closed after to avoid ECONNRESET upon 961 # recv()/send() on the serving socket 962 client.close() 963 964 # stop serving 965 server.close() 966 967 @unittest.skipIf(ssl is None, 'No ssl module') 968 def test_create_server_ssl_verify_failed(self): 969 proto = MyProto(loop=self.loop) 970 server, host, port = self._make_ssl_server( 971 lambda: proto, test_utils.SIGNED_CERTFILE) 972 973 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 974 sslcontext_client.options |= ssl.OP_NO_SSLv2 975 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 976 if hasattr(sslcontext_client, 'check_hostname'): 977 sslcontext_client.check_hostname = True 978 979 980 # no CA loaded 981 f_c = self.loop.create_connection(MyProto, host, port, 982 ssl=sslcontext_client) 983 with mock.patch.object(self.loop, 'call_exception_handler'): 984 with test_utils.disable_logger(): 985 with self.assertRaisesRegex(ssl.SSLError, 986 '(?i)certificate.verify.failed'): 987 self.loop.run_until_complete(f_c) 988 989 # execute the loop to log the connection error 990 test_utils.run_briefly(self.loop) 991 992 # close connection 993 self.assertIsNone(proto.transport) 994 server.close() 995 996 @socket_helper.skip_unless_bind_unix_socket 997 @unittest.skipIf(ssl is None, 'No ssl module') 998 def test_create_unix_server_ssl_verify_failed(self): 999 proto = MyProto(loop=self.loop) 1000 server, path = self._make_ssl_unix_server( 1001 lambda: proto, test_utils.SIGNED_CERTFILE) 1002 1003 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1004 sslcontext_client.options |= ssl.OP_NO_SSLv2 1005 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1006 if hasattr(sslcontext_client, 'check_hostname'): 1007 sslcontext_client.check_hostname = True 1008 1009 # no CA loaded 1010 f_c = self.loop.create_unix_connection(MyProto, path, 1011 ssl=sslcontext_client, 1012 server_hostname='invalid') 1013 with mock.patch.object(self.loop, 'call_exception_handler'): 1014 with test_utils.disable_logger(): 1015 with self.assertRaisesRegex(ssl.SSLError, 1016 '(?i)certificate.verify.failed'): 1017 self.loop.run_until_complete(f_c) 1018 1019 # execute the loop to log the connection error 1020 test_utils.run_briefly(self.loop) 1021 1022 # close connection 1023 self.assertIsNone(proto.transport) 1024 server.close() 1025 1026 @unittest.skipIf(ssl is None, 'No ssl module') 1027 def test_create_server_ssl_match_failed(self): 1028 proto = MyProto(loop=self.loop) 1029 server, host, port = self._make_ssl_server( 1030 lambda: proto, test_utils.SIGNED_CERTFILE) 1031 1032 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1033 sslcontext_client.options |= ssl.OP_NO_SSLv2 1034 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1035 sslcontext_client.load_verify_locations( 1036 cafile=test_utils.SIGNING_CA) 1037 if hasattr(sslcontext_client, 'check_hostname'): 1038 sslcontext_client.check_hostname = True 1039 1040 # incorrect server_hostname 1041 f_c = self.loop.create_connection(MyProto, host, port, 1042 ssl=sslcontext_client) 1043 with mock.patch.object(self.loop, 'call_exception_handler'): 1044 with test_utils.disable_logger(): 1045 with self.assertRaisesRegex( 1046 ssl.CertificateError, 1047 "IP address mismatch, certificate is not valid for " 1048 "'127.0.0.1'"): 1049 self.loop.run_until_complete(f_c) 1050 1051 # close connection 1052 # transport is None because TLS ALERT aborted the handshake 1053 self.assertIsNone(proto.transport) 1054 server.close() 1055 1056 @socket_helper.skip_unless_bind_unix_socket 1057 @unittest.skipIf(ssl is None, 'No ssl module') 1058 def test_create_unix_server_ssl_verified(self): 1059 proto = MyProto(loop=self.loop) 1060 server, path = self._make_ssl_unix_server( 1061 lambda: proto, test_utils.SIGNED_CERTFILE) 1062 1063 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1064 sslcontext_client.options |= ssl.OP_NO_SSLv2 1065 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1066 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 1067 if hasattr(sslcontext_client, 'check_hostname'): 1068 sslcontext_client.check_hostname = True 1069 1070 # Connection succeeds with correct CA and server hostname. 1071 f_c = self.loop.create_unix_connection(MyProto, path, 1072 ssl=sslcontext_client, 1073 server_hostname='localhost') 1074 client, pr = self.loop.run_until_complete(f_c) 1075 self.loop.run_until_complete(proto.connected) 1076 1077 # close connection 1078 proto.transport.close() 1079 client.close() 1080 server.close() 1081 self.loop.run_until_complete(proto.done) 1082 1083 @unittest.skipIf(ssl is None, 'No ssl module') 1084 def test_create_server_ssl_verified(self): 1085 proto = MyProto(loop=self.loop) 1086 server, host, port = self._make_ssl_server( 1087 lambda: proto, test_utils.SIGNED_CERTFILE) 1088 1089 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1090 sslcontext_client.options |= ssl.OP_NO_SSLv2 1091 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1092 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 1093 if hasattr(sslcontext_client, 'check_hostname'): 1094 sslcontext_client.check_hostname = True 1095 1096 # Connection succeeds with correct CA and server hostname. 1097 f_c = self.loop.create_connection(MyProto, host, port, 1098 ssl=sslcontext_client, 1099 server_hostname='localhost') 1100 client, pr = self.loop.run_until_complete(f_c) 1101 self.loop.run_until_complete(proto.connected) 1102 1103 # extra info is available 1104 self.check_ssl_extra_info(client, peername=(host, port), 1105 peercert=test_utils.PEERCERT) 1106 1107 # close connection 1108 proto.transport.close() 1109 client.close() 1110 server.close() 1111 self.loop.run_until_complete(proto.done) 1112 1113 def test_create_server_sock(self): 1114 proto = self.loop.create_future() 1115 1116 class TestMyProto(MyProto): 1117 def connection_made(self, transport): 1118 super().connection_made(transport) 1119 proto.set_result(self) 1120 1121 sock_ob = socket.create_server(('0.0.0.0', 0)) 1122 1123 f = self.loop.create_server(TestMyProto, sock=sock_ob) 1124 server = self.loop.run_until_complete(f) 1125 sock = server.sockets[0] 1126 self.assertEqual(sock.fileno(), sock_ob.fileno()) 1127 1128 host, port = sock.getsockname() 1129 self.assertEqual(host, '0.0.0.0') 1130 client = socket.socket() 1131 client.connect(('127.0.0.1', port)) 1132 client.send(b'xxx') 1133 client.close() 1134 server.close() 1135 1136 def test_create_server_addr_in_use(self): 1137 sock_ob = socket.create_server(('0.0.0.0', 0)) 1138 1139 f = self.loop.create_server(MyProto, sock=sock_ob) 1140 server = self.loop.run_until_complete(f) 1141 sock = server.sockets[0] 1142 host, port = sock.getsockname() 1143 1144 f = self.loop.create_server(MyProto, host=host, port=port) 1145 with self.assertRaises(OSError) as cm: 1146 self.loop.run_until_complete(f) 1147 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 1148 1149 server.close() 1150 1151 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 1152 def test_create_server_dual_stack(self): 1153 f_proto = self.loop.create_future() 1154 1155 class TestMyProto(MyProto): 1156 def connection_made(self, transport): 1157 super().connection_made(transport) 1158 f_proto.set_result(self) 1159 1160 try_count = 0 1161 while True: 1162 try: 1163 port = socket_helper.find_unused_port() 1164 f = self.loop.create_server(TestMyProto, host=None, port=port) 1165 server = self.loop.run_until_complete(f) 1166 except OSError as ex: 1167 if ex.errno == errno.EADDRINUSE: 1168 try_count += 1 1169 self.assertGreaterEqual(5, try_count) 1170 continue 1171 else: 1172 raise 1173 else: 1174 break 1175 client = socket.socket() 1176 client.connect(('127.0.0.1', port)) 1177 client.send(b'xxx') 1178 proto = self.loop.run_until_complete(f_proto) 1179 proto.transport.close() 1180 client.close() 1181 1182 f_proto = self.loop.create_future() 1183 client = socket.socket(socket.AF_INET6) 1184 client.connect(('::1', port)) 1185 client.send(b'xxx') 1186 proto = self.loop.run_until_complete(f_proto) 1187 proto.transport.close() 1188 client.close() 1189 1190 server.close() 1191 1192 def test_server_close(self): 1193 f = self.loop.create_server(MyProto, '0.0.0.0', 0) 1194 server = self.loop.run_until_complete(f) 1195 sock = server.sockets[0] 1196 host, port = sock.getsockname() 1197 1198 client = socket.socket() 1199 client.connect(('127.0.0.1', port)) 1200 client.send(b'xxx') 1201 client.close() 1202 1203 server.close() 1204 1205 client = socket.socket() 1206 self.assertRaises( 1207 ConnectionRefusedError, client.connect, ('127.0.0.1', port)) 1208 client.close() 1209 1210 def _test_create_datagram_endpoint(self, local_addr, family): 1211 class TestMyDatagramProto(MyDatagramProto): 1212 def __init__(inner_self): 1213 super().__init__(loop=self.loop) 1214 1215 def datagram_received(self, data, addr): 1216 super().datagram_received(data, addr) 1217 self.transport.sendto(b'resp:'+data, addr) 1218 1219 coro = self.loop.create_datagram_endpoint( 1220 TestMyDatagramProto, local_addr=local_addr, family=family) 1221 s_transport, server = self.loop.run_until_complete(coro) 1222 sockname = s_transport.get_extra_info('sockname') 1223 host, port = socket.getnameinfo( 1224 sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV) 1225 1226 self.assertIsInstance(s_transport, asyncio.Transport) 1227 self.assertIsInstance(server, TestMyDatagramProto) 1228 self.assertEqual('INITIALIZED', server.state) 1229 self.assertIs(server.transport, s_transport) 1230 1231 coro = self.loop.create_datagram_endpoint( 1232 lambda: MyDatagramProto(loop=self.loop), 1233 remote_addr=(host, port)) 1234 transport, client = self.loop.run_until_complete(coro) 1235 1236 self.assertIsInstance(transport, asyncio.Transport) 1237 self.assertIsInstance(client, MyDatagramProto) 1238 self.assertEqual('INITIALIZED', client.state) 1239 self.assertIs(client.transport, transport) 1240 1241 transport.sendto(b'xxx') 1242 test_utils.run_until(self.loop, lambda: server.nbytes) 1243 self.assertEqual(3, server.nbytes) 1244 test_utils.run_until(self.loop, lambda: client.nbytes) 1245 1246 # received 1247 self.assertEqual(8, client.nbytes) 1248 1249 # extra info is available 1250 self.assertIsNotNone(transport.get_extra_info('sockname')) 1251 1252 # close connection 1253 transport.close() 1254 self.loop.run_until_complete(client.done) 1255 self.assertEqual('CLOSED', client.state) 1256 server.transport.close() 1257 1258 def test_create_datagram_endpoint(self): 1259 self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET) 1260 1261 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 1262 def test_create_datagram_endpoint_ipv6(self): 1263 self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6) 1264 1265 def test_create_datagram_endpoint_sock(self): 1266 sock = None 1267 local_address = ('127.0.0.1', 0) 1268 infos = self.loop.run_until_complete( 1269 self.loop.getaddrinfo( 1270 *local_address, type=socket.SOCK_DGRAM)) 1271 for family, type, proto, cname, address in infos: 1272 try: 1273 sock = socket.socket(family=family, type=type, proto=proto) 1274 sock.setblocking(False) 1275 sock.bind(address) 1276 except: 1277 pass 1278 else: 1279 break 1280 else: 1281 assert False, 'Can not create socket.' 1282 1283 f = self.loop.create_datagram_endpoint( 1284 lambda: MyDatagramProto(loop=self.loop), sock=sock) 1285 tr, pr = self.loop.run_until_complete(f) 1286 self.assertIsInstance(tr, asyncio.Transport) 1287 self.assertIsInstance(pr, MyDatagramProto) 1288 tr.close() 1289 self.loop.run_until_complete(pr.done) 1290 1291 def test_internal_fds(self): 1292 loop = self.create_event_loop() 1293 if not isinstance(loop, selector_events.BaseSelectorEventLoop): 1294 loop.close() 1295 self.skipTest('loop is not a BaseSelectorEventLoop') 1296 1297 self.assertEqual(1, loop._internal_fds) 1298 loop.close() 1299 self.assertEqual(0, loop._internal_fds) 1300 self.assertIsNone(loop._csock) 1301 self.assertIsNone(loop._ssock) 1302 1303 @unittest.skipUnless(sys.platform != 'win32', 1304 "Don't support pipes for Windows") 1305 def test_read_pipe(self): 1306 proto = MyReadPipeProto(loop=self.loop) 1307 1308 rpipe, wpipe = os.pipe() 1309 pipeobj = io.open(rpipe, 'rb', 1024) 1310 1311 async def connect(): 1312 t, p = await self.loop.connect_read_pipe( 1313 lambda: proto, pipeobj) 1314 self.assertIs(p, proto) 1315 self.assertIs(t, proto.transport) 1316 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1317 self.assertEqual(0, proto.nbytes) 1318 1319 self.loop.run_until_complete(connect()) 1320 1321 os.write(wpipe, b'1') 1322 test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) 1323 self.assertEqual(1, proto.nbytes) 1324 1325 os.write(wpipe, b'2345') 1326 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1327 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1328 self.assertEqual(5, proto.nbytes) 1329 1330 os.close(wpipe) 1331 self.loop.run_until_complete(proto.done) 1332 self.assertEqual( 1333 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1334 # extra info is available 1335 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1336 1337 @unittest.skipUnless(sys.platform != 'win32', 1338 "Don't support pipes for Windows") 1339 def test_unclosed_pipe_transport(self): 1340 # This test reproduces the issue #314 on GitHub 1341 loop = self.create_event_loop() 1342 read_proto = MyReadPipeProto(loop=loop) 1343 write_proto = MyWritePipeProto(loop=loop) 1344 1345 rpipe, wpipe = os.pipe() 1346 rpipeobj = io.open(rpipe, 'rb', 1024) 1347 wpipeobj = io.open(wpipe, 'w', 1024) 1348 1349 async def connect(): 1350 read_transport, _ = await loop.connect_read_pipe( 1351 lambda: read_proto, rpipeobj) 1352 write_transport, _ = await loop.connect_write_pipe( 1353 lambda: write_proto, wpipeobj) 1354 return read_transport, write_transport 1355 1356 # Run and close the loop without closing the transports 1357 read_transport, write_transport = loop.run_until_complete(connect()) 1358 loop.close() 1359 1360 # These 'repr' calls used to raise an AttributeError 1361 # See Issue #314 on GitHub 1362 self.assertIn('open', repr(read_transport)) 1363 self.assertIn('open', repr(write_transport)) 1364 1365 # Clean up (avoid ResourceWarning) 1366 rpipeobj.close() 1367 wpipeobj.close() 1368 read_transport._pipe = None 1369 write_transport._pipe = None 1370 1371 @unittest.skipUnless(sys.platform != 'win32', 1372 "Don't support pipes for Windows") 1373 def test_read_pty_output(self): 1374 proto = MyReadPipeProto(loop=self.loop) 1375 1376 master, slave = os.openpty() 1377 master_read_obj = io.open(master, 'rb', 0) 1378 1379 async def connect(): 1380 t, p = await self.loop.connect_read_pipe(lambda: proto, 1381 master_read_obj) 1382 self.assertIs(p, proto) 1383 self.assertIs(t, proto.transport) 1384 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1385 self.assertEqual(0, proto.nbytes) 1386 1387 self.loop.run_until_complete(connect()) 1388 1389 os.write(slave, b'1') 1390 test_utils.run_until(self.loop, lambda: proto.nbytes) 1391 self.assertEqual(1, proto.nbytes) 1392 1393 os.write(slave, b'2345') 1394 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1395 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1396 self.assertEqual(5, proto.nbytes) 1397 1398 os.close(slave) 1399 proto.transport.close() 1400 self.loop.run_until_complete(proto.done) 1401 self.assertEqual( 1402 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1403 # extra info is available 1404 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1405 1406 @unittest.skipUnless(sys.platform != 'win32', 1407 "Don't support pipes for Windows") 1408 def test_write_pipe(self): 1409 rpipe, wpipe = os.pipe() 1410 pipeobj = io.open(wpipe, 'wb', 1024) 1411 1412 proto = MyWritePipeProto(loop=self.loop) 1413 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1414 transport, p = self.loop.run_until_complete(connect) 1415 self.assertIs(p, proto) 1416 self.assertIs(transport, proto.transport) 1417 self.assertEqual('CONNECTED', proto.state) 1418 1419 transport.write(b'1') 1420 1421 data = bytearray() 1422 def reader(data): 1423 chunk = os.read(rpipe, 1024) 1424 data += chunk 1425 return len(data) 1426 1427 test_utils.run_until(self.loop, lambda: reader(data) >= 1) 1428 self.assertEqual(b'1', data) 1429 1430 transport.write(b'2345') 1431 test_utils.run_until(self.loop, lambda: reader(data) >= 5) 1432 self.assertEqual(b'12345', data) 1433 self.assertEqual('CONNECTED', proto.state) 1434 1435 os.close(rpipe) 1436 1437 # extra info is available 1438 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1439 1440 # close connection 1441 proto.transport.close() 1442 self.loop.run_until_complete(proto.done) 1443 self.assertEqual('CLOSED', proto.state) 1444 1445 @unittest.skipUnless(sys.platform != 'win32', 1446 "Don't support pipes for Windows") 1447 def test_write_pipe_disconnect_on_close(self): 1448 rsock, wsock = socket.socketpair() 1449 rsock.setblocking(False) 1450 pipeobj = io.open(wsock.detach(), 'wb', 1024) 1451 1452 proto = MyWritePipeProto(loop=self.loop) 1453 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1454 transport, p = self.loop.run_until_complete(connect) 1455 self.assertIs(p, proto) 1456 self.assertIs(transport, proto.transport) 1457 self.assertEqual('CONNECTED', proto.state) 1458 1459 transport.write(b'1') 1460 data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) 1461 self.assertEqual(b'1', data) 1462 1463 rsock.close() 1464 1465 self.loop.run_until_complete(proto.done) 1466 self.assertEqual('CLOSED', proto.state) 1467 1468 @unittest.skipUnless(sys.platform != 'win32', 1469 "Don't support pipes for Windows") 1470 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1471 # older than 10.6 (Snow Leopard) 1472 @support.requires_mac_ver(10, 6) 1473 def test_write_pty(self): 1474 master, slave = os.openpty() 1475 slave_write_obj = io.open(slave, 'wb', 0) 1476 1477 proto = MyWritePipeProto(loop=self.loop) 1478 connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) 1479 transport, p = self.loop.run_until_complete(connect) 1480 self.assertIs(p, proto) 1481 self.assertIs(transport, proto.transport) 1482 self.assertEqual('CONNECTED', proto.state) 1483 1484 transport.write(b'1') 1485 1486 data = bytearray() 1487 def reader(data): 1488 chunk = os.read(master, 1024) 1489 data += chunk 1490 return len(data) 1491 1492 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1493 timeout=support.SHORT_TIMEOUT) 1494 self.assertEqual(b'1', data) 1495 1496 transport.write(b'2345') 1497 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1498 timeout=support.SHORT_TIMEOUT) 1499 self.assertEqual(b'12345', data) 1500 self.assertEqual('CONNECTED', proto.state) 1501 1502 os.close(master) 1503 1504 # extra info is available 1505 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1506 1507 # close connection 1508 proto.transport.close() 1509 self.loop.run_until_complete(proto.done) 1510 self.assertEqual('CLOSED', proto.state) 1511 1512 @unittest.skipUnless(sys.platform != 'win32', 1513 "Don't support pipes for Windows") 1514 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1515 # older than 10.6 (Snow Leopard) 1516 @support.requires_mac_ver(10, 6) 1517 def test_bidirectional_pty(self): 1518 master, read_slave = os.openpty() 1519 write_slave = os.dup(read_slave) 1520 tty.setraw(read_slave) 1521 1522 slave_read_obj = io.open(read_slave, 'rb', 0) 1523 read_proto = MyReadPipeProto(loop=self.loop) 1524 read_connect = self.loop.connect_read_pipe(lambda: read_proto, 1525 slave_read_obj) 1526 read_transport, p = self.loop.run_until_complete(read_connect) 1527 self.assertIs(p, read_proto) 1528 self.assertIs(read_transport, read_proto.transport) 1529 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1530 self.assertEqual(0, read_proto.nbytes) 1531 1532 1533 slave_write_obj = io.open(write_slave, 'wb', 0) 1534 write_proto = MyWritePipeProto(loop=self.loop) 1535 write_connect = self.loop.connect_write_pipe(lambda: write_proto, 1536 slave_write_obj) 1537 write_transport, p = self.loop.run_until_complete(write_connect) 1538 self.assertIs(p, write_proto) 1539 self.assertIs(write_transport, write_proto.transport) 1540 self.assertEqual('CONNECTED', write_proto.state) 1541 1542 data = bytearray() 1543 def reader(data): 1544 chunk = os.read(master, 1024) 1545 data += chunk 1546 return len(data) 1547 1548 write_transport.write(b'1') 1549 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1550 timeout=support.SHORT_TIMEOUT) 1551 self.assertEqual(b'1', data) 1552 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1553 self.assertEqual('CONNECTED', write_proto.state) 1554 1555 os.write(master, b'a') 1556 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, 1557 timeout=support.SHORT_TIMEOUT) 1558 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1559 self.assertEqual(1, read_proto.nbytes) 1560 self.assertEqual('CONNECTED', write_proto.state) 1561 1562 write_transport.write(b'2345') 1563 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1564 timeout=support.SHORT_TIMEOUT) 1565 self.assertEqual(b'12345', data) 1566 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1567 self.assertEqual('CONNECTED', write_proto.state) 1568 1569 os.write(master, b'bcde') 1570 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, 1571 timeout=support.SHORT_TIMEOUT) 1572 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1573 self.assertEqual(5, read_proto.nbytes) 1574 self.assertEqual('CONNECTED', write_proto.state) 1575 1576 os.close(master) 1577 1578 read_transport.close() 1579 self.loop.run_until_complete(read_proto.done) 1580 self.assertEqual( 1581 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) 1582 1583 write_transport.close() 1584 self.loop.run_until_complete(write_proto.done) 1585 self.assertEqual('CLOSED', write_proto.state) 1586 1587 def test_prompt_cancellation(self): 1588 r, w = socket.socketpair() 1589 r.setblocking(False) 1590 f = self.loop.create_task(self.loop.sock_recv(r, 1)) 1591 ov = getattr(f, 'ov', None) 1592 if ov is not None: 1593 self.assertTrue(ov.pending) 1594 1595 async def main(): 1596 try: 1597 self.loop.call_soon(f.cancel) 1598 await f 1599 except asyncio.CancelledError: 1600 res = 'cancelled' 1601 else: 1602 res = None 1603 finally: 1604 self.loop.stop() 1605 return res 1606 1607 start = time.monotonic() 1608 t = self.loop.create_task(main()) 1609 self.loop.run_forever() 1610 elapsed = time.monotonic() - start 1611 1612 self.assertLess(elapsed, 0.1) 1613 self.assertEqual(t.result(), 'cancelled') 1614 self.assertRaises(asyncio.CancelledError, f.result) 1615 if ov is not None: 1616 self.assertFalse(ov.pending) 1617 self.loop._stop_serving(r) 1618 1619 r.close() 1620 w.close() 1621 1622 def test_timeout_rounding(self): 1623 def _run_once(): 1624 self.loop._run_once_counter += 1 1625 orig_run_once() 1626 1627 orig_run_once = self.loop._run_once 1628 self.loop._run_once_counter = 0 1629 self.loop._run_once = _run_once 1630 1631 async def wait(): 1632 loop = self.loop 1633 await asyncio.sleep(1e-2) 1634 await asyncio.sleep(1e-4) 1635 await asyncio.sleep(1e-6) 1636 await asyncio.sleep(1e-8) 1637 await asyncio.sleep(1e-10) 1638 1639 self.loop.run_until_complete(wait()) 1640 # The ideal number of call is 12, but on some platforms, the selector 1641 # may sleep at little bit less than timeout depending on the resolution 1642 # of the clock used by the kernel. Tolerate a few useless calls on 1643 # these platforms. 1644 self.assertLessEqual(self.loop._run_once_counter, 20, 1645 {'clock_resolution': self.loop._clock_resolution, 1646 'selector': self.loop._selector.__class__.__name__}) 1647 1648 def test_remove_fds_after_closing(self): 1649 loop = self.create_event_loop() 1650 callback = lambda: None 1651 r, w = socket.socketpair() 1652 self.addCleanup(r.close) 1653 self.addCleanup(w.close) 1654 loop.add_reader(r, callback) 1655 loop.add_writer(w, callback) 1656 loop.close() 1657 self.assertFalse(loop.remove_reader(r)) 1658 self.assertFalse(loop.remove_writer(w)) 1659 1660 def test_add_fds_after_closing(self): 1661 loop = self.create_event_loop() 1662 callback = lambda: None 1663 r, w = socket.socketpair() 1664 self.addCleanup(r.close) 1665 self.addCleanup(w.close) 1666 loop.close() 1667 with self.assertRaises(RuntimeError): 1668 loop.add_reader(r, callback) 1669 with self.assertRaises(RuntimeError): 1670 loop.add_writer(w, callback) 1671 1672 def test_close_running_event_loop(self): 1673 async def close_loop(loop): 1674 self.loop.close() 1675 1676 coro = close_loop(self.loop) 1677 with self.assertRaises(RuntimeError): 1678 self.loop.run_until_complete(coro) 1679 1680 def test_close(self): 1681 self.loop.close() 1682 1683 async def test(): 1684 pass 1685 1686 func = lambda: False 1687 coro = test() 1688 self.addCleanup(coro.close) 1689 1690 # operation blocked when the loop is closed 1691 with self.assertRaises(RuntimeError): 1692 self.loop.run_forever() 1693 with self.assertRaises(RuntimeError): 1694 fut = self.loop.create_future() 1695 self.loop.run_until_complete(fut) 1696 with self.assertRaises(RuntimeError): 1697 self.loop.call_soon(func) 1698 with self.assertRaises(RuntimeError): 1699 self.loop.call_soon_threadsafe(func) 1700 with self.assertRaises(RuntimeError): 1701 self.loop.call_later(1.0, func) 1702 with self.assertRaises(RuntimeError): 1703 self.loop.call_at(self.loop.time() + .0, func) 1704 with self.assertRaises(RuntimeError): 1705 self.loop.create_task(coro) 1706 with self.assertRaises(RuntimeError): 1707 self.loop.add_signal_handler(signal.SIGTERM, func) 1708 1709 # run_in_executor test is tricky: the method is a coroutine, 1710 # but run_until_complete cannot be called on closed loop. 1711 # Thus iterate once explicitly. 1712 with self.assertRaises(RuntimeError): 1713 it = self.loop.run_in_executor(None, func).__await__() 1714 next(it) 1715 1716 1717class SubprocessTestsMixin: 1718 1719 def check_terminated(self, returncode): 1720 if sys.platform == 'win32': 1721 self.assertIsInstance(returncode, int) 1722 # expect 1 but sometimes get 0 1723 else: 1724 self.assertEqual(-signal.SIGTERM, returncode) 1725 1726 def check_killed(self, returncode): 1727 if sys.platform == 'win32': 1728 self.assertIsInstance(returncode, int) 1729 # expect 1 but sometimes get 0 1730 else: 1731 self.assertEqual(-signal.SIGKILL, returncode) 1732 1733 def test_subprocess_exec(self): 1734 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1735 1736 connect = self.loop.subprocess_exec( 1737 functools.partial(MySubprocessProtocol, self.loop), 1738 sys.executable, prog) 1739 transp, proto = self.loop.run_until_complete(connect) 1740 self.assertIsInstance(proto, MySubprocessProtocol) 1741 self.loop.run_until_complete(proto.connected) 1742 self.assertEqual('CONNECTED', proto.state) 1743 1744 stdin = transp.get_pipe_transport(0) 1745 stdin.write(b'Python The Winner') 1746 self.loop.run_until_complete(proto.got_data[1].wait()) 1747 with test_utils.disable_logger(): 1748 transp.close() 1749 self.loop.run_until_complete(proto.completed) 1750 self.check_killed(proto.returncode) 1751 self.assertEqual(b'Python The Winner', proto.data[1]) 1752 1753 def test_subprocess_interactive(self): 1754 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1755 1756 connect = self.loop.subprocess_exec( 1757 functools.partial(MySubprocessProtocol, self.loop), 1758 sys.executable, prog) 1759 1760 transp, proto = self.loop.run_until_complete(connect) 1761 self.assertIsInstance(proto, MySubprocessProtocol) 1762 self.loop.run_until_complete(proto.connected) 1763 self.assertEqual('CONNECTED', proto.state) 1764 1765 stdin = transp.get_pipe_transport(0) 1766 stdin.write(b'Python ') 1767 self.loop.run_until_complete(proto.got_data[1].wait()) 1768 proto.got_data[1].clear() 1769 self.assertEqual(b'Python ', proto.data[1]) 1770 1771 stdin.write(b'The Winner') 1772 self.loop.run_until_complete(proto.got_data[1].wait()) 1773 self.assertEqual(b'Python The Winner', proto.data[1]) 1774 1775 with test_utils.disable_logger(): 1776 transp.close() 1777 self.loop.run_until_complete(proto.completed) 1778 self.check_killed(proto.returncode) 1779 1780 def test_subprocess_shell(self): 1781 connect = self.loop.subprocess_shell( 1782 functools.partial(MySubprocessProtocol, self.loop), 1783 'echo Python') 1784 transp, proto = self.loop.run_until_complete(connect) 1785 self.assertIsInstance(proto, MySubprocessProtocol) 1786 self.loop.run_until_complete(proto.connected) 1787 1788 transp.get_pipe_transport(0).close() 1789 self.loop.run_until_complete(proto.completed) 1790 self.assertEqual(0, proto.returncode) 1791 self.assertTrue(all(f.done() for f in proto.disconnects.values())) 1792 self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') 1793 self.assertEqual(proto.data[2], b'') 1794 transp.close() 1795 1796 def test_subprocess_exitcode(self): 1797 connect = self.loop.subprocess_shell( 1798 functools.partial(MySubprocessProtocol, self.loop), 1799 'exit 7', stdin=None, stdout=None, stderr=None) 1800 1801 transp, proto = self.loop.run_until_complete(connect) 1802 self.assertIsInstance(proto, MySubprocessProtocol) 1803 self.loop.run_until_complete(proto.completed) 1804 self.assertEqual(7, proto.returncode) 1805 transp.close() 1806 1807 def test_subprocess_close_after_finish(self): 1808 connect = self.loop.subprocess_shell( 1809 functools.partial(MySubprocessProtocol, self.loop), 1810 'exit 7', stdin=None, stdout=None, stderr=None) 1811 transp, proto = self.loop.run_until_complete(connect) 1812 self.assertIsInstance(proto, MySubprocessProtocol) 1813 self.assertIsNone(transp.get_pipe_transport(0)) 1814 self.assertIsNone(transp.get_pipe_transport(1)) 1815 self.assertIsNone(transp.get_pipe_transport(2)) 1816 self.loop.run_until_complete(proto.completed) 1817 self.assertEqual(7, proto.returncode) 1818 self.assertIsNone(transp.close()) 1819 1820 def test_subprocess_kill(self): 1821 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1822 1823 connect = self.loop.subprocess_exec( 1824 functools.partial(MySubprocessProtocol, self.loop), 1825 sys.executable, prog) 1826 1827 transp, proto = self.loop.run_until_complete(connect) 1828 self.assertIsInstance(proto, MySubprocessProtocol) 1829 self.loop.run_until_complete(proto.connected) 1830 1831 transp.kill() 1832 self.loop.run_until_complete(proto.completed) 1833 self.check_killed(proto.returncode) 1834 transp.close() 1835 1836 def test_subprocess_terminate(self): 1837 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1838 1839 connect = self.loop.subprocess_exec( 1840 functools.partial(MySubprocessProtocol, self.loop), 1841 sys.executable, prog) 1842 1843 transp, proto = self.loop.run_until_complete(connect) 1844 self.assertIsInstance(proto, MySubprocessProtocol) 1845 self.loop.run_until_complete(proto.connected) 1846 1847 transp.terminate() 1848 self.loop.run_until_complete(proto.completed) 1849 self.check_terminated(proto.returncode) 1850 transp.close() 1851 1852 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 1853 def test_subprocess_send_signal(self): 1854 # bpo-31034: Make sure that we get the default signal handler (killing 1855 # the process). The parent process may have decided to ignore SIGHUP, 1856 # and signal handlers are inherited. 1857 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 1858 try: 1859 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1860 1861 connect = self.loop.subprocess_exec( 1862 functools.partial(MySubprocessProtocol, self.loop), 1863 sys.executable, prog) 1864 1865 transp, proto = self.loop.run_until_complete(connect) 1866 self.assertIsInstance(proto, MySubprocessProtocol) 1867 self.loop.run_until_complete(proto.connected) 1868 1869 transp.send_signal(signal.SIGHUP) 1870 self.loop.run_until_complete(proto.completed) 1871 self.assertEqual(-signal.SIGHUP, proto.returncode) 1872 transp.close() 1873 finally: 1874 signal.signal(signal.SIGHUP, old_handler) 1875 1876 def test_subprocess_stderr(self): 1877 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1878 1879 connect = self.loop.subprocess_exec( 1880 functools.partial(MySubprocessProtocol, self.loop), 1881 sys.executable, prog) 1882 1883 transp, proto = self.loop.run_until_complete(connect) 1884 self.assertIsInstance(proto, MySubprocessProtocol) 1885 self.loop.run_until_complete(proto.connected) 1886 1887 stdin = transp.get_pipe_transport(0) 1888 stdin.write(b'test') 1889 1890 self.loop.run_until_complete(proto.completed) 1891 1892 transp.close() 1893 self.assertEqual(b'OUT:test', proto.data[1]) 1894 self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) 1895 self.assertEqual(0, proto.returncode) 1896 1897 def test_subprocess_stderr_redirect_to_stdout(self): 1898 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1899 1900 connect = self.loop.subprocess_exec( 1901 functools.partial(MySubprocessProtocol, self.loop), 1902 sys.executable, prog, stderr=subprocess.STDOUT) 1903 1904 transp, proto = self.loop.run_until_complete(connect) 1905 self.assertIsInstance(proto, MySubprocessProtocol) 1906 self.loop.run_until_complete(proto.connected) 1907 1908 stdin = transp.get_pipe_transport(0) 1909 self.assertIsNotNone(transp.get_pipe_transport(1)) 1910 self.assertIsNone(transp.get_pipe_transport(2)) 1911 1912 stdin.write(b'test') 1913 self.loop.run_until_complete(proto.completed) 1914 self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), 1915 proto.data[1]) 1916 self.assertEqual(b'', proto.data[2]) 1917 1918 transp.close() 1919 self.assertEqual(0, proto.returncode) 1920 1921 def test_subprocess_close_client_stream(self): 1922 prog = os.path.join(os.path.dirname(__file__), 'echo3.py') 1923 1924 connect = self.loop.subprocess_exec( 1925 functools.partial(MySubprocessProtocol, self.loop), 1926 sys.executable, prog) 1927 transp, proto = self.loop.run_until_complete(connect) 1928 self.assertIsInstance(proto, MySubprocessProtocol) 1929 self.loop.run_until_complete(proto.connected) 1930 1931 stdin = transp.get_pipe_transport(0) 1932 stdout = transp.get_pipe_transport(1) 1933 stdin.write(b'test') 1934 self.loop.run_until_complete(proto.got_data[1].wait()) 1935 self.assertEqual(b'OUT:test', proto.data[1]) 1936 1937 stdout.close() 1938 self.loop.run_until_complete(proto.disconnects[1]) 1939 stdin.write(b'xxx') 1940 self.loop.run_until_complete(proto.got_data[2].wait()) 1941 if sys.platform != 'win32': 1942 self.assertEqual(b'ERR:BrokenPipeError', proto.data[2]) 1943 else: 1944 # After closing the read-end of a pipe, writing to the 1945 # write-end using os.write() fails with errno==EINVAL and 1946 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using 1947 # WriteFile() we get ERROR_BROKEN_PIPE as expected.) 1948 self.assertEqual(b'ERR:OSError', proto.data[2]) 1949 with test_utils.disable_logger(): 1950 transp.close() 1951 self.loop.run_until_complete(proto.completed) 1952 self.check_killed(proto.returncode) 1953 1954 def test_subprocess_wait_no_same_group(self): 1955 # start the new process in a new session 1956 connect = self.loop.subprocess_shell( 1957 functools.partial(MySubprocessProtocol, self.loop), 1958 'exit 7', stdin=None, stdout=None, stderr=None, 1959 start_new_session=True) 1960 transp, proto = self.loop.run_until_complete(connect) 1961 self.assertIsInstance(proto, MySubprocessProtocol) 1962 self.loop.run_until_complete(proto.completed) 1963 self.assertEqual(7, proto.returncode) 1964 transp.close() 1965 1966 def test_subprocess_exec_invalid_args(self): 1967 async def connect(**kwds): 1968 await self.loop.subprocess_exec( 1969 asyncio.SubprocessProtocol, 1970 'pwd', **kwds) 1971 1972 with self.assertRaises(ValueError): 1973 self.loop.run_until_complete(connect(universal_newlines=True)) 1974 with self.assertRaises(ValueError): 1975 self.loop.run_until_complete(connect(bufsize=4096)) 1976 with self.assertRaises(ValueError): 1977 self.loop.run_until_complete(connect(shell=True)) 1978 1979 def test_subprocess_shell_invalid_args(self): 1980 1981 async def connect(cmd=None, **kwds): 1982 if not cmd: 1983 cmd = 'pwd' 1984 await self.loop.subprocess_shell( 1985 asyncio.SubprocessProtocol, 1986 cmd, **kwds) 1987 1988 with self.assertRaises(ValueError): 1989 self.loop.run_until_complete(connect(['ls', '-l'])) 1990 with self.assertRaises(ValueError): 1991 self.loop.run_until_complete(connect(universal_newlines=True)) 1992 with self.assertRaises(ValueError): 1993 self.loop.run_until_complete(connect(bufsize=4096)) 1994 with self.assertRaises(ValueError): 1995 self.loop.run_until_complete(connect(shell=False)) 1996 1997 1998if sys.platform == 'win32': 1999 2000 class SelectEventLoopTests(EventLoopTestsMixin, 2001 test_utils.TestCase): 2002 2003 def create_event_loop(self): 2004 return asyncio.SelectorEventLoop() 2005 2006 class ProactorEventLoopTests(EventLoopTestsMixin, 2007 SubprocessTestsMixin, 2008 test_utils.TestCase): 2009 2010 def create_event_loop(self): 2011 return asyncio.ProactorEventLoop() 2012 2013 def test_reader_callback(self): 2014 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2015 2016 def test_reader_callback_cancel(self): 2017 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2018 2019 def test_writer_callback(self): 2020 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2021 2022 def test_writer_callback_cancel(self): 2023 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2024 2025 def test_remove_fds_after_closing(self): 2026 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2027else: 2028 import selectors 2029 2030 class UnixEventLoopTestsMixin(EventLoopTestsMixin): 2031 def setUp(self): 2032 super().setUp() 2033 watcher = asyncio.SafeChildWatcher() 2034 watcher.attach_loop(self.loop) 2035 asyncio.set_child_watcher(watcher) 2036 2037 def tearDown(self): 2038 asyncio.set_child_watcher(None) 2039 super().tearDown() 2040 2041 2042 if hasattr(selectors, 'KqueueSelector'): 2043 class KqueueEventLoopTests(UnixEventLoopTestsMixin, 2044 SubprocessTestsMixin, 2045 test_utils.TestCase): 2046 2047 def create_event_loop(self): 2048 return asyncio.SelectorEventLoop( 2049 selectors.KqueueSelector()) 2050 2051 # kqueue doesn't support character devices (PTY) on Mac OS X older 2052 # than 10.9 (Maverick) 2053 @support.requires_mac_ver(10, 9) 2054 # Issue #20667: KqueueEventLoopTests.test_read_pty_output() 2055 # hangs on OpenBSD 5.5 2056 @unittest.skipIf(sys.platform.startswith('openbsd'), 2057 'test hangs on OpenBSD') 2058 def test_read_pty_output(self): 2059 super().test_read_pty_output() 2060 2061 # kqueue doesn't support character devices (PTY) on Mac OS X older 2062 # than 10.9 (Maverick) 2063 @support.requires_mac_ver(10, 9) 2064 def test_write_pty(self): 2065 super().test_write_pty() 2066 2067 if hasattr(selectors, 'EpollSelector'): 2068 class EPollEventLoopTests(UnixEventLoopTestsMixin, 2069 SubprocessTestsMixin, 2070 test_utils.TestCase): 2071 2072 def create_event_loop(self): 2073 return asyncio.SelectorEventLoop(selectors.EpollSelector()) 2074 2075 if hasattr(selectors, 'PollSelector'): 2076 class PollEventLoopTests(UnixEventLoopTestsMixin, 2077 SubprocessTestsMixin, 2078 test_utils.TestCase): 2079 2080 def create_event_loop(self): 2081 return asyncio.SelectorEventLoop(selectors.PollSelector()) 2082 2083 # Should always exist. 2084 class SelectEventLoopTests(UnixEventLoopTestsMixin, 2085 SubprocessTestsMixin, 2086 test_utils.TestCase): 2087 2088 def create_event_loop(self): 2089 return asyncio.SelectorEventLoop(selectors.SelectSelector()) 2090 2091 2092def noop(*args, **kwargs): 2093 pass 2094 2095 2096class HandleTests(test_utils.TestCase): 2097 2098 def setUp(self): 2099 super().setUp() 2100 self.loop = mock.Mock() 2101 self.loop.get_debug.return_value = True 2102 2103 def test_handle(self): 2104 def callback(*args): 2105 return args 2106 2107 args = () 2108 h = asyncio.Handle(callback, args, self.loop) 2109 self.assertIs(h._callback, callback) 2110 self.assertIs(h._args, args) 2111 self.assertFalse(h.cancelled()) 2112 2113 h.cancel() 2114 self.assertTrue(h.cancelled()) 2115 2116 def test_callback_with_exception(self): 2117 def callback(): 2118 raise ValueError() 2119 2120 self.loop = mock.Mock() 2121 self.loop.call_exception_handler = mock.Mock() 2122 2123 h = asyncio.Handle(callback, (), self.loop) 2124 h._run() 2125 2126 self.loop.call_exception_handler.assert_called_with({ 2127 'message': test_utils.MockPattern('Exception in callback.*'), 2128 'exception': mock.ANY, 2129 'handle': h, 2130 'source_traceback': h._source_traceback, 2131 }) 2132 2133 def test_handle_weakref(self): 2134 wd = weakref.WeakValueDictionary() 2135 h = asyncio.Handle(lambda: None, (), self.loop) 2136 wd['h'] = h # Would fail without __weakref__ slot. 2137 2138 def test_handle_repr(self): 2139 self.loop.get_debug.return_value = False 2140 2141 # simple function 2142 h = asyncio.Handle(noop, (1, 2), self.loop) 2143 filename, lineno = test_utils.get_function_source(noop) 2144 self.assertEqual(repr(h), 2145 '<Handle noop(1, 2) at %s:%s>' 2146 % (filename, lineno)) 2147 2148 # cancelled handle 2149 h.cancel() 2150 self.assertEqual(repr(h), 2151 '<Handle cancelled>') 2152 2153 # decorated function 2154 with self.assertWarns(DeprecationWarning): 2155 cb = asyncio.coroutine(noop) 2156 h = asyncio.Handle(cb, (), self.loop) 2157 self.assertEqual(repr(h), 2158 '<Handle noop() at %s:%s>' 2159 % (filename, lineno)) 2160 2161 # partial function 2162 cb = functools.partial(noop, 1, 2) 2163 h = asyncio.Handle(cb, (3,), self.loop) 2164 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2165 % (re.escape(filename), lineno)) 2166 self.assertRegex(repr(h), regex) 2167 2168 # partial function with keyword args 2169 cb = functools.partial(noop, x=1) 2170 h = asyncio.Handle(cb, (2, 3), self.loop) 2171 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2172 % (re.escape(filename), lineno)) 2173 self.assertRegex(repr(h), regex) 2174 2175 # partial method 2176 if sys.version_info >= (3, 4): 2177 method = HandleTests.test_handle_repr 2178 cb = functools.partialmethod(method) 2179 filename, lineno = test_utils.get_function_source(method) 2180 h = asyncio.Handle(cb, (), self.loop) 2181 2182 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2183 cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex) 2184 regex = (r'^<Handle %s at %s:%s>$' 2185 % (cb_regex, re.escape(filename), lineno)) 2186 self.assertRegex(repr(h), regex) 2187 2188 def test_handle_repr_debug(self): 2189 self.loop.get_debug.return_value = True 2190 2191 # simple function 2192 create_filename = __file__ 2193 create_lineno = sys._getframe().f_lineno + 1 2194 h = asyncio.Handle(noop, (1, 2), self.loop) 2195 filename, lineno = test_utils.get_function_source(noop) 2196 self.assertEqual(repr(h), 2197 '<Handle noop(1, 2) at %s:%s created at %s:%s>' 2198 % (filename, lineno, create_filename, create_lineno)) 2199 2200 # cancelled handle 2201 h.cancel() 2202 self.assertEqual( 2203 repr(h), 2204 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2205 % (filename, lineno, create_filename, create_lineno)) 2206 2207 # double cancellation won't overwrite _repr 2208 h.cancel() 2209 self.assertEqual( 2210 repr(h), 2211 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2212 % (filename, lineno, create_filename, create_lineno)) 2213 2214 def test_handle_source_traceback(self): 2215 loop = asyncio.get_event_loop_policy().new_event_loop() 2216 loop.set_debug(True) 2217 self.set_event_loop(loop) 2218 2219 def check_source_traceback(h): 2220 lineno = sys._getframe(1).f_lineno - 1 2221 self.assertIsInstance(h._source_traceback, list) 2222 self.assertEqual(h._source_traceback[-1][:3], 2223 (__file__, 2224 lineno, 2225 'test_handle_source_traceback')) 2226 2227 # call_soon 2228 h = loop.call_soon(noop) 2229 check_source_traceback(h) 2230 2231 # call_soon_threadsafe 2232 h = loop.call_soon_threadsafe(noop) 2233 check_source_traceback(h) 2234 2235 # call_later 2236 h = loop.call_later(0, noop) 2237 check_source_traceback(h) 2238 2239 # call_at 2240 h = loop.call_later(0, noop) 2241 check_source_traceback(h) 2242 2243 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2244 'No collections.abc.Coroutine') 2245 def test_coroutine_like_object_debug_formatting(self): 2246 # Test that asyncio can format coroutines that are instances of 2247 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2248 # (such as ones compiled with Cython). 2249 2250 coro = CoroLike() 2251 coro.__name__ = 'AAA' 2252 self.assertTrue(asyncio.iscoroutine(coro)) 2253 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2254 2255 coro.__qualname__ = 'BBB' 2256 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 2257 2258 coro.cr_running = True 2259 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 2260 2261 coro.__name__ = coro.__qualname__ = None 2262 self.assertEqual(coroutines._format_coroutine(coro), 2263 '<CoroLike without __name__>() running') 2264 2265 coro = CoroLike() 2266 coro.__qualname__ = 'CoroLike' 2267 # Some coroutines might not have '__name__', such as 2268 # built-in async_gen.asend(). 2269 self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()') 2270 2271 coro = CoroLike() 2272 coro.__qualname__ = 'AAA' 2273 coro.cr_code = None 2274 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2275 2276 2277class TimerTests(unittest.TestCase): 2278 2279 def setUp(self): 2280 super().setUp() 2281 self.loop = mock.Mock() 2282 2283 def test_hash(self): 2284 when = time.monotonic() 2285 h = asyncio.TimerHandle(when, lambda: False, (), 2286 mock.Mock()) 2287 self.assertEqual(hash(h), hash(when)) 2288 2289 def test_when(self): 2290 when = time.monotonic() 2291 h = asyncio.TimerHandle(when, lambda: False, (), 2292 mock.Mock()) 2293 self.assertEqual(when, h.when()) 2294 2295 def test_timer(self): 2296 def callback(*args): 2297 return args 2298 2299 args = (1, 2, 3) 2300 when = time.monotonic() 2301 h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 2302 self.assertIs(h._callback, callback) 2303 self.assertIs(h._args, args) 2304 self.assertFalse(h.cancelled()) 2305 2306 # cancel 2307 h.cancel() 2308 self.assertTrue(h.cancelled()) 2309 self.assertIsNone(h._callback) 2310 self.assertIsNone(h._args) 2311 2312 # when cannot be None 2313 self.assertRaises(AssertionError, 2314 asyncio.TimerHandle, None, callback, args, 2315 self.loop) 2316 2317 def test_timer_repr(self): 2318 self.loop.get_debug.return_value = False 2319 2320 # simple function 2321 h = asyncio.TimerHandle(123, noop, (), self.loop) 2322 src = test_utils.get_function_source(noop) 2323 self.assertEqual(repr(h), 2324 '<TimerHandle when=123 noop() at %s:%s>' % src) 2325 2326 # cancelled handle 2327 h.cancel() 2328 self.assertEqual(repr(h), 2329 '<TimerHandle cancelled when=123>') 2330 2331 def test_timer_repr_debug(self): 2332 self.loop.get_debug.return_value = True 2333 2334 # simple function 2335 create_filename = __file__ 2336 create_lineno = sys._getframe().f_lineno + 1 2337 h = asyncio.TimerHandle(123, noop, (), self.loop) 2338 filename, lineno = test_utils.get_function_source(noop) 2339 self.assertEqual(repr(h), 2340 '<TimerHandle when=123 noop() ' 2341 'at %s:%s created at %s:%s>' 2342 % (filename, lineno, create_filename, create_lineno)) 2343 2344 # cancelled handle 2345 h.cancel() 2346 self.assertEqual(repr(h), 2347 '<TimerHandle cancelled when=123 noop() ' 2348 'at %s:%s created at %s:%s>' 2349 % (filename, lineno, create_filename, create_lineno)) 2350 2351 2352 def test_timer_comparison(self): 2353 def callback(*args): 2354 return args 2355 2356 when = time.monotonic() 2357 2358 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2359 h2 = asyncio.TimerHandle(when, callback, (), self.loop) 2360 # TODO: Use assertLess etc. 2361 self.assertFalse(h1 < h2) 2362 self.assertFalse(h2 < h1) 2363 self.assertTrue(h1 <= h2) 2364 self.assertTrue(h2 <= h1) 2365 self.assertFalse(h1 > h2) 2366 self.assertFalse(h2 > h1) 2367 self.assertTrue(h1 >= h2) 2368 self.assertTrue(h2 >= h1) 2369 self.assertTrue(h1 == h2) 2370 self.assertFalse(h1 != h2) 2371 2372 h2.cancel() 2373 self.assertFalse(h1 == h2) 2374 2375 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2376 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 2377 self.assertTrue(h1 < h2) 2378 self.assertFalse(h2 < h1) 2379 self.assertTrue(h1 <= h2) 2380 self.assertFalse(h2 <= h1) 2381 self.assertFalse(h1 > h2) 2382 self.assertTrue(h2 > h1) 2383 self.assertFalse(h1 >= h2) 2384 self.assertTrue(h2 >= h1) 2385 self.assertFalse(h1 == h2) 2386 self.assertTrue(h1 != h2) 2387 2388 h3 = asyncio.Handle(callback, (), self.loop) 2389 self.assertIs(NotImplemented, h1.__eq__(h3)) 2390 self.assertIs(NotImplemented, h1.__ne__(h3)) 2391 2392 with self.assertRaises(TypeError): 2393 h1 < () 2394 with self.assertRaises(TypeError): 2395 h1 > () 2396 with self.assertRaises(TypeError): 2397 h1 <= () 2398 with self.assertRaises(TypeError): 2399 h1 >= () 2400 self.assertFalse(h1 == ()) 2401 self.assertTrue(h1 != ()) 2402 2403 self.assertTrue(h1 == ALWAYS_EQ) 2404 self.assertFalse(h1 != ALWAYS_EQ) 2405 self.assertTrue(h1 < LARGEST) 2406 self.assertFalse(h1 > LARGEST) 2407 self.assertTrue(h1 <= LARGEST) 2408 self.assertFalse(h1 >= LARGEST) 2409 self.assertFalse(h1 < SMALLEST) 2410 self.assertTrue(h1 > SMALLEST) 2411 self.assertFalse(h1 <= SMALLEST) 2412 self.assertTrue(h1 >= SMALLEST) 2413 2414 2415class AbstractEventLoopTests(unittest.TestCase): 2416 2417 def test_not_implemented(self): 2418 f = mock.Mock() 2419 loop = asyncio.AbstractEventLoop() 2420 self.assertRaises( 2421 NotImplementedError, loop.run_forever) 2422 self.assertRaises( 2423 NotImplementedError, loop.run_until_complete, None) 2424 self.assertRaises( 2425 NotImplementedError, loop.stop) 2426 self.assertRaises( 2427 NotImplementedError, loop.is_running) 2428 self.assertRaises( 2429 NotImplementedError, loop.is_closed) 2430 self.assertRaises( 2431 NotImplementedError, loop.close) 2432 self.assertRaises( 2433 NotImplementedError, loop.create_task, None) 2434 self.assertRaises( 2435 NotImplementedError, loop.call_later, None, None) 2436 self.assertRaises( 2437 NotImplementedError, loop.call_at, f, f) 2438 self.assertRaises( 2439 NotImplementedError, loop.call_soon, None) 2440 self.assertRaises( 2441 NotImplementedError, loop.time) 2442 self.assertRaises( 2443 NotImplementedError, loop.call_soon_threadsafe, None) 2444 self.assertRaises( 2445 NotImplementedError, loop.set_default_executor, f) 2446 self.assertRaises( 2447 NotImplementedError, loop.add_reader, 1, f) 2448 self.assertRaises( 2449 NotImplementedError, loop.remove_reader, 1) 2450 self.assertRaises( 2451 NotImplementedError, loop.add_writer, 1, f) 2452 self.assertRaises( 2453 NotImplementedError, loop.remove_writer, 1) 2454 self.assertRaises( 2455 NotImplementedError, loop.add_signal_handler, 1, f) 2456 self.assertRaises( 2457 NotImplementedError, loop.remove_signal_handler, 1) 2458 self.assertRaises( 2459 NotImplementedError, loop.remove_signal_handler, 1) 2460 self.assertRaises( 2461 NotImplementedError, loop.set_exception_handler, f) 2462 self.assertRaises( 2463 NotImplementedError, loop.default_exception_handler, f) 2464 self.assertRaises( 2465 NotImplementedError, loop.call_exception_handler, f) 2466 self.assertRaises( 2467 NotImplementedError, loop.get_debug) 2468 self.assertRaises( 2469 NotImplementedError, loop.set_debug, f) 2470 2471 def test_not_implemented_async(self): 2472 2473 async def inner(): 2474 f = mock.Mock() 2475 loop = asyncio.AbstractEventLoop() 2476 2477 with self.assertRaises(NotImplementedError): 2478 await loop.run_in_executor(f, f) 2479 with self.assertRaises(NotImplementedError): 2480 await loop.getaddrinfo('localhost', 8080) 2481 with self.assertRaises(NotImplementedError): 2482 await loop.getnameinfo(('localhost', 8080)) 2483 with self.assertRaises(NotImplementedError): 2484 await loop.create_connection(f) 2485 with self.assertRaises(NotImplementedError): 2486 await loop.create_server(f) 2487 with self.assertRaises(NotImplementedError): 2488 await loop.create_datagram_endpoint(f) 2489 with self.assertRaises(NotImplementedError): 2490 await loop.sock_recv(f, 10) 2491 with self.assertRaises(NotImplementedError): 2492 await loop.sock_recv_into(f, 10) 2493 with self.assertRaises(NotImplementedError): 2494 await loop.sock_sendall(f, 10) 2495 with self.assertRaises(NotImplementedError): 2496 await loop.sock_connect(f, f) 2497 with self.assertRaises(NotImplementedError): 2498 await loop.sock_accept(f) 2499 with self.assertRaises(NotImplementedError): 2500 await loop.sock_sendfile(f, f) 2501 with self.assertRaises(NotImplementedError): 2502 await loop.sendfile(f, f) 2503 with self.assertRaises(NotImplementedError): 2504 await loop.connect_read_pipe(f, mock.sentinel.pipe) 2505 with self.assertRaises(NotImplementedError): 2506 await loop.connect_write_pipe(f, mock.sentinel.pipe) 2507 with self.assertRaises(NotImplementedError): 2508 await loop.subprocess_shell(f, mock.sentinel) 2509 with self.assertRaises(NotImplementedError): 2510 await loop.subprocess_exec(f) 2511 2512 loop = asyncio.new_event_loop() 2513 loop.run_until_complete(inner()) 2514 loop.close() 2515 2516 2517class PolicyTests(unittest.TestCase): 2518 2519 def test_event_loop_policy(self): 2520 policy = asyncio.AbstractEventLoopPolicy() 2521 self.assertRaises(NotImplementedError, policy.get_event_loop) 2522 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 2523 self.assertRaises(NotImplementedError, policy.new_event_loop) 2524 self.assertRaises(NotImplementedError, policy.get_child_watcher) 2525 self.assertRaises(NotImplementedError, policy.set_child_watcher, 2526 object()) 2527 2528 def test_get_event_loop(self): 2529 policy = asyncio.DefaultEventLoopPolicy() 2530 self.assertIsNone(policy._local._loop) 2531 2532 loop = policy.get_event_loop() 2533 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2534 2535 self.assertIs(policy._local._loop, loop) 2536 self.assertIs(loop, policy.get_event_loop()) 2537 loop.close() 2538 2539 def test_get_event_loop_calls_set_event_loop(self): 2540 policy = asyncio.DefaultEventLoopPolicy() 2541 2542 with mock.patch.object( 2543 policy, "set_event_loop", 2544 wraps=policy.set_event_loop) as m_set_event_loop: 2545 2546 loop = policy.get_event_loop() 2547 2548 # policy._local._loop must be set through .set_event_loop() 2549 # (the unix DefaultEventLoopPolicy needs this call to attach 2550 # the child watcher correctly) 2551 m_set_event_loop.assert_called_with(loop) 2552 2553 loop.close() 2554 2555 def test_get_event_loop_after_set_none(self): 2556 policy = asyncio.DefaultEventLoopPolicy() 2557 policy.set_event_loop(None) 2558 self.assertRaises(RuntimeError, policy.get_event_loop) 2559 2560 @mock.patch('asyncio.events.threading.current_thread') 2561 def test_get_event_loop_thread(self, m_current_thread): 2562 2563 def f(): 2564 policy = asyncio.DefaultEventLoopPolicy() 2565 self.assertRaises(RuntimeError, policy.get_event_loop) 2566 2567 th = threading.Thread(target=f) 2568 th.start() 2569 th.join() 2570 2571 def test_new_event_loop(self): 2572 policy = asyncio.DefaultEventLoopPolicy() 2573 2574 loop = policy.new_event_loop() 2575 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2576 loop.close() 2577 2578 def test_set_event_loop(self): 2579 policy = asyncio.DefaultEventLoopPolicy() 2580 old_loop = policy.get_event_loop() 2581 2582 self.assertRaises(AssertionError, policy.set_event_loop, object()) 2583 2584 loop = policy.new_event_loop() 2585 policy.set_event_loop(loop) 2586 self.assertIs(loop, policy.get_event_loop()) 2587 self.assertIsNot(old_loop, policy.get_event_loop()) 2588 loop.close() 2589 old_loop.close() 2590 2591 def test_get_event_loop_policy(self): 2592 policy = asyncio.get_event_loop_policy() 2593 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 2594 self.assertIs(policy, asyncio.get_event_loop_policy()) 2595 2596 def test_set_event_loop_policy(self): 2597 self.assertRaises( 2598 AssertionError, asyncio.set_event_loop_policy, object()) 2599 2600 old_policy = asyncio.get_event_loop_policy() 2601 2602 policy = asyncio.DefaultEventLoopPolicy() 2603 asyncio.set_event_loop_policy(policy) 2604 self.assertIs(policy, asyncio.get_event_loop_policy()) 2605 self.assertIsNot(policy, old_policy) 2606 2607 2608class GetEventLoopTestsMixin: 2609 2610 _get_running_loop_impl = None 2611 _set_running_loop_impl = None 2612 get_running_loop_impl = None 2613 get_event_loop_impl = None 2614 2615 def setUp(self): 2616 self._get_running_loop_saved = events._get_running_loop 2617 self._set_running_loop_saved = events._set_running_loop 2618 self.get_running_loop_saved = events.get_running_loop 2619 self.get_event_loop_saved = events.get_event_loop 2620 2621 events._get_running_loop = type(self)._get_running_loop_impl 2622 events._set_running_loop = type(self)._set_running_loop_impl 2623 events.get_running_loop = type(self).get_running_loop_impl 2624 events.get_event_loop = type(self).get_event_loop_impl 2625 2626 asyncio._get_running_loop = type(self)._get_running_loop_impl 2627 asyncio._set_running_loop = type(self)._set_running_loop_impl 2628 asyncio.get_running_loop = type(self).get_running_loop_impl 2629 asyncio.get_event_loop = type(self).get_event_loop_impl 2630 2631 super().setUp() 2632 2633 self.loop = asyncio.new_event_loop() 2634 asyncio.set_event_loop(self.loop) 2635 2636 if sys.platform != 'win32': 2637 watcher = asyncio.SafeChildWatcher() 2638 watcher.attach_loop(self.loop) 2639 asyncio.set_child_watcher(watcher) 2640 2641 def tearDown(self): 2642 try: 2643 if sys.platform != 'win32': 2644 asyncio.set_child_watcher(None) 2645 2646 super().tearDown() 2647 finally: 2648 self.loop.close() 2649 asyncio.set_event_loop(None) 2650 2651 events._get_running_loop = self._get_running_loop_saved 2652 events._set_running_loop = self._set_running_loop_saved 2653 events.get_running_loop = self.get_running_loop_saved 2654 events.get_event_loop = self.get_event_loop_saved 2655 2656 asyncio._get_running_loop = self._get_running_loop_saved 2657 asyncio._set_running_loop = self._set_running_loop_saved 2658 asyncio.get_running_loop = self.get_running_loop_saved 2659 asyncio.get_event_loop = self.get_event_loop_saved 2660 2661 if sys.platform != 'win32': 2662 2663 def test_get_event_loop_new_process(self): 2664 # bpo-32126: The multiprocessing module used by 2665 # ProcessPoolExecutor is not functional when the 2666 # multiprocessing.synchronize module cannot be imported. 2667 support.skip_if_broken_multiprocessing_synchronize() 2668 2669 async def main(): 2670 pool = concurrent.futures.ProcessPoolExecutor() 2671 result = await self.loop.run_in_executor( 2672 pool, _test_get_event_loop_new_process__sub_proc) 2673 pool.shutdown() 2674 return result 2675 2676 self.assertEqual( 2677 self.loop.run_until_complete(main()), 2678 'hello') 2679 2680 def test_get_event_loop_returns_running_loop(self): 2681 class TestError(Exception): 2682 pass 2683 2684 class Policy(asyncio.DefaultEventLoopPolicy): 2685 def get_event_loop(self): 2686 raise TestError 2687 2688 old_policy = asyncio.get_event_loop_policy() 2689 try: 2690 asyncio.set_event_loop_policy(Policy()) 2691 loop = asyncio.new_event_loop() 2692 2693 with self.assertRaises(TestError): 2694 asyncio.get_event_loop() 2695 asyncio.set_event_loop(None) 2696 with self.assertRaises(TestError): 2697 asyncio.get_event_loop() 2698 2699 with self.assertRaisesRegex(RuntimeError, 'no running'): 2700 self.assertIs(asyncio.get_running_loop(), None) 2701 self.assertIs(asyncio._get_running_loop(), None) 2702 2703 async def func(): 2704 self.assertIs(asyncio.get_event_loop(), loop) 2705 self.assertIs(asyncio.get_running_loop(), loop) 2706 self.assertIs(asyncio._get_running_loop(), loop) 2707 2708 loop.run_until_complete(func()) 2709 2710 asyncio.set_event_loop(loop) 2711 with self.assertRaises(TestError): 2712 asyncio.get_event_loop() 2713 2714 asyncio.set_event_loop(None) 2715 with self.assertRaises(TestError): 2716 asyncio.get_event_loop() 2717 2718 finally: 2719 asyncio.set_event_loop_policy(old_policy) 2720 if loop is not None: 2721 loop.close() 2722 2723 with self.assertRaisesRegex(RuntimeError, 'no running'): 2724 self.assertIs(asyncio.get_running_loop(), None) 2725 2726 self.assertIs(asyncio._get_running_loop(), None) 2727 2728 2729class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 2730 2731 _get_running_loop_impl = events._py__get_running_loop 2732 _set_running_loop_impl = events._py__set_running_loop 2733 get_running_loop_impl = events._py_get_running_loop 2734 get_event_loop_impl = events._py_get_event_loop 2735 2736 2737try: 2738 import _asyncio # NoQA 2739except ImportError: 2740 pass 2741else: 2742 2743 class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 2744 2745 _get_running_loop_impl = events._c__get_running_loop 2746 _set_running_loop_impl = events._c__set_running_loop 2747 get_running_loop_impl = events._c_get_running_loop 2748 get_event_loop_impl = events._c_get_event_loop 2749 2750 2751class TestServer(unittest.TestCase): 2752 2753 def test_get_loop(self): 2754 loop = asyncio.new_event_loop() 2755 self.addCleanup(loop.close) 2756 proto = MyProto(loop) 2757 server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) 2758 self.assertEqual(server.get_loop(), loop) 2759 server.close() 2760 loop.run_until_complete(server.wait_closed()) 2761 2762 2763class TestAbstractServer(unittest.TestCase): 2764 2765 def test_close(self): 2766 with self.assertRaises(NotImplementedError): 2767 events.AbstractServer().close() 2768 2769 def test_wait_closed(self): 2770 loop = asyncio.new_event_loop() 2771 self.addCleanup(loop.close) 2772 2773 with self.assertRaises(NotImplementedError): 2774 loop.run_until_complete(events.AbstractServer().wait_closed()) 2775 2776 def test_get_loop(self): 2777 with self.assertRaises(NotImplementedError): 2778 events.AbstractServer().get_loop() 2779 2780 2781if __name__ == '__main__': 2782 unittest.main() 2783