1"""Tests for base_events.py""" 2 3import concurrent.futures 4import errno 5import math 6import socket 7import sys 8import threading 9import time 10import unittest 11from unittest import mock 12 13import asyncio 14from asyncio import base_events 15from asyncio import constants 16from test.test_asyncio import utils as test_utils 17from test import support 18from test.support.script_helper import assert_python_ok 19from test.support import os_helper 20from test.support import socket_helper 21 22 23MOCK_ANY = mock.ANY 24PY34 = sys.version_info >= (3, 4) 25 26 27def tearDownModule(): 28 asyncio.set_event_loop_policy(None) 29 30 31def mock_socket_module(): 32 m_socket = mock.MagicMock(spec=socket) 33 for name in ( 34 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 35 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 36 ): 37 if hasattr(socket, name): 38 setattr(m_socket, name, getattr(socket, name)) 39 else: 40 delattr(m_socket, name) 41 42 m_socket.socket = mock.MagicMock() 43 m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 44 m_socket.getaddrinfo._is_coroutine = False 45 46 return m_socket 47 48 49def patch_socket(f): 50 return mock.patch('asyncio.base_events.socket', 51 new_callable=mock_socket_module)(f) 52 53 54class BaseEventTests(test_utils.TestCase): 55 56 def test_ipaddr_info(self): 57 UNSPEC = socket.AF_UNSPEC 58 INET = socket.AF_INET 59 INET6 = socket.AF_INET6 60 STREAM = socket.SOCK_STREAM 61 DGRAM = socket.SOCK_DGRAM 62 TCP = socket.IPPROTO_TCP 63 UDP = socket.IPPROTO_UDP 64 65 self.assertEqual( 66 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 67 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 68 69 self.assertEqual( 70 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 71 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 72 73 self.assertEqual( 74 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 75 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 76 77 self.assertEqual( 78 (INET, DGRAM, 
UDP, '', ('1.2.3.4', 1)), 79 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 80 81 # Socket type STREAM implies TCP protocol. 82 self.assertEqual( 83 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 84 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 85 86 # Socket type DGRAM implies UDP protocol. 87 self.assertEqual( 88 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 89 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 90 91 # No socket type. 92 self.assertIsNone( 93 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 94 95 if socket_helper.IPV6_ENABLED: 96 # IPv4 address with family IPv6. 97 self.assertIsNone( 98 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 99 100 self.assertEqual( 101 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 102 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 103 104 self.assertEqual( 105 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 106 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 107 108 # IPv6 address with family IPv4. 109 self.assertIsNone( 110 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 111 112 # IPv6 address with zone index. 113 self.assertIsNone( 114 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 115 116 def test_port_parameter_types(self): 117 # Test obscure kinds of arguments for "port". 
118 INET = socket.AF_INET 119 STREAM = socket.SOCK_STREAM 120 TCP = socket.IPPROTO_TCP 121 122 self.assertEqual( 123 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 124 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 125 126 self.assertEqual( 127 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 128 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 129 130 self.assertEqual( 131 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 132 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 133 134 self.assertEqual( 135 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 136 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 137 138 self.assertEqual( 139 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 140 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 141 142 @patch_socket 143 def test_ipaddr_info_no_inet_pton(self, m_socket): 144 del m_socket.inet_pton 145 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 146 socket.AF_INET, 147 socket.SOCK_STREAM, 148 socket.IPPROTO_TCP)) 149 150 151class BaseEventLoopTests(test_utils.TestCase): 152 153 def setUp(self): 154 super().setUp() 155 self.loop = base_events.BaseEventLoop() 156 self.loop._selector = mock.Mock() 157 self.loop._selector.select.return_value = () 158 self.set_event_loop(self.loop) 159 160 def test_not_implemented(self): 161 m = mock.Mock() 162 self.assertRaises( 163 NotImplementedError, 164 self.loop._make_socket_transport, m, m) 165 self.assertRaises( 166 NotImplementedError, 167 self.loop._make_ssl_transport, m, m, m, m) 168 self.assertRaises( 169 NotImplementedError, 170 self.loop._make_datagram_transport, m, m) 171 self.assertRaises( 172 NotImplementedError, self.loop._process_events, []) 173 self.assertRaises( 174 NotImplementedError, self.loop._write_to_self) 175 self.assertRaises( 176 NotImplementedError, 177 self.loop._make_read_pipe_transport, m, m) 178 self.assertRaises( 179 NotImplementedError, 180 self.loop._make_write_pipe_transport, m, m) 181 gen = 
self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 182 with self.assertRaises(NotImplementedError): 183 gen.send(None) 184 185 def test_close(self): 186 self.assertFalse(self.loop.is_closed()) 187 self.loop.close() 188 self.assertTrue(self.loop.is_closed()) 189 190 # it should be possible to call close() more than once 191 self.loop.close() 192 self.loop.close() 193 194 # operation blocked when the loop is closed 195 f = self.loop.create_future() 196 self.assertRaises(RuntimeError, self.loop.run_forever) 197 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 198 199 def test__add_callback_handle(self): 200 h = asyncio.Handle(lambda: False, (), self.loop, None) 201 202 self.loop._add_callback(h) 203 self.assertFalse(self.loop._scheduled) 204 self.assertIn(h, self.loop._ready) 205 206 def test__add_callback_cancelled_handle(self): 207 h = asyncio.Handle(lambda: False, (), self.loop, None) 208 h.cancel() 209 210 self.loop._add_callback(h) 211 self.assertFalse(self.loop._scheduled) 212 self.assertFalse(self.loop._ready) 213 214 def test_set_default_executor(self): 215 class DummyExecutor(concurrent.futures.ThreadPoolExecutor): 216 def submit(self, fn, *args, **kwargs): 217 raise NotImplementedError( 218 'cannot submit into a dummy executor') 219 220 self.loop._process_events = mock.Mock() 221 self.loop._write_to_self = mock.Mock() 222 223 executor = DummyExecutor() 224 self.loop.set_default_executor(executor) 225 self.assertIs(executor, self.loop._default_executor) 226 227 def test_set_default_executor_error(self): 228 executor = mock.Mock() 229 230 msg = 'executor must be ThreadPoolExecutor instance' 231 with self.assertRaisesRegex(TypeError, msg): 232 self.loop.set_default_executor(executor) 233 234 self.assertIsNone(self.loop._default_executor) 235 236 def test_call_soon(self): 237 def cb(): 238 pass 239 240 h = self.loop.call_soon(cb) 241 self.assertEqual(h._callback, cb) 242 self.assertIsInstance(h, asyncio.Handle) 243 self.assertIn(h, 
self.loop._ready) 244 245 def test_call_soon_non_callable(self): 246 self.loop.set_debug(True) 247 with self.assertRaisesRegex(TypeError, 'a callable object'): 248 self.loop.call_soon(1) 249 250 def test_call_later(self): 251 def cb(): 252 pass 253 254 h = self.loop.call_later(10.0, cb) 255 self.assertIsInstance(h, asyncio.TimerHandle) 256 self.assertIn(h, self.loop._scheduled) 257 self.assertNotIn(h, self.loop._ready) 258 with self.assertRaises(TypeError, msg="delay must not be None"): 259 self.loop.call_later(None, cb) 260 261 def test_call_later_negative_delays(self): 262 calls = [] 263 264 def cb(arg): 265 calls.append(arg) 266 267 self.loop._process_events = mock.Mock() 268 self.loop.call_later(-1, cb, 'a') 269 self.loop.call_later(-2, cb, 'b') 270 test_utils.run_briefly(self.loop) 271 self.assertEqual(calls, ['b', 'a']) 272 273 def test_time_and_call_at(self): 274 def cb(): 275 self.loop.stop() 276 277 self.loop._process_events = mock.Mock() 278 delay = 0.1 279 280 when = self.loop.time() + delay 281 self.loop.call_at(when, cb) 282 t0 = self.loop.time() 283 self.loop.run_forever() 284 dt = self.loop.time() - t0 285 286 # 50 ms: maximum granularity of the event loop 287 self.assertGreaterEqual(dt, delay - 0.050, dt) 288 # tolerate a difference of +800 ms because some Python buildbots 289 # are really slow 290 self.assertLessEqual(dt, 0.9, dt) 291 with self.assertRaises(TypeError, msg="when cannot be None"): 292 self.loop.call_at(None, cb) 293 294 def check_thread(self, loop, debug): 295 def cb(): 296 pass 297 298 loop.set_debug(debug) 299 if debug: 300 msg = ("Non-thread-safe operation invoked on an event loop other " 301 "than the current one") 302 with self.assertRaisesRegex(RuntimeError, msg): 303 loop.call_soon(cb) 304 with self.assertRaisesRegex(RuntimeError, msg): 305 loop.call_later(60, cb) 306 with self.assertRaisesRegex(RuntimeError, msg): 307 loop.call_at(loop.time() + 60, cb) 308 else: 309 loop.call_soon(cb) 310 loop.call_later(60, cb) 311 
loop.call_at(loop.time() + 60, cb) 312 313 def test_check_thread(self): 314 def check_in_thread(loop, event, debug, create_loop, fut): 315 # wait until the event loop is running 316 event.wait() 317 318 try: 319 if create_loop: 320 loop2 = base_events.BaseEventLoop() 321 try: 322 asyncio.set_event_loop(loop2) 323 self.check_thread(loop, debug) 324 finally: 325 asyncio.set_event_loop(None) 326 loop2.close() 327 else: 328 self.check_thread(loop, debug) 329 except Exception as exc: 330 loop.call_soon_threadsafe(fut.set_exception, exc) 331 else: 332 loop.call_soon_threadsafe(fut.set_result, None) 333 334 def test_thread(loop, debug, create_loop=False): 335 event = threading.Event() 336 fut = loop.create_future() 337 loop.call_soon(event.set) 338 args = (loop, event, debug, create_loop, fut) 339 thread = threading.Thread(target=check_in_thread, args=args) 340 thread.start() 341 loop.run_until_complete(fut) 342 thread.join() 343 344 self.loop._process_events = mock.Mock() 345 self.loop._write_to_self = mock.Mock() 346 347 # raise RuntimeError if the thread has no event loop 348 test_thread(self.loop, True) 349 350 # check disabled if debug mode is disabled 351 test_thread(self.loop, False) 352 353 # raise RuntimeError if the event loop of the thread is not the called 354 # event loop 355 test_thread(self.loop, True, create_loop=True) 356 357 # check disabled if debug mode is disabled 358 test_thread(self.loop, False, create_loop=True) 359 360 def test__run_once(self): 361 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 362 self.loop, None) 363 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 364 self.loop, None) 365 366 h1.cancel() 367 368 self.loop._process_events = mock.Mock() 369 self.loop._scheduled.append(h1) 370 self.loop._scheduled.append(h2) 371 self.loop._run_once() 372 373 t = self.loop._selector.select.call_args[0][0] 374 self.assertTrue(9.5 < t < 10.5, t) 375 self.assertEqual([h2], self.loop._scheduled) 376 
self.assertTrue(self.loop._process_events.called) 377 378 def test_set_debug(self): 379 self.loop.set_debug(True) 380 self.assertTrue(self.loop.get_debug()) 381 self.loop.set_debug(False) 382 self.assertFalse(self.loop.get_debug()) 383 384 def test__run_once_schedule_handle(self): 385 handle = None 386 processed = False 387 388 def cb(loop): 389 nonlocal processed, handle 390 processed = True 391 handle = loop.call_soon(lambda: True) 392 393 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 394 self.loop, None) 395 396 self.loop._process_events = mock.Mock() 397 self.loop._scheduled.append(h) 398 self.loop._run_once() 399 400 self.assertTrue(processed) 401 self.assertEqual([handle], list(self.loop._ready)) 402 403 def test__run_once_cancelled_event_cleanup(self): 404 self.loop._process_events = mock.Mock() 405 406 self.assertTrue( 407 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 408 409 def cb(): 410 pass 411 412 # Set up one "blocking" event that will not be cancelled to 413 # ensure later cancelled events do not make it to the head 414 # of the queue and get cleaned. 
415 not_cancelled_count = 1 416 self.loop.call_later(3000, cb) 417 418 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 419 # cancelled handles, ensure they aren't removed 420 421 cancelled_count = 2 422 for x in range(2): 423 h = self.loop.call_later(3600, cb) 424 h.cancel() 425 426 # Add some cancelled events that will be at head and removed 427 cancelled_count += 2 428 for x in range(2): 429 h = self.loop.call_later(100, cb) 430 h.cancel() 431 432 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 433 self.assertLessEqual(cancelled_count + not_cancelled_count, 434 base_events._MIN_SCHEDULED_TIMER_HANDLES) 435 436 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 437 438 self.loop._run_once() 439 440 cancelled_count -= 2 441 442 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 443 444 self.assertEqual(len(self.loop._scheduled), 445 cancelled_count + not_cancelled_count) 446 447 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 448 # so that deletion of cancelled events will occur on next _run_once 449 add_cancel_count = int(math.ceil( 450 base_events._MIN_SCHEDULED_TIMER_HANDLES * 451 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 452 453 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 454 add_cancel_count, 0) 455 456 # Add some events that will not be cancelled 457 not_cancelled_count += add_not_cancel_count 458 for x in range(add_not_cancel_count): 459 self.loop.call_later(3600, cb) 460 461 # Add enough cancelled events 462 cancelled_count += add_cancel_count 463 for x in range(add_cancel_count): 464 h = self.loop.call_later(3600, cb) 465 h.cancel() 466 467 # Ensure all handles are still scheduled 468 self.assertEqual(len(self.loop._scheduled), 469 cancelled_count + not_cancelled_count) 470 471 self.loop._run_once() 472 473 # Ensure cancelled events were removed 474 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 475 476 # 
Ensure only uncancelled events remain scheduled 477 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 478 479 def test_run_until_complete_type_error(self): 480 self.assertRaises(TypeError, 481 self.loop.run_until_complete, 'blah') 482 483 def test_run_until_complete_loop(self): 484 task = self.loop.create_future() 485 other_loop = self.new_test_loop() 486 self.addCleanup(other_loop.close) 487 self.assertRaises(ValueError, 488 other_loop.run_until_complete, task) 489 490 def test_run_until_complete_loop_orphan_future_close_loop(self): 491 class ShowStopper(SystemExit): 492 pass 493 494 async def foo(delay): 495 await asyncio.sleep(delay) 496 497 def throw(): 498 raise ShowStopper 499 500 self.loop._process_events = mock.Mock() 501 self.loop.call_soon(throw) 502 with self.assertRaises(ShowStopper): 503 self.loop.run_until_complete(foo(0.1)) 504 505 # This call fails if run_until_complete does not clean up 506 # done-callback for the previous future. 507 self.loop.run_until_complete(foo(0.2)) 508 509 def test_subprocess_exec_invalid_args(self): 510 args = [sys.executable, '-c', 'pass'] 511 512 # missing program parameter (empty args) 513 self.assertRaises(TypeError, 514 self.loop.run_until_complete, self.loop.subprocess_exec, 515 asyncio.SubprocessProtocol) 516 517 # expected multiple arguments, not a list 518 self.assertRaises(TypeError, 519 self.loop.run_until_complete, self.loop.subprocess_exec, 520 asyncio.SubprocessProtocol, args) 521 522 # program arguments must be strings, not int 523 self.assertRaises(TypeError, 524 self.loop.run_until_complete, self.loop.subprocess_exec, 525 asyncio.SubprocessProtocol, sys.executable, 123) 526 527 # universal_newlines, shell, bufsize must not be set 528 self.assertRaises(TypeError, 529 self.loop.run_until_complete, self.loop.subprocess_exec, 530 asyncio.SubprocessProtocol, *args, universal_newlines=True) 531 self.assertRaises(TypeError, 532 self.loop.run_until_complete, self.loop.subprocess_exec, 533 
asyncio.SubprocessProtocol, *args, shell=True) 534 self.assertRaises(TypeError, 535 self.loop.run_until_complete, self.loop.subprocess_exec, 536 asyncio.SubprocessProtocol, *args, bufsize=4096) 537 538 def test_subprocess_shell_invalid_args(self): 539 # expected a string, not an int or a list 540 self.assertRaises(TypeError, 541 self.loop.run_until_complete, self.loop.subprocess_shell, 542 asyncio.SubprocessProtocol, 123) 543 self.assertRaises(TypeError, 544 self.loop.run_until_complete, self.loop.subprocess_shell, 545 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 546 547 # universal_newlines, shell, bufsize must not be set 548 self.assertRaises(TypeError, 549 self.loop.run_until_complete, self.loop.subprocess_shell, 550 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 551 self.assertRaises(TypeError, 552 self.loop.run_until_complete, self.loop.subprocess_shell, 553 asyncio.SubprocessProtocol, 'exit 0', shell=True) 554 self.assertRaises(TypeError, 555 self.loop.run_until_complete, self.loop.subprocess_shell, 556 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 557 558 def test_default_exc_handler_callback(self): 559 self.loop._process_events = mock.Mock() 560 561 def zero_error(fut): 562 fut.set_result(True) 563 1/0 564 565 # Test call_soon (events.Handle) 566 with mock.patch('asyncio.base_events.logger') as log: 567 fut = self.loop.create_future() 568 self.loop.call_soon(zero_error, fut) 569 fut.add_done_callback(lambda fut: self.loop.stop()) 570 self.loop.run_forever() 571 log.error.assert_called_with( 572 test_utils.MockPattern('Exception in callback.*zero'), 573 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 574 575 # Test call_later (events.TimerHandle) 576 with mock.patch('asyncio.base_events.logger') as log: 577 fut = self.loop.create_future() 578 self.loop.call_later(0.01, zero_error, fut) 579 fut.add_done_callback(lambda fut: self.loop.stop()) 580 self.loop.run_forever() 581 log.error.assert_called_with( 582 
test_utils.MockPattern('Exception in callback.*zero'), 583 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 584 585 def test_default_exc_handler_coro(self): 586 self.loop._process_events = mock.Mock() 587 588 async def zero_error_coro(): 589 await asyncio.sleep(0.01) 590 1/0 591 592 # Test Future.__del__ 593 with mock.patch('asyncio.base_events.logger') as log: 594 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 595 fut.add_done_callback(lambda *args: self.loop.stop()) 596 self.loop.run_forever() 597 fut = None # Trigger Future.__del__ or futures._TracebackLogger 598 support.gc_collect() 599 if PY34: 600 # Future.__del__ in Python 3.4 logs error with 601 # an actual exception context 602 log.error.assert_called_with( 603 test_utils.MockPattern('.*exception was never retrieved'), 604 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 605 else: 606 # futures._TracebackLogger logs only textual traceback 607 log.error.assert_called_with( 608 test_utils.MockPattern( 609 '.*exception was never retrieved.*ZeroDiv'), 610 exc_info=False) 611 612 def test_set_exc_handler_invalid(self): 613 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 614 self.loop.set_exception_handler('spam') 615 616 def test_set_exc_handler_custom(self): 617 def zero_error(): 618 1/0 619 620 def run_loop(): 621 handle = self.loop.call_soon(zero_error) 622 self.loop._run_once() 623 return handle 624 625 self.loop.set_debug(True) 626 self.loop._process_events = mock.Mock() 627 628 self.assertIsNone(self.loop.get_exception_handler()) 629 mock_handler = mock.Mock() 630 self.loop.set_exception_handler(mock_handler) 631 self.assertIs(self.loop.get_exception_handler(), mock_handler) 632 handle = run_loop() 633 mock_handler.assert_called_with(self.loop, { 634 'exception': MOCK_ANY, 635 'message': test_utils.MockPattern( 636 'Exception in callback.*zero_error'), 637 'handle': handle, 638 'source_traceback': handle._source_traceback, 639 }) 640 mock_handler.reset_mock() 641 642 
self.loop.set_exception_handler(None) 643 with mock.patch('asyncio.base_events.logger') as log: 644 run_loop() 645 log.error.assert_called_with( 646 test_utils.MockPattern( 647 'Exception in callback.*zero'), 648 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 649 650 assert not mock_handler.called 651 652 def test_set_exc_handler_broken(self): 653 def run_loop(): 654 def zero_error(): 655 1/0 656 self.loop.call_soon(zero_error) 657 self.loop._run_once() 658 659 def handler(loop, context): 660 raise AttributeError('spam') 661 662 self.loop._process_events = mock.Mock() 663 664 self.loop.set_exception_handler(handler) 665 666 with mock.patch('asyncio.base_events.logger') as log: 667 run_loop() 668 log.error.assert_called_with( 669 test_utils.MockPattern( 670 'Unhandled error in exception handler'), 671 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 672 673 def test_default_exc_handler_broken(self): 674 _context = None 675 676 class Loop(base_events.BaseEventLoop): 677 678 _selector = mock.Mock() 679 _process_events = mock.Mock() 680 681 def default_exception_handler(self, context): 682 nonlocal _context 683 _context = context 684 # Simulates custom buggy "default_exception_handler" 685 raise ValueError('spam') 686 687 loop = Loop() 688 self.addCleanup(loop.close) 689 asyncio.set_event_loop(loop) 690 691 def run_loop(): 692 def zero_error(): 693 1/0 694 loop.call_soon(zero_error) 695 loop._run_once() 696 697 with mock.patch('asyncio.base_events.logger') as log: 698 run_loop() 699 log.error.assert_called_with( 700 'Exception in default exception handler', 701 exc_info=True) 702 703 def custom_handler(loop, context): 704 raise ValueError('ham') 705 706 _context = None 707 loop.set_exception_handler(custom_handler) 708 with mock.patch('asyncio.base_events.logger') as log: 709 run_loop() 710 log.error.assert_called_with( 711 test_utils.MockPattern('Exception in default exception.*' 712 'while handling.*in custom'), 713 exc_info=True) 714 715 # Check that original 
context was passed to default 716 # exception handler. 717 self.assertIn('context', _context) 718 self.assertIs(type(_context['context']['exception']), 719 ZeroDivisionError) 720 721 def test_set_task_factory_invalid(self): 722 with self.assertRaisesRegex( 723 TypeError, 'task factory must be a callable or None'): 724 725 self.loop.set_task_factory(1) 726 727 self.assertIsNone(self.loop.get_task_factory()) 728 729 def test_set_task_factory(self): 730 self.loop._process_events = mock.Mock() 731 732 class MyTask(asyncio.Task): 733 pass 734 735 async def coro(): 736 pass 737 738 factory = lambda loop, coro: MyTask(coro, loop=loop) 739 740 self.assertIsNone(self.loop.get_task_factory()) 741 self.loop.set_task_factory(factory) 742 self.assertIs(self.loop.get_task_factory(), factory) 743 744 task = self.loop.create_task(coro()) 745 self.assertTrue(isinstance(task, MyTask)) 746 self.loop.run_until_complete(task) 747 748 self.loop.set_task_factory(None) 749 self.assertIsNone(self.loop.get_task_factory()) 750 751 task = self.loop.create_task(coro()) 752 self.assertTrue(isinstance(task, asyncio.Task)) 753 self.assertFalse(isinstance(task, MyTask)) 754 self.loop.run_until_complete(task) 755 756 def test_env_var_debug(self): 757 code = '\n'.join(( 758 'import asyncio', 759 'loop = asyncio.get_event_loop()', 760 'print(loop.get_debug())')) 761 762 # Test with -E to not fail if the unit test was run with 763 # PYTHONASYNCIODEBUG set to a non-empty string 764 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 765 self.assertEqual(stdout.rstrip(), b'False') 766 767 sts, stdout, stderr = assert_python_ok('-c', code, 768 PYTHONASYNCIODEBUG='', 769 PYTHONDEVMODE='') 770 self.assertEqual(stdout.rstrip(), b'False') 771 772 sts, stdout, stderr = assert_python_ok('-c', code, 773 PYTHONASYNCIODEBUG='1', 774 PYTHONDEVMODE='') 775 self.assertEqual(stdout.rstrip(), b'True') 776 777 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 778 PYTHONASYNCIODEBUG='1') 779 
self.assertEqual(stdout.rstrip(), b'False') 780 781 # -X dev 782 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 783 '-c', code) 784 self.assertEqual(stdout.rstrip(), b'True') 785 786 def test_create_task(self): 787 class MyTask(asyncio.Task): 788 pass 789 790 async def test(): 791 pass 792 793 class EventLoop(base_events.BaseEventLoop): 794 def create_task(self, coro): 795 return MyTask(coro, loop=loop) 796 797 loop = EventLoop() 798 self.set_event_loop(loop) 799 800 coro = test() 801 task = asyncio.ensure_future(coro, loop=loop) 802 self.assertIsInstance(task, MyTask) 803 804 # make warnings quiet 805 task._log_destroy_pending = False 806 coro.close() 807 808 def test_create_named_task_with_default_factory(self): 809 async def test(): 810 pass 811 812 loop = asyncio.new_event_loop() 813 task = loop.create_task(test(), name='test_task') 814 try: 815 self.assertEqual(task.get_name(), 'test_task') 816 finally: 817 loop.run_until_complete(task) 818 loop.close() 819 820 def test_create_named_task_with_custom_factory(self): 821 def task_factory(loop, coro): 822 return asyncio.Task(coro, loop=loop) 823 824 async def test(): 825 pass 826 827 loop = asyncio.new_event_loop() 828 loop.set_task_factory(task_factory) 829 task = loop.create_task(test(), name='test_task') 830 try: 831 self.assertEqual(task.get_name(), 'test_task') 832 finally: 833 loop.run_until_complete(task) 834 loop.close() 835 836 def test_run_forever_keyboard_interrupt(self): 837 # Python issue #22601: ensure that the temporary task created by 838 # run_forever() consumes the KeyboardInterrupt and so don't log 839 # a warning 840 async def raise_keyboard_interrupt(): 841 raise KeyboardInterrupt 842 843 self.loop._process_events = mock.Mock() 844 self.loop.call_exception_handler = mock.Mock() 845 846 try: 847 self.loop.run_until_complete(raise_keyboard_interrupt()) 848 except KeyboardInterrupt: 849 pass 850 self.loop.close() 851 support.gc_collect() 852 853 
self.assertFalse(self.loop.call_exception_handler.called) 854 855 def test_run_until_complete_baseexception(self): 856 # Python issue #22429: run_until_complete() must not schedule a pending 857 # call to stop() if the future raised a BaseException 858 async def raise_keyboard_interrupt(): 859 raise KeyboardInterrupt 860 861 self.loop._process_events = mock.Mock() 862 863 try: 864 self.loop.run_until_complete(raise_keyboard_interrupt()) 865 except KeyboardInterrupt: 866 pass 867 868 def func(): 869 self.loop.stop() 870 func.called = True 871 func.called = False 872 try: 873 self.loop.call_soon(func) 874 self.loop.run_forever() 875 except KeyboardInterrupt: 876 pass 877 self.assertTrue(func.called) 878 879 def test_single_selecter_event_callback_after_stopping(self): 880 # Python issue #25593: A stopped event loop may cause event callbacks 881 # to run more than once. 882 event_sentinel = object() 883 callcount = 0 884 doer = None 885 886 def proc_events(event_list): 887 nonlocal doer 888 if event_sentinel in event_list: 889 doer = self.loop.call_soon(do_event) 890 891 def do_event(): 892 nonlocal callcount 893 callcount += 1 894 self.loop.call_soon(clear_selector) 895 896 def clear_selector(): 897 doer.cancel() 898 self.loop._selector.select.return_value = () 899 900 self.loop._process_events = proc_events 901 self.loop._selector.select.return_value = (event_sentinel,) 902 903 for i in range(1, 3): 904 with self.subTest('Loop %d/2' % i): 905 self.loop.call_soon(self.loop.stop) 906 self.loop.run_forever() 907 self.assertEqual(callcount, 1) 908 909 def test_run_once(self): 910 # Simple test for test_utils.run_once(). It may seem strange 911 # to have a test for this (the function isn't even used!) but 912 # it's a de-factor standard API for library tests. This tests 913 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 
914 count = 0 915 916 def callback(): 917 nonlocal count 918 count += 1 919 920 self.loop._process_events = mock.Mock() 921 self.loop.call_soon(callback) 922 test_utils.run_once(self.loop) 923 self.assertEqual(count, 1) 924 925 def test_run_forever_pre_stopped(self): 926 # Test that the old idiom for pre-stopping the loop works. 927 self.loop._process_events = mock.Mock() 928 self.loop.stop() 929 self.loop.run_forever() 930 self.loop._selector.select.assert_called_once_with(0) 931 932 async def leave_unfinalized_asyncgen(self): 933 # Create an async generator, iterate it partially, and leave it 934 # to be garbage collected. 935 # Used in async generator finalization tests. 936 # Depends on implementation details of garbage collector. Changes 937 # in gc may break this function. 938 status = {'started': False, 939 'stopped': False, 940 'finalized': False} 941 942 async def agen(): 943 status['started'] = True 944 try: 945 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 946 yield item 947 finally: 948 status['finalized'] = True 949 950 ag = agen() 951 ai = ag.__aiter__() 952 953 async def iter_one(): 954 try: 955 item = await ai.__anext__() 956 except StopAsyncIteration: 957 return 958 if item == 'THREE': 959 status['stopped'] = True 960 return 961 asyncio.create_task(iter_one()) 962 963 asyncio.create_task(iter_one()) 964 return status 965 966 def test_asyncgen_finalization_by_gc(self): 967 # Async generators should be finalized when garbage collected. 
968 self.loop._process_events = mock.Mock() 969 self.loop._write_to_self = mock.Mock() 970 with support.disable_gc(): 971 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 972 while not status['stopped']: 973 test_utils.run_briefly(self.loop) 974 self.assertTrue(status['started']) 975 self.assertTrue(status['stopped']) 976 self.assertFalse(status['finalized']) 977 support.gc_collect() 978 test_utils.run_briefly(self.loop) 979 self.assertTrue(status['finalized']) 980 981 def test_asyncgen_finalization_by_gc_in_other_thread(self): 982 # Python issue 34769: If garbage collector runs in another 983 # thread, async generators will not finalize in debug 984 # mode. 985 self.loop._process_events = mock.Mock() 986 self.loop._write_to_self = mock.Mock() 987 self.loop.set_debug(True) 988 with support.disable_gc(): 989 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 990 while not status['stopped']: 991 test_utils.run_briefly(self.loop) 992 self.assertTrue(status['started']) 993 self.assertTrue(status['stopped']) 994 self.assertFalse(status['finalized']) 995 self.loop.run_until_complete( 996 self.loop.run_in_executor(None, support.gc_collect)) 997 test_utils.run_briefly(self.loop) 998 self.assertTrue(status['finalized']) 999 1000 1001class MyProto(asyncio.Protocol): 1002 done = None 1003 1004 def __init__(self, create_future=False): 1005 self.state = 'INITIAL' 1006 self.nbytes = 0 1007 if create_future: 1008 self.done = asyncio.get_running_loop().create_future() 1009 1010 def connection_made(self, transport): 1011 self.transport = transport 1012 assert self.state == 'INITIAL', self.state 1013 self.state = 'CONNECTED' 1014 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1015 1016 def data_received(self, data): 1017 assert self.state == 'CONNECTED', self.state 1018 self.nbytes += len(data) 1019 1020 def eof_received(self): 1021 assert self.state == 'CONNECTED', self.state 1022 self.state = 'EOF' 1023 1024 def 
connection_lost(self, exc): 1025 assert self.state in ('CONNECTED', 'EOF'), self.state 1026 self.state = 'CLOSED' 1027 if self.done: 1028 self.done.set_result(None) 1029 1030 1031class MyDatagramProto(asyncio.DatagramProtocol): 1032 done = None 1033 1034 def __init__(self, create_future=False, loop=None): 1035 self.state = 'INITIAL' 1036 self.nbytes = 0 1037 if create_future: 1038 self.done = loop.create_future() 1039 1040 def connection_made(self, transport): 1041 self.transport = transport 1042 assert self.state == 'INITIAL', self.state 1043 self.state = 'INITIALIZED' 1044 1045 def datagram_received(self, data, addr): 1046 assert self.state == 'INITIALIZED', self.state 1047 self.nbytes += len(data) 1048 1049 def error_received(self, exc): 1050 assert self.state == 'INITIALIZED', self.state 1051 1052 def connection_lost(self, exc): 1053 assert self.state == 'INITIALIZED', self.state 1054 self.state = 'CLOSED' 1055 if self.done: 1056 self.done.set_result(None) 1057 1058 1059class BaseEventLoopWithSelectorTests(test_utils.TestCase): 1060 1061 def setUp(self): 1062 super().setUp() 1063 self.loop = asyncio.SelectorEventLoop() 1064 self.set_event_loop(self.loop) 1065 1066 @mock.patch('socket.getnameinfo') 1067 def test_getnameinfo(self, m_gai): 1068 m_gai.side_effect = lambda *args: 42 1069 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 1070 self.assertEqual(r, 42) 1071 1072 @patch_socket 1073 def test_create_connection_multiple_errors(self, m_socket): 1074 1075 class MyProto(asyncio.Protocol): 1076 pass 1077 1078 async def getaddrinfo(*args, **kw): 1079 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1080 (2, 1, 6, '', ('107.6.106.82', 80))] 1081 1082 def getaddrinfo_task(*args, **kwds): 1083 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1084 1085 idx = -1 1086 errors = ['err1', 'err2'] 1087 1088 def _socket(*args, **kw): 1089 nonlocal idx, errors 1090 idx += 1 1091 raise OSError(errors[idx]) 1092 1093 m_socket.socket = _socket 1094 1095 
@patch_socket
def test_create_connection_timeout(self, m_socket):
    # Ensure that the socket is closed on timeout
    fake_sock = mock.Mock()
    m_socket.socket.return_value = fake_sock

    def resolved_addrinfo(*args, **kw):
        # Return an already-completed future holding one IPv4 stream entry.
        entry = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                 ('127.0.0.1', 80))
        result = self.loop.create_future()
        result.set_result([entry])
        return result

    self.loop.getaddrinfo = resolved_addrinfo

    # Every connect attempt times out while the patch is active.
    failing_connect = mock.patch.object(
        self.loop, 'sock_connect', side_effect=asyncio.TimeoutError)
    with failing_connect:
        coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(coro)
        self.assertTrue(fake_sock.close.called)
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
def test_create_server_ipv6(self):
    # Start a server on the IPv6 loopback address and check that it
    # exposes at least one listening socket.
    async def serve_once():
        server = await asyncio.start_server(lambda: None, '::1', 0)
        try:
            self.assertGreater(len(server.sockets), 0)
        finally:
            server.close()
            await server.wait_closed()

    try:
        self.loop.run_until_complete(serve_once())
    except OSError as exc:
        # Only "address not available" is turned into a skip; every other
        # OSError (or a platform without that errno) propagates.
        unavailable = getattr(errno, 'EADDRNOTAVAIL', None)
        if unavailable is None or exc.errno != unavailable:
            raise
        self.skipTest('failed to bind to ::1')
def test_create_connection_multiple(self):
    # Two resolved addresses, both of which fail to connect: the
    # connection attempt as a whole must raise OSError.
    candidates = [(2, 1, 6, '', ('0.0.0.1', 80)),
                  (2, 1, 6, '', ('0.0.0.2', 80))]

    async def fake_getaddrinfo(*args, **kw):
        return candidates

    def schedule_getaddrinfo(*args, **kwds):
        return self.loop.create_task(fake_getaddrinfo(*args, **kwds))

    self.loop.getaddrinfo = schedule_getaddrinfo
    self.loop.sock_connect = mock.Mock(side_effect=OSError)

    coro = self.loop.create_connection(
        MyProto, 'example.com', 80, family=socket.AF_INET)
    self.assertRaises(OSError, self.loop.run_until_complete, coro)
def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
    """Connect to literal IP addresses through the patched socket module.

    With *allow_inet_pton* false, ``inet_pton`` is deleted from the mocked
    module so the fallback code is exercised even if this system has
    inet_pton.  The IPv6 half only runs when IPv6 is enabled.
    """
    if not allow_inet_pton:
        del m_socket.inet_pton

    m_socket.getaddrinfo = socket.getaddrinfo
    sock = m_socket.socket.return_value

    # Replace reader/writer registration with plain, non-coroutine mocks.
    for hook in ('_add_reader', '_add_writer'):
        stub = mock.Mock()
        setattr(self.loop, hook, stub)
        stub._is_coroutine = False

    def check_socket_kwargs(expected_family):
        kwargs = m_socket.socket.call_args[1]
        self.assertEqual(kwargs['family'], expected_family)
        self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)

    def release(transport):
        transport.close()
        test_utils.run_briefly(self.loop)  # allow transport to close

    coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
    transport, _ = self.loop.run_until_complete(coro)
    try:
        sock.connect.assert_called_with(('1.2.3.4', 80))
        check_socket_kwargs(m_socket.AF_INET)
    finally:
        release(transport)

    if not socket_helper.IPV6_ENABLED:
        return

    sock.family = socket.AF_INET6
    coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
    transport, _ = self.loop.run_until_complete(coro)
    try:
        # Without inet_pton we use getaddrinfo, which transforms
        # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
        # scope id.
        positional = sock.connect.call_args[0]
        [address] = positional
        host, port = address[:2]
        self.assertRegex(host, r'::(0\.)*1')
        self.assertEqual(port, 80)
        check_socket_kwargs(m_socket.AF_INET6)
    finally:
        release(transport)
def test_create_connection_no_local_addr(self):
    # Only 'example.com' resolves; any other lookup comes back empty.
    # Creating the connection with a local_addr must then raise OSError.
    remote = (2, 1, 6, '', ('107.6.106.82', 80))

    async def fake_getaddrinfo(host, *args, **kw):
        return [remote, remote] if host == 'example.com' else []

    def schedule_getaddrinfo(*args, **kwds):
        return self.loop.create_task(fake_getaddrinfo(*args, **kwds))

    self.loop.getaddrinfo = schedule_getaddrinfo

    coro = self.loop.create_connection(
        MyProto, 'example.com', 80, family=socket.AF_INET,
        local_addr=(None, 8080))
    with self.assertRaises(OSError):
        self.loop.run_until_complete(coro)
def test_create_connection_ssl_server_hostname_default(self):
    # Check which server_hostname reaches _make_ssl_transport when it is
    # left at its default, given explicitly, or given as an empty string.
    def mock_getaddrinfo(*args, **kwds):
        resolved = self.loop.create_future()
        resolved.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                              socket.SOL_TCP, '', ('1.2.3.4', 80))])
        return resolved

    self.loop.getaddrinfo = mock.Mock(side_effect=mock_getaddrinfo)

    # sock_connect always returns the same already-completed future.
    connected = self.loop.create_future()
    connected.set_result(None)
    self.loop.sock_connect = mock.Mock(return_value=connected)

    class _SelectorTransportMock:
        _sock = None

        def get_extra_info(self, key):
            return mock.Mock()

        def close(self):
            self._sock.close()

    def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
                                **kwds):
        waiter.set_result(None)
        transport = _SelectorTransportMock()
        transport._sock = sock
        return transport

    make_ssl = mock.Mock(side_effect=mock_make_ssl_transport)
    self.loop._make_ssl_transport = make_ssl
    handshake_timeout = object()

    scenarios = (
        # First try the default server_hostname.
        ({}, 'python.org'),
        # Next try an explicit server_hostname.
        ({'server_hostname': 'perl.com'}, 'perl.com'),
        # Finally try an explicit empty server_hostname.
        ({'server_hostname': ''}, ''),
    )
    for extra_kwargs, expected_hostname in scenarios:
        make_ssl.reset_mock()
        coro = self.loop.create_connection(
            MyProto, 'python.org', 80, ssl=True,
            ssl_handshake_timeout=handshake_timeout,
            **extra_kwargs)
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        make_ssl.assert_called_with(
            mock.ANY, mock.ANY, mock.ANY, mock.ANY,
            server_side=False,
            server_hostname=expected_hostname,
            ssl_handshake_timeout=handshake_timeout)
def test_create_server_empty_host(self):
    # if host is empty string use None instead
    seen = {'host': object()}  # sentinel until getaddrinfo is called

    async def recording_getaddrinfo(*args, **kw):
        seen['host'] = args[0]
        return []

    def schedule_getaddrinfo(*args, **kwds):
        return self.loop.create_task(recording_getaddrinfo(*args, **kwds))

    self.loop.getaddrinfo = schedule_getaddrinfo
    fut = self.loop.create_server(MyProto, '', 0)
    with self.assertRaises(OSError):
        self.loop.run_until_complete(fut)
    self.assertIsNone(seen['host'])
@patch_socket
def test_create_server_cant_bind(self, m_socket):
    # A failing bind() must surface as OSError, and the socket that was
    # created for the address must be closed.

    class BindFailure(OSError):
        strerror = 'error'

    resolver = m_socket.getaddrinfo
    resolver.return_value = [(2, 1, 6, '', ('127.0.0.1', 10100))]
    resolver._is_coroutine = False

    listening_sock = mock.Mock()
    m_socket.socket.return_value = listening_sock
    listening_sock.bind.side_effect = BindFailure

    fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
    with self.assertRaises(OSError):
        self.loop.run_until_complete(fut)
    self.assertTrue(listening_sock.close.called)
def test_create_datagram_endpoint_allow_broadcast(self):
    # With allow_broadcast=True the endpoint must be created without
    # calling loop.sock_connect, even though remote_addr is given.
    protocol = MyDatagramProto(create_future=True, loop=self.loop)
    sock_connect = mock.Mock(return_value=[])
    self.loop.sock_connect = sock_connect

    def protocol_factory():
        return protocol

    coro = self.loop.create_datagram_endpoint(
        protocol_factory,
        remote_addr=('127.0.0.1', 0),
        allow_broadcast=True)
    transport, _ = self.loop.run_until_complete(coro)
    self.assertFalse(sock_connect.called)

    transport.close()
    self.loop.run_until_complete(protocol.done)
    self.assertEqual('CLOSED', protocol.state)
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_datagram_endpoint_sock_unix(self):
    # Create a datagram endpoint from the address family alone and check
    # that the transport's socket really is an AF_UNIX socket.
    fut = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(create_future=True, loop=self.loop),
        family=socket.AF_UNIX)
    transport, protocol = self.loop.run_until_complete(fut)
    try:
        # Use a unittest assertion rather than a bare ``assert``: it is
        # not stripped under ``python -O`` and reports both values on
        # failure.
        self.assertEqual(transport._sock.family, socket.AF_UNIX)
    finally:
        # Close the transport even if the check above fails, so the
        # socket is not leaked.
        transport.close()
    self.loop.run_until_complete(protocol.done)
    self.assertEqual('CLOSED', protocol.state)
@unittest.skipIf(sys.platform == 'vxworks',
                 "SO_BROADCAST is enabled by default on VxWorks")
def test_create_datagram_endpoint_sockopts(self):
    # Socket options should not be applied unless asked for.
    # SO_REUSEPORT is not available on all platforms.
    reuseport_supported = hasattr(socket, 'SO_REUSEPORT')

    def open_endpoint(**options):
        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            local_addr=('127.0.0.1', 0),
            **options)
        return self.loop.run_until_complete(coro)

    def option_value(sock, name):
        return sock.getsockopt(socket.SOL_SOCKET, name)

    def shut_down(transport, protocol):
        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

    # Nothing requested: neither option may be set.
    transport, protocol = open_endpoint()
    sock = transport.get_extra_info('socket')
    if reuseport_supported:
        self.assertFalse(option_value(sock, socket.SO_REUSEPORT))
    self.assertFalse(option_value(sock, socket.SO_BROADCAST))
    shut_down(transport, protocol)

    # Options requested explicitly: they must now be set, while
    # SO_REUSEADDR stays off.
    transport, protocol = open_endpoint(
        reuse_port=reuseport_supported,
        allow_broadcast=True)
    sock = transport.get_extra_info('socket')
    self.assertFalse(option_value(sock, socket.SO_REUSEADDR))
    if reuseport_supported:
        self.assertTrue(option_value(sock, socket.SO_REUSEPORT))
    self.assertTrue(option_value(sock, socket.SO_BROADCAST))
    shut_down(transport, protocol)
@patch_socket
def test_create_datagram_endpoint_ip_addr(self, m_socket):
    # A literal IP local_addr must be bound directly, without any call
    # to getaddrinfo.
    def forbidden_getaddrinfo(*args, **kw):
        self.fail('should not have called getaddrinfo')

    m_socket.getaddrinfo = forbidden_getaddrinfo
    bind = mock.Mock()
    m_socket.socket.return_value.bind = bind

    add_reader = mock.Mock()
    self.loop._add_reader = add_reader
    add_reader._is_coroutine = False

    coro = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop),
        local_addr=('1.2.3.4', 0),
        reuse_port=hasattr(socket, 'SO_REUSEPORT'))

    transport, _ = self.loop.run_until_complete(coro)
    try:
        bind.assert_called_with(('1.2.3.4', 0))
        m_socket.socket.assert_called_with(
            family=m_socket.AF_INET,
            proto=m_socket.IPPROTO_UDP,
            type=m_socket.SOCK_DGRAM)
    finally:
        transport.close()
        test_utils.run_briefly(self.loop)  # allow transport to close
def test_call_coroutine(self):
    # In debug mode every scheduling entry point must reject both a
    # coroutine function and a coroutine object with TypeError.
    async def simple_coroutine():
        pass

    self.loop.set_debug(True)
    coro_obj = simple_coroutine()
    self.addCleanup(coro_obj.close)

    schedulers = (
        lambda cb: self.loop.call_soon(cb),
        lambda cb: self.loop.call_soon_threadsafe(cb),
        lambda cb: self.loop.call_later(60, cb),
        lambda cb: self.loop.call_at(self.loop.time() + 60, cb),
        lambda cb: self.loop.run_until_complete(
            self.loop.run_in_executor(None, cb)),
    )
    for candidate in (simple_coroutine, coro_obj):
        for schedule in schedulers:
            with self.assertRaises(TypeError):
                schedule(candidate)
class BaseLoopSockSendfileTests(test_utils.TestCase):
    """Tests for loop.sock_sendfile() on a BaseSelectorEventLoop.

    A file containing DATA is written once per class; each test opens it
    in binary mode as ``self.file``.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Server-side protocol that accumulates everything it receives."""

        def __init__(self, loop):
            self.transport = None
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Resolved with None when the connection is lost.
            self.fut = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport
            self.started = True

        def data_received(self, data):
            self.data += data

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)
            self.transport = None

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Remember the fallback read-buffer size, then shrink it to 16 KiB
        # for the duration of the class.
        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 16 * 1024
        with open(os_helper.TESTFN, 'wb') as out:
            out.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        from asyncio.selector_events import BaseSelectorEventLoop
        # BaseSelectorEventLoop() has no native implementation
        loop = BaseSelectorEventLoop()
        self.loop = loop
        self.set_event_loop(loop)
        source = open(os_helper.TESTFN, 'rb')
        self.file = source
        self.addCleanup(source.close)
        super().setUp()

    def make_socket(self, blocking=False):
        """Return a new IPv4 stream socket that is closed on cleanup."""
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        new_sock.setblocking(blocking)
        self.addCleanup(new_sock.close)
        return new_sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a server and connect a client socket to it.

        Returns ``(sock, proto)``: the connected client socket and the
        protocol instance used by the server.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        server = self.run_loop(self.loop.create_server(
            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
        addr = server.sockets[0].getsockname()

        # Up to ten attempts, pausing half a second after each failure.
        connected = False
        for _ in range(10):
            try:
                self.run_loop(self.loop.sock_connect(sock, addr))
            except OSError:
                self.run_loop(asyncio.sleep(0.5))
            else:
                connected = True
                break
        if not connected:
            # One last try, so we get the exception
            self.run_loop(self.loop.sock_connect(sock, addr))

        def cleanup():
            server.close()
            self.run_loop(server.wait_closed())
            sock.close()
            if proto.transport is None:
                return
            proto.transport.close()
            self.run_loop(proto.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test__sock_sendfile_native_failure(self):
        sock, proto = self.prepare()

        expect_unavailable = self.assertRaisesRegex(
            asyncio.SendfileNotAvailableError, "sendfile is not available")
        with expect_unavailable:
            self.run_loop(
                self.loop._sock_sendfile_native(sock, self.file, 0, None))

        # Nothing was sent and the file position did not move.
        self.assertEqual(proto.data, b'')
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_no_fallback(self):
        sock, proto = self.prepare()

        expect_unavailable = self.assertRaisesRegex(
            asyncio.SendfileNotAvailableError, "sendfile is not available")
        with expect_unavailable:
            self.run_loop(
                self.loop.sock_sendfile(sock, self.file, fallback=False))

        self.assertEqual(self.file.tell(), 0)
        self.assertEqual(proto.data, b'')

    def test_sock_sendfile_fallback(self):
        sock, proto = self.prepare()

        sent = self.run_loop(self.loop.sock_sendfile(sock, self.file))
        sock.close()
        self.run_loop(proto.wait_closed())

        total = len(self.DATA)
        self.assertEqual(sent, total)
        self.assertEqual(self.file.tell(), total)
        self.assertEqual(proto.data, self.DATA)

    def test_sock_sendfile_fallback_offset_and_count(self):
        sock, proto = self.prepare()

        # Send 2000 bytes starting at offset 1000.
        sent = self.run_loop(
            self.loop.sock_sendfile(sock, self.file, 1000, 2000))
        sock.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(sent, 2000)
        self.assertEqual(self.file.tell(), 3000)
        self.assertEqual(proto.data, self.DATA[1000:3000])

    def test_blocking_socket(self):
        self.loop.set_debug(True)
        blocking_sock = self.make_socket(blocking=True)
        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
            self.run_loop(self.loop.sock_sendfile(blocking_sock, self.file))

    def test_nonbinary_file(self):
        sock = self.make_socket()
        with open(os_helper.TESTFN, encoding="utf-8") as text_file:
            with self.assertRaisesRegex(ValueError, "binary mode"):
                self.run_loop(self.loop.sock_sendfile(sock, text_file))

    def test_nonstream_socket(self):
        dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        dgram_sock.setblocking(False)
        self.addCleanup(dgram_sock.close)
        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
            self.run_loop(self.loop.sock_sendfile(dgram_sock, self.file))

    def test_notint_count(self):
        sock = self.make_socket()
        expect_error = self.assertRaisesRegex(
            TypeError, "count must be a positive integer")
        with expect_error:
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))

    def test_negative_count(self):
        sock = self.make_socket()
        expect_error = self.assertRaisesRegex(
            ValueError, "count must be a positive integer")
        with expect_error:
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))

    def test_notint_offset(self):
        sock = self.make_socket()
        expect_error = self.assertRaisesRegex(
            TypeError, "offset must be a non-negative integer")
        with expect_error:
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))

    def test_negative_offset(self):
        sock = self.make_socket()
        expect_error = self.assertRaisesRegex(
            ValueError, "offset must be a non-negative integer")
        with expect_error:
            self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2114 self.assertFalse(opt) 2115 2116 base_events._set_nodelay(sock) 2117 2118 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2119 self.assertTrue(opt) 2120 2121 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2122 'need socket.TCP_NODELAY') 2123 def test_set_nodelay(self): 2124 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2125 proto=socket.IPPROTO_TCP) 2126 with sock: 2127 self.check_set_nodelay(sock) 2128 2129 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2130 proto=socket.IPPROTO_TCP) 2131 with sock: 2132 sock.setblocking(False) 2133 self.check_set_nodelay(sock) 2134 2135 2136 2137if __name__ == '__main__': 2138 unittest.main() 2139