1"""Tests for selector_events.py""" 2 3import selectors 4import socket 5import unittest 6from unittest import mock 7try: 8 import ssl 9except ImportError: 10 ssl = None 11 12import asyncio 13from asyncio.selector_events import BaseSelectorEventLoop 14from asyncio.selector_events import _SelectorTransport 15from asyncio.selector_events import _SelectorSocketTransport 16from asyncio.selector_events import _SelectorDatagramTransport 17from test.test_asyncio import utils as test_utils 18 19 20MOCK_ANY = mock.ANY 21 22 23def tearDownModule(): 24 asyncio.set_event_loop_policy(None) 25 26 27class TestBaseSelectorEventLoop(BaseSelectorEventLoop): 28 29 def _make_self_pipe(self): 30 self._ssock = mock.Mock() 31 self._csock = mock.Mock() 32 self._internal_fds += 1 33 34 def _close_self_pipe(self): 35 pass 36 37 38def list_to_buffer(l=()): 39 return bytearray().join(l) 40 41 42def close_transport(transport): 43 # Don't call transport.close() because the event loop and the selector 44 # are mocked 45 if transport._sock is None: 46 return 47 transport._sock.close() 48 transport._sock = None 49 50 51class BaseSelectorEventLoopTests(test_utils.TestCase): 52 53 def setUp(self): 54 super().setUp() 55 self.selector = mock.Mock() 56 self.selector.select.return_value = [] 57 self.loop = TestBaseSelectorEventLoop(self.selector) 58 self.set_event_loop(self.loop) 59 60 def test_make_socket_transport(self): 61 m = mock.Mock() 62 self.loop.add_reader = mock.Mock() 63 self.loop.add_reader._is_coroutine = False 64 transport = self.loop._make_socket_transport(m, asyncio.Protocol()) 65 self.assertIsInstance(transport, _SelectorSocketTransport) 66 67 # Calling repr() must not fail when the event loop is closed 68 self.loop.close() 69 repr(transport) 70 71 close_transport(transport) 72 73 @unittest.skipIf(ssl is None, 'No ssl module') 74 def test_make_ssl_transport(self): 75 m = mock.Mock() 76 self.loop._add_reader = mock.Mock() 77 self.loop._add_reader._is_coroutine = False 78 self.loop._add_writer = mock.Mock() 79 self.loop._remove_reader = mock.Mock() 80 self.loop._remove_writer = mock.Mock() 81 waiter = self.loop.create_future() 82 with test_utils.disable_logger(): 83 transport = self.loop._make_ssl_transport( 84 m, asyncio.Protocol(), m, waiter) 85 86 with self.assertRaisesRegex(RuntimeError, 87 r'SSL transport.*not.*initialized'): 88 transport.is_reading() 89 90 # execute the handshake while the logger is disabled 91 # to ignore SSL handshake failure 92 test_utils.run_briefly(self.loop) 93 94 self.assertTrue(transport.is_reading()) 95 transport.pause_reading() 96 transport.pause_reading() 97 self.assertFalse(transport.is_reading()) 98 transport.resume_reading() 99 transport.resume_reading() 100 self.assertTrue(transport.is_reading()) 101 102 # Sanity check 103 class_name = transport.__class__.__name__ 104 self.assertIn("ssl", class_name.lower()) 105 self.assertIn("transport", class_name.lower()) 106 107 transport.close() 108 # execute pending callbacks to close the socket transport 109 test_utils.run_briefly(self.loop) 110 111 @mock.patch('asyncio.selector_events.ssl', None) 112 @mock.patch('asyncio.sslproto.ssl', None) 113 def test_make_ssl_transport_without_ssl_error(self): 114 m = mock.Mock() 115 self.loop.add_reader = mock.Mock() 116 self.loop.add_writer = mock.Mock() 117 self.loop.remove_reader = mock.Mock() 118 self.loop.remove_writer = mock.Mock() 119 with self.assertRaises(RuntimeError): 120 self.loop._make_ssl_transport(m, m, m, m) 121 122 def test_close(self): 123 class EventLoop(BaseSelectorEventLoop): 124 def _make_self_pipe(self): 125 self._ssock = mock.Mock() 126 self._csock = mock.Mock() 127 self._internal_fds += 1 128 129 self.loop = EventLoop(self.selector) 130 self.set_event_loop(self.loop) 131 132 ssock = self.loop._ssock 133 ssock.fileno.return_value = 7 134 csock = self.loop._csock 135 csock.fileno.return_value = 1 136 remove_reader = self.loop._remove_reader = mock.Mock() 137 138 self.loop._selector.close() 139 self.loop._selector = selector = mock.Mock() 140 self.assertFalse(self.loop.is_closed()) 141 142 self.loop.close() 143 self.assertTrue(self.loop.is_closed()) 144 self.assertIsNone(self.loop._selector) 145 self.assertIsNone(self.loop._csock) 146 self.assertIsNone(self.loop._ssock) 147 selector.close.assert_called_with() 148 ssock.close.assert_called_with() 149 csock.close.assert_called_with() 150 remove_reader.assert_called_with(7) 151 152 # it should be possible to call close() more than once 153 self.loop.close() 154 self.loop.close() 155 156 # operation blocked when the loop is closed 157 f = self.loop.create_future() 158 self.assertRaises(RuntimeError, self.loop.run_forever) 159 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 160 fd = 0 161 def callback(): 162 pass 163 self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback) 164 self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback) 165 166 def test_close_no_selector(self): 167 self.loop.remove_reader = mock.Mock() 168 self.loop._selector.close() 169 self.loop._selector = None 170 self.loop.close() 171 self.assertIsNone(self.loop._selector) 172 173 def test_read_from_self_tryagain(self): 174 self.loop._ssock.recv.side_effect = BlockingIOError 175 self.assertIsNone(self.loop._read_from_self()) 176 177 def test_read_from_self_exception(self): 178 self.loop._ssock.recv.side_effect = OSError 179 self.assertRaises(OSError, self.loop._read_from_self) 180 181 def test_write_to_self_tryagain(self): 182 self.loop._csock.send.side_effect = BlockingIOError 183 with test_utils.disable_logger(): 184 self.assertIsNone(self.loop._write_to_self()) 185 186 def test_write_to_self_exception(self): 187 # _write_to_self() swallows OSError 188 self.loop._csock.send.side_effect = RuntimeError() 189 self.assertRaises(RuntimeError, self.loop._write_to_self) 190 191 def test_add_reader(self): 192 self.loop._selector.get_key.side_effect = KeyError 193 cb = lambda: True 194 self.loop.add_reader(1, cb) 195 196 self.assertTrue(self.loop._selector.register.called) 197 fd, mask, (r, w) = self.loop._selector.register.call_args[0] 198 self.assertEqual(1, fd) 199 self.assertEqual(selectors.EVENT_READ, mask) 200 self.assertEqual(cb, r._callback) 201 self.assertIsNone(w) 202 203 def test_add_reader_existing(self): 204 reader = mock.Mock() 205 writer = mock.Mock() 206 self.loop._selector.get_key.return_value = selectors.SelectorKey( 207 1, 1, selectors.EVENT_WRITE, (reader, writer)) 208 cb = lambda: True 209 self.loop.add_reader(1, cb) 210 211 self.assertTrue(reader.cancel.called) 212 self.assertFalse(self.loop._selector.register.called) 213 self.assertTrue(self.loop._selector.modify.called) 214 fd, mask, (r, w) = self.loop._selector.modify.call_args[0] 215 self.assertEqual(1, fd) 216 self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask) 217 self.assertEqual(cb, r._callback) 218 self.assertEqual(writer, w) 219 220 def test_add_reader_existing_writer(self): 221 writer = mock.Mock() 222 self.loop._selector.get_key.return_value = selectors.SelectorKey( 223 1, 1, selectors.EVENT_WRITE, (None, writer)) 224 cb = lambda: True 225 self.loop.add_reader(1, cb) 226 227 self.assertFalse(self.loop._selector.register.called) 228 self.assertTrue(self.loop._selector.modify.called) 229 fd, mask, (r, w) = self.loop._selector.modify.call_args[0] 230 self.assertEqual(1, fd) 231 self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask) 232 self.assertEqual(cb, r._callback) 233 self.assertEqual(writer, w) 234 235 def test_remove_reader(self): 236 self.loop._selector.get_key.return_value = selectors.SelectorKey( 237 1, 1, selectors.EVENT_READ, (None, None)) 238 self.assertFalse(self.loop.remove_reader(1)) 239 240 self.assertTrue(self.loop._selector.unregister.called) 241 242 def test_remove_reader_read_write(self): 243 reader = mock.Mock() 244 writer = mock.Mock() 245 self.loop._selector.get_key.return_value = selectors.SelectorKey( 246 1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE, 247 (reader, writer)) 248 self.assertTrue( 249 self.loop.remove_reader(1)) 250 251 self.assertFalse(self.loop._selector.unregister.called) 252 self.assertEqual( 253 (1, selectors.EVENT_WRITE, (None, writer)), 254 self.loop._selector.modify.call_args[0]) 255 256 def test_remove_reader_unknown(self): 257 self.loop._selector.get_key.side_effect = KeyError 258 self.assertFalse( 259 self.loop.remove_reader(1)) 260 261 def test_add_writer(self): 262 self.loop._selector.get_key.side_effect = KeyError 263 cb = lambda: True 264 self.loop.add_writer(1, cb) 265 266 self.assertTrue(self.loop._selector.register.called) 267 fd, mask, (r, w) = self.loop._selector.register.call_args[0] 268 self.assertEqual(1, fd) 269 self.assertEqual(selectors.EVENT_WRITE, mask) 270 self.assertIsNone(r) 271 self.assertEqual(cb, w._callback) 272 273 def test_add_writer_existing(self): 274 reader = mock.Mock() 275 writer = mock.Mock() 276 self.loop._selector.get_key.return_value = selectors.SelectorKey( 277 1, 1, selectors.EVENT_READ, (reader, writer)) 278 cb = lambda: True 279 self.loop.add_writer(1, cb) 280 281 self.assertTrue(writer.cancel.called) 282 self.assertFalse(self.loop._selector.register.called) 283 self.assertTrue(self.loop._selector.modify.called) 284 fd, mask, (r, w) = self.loop._selector.modify.call_args[0] 285 self.assertEqual(1, fd) 286 self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask) 287 self.assertEqual(reader, r) 288 self.assertEqual(cb, w._callback) 289 290 def test_remove_writer(self): 291 self.loop._selector.get_key.return_value = selectors.SelectorKey( 292 1, 1, selectors.EVENT_WRITE, (None, None)) 293 self.assertFalse(self.loop.remove_writer(1)) 294 295 self.assertTrue(self.loop._selector.unregister.called) 296 297 def test_remove_writer_read_write(self): 298 reader = mock.Mock() 299 writer = mock.Mock() 300 self.loop._selector.get_key.return_value = selectors.SelectorKey( 301 1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE, 302 (reader, writer)) 303 self.assertTrue( 304 self.loop.remove_writer(1)) 305 306 self.assertFalse(self.loop._selector.unregister.called) 307 self.assertEqual( 308 (1, selectors.EVENT_READ, (reader, None)), 309 self.loop._selector.modify.call_args[0]) 310 311 def test_remove_writer_unknown(self): 312 self.loop._selector.get_key.side_effect = KeyError 313 self.assertFalse( 314 self.loop.remove_writer(1)) 315 316 def test_process_events_read(self): 317 reader = mock.Mock() 318 reader._cancelled = False 319 320 self.loop._add_callback = mock.Mock() 321 self.loop._process_events( 322 [(selectors.SelectorKey( 323 1, 1, selectors.EVENT_READ, (reader, None)), 324 selectors.EVENT_READ)]) 325 self.assertTrue(self.loop._add_callback.called) 326 self.loop._add_callback.assert_called_with(reader) 327 328 def test_process_events_read_cancelled(self): 329 reader = mock.Mock() 330 reader.cancelled = True 331 332 self.loop._remove_reader = mock.Mock() 333 self.loop._process_events( 334 [(selectors.SelectorKey( 335 1, 1, selectors.EVENT_READ, (reader, None)), 336 selectors.EVENT_READ)]) 337 self.loop._remove_reader.assert_called_with(1) 338 339 def test_process_events_write(self): 340 writer = mock.Mock() 341 writer._cancelled = False 342 343 self.loop._add_callback = mock.Mock() 344 self.loop._process_events( 345 [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE, 346 (None, writer)), 347 selectors.EVENT_WRITE)]) 348 self.loop._add_callback.assert_called_with(writer) 349 350 def test_process_events_write_cancelled(self): 351 writer = mock.Mock() 352 writer.cancelled = True 353 self.loop._remove_writer = mock.Mock() 354 355 self.loop._process_events( 356 [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE, 357 (None, writer)), 358 selectors.EVENT_WRITE)]) 359 self.loop._remove_writer.assert_called_with(1) 360 361 def test_accept_connection_multiple(self): 362 sock = mock.Mock() 363 sock.accept.return_value = (mock.Mock(), mock.Mock()) 364 backlog = 100 365 # Mock the coroutine generation for a connection to prevent 366 # warnings related to un-awaited coroutines. _accept_connection2 367 # is an async function that is patched with AsyncMock. create_task 368 # creates a task out of coroutine returned by AsyncMock, so use 369 # asyncio.sleep(0) to ensure created tasks are complete to avoid 370 # task pending warnings. 371 mock_obj = mock.patch.object 372 with mock_obj(self.loop, '_accept_connection2') as accept2_mock: 373 self.loop._accept_connection( 374 mock.Mock(), sock, backlog=backlog) 375 self.loop.run_until_complete(asyncio.sleep(0)) 376 self.assertEqual(sock.accept.call_count, backlog) 377 378 379class SelectorTransportTests(test_utils.TestCase): 380 381 def setUp(self): 382 super().setUp() 383 self.loop = self.new_test_loop() 384 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 385 self.sock = mock.Mock(socket.socket) 386 self.sock.fileno.return_value = 7 387 388 def create_transport(self): 389 transport = _SelectorTransport(self.loop, self.sock, self.protocol, 390 None) 391 self.addCleanup(close_transport, transport) 392 return transport 393 394 def test_ctor(self): 395 tr = self.create_transport() 396 self.assertIs(tr._loop, self.loop) 397 self.assertIs(tr._sock, self.sock) 398 self.assertIs(tr._sock_fd, 7) 399 400 def test_abort(self): 401 tr = self.create_transport() 402 tr._force_close = mock.Mock() 403 404 tr.abort() 405 tr._force_close.assert_called_with(None) 406 407 def test_close(self): 408 tr = self.create_transport() 409 tr.close() 410 411 self.assertTrue(tr.is_closing()) 412 self.assertEqual(1, self.loop.remove_reader_count[7]) 413 self.protocol.connection_lost(None) 414 self.assertEqual(tr._conn_lost, 1) 415 416 tr.close() 417 self.assertEqual(tr._conn_lost, 1) 418 self.assertEqual(1, self.loop.remove_reader_count[7]) 419 420 def test_close_write_buffer(self): 421 tr = self.create_transport() 422 tr._buffer.extend(b'data') 423 tr.close() 424 425 self.assertFalse(self.loop.readers) 426 test_utils.run_briefly(self.loop) 427 self.assertFalse(self.protocol.connection_lost.called) 428 429 def test_force_close(self): 430 tr = self.create_transport() 431 tr._buffer.extend(b'1') 432 self.loop._add_reader(7, mock.sentinel) 433 self.loop._add_writer(7, mock.sentinel) 434 tr._force_close(None) 435 436 self.assertTrue(tr.is_closing()) 437 self.assertEqual(tr._buffer, list_to_buffer()) 438 self.assertFalse(self.loop.readers) 439 self.assertFalse(self.loop.writers) 440 441 # second close should not remove reader 442 tr._force_close(None) 443 self.assertFalse(self.loop.readers) 444 self.assertEqual(1, self.loop.remove_reader_count[7]) 445 446 @mock.patch('asyncio.log.logger.error') 447 def test_fatal_error(self, m_exc): 448 exc = OSError() 449 tr = self.create_transport() 450 tr._force_close = mock.Mock() 451 tr._fatal_error(exc) 452 453 m_exc.assert_not_called() 454 455 tr._force_close.assert_called_with(exc) 456 457 @mock.patch('asyncio.log.logger.error') 458 def test_fatal_error_custom_exception(self, m_exc): 459 class MyError(Exception): 460 pass 461 exc = MyError() 462 tr = self.create_transport() 463 tr._force_close = mock.Mock() 464 tr._fatal_error(exc) 465 466 m_exc.assert_called_with( 467 test_utils.MockPattern( 468 'Fatal error on transport\nprotocol:.*\ntransport:.*'), 469 exc_info=(MyError, MOCK_ANY, MOCK_ANY)) 470 471 tr._force_close.assert_called_with(exc) 472 473 def test_connection_lost(self): 474 exc = OSError() 475 tr = self.create_transport() 476 self.assertIsNotNone(tr._protocol) 477 self.assertIsNotNone(tr._loop) 478 tr._call_connection_lost(exc) 479 480 self.protocol.connection_lost.assert_called_with(exc) 481 self.sock.close.assert_called_with() 482 self.assertIsNone(tr._sock) 483 484 self.assertIsNone(tr._protocol) 485 self.assertIsNone(tr._loop) 486 487 def test__add_reader(self): 488 tr = self.create_transport() 489 tr._buffer.extend(b'1') 490 tr._add_reader(7, mock.sentinel) 491 self.assertTrue(self.loop.readers) 492 493 tr._force_close(None) 494 495 self.assertTrue(tr.is_closing()) 496 self.assertFalse(self.loop.readers) 497 498 # can not add readers after closing 499 tr._add_reader(7, mock.sentinel) 500 self.assertFalse(self.loop.readers) 501 502 503class SelectorSocketTransportTests(test_utils.TestCase): 504 505 def setUp(self): 506 super().setUp() 507 self.loop = self.new_test_loop() 508 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 509 self.sock = mock.Mock(socket.socket) 510 self.sock_fd = self.sock.fileno.return_value = 7 511 512 def socket_transport(self, waiter=None): 513 transport = _SelectorSocketTransport(self.loop, self.sock, 514 self.protocol, waiter=waiter) 515 self.addCleanup(close_transport, transport) 516 return transport 517 518 def test_ctor(self): 519 waiter = self.loop.create_future() 520 tr = self.socket_transport(waiter=waiter) 521 self.loop.run_until_complete(waiter) 522 523 self.loop.assert_reader(7, tr._read_ready) 524 test_utils.run_briefly(self.loop) 525 self.protocol.connection_made.assert_called_with(tr) 526 527 def test_ctor_with_waiter(self): 528 waiter = self.loop.create_future() 529 self.socket_transport(waiter=waiter) 530 self.loop.run_until_complete(waiter) 531 532 self.assertIsNone(waiter.result()) 533 534 def test_pause_resume_reading(self): 535 tr = self.socket_transport() 536 test_utils.run_briefly(self.loop) 537 self.assertFalse(tr._paused) 538 self.assertTrue(tr.is_reading()) 539 self.loop.assert_reader(7, tr._read_ready) 540 541 tr.pause_reading() 542 tr.pause_reading() 543 self.assertTrue(tr._paused) 544 self.assertFalse(tr.is_reading()) 545 self.loop.assert_no_reader(7) 546 547 tr.resume_reading() 548 tr.resume_reading() 549 self.assertFalse(tr._paused) 550 self.assertTrue(tr.is_reading()) 551 self.loop.assert_reader(7, tr._read_ready) 552 553 tr.close() 554 self.assertFalse(tr.is_reading()) 555 self.loop.assert_no_reader(7) 556 557 def test_read_eof_received_error(self): 558 transport = self.socket_transport() 559 transport.close = mock.Mock() 560 transport._fatal_error = mock.Mock() 561 562 self.loop.call_exception_handler = mock.Mock() 563 564 self.protocol.eof_received.side_effect = LookupError() 565 566 self.sock.recv.return_value = b'' 567 transport._read_ready() 568 569 self.protocol.eof_received.assert_called_with() 570 self.assertTrue(transport._fatal_error.called) 571 572 def test_data_received_error(self): 573 transport = self.socket_transport() 574 transport._fatal_error = mock.Mock() 575 576 self.loop.call_exception_handler = mock.Mock() 577 self.protocol.data_received.side_effect = LookupError() 578 579 self.sock.recv.return_value = b'data' 580 transport._read_ready() 581 582 self.assertTrue(transport._fatal_error.called) 583 self.assertTrue(self.protocol.data_received.called) 584 585 def test_read_ready(self): 586 transport = self.socket_transport() 587 588 self.sock.recv.return_value = b'data' 589 transport._read_ready() 590 591 self.protocol.data_received.assert_called_with(b'data') 592 593 def test_read_ready_eof(self): 594 transport = self.socket_transport() 595 transport.close = mock.Mock() 596 597 self.sock.recv.return_value = b'' 598 transport._read_ready() 599 600 self.protocol.eof_received.assert_called_with() 601 transport.close.assert_called_with() 602 603 def test_read_ready_eof_keep_open(self): 604 transport = self.socket_transport() 605 transport.close = mock.Mock() 606 607 self.sock.recv.return_value = b'' 608 self.protocol.eof_received.return_value = True 609 transport._read_ready() 610 611 self.protocol.eof_received.assert_called_with() 612 self.assertFalse(transport.close.called) 613 614 @mock.patch('logging.exception') 615 def test_read_ready_tryagain(self, m_exc): 616 self.sock.recv.side_effect = BlockingIOError 617 618 transport = self.socket_transport() 619 transport._fatal_error = mock.Mock() 620 transport._read_ready() 621 622 self.assertFalse(transport._fatal_error.called) 623 624 @mock.patch('logging.exception') 625 def test_read_ready_tryagain_interrupted(self, m_exc): 626 self.sock.recv.side_effect = InterruptedError 627 628 transport = self.socket_transport() 629 transport._fatal_error = mock.Mock() 630 transport._read_ready() 631 632 self.assertFalse(transport._fatal_error.called) 633 634 @mock.patch('logging.exception') 635 def test_read_ready_conn_reset(self, m_exc): 636 err = self.sock.recv.side_effect = ConnectionResetError() 637 638 transport = self.socket_transport() 639 transport._force_close = mock.Mock() 640 with test_utils.disable_logger(): 641 transport._read_ready() 642 transport._force_close.assert_called_with(err) 643 644 @mock.patch('logging.exception') 645 def test_read_ready_err(self, m_exc): 646 err = self.sock.recv.side_effect = OSError() 647 648 transport = self.socket_transport() 649 transport._fatal_error = mock.Mock() 650 transport._read_ready() 651 652 transport._fatal_error.assert_called_with( 653 err, 654 'Fatal read error on socket transport') 655 656 def test_write(self): 657 data = b'data' 658 self.sock.send.return_value = len(data) 659 660 transport = self.socket_transport() 661 transport.write(data) 662 self.sock.send.assert_called_with(data) 663 664 def test_write_bytearray(self): 665 data = bytearray(b'data') 666 self.sock.send.return_value = len(data) 667 668 transport = self.socket_transport() 669 transport.write(data) 670 self.sock.send.assert_called_with(data) 671 self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated. 672 673 def test_write_memoryview(self): 674 data = memoryview(b'data') 675 self.sock.send.return_value = len(data) 676 677 transport = self.socket_transport() 678 transport.write(data) 679 self.sock.send.assert_called_with(data) 680 681 def test_write_no_data(self): 682 transport = self.socket_transport() 683 transport._buffer.extend(b'data') 684 transport.write(b'') 685 self.assertFalse(self.sock.send.called) 686 self.assertEqual(list_to_buffer([b'data']), transport._buffer) 687 688 def test_write_buffer(self): 689 transport = self.socket_transport() 690 transport._buffer.extend(b'data1') 691 transport.write(b'data2') 692 self.assertFalse(self.sock.send.called) 693 self.assertEqual(list_to_buffer([b'data1', b'data2']), 694 transport._buffer) 695 696 def test_write_partial(self): 697 data = b'data' 698 self.sock.send.return_value = 2 699 700 transport = self.socket_transport() 701 transport.write(data) 702 703 self.loop.assert_writer(7, transport._write_ready) 704 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 705 706 def test_write_partial_bytearray(self): 707 data = bytearray(b'data') 708 self.sock.send.return_value = 2 709 710 transport = self.socket_transport() 711 transport.write(data) 712 713 self.loop.assert_writer(7, transport._write_ready) 714 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 715 self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated. 716 717 def test_write_partial_memoryview(self): 718 data = memoryview(b'data') 719 self.sock.send.return_value = 2 720 721 transport = self.socket_transport() 722 transport.write(data) 723 724 self.loop.assert_writer(7, transport._write_ready) 725 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 726 727 def test_write_partial_none(self): 728 data = b'data' 729 self.sock.send.return_value = 0 730 self.sock.fileno.return_value = 7 731 732 transport = self.socket_transport() 733 transport.write(data) 734 735 self.loop.assert_writer(7, transport._write_ready) 736 self.assertEqual(list_to_buffer([b'data']), transport._buffer) 737 738 def test_write_tryagain(self): 739 self.sock.send.side_effect = BlockingIOError 740 741 data = b'data' 742 transport = self.socket_transport() 743 transport.write(data) 744 745 self.loop.assert_writer(7, transport._write_ready) 746 self.assertEqual(list_to_buffer([b'data']), transport._buffer) 747 748 @mock.patch('asyncio.selector_events.logger') 749 def test_write_exception(self, m_log): 750 err = self.sock.send.side_effect = OSError() 751 752 data = b'data' 753 transport = self.socket_transport() 754 transport._fatal_error = mock.Mock() 755 transport.write(data) 756 transport._fatal_error.assert_called_with( 757 err, 758 'Fatal write error on socket transport') 759 transport._conn_lost = 1 760 761 self.sock.reset_mock() 762 transport.write(data) 763 self.assertFalse(self.sock.send.called) 764 self.assertEqual(transport._conn_lost, 2) 765 transport.write(data) 766 transport.write(data) 767 transport.write(data) 768 transport.write(data) 769 m_log.warning.assert_called_with('socket.send() raised exception.') 770 771 def test_write_str(self): 772 transport = self.socket_transport() 773 self.assertRaises(TypeError, transport.write, 'str') 774 775 def test_write_closing(self): 776 transport = self.socket_transport() 777 transport.close() 778 self.assertEqual(transport._conn_lost, 1) 779 transport.write(b'data') 780 self.assertEqual(transport._conn_lost, 2) 781 782 def test_write_ready(self): 783 data = b'data' 784 self.sock.send.return_value = len(data) 785 786 transport = self.socket_transport() 787 transport._buffer.extend(data) 788 self.loop._add_writer(7, transport._write_ready) 789 transport._write_ready() 790 self.assertTrue(self.sock.send.called) 791 self.assertFalse(self.loop.writers) 792 793 def test_write_ready_closing(self): 794 data = b'data' 795 self.sock.send.return_value = len(data) 796 797 transport = self.socket_transport() 798 transport._closing = True 799 transport._buffer.extend(data) 800 self.loop._add_writer(7, transport._write_ready) 801 transport._write_ready() 802 self.assertTrue(self.sock.send.called) 803 self.assertFalse(self.loop.writers) 804 self.sock.close.assert_called_with() 805 self.protocol.connection_lost.assert_called_with(None) 806 807 def test_write_ready_no_data(self): 808 transport = self.socket_transport() 809 # This is an internal error. 810 self.assertRaises(AssertionError, transport._write_ready) 811 812 def test_write_ready_partial(self): 813 data = b'data' 814 self.sock.send.return_value = 2 815 816 transport = self.socket_transport() 817 transport._buffer.extend(data) 818 self.loop._add_writer(7, transport._write_ready) 819 transport._write_ready() 820 self.loop.assert_writer(7, transport._write_ready) 821 self.assertEqual(list_to_buffer([b'ta']), transport._buffer) 822 823 def test_write_ready_partial_none(self): 824 data = b'data' 825 self.sock.send.return_value = 0 826 827 transport = self.socket_transport() 828 transport._buffer.extend(data) 829 self.loop._add_writer(7, transport._write_ready) 830 transport._write_ready() 831 self.loop.assert_writer(7, transport._write_ready) 832 self.assertEqual(list_to_buffer([b'data']), transport._buffer) 833 834 def test_write_ready_tryagain(self): 835 self.sock.send.side_effect = BlockingIOError 836 837 transport = self.socket_transport() 838 transport._buffer = list_to_buffer([b'data1', b'data2']) 839 self.loop._add_writer(7, transport._write_ready) 840 transport._write_ready() 841 842 self.loop.assert_writer(7, transport._write_ready) 843 self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) 844 845 def test_write_ready_exception(self): 846 err = self.sock.send.side_effect = OSError() 847 848 transport = self.socket_transport() 849 transport._fatal_error = mock.Mock() 850 transport._buffer.extend(b'data') 851 transport._write_ready() 852 transport._fatal_error.assert_called_with( 853 err, 854 'Fatal write error on socket transport') 855 856 def test_write_eof(self): 857 tr = self.socket_transport() 858 self.assertTrue(tr.can_write_eof()) 859 tr.write_eof() 860 self.sock.shutdown.assert_called_with(socket.SHUT_WR) 861 tr.write_eof() 862 self.assertEqual(self.sock.shutdown.call_count, 1) 863 tr.close() 864 865 def test_write_eof_buffer(self): 866 tr = self.socket_transport() 867 self.sock.send.side_effect = BlockingIOError 868 tr.write(b'data') 869 tr.write_eof() 870 self.assertEqual(tr._buffer, list_to_buffer([b'data'])) 871 self.assertTrue(tr._eof) 872 self.assertFalse(self.sock.shutdown.called) 873 self.sock.send.side_effect = lambda _: 4 874 tr._write_ready() 875 self.assertTrue(self.sock.send.called) 876 self.sock.shutdown.assert_called_with(socket.SHUT_WR) 877 tr.close() 878 879 def test_write_eof_after_close(self): 880 tr = self.socket_transport() 881 tr.close() 882 self.loop.run_until_complete(asyncio.sleep(0)) 883 tr.write_eof() 884 885 @mock.patch('asyncio.base_events.logger') 886 def test_transport_close_remove_writer(self, m_log): 887 remove_writer = self.loop._remove_writer = mock.Mock() 888 889 transport = self.socket_transport() 890 transport.close() 891 remove_writer.assert_called_with(self.sock_fd) 892 893 894class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase): 895 896 def setUp(self): 897 super().setUp() 898 self.loop = self.new_test_loop() 899 900 self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol) 901 self.buf = bytearray(1) 902 self.protocol.get_buffer.side_effect = lambda hint: self.buf 903 904 self.sock = mock.Mock(socket.socket) 905 self.sock_fd = self.sock.fileno.return_value = 7 906 907 def socket_transport(self, waiter=None): 908 transport = _SelectorSocketTransport(self.loop, self.sock, 909 self.protocol, waiter=waiter) 910 self.addCleanup(close_transport, transport) 911 return transport 912 913 def test_ctor(self): 914 waiter = self.loop.create_future() 915 tr = self.socket_transport(waiter=waiter) 916 self.loop.run_until_complete(waiter) 917 918 self.loop.assert_reader(7, tr._read_ready) 919 test_utils.run_briefly(self.loop) 920 self.protocol.connection_made.assert_called_with(tr) 921 922 def test_get_buffer_error(self): 923 transport = self.socket_transport() 924 transport._fatal_error = mock.Mock() 925 926 self.loop.call_exception_handler = mock.Mock() 927 self.protocol.get_buffer.side_effect = LookupError() 928 929 transport._read_ready() 930 931 self.assertTrue(transport._fatal_error.called) 932 self.assertTrue(self.protocol.get_buffer.called) 933 self.assertFalse(self.protocol.buffer_updated.called) 934 935 def test_get_buffer_zerosized(self): 936 transport = self.socket_transport() 937 transport._fatal_error = mock.Mock() 938 939 self.loop.call_exception_handler = mock.Mock() 940 self.protocol.get_buffer.side_effect = lambda hint: bytearray(0) 941 942 transport._read_ready() 943 944 self.assertTrue(transport._fatal_error.called) 945 self.assertTrue(self.protocol.get_buffer.called) 946 self.assertFalse(self.protocol.buffer_updated.called) 947 948 def test_proto_type_switch(self): 949 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 950 transport = self.socket_transport() 951 952 self.sock.recv.return_value = b'data' 953 transport._read_ready() 954 955 self.protocol.data_received.assert_called_with(b'data') 956 957 # switch protocol to a BufferedProtocol 958 959 buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol) 960 buf = bytearray(4) 961 buf_proto.get_buffer.side_effect = lambda hint: buf 962 963 transport.set_protocol(buf_proto) 964 965 self.sock.recv_into.return_value = 10 966 transport._read_ready() 967 968 buf_proto.get_buffer.assert_called_with(-1) 969 buf_proto.buffer_updated.assert_called_with(10) 970 971 def test_buffer_updated_error(self): 972 transport = self.socket_transport() 973 transport._fatal_error = mock.Mock() 974 975 self.loop.call_exception_handler = mock.Mock() 976 self.protocol.buffer_updated.side_effect = LookupError() 977 978 self.sock.recv_into.return_value = 10 979 transport._read_ready() 980 981 self.assertTrue(transport._fatal_error.called) 982 self.assertTrue(self.protocol.get_buffer.called) 983 self.assertTrue(self.protocol.buffer_updated.called) 984 985 def test_read_eof_received_error(self): 986 transport = self.socket_transport() 987 transport.close = mock.Mock() 988 transport._fatal_error = mock.Mock() 989 990 self.loop.call_exception_handler = mock.Mock() 991 992 self.protocol.eof_received.side_effect = LookupError() 993 994 self.sock.recv_into.return_value = 0 995 transport._read_ready() 996 997 self.protocol.eof_received.assert_called_with() 998 self.assertTrue(transport._fatal_error.called) 999 1000 def test_read_ready(self): 1001 transport = self.socket_transport() 1002 1003 self.sock.recv_into.return_value = 10 1004 transport._read_ready() 1005 1006 self.protocol.get_buffer.assert_called_with(-1) 1007 self.protocol.buffer_updated.assert_called_with(10) 1008 1009 def test_read_ready_eof(self): 1010 transport = self.socket_transport() 1011 transport.close = mock.Mock() 1012 1013 self.sock.recv_into.return_value = 0 1014 transport._read_ready() 1015 1016 self.protocol.eof_received.assert_called_with() 1017 transport.close.assert_called_with() 1018 1019 def test_read_ready_eof_keep_open(self): 1020 transport = self.socket_transport() 1021 transport.close = mock.Mock() 1022 1023 self.sock.recv_into.return_value = 0 1024 self.protocol.eof_received.return_value = True 1025 transport._read_ready() 1026 1027 self.protocol.eof_received.assert_called_with() 1028 self.assertFalse(transport.close.called) 1029 1030 @mock.patch('logging.exception') 1031 def test_read_ready_tryagain(self, m_exc): 1032 self.sock.recv_into.side_effect = BlockingIOError 1033 1034 transport = self.socket_transport() 1035 transport._fatal_error = mock.Mock() 1036 transport._read_ready() 1037 1038 self.assertFalse(transport._fatal_error.called) 1039 1040 @mock.patch('logging.exception') 1041 def test_read_ready_tryagain_interrupted(self, m_exc): 1042 self.sock.recv_into.side_effect = InterruptedError 1043 1044 transport = self.socket_transport() 1045 transport._fatal_error = mock.Mock() 1046 transport._read_ready() 1047 1048 self.assertFalse(transport._fatal_error.called) 1049 1050 @mock.patch('logging.exception') 1051 def test_read_ready_conn_reset(self, m_exc): 1052 err = self.sock.recv_into.side_effect = ConnectionResetError() 1053 1054 transport = self.socket_transport() 1055 transport._force_close = mock.Mock() 1056 with test_utils.disable_logger(): 1057 transport._read_ready() 1058 transport._force_close.assert_called_with(err) 1059 1060 @mock.patch('logging.exception') 1061 def test_read_ready_err(self, m_exc): 1062 err = self.sock.recv_into.side_effect = OSError() 1063 1064 transport = self.socket_transport() 1065 transport._fatal_error = mock.Mock() 1066 transport._read_ready() 1067 1068 transport._fatal_error.assert_called_with( 1069 err, 1070 'Fatal read error on socket transport') 1071 1072 1073class SelectorDatagramTransportTests(test_utils.TestCase): 1074 1075 def setUp(self): 1076 super().setUp() 1077 self.loop = self.new_test_loop() 1078 self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) 1079 self.sock = mock.Mock(spec_set=socket.socket) 1080 self.sock.fileno.return_value = 7 1081 1082 def datagram_transport(self, address=None): 1083 self.sock.getpeername.side_effect = None if address else OSError 1084 transport = _SelectorDatagramTransport(self.loop, self.sock, 1085 self.protocol, 1086 address=address) 1087 self.addCleanup(close_transport, transport) 1088 return transport 1089 1090 def test_read_ready(self): 1091 transport = self.datagram_transport() 1092 1093 self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234)) 1094 transport._read_ready() 1095 1096 self.protocol.datagram_received.assert_called_with( 1097 b'data', ('0.0.0.0', 1234)) 1098 1099 def test_read_ready_tryagain(self): 1100 transport = self.datagram_transport() 1101 1102 self.sock.recvfrom.side_effect = BlockingIOError 1103 transport._fatal_error = mock.Mock() 1104 transport._read_ready() 1105 1106 self.assertFalse(transport._fatal_error.called) 1107 1108 def test_read_ready_err(self): 1109 transport = self.datagram_transport() 1110 1111 err = self.sock.recvfrom.side_effect = RuntimeError() 1112 transport._fatal_error = mock.Mock() 1113 transport._read_ready() 1114 1115 transport._fatal_error.assert_called_with( 1116 err, 1117 'Fatal read error on datagram transport') 1118 1119 def test_read_ready_oserr(self): 1120 transport = self.datagram_transport() 1121 1122 err = self.sock.recvfrom.side_effect = OSError() 1123 transport._fatal_error = mock.Mock() 1124 transport._read_ready() 1125 1126 self.assertFalse(transport._fatal_error.called) 1127 self.protocol.error_received.assert_called_with(err) 1128 1129 def test_sendto(self): 1130 data = b'data' 1131 transport = self.datagram_transport() 1132 transport.sendto(data, ('0.0.0.0', 1234)) 1133 self.assertTrue(self.sock.sendto.called) 1134 self.assertEqual( 1135 self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) 1136 1137 def test_sendto_bytearray(self): 1138 data = bytearray(b'data') 1139 transport = self.datagram_transport() 1140 transport.sendto(data, ('0.0.0.0', 1234)) 1141 self.assertTrue(self.sock.sendto.called) 1142 self.assertEqual( 1143 self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) 1144 1145 def test_sendto_memoryview(self): 1146 data = memoryview(b'data') 1147 transport = self.datagram_transport() 1148 transport.sendto(data, ('0.0.0.0', 1234)) 1149 self.assertTrue(self.sock.sendto.called) 1150 self.assertEqual( 1151 self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) 1152 1153 def test_sendto_no_data(self): 1154 transport = self.datagram_transport() 1155 transport._buffer.append((b'data', ('0.0.0.0', 12345))) 1156 transport.sendto(b'', ()) 1157 self.assertFalse(self.sock.sendto.called) 1158 self.assertEqual( 1159 [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) 1160 1161 def test_sendto_buffer(self): 1162 transport = self.datagram_transport() 1163 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 1164 transport.sendto(b'data2', ('0.0.0.0', 12345)) 1165 self.assertFalse(self.sock.sendto.called) 1166 self.assertEqual( 1167 [(b'data1', ('0.0.0.0', 12345)), 1168 (b'data2', ('0.0.0.0', 12345))], 1169 list(transport._buffer)) 1170 1171 def test_sendto_buffer_bytearray(self): 1172 data2 = bytearray(b'data2') 1173 transport = self.datagram_transport() 1174 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 1175 transport.sendto(data2, ('0.0.0.0', 12345)) 1176 self.assertFalse(self.sock.sendto.called) 1177 self.assertEqual( 1178 [(b'data1', ('0.0.0.0', 12345)), 1179 (b'data2', ('0.0.0.0', 12345))], 1180 list(transport._buffer)) 1181 self.assertIsInstance(transport._buffer[1][0], bytes) 1182 1183 def test_sendto_buffer_memoryview(self): 1184 data2 = memoryview(b'data2') 1185 transport = self.datagram_transport() 1186 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 1187 transport.sendto(data2, ('0.0.0.0', 12345)) 1188 self.assertFalse(self.sock.sendto.called) 1189 self.assertEqual( 1190 [(b'data1', ('0.0.0.0', 12345)), 1191 (b'data2', ('0.0.0.0', 12345))], 1192 list(transport._buffer)) 1193 self.assertIsInstance(transport._buffer[1][0], bytes) 1194 1195 def test_sendto_tryagain(self): 1196 data = b'data' 1197 1198 self.sock.sendto.side_effect = BlockingIOError 1199 1200 transport = self.datagram_transport() 1201 transport.sendto(data, ('0.0.0.0', 12345)) 1202 1203 self.loop.assert_writer(7, transport._sendto_ready) 1204 self.assertEqual( 1205 [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) 1206 1207 @mock.patch('asyncio.selector_events.logger') 1208 def test_sendto_exception(self, m_log): 1209 data = b'data' 1210 err = self.sock.sendto.side_effect = RuntimeError() 1211 1212 transport = self.datagram_transport() 1213 transport._fatal_error = mock.Mock() 1214 transport.sendto(data, ()) 1215 1216 self.assertTrue(transport._fatal_error.called) 1217 transport._fatal_error.assert_called_with( 1218 err, 1219 'Fatal write error on datagram transport') 1220 transport._conn_lost = 1 1221 1222 transport._address = ('123',) 1223 transport.sendto(data) 1224 transport.sendto(data) 1225 transport.sendto(data) 1226 transport.sendto(data) 1227 transport.sendto(data) 1228 m_log.warning.assert_called_with('socket.send() raised exception.') 1229 1230 def test_sendto_error_received(self): 1231 data = b'data' 1232 1233 self.sock.sendto.side_effect = ConnectionRefusedError 1234 1235 transport = self.datagram_transport() 1236 transport._fatal_error = mock.Mock() 1237 transport.sendto(data, ()) 1238 1239 self.assertEqual(transport._conn_lost, 0) 1240 self.assertFalse(transport._fatal_error.called) 1241 1242 def test_sendto_error_received_connected(self): 1243 data = b'data' 1244 1245 self.sock.send.side_effect = ConnectionRefusedError 1246 1247 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1248 transport._fatal_error = mock.Mock() 1249 transport.sendto(data) 1250 1251 self.assertFalse(transport._fatal_error.called) 1252 self.assertTrue(self.protocol.error_received.called) 1253 1254 def test_sendto_str(self): 1255 transport = self.datagram_transport() 1256 self.assertRaises(TypeError, transport.sendto, 'str', ()) 1257 1258 def test_sendto_connected_addr(self): 1259 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1260 self.assertRaises( 1261 ValueError, transport.sendto, b'str', ('0.0.0.0', 2)) 1262 1263 def test_sendto_closing(self): 1264 transport = self.datagram_transport(address=(1,)) 1265 transport.close() 1266 self.assertEqual(transport._conn_lost, 1) 1267 transport.sendto(b'data', (1,)) 1268 self.assertEqual(transport._conn_lost, 2) 1269 1270 def test_sendto_ready(self): 1271 data = b'data' 1272 self.sock.sendto.return_value = len(data) 1273 1274 transport = self.datagram_transport() 1275 transport._buffer.append((data, ('0.0.0.0', 12345))) 1276 self.loop._add_writer(7, transport._sendto_ready) 1277 transport._sendto_ready() 1278 self.assertTrue(self.sock.sendto.called) 1279 self.assertEqual( 1280 self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) 1281 self.assertFalse(self.loop.writers) 1282 1283 def test_sendto_ready_closing(self): 1284 data = b'data' 1285 self.sock.send.return_value = len(data) 1286 1287 transport = self.datagram_transport() 1288 transport._closing = True 1289 transport._buffer.append((data, ())) 1290 self.loop._add_writer(7, transport._sendto_ready) 1291 transport._sendto_ready() 1292 self.sock.sendto.assert_called_with(data, ()) 1293 self.assertFalse(self.loop.writers) 1294 self.sock.close.assert_called_with() 1295 self.protocol.connection_lost.assert_called_with(None) 1296 1297 def test_sendto_ready_no_data(self): 1298 transport = self.datagram_transport() 1299 self.loop._add_writer(7, transport._sendto_ready) 1300 transport._sendto_ready() 1301 self.assertFalse(self.sock.sendto.called) 1302 self.assertFalse(self.loop.writers) 1303 1304 def test_sendto_ready_tryagain(self): 1305 self.sock.sendto.side_effect = BlockingIOError 1306 1307 transport = self.datagram_transport() 1308 transport._buffer.extend([(b'data1', ()), (b'data2', ())]) 1309 self.loop._add_writer(7, transport._sendto_ready) 1310 transport._sendto_ready() 1311 1312 self.loop.assert_writer(7, transport._sendto_ready) 1313 self.assertEqual( 1314 [(b'data1', ()), (b'data2', ())], 1315 list(transport._buffer)) 1316 1317 def test_sendto_ready_exception(self): 1318 err = self.sock.sendto.side_effect = RuntimeError() 1319 1320 transport = self.datagram_transport() 1321 transport._fatal_error = mock.Mock() 1322 transport._buffer.append((b'data', ())) 1323 transport._sendto_ready() 1324 1325 transport._fatal_error.assert_called_with( 1326 err, 1327 'Fatal write error on datagram transport') 1328 1329 def test_sendto_ready_error_received(self): 1330 self.sock.sendto.side_effect = ConnectionRefusedError 1331 1332 transport = self.datagram_transport() 1333 transport._fatal_error = mock.Mock() 1334 transport._buffer.append((b'data', ())) 1335 transport._sendto_ready() 1336 1337 self.assertFalse(transport._fatal_error.called) 1338 1339 def test_sendto_ready_error_received_connection(self): 1340 self.sock.send.side_effect = ConnectionRefusedError 1341 1342 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1343 transport._fatal_error = mock.Mock() 1344 transport._buffer.append((b'data', ())) 1345 transport._sendto_ready() 1346 1347 self.assertFalse(transport._fatal_error.called) 1348 self.assertTrue(self.protocol.error_received.called) 1349 1350 @mock.patch('asyncio.base_events.logger.error') 1351 def test_fatal_error_connected(self, m_exc): 1352 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1353 err = ConnectionRefusedError() 1354 transport._fatal_error(err) 1355 self.assertFalse(self.protocol.error_received.called) 1356 m_exc.assert_not_called() 1357 1358 @mock.patch('asyncio.base_events.logger.error') 1359 def test_fatal_error_connected_custom_error(self, m_exc): 1360 class MyException(Exception): 1361 pass 1362 transport = self.datagram_transport(address=('0.0.0.0', 1)) 1363 err = MyException() 1364 transport._fatal_error(err) 1365 self.assertFalse(self.protocol.error_received.called) 1366 m_exc.assert_called_with( 1367 test_utils.MockPattern( 1368 'Fatal error on transport\nprotocol:.*\ntransport:.*'), 1369 exc_info=(MyException, MOCK_ANY, MOCK_ANY)) 1370 1371 1372if __name__ == '__main__': 1373 unittest.main() 1374