1"""Tests for selector_events.py"""
2
3import selectors
4import socket
5import unittest
6from unittest import mock
7try:
8    import ssl
9except ImportError:
10    ssl = None
11
12import asyncio
13from asyncio.selector_events import BaseSelectorEventLoop
14from asyncio.selector_events import _SelectorTransport
15from asyncio.selector_events import _SelectorSocketTransport
16from asyncio.selector_events import _SelectorDatagramTransport
17from test.test_asyncio import utils as test_utils
18
19
20MOCK_ANY = mock.ANY
21
22
23def tearDownModule():
24    asyncio.set_event_loop_policy(None)
25
26
27class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
28
29    def _make_self_pipe(self):
30        self._ssock = mock.Mock()
31        self._csock = mock.Mock()
32        self._internal_fds += 1
33
34    def _close_self_pipe(self):
35        pass
36
37
38def list_to_buffer(l=()):
39    return bytearray().join(l)
40
41
42def close_transport(transport):
43    # Don't call transport.close() because the event loop and the selector
44    # are mocked
45    if transport._sock is None:
46        return
47    transport._sock.close()
48    transport._sock = None
49
50
51class BaseSelectorEventLoopTests(test_utils.TestCase):
52
53    def setUp(self):
54        super().setUp()
55        self.selector = mock.Mock()
56        self.selector.select.return_value = []
57        self.loop = TestBaseSelectorEventLoop(self.selector)
58        self.set_event_loop(self.loop)
59
60    def test_make_socket_transport(self):
61        m = mock.Mock()
62        self.loop.add_reader = mock.Mock()
63        self.loop.add_reader._is_coroutine = False
64        transport = self.loop._make_socket_transport(m, asyncio.Protocol())
65        self.assertIsInstance(transport, _SelectorSocketTransport)
66
67        # Calling repr() must not fail when the event loop is closed
68        self.loop.close()
69        repr(transport)
70
71        close_transport(transport)
72
73    @unittest.skipIf(ssl is None, 'No ssl module')
74    def test_make_ssl_transport(self):
75        m = mock.Mock()
76        self.loop._add_reader = mock.Mock()
77        self.loop._add_reader._is_coroutine = False
78        self.loop._add_writer = mock.Mock()
79        self.loop._remove_reader = mock.Mock()
80        self.loop._remove_writer = mock.Mock()
81        waiter = self.loop.create_future()
82        with test_utils.disable_logger():
83            transport = self.loop._make_ssl_transport(
84                m, asyncio.Protocol(), m, waiter)
85
86            with self.assertRaisesRegex(RuntimeError,
87                                        r'SSL transport.*not.*initialized'):
88                transport.is_reading()
89
90            # execute the handshake while the logger is disabled
91            # to ignore SSL handshake failure
92            test_utils.run_briefly(self.loop)
93
94        self.assertTrue(transport.is_reading())
95        transport.pause_reading()
96        transport.pause_reading()
97        self.assertFalse(transport.is_reading())
98        transport.resume_reading()
99        transport.resume_reading()
100        self.assertTrue(transport.is_reading())
101
102        # Sanity check
103        class_name = transport.__class__.__name__
104        self.assertIn("ssl", class_name.lower())
105        self.assertIn("transport", class_name.lower())
106
107        transport.close()
108        # execute pending callbacks to close the socket transport
109        test_utils.run_briefly(self.loop)
110
111    @mock.patch('asyncio.selector_events.ssl', None)
112    @mock.patch('asyncio.sslproto.ssl', None)
113    def test_make_ssl_transport_without_ssl_error(self):
114        m = mock.Mock()
115        self.loop.add_reader = mock.Mock()
116        self.loop.add_writer = mock.Mock()
117        self.loop.remove_reader = mock.Mock()
118        self.loop.remove_writer = mock.Mock()
119        with self.assertRaises(RuntimeError):
120            self.loop._make_ssl_transport(m, m, m, m)
121
122    def test_close(self):
123        class EventLoop(BaseSelectorEventLoop):
124            def _make_self_pipe(self):
125                self._ssock = mock.Mock()
126                self._csock = mock.Mock()
127                self._internal_fds += 1
128
129        self.loop = EventLoop(self.selector)
130        self.set_event_loop(self.loop)
131
132        ssock = self.loop._ssock
133        ssock.fileno.return_value = 7
134        csock = self.loop._csock
135        csock.fileno.return_value = 1
136        remove_reader = self.loop._remove_reader = mock.Mock()
137
138        self.loop._selector.close()
139        self.loop._selector = selector = mock.Mock()
140        self.assertFalse(self.loop.is_closed())
141
142        self.loop.close()
143        self.assertTrue(self.loop.is_closed())
144        self.assertIsNone(self.loop._selector)
145        self.assertIsNone(self.loop._csock)
146        self.assertIsNone(self.loop._ssock)
147        selector.close.assert_called_with()
148        ssock.close.assert_called_with()
149        csock.close.assert_called_with()
150        remove_reader.assert_called_with(7)
151
152        # it should be possible to call close() more than once
153        self.loop.close()
154        self.loop.close()
155
156        # operation blocked when the loop is closed
157        f = self.loop.create_future()
158        self.assertRaises(RuntimeError, self.loop.run_forever)
159        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
160        fd = 0
161        def callback():
162            pass
163        self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
164        self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)
165
166    def test_close_no_selector(self):
167        self.loop.remove_reader = mock.Mock()
168        self.loop._selector.close()
169        self.loop._selector = None
170        self.loop.close()
171        self.assertIsNone(self.loop._selector)
172
173    def test_read_from_self_tryagain(self):
174        self.loop._ssock.recv.side_effect = BlockingIOError
175        self.assertIsNone(self.loop._read_from_self())
176
177    def test_read_from_self_exception(self):
178        self.loop._ssock.recv.side_effect = OSError
179        self.assertRaises(OSError, self.loop._read_from_self)
180
181    def test_write_to_self_tryagain(self):
182        self.loop._csock.send.side_effect = BlockingIOError
183        with test_utils.disable_logger():
184            self.assertIsNone(self.loop._write_to_self())
185
186    def test_write_to_self_exception(self):
187        # _write_to_self() swallows OSError
188        self.loop._csock.send.side_effect = RuntimeError()
189        self.assertRaises(RuntimeError, self.loop._write_to_self)
190
191    def test_add_reader(self):
192        self.loop._selector.get_key.side_effect = KeyError
193        cb = lambda: True
194        self.loop.add_reader(1, cb)
195
196        self.assertTrue(self.loop._selector.register.called)
197        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
198        self.assertEqual(1, fd)
199        self.assertEqual(selectors.EVENT_READ, mask)
200        self.assertEqual(cb, r._callback)
201        self.assertIsNone(w)
202
203    def test_add_reader_existing(self):
204        reader = mock.Mock()
205        writer = mock.Mock()
206        self.loop._selector.get_key.return_value = selectors.SelectorKey(
207            1, 1, selectors.EVENT_WRITE, (reader, writer))
208        cb = lambda: True
209        self.loop.add_reader(1, cb)
210
211        self.assertTrue(reader.cancel.called)
212        self.assertFalse(self.loop._selector.register.called)
213        self.assertTrue(self.loop._selector.modify.called)
214        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
215        self.assertEqual(1, fd)
216        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
217        self.assertEqual(cb, r._callback)
218        self.assertEqual(writer, w)
219
220    def test_add_reader_existing_writer(self):
221        writer = mock.Mock()
222        self.loop._selector.get_key.return_value = selectors.SelectorKey(
223            1, 1, selectors.EVENT_WRITE, (None, writer))
224        cb = lambda: True
225        self.loop.add_reader(1, cb)
226
227        self.assertFalse(self.loop._selector.register.called)
228        self.assertTrue(self.loop._selector.modify.called)
229        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
230        self.assertEqual(1, fd)
231        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
232        self.assertEqual(cb, r._callback)
233        self.assertEqual(writer, w)
234
235    def test_remove_reader(self):
236        self.loop._selector.get_key.return_value = selectors.SelectorKey(
237            1, 1, selectors.EVENT_READ, (None, None))
238        self.assertFalse(self.loop.remove_reader(1))
239
240        self.assertTrue(self.loop._selector.unregister.called)
241
242    def test_remove_reader_read_write(self):
243        reader = mock.Mock()
244        writer = mock.Mock()
245        self.loop._selector.get_key.return_value = selectors.SelectorKey(
246            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
247            (reader, writer))
248        self.assertTrue(
249            self.loop.remove_reader(1))
250
251        self.assertFalse(self.loop._selector.unregister.called)
252        self.assertEqual(
253            (1, selectors.EVENT_WRITE, (None, writer)),
254            self.loop._selector.modify.call_args[0])
255
256    def test_remove_reader_unknown(self):
257        self.loop._selector.get_key.side_effect = KeyError
258        self.assertFalse(
259            self.loop.remove_reader(1))
260
261    def test_add_writer(self):
262        self.loop._selector.get_key.side_effect = KeyError
263        cb = lambda: True
264        self.loop.add_writer(1, cb)
265
266        self.assertTrue(self.loop._selector.register.called)
267        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
268        self.assertEqual(1, fd)
269        self.assertEqual(selectors.EVENT_WRITE, mask)
270        self.assertIsNone(r)
271        self.assertEqual(cb, w._callback)
272
273    def test_add_writer_existing(self):
274        reader = mock.Mock()
275        writer = mock.Mock()
276        self.loop._selector.get_key.return_value = selectors.SelectorKey(
277            1, 1, selectors.EVENT_READ, (reader, writer))
278        cb = lambda: True
279        self.loop.add_writer(1, cb)
280
281        self.assertTrue(writer.cancel.called)
282        self.assertFalse(self.loop._selector.register.called)
283        self.assertTrue(self.loop._selector.modify.called)
284        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
285        self.assertEqual(1, fd)
286        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
287        self.assertEqual(reader, r)
288        self.assertEqual(cb, w._callback)
289
290    def test_remove_writer(self):
291        self.loop._selector.get_key.return_value = selectors.SelectorKey(
292            1, 1, selectors.EVENT_WRITE, (None, None))
293        self.assertFalse(self.loop.remove_writer(1))
294
295        self.assertTrue(self.loop._selector.unregister.called)
296
297    def test_remove_writer_read_write(self):
298        reader = mock.Mock()
299        writer = mock.Mock()
300        self.loop._selector.get_key.return_value = selectors.SelectorKey(
301            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
302            (reader, writer))
303        self.assertTrue(
304            self.loop.remove_writer(1))
305
306        self.assertFalse(self.loop._selector.unregister.called)
307        self.assertEqual(
308            (1, selectors.EVENT_READ, (reader, None)),
309            self.loop._selector.modify.call_args[0])
310
311    def test_remove_writer_unknown(self):
312        self.loop._selector.get_key.side_effect = KeyError
313        self.assertFalse(
314            self.loop.remove_writer(1))
315
316    def test_process_events_read(self):
317        reader = mock.Mock()
318        reader._cancelled = False
319
320        self.loop._add_callback = mock.Mock()
321        self.loop._process_events(
322            [(selectors.SelectorKey(
323                1, 1, selectors.EVENT_READ, (reader, None)),
324              selectors.EVENT_READ)])
325        self.assertTrue(self.loop._add_callback.called)
326        self.loop._add_callback.assert_called_with(reader)
327
328    def test_process_events_read_cancelled(self):
329        reader = mock.Mock()
330        reader.cancelled = True
331
332        self.loop._remove_reader = mock.Mock()
333        self.loop._process_events(
334            [(selectors.SelectorKey(
335                1, 1, selectors.EVENT_READ, (reader, None)),
336             selectors.EVENT_READ)])
337        self.loop._remove_reader.assert_called_with(1)
338
339    def test_process_events_write(self):
340        writer = mock.Mock()
341        writer._cancelled = False
342
343        self.loop._add_callback = mock.Mock()
344        self.loop._process_events(
345            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
346                                    (None, writer)),
347              selectors.EVENT_WRITE)])
348        self.loop._add_callback.assert_called_with(writer)
349
350    def test_process_events_write_cancelled(self):
351        writer = mock.Mock()
352        writer.cancelled = True
353        self.loop._remove_writer = mock.Mock()
354
355        self.loop._process_events(
356            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
357                                    (None, writer)),
358              selectors.EVENT_WRITE)])
359        self.loop._remove_writer.assert_called_with(1)
360
361    def test_accept_connection_multiple(self):
362        sock = mock.Mock()
363        sock.accept.return_value = (mock.Mock(), mock.Mock())
364        backlog = 100
365        # Mock the coroutine generation for a connection to prevent
366        # warnings related to un-awaited coroutines. _accept_connection2
367        # is an async function that is patched with AsyncMock. create_task
368        # creates a task out of coroutine returned by AsyncMock, so use
369        # asyncio.sleep(0) to ensure created tasks are complete to avoid
370        # task pending warnings.
371        mock_obj = mock.patch.object
372        with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
373            self.loop._accept_connection(
374                mock.Mock(), sock, backlog=backlog)
375        self.loop.run_until_complete(asyncio.sleep(0))
376        self.assertEqual(sock.accept.call_count, backlog)
377
378
379class SelectorTransportTests(test_utils.TestCase):
380
381    def setUp(self):
382        super().setUp()
383        self.loop = self.new_test_loop()
384        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
385        self.sock = mock.Mock(socket.socket)
386        self.sock.fileno.return_value = 7
387
388    def create_transport(self):
389        transport = _SelectorTransport(self.loop, self.sock, self.protocol,
390                                       None)
391        self.addCleanup(close_transport, transport)
392        return transport
393
394    def test_ctor(self):
395        tr = self.create_transport()
396        self.assertIs(tr._loop, self.loop)
397        self.assertIs(tr._sock, self.sock)
398        self.assertIs(tr._sock_fd, 7)
399
400    def test_abort(self):
401        tr = self.create_transport()
402        tr._force_close = mock.Mock()
403
404        tr.abort()
405        tr._force_close.assert_called_with(None)
406
407    def test_close(self):
408        tr = self.create_transport()
409        tr.close()
410
411        self.assertTrue(tr.is_closing())
412        self.assertEqual(1, self.loop.remove_reader_count[7])
413        self.protocol.connection_lost(None)
414        self.assertEqual(tr._conn_lost, 1)
415
416        tr.close()
417        self.assertEqual(tr._conn_lost, 1)
418        self.assertEqual(1, self.loop.remove_reader_count[7])
419
420    def test_close_write_buffer(self):
421        tr = self.create_transport()
422        tr._buffer.extend(b'data')
423        tr.close()
424
425        self.assertFalse(self.loop.readers)
426        test_utils.run_briefly(self.loop)
427        self.assertFalse(self.protocol.connection_lost.called)
428
429    def test_force_close(self):
430        tr = self.create_transport()
431        tr._buffer.extend(b'1')
432        self.loop._add_reader(7, mock.sentinel)
433        self.loop._add_writer(7, mock.sentinel)
434        tr._force_close(None)
435
436        self.assertTrue(tr.is_closing())
437        self.assertEqual(tr._buffer, list_to_buffer())
438        self.assertFalse(self.loop.readers)
439        self.assertFalse(self.loop.writers)
440
441        # second close should not remove reader
442        tr._force_close(None)
443        self.assertFalse(self.loop.readers)
444        self.assertEqual(1, self.loop.remove_reader_count[7])
445
446    @mock.patch('asyncio.log.logger.error')
447    def test_fatal_error(self, m_exc):
448        exc = OSError()
449        tr = self.create_transport()
450        tr._force_close = mock.Mock()
451        tr._fatal_error(exc)
452
453        m_exc.assert_not_called()
454
455        tr._force_close.assert_called_with(exc)
456
457    @mock.patch('asyncio.log.logger.error')
458    def test_fatal_error_custom_exception(self, m_exc):
459        class MyError(Exception):
460            pass
461        exc = MyError()
462        tr = self.create_transport()
463        tr._force_close = mock.Mock()
464        tr._fatal_error(exc)
465
466        m_exc.assert_called_with(
467            test_utils.MockPattern(
468                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
469            exc_info=(MyError, MOCK_ANY, MOCK_ANY))
470
471        tr._force_close.assert_called_with(exc)
472
473    def test_connection_lost(self):
474        exc = OSError()
475        tr = self.create_transport()
476        self.assertIsNotNone(tr._protocol)
477        self.assertIsNotNone(tr._loop)
478        tr._call_connection_lost(exc)
479
480        self.protocol.connection_lost.assert_called_with(exc)
481        self.sock.close.assert_called_with()
482        self.assertIsNone(tr._sock)
483
484        self.assertIsNone(tr._protocol)
485        self.assertIsNone(tr._loop)
486
487    def test__add_reader(self):
488        tr = self.create_transport()
489        tr._buffer.extend(b'1')
490        tr._add_reader(7, mock.sentinel)
491        self.assertTrue(self.loop.readers)
492
493        tr._force_close(None)
494
495        self.assertTrue(tr.is_closing())
496        self.assertFalse(self.loop.readers)
497
498        # can not add readers after closing
499        tr._add_reader(7, mock.sentinel)
500        self.assertFalse(self.loop.readers)
501
502
503class SelectorSocketTransportTests(test_utils.TestCase):
504
505    def setUp(self):
506        super().setUp()
507        self.loop = self.new_test_loop()
508        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
509        self.sock = mock.Mock(socket.socket)
510        self.sock_fd = self.sock.fileno.return_value = 7
511
512    def socket_transport(self, waiter=None):
513        transport = _SelectorSocketTransport(self.loop, self.sock,
514                                             self.protocol, waiter=waiter)
515        self.addCleanup(close_transport, transport)
516        return transport
517
518    def test_ctor(self):
519        waiter = self.loop.create_future()
520        tr = self.socket_transport(waiter=waiter)
521        self.loop.run_until_complete(waiter)
522
523        self.loop.assert_reader(7, tr._read_ready)
524        test_utils.run_briefly(self.loop)
525        self.protocol.connection_made.assert_called_with(tr)
526
527    def test_ctor_with_waiter(self):
528        waiter = self.loop.create_future()
529        self.socket_transport(waiter=waiter)
530        self.loop.run_until_complete(waiter)
531
532        self.assertIsNone(waiter.result())
533
534    def test_pause_resume_reading(self):
535        tr = self.socket_transport()
536        test_utils.run_briefly(self.loop)
537        self.assertFalse(tr._paused)
538        self.assertTrue(tr.is_reading())
539        self.loop.assert_reader(7, tr._read_ready)
540
541        tr.pause_reading()
542        tr.pause_reading()
543        self.assertTrue(tr._paused)
544        self.assertFalse(tr.is_reading())
545        self.loop.assert_no_reader(7)
546
547        tr.resume_reading()
548        tr.resume_reading()
549        self.assertFalse(tr._paused)
550        self.assertTrue(tr.is_reading())
551        self.loop.assert_reader(7, tr._read_ready)
552
553        tr.close()
554        self.assertFalse(tr.is_reading())
555        self.loop.assert_no_reader(7)
556
557    def test_read_eof_received_error(self):
558        transport = self.socket_transport()
559        transport.close = mock.Mock()
560        transport._fatal_error = mock.Mock()
561
562        self.loop.call_exception_handler = mock.Mock()
563
564        self.protocol.eof_received.side_effect = LookupError()
565
566        self.sock.recv.return_value = b''
567        transport._read_ready()
568
569        self.protocol.eof_received.assert_called_with()
570        self.assertTrue(transport._fatal_error.called)
571
572    def test_data_received_error(self):
573        transport = self.socket_transport()
574        transport._fatal_error = mock.Mock()
575
576        self.loop.call_exception_handler = mock.Mock()
577        self.protocol.data_received.side_effect = LookupError()
578
579        self.sock.recv.return_value = b'data'
580        transport._read_ready()
581
582        self.assertTrue(transport._fatal_error.called)
583        self.assertTrue(self.protocol.data_received.called)
584
585    def test_read_ready(self):
586        transport = self.socket_transport()
587
588        self.sock.recv.return_value = b'data'
589        transport._read_ready()
590
591        self.protocol.data_received.assert_called_with(b'data')
592
593    def test_read_ready_eof(self):
594        transport = self.socket_transport()
595        transport.close = mock.Mock()
596
597        self.sock.recv.return_value = b''
598        transport._read_ready()
599
600        self.protocol.eof_received.assert_called_with()
601        transport.close.assert_called_with()
602
603    def test_read_ready_eof_keep_open(self):
604        transport = self.socket_transport()
605        transport.close = mock.Mock()
606
607        self.sock.recv.return_value = b''
608        self.protocol.eof_received.return_value = True
609        transport._read_ready()
610
611        self.protocol.eof_received.assert_called_with()
612        self.assertFalse(transport.close.called)
613
614    @mock.patch('logging.exception')
615    def test_read_ready_tryagain(self, m_exc):
616        self.sock.recv.side_effect = BlockingIOError
617
618        transport = self.socket_transport()
619        transport._fatal_error = mock.Mock()
620        transport._read_ready()
621
622        self.assertFalse(transport._fatal_error.called)
623
624    @mock.patch('logging.exception')
625    def test_read_ready_tryagain_interrupted(self, m_exc):
626        self.sock.recv.side_effect = InterruptedError
627
628        transport = self.socket_transport()
629        transport._fatal_error = mock.Mock()
630        transport._read_ready()
631
632        self.assertFalse(transport._fatal_error.called)
633
634    @mock.patch('logging.exception')
635    def test_read_ready_conn_reset(self, m_exc):
636        err = self.sock.recv.side_effect = ConnectionResetError()
637
638        transport = self.socket_transport()
639        transport._force_close = mock.Mock()
640        with test_utils.disable_logger():
641            transport._read_ready()
642        transport._force_close.assert_called_with(err)
643
644    @mock.patch('logging.exception')
645    def test_read_ready_err(self, m_exc):
646        err = self.sock.recv.side_effect = OSError()
647
648        transport = self.socket_transport()
649        transport._fatal_error = mock.Mock()
650        transport._read_ready()
651
652        transport._fatal_error.assert_called_with(
653                                   err,
654                                   'Fatal read error on socket transport')
655
656    def test_write(self):
657        data = b'data'
658        self.sock.send.return_value = len(data)
659
660        transport = self.socket_transport()
661        transport.write(data)
662        self.sock.send.assert_called_with(data)
663
664    def test_write_bytearray(self):
665        data = bytearray(b'data')
666        self.sock.send.return_value = len(data)
667
668        transport = self.socket_transport()
669        transport.write(data)
670        self.sock.send.assert_called_with(data)
671        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
672
673    def test_write_memoryview(self):
674        data = memoryview(b'data')
675        self.sock.send.return_value = len(data)
676
677        transport = self.socket_transport()
678        transport.write(data)
679        self.sock.send.assert_called_with(data)
680
681    def test_write_no_data(self):
682        transport = self.socket_transport()
683        transport._buffer.extend(b'data')
684        transport.write(b'')
685        self.assertFalse(self.sock.send.called)
686        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
687
688    def test_write_buffer(self):
689        transport = self.socket_transport()
690        transport._buffer.extend(b'data1')
691        transport.write(b'data2')
692        self.assertFalse(self.sock.send.called)
693        self.assertEqual(list_to_buffer([b'data1', b'data2']),
694                         transport._buffer)
695
696    def test_write_partial(self):
697        data = b'data'
698        self.sock.send.return_value = 2
699
700        transport = self.socket_transport()
701        transport.write(data)
702
703        self.loop.assert_writer(7, transport._write_ready)
704        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
705
706    def test_write_partial_bytearray(self):
707        data = bytearray(b'data')
708        self.sock.send.return_value = 2
709
710        transport = self.socket_transport()
711        transport.write(data)
712
713        self.loop.assert_writer(7, transport._write_ready)
714        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
715        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
716
717    def test_write_partial_memoryview(self):
718        data = memoryview(b'data')
719        self.sock.send.return_value = 2
720
721        transport = self.socket_transport()
722        transport.write(data)
723
724        self.loop.assert_writer(7, transport._write_ready)
725        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
726
727    def test_write_partial_none(self):
728        data = b'data'
729        self.sock.send.return_value = 0
730        self.sock.fileno.return_value = 7
731
732        transport = self.socket_transport()
733        transport.write(data)
734
735        self.loop.assert_writer(7, transport._write_ready)
736        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
737
738    def test_write_tryagain(self):
739        self.sock.send.side_effect = BlockingIOError
740
741        data = b'data'
742        transport = self.socket_transport()
743        transport.write(data)
744
745        self.loop.assert_writer(7, transport._write_ready)
746        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
747
748    @mock.patch('asyncio.selector_events.logger')
749    def test_write_exception(self, m_log):
750        err = self.sock.send.side_effect = OSError()
751
752        data = b'data'
753        transport = self.socket_transport()
754        transport._fatal_error = mock.Mock()
755        transport.write(data)
756        transport._fatal_error.assert_called_with(
757                                   err,
758                                   'Fatal write error on socket transport')
759        transport._conn_lost = 1
760
761        self.sock.reset_mock()
762        transport.write(data)
763        self.assertFalse(self.sock.send.called)
764        self.assertEqual(transport._conn_lost, 2)
765        transport.write(data)
766        transport.write(data)
767        transport.write(data)
768        transport.write(data)
769        m_log.warning.assert_called_with('socket.send() raised exception.')
770
771    def test_write_str(self):
772        transport = self.socket_transport()
773        self.assertRaises(TypeError, transport.write, 'str')
774
775    def test_write_closing(self):
776        transport = self.socket_transport()
777        transport.close()
778        self.assertEqual(transport._conn_lost, 1)
779        transport.write(b'data')
780        self.assertEqual(transport._conn_lost, 2)
781
782    def test_write_ready(self):
783        data = b'data'
784        self.sock.send.return_value = len(data)
785
786        transport = self.socket_transport()
787        transport._buffer.extend(data)
788        self.loop._add_writer(7, transport._write_ready)
789        transport._write_ready()
790        self.assertTrue(self.sock.send.called)
791        self.assertFalse(self.loop.writers)
792
793    def test_write_ready_closing(self):
794        data = b'data'
795        self.sock.send.return_value = len(data)
796
797        transport = self.socket_transport()
798        transport._closing = True
799        transport._buffer.extend(data)
800        self.loop._add_writer(7, transport._write_ready)
801        transport._write_ready()
802        self.assertTrue(self.sock.send.called)
803        self.assertFalse(self.loop.writers)
804        self.sock.close.assert_called_with()
805        self.protocol.connection_lost.assert_called_with(None)
806
807    def test_write_ready_no_data(self):
808        transport = self.socket_transport()
809        # This is an internal error.
810        self.assertRaises(AssertionError, transport._write_ready)
811
812    def test_write_ready_partial(self):
813        data = b'data'
814        self.sock.send.return_value = 2
815
816        transport = self.socket_transport()
817        transport._buffer.extend(data)
818        self.loop._add_writer(7, transport._write_ready)
819        transport._write_ready()
820        self.loop.assert_writer(7, transport._write_ready)
821        self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
822
823    def test_write_ready_partial_none(self):
824        data = b'data'
825        self.sock.send.return_value = 0
826
827        transport = self.socket_transport()
828        transport._buffer.extend(data)
829        self.loop._add_writer(7, transport._write_ready)
830        transport._write_ready()
831        self.loop.assert_writer(7, transport._write_ready)
832        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
833
834    def test_write_ready_tryagain(self):
835        self.sock.send.side_effect = BlockingIOError
836
837        transport = self.socket_transport()
838        transport._buffer = list_to_buffer([b'data1', b'data2'])
839        self.loop._add_writer(7, transport._write_ready)
840        transport._write_ready()
841
842        self.loop.assert_writer(7, transport._write_ready)
843        self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)
844
845    def test_write_ready_exception(self):
846        err = self.sock.send.side_effect = OSError()
847
848        transport = self.socket_transport()
849        transport._fatal_error = mock.Mock()
850        transport._buffer.extend(b'data')
851        transport._write_ready()
852        transport._fatal_error.assert_called_with(
853                                   err,
854                                   'Fatal write error on socket transport')
855
856    def test_write_eof(self):
857        tr = self.socket_transport()
858        self.assertTrue(tr.can_write_eof())
859        tr.write_eof()
860        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
861        tr.write_eof()
862        self.assertEqual(self.sock.shutdown.call_count, 1)
863        tr.close()
864
865    def test_write_eof_buffer(self):
866        tr = self.socket_transport()
867        self.sock.send.side_effect = BlockingIOError
868        tr.write(b'data')
869        tr.write_eof()
870        self.assertEqual(tr._buffer, list_to_buffer([b'data']))
871        self.assertTrue(tr._eof)
872        self.assertFalse(self.sock.shutdown.called)
873        self.sock.send.side_effect = lambda _: 4
874        tr._write_ready()
875        self.assertTrue(self.sock.send.called)
876        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
877        tr.close()
878
879    def test_write_eof_after_close(self):
880        tr = self.socket_transport()
881        tr.close()
882        self.loop.run_until_complete(asyncio.sleep(0))
883        tr.write_eof()
884
885    @mock.patch('asyncio.base_events.logger')
886    def test_transport_close_remove_writer(self, m_log):
887        remove_writer = self.loop._remove_writer = mock.Mock()
888
889        transport = self.socket_transport()
890        transport.close()
891        remove_writer.assert_called_with(self.sock_fd)
892
893
894class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
895
896    def setUp(self):
897        super().setUp()
898        self.loop = self.new_test_loop()
899
900        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
901        self.buf = bytearray(1)
902        self.protocol.get_buffer.side_effect = lambda hint: self.buf
903
904        self.sock = mock.Mock(socket.socket)
905        self.sock_fd = self.sock.fileno.return_value = 7
906
907    def socket_transport(self, waiter=None):
908        transport = _SelectorSocketTransport(self.loop, self.sock,
909                                             self.protocol, waiter=waiter)
910        self.addCleanup(close_transport, transport)
911        return transport
912
913    def test_ctor(self):
914        waiter = self.loop.create_future()
915        tr = self.socket_transport(waiter=waiter)
916        self.loop.run_until_complete(waiter)
917
918        self.loop.assert_reader(7, tr._read_ready)
919        test_utils.run_briefly(self.loop)
920        self.protocol.connection_made.assert_called_with(tr)
921
922    def test_get_buffer_error(self):
923        transport = self.socket_transport()
924        transport._fatal_error = mock.Mock()
925
926        self.loop.call_exception_handler = mock.Mock()
927        self.protocol.get_buffer.side_effect = LookupError()
928
929        transport._read_ready()
930
931        self.assertTrue(transport._fatal_error.called)
932        self.assertTrue(self.protocol.get_buffer.called)
933        self.assertFalse(self.protocol.buffer_updated.called)
934
935    def test_get_buffer_zerosized(self):
936        transport = self.socket_transport()
937        transport._fatal_error = mock.Mock()
938
939        self.loop.call_exception_handler = mock.Mock()
940        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
941
942        transport._read_ready()
943
944        self.assertTrue(transport._fatal_error.called)
945        self.assertTrue(self.protocol.get_buffer.called)
946        self.assertFalse(self.protocol.buffer_updated.called)
947
948    def test_proto_type_switch(self):
949        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
950        transport = self.socket_transport()
951
952        self.sock.recv.return_value = b'data'
953        transport._read_ready()
954
955        self.protocol.data_received.assert_called_with(b'data')
956
957        # switch protocol to a BufferedProtocol
958
959        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
960        buf = bytearray(4)
961        buf_proto.get_buffer.side_effect = lambda hint: buf
962
963        transport.set_protocol(buf_proto)
964
965        self.sock.recv_into.return_value = 10
966        transport._read_ready()
967
968        buf_proto.get_buffer.assert_called_with(-1)
969        buf_proto.buffer_updated.assert_called_with(10)
970
971    def test_buffer_updated_error(self):
972        transport = self.socket_transport()
973        transport._fatal_error = mock.Mock()
974
975        self.loop.call_exception_handler = mock.Mock()
976        self.protocol.buffer_updated.side_effect = LookupError()
977
978        self.sock.recv_into.return_value = 10
979        transport._read_ready()
980
981        self.assertTrue(transport._fatal_error.called)
982        self.assertTrue(self.protocol.get_buffer.called)
983        self.assertTrue(self.protocol.buffer_updated.called)
984
985    def test_read_eof_received_error(self):
986        transport = self.socket_transport()
987        transport.close = mock.Mock()
988        transport._fatal_error = mock.Mock()
989
990        self.loop.call_exception_handler = mock.Mock()
991
992        self.protocol.eof_received.side_effect = LookupError()
993
994        self.sock.recv_into.return_value = 0
995        transport._read_ready()
996
997        self.protocol.eof_received.assert_called_with()
998        self.assertTrue(transport._fatal_error.called)
999
1000    def test_read_ready(self):
1001        transport = self.socket_transport()
1002
1003        self.sock.recv_into.return_value = 10
1004        transport._read_ready()
1005
1006        self.protocol.get_buffer.assert_called_with(-1)
1007        self.protocol.buffer_updated.assert_called_with(10)
1008
1009    def test_read_ready_eof(self):
1010        transport = self.socket_transport()
1011        transport.close = mock.Mock()
1012
1013        self.sock.recv_into.return_value = 0
1014        transport._read_ready()
1015
1016        self.protocol.eof_received.assert_called_with()
1017        transport.close.assert_called_with()
1018
1019    def test_read_ready_eof_keep_open(self):
1020        transport = self.socket_transport()
1021        transport.close = mock.Mock()
1022
1023        self.sock.recv_into.return_value = 0
1024        self.protocol.eof_received.return_value = True
1025        transport._read_ready()
1026
1027        self.protocol.eof_received.assert_called_with()
1028        self.assertFalse(transport.close.called)
1029
1030    @mock.patch('logging.exception')
1031    def test_read_ready_tryagain(self, m_exc):
1032        self.sock.recv_into.side_effect = BlockingIOError
1033
1034        transport = self.socket_transport()
1035        transport._fatal_error = mock.Mock()
1036        transport._read_ready()
1037
1038        self.assertFalse(transport._fatal_error.called)
1039
1040    @mock.patch('logging.exception')
1041    def test_read_ready_tryagain_interrupted(self, m_exc):
1042        self.sock.recv_into.side_effect = InterruptedError
1043
1044        transport = self.socket_transport()
1045        transport._fatal_error = mock.Mock()
1046        transport._read_ready()
1047
1048        self.assertFalse(transport._fatal_error.called)
1049
1050    @mock.patch('logging.exception')
1051    def test_read_ready_conn_reset(self, m_exc):
1052        err = self.sock.recv_into.side_effect = ConnectionResetError()
1053
1054        transport = self.socket_transport()
1055        transport._force_close = mock.Mock()
1056        with test_utils.disable_logger():
1057            transport._read_ready()
1058        transport._force_close.assert_called_with(err)
1059
1060    @mock.patch('logging.exception')
1061    def test_read_ready_err(self, m_exc):
1062        err = self.sock.recv_into.side_effect = OSError()
1063
1064        transport = self.socket_transport()
1065        transport._fatal_error = mock.Mock()
1066        transport._read_ready()
1067
1068        transport._fatal_error.assert_called_with(
1069                                   err,
1070                                   'Fatal read error on socket transport')
1071
1072
1073class SelectorDatagramTransportTests(test_utils.TestCase):
1074
1075    def setUp(self):
1076        super().setUp()
1077        self.loop = self.new_test_loop()
1078        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
1079        self.sock = mock.Mock(spec_set=socket.socket)
1080        self.sock.fileno.return_value = 7
1081
1082    def datagram_transport(self, address=None):
1083        self.sock.getpeername.side_effect = None if address else OSError
1084        transport = _SelectorDatagramTransport(self.loop, self.sock,
1085                                               self.protocol,
1086                                               address=address)
1087        self.addCleanup(close_transport, transport)
1088        return transport
1089
1090    def test_read_ready(self):
1091        transport = self.datagram_transport()
1092
1093        self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234))
1094        transport._read_ready()
1095
1096        self.protocol.datagram_received.assert_called_with(
1097            b'data', ('0.0.0.0', 1234))
1098
1099    def test_read_ready_tryagain(self):
1100        transport = self.datagram_transport()
1101
1102        self.sock.recvfrom.side_effect = BlockingIOError
1103        transport._fatal_error = mock.Mock()
1104        transport._read_ready()
1105
1106        self.assertFalse(transport._fatal_error.called)
1107
1108    def test_read_ready_err(self):
1109        transport = self.datagram_transport()
1110
1111        err = self.sock.recvfrom.side_effect = RuntimeError()
1112        transport._fatal_error = mock.Mock()
1113        transport._read_ready()
1114
1115        transport._fatal_error.assert_called_with(
1116                                   err,
1117                                   'Fatal read error on datagram transport')
1118
1119    def test_read_ready_oserr(self):
1120        transport = self.datagram_transport()
1121
1122        err = self.sock.recvfrom.side_effect = OSError()
1123        transport._fatal_error = mock.Mock()
1124        transport._read_ready()
1125
1126        self.assertFalse(transport._fatal_error.called)
1127        self.protocol.error_received.assert_called_with(err)
1128
1129    def test_sendto(self):
1130        data = b'data'
1131        transport = self.datagram_transport()
1132        transport.sendto(data, ('0.0.0.0', 1234))
1133        self.assertTrue(self.sock.sendto.called)
1134        self.assertEqual(
1135            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1136
1137    def test_sendto_bytearray(self):
1138        data = bytearray(b'data')
1139        transport = self.datagram_transport()
1140        transport.sendto(data, ('0.0.0.0', 1234))
1141        self.assertTrue(self.sock.sendto.called)
1142        self.assertEqual(
1143            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1144
1145    def test_sendto_memoryview(self):
1146        data = memoryview(b'data')
1147        transport = self.datagram_transport()
1148        transport.sendto(data, ('0.0.0.0', 1234))
1149        self.assertTrue(self.sock.sendto.called)
1150        self.assertEqual(
1151            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
1152
1153    def test_sendto_no_data(self):
1154        transport = self.datagram_transport()
1155        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
1156        transport.sendto(b'', ())
1157        self.assertFalse(self.sock.sendto.called)
1158        self.assertEqual(
1159            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
1160
1161    def test_sendto_buffer(self):
1162        transport = self.datagram_transport()
1163        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1164        transport.sendto(b'data2', ('0.0.0.0', 12345))
1165        self.assertFalse(self.sock.sendto.called)
1166        self.assertEqual(
1167            [(b'data1', ('0.0.0.0', 12345)),
1168             (b'data2', ('0.0.0.0', 12345))],
1169            list(transport._buffer))
1170
1171    def test_sendto_buffer_bytearray(self):
1172        data2 = bytearray(b'data2')
1173        transport = self.datagram_transport()
1174        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1175        transport.sendto(data2, ('0.0.0.0', 12345))
1176        self.assertFalse(self.sock.sendto.called)
1177        self.assertEqual(
1178            [(b'data1', ('0.0.0.0', 12345)),
1179             (b'data2', ('0.0.0.0', 12345))],
1180            list(transport._buffer))
1181        self.assertIsInstance(transport._buffer[1][0], bytes)
1182
1183    def test_sendto_buffer_memoryview(self):
1184        data2 = memoryview(b'data2')
1185        transport = self.datagram_transport()
1186        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
1187        transport.sendto(data2, ('0.0.0.0', 12345))
1188        self.assertFalse(self.sock.sendto.called)
1189        self.assertEqual(
1190            [(b'data1', ('0.0.0.0', 12345)),
1191             (b'data2', ('0.0.0.0', 12345))],
1192            list(transport._buffer))
1193        self.assertIsInstance(transport._buffer[1][0], bytes)
1194
1195    def test_sendto_tryagain(self):
1196        data = b'data'
1197
1198        self.sock.sendto.side_effect = BlockingIOError
1199
1200        transport = self.datagram_transport()
1201        transport.sendto(data, ('0.0.0.0', 12345))
1202
1203        self.loop.assert_writer(7, transport._sendto_ready)
1204        self.assertEqual(
1205            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
1206
1207    @mock.patch('asyncio.selector_events.logger')
1208    def test_sendto_exception(self, m_log):
1209        data = b'data'
1210        err = self.sock.sendto.side_effect = RuntimeError()
1211
1212        transport = self.datagram_transport()
1213        transport._fatal_error = mock.Mock()
1214        transport.sendto(data, ())
1215
1216        self.assertTrue(transport._fatal_error.called)
1217        transport._fatal_error.assert_called_with(
1218                                   err,
1219                                   'Fatal write error on datagram transport')
1220        transport._conn_lost = 1
1221
1222        transport._address = ('123',)
1223        transport.sendto(data)
1224        transport.sendto(data)
1225        transport.sendto(data)
1226        transport.sendto(data)
1227        transport.sendto(data)
1228        m_log.warning.assert_called_with('socket.send() raised exception.')
1229
1230    def test_sendto_error_received(self):
1231        data = b'data'
1232
1233        self.sock.sendto.side_effect = ConnectionRefusedError
1234
1235        transport = self.datagram_transport()
1236        transport._fatal_error = mock.Mock()
1237        transport.sendto(data, ())
1238
1239        self.assertEqual(transport._conn_lost, 0)
1240        self.assertFalse(transport._fatal_error.called)
1241
1242    def test_sendto_error_received_connected(self):
1243        data = b'data'
1244
1245        self.sock.send.side_effect = ConnectionRefusedError
1246
1247        transport = self.datagram_transport(address=('0.0.0.0', 1))
1248        transport._fatal_error = mock.Mock()
1249        transport.sendto(data)
1250
1251        self.assertFalse(transport._fatal_error.called)
1252        self.assertTrue(self.protocol.error_received.called)
1253
1254    def test_sendto_str(self):
1255        transport = self.datagram_transport()
1256        self.assertRaises(TypeError, transport.sendto, 'str', ())
1257
1258    def test_sendto_connected_addr(self):
1259        transport = self.datagram_transport(address=('0.0.0.0', 1))
1260        self.assertRaises(
1261            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))
1262
1263    def test_sendto_closing(self):
1264        transport = self.datagram_transport(address=(1,))
1265        transport.close()
1266        self.assertEqual(transport._conn_lost, 1)
1267        transport.sendto(b'data', (1,))
1268        self.assertEqual(transport._conn_lost, 2)
1269
1270    def test_sendto_ready(self):
1271        data = b'data'
1272        self.sock.sendto.return_value = len(data)
1273
1274        transport = self.datagram_transport()
1275        transport._buffer.append((data, ('0.0.0.0', 12345)))
1276        self.loop._add_writer(7, transport._sendto_ready)
1277        transport._sendto_ready()
1278        self.assertTrue(self.sock.sendto.called)
1279        self.assertEqual(
1280            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
1281        self.assertFalse(self.loop.writers)
1282
1283    def test_sendto_ready_closing(self):
1284        data = b'data'
1285        self.sock.send.return_value = len(data)
1286
1287        transport = self.datagram_transport()
1288        transport._closing = True
1289        transport._buffer.append((data, ()))
1290        self.loop._add_writer(7, transport._sendto_ready)
1291        transport._sendto_ready()
1292        self.sock.sendto.assert_called_with(data, ())
1293        self.assertFalse(self.loop.writers)
1294        self.sock.close.assert_called_with()
1295        self.protocol.connection_lost.assert_called_with(None)
1296
1297    def test_sendto_ready_no_data(self):
1298        transport = self.datagram_transport()
1299        self.loop._add_writer(7, transport._sendto_ready)
1300        transport._sendto_ready()
1301        self.assertFalse(self.sock.sendto.called)
1302        self.assertFalse(self.loop.writers)
1303
1304    def test_sendto_ready_tryagain(self):
1305        self.sock.sendto.side_effect = BlockingIOError
1306
1307        transport = self.datagram_transport()
1308        transport._buffer.extend([(b'data1', ()), (b'data2', ())])
1309        self.loop._add_writer(7, transport._sendto_ready)
1310        transport._sendto_ready()
1311
1312        self.loop.assert_writer(7, transport._sendto_ready)
1313        self.assertEqual(
1314            [(b'data1', ()), (b'data2', ())],
1315            list(transport._buffer))
1316
1317    def test_sendto_ready_exception(self):
1318        err = self.sock.sendto.side_effect = RuntimeError()
1319
1320        transport = self.datagram_transport()
1321        transport._fatal_error = mock.Mock()
1322        transport._buffer.append((b'data', ()))
1323        transport._sendto_ready()
1324
1325        transport._fatal_error.assert_called_with(
1326                                   err,
1327                                   'Fatal write error on datagram transport')
1328
1329    def test_sendto_ready_error_received(self):
1330        self.sock.sendto.side_effect = ConnectionRefusedError
1331
1332        transport = self.datagram_transport()
1333        transport._fatal_error = mock.Mock()
1334        transport._buffer.append((b'data', ()))
1335        transport._sendto_ready()
1336
1337        self.assertFalse(transport._fatal_error.called)
1338
1339    def test_sendto_ready_error_received_connection(self):
1340        self.sock.send.side_effect = ConnectionRefusedError
1341
1342        transport = self.datagram_transport(address=('0.0.0.0', 1))
1343        transport._fatal_error = mock.Mock()
1344        transport._buffer.append((b'data', ()))
1345        transport._sendto_ready()
1346
1347        self.assertFalse(transport._fatal_error.called)
1348        self.assertTrue(self.protocol.error_received.called)
1349
1350    @mock.patch('asyncio.base_events.logger.error')
1351    def test_fatal_error_connected(self, m_exc):
1352        transport = self.datagram_transport(address=('0.0.0.0', 1))
1353        err = ConnectionRefusedError()
1354        transport._fatal_error(err)
1355        self.assertFalse(self.protocol.error_received.called)
1356        m_exc.assert_not_called()
1357
1358    @mock.patch('asyncio.base_events.logger.error')
1359    def test_fatal_error_connected_custom_error(self, m_exc):
1360        class MyException(Exception):
1361            pass
1362        transport = self.datagram_transport(address=('0.0.0.0', 1))
1363        err = MyException()
1364        transport._fatal_error(err)
1365        self.assertFalse(self.protocol.error_received.called)
1366        m_exc.assert_called_with(
1367            test_utils.MockPattern(
1368                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
1369            exc_info=(MyException, MOCK_ANY, MOCK_ANY))
1370
1371
1372if __name__ == '__main__':
1373    unittest.main()
1374