1"""Tests for base_events.py"""
2
3import concurrent.futures
4import errno
5import math
6import socket
7import sys
8import threading
9import time
10import unittest
11from unittest import mock
12
13import asyncio
14from asyncio import base_events
15from asyncio import constants
16from test.test_asyncio import utils as test_utils
17from test import support
18from test.support.script_helper import assert_python_ok
19from test.support import os_helper
20from test.support import socket_helper
21
22
23MOCK_ANY = mock.ANY
24PY34 = sys.version_info >= (3, 4)
25
26
def tearDownModule():
    # unittest module-level teardown hook: reset the asyncio event loop
    # policy by handing None to set_event_loop_policy().
    policy = None
    asyncio.set_event_loop_policy(policy)
29
30
def mock_socket_module():
    # Build a MagicMock, spec'ed on the real socket module, in which a
    # handful of constants/functions are the real objects and socket()
    # hands back a mocked non-blocking socket.
    fake_module = mock.MagicMock(spec=socket)

    real_names = (
        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
    )
    for attr in real_names:
        try:
            real_value = getattr(socket, attr)
        except AttributeError:
            # Not available on this platform: make the mock lack it too.
            delattr(fake_module, attr)
        else:
            setattr(fake_module, attr, real_value)

    socket_factory = mock.MagicMock()
    fake_module.socket = socket_factory
    socket_factory.return_value = test_utils.mock_nonblocking_socket()
    fake_module.getaddrinfo._is_coroutine = False

    return fake_module
47
48
def patch_socket(f):
    # Decorator: replace asyncio.base_events.socket with a fresh object
    # produced by mock_socket_module() for the duration of the call to *f*.
    patcher = mock.patch('asyncio.base_events.socket',
                         new_callable=mock_socket_module)
    return patcher(f)
52
53
class BaseEventTests(test_utils.TestCase):
    # Tests for the module-level helper base_events._ipaddr_info().

    def test_ipaddr_info(self):
        UNSPEC = socket.AF_UNSPEC
        INET = socket.AF_INET
        INET6 = socket.AF_INET6
        STREAM = socket.SOCK_STREAM
        DGRAM = socket.SOCK_DGRAM
        TCP = socket.IPPROTO_TCP
        UDP = socket.IPPROTO_UDP

        v4_stream = (INET, STREAM, TCP, '', ('1.2.3.4', 1))
        v4_dgram = (INET, DGRAM, UDP, '', ('1.2.3.4', 1))

        # Each entry is (expected result, _ipaddr_info arguments); an
        # expected value of None means the helper must return None.
        cases = [
            (v4_stream, ('1.2.3.4', 1, INET, STREAM, TCP)),
            (v4_stream, (b'1.2.3.4', 1, INET, STREAM, TCP)),
            (v4_stream, ('1.2.3.4', 1, UNSPEC, STREAM, TCP)),
            (v4_dgram, ('1.2.3.4', 1, UNSPEC, DGRAM, UDP)),
            # Socket type STREAM implies TCP protocol.
            (v4_stream, ('1.2.3.4', 1, UNSPEC, STREAM, 0)),
            # Socket type DGRAM implies UDP protocol.
            (v4_dgram, ('1.2.3.4', 1, UNSPEC, DGRAM, 0)),
            # No socket type.
            (None, ('1.2.3.4', 1, UNSPEC, 0, 0)),
        ]

        if socket_helper.IPV6_ENABLED:
            v6_stream = (INET6, STREAM, TCP, '', ('::3', 1, 0, 0))
            cases.extend([
                # IPv4 address with family IPv6.
                (None, ('1.2.3.4', 1, INET6, STREAM, TCP)),
                (v6_stream, ('::3', 1, INET6, STREAM, TCP)),
                (v6_stream, ('::3', 1, UNSPEC, STREAM, TCP)),
                # IPv6 address with family IPv4.
                (None, ('::3', 1, INET, STREAM, TCP)),
                # IPv6 address with zone index.
                (None, ('::3%lo0', 1, INET6, STREAM, TCP)),
            ])

        for expected, call_args in cases:
            actual = base_events._ipaddr_info(*call_args)
            if expected is None:
                self.assertIsNone(actual)
            else:
                self.assertEqual(expected, actual)

    def test_port_parameter_types(self):
        # Test obscure kinds of arguments for "port".
        INET = socket.AF_INET
        STREAM = socket.SOCK_STREAM
        TCP = socket.IPPROTO_TCP

        # (port argument, port number expected in the resulting address)
        port_cases = (
            (None, 0),
            (b'', 0),
            ('', 0),
            ('1', 1),
            (b'1', 1),
        )
        for port, resolved_port in port_cases:
            self.assertEqual(
                (INET, STREAM, TCP, '', ('1.2.3.4', resolved_port)),
                base_events._ipaddr_info('1.2.3.4', port, INET, STREAM, TCP))

    @patch_socket
    def test_ipaddr_info_no_inet_pton(self, m_socket):
        # With inet_pton removed from the patched socket module the helper
        # must return None.
        del m_socket.inet_pton
        info = base_events._ipaddr_info(
            '1.2.3.4', 1,
            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.assertIsNone(info)
149
150
151class BaseEventLoopTests(test_utils.TestCase):
152
153    def setUp(self):
154        super().setUp()
155        self.loop = base_events.BaseEventLoop()
156        self.loop._selector = mock.Mock()
157        self.loop._selector.select.return_value = ()
158        self.set_event_loop(self.loop)
159
160    def test_not_implemented(self):
161        m = mock.Mock()
162        self.assertRaises(
163            NotImplementedError,
164            self.loop._make_socket_transport, m, m)
165        self.assertRaises(
166            NotImplementedError,
167            self.loop._make_ssl_transport, m, m, m, m)
168        self.assertRaises(
169            NotImplementedError,
170            self.loop._make_datagram_transport, m, m)
171        self.assertRaises(
172            NotImplementedError, self.loop._process_events, [])
173        self.assertRaises(
174            NotImplementedError, self.loop._write_to_self)
175        self.assertRaises(
176            NotImplementedError,
177            self.loop._make_read_pipe_transport, m, m)
178        self.assertRaises(
179            NotImplementedError,
180            self.loop._make_write_pipe_transport, m, m)
181        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
182        with self.assertRaises(NotImplementedError):
183            gen.send(None)
184
185    def test_close(self):
186        self.assertFalse(self.loop.is_closed())
187        self.loop.close()
188        self.assertTrue(self.loop.is_closed())
189
190        # it should be possible to call close() more than once
191        self.loop.close()
192        self.loop.close()
193
194        # operation blocked when the loop is closed
195        f = self.loop.create_future()
196        self.assertRaises(RuntimeError, self.loop.run_forever)
197        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
198
199    def test__add_callback_handle(self):
200        h = asyncio.Handle(lambda: False, (), self.loop, None)
201
202        self.loop._add_callback(h)
203        self.assertFalse(self.loop._scheduled)
204        self.assertIn(h, self.loop._ready)
205
206    def test__add_callback_cancelled_handle(self):
207        h = asyncio.Handle(lambda: False, (), self.loop, None)
208        h.cancel()
209
210        self.loop._add_callback(h)
211        self.assertFalse(self.loop._scheduled)
212        self.assertFalse(self.loop._ready)
213
214    def test_set_default_executor(self):
215        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
216            def submit(self, fn, *args, **kwargs):
217                raise NotImplementedError(
218                    'cannot submit into a dummy executor')
219
220        self.loop._process_events = mock.Mock()
221        self.loop._write_to_self = mock.Mock()
222
223        executor = DummyExecutor()
224        self.loop.set_default_executor(executor)
225        self.assertIs(executor, self.loop._default_executor)
226
227    def test_set_default_executor_error(self):
228        executor = mock.Mock()
229
230        msg = 'executor must be ThreadPoolExecutor instance'
231        with self.assertRaisesRegex(TypeError, msg):
232            self.loop.set_default_executor(executor)
233
234        self.assertIsNone(self.loop._default_executor)
235
236    def test_call_soon(self):
237        def cb():
238            pass
239
240        h = self.loop.call_soon(cb)
241        self.assertEqual(h._callback, cb)
242        self.assertIsInstance(h, asyncio.Handle)
243        self.assertIn(h, self.loop._ready)
244
245    def test_call_soon_non_callable(self):
246        self.loop.set_debug(True)
247        with self.assertRaisesRegex(TypeError, 'a callable object'):
248            self.loop.call_soon(1)
249
250    def test_call_later(self):
251        def cb():
252            pass
253
254        h = self.loop.call_later(10.0, cb)
255        self.assertIsInstance(h, asyncio.TimerHandle)
256        self.assertIn(h, self.loop._scheduled)
257        self.assertNotIn(h, self.loop._ready)
258        with self.assertRaises(TypeError, msg="delay must not be None"):
259            self.loop.call_later(None, cb)
260
261    def test_call_later_negative_delays(self):
262        calls = []
263
264        def cb(arg):
265            calls.append(arg)
266
267        self.loop._process_events = mock.Mock()
268        self.loop.call_later(-1, cb, 'a')
269        self.loop.call_later(-2, cb, 'b')
270        test_utils.run_briefly(self.loop)
271        self.assertEqual(calls, ['b', 'a'])
272
273    def test_time_and_call_at(self):
274        def cb():
275            self.loop.stop()
276
277        self.loop._process_events = mock.Mock()
278        delay = 0.1
279
280        when = self.loop.time() + delay
281        self.loop.call_at(when, cb)
282        t0 = self.loop.time()
283        self.loop.run_forever()
284        dt = self.loop.time() - t0
285
286        # 50 ms: maximum granularity of the event loop
287        self.assertGreaterEqual(dt, delay - 0.050, dt)
288        # tolerate a difference of +800 ms because some Python buildbots
289        # are really slow
290        self.assertLessEqual(dt, 0.9, dt)
291        with self.assertRaises(TypeError, msg="when cannot be None"):
292            self.loop.call_at(None, cb)
293
294    def check_thread(self, loop, debug):
295        def cb():
296            pass
297
298        loop.set_debug(debug)
299        if debug:
300            msg = ("Non-thread-safe operation invoked on an event loop other "
301                   "than the current one")
302            with self.assertRaisesRegex(RuntimeError, msg):
303                loop.call_soon(cb)
304            with self.assertRaisesRegex(RuntimeError, msg):
305                loop.call_later(60, cb)
306            with self.assertRaisesRegex(RuntimeError, msg):
307                loop.call_at(loop.time() + 60, cb)
308        else:
309            loop.call_soon(cb)
310            loop.call_later(60, cb)
311            loop.call_at(loop.time() + 60, cb)
312
    def test_check_thread(self):
        # Run check_thread() from a secondary thread while self.loop is
        # running in the main thread, for each combination of debug mode
        # and "the thread has its own event loop".

        def check_in_thread(loop, event, debug, create_loop, fut):
            # Thread body: report the outcome of check_thread() back to
            # *loop* by resolving *fut* via call_soon_threadsafe().

            # wait until the event loop is running
            event.wait()

            try:
                if create_loop:
                    # Give this thread its own, different event loop.
                    loop2 = base_events.BaseEventLoop()
                    try:
                        asyncio.set_event_loop(loop2)
                        self.check_thread(loop, debug)
                    finally:
                        asyncio.set_event_loop(None)
                        loop2.close()
                else:
                    self.check_thread(loop, debug)
            except Exception as exc:
                loop.call_soon_threadsafe(fut.set_exception, exc)
            else:
                loop.call_soon_threadsafe(fut.set_result, None)

        def test_thread(loop, debug, create_loop=False):
            # Start the worker thread, then run *loop* until the worker
            # has resolved the future; event.set is scheduled on the loop
            # so the worker proceeds only once the loop is running.
            event = threading.Event()
            fut = loop.create_future()
            loop.call_soon(event.set)
            args = (loop, event, debug, create_loop, fut)
            thread = threading.Thread(target=check_in_thread, args=args)
            thread.start()
            loop.run_until_complete(fut)
            thread.join()

        self.loop._process_events = mock.Mock()
        self.loop._write_to_self = mock.Mock()

        # raise RuntimeError if the thread has no event loop
        test_thread(self.loop, True)

        # check disabled if debug mode is disabled
        test_thread(self.loop, False)

        # raise RuntimeError if the event loop of the thread is not the called
        # event loop
        test_thread(self.loop, True, create_loop=True)

        # check disabled if debug mode is disabled
        test_thread(self.loop, False, create_loop=True)
359
360    def test__run_once(self):
361        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
362                                 self.loop, None)
363        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
364                                 self.loop, None)
365
366        h1.cancel()
367
368        self.loop._process_events = mock.Mock()
369        self.loop._scheduled.append(h1)
370        self.loop._scheduled.append(h2)
371        self.loop._run_once()
372
373        t = self.loop._selector.select.call_args[0][0]
374        self.assertTrue(9.5 < t < 10.5, t)
375        self.assertEqual([h2], self.loop._scheduled)
376        self.assertTrue(self.loop._process_events.called)
377
378    def test_set_debug(self):
379        self.loop.set_debug(True)
380        self.assertTrue(self.loop.get_debug())
381        self.loop.set_debug(False)
382        self.assertFalse(self.loop.get_debug())
383
384    def test__run_once_schedule_handle(self):
385        handle = None
386        processed = False
387
388        def cb(loop):
389            nonlocal processed, handle
390            processed = True
391            handle = loop.call_soon(lambda: True)
392
393        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
394                                self.loop, None)
395
396        self.loop._process_events = mock.Mock()
397        self.loop._scheduled.append(h)
398        self.loop._run_once()
399
400        self.assertTrue(processed)
401        self.assertEqual([handle], list(self.loop._ready))
402
403    def test__run_once_cancelled_event_cleanup(self):
404        self.loop._process_events = mock.Mock()
405
406        self.assertTrue(
407            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
408
409        def cb():
410            pass
411
412        # Set up one "blocking" event that will not be cancelled to
413        # ensure later cancelled events do not make it to the head
414        # of the queue and get cleaned.
415        not_cancelled_count = 1
416        self.loop.call_later(3000, cb)
417
418        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
419        # cancelled handles, ensure they aren't removed
420
421        cancelled_count = 2
422        for x in range(2):
423            h = self.loop.call_later(3600, cb)
424            h.cancel()
425
426        # Add some cancelled events that will be at head and removed
427        cancelled_count += 2
428        for x in range(2):
429            h = self.loop.call_later(100, cb)
430            h.cancel()
431
432        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
433        self.assertLessEqual(cancelled_count + not_cancelled_count,
434            base_events._MIN_SCHEDULED_TIMER_HANDLES)
435
436        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
437
438        self.loop._run_once()
439
440        cancelled_count -= 2
441
442        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
443
444        self.assertEqual(len(self.loop._scheduled),
445            cancelled_count + not_cancelled_count)
446
447        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
448        # so that deletion of cancelled events will occur on next _run_once
449        add_cancel_count = int(math.ceil(
450            base_events._MIN_SCHEDULED_TIMER_HANDLES *
451            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
452
453        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
454            add_cancel_count, 0)
455
456        # Add some events that will not be cancelled
457        not_cancelled_count += add_not_cancel_count
458        for x in range(add_not_cancel_count):
459            self.loop.call_later(3600, cb)
460
461        # Add enough cancelled events
462        cancelled_count += add_cancel_count
463        for x in range(add_cancel_count):
464            h = self.loop.call_later(3600, cb)
465            h.cancel()
466
467        # Ensure all handles are still scheduled
468        self.assertEqual(len(self.loop._scheduled),
469            cancelled_count + not_cancelled_count)
470
471        self.loop._run_once()
472
473        # Ensure cancelled events were removed
474        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
475
476        # Ensure only uncancelled events remain scheduled
477        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
478
479    def test_run_until_complete_type_error(self):
480        self.assertRaises(TypeError,
481            self.loop.run_until_complete, 'blah')
482
483    def test_run_until_complete_loop(self):
484        task = self.loop.create_future()
485        other_loop = self.new_test_loop()
486        self.addCleanup(other_loop.close)
487        self.assertRaises(ValueError,
488            other_loop.run_until_complete, task)
489
490    def test_run_until_complete_loop_orphan_future_close_loop(self):
491        class ShowStopper(SystemExit):
492            pass
493
494        async def foo(delay):
495            await asyncio.sleep(delay)
496
497        def throw():
498            raise ShowStopper
499
500        self.loop._process_events = mock.Mock()
501        self.loop.call_soon(throw)
502        with self.assertRaises(ShowStopper):
503            self.loop.run_until_complete(foo(0.1))
504
505        # This call fails if run_until_complete does not clean up
506        # done-callback for the previous future.
507        self.loop.run_until_complete(foo(0.2))
508
509    def test_subprocess_exec_invalid_args(self):
510        args = [sys.executable, '-c', 'pass']
511
512        # missing program parameter (empty args)
513        self.assertRaises(TypeError,
514            self.loop.run_until_complete, self.loop.subprocess_exec,
515            asyncio.SubprocessProtocol)
516
517        # expected multiple arguments, not a list
518        self.assertRaises(TypeError,
519            self.loop.run_until_complete, self.loop.subprocess_exec,
520            asyncio.SubprocessProtocol, args)
521
522        # program arguments must be strings, not int
523        self.assertRaises(TypeError,
524            self.loop.run_until_complete, self.loop.subprocess_exec,
525            asyncio.SubprocessProtocol, sys.executable, 123)
526
527        # universal_newlines, shell, bufsize must not be set
528        self.assertRaises(TypeError,
529        self.loop.run_until_complete, self.loop.subprocess_exec,
530            asyncio.SubprocessProtocol, *args, universal_newlines=True)
531        self.assertRaises(TypeError,
532            self.loop.run_until_complete, self.loop.subprocess_exec,
533            asyncio.SubprocessProtocol, *args, shell=True)
534        self.assertRaises(TypeError,
535            self.loop.run_until_complete, self.loop.subprocess_exec,
536            asyncio.SubprocessProtocol, *args, bufsize=4096)
537
538    def test_subprocess_shell_invalid_args(self):
539        # expected a string, not an int or a list
540        self.assertRaises(TypeError,
541            self.loop.run_until_complete, self.loop.subprocess_shell,
542            asyncio.SubprocessProtocol, 123)
543        self.assertRaises(TypeError,
544            self.loop.run_until_complete, self.loop.subprocess_shell,
545            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
546
547        # universal_newlines, shell, bufsize must not be set
548        self.assertRaises(TypeError,
549            self.loop.run_until_complete, self.loop.subprocess_shell,
550            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
551        self.assertRaises(TypeError,
552            self.loop.run_until_complete, self.loop.subprocess_shell,
553            asyncio.SubprocessProtocol, 'exit 0', shell=True)
554        self.assertRaises(TypeError,
555            self.loop.run_until_complete, self.loop.subprocess_shell,
556            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
557
558    def test_default_exc_handler_callback(self):
559        self.loop._process_events = mock.Mock()
560
561        def zero_error(fut):
562            fut.set_result(True)
563            1/0
564
565        # Test call_soon (events.Handle)
566        with mock.patch('asyncio.base_events.logger') as log:
567            fut = self.loop.create_future()
568            self.loop.call_soon(zero_error, fut)
569            fut.add_done_callback(lambda fut: self.loop.stop())
570            self.loop.run_forever()
571            log.error.assert_called_with(
572                test_utils.MockPattern('Exception in callback.*zero'),
573                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
574
575        # Test call_later (events.TimerHandle)
576        with mock.patch('asyncio.base_events.logger') as log:
577            fut = self.loop.create_future()
578            self.loop.call_later(0.01, zero_error, fut)
579            fut.add_done_callback(lambda fut: self.loop.stop())
580            self.loop.run_forever()
581            log.error.assert_called_with(
582                test_utils.MockPattern('Exception in callback.*zero'),
583                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
584
585    def test_default_exc_handler_coro(self):
586        self.loop._process_events = mock.Mock()
587
588        async def zero_error_coro():
589            await asyncio.sleep(0.01)
590            1/0
591
592        # Test Future.__del__
593        with mock.patch('asyncio.base_events.logger') as log:
594            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
595            fut.add_done_callback(lambda *args: self.loop.stop())
596            self.loop.run_forever()
597            fut = None # Trigger Future.__del__ or futures._TracebackLogger
598            support.gc_collect()
599            if PY34:
600                # Future.__del__ in Python 3.4 logs error with
601                # an actual exception context
602                log.error.assert_called_with(
603                    test_utils.MockPattern('.*exception was never retrieved'),
604                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
605            else:
606                # futures._TracebackLogger logs only textual traceback
607                log.error.assert_called_with(
608                    test_utils.MockPattern(
609                        '.*exception was never retrieved.*ZeroDiv'),
610                    exc_info=False)
611
612    def test_set_exc_handler_invalid(self):
613        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
614            self.loop.set_exception_handler('spam')
615
616    def test_set_exc_handler_custom(self):
617        def zero_error():
618            1/0
619
620        def run_loop():
621            handle = self.loop.call_soon(zero_error)
622            self.loop._run_once()
623            return handle
624
625        self.loop.set_debug(True)
626        self.loop._process_events = mock.Mock()
627
628        self.assertIsNone(self.loop.get_exception_handler())
629        mock_handler = mock.Mock()
630        self.loop.set_exception_handler(mock_handler)
631        self.assertIs(self.loop.get_exception_handler(), mock_handler)
632        handle = run_loop()
633        mock_handler.assert_called_with(self.loop, {
634            'exception': MOCK_ANY,
635            'message': test_utils.MockPattern(
636                                'Exception in callback.*zero_error'),
637            'handle': handle,
638            'source_traceback': handle._source_traceback,
639        })
640        mock_handler.reset_mock()
641
642        self.loop.set_exception_handler(None)
643        with mock.patch('asyncio.base_events.logger') as log:
644            run_loop()
645            log.error.assert_called_with(
646                        test_utils.MockPattern(
647                                'Exception in callback.*zero'),
648                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
649
650        assert not mock_handler.called
651
652    def test_set_exc_handler_broken(self):
653        def run_loop():
654            def zero_error():
655                1/0
656            self.loop.call_soon(zero_error)
657            self.loop._run_once()
658
659        def handler(loop, context):
660            raise AttributeError('spam')
661
662        self.loop._process_events = mock.Mock()
663
664        self.loop.set_exception_handler(handler)
665
666        with mock.patch('asyncio.base_events.logger') as log:
667            run_loop()
668            log.error.assert_called_with(
669                test_utils.MockPattern(
670                    'Unhandled error in exception handler'),
671                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
672
673    def test_default_exc_handler_broken(self):
674        _context = None
675
676        class Loop(base_events.BaseEventLoop):
677
678            _selector = mock.Mock()
679            _process_events = mock.Mock()
680
681            def default_exception_handler(self, context):
682                nonlocal _context
683                _context = context
684                # Simulates custom buggy "default_exception_handler"
685                raise ValueError('spam')
686
687        loop = Loop()
688        self.addCleanup(loop.close)
689        asyncio.set_event_loop(loop)
690
691        def run_loop():
692            def zero_error():
693                1/0
694            loop.call_soon(zero_error)
695            loop._run_once()
696
697        with mock.patch('asyncio.base_events.logger') as log:
698            run_loop()
699            log.error.assert_called_with(
700                'Exception in default exception handler',
701                exc_info=True)
702
703        def custom_handler(loop, context):
704            raise ValueError('ham')
705
706        _context = None
707        loop.set_exception_handler(custom_handler)
708        with mock.patch('asyncio.base_events.logger') as log:
709            run_loop()
710            log.error.assert_called_with(
711                test_utils.MockPattern('Exception in default exception.*'
712                                       'while handling.*in custom'),
713                exc_info=True)
714
715            # Check that original context was passed to default
716            # exception handler.
717            self.assertIn('context', _context)
718            self.assertIs(type(_context['context']['exception']),
719                          ZeroDivisionError)
720
721    def test_set_task_factory_invalid(self):
722        with self.assertRaisesRegex(
723            TypeError, 'task factory must be a callable or None'):
724
725            self.loop.set_task_factory(1)
726
727        self.assertIsNone(self.loop.get_task_factory())
728
729    def test_set_task_factory(self):
730        self.loop._process_events = mock.Mock()
731
732        class MyTask(asyncio.Task):
733            pass
734
735        async def coro():
736            pass
737
738        factory = lambda loop, coro: MyTask(coro, loop=loop)
739
740        self.assertIsNone(self.loop.get_task_factory())
741        self.loop.set_task_factory(factory)
742        self.assertIs(self.loop.get_task_factory(), factory)
743
744        task = self.loop.create_task(coro())
745        self.assertTrue(isinstance(task, MyTask))
746        self.loop.run_until_complete(task)
747
748        self.loop.set_task_factory(None)
749        self.assertIsNone(self.loop.get_task_factory())
750
751        task = self.loop.create_task(coro())
752        self.assertTrue(isinstance(task, asyncio.Task))
753        self.assertFalse(isinstance(task, MyTask))
754        self.loop.run_until_complete(task)
755
756    def test_env_var_debug(self):
757        code = '\n'.join((
758            'import asyncio',
759            'loop = asyncio.get_event_loop()',
760            'print(loop.get_debug())'))
761
762        # Test with -E to not fail if the unit test was run with
763        # PYTHONASYNCIODEBUG set to a non-empty string
764        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
765        self.assertEqual(stdout.rstrip(), b'False')
766
767        sts, stdout, stderr = assert_python_ok('-c', code,
768                                               PYTHONASYNCIODEBUG='',
769                                               PYTHONDEVMODE='')
770        self.assertEqual(stdout.rstrip(), b'False')
771
772        sts, stdout, stderr = assert_python_ok('-c', code,
773                                               PYTHONASYNCIODEBUG='1',
774                                               PYTHONDEVMODE='')
775        self.assertEqual(stdout.rstrip(), b'True')
776
777        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
778                                               PYTHONASYNCIODEBUG='1')
779        self.assertEqual(stdout.rstrip(), b'False')
780
781        # -X dev
782        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
783                                               '-c', code)
784        self.assertEqual(stdout.rstrip(), b'True')
785
786    def test_create_task(self):
787        class MyTask(asyncio.Task):
788            pass
789
790        async def test():
791            pass
792
793        class EventLoop(base_events.BaseEventLoop):
794            def create_task(self, coro):
795                return MyTask(coro, loop=loop)
796
797        loop = EventLoop()
798        self.set_event_loop(loop)
799
800        coro = test()
801        task = asyncio.ensure_future(coro, loop=loop)
802        self.assertIsInstance(task, MyTask)
803
804        # make warnings quiet
805        task._log_destroy_pending = False
806        coro.close()
807
808    def test_create_named_task_with_default_factory(self):
809        async def test():
810            pass
811
812        loop = asyncio.new_event_loop()
813        task = loop.create_task(test(), name='test_task')
814        try:
815            self.assertEqual(task.get_name(), 'test_task')
816        finally:
817            loop.run_until_complete(task)
818            loop.close()
819
820    def test_create_named_task_with_custom_factory(self):
821        def task_factory(loop, coro):
822            return asyncio.Task(coro, loop=loop)
823
824        async def test():
825            pass
826
827        loop = asyncio.new_event_loop()
828        loop.set_task_factory(task_factory)
829        task = loop.create_task(test(), name='test_task')
830        try:
831            self.assertEqual(task.get_name(), 'test_task')
832        finally:
833            loop.run_until_complete(task)
834            loop.close()
835
836    def test_run_forever_keyboard_interrupt(self):
837        # Python issue #22601: ensure that the temporary task created by
838        # run_forever() consumes the KeyboardInterrupt and so don't log
839        # a warning
840        async def raise_keyboard_interrupt():
841            raise KeyboardInterrupt
842
843        self.loop._process_events = mock.Mock()
844        self.loop.call_exception_handler = mock.Mock()
845
846        try:
847            self.loop.run_until_complete(raise_keyboard_interrupt())
848        except KeyboardInterrupt:
849            pass
850        self.loop.close()
851        support.gc_collect()
852
853        self.assertFalse(self.loop.call_exception_handler.called)
854
855    def test_run_until_complete_baseexception(self):
856        # Python issue #22429: run_until_complete() must not schedule a pending
857        # call to stop() if the future raised a BaseException
858        async def raise_keyboard_interrupt():
859            raise KeyboardInterrupt
860
861        self.loop._process_events = mock.Mock()
862
863        try:
864            self.loop.run_until_complete(raise_keyboard_interrupt())
865        except KeyboardInterrupt:
866            pass
867
868        def func():
869            self.loop.stop()
870            func.called = True
871        func.called = False
872        try:
873            self.loop.call_soon(func)
874            self.loop.run_forever()
875        except KeyboardInterrupt:
876            pass
877        self.assertTrue(func.called)
878
879    def test_single_selecter_event_callback_after_stopping(self):
880        # Python issue #25593: A stopped event loop may cause event callbacks
881        # to run more than once.
882        event_sentinel = object()
883        callcount = 0
884        doer = None
885
886        def proc_events(event_list):
887            nonlocal doer
888            if event_sentinel in event_list:
889                doer = self.loop.call_soon(do_event)
890
891        def do_event():
892            nonlocal callcount
893            callcount += 1
894            self.loop.call_soon(clear_selector)
895
896        def clear_selector():
897            doer.cancel()
898            self.loop._selector.select.return_value = ()
899
900        self.loop._process_events = proc_events
901        self.loop._selector.select.return_value = (event_sentinel,)
902
903        for i in range(1, 3):
904            with self.subTest('Loop %d/2' % i):
905                self.loop.call_soon(self.loop.stop)
906                self.loop.run_forever()
907                self.assertEqual(callcount, 1)
908
909    def test_run_once(self):
910        # Simple test for test_utils.run_once().  It may seem strange
911        # to have a test for this (the function isn't even used!) but
912        # it's a de-factor standard API for library tests.  This tests
913        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
914        count = 0
915
916        def callback():
917            nonlocal count
918            count += 1
919
920        self.loop._process_events = mock.Mock()
921        self.loop.call_soon(callback)
922        test_utils.run_once(self.loop)
923        self.assertEqual(count, 1)
924
925    def test_run_forever_pre_stopped(self):
926        # Test that the old idiom for pre-stopping the loop works.
927        self.loop._process_events = mock.Mock()
928        self.loop.stop()
929        self.loop.run_forever()
930        self.loop._selector.select.assert_called_once_with(0)
931
    async def leave_unfinalized_asyncgen(self):
        # Create an async generator, iterate it partially, and leave it
        # to be garbage collected.
        # Used in async generator finalization tests.
        # Depends on implementation details of garbage collector. Changes
        # in gc may break this function.
        #
        # Returns the ``status`` dict, whose flags are updated later by
        # the tasks scheduled here:
        #   'started'   - the generator body began executing
        #   'stopped'   - iteration was abandoned after receiving 'THREE'
        #   'finalized' - the generator's ``finally`` clause ran
        status = {'started': False,
                  'stopped': False,
                  'finalized': False}

        async def agen():
            status['started'] = True
            try:
                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
                    yield item
            finally:
                # Runs when the generator is exhausted or closed.
                status['finalized'] = True

        ag = agen()
        ai = ag.__aiter__()

        async def iter_one():
            # Pull a single item, then hand over to a fresh task for the
            # next one; stop for good once 'THREE' has been received.
            # NOTE(review): presumably the self-referencing closure is what
            # keeps the generator alive until a gc pass -- confirm before
            # restructuring.
            try:
                item = await ai.__anext__()
            except StopAsyncIteration:
                return
            if item == 'THREE':
                status['stopped'] = True
                return
            asyncio.create_task(iter_one())

        # Only the first step is scheduled here; the caller has to keep
        # running the loop until status['stopped'] is set.
        asyncio.create_task(iter_one())
        return status
965
966    def test_asyncgen_finalization_by_gc(self):
967        # Async generators should be finalized when garbage collected.
968        self.loop._process_events = mock.Mock()
969        self.loop._write_to_self = mock.Mock()
970        with support.disable_gc():
971            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
972            while not status['stopped']:
973                test_utils.run_briefly(self.loop)
974            self.assertTrue(status['started'])
975            self.assertTrue(status['stopped'])
976            self.assertFalse(status['finalized'])
977            support.gc_collect()
978            test_utils.run_briefly(self.loop)
979            self.assertTrue(status['finalized'])
980
981    def test_asyncgen_finalization_by_gc_in_other_thread(self):
982        # Python issue 34769: If garbage collector runs in another
983        # thread, async generators will not finalize in debug
984        # mode.
985        self.loop._process_events = mock.Mock()
986        self.loop._write_to_self = mock.Mock()
987        self.loop.set_debug(True)
988        with support.disable_gc():
989            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
990            while not status['stopped']:
991                test_utils.run_briefly(self.loop)
992            self.assertTrue(status['started'])
993            self.assertTrue(status['stopped'])
994            self.assertFalse(status['finalized'])
995            self.loop.run_until_complete(
996                self.loop.run_in_executor(None, support.gc_collect))
997            test_utils.run_briefly(self.loop)
998            self.assertTrue(status['finalized'])
999
1000
class MyProto(asyncio.Protocol):
    """Stream protocol test double that records its lifecycle in ``state``.

    ``state`` starts as 'INITIAL' and is moved to 'CONNECTED', 'EOF' and
    'CLOSED' by the callbacks below; each callback asserts that it is
    invoked in a state it accepts.  ``nbytes`` sums the length of all data
    received.  With ``create_future=True``, ``done`` is a future created on
    the running loop and resolved with ``None`` in ``connection_lost()``.
    """
    done = None

    def __init__(self, create_future=False):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            running_loop = asyncio.get_running_loop()
            self.done = running_loop.create_future()

    def _require_state(self, *allowed):
        # Fail with the current state as the assertion message.
        assert self.state in allowed, self.state

    def connection_made(self, transport):
        self.transport = transport
        self._require_state('INITIAL')
        self.state = 'CONNECTED'
        # Send a fixed HTTP request as soon as the connection is up.
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')

    def data_received(self, data):
        self._require_state('CONNECTED')
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        self._require_state('CONNECTED')
        self.state = 'EOF'

    def connection_lost(self, exc):
        self._require_state('CONNECTED', 'EOF')
        self.state = 'CLOSED'
        completion = self.done
        if completion:
            completion.set_result(None)
1029
1030
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol test double that records its lifecycle in ``state``.

    ``state`` starts as 'INITIAL', becomes 'INITIALIZED' in
    ``connection_made()`` and 'CLOSED' in ``connection_lost()``; each
    callback asserts that it is invoked in the state it expects.
    ``nbytes`` sums the length of all datagrams received.

    With ``create_future=True``, ``done`` is a future resolved with ``None``
    in ``connection_lost()``.  It is created on *loop* when one is given,
    otherwise on the running loop (``asyncio.get_running_loop()`` raises
    RuntimeError if there is none).
    """
    done = None

    def __init__(self, create_future=False, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            if loop is None:
                # Same default as MyProto; avoids calling
                # create_future() on None.
                loop = asyncio.get_running_loop()
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)
1057
1058
1059class BaseEventLoopWithSelectorTests(test_utils.TestCase):
1060
1061    def setUp(self):
1062        super().setUp()
1063        self.loop = asyncio.SelectorEventLoop()
1064        self.set_event_loop(self.loop)
1065
1066    @mock.patch('socket.getnameinfo')
1067    def test_getnameinfo(self, m_gai):
1068        m_gai.side_effect = lambda *args: 42
1069        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1070        self.assertEqual(r, 42)
1071
1072    @patch_socket
1073    def test_create_connection_multiple_errors(self, m_socket):
1074
1075        class MyProto(asyncio.Protocol):
1076            pass
1077
1078        async def getaddrinfo(*args, **kw):
1079            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1080                    (2, 1, 6, '', ('107.6.106.82', 80))]
1081
1082        def getaddrinfo_task(*args, **kwds):
1083            return self.loop.create_task(getaddrinfo(*args, **kwds))
1084
1085        idx = -1
1086        errors = ['err1', 'err2']
1087
1088        def _socket(*args, **kw):
1089            nonlocal idx, errors
1090            idx += 1
1091            raise OSError(errors[idx])
1092
1093        m_socket.socket = _socket
1094
1095        self.loop.getaddrinfo = getaddrinfo_task
1096
1097        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1098        with self.assertRaises(OSError) as cm:
1099            self.loop.run_until_complete(coro)
1100
1101        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1102
1103    @patch_socket
1104    def test_create_connection_timeout(self, m_socket):
1105        # Ensure that the socket is closed on timeout
1106        sock = mock.Mock()
1107        m_socket.socket.return_value = sock
1108
1109        def getaddrinfo(*args, **kw):
1110            fut = self.loop.create_future()
1111            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1112                    ('127.0.0.1', 80))
1113            fut.set_result([addr])
1114            return fut
1115        self.loop.getaddrinfo = getaddrinfo
1116
1117        with mock.patch.object(self.loop, 'sock_connect',
1118                               side_effect=asyncio.TimeoutError):
1119            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1120            with self.assertRaises(asyncio.TimeoutError):
1121                self.loop.run_until_complete(coro)
1122            self.assertTrue(sock.close.called)
1123
1124    def test_create_connection_host_port_sock(self):
1125        coro = self.loop.create_connection(
1126            MyProto, 'example.com', 80, sock=object())
1127        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1128
1129    def test_create_connection_wrong_sock(self):
1130        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1131        with sock:
1132            coro = self.loop.create_connection(MyProto, sock=sock)
1133            with self.assertRaisesRegex(ValueError,
1134                                        'A Stream Socket was expected'):
1135                self.loop.run_until_complete(coro)
1136
1137    def test_create_server_wrong_sock(self):
1138        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1139        with sock:
1140            coro = self.loop.create_server(MyProto, sock=sock)
1141            with self.assertRaisesRegex(ValueError,
1142                                        'A Stream Socket was expected'):
1143                self.loop.run_until_complete(coro)
1144
1145    def test_create_server_ssl_timeout_for_plain_socket(self):
1146        coro = self.loop.create_server(
1147            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1148        with self.assertRaisesRegex(
1149                ValueError,
1150                'ssl_handshake_timeout is only meaningful with ssl'):
1151            self.loop.run_until_complete(coro)
1152
1153    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1154                         'no socket.SOCK_NONBLOCK (linux only)')
1155    def test_create_server_stream_bittype(self):
1156        sock = socket.socket(
1157            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1158        with sock:
1159            coro = self.loop.create_server(lambda: None, sock=sock)
1160            srv = self.loop.run_until_complete(coro)
1161            srv.close()
1162            self.loop.run_until_complete(srv.wait_closed())
1163
1164    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1165    def test_create_server_ipv6(self):
1166        async def main():
1167            srv = await asyncio.start_server(lambda: None, '::1', 0)
1168            try:
1169                self.assertGreater(len(srv.sockets), 0)
1170            finally:
1171                srv.close()
1172                await srv.wait_closed()
1173
1174        try:
1175            self.loop.run_until_complete(main())
1176        except OSError as ex:
1177            if (hasattr(errno, 'EADDRNOTAVAIL') and
1178                    ex.errno == errno.EADDRNOTAVAIL):
1179                self.skipTest('failed to bind to ::1')
1180            else:
1181                raise
1182
1183    def test_create_datagram_endpoint_wrong_sock(self):
1184        sock = socket.socket(socket.AF_INET)
1185        with sock:
1186            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1187            with self.assertRaisesRegex(ValueError,
1188                                        'A UDP Socket was expected'):
1189                self.loop.run_until_complete(coro)
1190
1191    def test_create_connection_no_host_port_sock(self):
1192        coro = self.loop.create_connection(MyProto)
1193        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1194
1195    def test_create_connection_no_getaddrinfo(self):
1196        async def getaddrinfo(*args, **kw):
1197            return []
1198
1199        def getaddrinfo_task(*args, **kwds):
1200            return self.loop.create_task(getaddrinfo(*args, **kwds))
1201
1202        self.loop.getaddrinfo = getaddrinfo_task
1203        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1204        self.assertRaises(
1205            OSError, self.loop.run_until_complete, coro)
1206
1207    def test_create_connection_connect_err(self):
1208        async def getaddrinfo(*args, **kw):
1209            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1210
1211        def getaddrinfo_task(*args, **kwds):
1212            return self.loop.create_task(getaddrinfo(*args, **kwds))
1213
1214        self.loop.getaddrinfo = getaddrinfo_task
1215        self.loop.sock_connect = mock.Mock()
1216        self.loop.sock_connect.side_effect = OSError
1217
1218        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1219        self.assertRaises(
1220            OSError, self.loop.run_until_complete, coro)
1221
1222    def test_create_connection_multiple(self):
1223        async def getaddrinfo(*args, **kw):
1224            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1225                    (2, 1, 6, '', ('0.0.0.2', 80))]
1226
1227        def getaddrinfo_task(*args, **kwds):
1228            return self.loop.create_task(getaddrinfo(*args, **kwds))
1229
1230        self.loop.getaddrinfo = getaddrinfo_task
1231        self.loop.sock_connect = mock.Mock()
1232        self.loop.sock_connect.side_effect = OSError
1233
1234        coro = self.loop.create_connection(
1235            MyProto, 'example.com', 80, family=socket.AF_INET)
1236        with self.assertRaises(OSError):
1237            self.loop.run_until_complete(coro)
1238
1239    @patch_socket
1240    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1241
1242        def bind(addr):
1243            if addr[0] == '0.0.0.1':
1244                err = OSError('Err')
1245                err.strerror = 'Err'
1246                raise err
1247
1248        m_socket.socket.return_value.bind = bind
1249
1250        async def getaddrinfo(*args, **kw):
1251            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1252                    (2, 1, 6, '', ('0.0.0.2', 80))]
1253
1254        def getaddrinfo_task(*args, **kwds):
1255            return self.loop.create_task(getaddrinfo(*args, **kwds))
1256
1257        self.loop.getaddrinfo = getaddrinfo_task
1258        self.loop.sock_connect = mock.Mock()
1259        self.loop.sock_connect.side_effect = OSError('Err2')
1260
1261        coro = self.loop.create_connection(
1262            MyProto, 'example.com', 80, family=socket.AF_INET,
1263            local_addr=(None, 8080))
1264        with self.assertRaises(OSError) as cm:
1265            self.loop.run_until_complete(coro)
1266
1267        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1268        self.assertTrue(m_socket.socket.return_value.close.called)
1269
1270    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1271        # Test the fallback code, even if this system has inet_pton.
1272        if not allow_inet_pton:
1273            del m_socket.inet_pton
1274
1275        m_socket.getaddrinfo = socket.getaddrinfo
1276        sock = m_socket.socket.return_value
1277
1278        self.loop._add_reader = mock.Mock()
1279        self.loop._add_reader._is_coroutine = False
1280        self.loop._add_writer = mock.Mock()
1281        self.loop._add_writer._is_coroutine = False
1282
1283        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1284        t, p = self.loop.run_until_complete(coro)
1285        try:
1286            sock.connect.assert_called_with(('1.2.3.4', 80))
1287            _, kwargs = m_socket.socket.call_args
1288            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1289            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1290        finally:
1291            t.close()
1292            test_utils.run_briefly(self.loop)  # allow transport to close
1293
1294        if socket_helper.IPV6_ENABLED:
1295            sock.family = socket.AF_INET6
1296            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1297            t, p = self.loop.run_until_complete(coro)
1298            try:
1299                # Without inet_pton we use getaddrinfo, which transforms
1300                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
1301                # scope id.
1302                [address] = sock.connect.call_args[0]
1303                host, port = address[:2]
1304                self.assertRegex(host, r'::(0\.)*1')
1305                self.assertEqual(port, 80)
1306                _, kwargs = m_socket.socket.call_args
1307                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1308                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1309            finally:
1310                t.close()
1311                test_utils.run_briefly(self.loop)  # allow transport to close
1312
1313    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1314    @unittest.skipIf(sys.platform.startswith('aix'),
1315                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
1316    @patch_socket
1317    def test_create_connection_ipv6_scope(self, m_socket):
1318        m_socket.getaddrinfo = socket.getaddrinfo
1319        sock = m_socket.socket.return_value
1320        sock.family = socket.AF_INET6
1321
1322        self.loop._add_reader = mock.Mock()
1323        self.loop._add_reader._is_coroutine = False
1324        self.loop._add_writer = mock.Mock()
1325        self.loop._add_writer._is_coroutine = False
1326
1327        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
1328        t, p = self.loop.run_until_complete(coro)
1329        try:
1330            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
1331            _, kwargs = m_socket.socket.call_args
1332            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1333            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1334        finally:
1335            t.close()
1336            test_utils.run_briefly(self.loop)  # allow transport to close
1337
1338    @patch_socket
1339    def test_create_connection_ip_addr(self, m_socket):
1340        self._test_create_connection_ip_addr(m_socket, True)
1341
1342    @patch_socket
1343    def test_create_connection_no_inet_pton(self, m_socket):
1344        self._test_create_connection_ip_addr(m_socket, False)
1345
1346    @patch_socket
1347    def test_create_connection_service_name(self, m_socket):
1348        m_socket.getaddrinfo = socket.getaddrinfo
1349        sock = m_socket.socket.return_value
1350
1351        self.loop._add_reader = mock.Mock()
1352        self.loop._add_reader._is_coroutine = False
1353        self.loop._add_writer = mock.Mock()
1354        self.loop._add_writer._is_coroutine = False
1355
1356        for service, port in ('http', 80), (b'http', 80):
1357            coro = self.loop.create_connection(asyncio.Protocol,
1358                                               '127.0.0.1', service)
1359
1360            t, p = self.loop.run_until_complete(coro)
1361            try:
1362                sock.connect.assert_called_with(('127.0.0.1', port))
1363                _, kwargs = m_socket.socket.call_args
1364                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1365                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1366            finally:
1367                t.close()
1368                test_utils.run_briefly(self.loop)  # allow transport to close
1369
1370        for service in 'nonsense', b'nonsense':
1371            coro = self.loop.create_connection(asyncio.Protocol,
1372                                               '127.0.0.1', service)
1373
1374            with self.assertRaises(OSError):
1375                self.loop.run_until_complete(coro)
1376
1377    def test_create_connection_no_local_addr(self):
1378        async def getaddrinfo(host, *args, **kw):
1379            if host == 'example.com':
1380                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1381                        (2, 1, 6, '', ('107.6.106.82', 80))]
1382            else:
1383                return []
1384
1385        def getaddrinfo_task(*args, **kwds):
1386            return self.loop.create_task(getaddrinfo(*args, **kwds))
1387        self.loop.getaddrinfo = getaddrinfo_task
1388
1389        coro = self.loop.create_connection(
1390            MyProto, 'example.com', 80, family=socket.AF_INET,
1391            local_addr=(None, 8080))
1392        self.assertRaises(
1393            OSError, self.loop.run_until_complete, coro)
1394
1395    @patch_socket
1396    def test_create_connection_bluetooth(self, m_socket):
1397        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1398        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1399        addr = ('00:01:02:03:04:05', 1)
1400
1401        def getaddrinfo(host, port, *args, **kw):
1402            assert (host, port) == addr
1403            return [(999, 1, 999, '', (addr, 1))]
1404
1405        m_socket.getaddrinfo = getaddrinfo
1406        sock = m_socket.socket()
1407        coro = self.loop.sock_connect(sock, addr)
1408        self.loop.run_until_complete(coro)
1409
1410    def test_create_connection_ssl_server_hostname_default(self):
1411        self.loop.getaddrinfo = mock.Mock()
1412
1413        def mock_getaddrinfo(*args, **kwds):
1414            f = self.loop.create_future()
1415            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1416                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1417            return f
1418
1419        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1420        self.loop.sock_connect = mock.Mock()
1421        self.loop.sock_connect.return_value = self.loop.create_future()
1422        self.loop.sock_connect.return_value.set_result(None)
1423        self.loop._make_ssl_transport = mock.Mock()
1424
1425        class _SelectorTransportMock:
1426            _sock = None
1427
1428            def get_extra_info(self, key):
1429                return mock.Mock()
1430
1431            def close(self):
1432                self._sock.close()
1433
1434        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1435                                    **kwds):
1436            waiter.set_result(None)
1437            transport = _SelectorTransportMock()
1438            transport._sock = sock
1439            return transport
1440
1441        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1442        ANY = mock.ANY
1443        handshake_timeout = object()
1444        # First try the default server_hostname.
1445        self.loop._make_ssl_transport.reset_mock()
1446        coro = self.loop.create_connection(
1447                MyProto, 'python.org', 80, ssl=True,
1448                ssl_handshake_timeout=handshake_timeout)
1449        transport, _ = self.loop.run_until_complete(coro)
1450        transport.close()
1451        self.loop._make_ssl_transport.assert_called_with(
1452            ANY, ANY, ANY, ANY,
1453            server_side=False,
1454            server_hostname='python.org',
1455            ssl_handshake_timeout=handshake_timeout)
1456        # Next try an explicit server_hostname.
1457        self.loop._make_ssl_transport.reset_mock()
1458        coro = self.loop.create_connection(
1459                MyProto, 'python.org', 80, ssl=True,
1460                server_hostname='perl.com',
1461                ssl_handshake_timeout=handshake_timeout)
1462        transport, _ = self.loop.run_until_complete(coro)
1463        transport.close()
1464        self.loop._make_ssl_transport.assert_called_with(
1465            ANY, ANY, ANY, ANY,
1466            server_side=False,
1467            server_hostname='perl.com',
1468            ssl_handshake_timeout=handshake_timeout)
1469        # Finally try an explicit empty server_hostname.
1470        self.loop._make_ssl_transport.reset_mock()
1471        coro = self.loop.create_connection(
1472                MyProto, 'python.org', 80, ssl=True,
1473                server_hostname='',
1474                ssl_handshake_timeout=handshake_timeout)
1475        transport, _ = self.loop.run_until_complete(coro)
1476        transport.close()
1477        self.loop._make_ssl_transport.assert_called_with(
1478                ANY, ANY, ANY, ANY,
1479                server_side=False,
1480                server_hostname='',
1481                ssl_handshake_timeout=handshake_timeout)
1482
1483    def test_create_connection_no_ssl_server_hostname_errors(self):
1484        # When not using ssl, server_hostname must be None.
1485        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1486                                           server_hostname='')
1487        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1488        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1489                                           server_hostname='python.org')
1490        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1491
1492    def test_create_connection_ssl_server_hostname_errors(self):
1493        # When using ssl, server_hostname may be None if host is non-empty.
1494        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1495        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1496        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1497        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1498        sock = socket.socket()
1499        coro = self.loop.create_connection(MyProto, None, None,
1500                                           ssl=True, sock=sock)
1501        self.addCleanup(sock.close)
1502        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1503
1504    def test_create_connection_ssl_timeout_for_plain_socket(self):
1505        coro = self.loop.create_connection(
1506            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1507        with self.assertRaisesRegex(
1508                ValueError,
1509                'ssl_handshake_timeout is only meaningful with ssl'):
1510            self.loop.run_until_complete(coro)
1511
1512    def test_create_server_empty_host(self):
1513        # if host is empty string use None instead
1514        host = object()
1515
1516        async def getaddrinfo(*args, **kw):
1517            nonlocal host
1518            host = args[0]
1519            return []
1520
1521        def getaddrinfo_task(*args, **kwds):
1522            return self.loop.create_task(getaddrinfo(*args, **kwds))
1523
1524        self.loop.getaddrinfo = getaddrinfo_task
1525        fut = self.loop.create_server(MyProto, '', 0)
1526        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1527        self.assertIsNone(host)
1528
1529    def test_create_server_host_port_sock(self):
1530        fut = self.loop.create_server(
1531            MyProto, '0.0.0.0', 0, sock=object())
1532        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1533
1534    def test_create_server_no_host_port_sock(self):
1535        fut = self.loop.create_server(MyProto)
1536        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1537
1538    def test_create_server_no_getaddrinfo(self):
1539        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1540        getaddrinfo.return_value = self.loop.create_future()
1541        getaddrinfo.return_value.set_result(None)
1542
1543        f = self.loop.create_server(MyProto, 'python.org', 0)
1544        self.assertRaises(OSError, self.loop.run_until_complete, f)
1545
1546    @patch_socket
1547    def test_create_server_nosoreuseport(self, m_socket):
1548        m_socket.getaddrinfo = socket.getaddrinfo
1549        del m_socket.SO_REUSEPORT
1550        m_socket.socket.return_value = mock.Mock()
1551
1552        f = self.loop.create_server(
1553            MyProto, '0.0.0.0', 0, reuse_port=True)
1554
1555        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1556
1557    @patch_socket
1558    def test_create_server_soreuseport_only_defined(self, m_socket):
1559        m_socket.getaddrinfo = socket.getaddrinfo
1560        m_socket.socket.return_value = mock.Mock()
1561        m_socket.SO_REUSEPORT = -1
1562
1563        f = self.loop.create_server(
1564            MyProto, '0.0.0.0', 0, reuse_port=True)
1565
1566        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1567
1568    @patch_socket
1569    def test_create_server_cant_bind(self, m_socket):
1570
1571        class Err(OSError):
1572            strerror = 'error'
1573
1574        m_socket.getaddrinfo.return_value = [
1575            (2, 1, 6, '', ('127.0.0.1', 10100))]
1576        m_socket.getaddrinfo._is_coroutine = False
1577        m_sock = m_socket.socket.return_value = mock.Mock()
1578        m_sock.bind.side_effect = Err
1579
1580        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1581        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1582        self.assertTrue(m_sock.close.called)
1583
1584    @patch_socket
1585    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1586        m_socket.getaddrinfo.return_value = []
1587        m_socket.getaddrinfo._is_coroutine = False
1588
1589        coro = self.loop.create_datagram_endpoint(
1590            MyDatagramProto, local_addr=('localhost', 0))
1591        self.assertRaises(
1592            OSError, self.loop.run_until_complete, coro)
1593
1594    def test_create_datagram_endpoint_addr_error(self):
1595        coro = self.loop.create_datagram_endpoint(
1596            MyDatagramProto, local_addr='localhost')
1597        self.assertRaises(
1598            AssertionError, self.loop.run_until_complete, coro)
1599        coro = self.loop.create_datagram_endpoint(
1600            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1601        self.assertRaises(
1602            AssertionError, self.loop.run_until_complete, coro)
1603
1604    def test_create_datagram_endpoint_connect_err(self):
1605        self.loop.sock_connect = mock.Mock()
1606        self.loop.sock_connect.side_effect = OSError
1607
1608        coro = self.loop.create_datagram_endpoint(
1609            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1610        self.assertRaises(
1611            OSError, self.loop.run_until_complete, coro)
1612
1613    def test_create_datagram_endpoint_allow_broadcast(self):
1614        protocol = MyDatagramProto(create_future=True, loop=self.loop)
1615        self.loop.sock_connect = sock_connect = mock.Mock()
1616        sock_connect.return_value = []
1617
1618        coro = self.loop.create_datagram_endpoint(
1619            lambda: protocol,
1620            remote_addr=('127.0.0.1', 0),
1621            allow_broadcast=True)
1622
1623        transport, _ = self.loop.run_until_complete(coro)
1624        self.assertFalse(sock_connect.called)
1625
1626        transport.close()
1627        self.loop.run_until_complete(protocol.done)
1628        self.assertEqual('CLOSED', protocol.state)
1629
1630    @patch_socket
1631    def test_create_datagram_endpoint_socket_err(self, m_socket):
1632        m_socket.getaddrinfo = socket.getaddrinfo
1633        m_socket.socket.side_effect = OSError
1634
1635        coro = self.loop.create_datagram_endpoint(
1636            asyncio.DatagramProtocol, family=socket.AF_INET)
1637        self.assertRaises(
1638            OSError, self.loop.run_until_complete, coro)
1639
1640        coro = self.loop.create_datagram_endpoint(
1641            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1642        self.assertRaises(
1643            OSError, self.loop.run_until_complete, coro)
1644
1645    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1646    def test_create_datagram_endpoint_no_matching_family(self):
1647        coro = self.loop.create_datagram_endpoint(
1648            asyncio.DatagramProtocol,
1649            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1650        self.assertRaises(
1651            ValueError, self.loop.run_until_complete, coro)
1652
1653    @patch_socket
1654    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1655        m_socket.socket.return_value.setblocking.side_effect = OSError
1656
1657        coro = self.loop.create_datagram_endpoint(
1658            asyncio.DatagramProtocol, family=socket.AF_INET)
1659        self.assertRaises(
1660            OSError, self.loop.run_until_complete, coro)
1661        self.assertTrue(
1662            m_socket.socket.return_value.close.called)
1663
1664    def test_create_datagram_endpoint_noaddr_nofamily(self):
1665        coro = self.loop.create_datagram_endpoint(
1666            asyncio.DatagramProtocol)
1667        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1668
1669    @patch_socket
1670    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1671        class Err(OSError):
1672            pass
1673
1674        m_socket.getaddrinfo = socket.getaddrinfo
1675        m_sock = m_socket.socket.return_value = mock.Mock()
1676        m_sock.bind.side_effect = Err
1677
1678        fut = self.loop.create_datagram_endpoint(
1679            MyDatagramProto,
1680            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1681        self.assertRaises(Err, self.loop.run_until_complete, fut)
1682        self.assertTrue(m_sock.close.called)
1683
1684    def test_create_datagram_endpoint_sock(self):
1685        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1686        sock.bind(('127.0.0.1', 0))
1687        fut = self.loop.create_datagram_endpoint(
1688            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1689            sock=sock)
1690        transport, protocol = self.loop.run_until_complete(fut)
1691        transport.close()
1692        self.loop.run_until_complete(protocol.done)
1693        self.assertEqual('CLOSED', protocol.state)
1694
1695    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1696    def test_create_datagram_endpoint_sock_unix(self):
1697        fut = self.loop.create_datagram_endpoint(
1698            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1699            family=socket.AF_UNIX)
1700        transport, protocol = self.loop.run_until_complete(fut)
1701        assert transport._sock.family == socket.AF_UNIX
1702        transport.close()
1703        self.loop.run_until_complete(protocol.done)
1704        self.assertEqual('CLOSED', protocol.state)
1705
1706    @socket_helper.skip_unless_bind_unix_socket
1707    def test_create_datagram_endpoint_existing_sock_unix(self):
1708        with test_utils.unix_socket_path() as path:
1709            sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
1710            sock.bind(path)
1711            sock.close()
1712
1713            coro = self.loop.create_datagram_endpoint(
1714                lambda: MyDatagramProto(create_future=True, loop=self.loop),
1715                path, family=socket.AF_UNIX)
1716            transport, protocol = self.loop.run_until_complete(coro)
1717            transport.close()
1718            self.loop.run_until_complete(protocol.done)
1719
1720    def test_create_datagram_endpoint_sock_sockopts(self):
1721        class FakeSock:
1722            type = socket.SOCK_DGRAM
1723
1724        fut = self.loop.create_datagram_endpoint(
1725            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1726        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1727
1728        fut = self.loop.create_datagram_endpoint(
1729            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1730        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1731
1732        fut = self.loop.create_datagram_endpoint(
1733            MyDatagramProto, family=1, sock=FakeSock())
1734        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1735
1736        fut = self.loop.create_datagram_endpoint(
1737            MyDatagramProto, proto=1, sock=FakeSock())
1738        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1739
1740        fut = self.loop.create_datagram_endpoint(
1741            MyDatagramProto, flags=1, sock=FakeSock())
1742        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1743
1744        fut = self.loop.create_datagram_endpoint(
1745            MyDatagramProto, reuse_port=True, sock=FakeSock())
1746        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1747
1748        fut = self.loop.create_datagram_endpoint(
1749            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1750        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1751
1752    @unittest.skipIf(sys.platform == 'vxworks',
1753                    "SO_BROADCAST is enabled by default on VxWorks")
1754    def test_create_datagram_endpoint_sockopts(self):
1755        # Socket options should not be applied unless asked for.
1756        # SO_REUSEPORT is not available on all platforms.
1757
1758        coro = self.loop.create_datagram_endpoint(
1759            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1760            local_addr=('127.0.0.1', 0))
1761        transport, protocol = self.loop.run_until_complete(coro)
1762        sock = transport.get_extra_info('socket')
1763
1764        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1765
1766        if reuseport_supported:
1767            self.assertFalse(
1768                sock.getsockopt(
1769                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1770        self.assertFalse(
1771            sock.getsockopt(
1772                socket.SOL_SOCKET, socket.SO_BROADCAST))
1773
1774        transport.close()
1775        self.loop.run_until_complete(protocol.done)
1776        self.assertEqual('CLOSED', protocol.state)
1777
1778        coro = self.loop.create_datagram_endpoint(
1779            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1780            local_addr=('127.0.0.1', 0),
1781            reuse_port=reuseport_supported,
1782            allow_broadcast=True)
1783        transport, protocol = self.loop.run_until_complete(coro)
1784        sock = transport.get_extra_info('socket')
1785
1786        self.assertFalse(
1787            sock.getsockopt(
1788                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1789        if reuseport_supported:
1790            self.assertTrue(
1791                sock.getsockopt(
1792                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1793        self.assertTrue(
1794            sock.getsockopt(
1795                socket.SOL_SOCKET, socket.SO_BROADCAST))
1796
1797        transport.close()
1798        self.loop.run_until_complete(protocol.done)
1799        self.assertEqual('CLOSED', protocol.state)
1800
1801    @patch_socket
1802    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1803        del m_socket.SO_REUSEPORT
1804        m_socket.socket.return_value = mock.Mock()
1805
1806        coro = self.loop.create_datagram_endpoint(
1807            lambda: MyDatagramProto(loop=self.loop),
1808            local_addr=('127.0.0.1', 0),
1809            reuse_port=True)
1810
1811        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1812
1813    @patch_socket
1814    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1815        def getaddrinfo(*args, **kw):
1816            self.fail('should not have called getaddrinfo')
1817
1818        m_socket.getaddrinfo = getaddrinfo
1819        m_socket.socket.return_value.bind = bind = mock.Mock()
1820        self.loop._add_reader = mock.Mock()
1821        self.loop._add_reader._is_coroutine = False
1822
1823        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1824        coro = self.loop.create_datagram_endpoint(
1825            lambda: MyDatagramProto(loop=self.loop),
1826            local_addr=('1.2.3.4', 0),
1827            reuse_port=reuseport_supported)
1828
1829        t, p = self.loop.run_until_complete(coro)
1830        try:
1831            bind.assert_called_with(('1.2.3.4', 0))
1832            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1833                                               proto=m_socket.IPPROTO_UDP,
1834                                               type=m_socket.SOCK_DGRAM)
1835        finally:
1836            t.close()
1837            test_utils.run_briefly(self.loop)  # allow transport to close
1838
1839    def test_accept_connection_retry(self):
1840        sock = mock.Mock()
1841        sock.accept.side_effect = BlockingIOError()
1842
1843        self.loop._accept_connection(MyProto, sock)
1844        self.assertFalse(sock.close.called)
1845
1846    @mock.patch('asyncio.base_events.logger')
1847    def test_accept_connection_exception(self, m_log):
1848        sock = mock.Mock()
1849        sock.fileno.return_value = 10
1850        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1851        self.loop._remove_reader = mock.Mock()
1852        self.loop.call_later = mock.Mock()
1853
1854        self.loop._accept_connection(MyProto, sock)
1855        self.assertTrue(m_log.error.called)
1856        self.assertFalse(sock.close.called)
1857        self.loop._remove_reader.assert_called_with(10)
1858        self.loop.call_later.assert_called_with(
1859            constants.ACCEPT_RETRY_DELAY,
1860            # self.loop._start_serving
1861            mock.ANY,
1862            MyProto, sock, None, None, mock.ANY, mock.ANY)
1863
1864    def test_call_coroutine(self):
1865        async def simple_coroutine():
1866            pass
1867
1868        self.loop.set_debug(True)
1869        coro_func = simple_coroutine
1870        coro_obj = coro_func()
1871        self.addCleanup(coro_obj.close)
1872        for func in (coro_func, coro_obj):
1873            with self.assertRaises(TypeError):
1874                self.loop.call_soon(func)
1875            with self.assertRaises(TypeError):
1876                self.loop.call_soon_threadsafe(func)
1877            with self.assertRaises(TypeError):
1878                self.loop.call_later(60, func)
1879            with self.assertRaises(TypeError):
1880                self.loop.call_at(self.loop.time() + 60, func)
1881            with self.assertRaises(TypeError):
1882                self.loop.run_until_complete(
1883                    self.loop.run_in_executor(None, func))
1884
1885    @mock.patch('asyncio.base_events.logger')
1886    def test_log_slow_callbacks(self, m_logger):
1887        def stop_loop_cb(loop):
1888            loop.stop()
1889
1890        async def stop_loop_coro(loop):
1891            loop.stop()
1892
1893        asyncio.set_event_loop(self.loop)
1894        self.loop.set_debug(True)
1895        self.loop.slow_callback_duration = 0.0
1896
1897        # slow callback
1898        self.loop.call_soon(stop_loop_cb, self.loop)
1899        self.loop.run_forever()
1900        fmt, *args = m_logger.warning.call_args[0]
1901        self.assertRegex(fmt % tuple(args),
1902                         "^Executing <Handle.*stop_loop_cb.*> "
1903                         "took .* seconds$")
1904
1905        # slow task
1906        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1907        self.loop.run_forever()
1908        fmt, *args = m_logger.warning.call_args[0]
1909        self.assertRegex(fmt % tuple(args),
1910                         "^Executing <Task.*stop_loop_coro.*> "
1911                         "took .* seconds$")
1912
1913
class RunningLoopTests(unittest.TestCase):

    def test_running_loop_within_a_loop(self):
        # Calling run_forever() on one loop from inside a coroutine that
        # another loop is currently driving must raise RuntimeError.
        async def start_nested(nested):
            nested.run_forever()

        inner_loop = asyncio.new_event_loop()
        driving_loop = asyncio.new_event_loop()
        try:
            nested_run = start_nested(inner_loop)
            with self.assertRaisesRegex(RuntimeError,
                                        'while another loop is running'):
                driving_loop.run_until_complete(nested_run)
        finally:
            inner_loop.close()
            driving_loop.close()
1929
1930
class BaseLoopSockSendfileTests(test_utils.TestCase):
    """Tests for loop.sock_sendfile() on a plain BaseSelectorEventLoop.

    A file holding DATA is written once per class; each test opens it
    read-only and, where needed, sends it over a local TCP connection to
    a server whose protocol records everything it receives.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Protocol that accumulates received bytes and tracks its state.

        ``fut`` is resolved in connection_lost(), so wait_closed() can be
        awaited to know that the connection is gone.
        """

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)
            self.transport = None

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Remember the fallback read-buffer size and shrink it to 16 KiB
        # for the duration of the class; tearDownClass() restores it.
        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
        # Create the file that every test sends.
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Undo setUpClass(): restore the constant and remove the file.
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        from asyncio.selector_events import BaseSelectorEventLoop
        # BaseSelectorEventLoop() has no native implementation
        self.loop = BaseSelectorEventLoop()
        self.set_event_loop(self.loop)
        # Binary read handle on the class-wide data file; closed on cleanup.
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, blocking=False):
        """Return a new AF_INET stream socket, closed on test cleanup.

        The socket is non-blocking unless *blocking* is true.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(blocking)
        self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Return ``(sock, proto)``: a connected client socket and the
        server-side protocol instance.

        A server is started on socket_helper.HOST with port 0 and the
        client socket is connected to it through loop.sock_connect().
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        server = self.run_loop(self.loop.create_server(
            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
        addr = server.sockets[0].getsockname()

        # Retry the connect up to 10 times, pausing 0.5 s after each
        # OSError before trying again.
        for _ in range(10):
            try:
                self.run_loop(self.loop.sock_connect(sock, addr))
            except OSError:
                self.run_loop(asyncio.sleep(0.5))
                continue
            else:
                break
        else:
            # One last try, so we get the exception
            self.run_loop(self.loop.sock_connect(sock, addr))

        def cleanup():
            # Shut down the server first, then the client socket, then
            # the server-side transport if it is still set.
            server.close()
            self.run_loop(server.wait_closed())
            sock.close()
            if proto.transport is not None:
                proto.transport.close()
                self.run_loop(proto.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test__sock_sendfile_native_failure(self):
        # The native path must report SendfileNotAvailableError, send
        # nothing and leave the file position at 0.
        sock, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
                                                          0, None))

        self.assertEqual(proto.data, b'')
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_no_fallback(self):
        # With fallback=False the same error must surface from the public
        # sock_sendfile(), again without sending or moving the file.
        sock, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file,
                                                  fallback=False))

        self.assertEqual(self.file.tell(), 0)
        self.assertEqual(proto.data, b'')

    def test_sock_sendfile_fallback(self):
        # With the fallback allowed (no fallback= argument passed), the
        # whole file is sent and the byte count returned.
        sock, proto = self.prepare()

        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
        # Close the client side so the server protocol sees the
        # connection end before the received data is compared.
        sock.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(ret, len(self.DATA))
        self.assertEqual(self.file.tell(), len(self.DATA))
        self.assertEqual(proto.data, self.DATA)

    def test_sock_sendfile_fallback_offset_and_count(self):
        # offset=1000, count=2000: exactly DATA[1000:3000] is sent and
        # the file position ends at 3000.
        sock, proto = self.prepare()

        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
                                                    1000, 2000))
        sock.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(ret, 2000)
        self.assertEqual(self.file.tell(), 3000)
        self.assertEqual(proto.data, self.DATA[1000:3000])

    def test_blocking_socket(self):
        # In debug mode a blocking socket is rejected with ValueError.
        self.loop.set_debug(True)
        sock = self.make_socket(blocking=True)
        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file))

    def test_nonbinary_file(self):
        # A file opened in text mode is rejected with ValueError.
        sock = self.make_socket()
        with open(os_helper.TESTFN, encoding="utf-8") as f:
            with self.assertRaisesRegex(ValueError, "binary mode"):
                self.run_loop(self.loop.sock_sendfile(sock, f))

    def test_nonstream_socket(self):
        # A SOCK_DGRAM socket is rejected with ValueError.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setblocking(False)
        self.addCleanup(sock.close)
        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file))

    def test_notint_count(self):
        # A non-integer count raises TypeError.
        sock = self.make_socket()
        with self.assertRaisesRegex(TypeError,
                                    "count must be a positive integer"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))

    def test_negative_count(self):
        # A negative count raises ValueError.
        sock = self.make_socket()
        with self.assertRaisesRegex(ValueError,
                                    "count must be a positive integer"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))

    def test_notint_offset(self):
        # A non-integer offset raises TypeError.
        sock = self.make_socket()
        with self.assertRaisesRegex(TypeError,
                                    "offset must be a non-negative integer"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))

    def test_negative_offset(self):
        # A negative offset raises ValueError.
        sock = self.make_socket()
        with self.assertRaisesRegex(ValueError,
                                    "offset must be a non-negative integer"):
            self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
2109
2110
class TestSelectorUtils(test_utils.TestCase):
    def check_set_nodelay(self, sock):
        # TCP_NODELAY must read as off before base_events._set_nodelay()
        # is applied to the socket and as on afterwards.
        level, option = socket.IPPROTO_TCP, socket.TCP_NODELAY
        self.assertFalse(sock.getsockopt(level, option))

        base_events._set_nodelay(sock)

        self.assertTrue(sock.getsockopt(level, option))

    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
                         'need socket.TCP_NODELAY')
    def test_set_nodelay(self):
        # Run the check on a socket left in its default mode first, then
        # on one switched to non-blocking mode.
        for make_nonblocking in (False, True):
            sock = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_STREAM,
                                 proto=socket.IPPROTO_TCP)
            with sock:
                if make_nonblocking:
                    sock.setblocking(False)
                self.check_set_nodelay(sock)
2134
2135
2136
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2139