"""Tests for base_events.py"""

import concurrent.futures
import errno
import math
import socket
import sys
import threading
import time
import unittest
from unittest import mock

import asyncio
from asyncio import base_events
from asyncio import constants
from test.test_asyncio import utils as test_utils
from test import support
from test.support.script_helper import assert_python_ok
from test.support import socket_helper


# Wildcard used in mock call assertions for arguments whose value is irrelevant.
MOCK_ANY = mock.ANY
# True when the running interpreter is Python 3.4 or newer.
PY34 = sys.version_info >= (3, 4)


def tearDownModule():
    # unittest module-level fixture: reset the event loop policy by passing
    # None once the tests in this module are done.
    asyncio.set_event_loop_policy(None)


def mock_socket_module():
    # Build a MagicMock constrained to the attributes of the real socket
    # module (spec=socket).
    m_socket = mock.MagicMock(spec=socket)
    for name in (
        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
    ):
        if hasattr(socket, name):
            # Copy the real value so code under test sees genuine constants.
            setattr(m_socket, name, getattr(socket, name))
        else:
            # Mirror the platform: a name the real module lacks is removed
            # from the mock as well.
            delattr(m_socket, name)

    # socket.socket(...) hands back the shared mock non-blocking socket
    # produced by the test utilities.
    m_socket.socket = mock.MagicMock()
    m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
    m_socket.getaddrinfo._is_coroutine = False

    return m_socket


def patch_socket(f):
    # Decorator: replace the ``socket`` name inside asyncio.base_events with a
    # fresh mock from mock_socket_module() while ``f`` runs.  The decorated
    # tests in this file accept the mock as an extra ``m_socket`` argument.
    return mock.patch('asyncio.base_events.socket',
                      new_callable=mock_socket_module)(f)


class BaseEventTests(test_utils.TestCase):
    # Tests for the module-level helper base_events._ipaddr_info().

    def test_ipaddr_info(self):
        # Checks the 5-tuple returned for IP literals, and the None result
        # for combinations the helper rejects.
        UNSPEC = socket.AF_UNSPEC
        INET = socket.AF_INET
        INET6 = socket.AF_INET6
        STREAM = socket.SOCK_STREAM
        DGRAM = socket.SOCK_DGRAM
        TCP = socket.IPPROTO_TCP
        UDP = socket.IPPROTO_UDP

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))

        # Host given as bytes.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))

        # Unspecified family with an IPv4 literal yields AF_INET.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))

        self.assertEqual(
            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))

        # Socket type STREAM implies TCP protocol.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))

        # Socket type DGRAM implies UDP protocol.
        self.assertEqual(
            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))

        # No socket type.
        self.assertIsNone(
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))

        if socket_helper.IPV6_ENABLED:
            # IPv4 address with family IPv6.
            self.assertIsNone(
                base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))

            self.assertEqual(
                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
                base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))

            self.assertEqual(
                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
                base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))

            # IPv6 address with family IPv4.
            self.assertIsNone(
                base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))

            # IPv6 address with zone index.
            self.assertIsNone(
                base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))

    def test_port_parameter_types(self):
        # Test obscure kinds of arguments for "port".
        INET = socket.AF_INET
        STREAM = socket.SOCK_STREAM
        TCP = socket.IPPROTO_TCP

        # None and empty str/bytes are all expected to come back as port 0.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))

        # Numeric str/bytes are expected to come back as the integer port.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))

    @patch_socket
    def test_ipaddr_info_no_inet_pton(self, m_socket):
        # With inet_pton removed from the (mocked) socket module the helper
        # is expected to return None.
        del m_socket.inet_pton
        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
                                                   socket.AF_INET,
                                                   socket.SOCK_STREAM,
                                                   socket.IPPROTO_TCP))


class BaseEventLoopTests(test_utils.TestCase):
    # Tests that drive a bare BaseEventLoop whose selector is a mock.

    def setUp(self):
        super().setUp()
        self.loop = base_events.BaseEventLoop()
        # BaseEventLoop is used directly; give it a mock selector whose
        # select() reports no events.
        self.loop._selector = mock.Mock()
        self.loop._selector.select.return_value = ()
        self.set_event_loop(self.loop)

    def test_not_implemented(self):
        # Each transport factory / hook below must raise NotImplementedError
        # on the base class.
        m = mock.Mock()
        self.assertRaises(
            NotImplementedError,
            self.loop._make_socket_transport, m, m)
        self.assertRaises(
            NotImplementedError,
            self.loop._make_ssl_transport, m, m, m, m)
        self.assertRaises(
            NotImplementedError,
            self.loop._make_datagram_transport, m, m)
        self.assertRaises(
            NotImplementedError, self.loop._process_events, [])
        self.assertRaises(
            NotImplementedError, self.loop._write_to_self)
        self.assertRaises(
            NotImplementedError,
            self.loop._make_read_pipe_transport, m, m)
        self.assertRaises(
            NotImplementedError,
            self.loop._make_write_pipe_transport, m, m)
        # Calling this one does not raise by itself; the error is expected
        # only once the returned object is advanced with send(None).
        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
        with self.assertRaises(NotImplementedError):
            gen.send(None)

    def test_close(self):
        self.assertFalse(self.loop.is_closed())
        self.loop.close()
        self.assertTrue(self.loop.is_closed())

        # it should be possible to call close() more than once
        self.loop.close()
        self.loop.close()

        # operation blocked when the loop is closed
        f = self.loop.create_future()
        self.assertRaises(RuntimeError, self.loop.run_forever)
        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)

    def test__add_callback_handle(self):
        # A live handle goes straight to the ready queue, not the timer list.
        h = asyncio.Handle(lambda: False, (), self.loop, None)

        self.loop._add_callback(h)
        self.assertFalse(self.loop._scheduled)
        self.assertIn(h, self.loop._ready)

    def test__add_callback_cancelled_handle(self):
        # A handle cancelled before _add_callback() is not queued at all.
        h = asyncio.Handle(lambda: False, (), self.loop, None)
        h.cancel()

        self.loop._add_callback(h)
        self.assertFalse(self.loop._scheduled)
        self.assertFalse(self.loop._ready)

    def test_set_default_executor(self):
        # A ThreadPoolExecutor subclass is stored as the default executor.
        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
            def submit(self, fn, *args, **kwargs):
                raise NotImplementedError(
                    'cannot submit into a dummy executor')

        self.loop._process_events = mock.Mock()
        self.loop._write_to_self = mock.Mock()

        executor = DummyExecutor()
        self.loop.set_default_executor(executor)
        self.assertIs(executor, self.loop._default_executor)

    def test_set_default_executor_deprecation_warnings(self):
        # Passing an object that is not a ThreadPoolExecutor (here a Mock)
        # must emit a DeprecationWarning.
        executor = mock.Mock()

        with self.assertWarns(DeprecationWarning):
            self.loop.set_default_executor(executor)

        # Avoid cleaning up the executor mock
        self.loop._default_executor = None

    def test_call_soon(self):
        # call_soon() returns a plain Handle that is placed on the ready queue.
        def cb():
            pass

        h = self.loop.call_soon(cb)
        self.assertEqual(h._callback, cb)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertIn(h, self.loop._ready)

244 def test_call_soon_non_callable(self): 245 self.loop.set_debug(True) 246 with self.assertRaisesRegex(TypeError, 'a callable object'): 247 self.loop.call_soon(1) 248 249 def test_call_later(self): 250 def cb(): 251 pass 252 253 h = self.loop.call_later(10.0, cb) 254 self.assertIsInstance(h, asyncio.TimerHandle) 255 self.assertIn(h, self.loop._scheduled) 256 self.assertNotIn(h, self.loop._ready) 257 258 def test_call_later_negative_delays(self): 259 calls = [] 260 261 def cb(arg): 262 calls.append(arg) 263 264 self.loop._process_events = mock.Mock() 265 self.loop.call_later(-1, cb, 'a') 266 self.loop.call_later(-2, cb, 'b') 267 test_utils.run_briefly(self.loop) 268 self.assertEqual(calls, ['b', 'a']) 269 270 def test_time_and_call_at(self): 271 def cb(): 272 self.loop.stop() 273 274 self.loop._process_events = mock.Mock() 275 delay = 0.1 276 277 when = self.loop.time() + delay 278 self.loop.call_at(when, cb) 279 t0 = self.loop.time() 280 self.loop.run_forever() 281 dt = self.loop.time() - t0 282 283 # 50 ms: maximum granularity of the event loop 284 self.assertGreaterEqual(dt, delay - 0.050, dt) 285 # tolerate a difference of +800 ms because some Python buildbots 286 # are really slow 287 self.assertLessEqual(dt, 0.9, dt) 288 289 def check_thread(self, loop, debug): 290 def cb(): 291 pass 292 293 loop.set_debug(debug) 294 if debug: 295 msg = ("Non-thread-safe operation invoked on an event loop other " 296 "than the current one") 297 with self.assertRaisesRegex(RuntimeError, msg): 298 loop.call_soon(cb) 299 with self.assertRaisesRegex(RuntimeError, msg): 300 loop.call_later(60, cb) 301 with self.assertRaisesRegex(RuntimeError, msg): 302 loop.call_at(loop.time() + 60, cb) 303 else: 304 loop.call_soon(cb) 305 loop.call_later(60, cb) 306 loop.call_at(loop.time() + 60, cb) 307 308 def test_check_thread(self): 309 def check_in_thread(loop, event, debug, create_loop, fut): 310 # wait until the event loop is running 311 event.wait() 312 313 try: 314 if create_loop: 315 
loop2 = base_events.BaseEventLoop() 316 try: 317 asyncio.set_event_loop(loop2) 318 self.check_thread(loop, debug) 319 finally: 320 asyncio.set_event_loop(None) 321 loop2.close() 322 else: 323 self.check_thread(loop, debug) 324 except Exception as exc: 325 loop.call_soon_threadsafe(fut.set_exception, exc) 326 else: 327 loop.call_soon_threadsafe(fut.set_result, None) 328 329 def test_thread(loop, debug, create_loop=False): 330 event = threading.Event() 331 fut = loop.create_future() 332 loop.call_soon(event.set) 333 args = (loop, event, debug, create_loop, fut) 334 thread = threading.Thread(target=check_in_thread, args=args) 335 thread.start() 336 loop.run_until_complete(fut) 337 thread.join() 338 339 self.loop._process_events = mock.Mock() 340 self.loop._write_to_self = mock.Mock() 341 342 # raise RuntimeError if the thread has no event loop 343 test_thread(self.loop, True) 344 345 # check disabled if debug mode is disabled 346 test_thread(self.loop, False) 347 348 # raise RuntimeError if the event loop of the thread is not the called 349 # event loop 350 test_thread(self.loop, True, create_loop=True) 351 352 # check disabled if debug mode is disabled 353 test_thread(self.loop, False, create_loop=True) 354 355 def test__run_once(self): 356 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 357 self.loop, None) 358 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 359 self.loop, None) 360 361 h1.cancel() 362 363 self.loop._process_events = mock.Mock() 364 self.loop._scheduled.append(h1) 365 self.loop._scheduled.append(h2) 366 self.loop._run_once() 367 368 t = self.loop._selector.select.call_args[0][0] 369 self.assertTrue(9.5 < t < 10.5, t) 370 self.assertEqual([h2], self.loop._scheduled) 371 self.assertTrue(self.loop._process_events.called) 372 373 def test_set_debug(self): 374 self.loop.set_debug(True) 375 self.assertTrue(self.loop.get_debug()) 376 self.loop.set_debug(False) 377 self.assertFalse(self.loop.get_debug()) 378 379 def 
test__run_once_schedule_handle(self): 380 handle = None 381 processed = False 382 383 def cb(loop): 384 nonlocal processed, handle 385 processed = True 386 handle = loop.call_soon(lambda: True) 387 388 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 389 self.loop, None) 390 391 self.loop._process_events = mock.Mock() 392 self.loop._scheduled.append(h) 393 self.loop._run_once() 394 395 self.assertTrue(processed) 396 self.assertEqual([handle], list(self.loop._ready)) 397 398 def test__run_once_cancelled_event_cleanup(self): 399 self.loop._process_events = mock.Mock() 400 401 self.assertTrue( 402 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 403 404 def cb(): 405 pass 406 407 # Set up one "blocking" event that will not be cancelled to 408 # ensure later cancelled events do not make it to the head 409 # of the queue and get cleaned. 410 not_cancelled_count = 1 411 self.loop.call_later(3000, cb) 412 413 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 414 # cancelled handles, ensure they aren't removed 415 416 cancelled_count = 2 417 for x in range(2): 418 h = self.loop.call_later(3600, cb) 419 h.cancel() 420 421 # Add some cancelled events that will be at head and removed 422 cancelled_count += 2 423 for x in range(2): 424 h = self.loop.call_later(100, cb) 425 h.cancel() 426 427 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 428 self.assertLessEqual(cancelled_count + not_cancelled_count, 429 base_events._MIN_SCHEDULED_TIMER_HANDLES) 430 431 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 432 433 self.loop._run_once() 434 435 cancelled_count -= 2 436 437 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 438 439 self.assertEqual(len(self.loop._scheduled), 440 cancelled_count + not_cancelled_count) 441 442 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 443 # so that deletion of cancelled events will occur on next _run_once 444 add_cancel_count = 
int(math.ceil( 445 base_events._MIN_SCHEDULED_TIMER_HANDLES * 446 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 447 448 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 449 add_cancel_count, 0) 450 451 # Add some events that will not be cancelled 452 not_cancelled_count += add_not_cancel_count 453 for x in range(add_not_cancel_count): 454 self.loop.call_later(3600, cb) 455 456 # Add enough cancelled events 457 cancelled_count += add_cancel_count 458 for x in range(add_cancel_count): 459 h = self.loop.call_later(3600, cb) 460 h.cancel() 461 462 # Ensure all handles are still scheduled 463 self.assertEqual(len(self.loop._scheduled), 464 cancelled_count + not_cancelled_count) 465 466 self.loop._run_once() 467 468 # Ensure cancelled events were removed 469 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 470 471 # Ensure only uncancelled events remain scheduled 472 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 473 474 def test_run_until_complete_type_error(self): 475 self.assertRaises(TypeError, 476 self.loop.run_until_complete, 'blah') 477 478 def test_run_until_complete_loop(self): 479 task = self.loop.create_future() 480 other_loop = self.new_test_loop() 481 self.addCleanup(other_loop.close) 482 self.assertRaises(ValueError, 483 other_loop.run_until_complete, task) 484 485 def test_run_until_complete_loop_orphan_future_close_loop(self): 486 class ShowStopper(SystemExit): 487 pass 488 489 async def foo(delay): 490 await asyncio.sleep(delay) 491 492 def throw(): 493 raise ShowStopper 494 495 self.loop._process_events = mock.Mock() 496 self.loop.call_soon(throw) 497 with self.assertRaises(ShowStopper): 498 self.loop.run_until_complete(foo(0.1)) 499 500 # This call fails if run_until_complete does not clean up 501 # done-callback for the previous future. 
502 self.loop.run_until_complete(foo(0.2)) 503 504 def test_subprocess_exec_invalid_args(self): 505 args = [sys.executable, '-c', 'pass'] 506 507 # missing program parameter (empty args) 508 self.assertRaises(TypeError, 509 self.loop.run_until_complete, self.loop.subprocess_exec, 510 asyncio.SubprocessProtocol) 511 512 # expected multiple arguments, not a list 513 self.assertRaises(TypeError, 514 self.loop.run_until_complete, self.loop.subprocess_exec, 515 asyncio.SubprocessProtocol, args) 516 517 # program arguments must be strings, not int 518 self.assertRaises(TypeError, 519 self.loop.run_until_complete, self.loop.subprocess_exec, 520 asyncio.SubprocessProtocol, sys.executable, 123) 521 522 # universal_newlines, shell, bufsize must not be set 523 self.assertRaises(TypeError, 524 self.loop.run_until_complete, self.loop.subprocess_exec, 525 asyncio.SubprocessProtocol, *args, universal_newlines=True) 526 self.assertRaises(TypeError, 527 self.loop.run_until_complete, self.loop.subprocess_exec, 528 asyncio.SubprocessProtocol, *args, shell=True) 529 self.assertRaises(TypeError, 530 self.loop.run_until_complete, self.loop.subprocess_exec, 531 asyncio.SubprocessProtocol, *args, bufsize=4096) 532 533 def test_subprocess_shell_invalid_args(self): 534 # expected a string, not an int or a list 535 self.assertRaises(TypeError, 536 self.loop.run_until_complete, self.loop.subprocess_shell, 537 asyncio.SubprocessProtocol, 123) 538 self.assertRaises(TypeError, 539 self.loop.run_until_complete, self.loop.subprocess_shell, 540 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 541 542 # universal_newlines, shell, bufsize must not be set 543 self.assertRaises(TypeError, 544 self.loop.run_until_complete, self.loop.subprocess_shell, 545 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 546 self.assertRaises(TypeError, 547 self.loop.run_until_complete, self.loop.subprocess_shell, 548 asyncio.SubprocessProtocol, 'exit 0', shell=True) 549 
self.assertRaises(TypeError, 550 self.loop.run_until_complete, self.loop.subprocess_shell, 551 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 552 553 def test_default_exc_handler_callback(self): 554 self.loop._process_events = mock.Mock() 555 556 def zero_error(fut): 557 fut.set_result(True) 558 1/0 559 560 # Test call_soon (events.Handle) 561 with mock.patch('asyncio.base_events.logger') as log: 562 fut = self.loop.create_future() 563 self.loop.call_soon(zero_error, fut) 564 fut.add_done_callback(lambda fut: self.loop.stop()) 565 self.loop.run_forever() 566 log.error.assert_called_with( 567 test_utils.MockPattern('Exception in callback.*zero'), 568 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 569 570 # Test call_later (events.TimerHandle) 571 with mock.patch('asyncio.base_events.logger') as log: 572 fut = self.loop.create_future() 573 self.loop.call_later(0.01, zero_error, fut) 574 fut.add_done_callback(lambda fut: self.loop.stop()) 575 self.loop.run_forever() 576 log.error.assert_called_with( 577 test_utils.MockPattern('Exception in callback.*zero'), 578 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 579 580 def test_default_exc_handler_coro(self): 581 self.loop._process_events = mock.Mock() 582 583 async def zero_error_coro(): 584 await asyncio.sleep(0.01) 585 1/0 586 587 # Test Future.__del__ 588 with mock.patch('asyncio.base_events.logger') as log: 589 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 590 fut.add_done_callback(lambda *args: self.loop.stop()) 591 self.loop.run_forever() 592 fut = None # Trigger Future.__del__ or futures._TracebackLogger 593 support.gc_collect() 594 if PY34: 595 # Future.__del__ in Python 3.4 logs error with 596 # an actual exception context 597 log.error.assert_called_with( 598 test_utils.MockPattern('.*exception was never retrieved'), 599 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 600 else: 601 # futures._TracebackLogger logs only textual traceback 602 log.error.assert_called_with( 603 
test_utils.MockPattern( 604 '.*exception was never retrieved.*ZeroDiv'), 605 exc_info=False) 606 607 def test_set_exc_handler_invalid(self): 608 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 609 self.loop.set_exception_handler('spam') 610 611 def test_set_exc_handler_custom(self): 612 def zero_error(): 613 1/0 614 615 def run_loop(): 616 handle = self.loop.call_soon(zero_error) 617 self.loop._run_once() 618 return handle 619 620 self.loop.set_debug(True) 621 self.loop._process_events = mock.Mock() 622 623 self.assertIsNone(self.loop.get_exception_handler()) 624 mock_handler = mock.Mock() 625 self.loop.set_exception_handler(mock_handler) 626 self.assertIs(self.loop.get_exception_handler(), mock_handler) 627 handle = run_loop() 628 mock_handler.assert_called_with(self.loop, { 629 'exception': MOCK_ANY, 630 'message': test_utils.MockPattern( 631 'Exception in callback.*zero_error'), 632 'handle': handle, 633 'source_traceback': handle._source_traceback, 634 }) 635 mock_handler.reset_mock() 636 637 self.loop.set_exception_handler(None) 638 with mock.patch('asyncio.base_events.logger') as log: 639 run_loop() 640 log.error.assert_called_with( 641 test_utils.MockPattern( 642 'Exception in callback.*zero'), 643 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 644 645 assert not mock_handler.called 646 647 def test_set_exc_handler_broken(self): 648 def run_loop(): 649 def zero_error(): 650 1/0 651 self.loop.call_soon(zero_error) 652 self.loop._run_once() 653 654 def handler(loop, context): 655 raise AttributeError('spam') 656 657 self.loop._process_events = mock.Mock() 658 659 self.loop.set_exception_handler(handler) 660 661 with mock.patch('asyncio.base_events.logger') as log: 662 run_loop() 663 log.error.assert_called_with( 664 test_utils.MockPattern( 665 'Unhandled error in exception handler'), 666 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 667 668 def test_default_exc_handler_broken(self): 669 _context = None 670 671 class 
Loop(base_events.BaseEventLoop): 672 673 _selector = mock.Mock() 674 _process_events = mock.Mock() 675 676 def default_exception_handler(self, context): 677 nonlocal _context 678 _context = context 679 # Simulates custom buggy "default_exception_handler" 680 raise ValueError('spam') 681 682 loop = Loop() 683 self.addCleanup(loop.close) 684 asyncio.set_event_loop(loop) 685 686 def run_loop(): 687 def zero_error(): 688 1/0 689 loop.call_soon(zero_error) 690 loop._run_once() 691 692 with mock.patch('asyncio.base_events.logger') as log: 693 run_loop() 694 log.error.assert_called_with( 695 'Exception in default exception handler', 696 exc_info=True) 697 698 def custom_handler(loop, context): 699 raise ValueError('ham') 700 701 _context = None 702 loop.set_exception_handler(custom_handler) 703 with mock.patch('asyncio.base_events.logger') as log: 704 run_loop() 705 log.error.assert_called_with( 706 test_utils.MockPattern('Exception in default exception.*' 707 'while handling.*in custom'), 708 exc_info=True) 709 710 # Check that original context was passed to default 711 # exception handler. 
712 self.assertIn('context', _context) 713 self.assertIs(type(_context['context']['exception']), 714 ZeroDivisionError) 715 716 def test_set_task_factory_invalid(self): 717 with self.assertRaisesRegex( 718 TypeError, 'task factory must be a callable or None'): 719 720 self.loop.set_task_factory(1) 721 722 self.assertIsNone(self.loop.get_task_factory()) 723 724 def test_set_task_factory(self): 725 self.loop._process_events = mock.Mock() 726 727 class MyTask(asyncio.Task): 728 pass 729 730 async def coro(): 731 pass 732 733 factory = lambda loop, coro: MyTask(coro, loop=loop) 734 735 self.assertIsNone(self.loop.get_task_factory()) 736 self.loop.set_task_factory(factory) 737 self.assertIs(self.loop.get_task_factory(), factory) 738 739 task = self.loop.create_task(coro()) 740 self.assertTrue(isinstance(task, MyTask)) 741 self.loop.run_until_complete(task) 742 743 self.loop.set_task_factory(None) 744 self.assertIsNone(self.loop.get_task_factory()) 745 746 task = self.loop.create_task(coro()) 747 self.assertTrue(isinstance(task, asyncio.Task)) 748 self.assertFalse(isinstance(task, MyTask)) 749 self.loop.run_until_complete(task) 750 751 def test_env_var_debug(self): 752 code = '\n'.join(( 753 'import asyncio', 754 'loop = asyncio.get_event_loop()', 755 'print(loop.get_debug())')) 756 757 # Test with -E to not fail if the unit test was run with 758 # PYTHONASYNCIODEBUG set to a non-empty string 759 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 760 self.assertEqual(stdout.rstrip(), b'False') 761 762 sts, stdout, stderr = assert_python_ok('-c', code, 763 PYTHONASYNCIODEBUG='', 764 PYTHONDEVMODE='') 765 self.assertEqual(stdout.rstrip(), b'False') 766 767 sts, stdout, stderr = assert_python_ok('-c', code, 768 PYTHONASYNCIODEBUG='1', 769 PYTHONDEVMODE='') 770 self.assertEqual(stdout.rstrip(), b'True') 771 772 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 773 PYTHONASYNCIODEBUG='1') 774 self.assertEqual(stdout.rstrip(), b'False') 775 776 # -X dev 777 sts, 
stdout, stderr = assert_python_ok('-E', '-X', 'dev', 778 '-c', code) 779 self.assertEqual(stdout.rstrip(), b'True') 780 781 def test_create_task(self): 782 class MyTask(asyncio.Task): 783 pass 784 785 async def test(): 786 pass 787 788 class EventLoop(base_events.BaseEventLoop): 789 def create_task(self, coro): 790 return MyTask(coro, loop=loop) 791 792 loop = EventLoop() 793 self.set_event_loop(loop) 794 795 coro = test() 796 task = asyncio.ensure_future(coro, loop=loop) 797 self.assertIsInstance(task, MyTask) 798 799 # make warnings quiet 800 task._log_destroy_pending = False 801 coro.close() 802 803 def test_create_named_task_with_default_factory(self): 804 async def test(): 805 pass 806 807 loop = asyncio.new_event_loop() 808 task = loop.create_task(test(), name='test_task') 809 try: 810 self.assertEqual(task.get_name(), 'test_task') 811 finally: 812 loop.run_until_complete(task) 813 loop.close() 814 815 def test_create_named_task_with_custom_factory(self): 816 def task_factory(loop, coro): 817 return asyncio.Task(coro, loop=loop) 818 819 async def test(): 820 pass 821 822 loop = asyncio.new_event_loop() 823 loop.set_task_factory(task_factory) 824 task = loop.create_task(test(), name='test_task') 825 try: 826 self.assertEqual(task.get_name(), 'test_task') 827 finally: 828 loop.run_until_complete(task) 829 loop.close() 830 831 def test_run_forever_keyboard_interrupt(self): 832 # Python issue #22601: ensure that the temporary task created by 833 # run_forever() consumes the KeyboardInterrupt and so don't log 834 # a warning 835 async def raise_keyboard_interrupt(): 836 raise KeyboardInterrupt 837 838 self.loop._process_events = mock.Mock() 839 self.loop.call_exception_handler = mock.Mock() 840 841 try: 842 self.loop.run_until_complete(raise_keyboard_interrupt()) 843 except KeyboardInterrupt: 844 pass 845 self.loop.close() 846 support.gc_collect() 847 848 self.assertFalse(self.loop.call_exception_handler.called) 849 850 def 
test_run_until_complete_baseexception(self): 851 # Python issue #22429: run_until_complete() must not schedule a pending 852 # call to stop() if the future raised a BaseException 853 async def raise_keyboard_interrupt(): 854 raise KeyboardInterrupt 855 856 self.loop._process_events = mock.Mock() 857 858 try: 859 self.loop.run_until_complete(raise_keyboard_interrupt()) 860 except KeyboardInterrupt: 861 pass 862 863 def func(): 864 self.loop.stop() 865 func.called = True 866 func.called = False 867 try: 868 self.loop.call_soon(func) 869 self.loop.run_forever() 870 except KeyboardInterrupt: 871 pass 872 self.assertTrue(func.called) 873 874 def test_single_selecter_event_callback_after_stopping(self): 875 # Python issue #25593: A stopped event loop may cause event callbacks 876 # to run more than once. 877 event_sentinel = object() 878 callcount = 0 879 doer = None 880 881 def proc_events(event_list): 882 nonlocal doer 883 if event_sentinel in event_list: 884 doer = self.loop.call_soon(do_event) 885 886 def do_event(): 887 nonlocal callcount 888 callcount += 1 889 self.loop.call_soon(clear_selector) 890 891 def clear_selector(): 892 doer.cancel() 893 self.loop._selector.select.return_value = () 894 895 self.loop._process_events = proc_events 896 self.loop._selector.select.return_value = (event_sentinel,) 897 898 for i in range(1, 3): 899 with self.subTest('Loop %d/2' % i): 900 self.loop.call_soon(self.loop.stop) 901 self.loop.run_forever() 902 self.assertEqual(callcount, 1) 903 904 def test_run_once(self): 905 # Simple test for test_utils.run_once(). It may seem strange 906 # to have a test for this (the function isn't even used!) but 907 # it's a de-factor standard API for library tests. This tests 908 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 
909 count = 0 910 911 def callback(): 912 nonlocal count 913 count += 1 914 915 self.loop._process_events = mock.Mock() 916 self.loop.call_soon(callback) 917 test_utils.run_once(self.loop) 918 self.assertEqual(count, 1) 919 920 def test_run_forever_pre_stopped(self): 921 # Test that the old idiom for pre-stopping the loop works. 922 self.loop._process_events = mock.Mock() 923 self.loop.stop() 924 self.loop.run_forever() 925 self.loop._selector.select.assert_called_once_with(0) 926 927 async def leave_unfinalized_asyncgen(self): 928 # Create an async generator, iterate it partially, and leave it 929 # to be garbage collected. 930 # Used in async generator finalization tests. 931 # Depends on implementation details of garbage collector. Changes 932 # in gc may break this function. 933 status = {'started': False, 934 'stopped': False, 935 'finalized': False} 936 937 async def agen(): 938 status['started'] = True 939 try: 940 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 941 yield item 942 finally: 943 status['finalized'] = True 944 945 ag = agen() 946 ai = ag.__aiter__() 947 948 async def iter_one(): 949 try: 950 item = await ai.__anext__() 951 except StopAsyncIteration: 952 return 953 if item == 'THREE': 954 status['stopped'] = True 955 return 956 asyncio.create_task(iter_one()) 957 958 asyncio.create_task(iter_one()) 959 return status 960 961 def test_asyncgen_finalization_by_gc(self): 962 # Async generators should be finalized when garbage collected. 
963 self.loop._process_events = mock.Mock() 964 self.loop._write_to_self = mock.Mock() 965 with support.disable_gc(): 966 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 967 while not status['stopped']: 968 test_utils.run_briefly(self.loop) 969 self.assertTrue(status['started']) 970 self.assertTrue(status['stopped']) 971 self.assertFalse(status['finalized']) 972 support.gc_collect() 973 test_utils.run_briefly(self.loop) 974 self.assertTrue(status['finalized']) 975 976 def test_asyncgen_finalization_by_gc_in_other_thread(self): 977 # Python issue 34769: If garbage collector runs in another 978 # thread, async generators will not finalize in debug 979 # mode. 980 self.loop._process_events = mock.Mock() 981 self.loop._write_to_self = mock.Mock() 982 self.loop.set_debug(True) 983 with support.disable_gc(): 984 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 985 while not status['stopped']: 986 test_utils.run_briefly(self.loop) 987 self.assertTrue(status['started']) 988 self.assertTrue(status['stopped']) 989 self.assertFalse(status['finalized']) 990 self.loop.run_until_complete( 991 self.loop.run_in_executor(None, support.gc_collect)) 992 test_utils.run_briefly(self.loop) 993 self.assertTrue(status['finalized']) 994 995 996class MyProto(asyncio.Protocol): 997 done = None 998 999 def __init__(self, create_future=False): 1000 self.state = 'INITIAL' 1001 self.nbytes = 0 1002 if create_future: 1003 self.done = asyncio.get_running_loop().create_future() 1004 1005 def connection_made(self, transport): 1006 self.transport = transport 1007 assert self.state == 'INITIAL', self.state 1008 self.state = 'CONNECTED' 1009 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1010 1011 def data_received(self, data): 1012 assert self.state == 'CONNECTED', self.state 1013 self.nbytes += len(data) 1014 1015 def eof_received(self): 1016 assert self.state == 'CONNECTED', self.state 1017 self.state = 'EOF' 1018 1019 def 
connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol helper that tracks a small state machine.

    The state moves 'INITIAL' -> 'INITIALIZED' -> 'CLOSED'; every callback
    asserts that it is invoked in the state it expects.  The number of
    received payload bytes is accumulated in ``nbytes``.  When constructed
    with ``create_future=True`` a future is created on *loop* and stored in
    ``done``; it is resolved with None in connection_lost().
    """

    # Class-level default: no completion future unless __init__ creates one.
    done = None

    def __init__(self, create_future=False, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            # *loop* must be provided when create_future is true.
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        # Only checks the state; the error itself is ignored.
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class BaseEventLoopWithSelectorTests(test_utils.TestCase):
    """Tests of the connection/server/datagram-endpoint APIs, run against a
    real asyncio.SelectorEventLoop (with sockets and resolver frequently
    replaced by mocks)."""

    def setUp(self):
        # A fresh SelectorEventLoop per test, handed to the base class via
        # set_event_loop().
        super().setUp()
        self.loop = asyncio.SelectorEventLoop()
        self.set_event_loop(self.loop)

    @mock.patch('socket.getnameinfo')
    def test_getnameinfo(self, m_gai):
        # loop.getnameinfo() must hand back whatever socket.getnameinfo
        # (patched here to return 42) produces.
        m_gai.side_effect = lambda *args: 42
        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
        self.assertEqual(r, 42)

    @patch_socket
    def test_create_connection_multiple_errors(self, m_socket):
        # Two resolved addresses, and socket creation fails with a different
        # message for each: the resulting OSError must combine both messages.

        class MyProto(asyncio.Protocol):
            pass

        async def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('107.6.106.82', 80)),
                    (2, 1, 6, '', ('107.6.106.82', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        idx = -1
        errors = ['err1', 'err2']

        def _socket(*args, **kw):
            # Each call raises the next message from ``errors``.
            nonlocal idx, errors
            idx += 1
            raise OSError(errors[idx])

        m_socket.socket = _socket

        self.loop.getaddrinfo = getaddrinfo_task

        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(coro)

        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')

    @patch_socket
    def test_create_connection_timeout(self, m_socket):
        # Ensure that the socket is closed on timeout
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        def getaddrinfo(*args, **kw):
            # Already-resolved future carrying a single IPv4 address.
            fut = self.loop.create_future()
            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                    ('127.0.0.1', 80))
            fut.set_result([addr])
            return fut
        self.loop.getaddrinfo = getaddrinfo

        with mock.patch.object(self.loop, 'sock_connect',
                               side_effect=asyncio.TimeoutError):
            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(coro)
            self.assertTrue(sock.close.called)

    def test_create_connection_host_port_sock(self):
        # host/port given together with sock -> ValueError.
        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, sock=object())
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_wrong_sock(self):
        # A datagram socket is rejected by create_connection().
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_connection(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A Stream Socket was expected'):
                self.loop.run_until_complete(coro)

    def test_create_server_wrong_sock(self):
        # A datagram socket is rejected by create_server().
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_server(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A Stream Socket was expected'):
                self.loop.run_until_complete(coro)

    def test_create_server_ssl_timeout_for_plain_socket(self):
        # ssl_handshake_timeout without ssl -> ValueError.
        coro = self.loop.create_server(
            MyProto, 'example.com', 80,
            ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'no socket.SOCK_NONBLOCK (linux only)')
    def test_create_server_stream_bittype(self):
        # A stream socket created with the SOCK_NONBLOCK bit OR-ed into its
        # type must still be accepted by create_server().
        sock = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        with sock:
            coro = self.loop.create_server(lambda: None, sock=sock)
            srv = self.loop.run_until_complete(coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
    def test_create_server_ipv6(self):
        # start_server() on '::1' yields at least one listening socket; the
        # explicit loop= argument is expected to emit a DeprecationWarning.
        async def main():
            with self.assertWarns(DeprecationWarning):
                srv = await asyncio.start_server(
                    lambda: None, '::1', 0, loop=self.loop)
            try:
                self.assertGreater(len(srv.sockets), 0)
            finally:
                srv.close()
                await srv.wait_closed()

        try:
            self.loop.run_until_complete(main())
        except OSError as ex:
            # Skip (rather than fail) when ::1 cannot be bound here.
            if (hasattr(errno, 'EADDRNOTAVAIL') and
                    ex.errno == errno.EADDRNOTAVAIL):
                self.skipTest('failed to bind to ::1')
            else:
                raise

    def test_create_datagram_endpoint_wrong_sock(self):
        # socket.socket(AF_INET) uses the default (stream) type, which
        # create_datagram_endpoint() must reject.
        sock = socket.socket(socket.AF_INET)
        with sock:
            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UDP Socket was expected'):
                self.loop.run_until_complete(coro)

    def test_create_connection_no_host_port_sock(self):
        # Neither host/port nor sock -> ValueError.
        coro = self.loop.create_connection(MyProto)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_no_getaddrinfo(self):
        # The resolver returns no addresses -> OSError.
        async def getaddrinfo(*args, **kw):
            return []

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_connection_connect_err(self):
        # sock_connect raising OSError for the only address -> OSError.
        async def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('107.6.106.82', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_connection_multiple(self):
        # sock_connect fails for both resolved addresses -> OSError.
        async def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(coro)

    @patch_socket
    def test_create_connection_multiple_errors_local_addr(self, m_socket):
        # bind() fails for one local address and sock_connect fails with
        # 'Err2'; the combined error must start with 'Multiple exceptions: '
        # and the mocked socket must have been closed.

        def bind(addr):
            if addr[0] == '0.0.0.1':
                err = OSError('Err')
                err.strerror = 'Err'
                raise err

        m_socket.socket.return_value.bind = bind

        async def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError('Err2')

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(coro)

        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
        self.assertTrue(m_socket.socket.return_value.close.called)

    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
        # Shared body for the two IP-literal tests below; *m_socket* is the
        # mocked socket module supplied by @patch_socket.
        # Test the fallback code, even if this system has inet_pton.
        if not allow_inet_pton:
            del m_socket.inet_pton

        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value

        self.loop._add_reader = mock.Mock()
        self.loop._add_reader._is_coroutine = False
        self.loop._add_writer = mock.Mock()
        self.loop._add_writer._is_coroutine = False

        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
        t, p = self.loop.run_until_complete(coro)
        try:
            sock.connect.assert_called_with(('1.2.3.4', 80))
            _, kwargs = m_socket.socket.call_args
            self.assertEqual(kwargs['family'], m_socket.AF_INET)
            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            t.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

        if socket_helper.IPV6_ENABLED:
            sock.family = socket.AF_INET6
            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
            t, p = self.loop.run_until_complete(coro)
            try:
                # Without inet_pton we use getaddrinfo, which transforms
                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
                # scope id.
                [address] = sock.connect.call_args[0]
                host, port = address[:2]
                self.assertRegex(host, r'::(0\.)*1')
                self.assertEqual(port, 80)
                _, kwargs = m_socket.socket.call_args
                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
            finally:
                t.close()
                test_utils.run_briefly(self.loop)  # allow transport to close

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
    @unittest.skipIf(sys.platform.startswith('aix'),
                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
    @patch_socket
    def test_create_connection_ipv6_scope(self, m_socket):
        # 'fe80::1%1' must be connected as ('fe80::1', 80, 0, 1): the zone
        # suffix ends up in the last element of the address tuple.
        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value
        sock.family = socket.AF_INET6

        self.loop._add_reader = mock.Mock()
        self.loop._add_reader._is_coroutine = False
        self.loop._add_writer = mock.Mock()
        self.loop._add_writer._is_coroutine = False

        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
        t, p = self.loop.run_until_complete(coro)
        try:
            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
            _, kwargs = m_socket.socket.call_args
            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            t.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

    @patch_socket
    def test_create_connection_ip_addr(self, m_socket):
        # Variant that keeps inet_pton on the mocked socket module.
        self._test_create_connection_ip_addr(m_socket, True)

    @patch_socket
    def test_create_connection_no_inet_pton(self, m_socket):
        # Variant that deletes inet_pton from the mocked socket module.
        self._test_create_connection_ip_addr(m_socket, False)

    @patch_socket
    def test_create_connection_service_name(self, m_socket):
        # The port may be a service name (str or bytes): 'http' must end up
        # as port 80, while an unknown name must raise OSError.
        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value

        self.loop._add_reader = mock.Mock()
        self.loop._add_reader._is_coroutine = False
        self.loop._add_writer = mock.Mock()
        self.loop._add_writer._is_coroutine = False

        for service, port in ('http', 80), (b'http', 80):
            coro = self.loop.create_connection(asyncio.Protocol,
                                               '127.0.0.1', service)

            t, p = self.loop.run_until_complete(coro)
            try:
                sock.connect.assert_called_with(('127.0.0.1', port))
                _, kwargs = m_socket.socket.call_args
                self.assertEqual(kwargs['family'], m_socket.AF_INET)
                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
            finally:
                t.close()
                test_utils.run_briefly(self.loop)  # allow transport to close

        for service in 'nonsense', b'nonsense':
            coro = self.loop.create_connection(asyncio.Protocol,
                                               '127.0.0.1', service)

            with self.assertRaises(OSError):
                self.loop.run_until_complete(coro)

    def test_create_connection_no_local_addr(self):
        # The stub resolver only knows 'example.com'; any other host (such
        # as the None host of local_addr) resolves to nothing, and the
        # connection attempt is expected to fail with OSError.
        async def getaddrinfo(host, *args, **kw):
            if host == 'example.com':
                return [(2, 1, 6, '', ('107.6.106.82', 80)),
                        (2, 1, 6, '', ('107.6.106.82', 80))]
            else:
                return []

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))
        self.loop.getaddrinfo = getaddrinfo_task

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_connection_bluetooth(self, m_socket):
        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
        # we can't recognize an address is resolved, e.g. a Bluetooth address.
        addr = ('00:01:02:03:04:05', 1)

        def getaddrinfo(host, port, *args, **kw):
            assert (host, port) == addr
            return [(999, 1, 999, '', (addr, 1))]

        m_socket.getaddrinfo = getaddrinfo
        sock = m_socket.socket()
        coro = self.loop.sock_connect(sock, addr)
        self.loop.run_until_complete(coro)

    def test_create_connection_ssl_server_hostname_default(self):
        # Checks which server_hostname create_connection() forwards to
        # _make_ssl_transport() in three cases: default, explicit, and
        # explicitly empty.  Resolver, connect and transport are all mocked.
        self.loop.getaddrinfo = mock.Mock()

        def mock_getaddrinfo(*args, **kwds):
            f = self.loop.create_future()
            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
            return f

        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.return_value = self.loop.create_future()
        self.loop.sock_connect.return_value.set_result(None)
        self.loop._make_ssl_transport = mock.Mock()

        class _SelectorTransportMock:
            # Minimal transport stand-in: only get_extra_info() and close().
            _sock = None

            def get_extra_info(self, key):
                return mock.Mock()

            def close(self):
                self._sock.close()

        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
                                    **kwds):
            # Resolve the waiter immediately so create_connection() returns.
            waiter.set_result(None)
            transport = _SelectorTransportMock()
            transport._sock = sock
            return transport

        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
        ANY = mock.ANY
        handshake_timeout = object()
        # First try the default server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(
                MyProto, 'python.org', 80, ssl=True,
                ssl_handshake_timeout=handshake_timeout)
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(
            ANY, ANY, ANY, ANY,
            server_side=False,
            server_hostname='python.org',
            ssl_handshake_timeout=handshake_timeout)
        # Next try an explicit server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(
                MyProto, 'python.org', 80, ssl=True,
                server_hostname='perl.com',
                ssl_handshake_timeout=handshake_timeout)
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(
            ANY, ANY, ANY, ANY,
            server_side=False,
            server_hostname='perl.com',
            ssl_handshake_timeout=handshake_timeout)
        # Finally try an explicit empty server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(
                MyProto, 'python.org', 80, ssl=True,
                server_hostname='',
                ssl_handshake_timeout=handshake_timeout)
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(
                ANY, ANY, ANY, ANY,
                server_side=False,
                server_hostname='',
                ssl_handshake_timeout=handshake_timeout)

    def test_create_connection_no_ssl_server_hostname_errors(self):
        # When not using ssl, server_hostname must be None.
        coro = self.loop.create_connection(MyProto, 'python.org', 80,
                                           server_hostname='')
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        coro = self.loop.create_connection(MyProto, 'python.org', 80,
                                           server_hostname='python.org')
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_ssl_server_hostname_errors(self):
        # When using ssl, server_hostname may be None if host is non-empty.
        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        sock = socket.socket()
        coro = self.loop.create_connection(MyProto, None, None,
                                           ssl=True, sock=sock)
        self.addCleanup(sock.close)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_ssl_timeout_for_plain_socket(self):
        # ssl_handshake_timeout without ssl -> ValueError.
        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)

    def test_create_server_empty_host(self):
        # if host is empty string use None instead
        host = object()

        async def getaddrinfo(*args, **kw):
            # Record the host the loop actually asked to resolve.
            nonlocal host
            host = args[0]
            return []

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        fut = self.loop.create_server(MyProto, '', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, fut)
        self.assertIsNone(host)

    def test_create_server_host_port_sock(self):
        # host/port given together with sock -> ValueError.
        fut = self.loop.create_server(
            MyProto, '0.0.0.0', 0, sock=object())
        self.assertRaises(ValueError, self.loop.run_until_complete, fut)

    def test_create_server_no_host_port_sock(self):
        # Neither host/port nor sock -> ValueError.
        fut = self.loop.create_server(MyProto)
        self.assertRaises(ValueError, self.loop.run_until_complete, fut)

    def test_create_server_no_getaddrinfo(self):
        # The resolver future resolves to None -> OSError.
        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
        getaddrinfo.return_value = self.loop.create_future()
        getaddrinfo.return_value.set_result(None)

        f = self.loop.create_server(MyProto, 'python.org', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_nosoreuseport(self, m_socket):
        # reuse_port=True while the socket module has no SO_REUSEPORT
        # attribute -> ValueError.
        m_socket.getaddrinfo = socket.getaddrinfo
        del m_socket.SO_REUSEPORT
        m_socket.socket.return_value = mock.Mock()

        f = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        self.assertRaises(ValueError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_soreuseport_only_defined(self, m_socket):
        # SO_REUSEPORT exists but is set to -1 here; reuse_port=True is
        # still expected to end in ValueError.
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.socket.return_value = mock.Mock()
        m_socket.SO_REUSEPORT = -1

        f = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        self.assertRaises(ValueError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_cant_bind(self, m_socket):
        # bind() raising an OSError subclass must propagate as OSError and
        # the socket must be closed.

        class Err(OSError):
            strerror = 'error'

        m_socket.getaddrinfo.return_value = [
            (2, 1, 6, '', ('127.0.0.1', 10100))]
        m_socket.getaddrinfo._is_coroutine = False
        m_sock = m_socket.socket.return_value = mock.Mock()
        m_sock.bind.side_effect = Err

        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, fut)
        self.assertTrue(m_sock.close.called)

    @patch_socket
    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
        # The resolver returns no addresses for local_addr -> OSError.
        m_socket.getaddrinfo.return_value = []
        m_socket.getaddrinfo._is_coroutine = False

        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr=('localhost', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_datagram_endpoint_addr_error(self):
        # A local_addr that is not a 2-item (host, port) pair is expected
        # to surface as AssertionError.
        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr='localhost')
        self.assertRaises(
            AssertionError, self.loop.run_until_complete, coro)
        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
        self.assertRaises(
            AssertionError, self.loop.run_until_complete, coro)

    def test_create_datagram_endpoint_connect_err(self):
        # sock_connect raising OSError for remote_addr -> OSError.
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_datagram_endpoint_allow_broadcast(self):
        # With allow_broadcast=True, sock_connect must not be called even
        # though remote_addr is given; the endpoint then closes cleanly.
        protocol = MyDatagramProto(create_future=True, loop=self.loop)
        self.loop.sock_connect = sock_connect = mock.Mock()
        sock_connect.return_value = []

        coro = self.loop.create_datagram_endpoint(
            lambda: protocol,
            remote_addr=('127.0.0.1', 0),
            allow_broadcast=True)

        transport, _ = self.loop.run_until_complete(coro)
        self.assertFalse(sock_connect.called)

        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

    @patch_socket
    def test_create_datagram_endpoint_socket_err(self, m_socket):
        # socket() itself raising OSError must propagate, both with only a
        # family and with a local_addr.
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.socket.side_effect = OSError

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, family=socket.AF_INET)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
    def test_create_datagram_endpoint_no_matching_family(self):
        # IPv4 remote_addr combined with IPv6 local_addr -> ValueError.
        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol,
            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
        self.assertRaises(
            ValueError, self.loop.run_until_complete, coro)

@patch_socket 1651 def test_create_datagram_endpoint_setblk_err(self, m_socket): 1652 m_socket.socket.return_value.setblocking.side_effect = OSError 1653 1654 coro = self.loop.create_datagram_endpoint( 1655 asyncio.DatagramProtocol, family=socket.AF_INET) 1656 self.assertRaises( 1657 OSError, self.loop.run_until_complete, coro) 1658 self.assertTrue( 1659 m_socket.socket.return_value.close.called) 1660 1661 def test_create_datagram_endpoint_noaddr_nofamily(self): 1662 coro = self.loop.create_datagram_endpoint( 1663 asyncio.DatagramProtocol) 1664 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1665 1666 @patch_socket 1667 def test_create_datagram_endpoint_cant_bind(self, m_socket): 1668 class Err(OSError): 1669 pass 1670 1671 m_socket.getaddrinfo = socket.getaddrinfo 1672 m_sock = m_socket.socket.return_value = mock.Mock() 1673 m_sock.bind.side_effect = Err 1674 1675 fut = self.loop.create_datagram_endpoint( 1676 MyDatagramProto, 1677 local_addr=('127.0.0.1', 0), family=socket.AF_INET) 1678 self.assertRaises(Err, self.loop.run_until_complete, fut) 1679 self.assertTrue(m_sock.close.called) 1680 1681 def test_create_datagram_endpoint_sock(self): 1682 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1683 sock.bind(('127.0.0.1', 0)) 1684 fut = self.loop.create_datagram_endpoint( 1685 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1686 sock=sock) 1687 transport, protocol = self.loop.run_until_complete(fut) 1688 transport.close() 1689 self.loop.run_until_complete(protocol.done) 1690 self.assertEqual('CLOSED', protocol.state) 1691 1692 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1693 def test_create_datagram_endpoint_sock_unix(self): 1694 fut = self.loop.create_datagram_endpoint( 1695 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1696 family=socket.AF_UNIX) 1697 transport, protocol = self.loop.run_until_complete(fut) 1698 assert transport._sock.family == socket.AF_UNIX 1699 transport.close() 1700 
self.loop.run_until_complete(protocol.done) 1701 self.assertEqual('CLOSED', protocol.state) 1702 1703 @socket_helper.skip_unless_bind_unix_socket 1704 def test_create_datagram_endpoint_existing_sock_unix(self): 1705 with test_utils.unix_socket_path() as path: 1706 sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) 1707 sock.bind(path) 1708 sock.close() 1709 1710 coro = self.loop.create_datagram_endpoint( 1711 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1712 path, family=socket.AF_UNIX) 1713 transport, protocol = self.loop.run_until_complete(coro) 1714 transport.close() 1715 self.loop.run_until_complete(protocol.done) 1716 1717 def test_create_datagram_endpoint_sock_sockopts(self): 1718 class FakeSock: 1719 type = socket.SOCK_DGRAM 1720 1721 fut = self.loop.create_datagram_endpoint( 1722 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 1723 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1724 1725 fut = self.loop.create_datagram_endpoint( 1726 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 1727 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1728 1729 fut = self.loop.create_datagram_endpoint( 1730 MyDatagramProto, family=1, sock=FakeSock()) 1731 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1732 1733 fut = self.loop.create_datagram_endpoint( 1734 MyDatagramProto, proto=1, sock=FakeSock()) 1735 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1736 1737 fut = self.loop.create_datagram_endpoint( 1738 MyDatagramProto, flags=1, sock=FakeSock()) 1739 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1740 1741 fut = self.loop.create_datagram_endpoint( 1742 MyDatagramProto, reuse_port=True, sock=FakeSock()) 1743 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1744 1745 fut = self.loop.create_datagram_endpoint( 1746 MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 1747 self.assertRaises(ValueError, 
self.loop.run_until_complete, fut) 1748 1749 def test_create_datagram_endpoint_sockopts(self): 1750 # Socket options should not be applied unless asked for. 1751 # SO_REUSEPORT is not available on all platforms. 1752 1753 coro = self.loop.create_datagram_endpoint( 1754 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1755 local_addr=('127.0.0.1', 0)) 1756 transport, protocol = self.loop.run_until_complete(coro) 1757 sock = transport.get_extra_info('socket') 1758 1759 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1760 1761 if reuseport_supported: 1762 self.assertFalse( 1763 sock.getsockopt( 1764 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1765 self.assertFalse( 1766 sock.getsockopt( 1767 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1768 1769 transport.close() 1770 self.loop.run_until_complete(protocol.done) 1771 self.assertEqual('CLOSED', protocol.state) 1772 1773 coro = self.loop.create_datagram_endpoint( 1774 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1775 local_addr=('127.0.0.1', 0), 1776 reuse_port=reuseport_supported, 1777 allow_broadcast=True) 1778 transport, protocol = self.loop.run_until_complete(coro) 1779 sock = transport.get_extra_info('socket') 1780 1781 self.assertFalse( 1782 sock.getsockopt( 1783 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1784 if reuseport_supported: 1785 self.assertTrue( 1786 sock.getsockopt( 1787 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1788 self.assertTrue( 1789 sock.getsockopt( 1790 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1791 1792 transport.close() 1793 self.loop.run_until_complete(protocol.done) 1794 self.assertEqual('CLOSED', protocol.state) 1795 1796 def test_create_datagram_endpoint_reuse_address_error(self): 1797 # bpo-37228: Ensure that explicit passing of `reuse_address=True` 1798 # raises an error, as it is not safe to use SO_REUSEADDR when using UDP 1799 1800 coro = self.loop.create_datagram_endpoint( 1801 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1802 
local_addr=('127.0.0.1', 0), 1803 reuse_address=True) 1804 1805 with self.assertRaises(ValueError): 1806 self.loop.run_until_complete(coro) 1807 1808 def test_create_datagram_endpoint_reuse_address_warning(self): 1809 # bpo-37228: Deprecate *reuse_address* parameter 1810 1811 coro = self.loop.create_datagram_endpoint( 1812 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1813 local_addr=('127.0.0.1', 0), 1814 reuse_address=False) 1815 1816 with self.assertWarns(DeprecationWarning): 1817 transport, protocol = self.loop.run_until_complete(coro) 1818 transport.close() 1819 self.loop.run_until_complete(protocol.done) 1820 self.assertEqual('CLOSED', protocol.state) 1821 1822 @patch_socket 1823 def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 1824 del m_socket.SO_REUSEPORT 1825 m_socket.socket.return_value = mock.Mock() 1826 1827 coro = self.loop.create_datagram_endpoint( 1828 lambda: MyDatagramProto(loop=self.loop), 1829 local_addr=('127.0.0.1', 0), 1830 reuse_port=True) 1831 1832 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1833 1834 @patch_socket 1835 def test_create_datagram_endpoint_ip_addr(self, m_socket): 1836 def getaddrinfo(*args, **kw): 1837 self.fail('should not have called getaddrinfo') 1838 1839 m_socket.getaddrinfo = getaddrinfo 1840 m_socket.socket.return_value.bind = bind = mock.Mock() 1841 self.loop._add_reader = mock.Mock() 1842 self.loop._add_reader._is_coroutine = False 1843 1844 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1845 coro = self.loop.create_datagram_endpoint( 1846 lambda: MyDatagramProto(loop=self.loop), 1847 local_addr=('1.2.3.4', 0), 1848 reuse_port=reuseport_supported) 1849 1850 t, p = self.loop.run_until_complete(coro) 1851 try: 1852 bind.assert_called_with(('1.2.3.4', 0)) 1853 m_socket.socket.assert_called_with(family=m_socket.AF_INET, 1854 proto=m_socket.IPPROTO_UDP, 1855 type=m_socket.SOCK_DGRAM) 1856 finally: 1857 t.close() 1858 test_utils.run_briefly(self.loop) # allow 
transport to close 1859 1860 def test_accept_connection_retry(self): 1861 sock = mock.Mock() 1862 sock.accept.side_effect = BlockingIOError() 1863 1864 self.loop._accept_connection(MyProto, sock) 1865 self.assertFalse(sock.close.called) 1866 1867 @mock.patch('asyncio.base_events.logger') 1868 def test_accept_connection_exception(self, m_log): 1869 sock = mock.Mock() 1870 sock.fileno.return_value = 10 1871 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 1872 self.loop._remove_reader = mock.Mock() 1873 self.loop.call_later = mock.Mock() 1874 1875 self.loop._accept_connection(MyProto, sock) 1876 self.assertTrue(m_log.error.called) 1877 self.assertFalse(sock.close.called) 1878 self.loop._remove_reader.assert_called_with(10) 1879 self.loop.call_later.assert_called_with( 1880 constants.ACCEPT_RETRY_DELAY, 1881 # self.loop._start_serving 1882 mock.ANY, 1883 MyProto, sock, None, None, mock.ANY, mock.ANY) 1884 1885 def test_call_coroutine(self): 1886 with self.assertWarns(DeprecationWarning): 1887 @asyncio.coroutine 1888 def simple_coroutine(): 1889 pass 1890 1891 self.loop.set_debug(True) 1892 coro_func = simple_coroutine 1893 coro_obj = coro_func() 1894 self.addCleanup(coro_obj.close) 1895 for func in (coro_func, coro_obj): 1896 with self.assertRaises(TypeError): 1897 self.loop.call_soon(func) 1898 with self.assertRaises(TypeError): 1899 self.loop.call_soon_threadsafe(func) 1900 with self.assertRaises(TypeError): 1901 self.loop.call_later(60, func) 1902 with self.assertRaises(TypeError): 1903 self.loop.call_at(self.loop.time() + 60, func) 1904 with self.assertRaises(TypeError): 1905 self.loop.run_until_complete( 1906 self.loop.run_in_executor(None, func)) 1907 1908 @mock.patch('asyncio.base_events.logger') 1909 def test_log_slow_callbacks(self, m_logger): 1910 def stop_loop_cb(loop): 1911 loop.stop() 1912 1913 async def stop_loop_coro(loop): 1914 loop.stop() 1915 1916 asyncio.set_event_loop(self.loop) 1917 self.loop.set_debug(True) 1918 
self.loop.slow_callback_duration = 0.0 1919 1920 # slow callback 1921 self.loop.call_soon(stop_loop_cb, self.loop) 1922 self.loop.run_forever() 1923 fmt, *args = m_logger.warning.call_args[0] 1924 self.assertRegex(fmt % tuple(args), 1925 "^Executing <Handle.*stop_loop_cb.*> " 1926 "took .* seconds$") 1927 1928 # slow task 1929 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 1930 self.loop.run_forever() 1931 fmt, *args = m_logger.warning.call_args[0] 1932 self.assertRegex(fmt % tuple(args), 1933 "^Executing <Task.*stop_loop_coro.*> " 1934 "took .* seconds$") 1935 1936 1937class RunningLoopTests(unittest.TestCase): 1938 1939 def test_running_loop_within_a_loop(self): 1940 async def runner(loop): 1941 loop.run_forever() 1942 1943 loop = asyncio.new_event_loop() 1944 outer_loop = asyncio.new_event_loop() 1945 try: 1946 with self.assertRaisesRegex(RuntimeError, 1947 'while another loop is running'): 1948 outer_loop.run_until_complete(runner(loop)) 1949 finally: 1950 loop.close() 1951 outer_loop.close() 1952 1953 1954class BaseLoopSockSendfileTests(test_utils.TestCase): 1955 1956 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 1957 1958 class MyProto(asyncio.Protocol): 1959 1960 def __init__(self, loop): 1961 self.started = False 1962 self.closed = False 1963 self.data = bytearray() 1964 self.fut = loop.create_future() 1965 self.transport = None 1966 1967 def connection_made(self, transport): 1968 self.started = True 1969 self.transport = transport 1970 1971 def data_received(self, data): 1972 self.data.extend(data) 1973 1974 def connection_lost(self, exc): 1975 self.closed = True 1976 self.fut.set_result(None) 1977 self.transport = None 1978 1979 async def wait_closed(self): 1980 await self.fut 1981 1982 @classmethod 1983 def setUpClass(cls): 1984 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 1985 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 1986 with open(support.TESTFN, 'wb') as fp: 1987 fp.write(cls.DATA) 1988 
super().setUpClass() 1989 1990 @classmethod 1991 def tearDownClass(cls): 1992 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 1993 support.unlink(support.TESTFN) 1994 super().tearDownClass() 1995 1996 def setUp(self): 1997 from asyncio.selector_events import BaseSelectorEventLoop 1998 # BaseSelectorEventLoop() has no native implementation 1999 self.loop = BaseSelectorEventLoop() 2000 self.set_event_loop(self.loop) 2001 self.file = open(support.TESTFN, 'rb') 2002 self.addCleanup(self.file.close) 2003 super().setUp() 2004 2005 def make_socket(self, blocking=False): 2006 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2007 sock.setblocking(blocking) 2008 self.addCleanup(sock.close) 2009 return sock 2010 2011 def run_loop(self, coro): 2012 return self.loop.run_until_complete(coro) 2013 2014 def prepare(self): 2015 sock = self.make_socket() 2016 proto = self.MyProto(self.loop) 2017 server = self.run_loop(self.loop.create_server( 2018 lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET)) 2019 addr = server.sockets[0].getsockname() 2020 2021 for _ in range(10): 2022 try: 2023 self.run_loop(self.loop.sock_connect(sock, addr)) 2024 except OSError: 2025 self.run_loop(asyncio.sleep(0.5)) 2026 continue 2027 else: 2028 break 2029 else: 2030 # One last try, so we get the exception 2031 self.run_loop(self.loop.sock_connect(sock, addr)) 2032 2033 def cleanup(): 2034 server.close() 2035 self.run_loop(server.wait_closed()) 2036 sock.close() 2037 if proto.transport is not None: 2038 proto.transport.close() 2039 self.run_loop(proto.wait_closed()) 2040 2041 self.addCleanup(cleanup) 2042 2043 return sock, proto 2044 2045 def test__sock_sendfile_native_failure(self): 2046 sock, proto = self.prepare() 2047 2048 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2049 "sendfile is not available"): 2050 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 2051 0, None)) 2052 2053 self.assertEqual(proto.data, b'') 2054 
self.assertEqual(self.file.tell(), 0) 2055 2056 def test_sock_sendfile_no_fallback(self): 2057 sock, proto = self.prepare() 2058 2059 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2060 "sendfile is not available"): 2061 self.run_loop(self.loop.sock_sendfile(sock, self.file, 2062 fallback=False)) 2063 2064 self.assertEqual(self.file.tell(), 0) 2065 self.assertEqual(proto.data, b'') 2066 2067 def test_sock_sendfile_fallback(self): 2068 sock, proto = self.prepare() 2069 2070 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2071 sock.close() 2072 self.run_loop(proto.wait_closed()) 2073 2074 self.assertEqual(ret, len(self.DATA)) 2075 self.assertEqual(self.file.tell(), len(self.DATA)) 2076 self.assertEqual(proto.data, self.DATA) 2077 2078 def test_sock_sendfile_fallback_offset_and_count(self): 2079 sock, proto = self.prepare() 2080 2081 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 2082 1000, 2000)) 2083 sock.close() 2084 self.run_loop(proto.wait_closed()) 2085 2086 self.assertEqual(ret, 2000) 2087 self.assertEqual(self.file.tell(), 3000) 2088 self.assertEqual(proto.data, self.DATA[1000:3000]) 2089 2090 def test_blocking_socket(self): 2091 self.loop.set_debug(True) 2092 sock = self.make_socket(blocking=True) 2093 with self.assertRaisesRegex(ValueError, "must be non-blocking"): 2094 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2095 2096 def test_nonbinary_file(self): 2097 sock = self.make_socket() 2098 with open(support.TESTFN, 'r') as f: 2099 with self.assertRaisesRegex(ValueError, "binary mode"): 2100 self.run_loop(self.loop.sock_sendfile(sock, f)) 2101 2102 def test_nonstream_socket(self): 2103 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2104 sock.setblocking(False) 2105 self.addCleanup(sock.close) 2106 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 2107 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2108 2109 def test_notint_count(self): 2110 sock = self.make_socket() 2111 
with self.assertRaisesRegex(TypeError, 2112 "count must be a positive integer"): 2113 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 2114 2115 def test_negative_count(self): 2116 sock = self.make_socket() 2117 with self.assertRaisesRegex(ValueError, 2118 "count must be a positive integer"): 2119 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 2120 2121 def test_notint_offset(self): 2122 sock = self.make_socket() 2123 with self.assertRaisesRegex(TypeError, 2124 "offset must be a non-negative integer"): 2125 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 2126 2127 def test_negative_offset(self): 2128 sock = self.make_socket() 2129 with self.assertRaisesRegex(ValueError, 2130 "offset must be a non-negative integer"): 2131 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 2132 2133 2134class TestSelectorUtils(test_utils.TestCase): 2135 def check_set_nodelay(self, sock): 2136 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2137 self.assertFalse(opt) 2138 2139 base_events._set_nodelay(sock) 2140 2141 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2142 self.assertTrue(opt) 2143 2144 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2145 'need socket.TCP_NODELAY') 2146 def test_set_nodelay(self): 2147 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2148 proto=socket.IPPROTO_TCP) 2149 with sock: 2150 self.check_set_nodelay(sock) 2151 2152 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2153 proto=socket.IPPROTO_TCP) 2154 with sock: 2155 sock.setblocking(False) 2156 self.check_set_nodelay(sock) 2157 2158 2159 2160if __name__ == '__main__': 2161 unittest.main() 2162