1"""Tests for events.py."""
2
3import collections.abc
4import concurrent.futures
5import functools
6import io
7import os
8import platform
9import re
10import signal
11import socket
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import subprocess
17import sys
18import threading
19import time
20import errno
21import unittest
22from unittest import mock
23import weakref
24
25if sys.platform != 'win32':
26    import tty
27
28import asyncio
29from asyncio import coroutines
30from asyncio import events
31from asyncio import proactor_events
32from asyncio import selector_events
33from test.test_asyncio import utils as test_utils
34from test import support
35from test.support import socket_helper
36from test.support import ALWAYS_EQ, LARGEST, SMALLEST
37
38
39def tearDownModule():
40    asyncio.set_event_loop_policy(None)
41
42
43def broken_unix_getsockname():
44    """Return True if the platform is Mac OS 10.4 or older."""
45    if sys.platform.startswith("aix"):
46        return True
47    elif sys.platform != 'darwin':
48        return False
49    version = platform.mac_ver()[0]
50    version = tuple(map(int, version.split('.')))
51    return version < (10, 5)
52
53
54def _test_get_event_loop_new_process__sub_proc():
55    async def doit():
56        return 'hello'
57
58    loop = asyncio.new_event_loop()
59    asyncio.set_event_loop(loop)
60    return loop.run_until_complete(doit())
61
62
63class CoroLike:
64    def send(self, v):
65        pass
66
67    def throw(self, *exc):
68        pass
69
70    def close(self):
71        pass
72
73    def __await__(self):
74        pass
75
76
77class MyBaseProto(asyncio.Protocol):
78    connected = None
79    done = None
80
81    def __init__(self, loop=None):
82        self.transport = None
83        self.state = 'INITIAL'
84        self.nbytes = 0
85        if loop is not None:
86            self.connected = loop.create_future()
87            self.done = loop.create_future()
88
89    def connection_made(self, transport):
90        self.transport = transport
91        assert self.state == 'INITIAL', self.state
92        self.state = 'CONNECTED'
93        if self.connected:
94            self.connected.set_result(None)
95
96    def data_received(self, data):
97        assert self.state == 'CONNECTED', self.state
98        self.nbytes += len(data)
99
100    def eof_received(self):
101        assert self.state == 'CONNECTED', self.state
102        self.state = 'EOF'
103
104    def connection_lost(self, exc):
105        assert self.state in ('CONNECTED', 'EOF'), self.state
106        self.state = 'CLOSED'
107        if self.done:
108            self.done.set_result(None)
109
110
111class MyProto(MyBaseProto):
112    def connection_made(self, transport):
113        super().connection_made(transport)
114        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
115
116
117class MyDatagramProto(asyncio.DatagramProtocol):
118    done = None
119
120    def __init__(self, loop=None):
121        self.state = 'INITIAL'
122        self.nbytes = 0
123        if loop is not None:
124            self.done = loop.create_future()
125
126    def connection_made(self, transport):
127        self.transport = transport
128        assert self.state == 'INITIAL', self.state
129        self.state = 'INITIALIZED'
130
131    def datagram_received(self, data, addr):
132        assert self.state == 'INITIALIZED', self.state
133        self.nbytes += len(data)
134
135    def error_received(self, exc):
136        assert self.state == 'INITIALIZED', self.state
137
138    def connection_lost(self, exc):
139        assert self.state == 'INITIALIZED', self.state
140        self.state = 'CLOSED'
141        if self.done:
142            self.done.set_result(None)
143
144
145class MyReadPipeProto(asyncio.Protocol):
146    done = None
147
148    def __init__(self, loop=None):
149        self.state = ['INITIAL']
150        self.nbytes = 0
151        self.transport = None
152        if loop is not None:
153            self.done = loop.create_future()
154
155    def connection_made(self, transport):
156        self.transport = transport
157        assert self.state == ['INITIAL'], self.state
158        self.state.append('CONNECTED')
159
160    def data_received(self, data):
161        assert self.state == ['INITIAL', 'CONNECTED'], self.state
162        self.nbytes += len(data)
163
164    def eof_received(self):
165        assert self.state == ['INITIAL', 'CONNECTED'], self.state
166        self.state.append('EOF')
167
168    def connection_lost(self, exc):
169        if 'EOF' not in self.state:
170            self.state.append('EOF')  # It is okay if EOF is missed.
171        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
172        self.state.append('CLOSED')
173        if self.done:
174            self.done.set_result(None)
175
176
177class MyWritePipeProto(asyncio.BaseProtocol):
178    done = None
179
180    def __init__(self, loop=None):
181        self.state = 'INITIAL'
182        self.transport = None
183        if loop is not None:
184            self.done = loop.create_future()
185
186    def connection_made(self, transport):
187        self.transport = transport
188        assert self.state == 'INITIAL', self.state
189        self.state = 'CONNECTED'
190
191    def connection_lost(self, exc):
192        assert self.state == 'CONNECTED', self.state
193        self.state = 'CLOSED'
194        if self.done:
195            self.done.set_result(None)
196
197
198class MySubprocessProtocol(asyncio.SubprocessProtocol):
199
200    def __init__(self, loop):
201        self.state = 'INITIAL'
202        self.transport = None
203        self.connected = loop.create_future()
204        self.completed = loop.create_future()
205        self.disconnects = {fd: loop.create_future() for fd in range(3)}
206        self.data = {1: b'', 2: b''}
207        self.returncode = None
208        self.got_data = {1: asyncio.Event(),
209                         2: asyncio.Event()}
210
211    def connection_made(self, transport):
212        self.transport = transport
213        assert self.state == 'INITIAL', self.state
214        self.state = 'CONNECTED'
215        self.connected.set_result(None)
216
217    def connection_lost(self, exc):
218        assert self.state == 'CONNECTED', self.state
219        self.state = 'CLOSED'
220        self.completed.set_result(None)
221
222    def pipe_data_received(self, fd, data):
223        assert self.state == 'CONNECTED', self.state
224        self.data[fd] += data
225        self.got_data[fd].set()
226
227    def pipe_connection_lost(self, fd, exc):
228        assert self.state == 'CONNECTED', self.state
229        if exc:
230            self.disconnects[fd].set_exception(exc)
231        else:
232            self.disconnects[fd].set_result(exc)
233
234    def process_exited(self):
235        assert self.state == 'CONNECTED', self.state
236        self.returncode = self.transport.get_returncode()
237
238
239class EventLoopTestsMixin:
240
241    def setUp(self):
242        super().setUp()
243        self.loop = self.create_event_loop()
244        self.set_event_loop(self.loop)
245
246    def tearDown(self):
247        # just in case if we have transport close callbacks
248        if not self.loop.is_closed():
249            test_utils.run_briefly(self.loop)
250
251        self.doCleanups()
252        support.gc_collect()
253        super().tearDown()
254
255    def test_run_until_complete_nesting(self):
256        async def coro1():
257            await asyncio.sleep(0)
258
259        async def coro2():
260            self.assertTrue(self.loop.is_running())
261            self.loop.run_until_complete(coro1())
262
263        with self.assertWarnsRegex(
264            RuntimeWarning,
265            r"coroutine \S+ was never awaited"
266        ):
267            self.assertRaises(
268                RuntimeError, self.loop.run_until_complete, coro2())
269
270    # Note: because of the default Windows timing granularity of
271    # 15.6 msec, we use fairly long sleep times here (~100 msec).
272
273    def test_run_until_complete(self):
274        t0 = self.loop.time()
275        self.loop.run_until_complete(asyncio.sleep(0.1))
276        t1 = self.loop.time()
277        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
278
279    def test_run_until_complete_stopped(self):
280
281        async def cb():
282            self.loop.stop()
283            await asyncio.sleep(0.1)
284        task = cb()
285        self.assertRaises(RuntimeError,
286                          self.loop.run_until_complete, task)
287
288    def test_call_later(self):
289        results = []
290
291        def callback(arg):
292            results.append(arg)
293            self.loop.stop()
294
295        self.loop.call_later(0.1, callback, 'hello world')
296        self.loop.run_forever()
297        self.assertEqual(results, ['hello world'])
298
299    def test_call_soon(self):
300        results = []
301
302        def callback(arg1, arg2):
303            results.append((arg1, arg2))
304            self.loop.stop()
305
306        self.loop.call_soon(callback, 'hello', 'world')
307        self.loop.run_forever()
308        self.assertEqual(results, [('hello', 'world')])
309
310    def test_call_soon_threadsafe(self):
311        results = []
312        lock = threading.Lock()
313
314        def callback(arg):
315            results.append(arg)
316            if len(results) >= 2:
317                self.loop.stop()
318
319        def run_in_thread():
320            self.loop.call_soon_threadsafe(callback, 'hello')
321            lock.release()
322
323        lock.acquire()
324        t = threading.Thread(target=run_in_thread)
325        t.start()
326
327        with lock:
328            self.loop.call_soon(callback, 'world')
329            self.loop.run_forever()
330        t.join()
331        self.assertEqual(results, ['hello', 'world'])
332
333    def test_call_soon_threadsafe_same_thread(self):
334        results = []
335
336        def callback(arg):
337            results.append(arg)
338            if len(results) >= 2:
339                self.loop.stop()
340
341        self.loop.call_soon_threadsafe(callback, 'hello')
342        self.loop.call_soon(callback, 'world')
343        self.loop.run_forever()
344        self.assertEqual(results, ['hello', 'world'])
345
346    def test_run_in_executor(self):
347        def run(arg):
348            return (arg, threading.get_ident())
349        f2 = self.loop.run_in_executor(None, run, 'yo')
350        res, thread_id = self.loop.run_until_complete(f2)
351        self.assertEqual(res, 'yo')
352        self.assertNotEqual(thread_id, threading.get_ident())
353
354    def test_run_in_executor_cancel(self):
355        called = False
356
357        def patched_call_soon(*args):
358            nonlocal called
359            called = True
360
361        def run():
362            time.sleep(0.05)
363
364        f2 = self.loop.run_in_executor(None, run)
365        f2.cancel()
366        self.loop.run_until_complete(
367                self.loop.shutdown_default_executor())
368        self.loop.close()
369        self.loop.call_soon = patched_call_soon
370        self.loop.call_soon_threadsafe = patched_call_soon
371        time.sleep(0.4)
372        self.assertFalse(called)
373
374    def test_reader_callback(self):
375        r, w = socket.socketpair()
376        r.setblocking(False)
377        bytes_read = bytearray()
378
379        def reader():
380            try:
381                data = r.recv(1024)
382            except BlockingIOError:
383                # Spurious readiness notifications are possible
384                # at least on Linux -- see man select.
385                return
386            if data:
387                bytes_read.extend(data)
388            else:
389                self.assertTrue(self.loop.remove_reader(r.fileno()))
390                r.close()
391
392        self.loop.add_reader(r.fileno(), reader)
393        self.loop.call_soon(w.send, b'abc')
394        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
395        self.loop.call_soon(w.send, b'def')
396        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
397        self.loop.call_soon(w.close)
398        self.loop.call_soon(self.loop.stop)
399        self.loop.run_forever()
400        self.assertEqual(bytes_read, b'abcdef')
401
402    def test_writer_callback(self):
403        r, w = socket.socketpair()
404        w.setblocking(False)
405
406        def writer(data):
407            w.send(data)
408            self.loop.stop()
409
410        data = b'x' * 1024
411        self.loop.add_writer(w.fileno(), writer, data)
412        self.loop.run_forever()
413
414        self.assertTrue(self.loop.remove_writer(w.fileno()))
415        self.assertFalse(self.loop.remove_writer(w.fileno()))
416
417        w.close()
418        read = r.recv(len(data) * 2)
419        r.close()
420        self.assertEqual(read, data)
421
422    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
423    def test_add_signal_handler(self):
424        caught = 0
425
426        def my_handler():
427            nonlocal caught
428            caught += 1
429
430        # Check error behavior first.
431        self.assertRaises(
432            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
433        self.assertRaises(
434            TypeError, self.loop.remove_signal_handler, 'boom')
435        self.assertRaises(
436            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
437            my_handler)
438        self.assertRaises(
439            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
440        self.assertRaises(
441            ValueError, self.loop.add_signal_handler, 0, my_handler)
442        self.assertRaises(
443            ValueError, self.loop.remove_signal_handler, 0)
444        self.assertRaises(
445            ValueError, self.loop.add_signal_handler, -1, my_handler)
446        self.assertRaises(
447            ValueError, self.loop.remove_signal_handler, -1)
448        self.assertRaises(
449            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
450            my_handler)
451        # Removing SIGKILL doesn't raise, since we don't call signal().
452        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
453        # Now set a handler and handle it.
454        self.loop.add_signal_handler(signal.SIGINT, my_handler)
455
456        os.kill(os.getpid(), signal.SIGINT)
457        test_utils.run_until(self.loop, lambda: caught)
458
459        # Removing it should restore the default handler.
460        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
461        self.assertEqual(signal.getsignal(signal.SIGINT),
462                         signal.default_int_handler)
463        # Removing again returns False.
464        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
465
466    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
467    def test_signal_handling_while_selecting(self):
468        # Test with a signal actually arriving during a select() call.
469        caught = 0
470
471        def my_handler():
472            nonlocal caught
473            caught += 1
474            self.loop.stop()
475
476        self.loop.add_signal_handler(signal.SIGALRM, my_handler)
477
478        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
479        self.loop.call_later(60, self.loop.stop)
480        self.loop.run_forever()
481        self.assertEqual(caught, 1)
482
483    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
484    def test_signal_handling_args(self):
485        some_args = (42,)
486        caught = 0
487
488        def my_handler(*args):
489            nonlocal caught
490            caught += 1
491            self.assertEqual(args, some_args)
492            self.loop.stop()
493
494        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
495
496        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
497        self.loop.call_later(60, self.loop.stop)
498        self.loop.run_forever()
499        self.assertEqual(caught, 1)
500
501    def _basetest_create_connection(self, connection_fut, check_sockname=True):
502        tr, pr = self.loop.run_until_complete(connection_fut)
503        self.assertIsInstance(tr, asyncio.Transport)
504        self.assertIsInstance(pr, asyncio.Protocol)
505        self.assertIs(pr.transport, tr)
506        if check_sockname:
507            self.assertIsNotNone(tr.get_extra_info('sockname'))
508        self.loop.run_until_complete(pr.done)
509        self.assertGreater(pr.nbytes, 0)
510        tr.close()
511
512    def test_create_connection(self):
513        with test_utils.run_test_server() as httpd:
514            conn_fut = self.loop.create_connection(
515                lambda: MyProto(loop=self.loop), *httpd.address)
516            self._basetest_create_connection(conn_fut)
517
518    @socket_helper.skip_unless_bind_unix_socket
519    def test_create_unix_connection(self):
520        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
521        # zero-length address for UNIX socket.
522        check_sockname = not broken_unix_getsockname()
523
524        with test_utils.run_test_unix_server() as httpd:
525            conn_fut = self.loop.create_unix_connection(
526                lambda: MyProto(loop=self.loop), httpd.address)
527            self._basetest_create_connection(conn_fut, check_sockname)
528
529    def check_ssl_extra_info(self, client, check_sockname=True,
530                             peername=None, peercert={}):
531        if check_sockname:
532            self.assertIsNotNone(client.get_extra_info('sockname'))
533        if peername:
534            self.assertEqual(peername,
535                             client.get_extra_info('peername'))
536        else:
537            self.assertIsNotNone(client.get_extra_info('peername'))
538        self.assertEqual(peercert,
539                         client.get_extra_info('peercert'))
540
541        # test SSL cipher
542        cipher = client.get_extra_info('cipher')
543        self.assertIsInstance(cipher, tuple)
544        self.assertEqual(len(cipher), 3, cipher)
545        self.assertIsInstance(cipher[0], str)
546        self.assertIsInstance(cipher[1], str)
547        self.assertIsInstance(cipher[2], int)
548
549        # test SSL object
550        sslobj = client.get_extra_info('ssl_object')
551        self.assertIsNotNone(sslobj)
552        self.assertEqual(sslobj.compression(),
553                         client.get_extra_info('compression'))
554        self.assertEqual(sslobj.cipher(),
555                         client.get_extra_info('cipher'))
556        self.assertEqual(sslobj.getpeercert(),
557                         client.get_extra_info('peercert'))
558        self.assertEqual(sslobj.compression(),
559                         client.get_extra_info('compression'))
560
561    def _basetest_create_ssl_connection(self, connection_fut,
562                                        check_sockname=True,
563                                        peername=None):
564        tr, pr = self.loop.run_until_complete(connection_fut)
565        self.assertIsInstance(tr, asyncio.Transport)
566        self.assertIsInstance(pr, asyncio.Protocol)
567        self.assertTrue('ssl' in tr.__class__.__name__.lower())
568        self.check_ssl_extra_info(tr, check_sockname, peername)
569        self.loop.run_until_complete(pr.done)
570        self.assertGreater(pr.nbytes, 0)
571        tr.close()
572
573    def _test_create_ssl_connection(self, httpd, create_connection,
574                                    check_sockname=True, peername=None):
575        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
576        self._basetest_create_ssl_connection(conn_fut, check_sockname,
577                                             peername)
578
579        # ssl.Purpose was introduced in Python 3.4
580        if hasattr(ssl, 'Purpose'):
581            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
582                                          cafile=None, capath=None,
583                                          cadata=None):
584                """
585                A ssl.create_default_context() replacement that doesn't enable
586                cert validation.
587                """
588                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
589                return test_utils.dummy_ssl_context()
590
591            # With ssl=True, ssl.create_default_context() should be called
592            with mock.patch('ssl.create_default_context',
593                            side_effect=_dummy_ssl_create_context) as m:
594                conn_fut = create_connection(ssl=True)
595                self._basetest_create_ssl_connection(conn_fut, check_sockname,
596                                                     peername)
597                self.assertEqual(m.call_count, 1)
598
599        # With the real ssl.create_default_context(), certificate
600        # validation will fail
601        with self.assertRaises(ssl.SSLError) as cm:
602            conn_fut = create_connection(ssl=True)
603            # Ignore the "SSL handshake failed" log in debug mode
604            with test_utils.disable_logger():
605                self._basetest_create_ssl_connection(conn_fut, check_sockname,
606                                                     peername)
607
608        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
609
610    @unittest.skipIf(ssl is None, 'No ssl module')
611    def test_create_ssl_connection(self):
612        with test_utils.run_test_server(use_ssl=True) as httpd:
613            create_connection = functools.partial(
614                self.loop.create_connection,
615                lambda: MyProto(loop=self.loop),
616                *httpd.address)
617            self._test_create_ssl_connection(httpd, create_connection,
618                                             peername=httpd.address)
619
620    @socket_helper.skip_unless_bind_unix_socket
621    @unittest.skipIf(ssl is None, 'No ssl module')
622    def test_create_ssl_unix_connection(self):
623        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
624        # zero-length address for UNIX socket.
625        check_sockname = not broken_unix_getsockname()
626
627        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
628            create_connection = functools.partial(
629                self.loop.create_unix_connection,
630                lambda: MyProto(loop=self.loop), httpd.address,
631                server_hostname='127.0.0.1')
632
633            self._test_create_ssl_connection(httpd, create_connection,
634                                             check_sockname,
635                                             peername=httpd.address)
636
637    def test_create_connection_local_addr(self):
638        with test_utils.run_test_server() as httpd:
639            port = socket_helper.find_unused_port()
640            f = self.loop.create_connection(
641                lambda: MyProto(loop=self.loop),
642                *httpd.address, local_addr=(httpd.address[0], port))
643            tr, pr = self.loop.run_until_complete(f)
644            expected = pr.transport.get_extra_info('sockname')[1]
645            self.assertEqual(port, expected)
646            tr.close()
647
648    def test_create_connection_local_addr_in_use(self):
649        with test_utils.run_test_server() as httpd:
650            f = self.loop.create_connection(
651                lambda: MyProto(loop=self.loop),
652                *httpd.address, local_addr=httpd.address)
653            with self.assertRaises(OSError) as cm:
654                self.loop.run_until_complete(f)
655            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
656            self.assertIn(str(httpd.address), cm.exception.strerror)
657
658    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
659        loop = self.loop
660
661        class MyProto(MyBaseProto):
662
663            def connection_lost(self, exc):
664                super().connection_lost(exc)
665                loop.call_soon(loop.stop)
666
667            def data_received(self, data):
668                super().data_received(data)
669                self.transport.write(expected_response)
670
671        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
672        addr = lsock.getsockname()
673
674        message = b'test data'
675        response = None
676        expected_response = b'roger'
677
678        def client():
679            nonlocal response
680            try:
681                csock = socket.socket()
682                if client_ssl is not None:
683                    csock = client_ssl.wrap_socket(csock)
684                csock.connect(addr)
685                csock.sendall(message)
686                response = csock.recv(99)
687                csock.close()
688            except Exception as exc:
689                print(
690                    "Failure in client thread in test_connect_accepted_socket",
691                    exc)
692
693        thread = threading.Thread(target=client, daemon=True)
694        thread.start()
695
696        conn, _ = lsock.accept()
697        proto = MyProto(loop=loop)
698        proto.loop = loop
699        loop.run_until_complete(
700            loop.connect_accepted_socket(
701                (lambda: proto), conn, ssl=server_ssl))
702        loop.run_forever()
703        proto.transport.close()
704        lsock.close()
705
706        support.join_thread(thread)
707        self.assertFalse(thread.is_alive())
708        self.assertEqual(proto.state, 'CLOSED')
709        self.assertEqual(proto.nbytes, len(message))
710        self.assertEqual(response, expected_response)
711
712    @unittest.skipIf(ssl is None, 'No ssl module')
713    def test_ssl_connect_accepted_socket(self):
714        if (sys.platform == 'win32' and
715            sys.version_info < (3, 5) and
716            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
717            ):
718            raise unittest.SkipTest(
719                'SSL not supported with proactor event loops before Python 3.5'
720                )
721
722        server_context = test_utils.simple_server_sslcontext()
723        client_context = test_utils.simple_client_sslcontext()
724
725        self.test_connect_accepted_socket(server_context, client_context)
726
727    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
728        sock = socket.socket()
729        self.addCleanup(sock.close)
730        coro = self.loop.connect_accepted_socket(
731            MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT)
732        with self.assertRaisesRegex(
733                ValueError,
734                'ssl_handshake_timeout is only meaningful with ssl'):
735            self.loop.run_until_complete(coro)
736
737    @mock.patch('asyncio.base_events.socket')
738    def create_server_multiple_hosts(self, family, hosts, mock_sock):
739        async def getaddrinfo(host, port, *args, **kw):
740            if family == socket.AF_INET:
741                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
742            else:
743                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
744
745        def getaddrinfo_task(*args, **kwds):
746            return self.loop.create_task(getaddrinfo(*args, **kwds))
747
748        unique_hosts = set(hosts)
749
750        if family == socket.AF_INET:
751            mock_sock.socket().getsockbyname.side_effect = [
752                (host, 80) for host in unique_hosts]
753        else:
754            mock_sock.socket().getsockbyname.side_effect = [
755                (host, 80, 0, 0) for host in unique_hosts]
756        self.loop.getaddrinfo = getaddrinfo_task
757        self.loop._start_serving = mock.Mock()
758        self.loop._stop_serving = mock.Mock()
759        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
760        server = self.loop.run_until_complete(f)
761        self.addCleanup(server.close)
762        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
763        self.assertEqual(server_hosts, unique_hosts)
764
765    def test_create_server_multiple_hosts_ipv4(self):
766        self.create_server_multiple_hosts(socket.AF_INET,
767                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
768
769    def test_create_server_multiple_hosts_ipv6(self):
770        self.create_server_multiple_hosts(socket.AF_INET6,
771                                          ['::1', '::2', '::1'])
772
773    def test_create_server(self):
774        proto = MyProto(self.loop)
775        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
776        server = self.loop.run_until_complete(f)
777        self.assertEqual(len(server.sockets), 1)
778        sock = server.sockets[0]
779        host, port = sock.getsockname()
780        self.assertEqual(host, '0.0.0.0')
781        client = socket.socket()
782        client.connect(('127.0.0.1', port))
783        client.sendall(b'xxx')
784
785        self.loop.run_until_complete(proto.connected)
786        self.assertEqual('CONNECTED', proto.state)
787
788        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
789        self.assertEqual(3, proto.nbytes)
790
791        # extra info is available
792        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
793        self.assertEqual('127.0.0.1',
794                         proto.transport.get_extra_info('peername')[0])
795
796        # close connection
797        proto.transport.close()
798        self.loop.run_until_complete(proto.done)
799
800        self.assertEqual('CLOSED', proto.state)
801
802        # the client socket must be closed after to avoid ECONNRESET upon
803        # recv()/send() on the serving socket
804        client.close()
805
806        # close server
807        server.close()
808
809    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
810    def test_create_server_reuse_port(self):
811        proto = MyProto(self.loop)
812        f = self.loop.create_server(
813            lambda: proto, '0.0.0.0', 0)
814        server = self.loop.run_until_complete(f)
815        self.assertEqual(len(server.sockets), 1)
816        sock = server.sockets[0]
817        self.assertFalse(
818            sock.getsockopt(
819                socket.SOL_SOCKET, socket.SO_REUSEPORT))
820        server.close()
821
822        test_utils.run_briefly(self.loop)
823
824        proto = MyProto(self.loop)
825        f = self.loop.create_server(
826            lambda: proto, '0.0.0.0', 0, reuse_port=True)
827        server = self.loop.run_until_complete(f)
828        self.assertEqual(len(server.sockets), 1)
829        sock = server.sockets[0]
830        self.assertTrue(
831            sock.getsockopt(
832                socket.SOL_SOCKET, socket.SO_REUSEPORT))
833        server.close()
834
835    def _make_unix_server(self, factory, **kwargs):
836        path = test_utils.gen_unix_socket_path()
837        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
838
839        f = self.loop.create_unix_server(factory, path, **kwargs)
840        server = self.loop.run_until_complete(f)
841
842        return server, path
843
844    @socket_helper.skip_unless_bind_unix_socket
845    def test_create_unix_server(self):
846        proto = MyProto(loop=self.loop)
847        server, path = self._make_unix_server(lambda: proto)
848        self.assertEqual(len(server.sockets), 1)
849
850        client = socket.socket(socket.AF_UNIX)
851        client.connect(path)
852        client.sendall(b'xxx')
853
854        self.loop.run_until_complete(proto.connected)
855        self.assertEqual('CONNECTED', proto.state)
856        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
857        self.assertEqual(3, proto.nbytes)
858
859        # close connection
860        proto.transport.close()
861        self.loop.run_until_complete(proto.done)
862
863        self.assertEqual('CLOSED', proto.state)
864
865        # the client socket must be closed after to avoid ECONNRESET upon
866        # recv()/send() on the serving socket
867        client.close()
868
869        # close server
870        server.close()
871
872    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
873    def test_create_unix_server_path_socket_error(self):
874        proto = MyProto(loop=self.loop)
875        sock = socket.socket()
876        with sock:
877            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
878            with self.assertRaisesRegex(ValueError,
879                                        'path and sock can not be specified '
880                                        'at the same time'):
881                self.loop.run_until_complete(f)
882
883    def _create_ssl_context(self, certfile, keyfile=None):
884        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
885        sslcontext.options |= ssl.OP_NO_SSLv2
886        sslcontext.load_cert_chain(certfile, keyfile)
887        return sslcontext
888
889    def _make_ssl_server(self, factory, certfile, keyfile=None):
890        sslcontext = self._create_ssl_context(certfile, keyfile)
891
892        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
893        server = self.loop.run_until_complete(f)
894
895        sock = server.sockets[0]
896        host, port = sock.getsockname()
897        self.assertEqual(host, '127.0.0.1')
898        return server, host, port
899
900    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
901        sslcontext = self._create_ssl_context(certfile, keyfile)
902        return self._make_unix_server(factory, ssl=sslcontext)
903
904    @unittest.skipIf(ssl is None, 'No ssl module')
905    def test_create_server_ssl(self):
906        proto = MyProto(loop=self.loop)
907        server, host, port = self._make_ssl_server(
908            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
909
910        f_c = self.loop.create_connection(MyBaseProto, host, port,
911                                          ssl=test_utils.dummy_ssl_context())
912        client, pr = self.loop.run_until_complete(f_c)
913
914        client.write(b'xxx')
915        self.loop.run_until_complete(proto.connected)
916        self.assertEqual('CONNECTED', proto.state)
917
918        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
919        self.assertEqual(3, proto.nbytes)
920
921        # extra info is available
922        self.check_ssl_extra_info(client, peername=(host, port))
923
924        # close connection
925        proto.transport.close()
926        self.loop.run_until_complete(proto.done)
927        self.assertEqual('CLOSED', proto.state)
928
929        # the client socket must be closed after to avoid ECONNRESET upon
930        # recv()/send() on the serving socket
931        client.close()
932
933        # stop serving
934        server.close()
935
936    @socket_helper.skip_unless_bind_unix_socket
937    @unittest.skipIf(ssl is None, 'No ssl module')
938    def test_create_unix_server_ssl(self):
939        proto = MyProto(loop=self.loop)
940        server, path = self._make_ssl_unix_server(
941            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
942
943        f_c = self.loop.create_unix_connection(
944            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
945            server_hostname='')
946
947        client, pr = self.loop.run_until_complete(f_c)
948
949        client.write(b'xxx')
950        self.loop.run_until_complete(proto.connected)
951        self.assertEqual('CONNECTED', proto.state)
952        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
953        self.assertEqual(3, proto.nbytes)
954
955        # close connection
956        proto.transport.close()
957        self.loop.run_until_complete(proto.done)
958        self.assertEqual('CLOSED', proto.state)
959
960        # the client socket must be closed after to avoid ECONNRESET upon
961        # recv()/send() on the serving socket
962        client.close()
963
964        # stop serving
965        server.close()
966
967    @unittest.skipIf(ssl is None, 'No ssl module')
968    def test_create_server_ssl_verify_failed(self):
969        proto = MyProto(loop=self.loop)
970        server, host, port = self._make_ssl_server(
971            lambda: proto, test_utils.SIGNED_CERTFILE)
972
973        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
974        sslcontext_client.options |= ssl.OP_NO_SSLv2
975        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
976        if hasattr(sslcontext_client, 'check_hostname'):
977            sslcontext_client.check_hostname = True
978
979
980        # no CA loaded
981        f_c = self.loop.create_connection(MyProto, host, port,
982                                          ssl=sslcontext_client)
983        with mock.patch.object(self.loop, 'call_exception_handler'):
984            with test_utils.disable_logger():
985                with self.assertRaisesRegex(ssl.SSLError,
986                                            '(?i)certificate.verify.failed'):
987                    self.loop.run_until_complete(f_c)
988
989            # execute the loop to log the connection error
990            test_utils.run_briefly(self.loop)
991
992        # close connection
993        self.assertIsNone(proto.transport)
994        server.close()
995
996    @socket_helper.skip_unless_bind_unix_socket
997    @unittest.skipIf(ssl is None, 'No ssl module')
998    def test_create_unix_server_ssl_verify_failed(self):
999        proto = MyProto(loop=self.loop)
1000        server, path = self._make_ssl_unix_server(
1001            lambda: proto, test_utils.SIGNED_CERTFILE)
1002
1003        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1004        sslcontext_client.options |= ssl.OP_NO_SSLv2
1005        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1006        if hasattr(sslcontext_client, 'check_hostname'):
1007            sslcontext_client.check_hostname = True
1008
1009        # no CA loaded
1010        f_c = self.loop.create_unix_connection(MyProto, path,
1011                                               ssl=sslcontext_client,
1012                                               server_hostname='invalid')
1013        with mock.patch.object(self.loop, 'call_exception_handler'):
1014            with test_utils.disable_logger():
1015                with self.assertRaisesRegex(ssl.SSLError,
1016                                            '(?i)certificate.verify.failed'):
1017                    self.loop.run_until_complete(f_c)
1018
1019            # execute the loop to log the connection error
1020            test_utils.run_briefly(self.loop)
1021
1022        # close connection
1023        self.assertIsNone(proto.transport)
1024        server.close()
1025
1026    @unittest.skipIf(ssl is None, 'No ssl module')
1027    def test_create_server_ssl_match_failed(self):
1028        proto = MyProto(loop=self.loop)
1029        server, host, port = self._make_ssl_server(
1030            lambda: proto, test_utils.SIGNED_CERTFILE)
1031
1032        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1033        sslcontext_client.options |= ssl.OP_NO_SSLv2
1034        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1035        sslcontext_client.load_verify_locations(
1036            cafile=test_utils.SIGNING_CA)
1037        if hasattr(sslcontext_client, 'check_hostname'):
1038            sslcontext_client.check_hostname = True
1039
1040        # incorrect server_hostname
1041        f_c = self.loop.create_connection(MyProto, host, port,
1042                                          ssl=sslcontext_client)
1043        with mock.patch.object(self.loop, 'call_exception_handler'):
1044            with test_utils.disable_logger():
1045                with self.assertRaisesRegex(
1046                        ssl.CertificateError,
1047                        "IP address mismatch, certificate is not valid for "
1048                        "'127.0.0.1'"):
1049                    self.loop.run_until_complete(f_c)
1050
1051        # close connection
1052        # transport is None because TLS ALERT aborted the handshake
1053        self.assertIsNone(proto.transport)
1054        server.close()
1055
1056    @socket_helper.skip_unless_bind_unix_socket
1057    @unittest.skipIf(ssl is None, 'No ssl module')
1058    def test_create_unix_server_ssl_verified(self):
1059        proto = MyProto(loop=self.loop)
1060        server, path = self._make_ssl_unix_server(
1061            lambda: proto, test_utils.SIGNED_CERTFILE)
1062
1063        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1064        sslcontext_client.options |= ssl.OP_NO_SSLv2
1065        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1066        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1067        if hasattr(sslcontext_client, 'check_hostname'):
1068            sslcontext_client.check_hostname = True
1069
1070        # Connection succeeds with correct CA and server hostname.
1071        f_c = self.loop.create_unix_connection(MyProto, path,
1072                                               ssl=sslcontext_client,
1073                                               server_hostname='localhost')
1074        client, pr = self.loop.run_until_complete(f_c)
1075        self.loop.run_until_complete(proto.connected)
1076
1077        # close connection
1078        proto.transport.close()
1079        client.close()
1080        server.close()
1081        self.loop.run_until_complete(proto.done)
1082
1083    @unittest.skipIf(ssl is None, 'No ssl module')
1084    def test_create_server_ssl_verified(self):
1085        proto = MyProto(loop=self.loop)
1086        server, host, port = self._make_ssl_server(
1087            lambda: proto, test_utils.SIGNED_CERTFILE)
1088
1089        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1090        sslcontext_client.options |= ssl.OP_NO_SSLv2
1091        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1092        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1093        if hasattr(sslcontext_client, 'check_hostname'):
1094            sslcontext_client.check_hostname = True
1095
1096        # Connection succeeds with correct CA and server hostname.
1097        f_c = self.loop.create_connection(MyProto, host, port,
1098                                          ssl=sslcontext_client,
1099                                          server_hostname='localhost')
1100        client, pr = self.loop.run_until_complete(f_c)
1101        self.loop.run_until_complete(proto.connected)
1102
1103        # extra info is available
1104        self.check_ssl_extra_info(client, peername=(host, port),
1105                                  peercert=test_utils.PEERCERT)
1106
1107        # close connection
1108        proto.transport.close()
1109        client.close()
1110        server.close()
1111        self.loop.run_until_complete(proto.done)
1112
1113    def test_create_server_sock(self):
1114        proto = self.loop.create_future()
1115
1116        class TestMyProto(MyProto):
1117            def connection_made(self, transport):
1118                super().connection_made(transport)
1119                proto.set_result(self)
1120
1121        sock_ob = socket.create_server(('0.0.0.0', 0))
1122
1123        f = self.loop.create_server(TestMyProto, sock=sock_ob)
1124        server = self.loop.run_until_complete(f)
1125        sock = server.sockets[0]
1126        self.assertEqual(sock.fileno(), sock_ob.fileno())
1127
1128        host, port = sock.getsockname()
1129        self.assertEqual(host, '0.0.0.0')
1130        client = socket.socket()
1131        client.connect(('127.0.0.1', port))
1132        client.send(b'xxx')
1133        client.close()
1134        server.close()
1135
1136    def test_create_server_addr_in_use(self):
1137        sock_ob = socket.create_server(('0.0.0.0', 0))
1138
1139        f = self.loop.create_server(MyProto, sock=sock_ob)
1140        server = self.loop.run_until_complete(f)
1141        sock = server.sockets[0]
1142        host, port = sock.getsockname()
1143
1144        f = self.loop.create_server(MyProto, host=host, port=port)
1145        with self.assertRaises(OSError) as cm:
1146            self.loop.run_until_complete(f)
1147        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1148
1149        server.close()
1150
1151    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1152    def test_create_server_dual_stack(self):
1153        f_proto = self.loop.create_future()
1154
1155        class TestMyProto(MyProto):
1156            def connection_made(self, transport):
1157                super().connection_made(transport)
1158                f_proto.set_result(self)
1159
1160        try_count = 0
1161        while True:
1162            try:
1163                port = socket_helper.find_unused_port()
1164                f = self.loop.create_server(TestMyProto, host=None, port=port)
1165                server = self.loop.run_until_complete(f)
1166            except OSError as ex:
1167                if ex.errno == errno.EADDRINUSE:
1168                    try_count += 1
1169                    self.assertGreaterEqual(5, try_count)
1170                    continue
1171                else:
1172                    raise
1173            else:
1174                break
1175        client = socket.socket()
1176        client.connect(('127.0.0.1', port))
1177        client.send(b'xxx')
1178        proto = self.loop.run_until_complete(f_proto)
1179        proto.transport.close()
1180        client.close()
1181
1182        f_proto = self.loop.create_future()
1183        client = socket.socket(socket.AF_INET6)
1184        client.connect(('::1', port))
1185        client.send(b'xxx')
1186        proto = self.loop.run_until_complete(f_proto)
1187        proto.transport.close()
1188        client.close()
1189
1190        server.close()
1191
1192    def test_server_close(self):
1193        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1194        server = self.loop.run_until_complete(f)
1195        sock = server.sockets[0]
1196        host, port = sock.getsockname()
1197
1198        client = socket.socket()
1199        client.connect(('127.0.0.1', port))
1200        client.send(b'xxx')
1201        client.close()
1202
1203        server.close()
1204
1205        client = socket.socket()
1206        self.assertRaises(
1207            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1208        client.close()
1209
1210    def _test_create_datagram_endpoint(self, local_addr, family):
1211        class TestMyDatagramProto(MyDatagramProto):
1212            def __init__(inner_self):
1213                super().__init__(loop=self.loop)
1214
1215            def datagram_received(self, data, addr):
1216                super().datagram_received(data, addr)
1217                self.transport.sendto(b'resp:'+data, addr)
1218
1219        coro = self.loop.create_datagram_endpoint(
1220            TestMyDatagramProto, local_addr=local_addr, family=family)
1221        s_transport, server = self.loop.run_until_complete(coro)
1222        sockname = s_transport.get_extra_info('sockname')
1223        host, port = socket.getnameinfo(
1224            sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)
1225
1226        self.assertIsInstance(s_transport, asyncio.Transport)
1227        self.assertIsInstance(server, TestMyDatagramProto)
1228        self.assertEqual('INITIALIZED', server.state)
1229        self.assertIs(server.transport, s_transport)
1230
1231        coro = self.loop.create_datagram_endpoint(
1232            lambda: MyDatagramProto(loop=self.loop),
1233            remote_addr=(host, port))
1234        transport, client = self.loop.run_until_complete(coro)
1235
1236        self.assertIsInstance(transport, asyncio.Transport)
1237        self.assertIsInstance(client, MyDatagramProto)
1238        self.assertEqual('INITIALIZED', client.state)
1239        self.assertIs(client.transport, transport)
1240
1241        transport.sendto(b'xxx')
1242        test_utils.run_until(self.loop, lambda: server.nbytes)
1243        self.assertEqual(3, server.nbytes)
1244        test_utils.run_until(self.loop, lambda: client.nbytes)
1245
1246        # received
1247        self.assertEqual(8, client.nbytes)
1248
1249        # extra info is available
1250        self.assertIsNotNone(transport.get_extra_info('sockname'))
1251
1252        # close connection
1253        transport.close()
1254        self.loop.run_until_complete(client.done)
1255        self.assertEqual('CLOSED', client.state)
1256        server.transport.close()
1257
1258    def test_create_datagram_endpoint(self):
1259        self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
1260
1261    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1262    def test_create_datagram_endpoint_ipv6(self):
1263        self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
1264
1265    def test_create_datagram_endpoint_sock(self):
1266        sock = None
1267        local_address = ('127.0.0.1', 0)
1268        infos = self.loop.run_until_complete(
1269            self.loop.getaddrinfo(
1270                *local_address, type=socket.SOCK_DGRAM))
1271        for family, type, proto, cname, address in infos:
1272            try:
1273                sock = socket.socket(family=family, type=type, proto=proto)
1274                sock.setblocking(False)
1275                sock.bind(address)
1276            except:
1277                pass
1278            else:
1279                break
1280        else:
1281            assert False, 'Can not create socket.'
1282
1283        f = self.loop.create_datagram_endpoint(
1284            lambda: MyDatagramProto(loop=self.loop), sock=sock)
1285        tr, pr = self.loop.run_until_complete(f)
1286        self.assertIsInstance(tr, asyncio.Transport)
1287        self.assertIsInstance(pr, MyDatagramProto)
1288        tr.close()
1289        self.loop.run_until_complete(pr.done)
1290
1291    def test_internal_fds(self):
1292        loop = self.create_event_loop()
1293        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1294            loop.close()
1295            self.skipTest('loop is not a BaseSelectorEventLoop')
1296
1297        self.assertEqual(1, loop._internal_fds)
1298        loop.close()
1299        self.assertEqual(0, loop._internal_fds)
1300        self.assertIsNone(loop._csock)
1301        self.assertIsNone(loop._ssock)
1302
1303    @unittest.skipUnless(sys.platform != 'win32',
1304                         "Don't support pipes for Windows")
1305    def test_read_pipe(self):
1306        proto = MyReadPipeProto(loop=self.loop)
1307
1308        rpipe, wpipe = os.pipe()
1309        pipeobj = io.open(rpipe, 'rb', 1024)
1310
1311        async def connect():
1312            t, p = await self.loop.connect_read_pipe(
1313                lambda: proto, pipeobj)
1314            self.assertIs(p, proto)
1315            self.assertIs(t, proto.transport)
1316            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1317            self.assertEqual(0, proto.nbytes)
1318
1319        self.loop.run_until_complete(connect())
1320
1321        os.write(wpipe, b'1')
1322        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1323        self.assertEqual(1, proto.nbytes)
1324
1325        os.write(wpipe, b'2345')
1326        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1327        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1328        self.assertEqual(5, proto.nbytes)
1329
1330        os.close(wpipe)
1331        self.loop.run_until_complete(proto.done)
1332        self.assertEqual(
1333            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1334        # extra info is available
1335        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1336
1337    @unittest.skipUnless(sys.platform != 'win32',
1338                         "Don't support pipes for Windows")
1339    def test_unclosed_pipe_transport(self):
1340        # This test reproduces the issue #314 on GitHub
1341        loop = self.create_event_loop()
1342        read_proto = MyReadPipeProto(loop=loop)
1343        write_proto = MyWritePipeProto(loop=loop)
1344
1345        rpipe, wpipe = os.pipe()
1346        rpipeobj = io.open(rpipe, 'rb', 1024)
1347        wpipeobj = io.open(wpipe, 'w', 1024)
1348
1349        async def connect():
1350            read_transport, _ = await loop.connect_read_pipe(
1351                lambda: read_proto, rpipeobj)
1352            write_transport, _ = await loop.connect_write_pipe(
1353                lambda: write_proto, wpipeobj)
1354            return read_transport, write_transport
1355
1356        # Run and close the loop without closing the transports
1357        read_transport, write_transport = loop.run_until_complete(connect())
1358        loop.close()
1359
1360        # These 'repr' calls used to raise an AttributeError
1361        # See Issue #314 on GitHub
1362        self.assertIn('open', repr(read_transport))
1363        self.assertIn('open', repr(write_transport))
1364
1365        # Clean up (avoid ResourceWarning)
1366        rpipeobj.close()
1367        wpipeobj.close()
1368        read_transport._pipe = None
1369        write_transport._pipe = None
1370
1371    @unittest.skipUnless(sys.platform != 'win32',
1372                         "Don't support pipes for Windows")
1373    def test_read_pty_output(self):
1374        proto = MyReadPipeProto(loop=self.loop)
1375
1376        master, slave = os.openpty()
1377        master_read_obj = io.open(master, 'rb', 0)
1378
1379        async def connect():
1380            t, p = await self.loop.connect_read_pipe(lambda: proto,
1381                                                     master_read_obj)
1382            self.assertIs(p, proto)
1383            self.assertIs(t, proto.transport)
1384            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1385            self.assertEqual(0, proto.nbytes)
1386
1387        self.loop.run_until_complete(connect())
1388
1389        os.write(slave, b'1')
1390        test_utils.run_until(self.loop, lambda: proto.nbytes)
1391        self.assertEqual(1, proto.nbytes)
1392
1393        os.write(slave, b'2345')
1394        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1395        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1396        self.assertEqual(5, proto.nbytes)
1397
1398        os.close(slave)
1399        proto.transport.close()
1400        self.loop.run_until_complete(proto.done)
1401        self.assertEqual(
1402            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1403        # extra info is available
1404        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1405
1406    @unittest.skipUnless(sys.platform != 'win32',
1407                         "Don't support pipes for Windows")
1408    def test_write_pipe(self):
1409        rpipe, wpipe = os.pipe()
1410        pipeobj = io.open(wpipe, 'wb', 1024)
1411
1412        proto = MyWritePipeProto(loop=self.loop)
1413        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1414        transport, p = self.loop.run_until_complete(connect)
1415        self.assertIs(p, proto)
1416        self.assertIs(transport, proto.transport)
1417        self.assertEqual('CONNECTED', proto.state)
1418
1419        transport.write(b'1')
1420
1421        data = bytearray()
1422        def reader(data):
1423            chunk = os.read(rpipe, 1024)
1424            data += chunk
1425            return len(data)
1426
1427        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1428        self.assertEqual(b'1', data)
1429
1430        transport.write(b'2345')
1431        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1432        self.assertEqual(b'12345', data)
1433        self.assertEqual('CONNECTED', proto.state)
1434
1435        os.close(rpipe)
1436
1437        # extra info is available
1438        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1439
1440        # close connection
1441        proto.transport.close()
1442        self.loop.run_until_complete(proto.done)
1443        self.assertEqual('CLOSED', proto.state)
1444
1445    @unittest.skipUnless(sys.platform != 'win32',
1446                         "Don't support pipes for Windows")
1447    def test_write_pipe_disconnect_on_close(self):
1448        rsock, wsock = socket.socketpair()
1449        rsock.setblocking(False)
1450        pipeobj = io.open(wsock.detach(), 'wb', 1024)
1451
1452        proto = MyWritePipeProto(loop=self.loop)
1453        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1454        transport, p = self.loop.run_until_complete(connect)
1455        self.assertIs(p, proto)
1456        self.assertIs(transport, proto.transport)
1457        self.assertEqual('CONNECTED', proto.state)
1458
1459        transport.write(b'1')
1460        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1461        self.assertEqual(b'1', data)
1462
1463        rsock.close()
1464
1465        self.loop.run_until_complete(proto.done)
1466        self.assertEqual('CLOSED', proto.state)
1467
1468    @unittest.skipUnless(sys.platform != 'win32',
1469                         "Don't support pipes for Windows")
1470    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1471    # older than 10.6 (Snow Leopard)
1472    @support.requires_mac_ver(10, 6)
1473    def test_write_pty(self):
1474        master, slave = os.openpty()
1475        slave_write_obj = io.open(slave, 'wb', 0)
1476
1477        proto = MyWritePipeProto(loop=self.loop)
1478        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1479        transport, p = self.loop.run_until_complete(connect)
1480        self.assertIs(p, proto)
1481        self.assertIs(transport, proto.transport)
1482        self.assertEqual('CONNECTED', proto.state)
1483
1484        transport.write(b'1')
1485
1486        data = bytearray()
1487        def reader(data):
1488            chunk = os.read(master, 1024)
1489            data += chunk
1490            return len(data)
1491
1492        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1493                             timeout=support.SHORT_TIMEOUT)
1494        self.assertEqual(b'1', data)
1495
1496        transport.write(b'2345')
1497        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1498                             timeout=support.SHORT_TIMEOUT)
1499        self.assertEqual(b'12345', data)
1500        self.assertEqual('CONNECTED', proto.state)
1501
1502        os.close(master)
1503
1504        # extra info is available
1505        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1506
1507        # close connection
1508        proto.transport.close()
1509        self.loop.run_until_complete(proto.done)
1510        self.assertEqual('CLOSED', proto.state)
1511
1512    @unittest.skipUnless(sys.platform != 'win32',
1513                         "Don't support pipes for Windows")
1514    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1515    # older than 10.6 (Snow Leopard)
1516    @support.requires_mac_ver(10, 6)
1517    def test_bidirectional_pty(self):
1518        master, read_slave = os.openpty()
1519        write_slave = os.dup(read_slave)
1520        tty.setraw(read_slave)
1521
1522        slave_read_obj = io.open(read_slave, 'rb', 0)
1523        read_proto = MyReadPipeProto(loop=self.loop)
1524        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1525                                                   slave_read_obj)
1526        read_transport, p = self.loop.run_until_complete(read_connect)
1527        self.assertIs(p, read_proto)
1528        self.assertIs(read_transport, read_proto.transport)
1529        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1530        self.assertEqual(0, read_proto.nbytes)
1531
1532
1533        slave_write_obj = io.open(write_slave, 'wb', 0)
1534        write_proto = MyWritePipeProto(loop=self.loop)
1535        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1536                                                     slave_write_obj)
1537        write_transport, p = self.loop.run_until_complete(write_connect)
1538        self.assertIs(p, write_proto)
1539        self.assertIs(write_transport, write_proto.transport)
1540        self.assertEqual('CONNECTED', write_proto.state)
1541
1542        data = bytearray()
1543        def reader(data):
1544            chunk = os.read(master, 1024)
1545            data += chunk
1546            return len(data)
1547
1548        write_transport.write(b'1')
1549        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1550                             timeout=support.SHORT_TIMEOUT)
1551        self.assertEqual(b'1', data)
1552        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1553        self.assertEqual('CONNECTED', write_proto.state)
1554
1555        os.write(master, b'a')
1556        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1557                             timeout=support.SHORT_TIMEOUT)
1558        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1559        self.assertEqual(1, read_proto.nbytes)
1560        self.assertEqual('CONNECTED', write_proto.state)
1561
1562        write_transport.write(b'2345')
1563        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1564                             timeout=support.SHORT_TIMEOUT)
1565        self.assertEqual(b'12345', data)
1566        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1567        self.assertEqual('CONNECTED', write_proto.state)
1568
1569        os.write(master, b'bcde')
1570        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1571                             timeout=support.SHORT_TIMEOUT)
1572        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1573        self.assertEqual(5, read_proto.nbytes)
1574        self.assertEqual('CONNECTED', write_proto.state)
1575
1576        os.close(master)
1577
1578        read_transport.close()
1579        self.loop.run_until_complete(read_proto.done)
1580        self.assertEqual(
1581            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1582
1583        write_transport.close()
1584        self.loop.run_until_complete(write_proto.done)
1585        self.assertEqual('CLOSED', write_proto.state)
1586
1587    def test_prompt_cancellation(self):
1588        r, w = socket.socketpair()
1589        r.setblocking(False)
1590        f = self.loop.create_task(self.loop.sock_recv(r, 1))
1591        ov = getattr(f, 'ov', None)
1592        if ov is not None:
1593            self.assertTrue(ov.pending)
1594
1595        async def main():
1596            try:
1597                self.loop.call_soon(f.cancel)
1598                await f
1599            except asyncio.CancelledError:
1600                res = 'cancelled'
1601            else:
1602                res = None
1603            finally:
1604                self.loop.stop()
1605            return res
1606
1607        start = time.monotonic()
1608        t = self.loop.create_task(main())
1609        self.loop.run_forever()
1610        elapsed = time.monotonic() - start
1611
1612        self.assertLess(elapsed, 0.1)
1613        self.assertEqual(t.result(), 'cancelled')
1614        self.assertRaises(asyncio.CancelledError, f.result)
1615        if ov is not None:
1616            self.assertFalse(ov.pending)
1617        self.loop._stop_serving(r)
1618
1619        r.close()
1620        w.close()
1621
1622    def test_timeout_rounding(self):
1623        def _run_once():
1624            self.loop._run_once_counter += 1
1625            orig_run_once()
1626
1627        orig_run_once = self.loop._run_once
1628        self.loop._run_once_counter = 0
1629        self.loop._run_once = _run_once
1630
1631        async def wait():
1632            loop = self.loop
1633            await asyncio.sleep(1e-2)
1634            await asyncio.sleep(1e-4)
1635            await asyncio.sleep(1e-6)
1636            await asyncio.sleep(1e-8)
1637            await asyncio.sleep(1e-10)
1638
1639        self.loop.run_until_complete(wait())
1640        # The ideal number of call is 12, but on some platforms, the selector
1641        # may sleep at little bit less than timeout depending on the resolution
1642        # of the clock used by the kernel. Tolerate a few useless calls on
1643        # these platforms.
1644        self.assertLessEqual(self.loop._run_once_counter, 20,
1645            {'clock_resolution': self.loop._clock_resolution,
1646             'selector': self.loop._selector.__class__.__name__})
1647
1648    def test_remove_fds_after_closing(self):
1649        loop = self.create_event_loop()
1650        callback = lambda: None
1651        r, w = socket.socketpair()
1652        self.addCleanup(r.close)
1653        self.addCleanup(w.close)
1654        loop.add_reader(r, callback)
1655        loop.add_writer(w, callback)
1656        loop.close()
1657        self.assertFalse(loop.remove_reader(r))
1658        self.assertFalse(loop.remove_writer(w))
1659
1660    def test_add_fds_after_closing(self):
1661        loop = self.create_event_loop()
1662        callback = lambda: None
1663        r, w = socket.socketpair()
1664        self.addCleanup(r.close)
1665        self.addCleanup(w.close)
1666        loop.close()
1667        with self.assertRaises(RuntimeError):
1668            loop.add_reader(r, callback)
1669        with self.assertRaises(RuntimeError):
1670            loop.add_writer(w, callback)
1671
1672    def test_close_running_event_loop(self):
1673        async def close_loop(loop):
1674            self.loop.close()
1675
1676        coro = close_loop(self.loop)
1677        with self.assertRaises(RuntimeError):
1678            self.loop.run_until_complete(coro)
1679
1680    def test_close(self):
1681        self.loop.close()
1682
1683        async def test():
1684            pass
1685
1686        func = lambda: False
1687        coro = test()
1688        self.addCleanup(coro.close)
1689
1690        # operation blocked when the loop is closed
1691        with self.assertRaises(RuntimeError):
1692            self.loop.run_forever()
1693        with self.assertRaises(RuntimeError):
1694            fut = self.loop.create_future()
1695            self.loop.run_until_complete(fut)
1696        with self.assertRaises(RuntimeError):
1697            self.loop.call_soon(func)
1698        with self.assertRaises(RuntimeError):
1699            self.loop.call_soon_threadsafe(func)
1700        with self.assertRaises(RuntimeError):
1701            self.loop.call_later(1.0, func)
1702        with self.assertRaises(RuntimeError):
1703            self.loop.call_at(self.loop.time() + .0, func)
1704        with self.assertRaises(RuntimeError):
1705            self.loop.create_task(coro)
1706        with self.assertRaises(RuntimeError):
1707            self.loop.add_signal_handler(signal.SIGTERM, func)
1708
1709        # run_in_executor test is tricky: the method is a coroutine,
1710        # but run_until_complete cannot be called on closed loop.
1711        # Thus iterate once explicitly.
1712        with self.assertRaises(RuntimeError):
1713            it = self.loop.run_in_executor(None, func).__await__()
1714            next(it)
1715
1716
1717class SubprocessTestsMixin:
1718
1719    def check_terminated(self, returncode):
1720        if sys.platform == 'win32':
1721            self.assertIsInstance(returncode, int)
1722            # expect 1 but sometimes get 0
1723        else:
1724            self.assertEqual(-signal.SIGTERM, returncode)
1725
1726    def check_killed(self, returncode):
1727        if sys.platform == 'win32':
1728            self.assertIsInstance(returncode, int)
1729            # expect 1 but sometimes get 0
1730        else:
1731            self.assertEqual(-signal.SIGKILL, returncode)
1732
1733    def test_subprocess_exec(self):
1734        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1735
1736        connect = self.loop.subprocess_exec(
1737                        functools.partial(MySubprocessProtocol, self.loop),
1738                        sys.executable, prog)
1739        transp, proto = self.loop.run_until_complete(connect)
1740        self.assertIsInstance(proto, MySubprocessProtocol)
1741        self.loop.run_until_complete(proto.connected)
1742        self.assertEqual('CONNECTED', proto.state)
1743
1744        stdin = transp.get_pipe_transport(0)
1745        stdin.write(b'Python The Winner')
1746        self.loop.run_until_complete(proto.got_data[1].wait())
1747        with test_utils.disable_logger():
1748            transp.close()
1749        self.loop.run_until_complete(proto.completed)
1750        self.check_killed(proto.returncode)
1751        self.assertEqual(b'Python The Winner', proto.data[1])
1752
1753    def test_subprocess_interactive(self):
1754        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1755
1756        connect = self.loop.subprocess_exec(
1757                        functools.partial(MySubprocessProtocol, self.loop),
1758                        sys.executable, prog)
1759
1760        transp, proto = self.loop.run_until_complete(connect)
1761        self.assertIsInstance(proto, MySubprocessProtocol)
1762        self.loop.run_until_complete(proto.connected)
1763        self.assertEqual('CONNECTED', proto.state)
1764
1765        stdin = transp.get_pipe_transport(0)
1766        stdin.write(b'Python ')
1767        self.loop.run_until_complete(proto.got_data[1].wait())
1768        proto.got_data[1].clear()
1769        self.assertEqual(b'Python ', proto.data[1])
1770
1771        stdin.write(b'The Winner')
1772        self.loop.run_until_complete(proto.got_data[1].wait())
1773        self.assertEqual(b'Python The Winner', proto.data[1])
1774
1775        with test_utils.disable_logger():
1776            transp.close()
1777        self.loop.run_until_complete(proto.completed)
1778        self.check_killed(proto.returncode)
1779
1780    def test_subprocess_shell(self):
1781        connect = self.loop.subprocess_shell(
1782                        functools.partial(MySubprocessProtocol, self.loop),
1783                        'echo Python')
1784        transp, proto = self.loop.run_until_complete(connect)
1785        self.assertIsInstance(proto, MySubprocessProtocol)
1786        self.loop.run_until_complete(proto.connected)
1787
1788        transp.get_pipe_transport(0).close()
1789        self.loop.run_until_complete(proto.completed)
1790        self.assertEqual(0, proto.returncode)
1791        self.assertTrue(all(f.done() for f in proto.disconnects.values()))
1792        self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
1793        self.assertEqual(proto.data[2], b'')
1794        transp.close()
1795
1796    def test_subprocess_exitcode(self):
1797        connect = self.loop.subprocess_shell(
1798                        functools.partial(MySubprocessProtocol, self.loop),
1799                        'exit 7', stdin=None, stdout=None, stderr=None)
1800
1801        transp, proto = self.loop.run_until_complete(connect)
1802        self.assertIsInstance(proto, MySubprocessProtocol)
1803        self.loop.run_until_complete(proto.completed)
1804        self.assertEqual(7, proto.returncode)
1805        transp.close()
1806
1807    def test_subprocess_close_after_finish(self):
1808        connect = self.loop.subprocess_shell(
1809                        functools.partial(MySubprocessProtocol, self.loop),
1810                        'exit 7', stdin=None, stdout=None, stderr=None)
1811        transp, proto = self.loop.run_until_complete(connect)
1812        self.assertIsInstance(proto, MySubprocessProtocol)
1813        self.assertIsNone(transp.get_pipe_transport(0))
1814        self.assertIsNone(transp.get_pipe_transport(1))
1815        self.assertIsNone(transp.get_pipe_transport(2))
1816        self.loop.run_until_complete(proto.completed)
1817        self.assertEqual(7, proto.returncode)
1818        self.assertIsNone(transp.close())
1819
1820    def test_subprocess_kill(self):
1821        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1822
1823        connect = self.loop.subprocess_exec(
1824                        functools.partial(MySubprocessProtocol, self.loop),
1825                        sys.executable, prog)
1826
1827        transp, proto = self.loop.run_until_complete(connect)
1828        self.assertIsInstance(proto, MySubprocessProtocol)
1829        self.loop.run_until_complete(proto.connected)
1830
1831        transp.kill()
1832        self.loop.run_until_complete(proto.completed)
1833        self.check_killed(proto.returncode)
1834        transp.close()
1835
1836    def test_subprocess_terminate(self):
1837        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1838
1839        connect = self.loop.subprocess_exec(
1840                        functools.partial(MySubprocessProtocol, self.loop),
1841                        sys.executable, prog)
1842
1843        transp, proto = self.loop.run_until_complete(connect)
1844        self.assertIsInstance(proto, MySubprocessProtocol)
1845        self.loop.run_until_complete(proto.connected)
1846
1847        transp.terminate()
1848        self.loop.run_until_complete(proto.completed)
1849        self.check_terminated(proto.returncode)
1850        transp.close()
1851
1852    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
1853    def test_subprocess_send_signal(self):
1854        # bpo-31034: Make sure that we get the default signal handler (killing
1855        # the process). The parent process may have decided to ignore SIGHUP,
1856        # and signal handlers are inherited.
1857        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
1858        try:
1859            prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1860
1861            connect = self.loop.subprocess_exec(
1862                            functools.partial(MySubprocessProtocol, self.loop),
1863                            sys.executable, prog)
1864
1865            transp, proto = self.loop.run_until_complete(connect)
1866            self.assertIsInstance(proto, MySubprocessProtocol)
1867            self.loop.run_until_complete(proto.connected)
1868
1869            transp.send_signal(signal.SIGHUP)
1870            self.loop.run_until_complete(proto.completed)
1871            self.assertEqual(-signal.SIGHUP, proto.returncode)
1872            transp.close()
1873        finally:
1874            signal.signal(signal.SIGHUP, old_handler)
1875
1876    def test_subprocess_stderr(self):
1877        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
1878
1879        connect = self.loop.subprocess_exec(
1880                        functools.partial(MySubprocessProtocol, self.loop),
1881                        sys.executable, prog)
1882
1883        transp, proto = self.loop.run_until_complete(connect)
1884        self.assertIsInstance(proto, MySubprocessProtocol)
1885        self.loop.run_until_complete(proto.connected)
1886
1887        stdin = transp.get_pipe_transport(0)
1888        stdin.write(b'test')
1889
1890        self.loop.run_until_complete(proto.completed)
1891
1892        transp.close()
1893        self.assertEqual(b'OUT:test', proto.data[1])
1894        self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
1895        self.assertEqual(0, proto.returncode)
1896
1897    def test_subprocess_stderr_redirect_to_stdout(self):
1898        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
1899
1900        connect = self.loop.subprocess_exec(
1901                        functools.partial(MySubprocessProtocol, self.loop),
1902                        sys.executable, prog, stderr=subprocess.STDOUT)
1903
1904        transp, proto = self.loop.run_until_complete(connect)
1905        self.assertIsInstance(proto, MySubprocessProtocol)
1906        self.loop.run_until_complete(proto.connected)
1907
1908        stdin = transp.get_pipe_transport(0)
1909        self.assertIsNotNone(transp.get_pipe_transport(1))
1910        self.assertIsNone(transp.get_pipe_transport(2))
1911
1912        stdin.write(b'test')
1913        self.loop.run_until_complete(proto.completed)
1914        self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
1915                        proto.data[1])
1916        self.assertEqual(b'', proto.data[2])
1917
1918        transp.close()
1919        self.assertEqual(0, proto.returncode)
1920
1921    def test_subprocess_close_client_stream(self):
1922        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
1923
1924        connect = self.loop.subprocess_exec(
1925                        functools.partial(MySubprocessProtocol, self.loop),
1926                        sys.executable, prog)
1927        transp, proto = self.loop.run_until_complete(connect)
1928        self.assertIsInstance(proto, MySubprocessProtocol)
1929        self.loop.run_until_complete(proto.connected)
1930
1931        stdin = transp.get_pipe_transport(0)
1932        stdout = transp.get_pipe_transport(1)
1933        stdin.write(b'test')
1934        self.loop.run_until_complete(proto.got_data[1].wait())
1935        self.assertEqual(b'OUT:test', proto.data[1])
1936
1937        stdout.close()
1938        self.loop.run_until_complete(proto.disconnects[1])
1939        stdin.write(b'xxx')
1940        self.loop.run_until_complete(proto.got_data[2].wait())
1941        if sys.platform != 'win32':
1942            self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
1943        else:
1944            # After closing the read-end of a pipe, writing to the
1945            # write-end using os.write() fails with errno==EINVAL and
1946            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
1947            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
1948            self.assertEqual(b'ERR:OSError', proto.data[2])
1949        with test_utils.disable_logger():
1950            transp.close()
1951        self.loop.run_until_complete(proto.completed)
1952        self.check_killed(proto.returncode)
1953
1954    def test_subprocess_wait_no_same_group(self):
1955        # start the new process in a new session
1956        connect = self.loop.subprocess_shell(
1957                        functools.partial(MySubprocessProtocol, self.loop),
1958                        'exit 7', stdin=None, stdout=None, stderr=None,
1959                        start_new_session=True)
1960        transp, proto = self.loop.run_until_complete(connect)
1961        self.assertIsInstance(proto, MySubprocessProtocol)
1962        self.loop.run_until_complete(proto.completed)
1963        self.assertEqual(7, proto.returncode)
1964        transp.close()
1965
1966    def test_subprocess_exec_invalid_args(self):
1967        async def connect(**kwds):
1968            await self.loop.subprocess_exec(
1969                asyncio.SubprocessProtocol,
1970                'pwd', **kwds)
1971
1972        with self.assertRaises(ValueError):
1973            self.loop.run_until_complete(connect(universal_newlines=True))
1974        with self.assertRaises(ValueError):
1975            self.loop.run_until_complete(connect(bufsize=4096))
1976        with self.assertRaises(ValueError):
1977            self.loop.run_until_complete(connect(shell=True))
1978
1979    def test_subprocess_shell_invalid_args(self):
1980
1981        async def connect(cmd=None, **kwds):
1982            if not cmd:
1983                cmd = 'pwd'
1984            await self.loop.subprocess_shell(
1985                asyncio.SubprocessProtocol,
1986                cmd, **kwds)
1987
1988        with self.assertRaises(ValueError):
1989            self.loop.run_until_complete(connect(['ls', '-l']))
1990        with self.assertRaises(ValueError):
1991            self.loop.run_until_complete(connect(universal_newlines=True))
1992        with self.assertRaises(ValueError):
1993            self.loop.run_until_complete(connect(bufsize=4096))
1994        with self.assertRaises(ValueError):
1995            self.loop.run_until_complete(connect(shell=False))
1996
1997
1998if sys.platform == 'win32':
1999
2000    class SelectEventLoopTests(EventLoopTestsMixin,
2001                               test_utils.TestCase):
2002
2003        def create_event_loop(self):
2004            return asyncio.SelectorEventLoop()
2005
2006    class ProactorEventLoopTests(EventLoopTestsMixin,
2007                                 SubprocessTestsMixin,
2008                                 test_utils.TestCase):
2009
2010        def create_event_loop(self):
2011            return asyncio.ProactorEventLoop()
2012
2013        def test_reader_callback(self):
2014            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2015
2016        def test_reader_callback_cancel(self):
2017            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2018
2019        def test_writer_callback(self):
2020            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2021
2022        def test_writer_callback_cancel(self):
2023            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2024
2025        def test_remove_fds_after_closing(self):
2026            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2027else:
2028    import selectors
2029
2030    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
2031        def setUp(self):
2032            super().setUp()
2033            watcher = asyncio.SafeChildWatcher()
2034            watcher.attach_loop(self.loop)
2035            asyncio.set_child_watcher(watcher)
2036
2037        def tearDown(self):
2038            asyncio.set_child_watcher(None)
2039            super().tearDown()
2040
2041
2042    if hasattr(selectors, 'KqueueSelector'):
2043        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
2044                                   SubprocessTestsMixin,
2045                                   test_utils.TestCase):
2046
2047            def create_event_loop(self):
2048                return asyncio.SelectorEventLoop(
2049                    selectors.KqueueSelector())
2050
2051            # kqueue doesn't support character devices (PTY) on Mac OS X older
2052            # than 10.9 (Maverick)
2053            @support.requires_mac_ver(10, 9)
2054            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2055            # hangs on OpenBSD 5.5
2056            @unittest.skipIf(sys.platform.startswith('openbsd'),
2057                             'test hangs on OpenBSD')
2058            def test_read_pty_output(self):
2059                super().test_read_pty_output()
2060
2061            # kqueue doesn't support character devices (PTY) on Mac OS X older
2062            # than 10.9 (Maverick)
2063            @support.requires_mac_ver(10, 9)
2064            def test_write_pty(self):
2065                super().test_write_pty()
2066
2067    if hasattr(selectors, 'EpollSelector'):
2068        class EPollEventLoopTests(UnixEventLoopTestsMixin,
2069                                  SubprocessTestsMixin,
2070                                  test_utils.TestCase):
2071
2072            def create_event_loop(self):
2073                return asyncio.SelectorEventLoop(selectors.EpollSelector())
2074
2075    if hasattr(selectors, 'PollSelector'):
2076        class PollEventLoopTests(UnixEventLoopTestsMixin,
2077                                 SubprocessTestsMixin,
2078                                 test_utils.TestCase):
2079
2080            def create_event_loop(self):
2081                return asyncio.SelectorEventLoop(selectors.PollSelector())
2082
2083    # Should always exist.
2084    class SelectEventLoopTests(UnixEventLoopTestsMixin,
2085                               SubprocessTestsMixin,
2086                               test_utils.TestCase):
2087
2088        def create_event_loop(self):
2089            return asyncio.SelectorEventLoop(selectors.SelectSelector())
2090
2091
2092def noop(*args, **kwargs):
2093    pass
2094
2095
2096class HandleTests(test_utils.TestCase):
2097
2098    def setUp(self):
2099        super().setUp()
2100        self.loop = mock.Mock()
2101        self.loop.get_debug.return_value = True
2102
2103    def test_handle(self):
2104        def callback(*args):
2105            return args
2106
2107        args = ()
2108        h = asyncio.Handle(callback, args, self.loop)
2109        self.assertIs(h._callback, callback)
2110        self.assertIs(h._args, args)
2111        self.assertFalse(h.cancelled())
2112
2113        h.cancel()
2114        self.assertTrue(h.cancelled())
2115
2116    def test_callback_with_exception(self):
2117        def callback():
2118            raise ValueError()
2119
2120        self.loop = mock.Mock()
2121        self.loop.call_exception_handler = mock.Mock()
2122
2123        h = asyncio.Handle(callback, (), self.loop)
2124        h._run()
2125
2126        self.loop.call_exception_handler.assert_called_with({
2127            'message': test_utils.MockPattern('Exception in callback.*'),
2128            'exception': mock.ANY,
2129            'handle': h,
2130            'source_traceback': h._source_traceback,
2131        })
2132
2133    def test_handle_weakref(self):
2134        wd = weakref.WeakValueDictionary()
2135        h = asyncio.Handle(lambda: None, (), self.loop)
2136        wd['h'] = h  # Would fail without __weakref__ slot.
2137
2138    def test_handle_repr(self):
2139        self.loop.get_debug.return_value = False
2140
2141        # simple function
2142        h = asyncio.Handle(noop, (1, 2), self.loop)
2143        filename, lineno = test_utils.get_function_source(noop)
2144        self.assertEqual(repr(h),
2145                        '<Handle noop(1, 2) at %s:%s>'
2146                        % (filename, lineno))
2147
2148        # cancelled handle
2149        h.cancel()
2150        self.assertEqual(repr(h),
2151                        '<Handle cancelled>')
2152
2153        # decorated function
2154        with self.assertWarns(DeprecationWarning):
2155            cb = asyncio.coroutine(noop)
2156        h = asyncio.Handle(cb, (), self.loop)
2157        self.assertEqual(repr(h),
2158                        '<Handle noop() at %s:%s>'
2159                        % (filename, lineno))
2160
2161        # partial function
2162        cb = functools.partial(noop, 1, 2)
2163        h = asyncio.Handle(cb, (3,), self.loop)
2164        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
2165                 % (re.escape(filename), lineno))
2166        self.assertRegex(repr(h), regex)
2167
2168        # partial function with keyword args
2169        cb = functools.partial(noop, x=1)
2170        h = asyncio.Handle(cb, (2, 3), self.loop)
2171        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
2172                 % (re.escape(filename), lineno))
2173        self.assertRegex(repr(h), regex)
2174
2175        # partial method
2176        if sys.version_info >= (3, 4):
2177            method = HandleTests.test_handle_repr
2178            cb = functools.partialmethod(method)
2179            filename, lineno = test_utils.get_function_source(method)
2180            h = asyncio.Handle(cb, (), self.loop)
2181
2182            cb_regex = r'<function HandleTests.test_handle_repr .*>'
2183            cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
2184            regex = (r'^<Handle %s at %s:%s>$'
2185                     % (cb_regex, re.escape(filename), lineno))
2186            self.assertRegex(repr(h), regex)
2187
2188    def test_handle_repr_debug(self):
2189        self.loop.get_debug.return_value = True
2190
2191        # simple function
2192        create_filename = __file__
2193        create_lineno = sys._getframe().f_lineno + 1
2194        h = asyncio.Handle(noop, (1, 2), self.loop)
2195        filename, lineno = test_utils.get_function_source(noop)
2196        self.assertEqual(repr(h),
2197                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
2198                        % (filename, lineno, create_filename, create_lineno))
2199
2200        # cancelled handle
2201        h.cancel()
2202        self.assertEqual(
2203            repr(h),
2204            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2205            % (filename, lineno, create_filename, create_lineno))
2206
2207        # double cancellation won't overwrite _repr
2208        h.cancel()
2209        self.assertEqual(
2210            repr(h),
2211            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2212            % (filename, lineno, create_filename, create_lineno))
2213
2214    def test_handle_source_traceback(self):
2215        loop = asyncio.get_event_loop_policy().new_event_loop()
2216        loop.set_debug(True)
2217        self.set_event_loop(loop)
2218
2219        def check_source_traceback(h):
2220            lineno = sys._getframe(1).f_lineno - 1
2221            self.assertIsInstance(h._source_traceback, list)
2222            self.assertEqual(h._source_traceback[-1][:3],
2223                             (__file__,
2224                              lineno,
2225                              'test_handle_source_traceback'))
2226
2227        # call_soon
2228        h = loop.call_soon(noop)
2229        check_source_traceback(h)
2230
2231        # call_soon_threadsafe
2232        h = loop.call_soon_threadsafe(noop)
2233        check_source_traceback(h)
2234
2235        # call_later
2236        h = loop.call_later(0, noop)
2237        check_source_traceback(h)
2238
2239        # call_at
2240        h = loop.call_later(0, noop)
2241        check_source_traceback(h)
2242
2243    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
2244                         'No collections.abc.Coroutine')
2245    def test_coroutine_like_object_debug_formatting(self):
2246        # Test that asyncio can format coroutines that are instances of
2247        # collections.abc.Coroutine, but lack cr_core or gi_code attributes
2248        # (such as ones compiled with Cython).
2249
2250        coro = CoroLike()
2251        coro.__name__ = 'AAA'
2252        self.assertTrue(asyncio.iscoroutine(coro))
2253        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2254
2255        coro.__qualname__ = 'BBB'
2256        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
2257
2258        coro.cr_running = True
2259        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
2260
2261        coro.__name__ = coro.__qualname__ = None
2262        self.assertEqual(coroutines._format_coroutine(coro),
2263                         '<CoroLike without __name__>() running')
2264
2265        coro = CoroLike()
2266        coro.__qualname__ = 'CoroLike'
2267        # Some coroutines might not have '__name__', such as
2268        # built-in async_gen.asend().
2269        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')
2270
2271        coro = CoroLike()
2272        coro.__qualname__ = 'AAA'
2273        coro.cr_code = None
2274        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2275
2276
2277class TimerTests(unittest.TestCase):
2278
2279    def setUp(self):
2280        super().setUp()
2281        self.loop = mock.Mock()
2282
2283    def test_hash(self):
2284        when = time.monotonic()
2285        h = asyncio.TimerHandle(when, lambda: False, (),
2286                                mock.Mock())
2287        self.assertEqual(hash(h), hash(when))
2288
2289    def test_when(self):
2290        when = time.monotonic()
2291        h = asyncio.TimerHandle(when, lambda: False, (),
2292                                mock.Mock())
2293        self.assertEqual(when, h.when())
2294
2295    def test_timer(self):
2296        def callback(*args):
2297            return args
2298
2299        args = (1, 2, 3)
2300        when = time.monotonic()
2301        h = asyncio.TimerHandle(when, callback, args, mock.Mock())
2302        self.assertIs(h._callback, callback)
2303        self.assertIs(h._args, args)
2304        self.assertFalse(h.cancelled())
2305
2306        # cancel
2307        h.cancel()
2308        self.assertTrue(h.cancelled())
2309        self.assertIsNone(h._callback)
2310        self.assertIsNone(h._args)
2311
2312        # when cannot be None
2313        self.assertRaises(AssertionError,
2314                          asyncio.TimerHandle, None, callback, args,
2315                          self.loop)
2316
2317    def test_timer_repr(self):
2318        self.loop.get_debug.return_value = False
2319
2320        # simple function
2321        h = asyncio.TimerHandle(123, noop, (), self.loop)
2322        src = test_utils.get_function_source(noop)
2323        self.assertEqual(repr(h),
2324                        '<TimerHandle when=123 noop() at %s:%s>' % src)
2325
2326        # cancelled handle
2327        h.cancel()
2328        self.assertEqual(repr(h),
2329                        '<TimerHandle cancelled when=123>')
2330
2331    def test_timer_repr_debug(self):
2332        self.loop.get_debug.return_value = True
2333
2334        # simple function
2335        create_filename = __file__
2336        create_lineno = sys._getframe().f_lineno + 1
2337        h = asyncio.TimerHandle(123, noop, (), self.loop)
2338        filename, lineno = test_utils.get_function_source(noop)
2339        self.assertEqual(repr(h),
2340                        '<TimerHandle when=123 noop() '
2341                        'at %s:%s created at %s:%s>'
2342                        % (filename, lineno, create_filename, create_lineno))
2343
2344        # cancelled handle
2345        h.cancel()
2346        self.assertEqual(repr(h),
2347                        '<TimerHandle cancelled when=123 noop() '
2348                        'at %s:%s created at %s:%s>'
2349                        % (filename, lineno, create_filename, create_lineno))
2350
2351
2352    def test_timer_comparison(self):
2353        def callback(*args):
2354            return args
2355
2356        when = time.monotonic()
2357
2358        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2359        h2 = asyncio.TimerHandle(when, callback, (), self.loop)
2360        # TODO: Use assertLess etc.
2361        self.assertFalse(h1 < h2)
2362        self.assertFalse(h2 < h1)
2363        self.assertTrue(h1 <= h2)
2364        self.assertTrue(h2 <= h1)
2365        self.assertFalse(h1 > h2)
2366        self.assertFalse(h2 > h1)
2367        self.assertTrue(h1 >= h2)
2368        self.assertTrue(h2 >= h1)
2369        self.assertTrue(h1 == h2)
2370        self.assertFalse(h1 != h2)
2371
2372        h2.cancel()
2373        self.assertFalse(h1 == h2)
2374
2375        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2376        h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
2377        self.assertTrue(h1 < h2)
2378        self.assertFalse(h2 < h1)
2379        self.assertTrue(h1 <= h2)
2380        self.assertFalse(h2 <= h1)
2381        self.assertFalse(h1 > h2)
2382        self.assertTrue(h2 > h1)
2383        self.assertFalse(h1 >= h2)
2384        self.assertTrue(h2 >= h1)
2385        self.assertFalse(h1 == h2)
2386        self.assertTrue(h1 != h2)
2387
2388        h3 = asyncio.Handle(callback, (), self.loop)
2389        self.assertIs(NotImplemented, h1.__eq__(h3))
2390        self.assertIs(NotImplemented, h1.__ne__(h3))
2391
2392        with self.assertRaises(TypeError):
2393            h1 < ()
2394        with self.assertRaises(TypeError):
2395            h1 > ()
2396        with self.assertRaises(TypeError):
2397            h1 <= ()
2398        with self.assertRaises(TypeError):
2399            h1 >= ()
2400        self.assertFalse(h1 == ())
2401        self.assertTrue(h1 != ())
2402
2403        self.assertTrue(h1 == ALWAYS_EQ)
2404        self.assertFalse(h1 != ALWAYS_EQ)
2405        self.assertTrue(h1 < LARGEST)
2406        self.assertFalse(h1 > LARGEST)
2407        self.assertTrue(h1 <= LARGEST)
2408        self.assertFalse(h1 >= LARGEST)
2409        self.assertFalse(h1 < SMALLEST)
2410        self.assertTrue(h1 > SMALLEST)
2411        self.assertFalse(h1 <= SMALLEST)
2412        self.assertTrue(h1 >= SMALLEST)
2413
2414
2415class AbstractEventLoopTests(unittest.TestCase):
2416
2417    def test_not_implemented(self):
2418        f = mock.Mock()
2419        loop = asyncio.AbstractEventLoop()
2420        self.assertRaises(
2421            NotImplementedError, loop.run_forever)
2422        self.assertRaises(
2423            NotImplementedError, loop.run_until_complete, None)
2424        self.assertRaises(
2425            NotImplementedError, loop.stop)
2426        self.assertRaises(
2427            NotImplementedError, loop.is_running)
2428        self.assertRaises(
2429            NotImplementedError, loop.is_closed)
2430        self.assertRaises(
2431            NotImplementedError, loop.close)
2432        self.assertRaises(
2433            NotImplementedError, loop.create_task, None)
2434        self.assertRaises(
2435            NotImplementedError, loop.call_later, None, None)
2436        self.assertRaises(
2437            NotImplementedError, loop.call_at, f, f)
2438        self.assertRaises(
2439            NotImplementedError, loop.call_soon, None)
2440        self.assertRaises(
2441            NotImplementedError, loop.time)
2442        self.assertRaises(
2443            NotImplementedError, loop.call_soon_threadsafe, None)
2444        self.assertRaises(
2445            NotImplementedError, loop.set_default_executor, f)
2446        self.assertRaises(
2447            NotImplementedError, loop.add_reader, 1, f)
2448        self.assertRaises(
2449            NotImplementedError, loop.remove_reader, 1)
2450        self.assertRaises(
2451            NotImplementedError, loop.add_writer, 1, f)
2452        self.assertRaises(
2453            NotImplementedError, loop.remove_writer, 1)
2454        self.assertRaises(
2455            NotImplementedError, loop.add_signal_handler, 1, f)
2456        self.assertRaises(
2457            NotImplementedError, loop.remove_signal_handler, 1)
2458        self.assertRaises(
2459            NotImplementedError, loop.remove_signal_handler, 1)
2460        self.assertRaises(
2461            NotImplementedError, loop.set_exception_handler, f)
2462        self.assertRaises(
2463            NotImplementedError, loop.default_exception_handler, f)
2464        self.assertRaises(
2465            NotImplementedError, loop.call_exception_handler, f)
2466        self.assertRaises(
2467            NotImplementedError, loop.get_debug)
2468        self.assertRaises(
2469            NotImplementedError, loop.set_debug, f)
2470
2471    def test_not_implemented_async(self):
2472
2473        async def inner():
2474            f = mock.Mock()
2475            loop = asyncio.AbstractEventLoop()
2476
2477            with self.assertRaises(NotImplementedError):
2478                await loop.run_in_executor(f, f)
2479            with self.assertRaises(NotImplementedError):
2480                await loop.getaddrinfo('localhost', 8080)
2481            with self.assertRaises(NotImplementedError):
2482                await loop.getnameinfo(('localhost', 8080))
2483            with self.assertRaises(NotImplementedError):
2484                await loop.create_connection(f)
2485            with self.assertRaises(NotImplementedError):
2486                await loop.create_server(f)
2487            with self.assertRaises(NotImplementedError):
2488                await loop.create_datagram_endpoint(f)
2489            with self.assertRaises(NotImplementedError):
2490                await loop.sock_recv(f, 10)
2491            with self.assertRaises(NotImplementedError):
2492                await loop.sock_recv_into(f, 10)
2493            with self.assertRaises(NotImplementedError):
2494                await loop.sock_sendall(f, 10)
2495            with self.assertRaises(NotImplementedError):
2496                await loop.sock_connect(f, f)
2497            with self.assertRaises(NotImplementedError):
2498                await loop.sock_accept(f)
2499            with self.assertRaises(NotImplementedError):
2500                await loop.sock_sendfile(f, f)
2501            with self.assertRaises(NotImplementedError):
2502                await loop.sendfile(f, f)
2503            with self.assertRaises(NotImplementedError):
2504                await loop.connect_read_pipe(f, mock.sentinel.pipe)
2505            with self.assertRaises(NotImplementedError):
2506                await loop.connect_write_pipe(f, mock.sentinel.pipe)
2507            with self.assertRaises(NotImplementedError):
2508                await loop.subprocess_shell(f, mock.sentinel)
2509            with self.assertRaises(NotImplementedError):
2510                await loop.subprocess_exec(f)
2511
2512        loop = asyncio.new_event_loop()
2513        loop.run_until_complete(inner())
2514        loop.close()
2515
2516
2517class PolicyTests(unittest.TestCase):
2518
2519    def test_event_loop_policy(self):
2520        policy = asyncio.AbstractEventLoopPolicy()
2521        self.assertRaises(NotImplementedError, policy.get_event_loop)
2522        self.assertRaises(NotImplementedError, policy.set_event_loop, object())
2523        self.assertRaises(NotImplementedError, policy.new_event_loop)
2524        self.assertRaises(NotImplementedError, policy.get_child_watcher)
2525        self.assertRaises(NotImplementedError, policy.set_child_watcher,
2526                          object())
2527
2528    def test_get_event_loop(self):
2529        policy = asyncio.DefaultEventLoopPolicy()
2530        self.assertIsNone(policy._local._loop)
2531
2532        loop = policy.get_event_loop()
2533        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2534
2535        self.assertIs(policy._local._loop, loop)
2536        self.assertIs(loop, policy.get_event_loop())
2537        loop.close()
2538
2539    def test_get_event_loop_calls_set_event_loop(self):
2540        policy = asyncio.DefaultEventLoopPolicy()
2541
2542        with mock.patch.object(
2543                policy, "set_event_loop",
2544                wraps=policy.set_event_loop) as m_set_event_loop:
2545
2546            loop = policy.get_event_loop()
2547
2548            # policy._local._loop must be set through .set_event_loop()
2549            # (the unix DefaultEventLoopPolicy needs this call to attach
2550            # the child watcher correctly)
2551            m_set_event_loop.assert_called_with(loop)
2552
2553        loop.close()
2554
2555    def test_get_event_loop_after_set_none(self):
2556        policy = asyncio.DefaultEventLoopPolicy()
2557        policy.set_event_loop(None)
2558        self.assertRaises(RuntimeError, policy.get_event_loop)
2559
2560    @mock.patch('asyncio.events.threading.current_thread')
2561    def test_get_event_loop_thread(self, m_current_thread):
2562
2563        def f():
2564            policy = asyncio.DefaultEventLoopPolicy()
2565            self.assertRaises(RuntimeError, policy.get_event_loop)
2566
2567        th = threading.Thread(target=f)
2568        th.start()
2569        th.join()
2570
2571    def test_new_event_loop(self):
2572        policy = asyncio.DefaultEventLoopPolicy()
2573
2574        loop = policy.new_event_loop()
2575        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2576        loop.close()
2577
2578    def test_set_event_loop(self):
2579        policy = asyncio.DefaultEventLoopPolicy()
2580        old_loop = policy.get_event_loop()
2581
2582        self.assertRaises(AssertionError, policy.set_event_loop, object())
2583
2584        loop = policy.new_event_loop()
2585        policy.set_event_loop(loop)
2586        self.assertIs(loop, policy.get_event_loop())
2587        self.assertIsNot(old_loop, policy.get_event_loop())
2588        loop.close()
2589        old_loop.close()
2590
2591    def test_get_event_loop_policy(self):
2592        policy = asyncio.get_event_loop_policy()
2593        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
2594        self.assertIs(policy, asyncio.get_event_loop_policy())
2595
2596    def test_set_event_loop_policy(self):
2597        self.assertRaises(
2598            AssertionError, asyncio.set_event_loop_policy, object())
2599
2600        old_policy = asyncio.get_event_loop_policy()
2601
2602        policy = asyncio.DefaultEventLoopPolicy()
2603        asyncio.set_event_loop_policy(policy)
2604        self.assertIs(policy, asyncio.get_event_loop_policy())
2605        self.assertIsNot(policy, old_policy)
2606
2607
2608class GetEventLoopTestsMixin:
2609
2610    _get_running_loop_impl = None
2611    _set_running_loop_impl = None
2612    get_running_loop_impl = None
2613    get_event_loop_impl = None
2614
2615    def setUp(self):
2616        self._get_running_loop_saved = events._get_running_loop
2617        self._set_running_loop_saved = events._set_running_loop
2618        self.get_running_loop_saved = events.get_running_loop
2619        self.get_event_loop_saved = events.get_event_loop
2620
2621        events._get_running_loop = type(self)._get_running_loop_impl
2622        events._set_running_loop = type(self)._set_running_loop_impl
2623        events.get_running_loop = type(self).get_running_loop_impl
2624        events.get_event_loop = type(self).get_event_loop_impl
2625
2626        asyncio._get_running_loop = type(self)._get_running_loop_impl
2627        asyncio._set_running_loop = type(self)._set_running_loop_impl
2628        asyncio.get_running_loop = type(self).get_running_loop_impl
2629        asyncio.get_event_loop = type(self).get_event_loop_impl
2630
2631        super().setUp()
2632
2633        self.loop = asyncio.new_event_loop()
2634        asyncio.set_event_loop(self.loop)
2635
2636        if sys.platform != 'win32':
2637            watcher = asyncio.SafeChildWatcher()
2638            watcher.attach_loop(self.loop)
2639            asyncio.set_child_watcher(watcher)
2640
2641    def tearDown(self):
2642        try:
2643            if sys.platform != 'win32':
2644                asyncio.set_child_watcher(None)
2645
2646            super().tearDown()
2647        finally:
2648            self.loop.close()
2649            asyncio.set_event_loop(None)
2650
2651            events._get_running_loop = self._get_running_loop_saved
2652            events._set_running_loop = self._set_running_loop_saved
2653            events.get_running_loop = self.get_running_loop_saved
2654            events.get_event_loop = self.get_event_loop_saved
2655
2656            asyncio._get_running_loop = self._get_running_loop_saved
2657            asyncio._set_running_loop = self._set_running_loop_saved
2658            asyncio.get_running_loop = self.get_running_loop_saved
2659            asyncio.get_event_loop = self.get_event_loop_saved
2660
2661    if sys.platform != 'win32':
2662
2663        def test_get_event_loop_new_process(self):
2664            # bpo-32126: The multiprocessing module used by
2665            # ProcessPoolExecutor is not functional when the
2666            # multiprocessing.synchronize module cannot be imported.
2667            support.skip_if_broken_multiprocessing_synchronize()
2668
2669            async def main():
2670                pool = concurrent.futures.ProcessPoolExecutor()
2671                result = await self.loop.run_in_executor(
2672                    pool, _test_get_event_loop_new_process__sub_proc)
2673                pool.shutdown()
2674                return result
2675
2676            self.assertEqual(
2677                self.loop.run_until_complete(main()),
2678                'hello')
2679
2680    def test_get_event_loop_returns_running_loop(self):
2681        class TestError(Exception):
2682            pass
2683
2684        class Policy(asyncio.DefaultEventLoopPolicy):
2685            def get_event_loop(self):
2686                raise TestError
2687
2688        old_policy = asyncio.get_event_loop_policy()
2689        try:
2690            asyncio.set_event_loop_policy(Policy())
2691            loop = asyncio.new_event_loop()
2692
2693            with self.assertRaises(TestError):
2694                asyncio.get_event_loop()
2695            asyncio.set_event_loop(None)
2696            with self.assertRaises(TestError):
2697                asyncio.get_event_loop()
2698
2699            with self.assertRaisesRegex(RuntimeError, 'no running'):
2700                self.assertIs(asyncio.get_running_loop(), None)
2701            self.assertIs(asyncio._get_running_loop(), None)
2702
2703            async def func():
2704                self.assertIs(asyncio.get_event_loop(), loop)
2705                self.assertIs(asyncio.get_running_loop(), loop)
2706                self.assertIs(asyncio._get_running_loop(), loop)
2707
2708            loop.run_until_complete(func())
2709
2710            asyncio.set_event_loop(loop)
2711            with self.assertRaises(TestError):
2712                asyncio.get_event_loop()
2713
2714            asyncio.set_event_loop(None)
2715            with self.assertRaises(TestError):
2716                asyncio.get_event_loop()
2717
2718        finally:
2719            asyncio.set_event_loop_policy(old_policy)
2720            if loop is not None:
2721                loop.close()
2722
2723        with self.assertRaisesRegex(RuntimeError, 'no running'):
2724            self.assertIs(asyncio.get_running_loop(), None)
2725
2726        self.assertIs(asyncio._get_running_loop(), None)
2727
2728
2729class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
2730
2731    _get_running_loop_impl = events._py__get_running_loop
2732    _set_running_loop_impl = events._py__set_running_loop
2733    get_running_loop_impl = events._py_get_running_loop
2734    get_event_loop_impl = events._py_get_event_loop
2735
2736
2737try:
2738    import _asyncio  # NoQA
2739except ImportError:
2740    pass
2741else:
2742
2743    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
2744
2745        _get_running_loop_impl = events._c__get_running_loop
2746        _set_running_loop_impl = events._c__set_running_loop
2747        get_running_loop_impl = events._c_get_running_loop
2748        get_event_loop_impl = events._c_get_event_loop
2749
2750
2751class TestServer(unittest.TestCase):
2752
2753    def test_get_loop(self):
2754        loop = asyncio.new_event_loop()
2755        self.addCleanup(loop.close)
2756        proto = MyProto(loop)
2757        server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
2758        self.assertEqual(server.get_loop(), loop)
2759        server.close()
2760        loop.run_until_complete(server.wait_closed())
2761
2762
2763class TestAbstractServer(unittest.TestCase):
2764
2765    def test_close(self):
2766        with self.assertRaises(NotImplementedError):
2767            events.AbstractServer().close()
2768
2769    def test_wait_closed(self):
2770        loop = asyncio.new_event_loop()
2771        self.addCleanup(loop.close)
2772
2773        with self.assertRaises(NotImplementedError):
2774            loop.run_until_complete(events.AbstractServer().wait_closed())
2775
2776    def test_get_loop(self):
2777        with self.assertRaises(NotImplementedError):
2778            events.AbstractServer().get_loop()
2779
2780
2781if __name__ == '__main__':
2782    unittest.main()
2783