1"""Tests for events.py."""
2
3import collections.abc
4import concurrent.futures
5import functools
6import io
7import os
8import platform
9import re
10import signal
11import socket
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import subprocess
17import sys
18import tempfile
19import threading
20import time
21import errno
22import unittest
23from unittest import mock
24import weakref
25
26if sys.platform != 'win32':
27    import tty
28
29import asyncio
30from asyncio import base_events
31from asyncio import constants
32from asyncio import coroutines
33from asyncio import events
34from asyncio import proactor_events
35from asyncio import selector_events
36from test.test_asyncio import utils as test_utils
37from test import support
38
39
40def broken_unix_getsockname():
41    """Return True if the platform is Mac OS 10.4 or older."""
42    if sys.platform.startswith("aix"):
43        return True
44    elif sys.platform != 'darwin':
45        return False
46    version = platform.mac_ver()[0]
47    version = tuple(map(int, version.split('.')))
48    return version < (10, 5)
49
50
51def _test_get_event_loop_new_process__sub_proc():
52    async def doit():
53        return 'hello'
54
55    loop = asyncio.new_event_loop()
56    asyncio.set_event_loop(loop)
57    return loop.run_until_complete(doit())
58
59
60class CoroLike:
61    def send(self, v):
62        pass
63
64    def throw(self, *exc):
65        pass
66
67    def close(self):
68        pass
69
70    def __await__(self):
71        pass
72
73
74class MyBaseProto(asyncio.Protocol):
75    connected = None
76    done = None
77
78    def __init__(self, loop=None):
79        self.transport = None
80        self.state = 'INITIAL'
81        self.nbytes = 0
82        if loop is not None:
83            self.connected = asyncio.Future(loop=loop)
84            self.done = asyncio.Future(loop=loop)
85
86    def connection_made(self, transport):
87        self.transport = transport
88        assert self.state == 'INITIAL', self.state
89        self.state = 'CONNECTED'
90        if self.connected:
91            self.connected.set_result(None)
92
93    def data_received(self, data):
94        assert self.state == 'CONNECTED', self.state
95        self.nbytes += len(data)
96
97    def eof_received(self):
98        assert self.state == 'CONNECTED', self.state
99        self.state = 'EOF'
100
101    def connection_lost(self, exc):
102        assert self.state in ('CONNECTED', 'EOF'), self.state
103        self.state = 'CLOSED'
104        if self.done:
105            self.done.set_result(None)
106
107
108class MyProto(MyBaseProto):
109    def connection_made(self, transport):
110        super().connection_made(transport)
111        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
112
113
114class MyDatagramProto(asyncio.DatagramProtocol):
115    done = None
116
117    def __init__(self, loop=None):
118        self.state = 'INITIAL'
119        self.nbytes = 0
120        if loop is not None:
121            self.done = asyncio.Future(loop=loop)
122
123    def connection_made(self, transport):
124        self.transport = transport
125        assert self.state == 'INITIAL', self.state
126        self.state = 'INITIALIZED'
127
128    def datagram_received(self, data, addr):
129        assert self.state == 'INITIALIZED', self.state
130        self.nbytes += len(data)
131
132    def error_received(self, exc):
133        assert self.state == 'INITIALIZED', self.state
134
135    def connection_lost(self, exc):
136        assert self.state == 'INITIALIZED', self.state
137        self.state = 'CLOSED'
138        if self.done:
139            self.done.set_result(None)
140
141
142class MyReadPipeProto(asyncio.Protocol):
143    done = None
144
145    def __init__(self, loop=None):
146        self.state = ['INITIAL']
147        self.nbytes = 0
148        self.transport = None
149        if loop is not None:
150            self.done = asyncio.Future(loop=loop)
151
152    def connection_made(self, transport):
153        self.transport = transport
154        assert self.state == ['INITIAL'], self.state
155        self.state.append('CONNECTED')
156
157    def data_received(self, data):
158        assert self.state == ['INITIAL', 'CONNECTED'], self.state
159        self.nbytes += len(data)
160
161    def eof_received(self):
162        assert self.state == ['INITIAL', 'CONNECTED'], self.state
163        self.state.append('EOF')
164
165    def connection_lost(self, exc):
166        if 'EOF' not in self.state:
167            self.state.append('EOF')  # It is okay if EOF is missed.
168        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
169        self.state.append('CLOSED')
170        if self.done:
171            self.done.set_result(None)
172
173
174class MyWritePipeProto(asyncio.BaseProtocol):
175    done = None
176
177    def __init__(self, loop=None):
178        self.state = 'INITIAL'
179        self.transport = None
180        if loop is not None:
181            self.done = asyncio.Future(loop=loop)
182
183    def connection_made(self, transport):
184        self.transport = transport
185        assert self.state == 'INITIAL', self.state
186        self.state = 'CONNECTED'
187
188    def connection_lost(self, exc):
189        assert self.state == 'CONNECTED', self.state
190        self.state = 'CLOSED'
191        if self.done:
192            self.done.set_result(None)
193
194
195class MySubprocessProtocol(asyncio.SubprocessProtocol):
196
197    def __init__(self, loop):
198        self.state = 'INITIAL'
199        self.transport = None
200        self.connected = asyncio.Future(loop=loop)
201        self.completed = asyncio.Future(loop=loop)
202        self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
203        self.data = {1: b'', 2: b''}
204        self.returncode = None
205        self.got_data = {1: asyncio.Event(loop=loop),
206                         2: asyncio.Event(loop=loop)}
207
208    def connection_made(self, transport):
209        self.transport = transport
210        assert self.state == 'INITIAL', self.state
211        self.state = 'CONNECTED'
212        self.connected.set_result(None)
213
214    def connection_lost(self, exc):
215        assert self.state == 'CONNECTED', self.state
216        self.state = 'CLOSED'
217        self.completed.set_result(None)
218
219    def pipe_data_received(self, fd, data):
220        assert self.state == 'CONNECTED', self.state
221        self.data[fd] += data
222        self.got_data[fd].set()
223
224    def pipe_connection_lost(self, fd, exc):
225        assert self.state == 'CONNECTED', self.state
226        if exc:
227            self.disconnects[fd].set_exception(exc)
228        else:
229            self.disconnects[fd].set_result(exc)
230
231    def process_exited(self):
232        assert self.state == 'CONNECTED', self.state
233        self.returncode = self.transport.get_returncode()
234
235
236class EventLoopTestsMixin:
237
238    def setUp(self):
239        super().setUp()
240        self.loop = self.create_event_loop()
241        self.set_event_loop(self.loop)
242
243    def tearDown(self):
244        # just in case if we have transport close callbacks
245        if not self.loop.is_closed():
246            test_utils.run_briefly(self.loop)
247
248        self.doCleanups()
249        support.gc_collect()
250        super().tearDown()
251
252    def test_run_until_complete_nesting(self):
253        @asyncio.coroutine
254        def coro1():
255            yield
256
257        @asyncio.coroutine
258        def coro2():
259            self.assertTrue(self.loop.is_running())
260            self.loop.run_until_complete(coro1())
261
262        self.assertRaises(
263            RuntimeError, self.loop.run_until_complete, coro2())
264
265    # Note: because of the default Windows timing granularity of
266    # 15.6 msec, we use fairly long sleep times here (~100 msec).
267
268    def test_run_until_complete(self):
269        t0 = self.loop.time()
270        self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
271        t1 = self.loop.time()
272        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
273
274    def test_run_until_complete_stopped(self):
275
276        async def cb():
277            self.loop.stop()
278            await asyncio.sleep(0.1, loop=self.loop)
279        task = cb()
280        self.assertRaises(RuntimeError,
281                          self.loop.run_until_complete, task)
282
283    def test_call_later(self):
284        results = []
285
286        def callback(arg):
287            results.append(arg)
288            self.loop.stop()
289
290        self.loop.call_later(0.1, callback, 'hello world')
291        t0 = time.monotonic()
292        self.loop.run_forever()
293        t1 = time.monotonic()
294        self.assertEqual(results, ['hello world'])
295        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
296
297    def test_call_soon(self):
298        results = []
299
300        def callback(arg1, arg2):
301            results.append((arg1, arg2))
302            self.loop.stop()
303
304        self.loop.call_soon(callback, 'hello', 'world')
305        self.loop.run_forever()
306        self.assertEqual(results, [('hello', 'world')])
307
308    def test_call_soon_threadsafe(self):
309        results = []
310        lock = threading.Lock()
311
312        def callback(arg):
313            results.append(arg)
314            if len(results) >= 2:
315                self.loop.stop()
316
317        def run_in_thread():
318            self.loop.call_soon_threadsafe(callback, 'hello')
319            lock.release()
320
321        lock.acquire()
322        t = threading.Thread(target=run_in_thread)
323        t.start()
324
325        with lock:
326            self.loop.call_soon(callback, 'world')
327            self.loop.run_forever()
328        t.join()
329        self.assertEqual(results, ['hello', 'world'])
330
331    def test_call_soon_threadsafe_same_thread(self):
332        results = []
333
334        def callback(arg):
335            results.append(arg)
336            if len(results) >= 2:
337                self.loop.stop()
338
339        self.loop.call_soon_threadsafe(callback, 'hello')
340        self.loop.call_soon(callback, 'world')
341        self.loop.run_forever()
342        self.assertEqual(results, ['hello', 'world'])
343
344    def test_run_in_executor(self):
345        def run(arg):
346            return (arg, threading.get_ident())
347        f2 = self.loop.run_in_executor(None, run, 'yo')
348        res, thread_id = self.loop.run_until_complete(f2)
349        self.assertEqual(res, 'yo')
350        self.assertNotEqual(thread_id, threading.get_ident())
351
352    def test_run_in_executor_cancel(self):
353        called = False
354
355        def patched_call_soon(*args):
356            nonlocal called
357            called = True
358
359        def run():
360            time.sleep(0.05)
361
362        f2 = self.loop.run_in_executor(None, run)
363        f2.cancel()
364        self.loop.close()
365        self.loop.call_soon = patched_call_soon
366        self.loop.call_soon_threadsafe = patched_call_soon
367        time.sleep(0.4)
368        self.assertFalse(called)
369
370    def test_reader_callback(self):
371        r, w = socket.socketpair()
372        r.setblocking(False)
373        bytes_read = bytearray()
374
375        def reader():
376            try:
377                data = r.recv(1024)
378            except BlockingIOError:
379                # Spurious readiness notifications are possible
380                # at least on Linux -- see man select.
381                return
382            if data:
383                bytes_read.extend(data)
384            else:
385                self.assertTrue(self.loop.remove_reader(r.fileno()))
386                r.close()
387
388        self.loop.add_reader(r.fileno(), reader)
389        self.loop.call_soon(w.send, b'abc')
390        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
391        self.loop.call_soon(w.send, b'def')
392        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
393        self.loop.call_soon(w.close)
394        self.loop.call_soon(self.loop.stop)
395        self.loop.run_forever()
396        self.assertEqual(bytes_read, b'abcdef')
397
398    def test_writer_callback(self):
399        r, w = socket.socketpair()
400        w.setblocking(False)
401
402        def writer(data):
403            w.send(data)
404            self.loop.stop()
405
406        data = b'x' * 1024
407        self.loop.add_writer(w.fileno(), writer, data)
408        self.loop.run_forever()
409
410        self.assertTrue(self.loop.remove_writer(w.fileno()))
411        self.assertFalse(self.loop.remove_writer(w.fileno()))
412
413        w.close()
414        read = r.recv(len(data) * 2)
415        r.close()
416        self.assertEqual(read, data)
417
418    def _basetest_sock_client_ops(self, httpd, sock):
419        if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
420            # in debug mode, socket operations must fail
421            # if the socket is not in blocking mode
422            self.loop.set_debug(True)
423            sock.setblocking(True)
424            with self.assertRaises(ValueError):
425                self.loop.run_until_complete(
426                    self.loop.sock_connect(sock, httpd.address))
427            with self.assertRaises(ValueError):
428                self.loop.run_until_complete(
429                    self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
430            with self.assertRaises(ValueError):
431                self.loop.run_until_complete(
432                    self.loop.sock_recv(sock, 1024))
433            with self.assertRaises(ValueError):
434                self.loop.run_until_complete(
435                    self.loop.sock_recv_into(sock, bytearray()))
436            with self.assertRaises(ValueError):
437                self.loop.run_until_complete(
438                    self.loop.sock_accept(sock))
439
440        # test in non-blocking mode
441        sock.setblocking(False)
442        self.loop.run_until_complete(
443            self.loop.sock_connect(sock, httpd.address))
444        self.loop.run_until_complete(
445            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
446        data = self.loop.run_until_complete(
447            self.loop.sock_recv(sock, 1024))
448        # consume data
449        self.loop.run_until_complete(
450            self.loop.sock_recv(sock, 1024))
451        sock.close()
452        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
453
454    def _basetest_sock_recv_into(self, httpd, sock):
455        # same as _basetest_sock_client_ops, but using sock_recv_into
456        sock.setblocking(False)
457        self.loop.run_until_complete(
458            self.loop.sock_connect(sock, httpd.address))
459        self.loop.run_until_complete(
460            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
461        data = bytearray(1024)
462        with memoryview(data) as buf:
463            nbytes = self.loop.run_until_complete(
464                self.loop.sock_recv_into(sock, buf[:1024]))
465            # consume data
466            self.loop.run_until_complete(
467                self.loop.sock_recv_into(sock, buf[nbytes:]))
468        sock.close()
469        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
470
471    def test_sock_client_ops(self):
472        with test_utils.run_test_server() as httpd:
473            sock = socket.socket()
474            self._basetest_sock_client_ops(httpd, sock)
475            sock = socket.socket()
476            self._basetest_sock_recv_into(httpd, sock)
477
478    @support.skip_unless_bind_unix_socket
479    def test_unix_sock_client_ops(self):
480        with test_utils.run_test_unix_server() as httpd:
481            sock = socket.socket(socket.AF_UNIX)
482            self._basetest_sock_client_ops(httpd, sock)
483            sock = socket.socket(socket.AF_UNIX)
484            self._basetest_sock_recv_into(httpd, sock)
485
486    def test_sock_client_fail(self):
487        # Make sure that we will get an unused port
488        address = None
489        try:
490            s = socket.socket()
491            s.bind(('127.0.0.1', 0))
492            address = s.getsockname()
493        finally:
494            s.close()
495
496        sock = socket.socket()
497        sock.setblocking(False)
498        with self.assertRaises(ConnectionRefusedError):
499            self.loop.run_until_complete(
500                self.loop.sock_connect(sock, address))
501        sock.close()
502
503    def test_sock_accept(self):
504        listener = socket.socket()
505        listener.setblocking(False)
506        listener.bind(('127.0.0.1', 0))
507        listener.listen(1)
508        client = socket.socket()
509        client.connect(listener.getsockname())
510
511        f = self.loop.sock_accept(listener)
512        conn, addr = self.loop.run_until_complete(f)
513        self.assertEqual(conn.gettimeout(), 0)
514        self.assertEqual(addr, client.getsockname())
515        self.assertEqual(client.getpeername(), listener.getsockname())
516        client.close()
517        conn.close()
518        listener.close()
519
520    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
521    def test_add_signal_handler(self):
522        caught = 0
523
524        def my_handler():
525            nonlocal caught
526            caught += 1
527
528        # Check error behavior first.
529        self.assertRaises(
530            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
531        self.assertRaises(
532            TypeError, self.loop.remove_signal_handler, 'boom')
533        self.assertRaises(
534            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
535            my_handler)
536        self.assertRaises(
537            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
538        self.assertRaises(
539            ValueError, self.loop.add_signal_handler, 0, my_handler)
540        self.assertRaises(
541            ValueError, self.loop.remove_signal_handler, 0)
542        self.assertRaises(
543            ValueError, self.loop.add_signal_handler, -1, my_handler)
544        self.assertRaises(
545            ValueError, self.loop.remove_signal_handler, -1)
546        self.assertRaises(
547            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
548            my_handler)
549        # Removing SIGKILL doesn't raise, since we don't call signal().
550        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
551        # Now set a handler and handle it.
552        self.loop.add_signal_handler(signal.SIGINT, my_handler)
553
554        os.kill(os.getpid(), signal.SIGINT)
555        test_utils.run_until(self.loop, lambda: caught)
556
557        # Removing it should restore the default handler.
558        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
559        self.assertEqual(signal.getsignal(signal.SIGINT),
560                         signal.default_int_handler)
561        # Removing again returns False.
562        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
563
564    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
565    def test_signal_handling_while_selecting(self):
566        # Test with a signal actually arriving during a select() call.
567        caught = 0
568
569        def my_handler():
570            nonlocal caught
571            caught += 1
572            self.loop.stop()
573
574        self.loop.add_signal_handler(signal.SIGALRM, my_handler)
575
576        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
577        self.loop.call_later(60, self.loop.stop)
578        self.loop.run_forever()
579        self.assertEqual(caught, 1)
580
581    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
582    def test_signal_handling_args(self):
583        some_args = (42,)
584        caught = 0
585
586        def my_handler(*args):
587            nonlocal caught
588            caught += 1
589            self.assertEqual(args, some_args)
590            self.loop.stop()
591
592        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
593
594        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
595        self.loop.call_later(60, self.loop.stop)
596        self.loop.run_forever()
597        self.assertEqual(caught, 1)
598
599    def _basetest_create_connection(self, connection_fut, check_sockname=True):
600        tr, pr = self.loop.run_until_complete(connection_fut)
601        self.assertIsInstance(tr, asyncio.Transport)
602        self.assertIsInstance(pr, asyncio.Protocol)
603        self.assertIs(pr.transport, tr)
604        if check_sockname:
605            self.assertIsNotNone(tr.get_extra_info('sockname'))
606        self.loop.run_until_complete(pr.done)
607        self.assertGreater(pr.nbytes, 0)
608        tr.close()
609
610    def test_create_connection(self):
611        with test_utils.run_test_server() as httpd:
612            conn_fut = self.loop.create_connection(
613                lambda: MyProto(loop=self.loop), *httpd.address)
614            self._basetest_create_connection(conn_fut)
615
616    @support.skip_unless_bind_unix_socket
617    def test_create_unix_connection(self):
618        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
619        # zero-length address for UNIX socket.
620        check_sockname = not broken_unix_getsockname()
621
622        with test_utils.run_test_unix_server() as httpd:
623            conn_fut = self.loop.create_unix_connection(
624                lambda: MyProto(loop=self.loop), httpd.address)
625            self._basetest_create_connection(conn_fut, check_sockname)
626
627    def test_create_connection_sock(self):
628        with test_utils.run_test_server() as httpd:
629            sock = None
630            infos = self.loop.run_until_complete(
631                self.loop.getaddrinfo(
632                    *httpd.address, type=socket.SOCK_STREAM))
633            for family, type, proto, cname, address in infos:
634                try:
635                    sock = socket.socket(family=family, type=type, proto=proto)
636                    sock.setblocking(False)
637                    self.loop.run_until_complete(
638                        self.loop.sock_connect(sock, address))
639                except:
640                    pass
641                else:
642                    break
643            else:
644                assert False, 'Can not create socket.'
645
646            f = self.loop.create_connection(
647                lambda: MyProto(loop=self.loop), sock=sock)
648            tr, pr = self.loop.run_until_complete(f)
649            self.assertIsInstance(tr, asyncio.Transport)
650            self.assertIsInstance(pr, asyncio.Protocol)
651            self.loop.run_until_complete(pr.done)
652            self.assertGreater(pr.nbytes, 0)
653            tr.close()
654
655    def check_ssl_extra_info(self, client, check_sockname=True,
656                             peername=None, peercert={}):
657        if check_sockname:
658            self.assertIsNotNone(client.get_extra_info('sockname'))
659        if peername:
660            self.assertEqual(peername,
661                             client.get_extra_info('peername'))
662        else:
663            self.assertIsNotNone(client.get_extra_info('peername'))
664        self.assertEqual(peercert,
665                         client.get_extra_info('peercert'))
666
667        # test SSL cipher
668        cipher = client.get_extra_info('cipher')
669        self.assertIsInstance(cipher, tuple)
670        self.assertEqual(len(cipher), 3, cipher)
671        self.assertIsInstance(cipher[0], str)
672        self.assertIsInstance(cipher[1], str)
673        self.assertIsInstance(cipher[2], int)
674
675        # test SSL object
676        sslobj = client.get_extra_info('ssl_object')
677        self.assertIsNotNone(sslobj)
678        self.assertEqual(sslobj.compression(),
679                         client.get_extra_info('compression'))
680        self.assertEqual(sslobj.cipher(),
681                         client.get_extra_info('cipher'))
682        self.assertEqual(sslobj.getpeercert(),
683                         client.get_extra_info('peercert'))
684        self.assertEqual(sslobj.compression(),
685                         client.get_extra_info('compression'))
686
687    def _basetest_create_ssl_connection(self, connection_fut,
688                                        check_sockname=True,
689                                        peername=None):
690        tr, pr = self.loop.run_until_complete(connection_fut)
691        self.assertIsInstance(tr, asyncio.Transport)
692        self.assertIsInstance(pr, asyncio.Protocol)
693        self.assertTrue('ssl' in tr.__class__.__name__.lower())
694        self.check_ssl_extra_info(tr, check_sockname, peername)
695        self.loop.run_until_complete(pr.done)
696        self.assertGreater(pr.nbytes, 0)
697        tr.close()
698
699    def _test_create_ssl_connection(self, httpd, create_connection,
700                                    check_sockname=True, peername=None):
701        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
702        self._basetest_create_ssl_connection(conn_fut, check_sockname,
703                                             peername)
704
705        # ssl.Purpose was introduced in Python 3.4
706        if hasattr(ssl, 'Purpose'):
707            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
708                                          cafile=None, capath=None,
709                                          cadata=None):
710                """
711                A ssl.create_default_context() replacement that doesn't enable
712                cert validation.
713                """
714                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
715                return test_utils.dummy_ssl_context()
716
717            # With ssl=True, ssl.create_default_context() should be called
718            with mock.patch('ssl.create_default_context',
719                            side_effect=_dummy_ssl_create_context) as m:
720                conn_fut = create_connection(ssl=True)
721                self._basetest_create_ssl_connection(conn_fut, check_sockname,
722                                                     peername)
723                self.assertEqual(m.call_count, 1)
724
725        # With the real ssl.create_default_context(), certificate
726        # validation will fail
727        with self.assertRaises(ssl.SSLError) as cm:
728            conn_fut = create_connection(ssl=True)
729            # Ignore the "SSL handshake failed" log in debug mode
730            with test_utils.disable_logger():
731                self._basetest_create_ssl_connection(conn_fut, check_sockname,
732                                                     peername)
733
734        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
735
736    @unittest.skipIf(ssl is None, 'No ssl module')
737    def test_create_ssl_connection(self):
738        with test_utils.run_test_server(use_ssl=True) as httpd:
739            create_connection = functools.partial(
740                self.loop.create_connection,
741                lambda: MyProto(loop=self.loop),
742                *httpd.address)
743            self._test_create_ssl_connection(httpd, create_connection,
744                                             peername=httpd.address)
745
746    @support.skip_unless_bind_unix_socket
747    @unittest.skipIf(ssl is None, 'No ssl module')
748    def test_create_ssl_unix_connection(self):
749        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
750        # zero-length address for UNIX socket.
751        check_sockname = not broken_unix_getsockname()
752
753        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
754            create_connection = functools.partial(
755                self.loop.create_unix_connection,
756                lambda: MyProto(loop=self.loop), httpd.address,
757                server_hostname='127.0.0.1')
758
759            self._test_create_ssl_connection(httpd, create_connection,
760                                             check_sockname,
761                                             peername=httpd.address)
762
763    def test_create_connection_local_addr(self):
764        with test_utils.run_test_server() as httpd:
765            port = support.find_unused_port()
766            f = self.loop.create_connection(
767                lambda: MyProto(loop=self.loop),
768                *httpd.address, local_addr=(httpd.address[0], port))
769            tr, pr = self.loop.run_until_complete(f)
770            expected = pr.transport.get_extra_info('sockname')[1]
771            self.assertEqual(port, expected)
772            tr.close()
773
774    def test_create_connection_local_addr_in_use(self):
775        with test_utils.run_test_server() as httpd:
776            f = self.loop.create_connection(
777                lambda: MyProto(loop=self.loop),
778                *httpd.address, local_addr=httpd.address)
779            with self.assertRaises(OSError) as cm:
780                self.loop.run_until_complete(f)
781            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
782            self.assertIn(str(httpd.address), cm.exception.strerror)
783
784    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
785        loop = self.loop
786
787        class MyProto(MyBaseProto):
788
789            def connection_lost(self, exc):
790                super().connection_lost(exc)
791                loop.call_soon(loop.stop)
792
793            def data_received(self, data):
794                super().data_received(data)
795                self.transport.write(expected_response)
796
797        lsock = socket.socket()
798        lsock.bind(('127.0.0.1', 0))
799        lsock.listen(1)
800        addr = lsock.getsockname()
801
802        message = b'test data'
803        response = None
804        expected_response = b'roger'
805
806        def client():
807            nonlocal response
808            try:
809                csock = socket.socket()
810                if client_ssl is not None:
811                    csock = client_ssl.wrap_socket(csock)
812                csock.connect(addr)
813                csock.sendall(message)
814                response = csock.recv(99)
815                csock.close()
816            except Exception as exc:
817                print(
818                    "Failure in client thread in test_connect_accepted_socket",
819                    exc)
820
821        thread = threading.Thread(target=client, daemon=True)
822        thread.start()
823
824        conn, _ = lsock.accept()
825        proto = MyProto(loop=loop)
826        proto.loop = loop
827        loop.run_until_complete(
828            loop.connect_accepted_socket(
829                (lambda: proto), conn, ssl=server_ssl))
830        loop.run_forever()
831        proto.transport.close()
832        lsock.close()
833
834        support.join_thread(thread, timeout=1)
835        self.assertFalse(thread.is_alive())
836        self.assertEqual(proto.state, 'CLOSED')
837        self.assertEqual(proto.nbytes, len(message))
838        self.assertEqual(response, expected_response)
839
840    @unittest.skipIf(ssl is None, 'No ssl module')
841    def test_ssl_connect_accepted_socket(self):
842        if (sys.platform == 'win32' and
843            sys.version_info < (3, 5) and
844            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
845            ):
846            raise unittest.SkipTest(
847                'SSL not supported with proactor event loops before Python 3.5'
848                )
849
850        server_context = test_utils.simple_server_sslcontext()
851        client_context = test_utils.simple_client_sslcontext()
852
853        self.test_connect_accepted_socket(server_context, client_context)
854
855    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
856        sock = socket.socket()
857        self.addCleanup(sock.close)
858        coro = self.loop.connect_accepted_socket(
859            MyProto, sock, ssl_handshake_timeout=1)
860        with self.assertRaisesRegex(
861                ValueError,
862                'ssl_handshake_timeout is only meaningful with ssl'):
863            self.loop.run_until_complete(coro)
864
865    @mock.patch('asyncio.base_events.socket')
866    def create_server_multiple_hosts(self, family, hosts, mock_sock):
867        @asyncio.coroutine
868        def getaddrinfo(host, port, *args, **kw):
869            if family == socket.AF_INET:
870                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
871            else:
872                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
873
874        def getaddrinfo_task(*args, **kwds):
875            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
876
877        unique_hosts = set(hosts)
878
879        if family == socket.AF_INET:
880            mock_sock.socket().getsockbyname.side_effect = [
881                (host, 80) for host in unique_hosts]
882        else:
883            mock_sock.socket().getsockbyname.side_effect = [
884                (host, 80, 0, 0) for host in unique_hosts]
885        self.loop.getaddrinfo = getaddrinfo_task
886        self.loop._start_serving = mock.Mock()
887        self.loop._stop_serving = mock.Mock()
888        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
889        server = self.loop.run_until_complete(f)
890        self.addCleanup(server.close)
891        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
892        self.assertEqual(server_hosts, unique_hosts)
893
894    def test_create_server_multiple_hosts_ipv4(self):
895        self.create_server_multiple_hosts(socket.AF_INET,
896                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
897
898    def test_create_server_multiple_hosts_ipv6(self):
899        self.create_server_multiple_hosts(socket.AF_INET6,
900                                          ['::1', '::2', '::1'])
901
902    def test_create_server(self):
903        proto = MyProto(self.loop)
904        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
905        server = self.loop.run_until_complete(f)
906        self.assertEqual(len(server.sockets), 1)
907        sock = server.sockets[0]
908        host, port = sock.getsockname()
909        self.assertEqual(host, '0.0.0.0')
910        client = socket.socket()
911        client.connect(('127.0.0.1', port))
912        client.sendall(b'xxx')
913
914        self.loop.run_until_complete(proto.connected)
915        self.assertEqual('CONNECTED', proto.state)
916
917        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
918        self.assertEqual(3, proto.nbytes)
919
920        # extra info is available
921        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
922        self.assertEqual('127.0.0.1',
923                         proto.transport.get_extra_info('peername')[0])
924
925        # close connection
926        proto.transport.close()
927        self.loop.run_until_complete(proto.done)
928
929        self.assertEqual('CLOSED', proto.state)
930
931        # the client socket must be closed after to avoid ECONNRESET upon
932        # recv()/send() on the serving socket
933        client.close()
934
935        # close server
936        server.close()
937
938    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
939    def test_create_server_reuse_port(self):
940        proto = MyProto(self.loop)
941        f = self.loop.create_server(
942            lambda: proto, '0.0.0.0', 0)
943        server = self.loop.run_until_complete(f)
944        self.assertEqual(len(server.sockets), 1)
945        sock = server.sockets[0]
946        self.assertFalse(
947            sock.getsockopt(
948                socket.SOL_SOCKET, socket.SO_REUSEPORT))
949        server.close()
950
951        test_utils.run_briefly(self.loop)
952
953        proto = MyProto(self.loop)
954        f = self.loop.create_server(
955            lambda: proto, '0.0.0.0', 0, reuse_port=True)
956        server = self.loop.run_until_complete(f)
957        self.assertEqual(len(server.sockets), 1)
958        sock = server.sockets[0]
959        self.assertTrue(
960            sock.getsockopt(
961                socket.SOL_SOCKET, socket.SO_REUSEPORT))
962        server.close()
963
964    def _make_unix_server(self, factory, **kwargs):
965        path = test_utils.gen_unix_socket_path()
966        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
967
968        f = self.loop.create_unix_server(factory, path, **kwargs)
969        server = self.loop.run_until_complete(f)
970
971        return server, path
972
973    @support.skip_unless_bind_unix_socket
974    def test_create_unix_server(self):
975        proto = MyProto(loop=self.loop)
976        server, path = self._make_unix_server(lambda: proto)
977        self.assertEqual(len(server.sockets), 1)
978
979        client = socket.socket(socket.AF_UNIX)
980        client.connect(path)
981        client.sendall(b'xxx')
982
983        self.loop.run_until_complete(proto.connected)
984        self.assertEqual('CONNECTED', proto.state)
985        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
986        self.assertEqual(3, proto.nbytes)
987
988        # close connection
989        proto.transport.close()
990        self.loop.run_until_complete(proto.done)
991
992        self.assertEqual('CLOSED', proto.state)
993
994        # the client socket must be closed after to avoid ECONNRESET upon
995        # recv()/send() on the serving socket
996        client.close()
997
998        # close server
999        server.close()
1000
1001    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1002    def test_create_unix_server_path_socket_error(self):
1003        proto = MyProto(loop=self.loop)
1004        sock = socket.socket()
1005        with sock:
1006            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
1007            with self.assertRaisesRegex(ValueError,
1008                                        'path and sock can not be specified '
1009                                        'at the same time'):
1010                self.loop.run_until_complete(f)
1011
1012    def _create_ssl_context(self, certfile, keyfile=None):
1013        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1014        sslcontext.options |= ssl.OP_NO_SSLv2
1015        sslcontext.load_cert_chain(certfile, keyfile)
1016        return sslcontext
1017
1018    def _make_ssl_server(self, factory, certfile, keyfile=None):
1019        sslcontext = self._create_ssl_context(certfile, keyfile)
1020
1021        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
1022        server = self.loop.run_until_complete(f)
1023
1024        sock = server.sockets[0]
1025        host, port = sock.getsockname()
1026        self.assertEqual(host, '127.0.0.1')
1027        return server, host, port
1028
1029    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
1030        sslcontext = self._create_ssl_context(certfile, keyfile)
1031        return self._make_unix_server(factory, ssl=sslcontext)
1032
1033    @unittest.skipIf(ssl is None, 'No ssl module')
1034    def test_create_server_ssl(self):
1035        proto = MyProto(loop=self.loop)
1036        server, host, port = self._make_ssl_server(
1037            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
1038
1039        f_c = self.loop.create_connection(MyBaseProto, host, port,
1040                                          ssl=test_utils.dummy_ssl_context())
1041        client, pr = self.loop.run_until_complete(f_c)
1042
1043        client.write(b'xxx')
1044        self.loop.run_until_complete(proto.connected)
1045        self.assertEqual('CONNECTED', proto.state)
1046
1047        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1048        self.assertEqual(3, proto.nbytes)
1049
1050        # extra info is available
1051        self.check_ssl_extra_info(client, peername=(host, port))
1052
1053        # close connection
1054        proto.transport.close()
1055        self.loop.run_until_complete(proto.done)
1056        self.assertEqual('CLOSED', proto.state)
1057
1058        # the client socket must be closed after to avoid ECONNRESET upon
1059        # recv()/send() on the serving socket
1060        client.close()
1061
1062        # stop serving
1063        server.close()
1064
1065    @support.skip_unless_bind_unix_socket
1066    @unittest.skipIf(ssl is None, 'No ssl module')
1067    def test_create_unix_server_ssl(self):
1068        proto = MyProto(loop=self.loop)
1069        server, path = self._make_ssl_unix_server(
1070            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
1071
1072        f_c = self.loop.create_unix_connection(
1073            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
1074            server_hostname='')
1075
1076        client, pr = self.loop.run_until_complete(f_c)
1077
1078        client.write(b'xxx')
1079        self.loop.run_until_complete(proto.connected)
1080        self.assertEqual('CONNECTED', proto.state)
1081        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1082        self.assertEqual(3, proto.nbytes)
1083
1084        # close connection
1085        proto.transport.close()
1086        self.loop.run_until_complete(proto.done)
1087        self.assertEqual('CLOSED', proto.state)
1088
1089        # the client socket must be closed after to avoid ECONNRESET upon
1090        # recv()/send() on the serving socket
1091        client.close()
1092
1093        # stop serving
1094        server.close()
1095
1096    @unittest.skipIf(ssl is None, 'No ssl module')
1097    def test_create_server_ssl_verify_failed(self):
1098        proto = MyProto(loop=self.loop)
1099        server, host, port = self._make_ssl_server(
1100            lambda: proto, test_utils.SIGNED_CERTFILE)
1101
1102        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1103        sslcontext_client.options |= ssl.OP_NO_SSLv2
1104        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1105        if hasattr(sslcontext_client, 'check_hostname'):
1106            sslcontext_client.check_hostname = True
1107
1108
1109        # no CA loaded
1110        f_c = self.loop.create_connection(MyProto, host, port,
1111                                          ssl=sslcontext_client)
1112        with mock.patch.object(self.loop, 'call_exception_handler'):
1113            with test_utils.disable_logger():
1114                with self.assertRaisesRegex(ssl.SSLError,
1115                                            '(?i)certificate.verify.failed'):
1116                    self.loop.run_until_complete(f_c)
1117
1118            # execute the loop to log the connection error
1119            test_utils.run_briefly(self.loop)
1120
1121        # close connection
1122        self.assertIsNone(proto.transport)
1123        server.close()
1124
1125    @support.skip_unless_bind_unix_socket
1126    @unittest.skipIf(ssl is None, 'No ssl module')
1127    def test_create_unix_server_ssl_verify_failed(self):
1128        proto = MyProto(loop=self.loop)
1129        server, path = self._make_ssl_unix_server(
1130            lambda: proto, test_utils.SIGNED_CERTFILE)
1131
1132        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1133        sslcontext_client.options |= ssl.OP_NO_SSLv2
1134        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1135        if hasattr(sslcontext_client, 'check_hostname'):
1136            sslcontext_client.check_hostname = True
1137
1138        # no CA loaded
1139        f_c = self.loop.create_unix_connection(MyProto, path,
1140                                               ssl=sslcontext_client,
1141                                               server_hostname='invalid')
1142        with mock.patch.object(self.loop, 'call_exception_handler'):
1143            with test_utils.disable_logger():
1144                with self.assertRaisesRegex(ssl.SSLError,
1145                                            '(?i)certificate.verify.failed'):
1146                    self.loop.run_until_complete(f_c)
1147
1148            # execute the loop to log the connection error
1149            test_utils.run_briefly(self.loop)
1150
1151        # close connection
1152        self.assertIsNone(proto.transport)
1153        server.close()
1154
1155    @unittest.skipIf(ssl is None, 'No ssl module')
1156    def test_create_server_ssl_match_failed(self):
1157        proto = MyProto(loop=self.loop)
1158        server, host, port = self._make_ssl_server(
1159            lambda: proto, test_utils.SIGNED_CERTFILE)
1160
1161        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1162        sslcontext_client.options |= ssl.OP_NO_SSLv2
1163        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1164        sslcontext_client.load_verify_locations(
1165            cafile=test_utils.SIGNING_CA)
1166        if hasattr(sslcontext_client, 'check_hostname'):
1167            sslcontext_client.check_hostname = True
1168
1169        # incorrect server_hostname
1170        f_c = self.loop.create_connection(MyProto, host, port,
1171                                          ssl=sslcontext_client)
1172        with mock.patch.object(self.loop, 'call_exception_handler'):
1173            with test_utils.disable_logger():
1174                with self.assertRaisesRegex(
1175                        ssl.CertificateError,
1176                        "IP address mismatch, certificate is not valid for "
1177                        "'127.0.0.1'"):
1178                    self.loop.run_until_complete(f_c)
1179
1180        # close connection
1181        # transport is None because TLS ALERT aborted the handshake
1182        self.assertIsNone(proto.transport)
1183        server.close()
1184
1185    @support.skip_unless_bind_unix_socket
1186    @unittest.skipIf(ssl is None, 'No ssl module')
1187    def test_create_unix_server_ssl_verified(self):
1188        proto = MyProto(loop=self.loop)
1189        server, path = self._make_ssl_unix_server(
1190            lambda: proto, test_utils.SIGNED_CERTFILE)
1191
1192        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1193        sslcontext_client.options |= ssl.OP_NO_SSLv2
1194        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1195        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1196        if hasattr(sslcontext_client, 'check_hostname'):
1197            sslcontext_client.check_hostname = True
1198
1199        # Connection succeeds with correct CA and server hostname.
1200        f_c = self.loop.create_unix_connection(MyProto, path,
1201                                               ssl=sslcontext_client,
1202                                               server_hostname='localhost')
1203        client, pr = self.loop.run_until_complete(f_c)
1204
1205        # close connection
1206        proto.transport.close()
1207        client.close()
1208        server.close()
1209        self.loop.run_until_complete(proto.done)
1210
1211    @unittest.skipIf(ssl is None, 'No ssl module')
1212    def test_create_server_ssl_verified(self):
1213        proto = MyProto(loop=self.loop)
1214        server, host, port = self._make_ssl_server(
1215            lambda: proto, test_utils.SIGNED_CERTFILE)
1216
1217        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1218        sslcontext_client.options |= ssl.OP_NO_SSLv2
1219        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1220        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1221        if hasattr(sslcontext_client, 'check_hostname'):
1222            sslcontext_client.check_hostname = True
1223
1224        # Connection succeeds with correct CA and server hostname.
1225        f_c = self.loop.create_connection(MyProto, host, port,
1226                                          ssl=sslcontext_client,
1227                                          server_hostname='localhost')
1228        client, pr = self.loop.run_until_complete(f_c)
1229
1230        # extra info is available
1231        self.check_ssl_extra_info(client,peername=(host, port),
1232                                  peercert=test_utils.PEERCERT)
1233
1234        # close connection
1235        proto.transport.close()
1236        client.close()
1237        server.close()
1238        self.loop.run_until_complete(proto.done)
1239
1240    def test_create_server_sock(self):
1241        proto = asyncio.Future(loop=self.loop)
1242
1243        class TestMyProto(MyProto):
1244            def connection_made(self, transport):
1245                super().connection_made(transport)
1246                proto.set_result(self)
1247
1248        sock_ob = socket.socket(type=socket.SOCK_STREAM)
1249        sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1250        sock_ob.bind(('0.0.0.0', 0))
1251
1252        f = self.loop.create_server(TestMyProto, sock=sock_ob)
1253        server = self.loop.run_until_complete(f)
1254        sock = server.sockets[0]
1255        self.assertIs(sock, sock_ob)
1256
1257        host, port = sock.getsockname()
1258        self.assertEqual(host, '0.0.0.0')
1259        client = socket.socket()
1260        client.connect(('127.0.0.1', port))
1261        client.send(b'xxx')
1262        client.close()
1263        server.close()
1264
1265    def test_create_server_addr_in_use(self):
1266        sock_ob = socket.socket(type=socket.SOCK_STREAM)
1267        sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1268        sock_ob.bind(('0.0.0.0', 0))
1269
1270        f = self.loop.create_server(MyProto, sock=sock_ob)
1271        server = self.loop.run_until_complete(f)
1272        sock = server.sockets[0]
1273        host, port = sock.getsockname()
1274
1275        f = self.loop.create_server(MyProto, host=host, port=port)
1276        with self.assertRaises(OSError) as cm:
1277            self.loop.run_until_complete(f)
1278        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1279
1280        server.close()
1281
1282    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1283    def test_create_server_dual_stack(self):
1284        f_proto = asyncio.Future(loop=self.loop)
1285
1286        class TestMyProto(MyProto):
1287            def connection_made(self, transport):
1288                super().connection_made(transport)
1289                f_proto.set_result(self)
1290
1291        try_count = 0
1292        while True:
1293            try:
1294                port = support.find_unused_port()
1295                f = self.loop.create_server(TestMyProto, host=None, port=port)
1296                server = self.loop.run_until_complete(f)
1297            except OSError as ex:
1298                if ex.errno == errno.EADDRINUSE:
1299                    try_count += 1
1300                    self.assertGreaterEqual(5, try_count)
1301                    continue
1302                else:
1303                    raise
1304            else:
1305                break
1306        client = socket.socket()
1307        client.connect(('127.0.0.1', port))
1308        client.send(b'xxx')
1309        proto = self.loop.run_until_complete(f_proto)
1310        proto.transport.close()
1311        client.close()
1312
1313        f_proto = asyncio.Future(loop=self.loop)
1314        client = socket.socket(socket.AF_INET6)
1315        client.connect(('::1', port))
1316        client.send(b'xxx')
1317        proto = self.loop.run_until_complete(f_proto)
1318        proto.transport.close()
1319        client.close()
1320
1321        server.close()
1322
1323    def test_server_close(self):
1324        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1325        server = self.loop.run_until_complete(f)
1326        sock = server.sockets[0]
1327        host, port = sock.getsockname()
1328
1329        client = socket.socket()
1330        client.connect(('127.0.0.1', port))
1331        client.send(b'xxx')
1332        client.close()
1333
1334        server.close()
1335
1336        client = socket.socket()
1337        self.assertRaises(
1338            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1339        client.close()
1340
1341    def test_create_datagram_endpoint(self):
1342        class TestMyDatagramProto(MyDatagramProto):
1343            def __init__(inner_self):
1344                super().__init__(loop=self.loop)
1345
1346            def datagram_received(self, data, addr):
1347                super().datagram_received(data, addr)
1348                self.transport.sendto(b'resp:'+data, addr)
1349
1350        coro = self.loop.create_datagram_endpoint(
1351            TestMyDatagramProto, local_addr=('127.0.0.1', 0))
1352        s_transport, server = self.loop.run_until_complete(coro)
1353        host, port = s_transport.get_extra_info('sockname')
1354
1355        self.assertIsInstance(s_transport, asyncio.Transport)
1356        self.assertIsInstance(server, TestMyDatagramProto)
1357        self.assertEqual('INITIALIZED', server.state)
1358        self.assertIs(server.transport, s_transport)
1359
1360        coro = self.loop.create_datagram_endpoint(
1361            lambda: MyDatagramProto(loop=self.loop),
1362            remote_addr=(host, port))
1363        transport, client = self.loop.run_until_complete(coro)
1364
1365        self.assertIsInstance(transport, asyncio.Transport)
1366        self.assertIsInstance(client, MyDatagramProto)
1367        self.assertEqual('INITIALIZED', client.state)
1368        self.assertIs(client.transport, transport)
1369
1370        transport.sendto(b'xxx')
1371        test_utils.run_until(self.loop, lambda: server.nbytes)
1372        self.assertEqual(3, server.nbytes)
1373        test_utils.run_until(self.loop, lambda: client.nbytes)
1374
1375        # received
1376        self.assertEqual(8, client.nbytes)
1377
1378        # extra info is available
1379        self.assertIsNotNone(transport.get_extra_info('sockname'))
1380
1381        # close connection
1382        transport.close()
1383        self.loop.run_until_complete(client.done)
1384        self.assertEqual('CLOSED', client.state)
1385        server.transport.close()
1386
1387    def test_create_datagram_endpoint_sock(self):
1388        if (sys.platform == 'win32' and
1389                isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
1390            raise unittest.SkipTest(
1391                'UDP is not supported with proactor event loops')
1392
1393        sock = None
1394        local_address = ('127.0.0.1', 0)
1395        infos = self.loop.run_until_complete(
1396            self.loop.getaddrinfo(
1397                *local_address, type=socket.SOCK_DGRAM))
1398        for family, type, proto, cname, address in infos:
1399            try:
1400                sock = socket.socket(family=family, type=type, proto=proto)
1401                sock.setblocking(False)
1402                sock.bind(address)
1403            except:
1404                pass
1405            else:
1406                break
1407        else:
1408            assert False, 'Can not create socket.'
1409
1410        f = self.loop.create_datagram_endpoint(
1411            lambda: MyDatagramProto(loop=self.loop), sock=sock)
1412        tr, pr = self.loop.run_until_complete(f)
1413        self.assertIsInstance(tr, asyncio.Transport)
1414        self.assertIsInstance(pr, MyDatagramProto)
1415        tr.close()
1416        self.loop.run_until_complete(pr.done)
1417
1418    def test_internal_fds(self):
1419        loop = self.create_event_loop()
1420        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1421            loop.close()
1422            self.skipTest('loop is not a BaseSelectorEventLoop')
1423
1424        self.assertEqual(1, loop._internal_fds)
1425        loop.close()
1426        self.assertEqual(0, loop._internal_fds)
1427        self.assertIsNone(loop._csock)
1428        self.assertIsNone(loop._ssock)
1429
1430    @unittest.skipUnless(sys.platform != 'win32',
1431                         "Don't support pipes for Windows")
1432    def test_read_pipe(self):
1433        proto = MyReadPipeProto(loop=self.loop)
1434
1435        rpipe, wpipe = os.pipe()
1436        pipeobj = io.open(rpipe, 'rb', 1024)
1437
1438        async def connect():
1439            t, p = await self.loop.connect_read_pipe(
1440                lambda: proto, pipeobj)
1441            self.assertIs(p, proto)
1442            self.assertIs(t, proto.transport)
1443            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1444            self.assertEqual(0, proto.nbytes)
1445
1446        self.loop.run_until_complete(connect())
1447
1448        os.write(wpipe, b'1')
1449        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1450        self.assertEqual(1, proto.nbytes)
1451
1452        os.write(wpipe, b'2345')
1453        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1454        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1455        self.assertEqual(5, proto.nbytes)
1456
1457        os.close(wpipe)
1458        self.loop.run_until_complete(proto.done)
1459        self.assertEqual(
1460            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1461        # extra info is available
1462        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1463
1464    @unittest.skipUnless(sys.platform != 'win32',
1465                         "Don't support pipes for Windows")
1466    def test_unclosed_pipe_transport(self):
1467        # This test reproduces the issue #314 on GitHub
1468        loop = self.create_event_loop()
1469        read_proto = MyReadPipeProto(loop=loop)
1470        write_proto = MyWritePipeProto(loop=loop)
1471
1472        rpipe, wpipe = os.pipe()
1473        rpipeobj = io.open(rpipe, 'rb', 1024)
1474        wpipeobj = io.open(wpipe, 'w', 1024)
1475
1476        async def connect():
1477            read_transport, _ = await loop.connect_read_pipe(
1478                lambda: read_proto, rpipeobj)
1479            write_transport, _ = await loop.connect_write_pipe(
1480                lambda: write_proto, wpipeobj)
1481            return read_transport, write_transport
1482
1483        # Run and close the loop without closing the transports
1484        read_transport, write_transport = loop.run_until_complete(connect())
1485        loop.close()
1486
1487        # These 'repr' calls used to raise an AttributeError
1488        # See Issue #314 on GitHub
1489        self.assertIn('open', repr(read_transport))
1490        self.assertIn('open', repr(write_transport))
1491
1492        # Clean up (avoid ResourceWarning)
1493        rpipeobj.close()
1494        wpipeobj.close()
1495        read_transport._pipe = None
1496        write_transport._pipe = None
1497
1498    @unittest.skipUnless(sys.platform != 'win32',
1499                         "Don't support pipes for Windows")
1500    def test_read_pty_output(self):
1501        proto = MyReadPipeProto(loop=self.loop)
1502
1503        master, slave = os.openpty()
1504        master_read_obj = io.open(master, 'rb', 0)
1505
1506        async def connect():
1507            t, p = await self.loop.connect_read_pipe(lambda: proto,
1508                                                     master_read_obj)
1509            self.assertIs(p, proto)
1510            self.assertIs(t, proto.transport)
1511            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1512            self.assertEqual(0, proto.nbytes)
1513
1514        self.loop.run_until_complete(connect())
1515
1516        os.write(slave, b'1')
1517        test_utils.run_until(self.loop, lambda: proto.nbytes)
1518        self.assertEqual(1, proto.nbytes)
1519
1520        os.write(slave, b'2345')
1521        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1522        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1523        self.assertEqual(5, proto.nbytes)
1524
1525        os.close(slave)
1526        proto.transport.close()
1527        self.loop.run_until_complete(proto.done)
1528        self.assertEqual(
1529            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1530        # extra info is available
1531        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1532
1533    @unittest.skipUnless(sys.platform != 'win32',
1534                         "Don't support pipes for Windows")
1535    def test_write_pipe(self):
1536        rpipe, wpipe = os.pipe()
1537        pipeobj = io.open(wpipe, 'wb', 1024)
1538
1539        proto = MyWritePipeProto(loop=self.loop)
1540        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1541        transport, p = self.loop.run_until_complete(connect)
1542        self.assertIs(p, proto)
1543        self.assertIs(transport, proto.transport)
1544        self.assertEqual('CONNECTED', proto.state)
1545
1546        transport.write(b'1')
1547
1548        data = bytearray()
1549        def reader(data):
1550            chunk = os.read(rpipe, 1024)
1551            data += chunk
1552            return len(data)
1553
1554        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1555        self.assertEqual(b'1', data)
1556
1557        transport.write(b'2345')
1558        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1559        self.assertEqual(b'12345', data)
1560        self.assertEqual('CONNECTED', proto.state)
1561
1562        os.close(rpipe)
1563
1564        # extra info is available
1565        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1566
1567        # close connection
1568        proto.transport.close()
1569        self.loop.run_until_complete(proto.done)
1570        self.assertEqual('CLOSED', proto.state)
1571
1572    @unittest.skipUnless(sys.platform != 'win32',
1573                         "Don't support pipes for Windows")
1574    def test_write_pipe_disconnect_on_close(self):
1575        rsock, wsock = socket.socketpair()
1576        rsock.setblocking(False)
1577        pipeobj = io.open(wsock.detach(), 'wb', 1024)
1578
1579        proto = MyWritePipeProto(loop=self.loop)
1580        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1581        transport, p = self.loop.run_until_complete(connect)
1582        self.assertIs(p, proto)
1583        self.assertIs(transport, proto.transport)
1584        self.assertEqual('CONNECTED', proto.state)
1585
1586        transport.write(b'1')
1587        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1588        self.assertEqual(b'1', data)
1589
1590        rsock.close()
1591
1592        self.loop.run_until_complete(proto.done)
1593        self.assertEqual('CLOSED', proto.state)
1594
1595    @unittest.skipUnless(sys.platform != 'win32',
1596                         "Don't support pipes for Windows")
1597    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1598    # older than 10.6 (Snow Leopard)
1599    @support.requires_mac_ver(10, 6)
1600    def test_write_pty(self):
1601        master, slave = os.openpty()
1602        slave_write_obj = io.open(slave, 'wb', 0)
1603
1604        proto = MyWritePipeProto(loop=self.loop)
1605        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1606        transport, p = self.loop.run_until_complete(connect)
1607        self.assertIs(p, proto)
1608        self.assertIs(transport, proto.transport)
1609        self.assertEqual('CONNECTED', proto.state)
1610
1611        transport.write(b'1')
1612
1613        data = bytearray()
1614        def reader(data):
1615            chunk = os.read(master, 1024)
1616            data += chunk
1617            return len(data)
1618
1619        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1620                             timeout=10)
1621        self.assertEqual(b'1', data)
1622
1623        transport.write(b'2345')
1624        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1625                             timeout=10)
1626        self.assertEqual(b'12345', data)
1627        self.assertEqual('CONNECTED', proto.state)
1628
1629        os.close(master)
1630
1631        # extra info is available
1632        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1633
1634        # close connection
1635        proto.transport.close()
1636        self.loop.run_until_complete(proto.done)
1637        self.assertEqual('CLOSED', proto.state)
1638
1639    @unittest.skipUnless(sys.platform != 'win32',
1640                         "Don't support pipes for Windows")
1641    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1642    # older than 10.6 (Snow Leopard)
1643    @support.requires_mac_ver(10, 6)
1644    def test_bidirectional_pty(self):
1645        master, read_slave = os.openpty()
1646        write_slave = os.dup(read_slave)
1647        tty.setraw(read_slave)
1648
1649        slave_read_obj = io.open(read_slave, 'rb', 0)
1650        read_proto = MyReadPipeProto(loop=self.loop)
1651        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1652                                                   slave_read_obj)
1653        read_transport, p = self.loop.run_until_complete(read_connect)
1654        self.assertIs(p, read_proto)
1655        self.assertIs(read_transport, read_proto.transport)
1656        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1657        self.assertEqual(0, read_proto.nbytes)
1658
1659
1660        slave_write_obj = io.open(write_slave, 'wb', 0)
1661        write_proto = MyWritePipeProto(loop=self.loop)
1662        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1663                                                     slave_write_obj)
1664        write_transport, p = self.loop.run_until_complete(write_connect)
1665        self.assertIs(p, write_proto)
1666        self.assertIs(write_transport, write_proto.transport)
1667        self.assertEqual('CONNECTED', write_proto.state)
1668
1669        data = bytearray()
1670        def reader(data):
1671            chunk = os.read(master, 1024)
1672            data += chunk
1673            return len(data)
1674
1675        write_transport.write(b'1')
1676        test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
1677        self.assertEqual(b'1', data)
1678        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1679        self.assertEqual('CONNECTED', write_proto.state)
1680
1681        os.write(master, b'a')
1682        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1683                             timeout=10)
1684        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1685        self.assertEqual(1, read_proto.nbytes)
1686        self.assertEqual('CONNECTED', write_proto.state)
1687
1688        write_transport.write(b'2345')
1689        test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
1690        self.assertEqual(b'12345', data)
1691        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1692        self.assertEqual('CONNECTED', write_proto.state)
1693
1694        os.write(master, b'bcde')
1695        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1696                             timeout=10)
1697        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1698        self.assertEqual(5, read_proto.nbytes)
1699        self.assertEqual('CONNECTED', write_proto.state)
1700
1701        os.close(master)
1702
1703        read_transport.close()
1704        self.loop.run_until_complete(read_proto.done)
1705        self.assertEqual(
1706            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1707
1708        write_transport.close()
1709        self.loop.run_until_complete(write_proto.done)
1710        self.assertEqual('CLOSED', write_proto.state)
1711
1712    def test_prompt_cancellation(self):
1713        r, w = socket.socketpair()
1714        r.setblocking(False)
1715        f = self.loop.create_task(self.loop.sock_recv(r, 1))
1716        ov = getattr(f, 'ov', None)
1717        if ov is not None:
1718            self.assertTrue(ov.pending)
1719
1720        async def main():
1721            try:
1722                self.loop.call_soon(f.cancel)
1723                await f
1724            except asyncio.CancelledError:
1725                res = 'cancelled'
1726            else:
1727                res = None
1728            finally:
1729                self.loop.stop()
1730            return res
1731
1732        start = time.monotonic()
1733        t = asyncio.Task(main(), loop=self.loop)
1734        self.loop.run_forever()
1735        elapsed = time.monotonic() - start
1736
1737        self.assertLess(elapsed, 0.1)
1738        self.assertEqual(t.result(), 'cancelled')
1739        self.assertRaises(asyncio.CancelledError, f.result)
1740        if ov is not None:
1741            self.assertFalse(ov.pending)
1742        self.loop._stop_serving(r)
1743
1744        r.close()
1745        w.close()
1746
1747    def test_timeout_rounding(self):
1748        def _run_once():
1749            self.loop._run_once_counter += 1
1750            orig_run_once()
1751
1752        orig_run_once = self.loop._run_once
1753        self.loop._run_once_counter = 0
1754        self.loop._run_once = _run_once
1755
1756        async def wait():
1757            loop = self.loop
1758            await asyncio.sleep(1e-2, loop=loop)
1759            await asyncio.sleep(1e-4, loop=loop)
1760            await asyncio.sleep(1e-6, loop=loop)
1761            await asyncio.sleep(1e-8, loop=loop)
1762            await asyncio.sleep(1e-10, loop=loop)
1763
1764        self.loop.run_until_complete(wait())
1765        # The ideal number of call is 12, but on some platforms, the selector
1766        # may sleep at little bit less than timeout depending on the resolution
1767        # of the clock used by the kernel. Tolerate a few useless calls on
1768        # these platforms.
1769        self.assertLessEqual(self.loop._run_once_counter, 20,
1770            {'clock_resolution': self.loop._clock_resolution,
1771             'selector': self.loop._selector.__class__.__name__})
1772
1773    def test_remove_fds_after_closing(self):
1774        loop = self.create_event_loop()
1775        callback = lambda: None
1776        r, w = socket.socketpair()
1777        self.addCleanup(r.close)
1778        self.addCleanup(w.close)
1779        loop.add_reader(r, callback)
1780        loop.add_writer(w, callback)
1781        loop.close()
1782        self.assertFalse(loop.remove_reader(r))
1783        self.assertFalse(loop.remove_writer(w))
1784
1785    def test_add_fds_after_closing(self):
1786        loop = self.create_event_loop()
1787        callback = lambda: None
1788        r, w = socket.socketpair()
1789        self.addCleanup(r.close)
1790        self.addCleanup(w.close)
1791        loop.close()
1792        with self.assertRaises(RuntimeError):
1793            loop.add_reader(r, callback)
1794        with self.assertRaises(RuntimeError):
1795            loop.add_writer(w, callback)
1796
1797    def test_close_running_event_loop(self):
1798        @asyncio.coroutine
1799        def close_loop(loop):
1800            self.loop.close()
1801
1802        coro = close_loop(self.loop)
1803        with self.assertRaises(RuntimeError):
1804            self.loop.run_until_complete(coro)
1805
1806    def test_close(self):
1807        self.loop.close()
1808
1809        @asyncio.coroutine
1810        def test():
1811            pass
1812
1813        func = lambda: False
1814        coro = test()
1815        self.addCleanup(coro.close)
1816
1817        # operation blocked when the loop is closed
1818        with self.assertRaises(RuntimeError):
1819            self.loop.run_forever()
1820        with self.assertRaises(RuntimeError):
1821            fut = asyncio.Future(loop=self.loop)
1822            self.loop.run_until_complete(fut)
1823        with self.assertRaises(RuntimeError):
1824            self.loop.call_soon(func)
1825        with self.assertRaises(RuntimeError):
1826            self.loop.call_soon_threadsafe(func)
1827        with self.assertRaises(RuntimeError):
1828            self.loop.call_later(1.0, func)
1829        with self.assertRaises(RuntimeError):
1830            self.loop.call_at(self.loop.time() + .0, func)
1831        with self.assertRaises(RuntimeError):
1832            self.loop.create_task(coro)
1833        with self.assertRaises(RuntimeError):
1834            self.loop.add_signal_handler(signal.SIGTERM, func)
1835
1836        # run_in_executor test is tricky: the method is a coroutine,
1837        # but run_until_complete cannot be called on closed loop.
1838        # Thus iterate once explicitly.
1839        with self.assertRaises(RuntimeError):
1840            it = self.loop.run_in_executor(None, func).__await__()
1841            next(it)
1842
1843
1844class SubprocessTestsMixin:
1845
1846    def check_terminated(self, returncode):
1847        if sys.platform == 'win32':
1848            self.assertIsInstance(returncode, int)
1849            # expect 1 but sometimes get 0
1850        else:
1851            self.assertEqual(-signal.SIGTERM, returncode)
1852
1853    def check_killed(self, returncode):
1854        if sys.platform == 'win32':
1855            self.assertIsInstance(returncode, int)
1856            # expect 1 but sometimes get 0
1857        else:
1858            self.assertEqual(-signal.SIGKILL, returncode)
1859
1860    def test_subprocess_exec(self):
1861        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1862
1863        connect = self.loop.subprocess_exec(
1864                        functools.partial(MySubprocessProtocol, self.loop),
1865                        sys.executable, prog)
1866        transp, proto = self.loop.run_until_complete(connect)
1867        self.assertIsInstance(proto, MySubprocessProtocol)
1868        self.loop.run_until_complete(proto.connected)
1869        self.assertEqual('CONNECTED', proto.state)
1870
1871        stdin = transp.get_pipe_transport(0)
1872        stdin.write(b'Python The Winner')
1873        self.loop.run_until_complete(proto.got_data[1].wait())
1874        with test_utils.disable_logger():
1875            transp.close()
1876        self.loop.run_until_complete(proto.completed)
1877        self.check_killed(proto.returncode)
1878        self.assertEqual(b'Python The Winner', proto.data[1])
1879
1880    def test_subprocess_interactive(self):
1881        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1882
1883        connect = self.loop.subprocess_exec(
1884                        functools.partial(MySubprocessProtocol, self.loop),
1885                        sys.executable, prog)
1886        transp, proto = self.loop.run_until_complete(connect)
1887        self.assertIsInstance(proto, MySubprocessProtocol)
1888        self.loop.run_until_complete(proto.connected)
1889        self.assertEqual('CONNECTED', proto.state)
1890
1891        stdin = transp.get_pipe_transport(0)
1892        stdin.write(b'Python ')
1893        self.loop.run_until_complete(proto.got_data[1].wait())
1894        proto.got_data[1].clear()
1895        self.assertEqual(b'Python ', proto.data[1])
1896
1897        stdin.write(b'The Winner')
1898        self.loop.run_until_complete(proto.got_data[1].wait())
1899        self.assertEqual(b'Python The Winner', proto.data[1])
1900
1901        with test_utils.disable_logger():
1902            transp.close()
1903        self.loop.run_until_complete(proto.completed)
1904        self.check_killed(proto.returncode)
1905
1906    def test_subprocess_shell(self):
1907        connect = self.loop.subprocess_shell(
1908                        functools.partial(MySubprocessProtocol, self.loop),
1909                        'echo Python')
1910        transp, proto = self.loop.run_until_complete(connect)
1911        self.assertIsInstance(proto, MySubprocessProtocol)
1912        self.loop.run_until_complete(proto.connected)
1913
1914        transp.get_pipe_transport(0).close()
1915        self.loop.run_until_complete(proto.completed)
1916        self.assertEqual(0, proto.returncode)
1917        self.assertTrue(all(f.done() for f in proto.disconnects.values()))
1918        self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
1919        self.assertEqual(proto.data[2], b'')
1920        transp.close()
1921
1922    def test_subprocess_exitcode(self):
1923        connect = self.loop.subprocess_shell(
1924                        functools.partial(MySubprocessProtocol, self.loop),
1925                        'exit 7', stdin=None, stdout=None, stderr=None)
1926        transp, proto = self.loop.run_until_complete(connect)
1927        self.assertIsInstance(proto, MySubprocessProtocol)
1928        self.loop.run_until_complete(proto.completed)
1929        self.assertEqual(7, proto.returncode)
1930        transp.close()
1931
1932    def test_subprocess_close_after_finish(self):
1933        connect = self.loop.subprocess_shell(
1934                        functools.partial(MySubprocessProtocol, self.loop),
1935                        'exit 7', stdin=None, stdout=None, stderr=None)
1936        transp, proto = self.loop.run_until_complete(connect)
1937        self.assertIsInstance(proto, MySubprocessProtocol)
1938        self.assertIsNone(transp.get_pipe_transport(0))
1939        self.assertIsNone(transp.get_pipe_transport(1))
1940        self.assertIsNone(transp.get_pipe_transport(2))
1941        self.loop.run_until_complete(proto.completed)
1942        self.assertEqual(7, proto.returncode)
1943        self.assertIsNone(transp.close())
1944
1945    def test_subprocess_kill(self):
1946        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1947
1948        connect = self.loop.subprocess_exec(
1949                        functools.partial(MySubprocessProtocol, self.loop),
1950                        sys.executable, prog)
1951        transp, proto = self.loop.run_until_complete(connect)
1952        self.assertIsInstance(proto, MySubprocessProtocol)
1953        self.loop.run_until_complete(proto.connected)
1954
1955        transp.kill()
1956        self.loop.run_until_complete(proto.completed)
1957        self.check_killed(proto.returncode)
1958        transp.close()
1959
1960    def test_subprocess_terminate(self):
1961        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1962
1963        connect = self.loop.subprocess_exec(
1964                        functools.partial(MySubprocessProtocol, self.loop),
1965                        sys.executable, prog)
1966        transp, proto = self.loop.run_until_complete(connect)
1967        self.assertIsInstance(proto, MySubprocessProtocol)
1968        self.loop.run_until_complete(proto.connected)
1969
1970        transp.terminate()
1971        self.loop.run_until_complete(proto.completed)
1972        self.check_terminated(proto.returncode)
1973        transp.close()
1974
1975    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
1976    def test_subprocess_send_signal(self):
1977        # bpo-31034: Make sure that we get the default signal handler (killing
1978        # the process). The parent process may have decided to ignore SIGHUP,
1979        # and signal handlers are inherited.
1980        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
1981        try:
1982            prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1983
1984            connect = self.loop.subprocess_exec(
1985                            functools.partial(MySubprocessProtocol, self.loop),
1986                            sys.executable, prog)
1987            transp, proto = self.loop.run_until_complete(connect)
1988            self.assertIsInstance(proto, MySubprocessProtocol)
1989            self.loop.run_until_complete(proto.connected)
1990
1991            transp.send_signal(signal.SIGHUP)
1992            self.loop.run_until_complete(proto.completed)
1993            self.assertEqual(-signal.SIGHUP, proto.returncode)
1994            transp.close()
1995        finally:
1996            signal.signal(signal.SIGHUP, old_handler)
1997
1998    def test_subprocess_stderr(self):
1999        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
2000
2001        connect = self.loop.subprocess_exec(
2002                        functools.partial(MySubprocessProtocol, self.loop),
2003                        sys.executable, prog)
2004        transp, proto = self.loop.run_until_complete(connect)
2005        self.assertIsInstance(proto, MySubprocessProtocol)
2006        self.loop.run_until_complete(proto.connected)
2007
2008        stdin = transp.get_pipe_transport(0)
2009        stdin.write(b'test')
2010
2011        self.loop.run_until_complete(proto.completed)
2012
2013        transp.close()
2014        self.assertEqual(b'OUT:test', proto.data[1])
2015        self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
2016        self.assertEqual(0, proto.returncode)
2017
2018    def test_subprocess_stderr_redirect_to_stdout(self):
2019        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
2020
2021        connect = self.loop.subprocess_exec(
2022                        functools.partial(MySubprocessProtocol, self.loop),
2023                        sys.executable, prog, stderr=subprocess.STDOUT)
2024        transp, proto = self.loop.run_until_complete(connect)
2025        self.assertIsInstance(proto, MySubprocessProtocol)
2026        self.loop.run_until_complete(proto.connected)
2027
2028        stdin = transp.get_pipe_transport(0)
2029        self.assertIsNotNone(transp.get_pipe_transport(1))
2030        self.assertIsNone(transp.get_pipe_transport(2))
2031
2032        stdin.write(b'test')
2033        self.loop.run_until_complete(proto.completed)
2034        self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
2035                        proto.data[1])
2036        self.assertEqual(b'', proto.data[2])
2037
2038        transp.close()
2039        self.assertEqual(0, proto.returncode)
2040
2041    def test_subprocess_close_client_stream(self):
2042        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
2043
2044        connect = self.loop.subprocess_exec(
2045                        functools.partial(MySubprocessProtocol, self.loop),
2046                        sys.executable, prog)
2047        transp, proto = self.loop.run_until_complete(connect)
2048        self.assertIsInstance(proto, MySubprocessProtocol)
2049        self.loop.run_until_complete(proto.connected)
2050
2051        stdin = transp.get_pipe_transport(0)
2052        stdout = transp.get_pipe_transport(1)
2053        stdin.write(b'test')
2054        self.loop.run_until_complete(proto.got_data[1].wait())
2055        self.assertEqual(b'OUT:test', proto.data[1])
2056
2057        stdout.close()
2058        self.loop.run_until_complete(proto.disconnects[1])
2059        stdin.write(b'xxx')
2060        self.loop.run_until_complete(proto.got_data[2].wait())
2061        if sys.platform != 'win32':
2062            self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
2063        else:
2064            # After closing the read-end of a pipe, writing to the
2065            # write-end using os.write() fails with errno==EINVAL and
2066            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
2067            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
2068            self.assertEqual(b'ERR:OSError', proto.data[2])
2069        with test_utils.disable_logger():
2070            transp.close()
2071        self.loop.run_until_complete(proto.completed)
2072        self.check_killed(proto.returncode)
2073
2074    def test_subprocess_wait_no_same_group(self):
2075        # start the new process in a new session
2076        connect = self.loop.subprocess_shell(
2077                        functools.partial(MySubprocessProtocol, self.loop),
2078                        'exit 7', stdin=None, stdout=None, stderr=None,
2079                        start_new_session=True)
2080        _, proto = yield self.loop.run_until_complete(connect)
2081        self.assertIsInstance(proto, MySubprocessProtocol)
2082        self.loop.run_until_complete(proto.completed)
2083        self.assertEqual(7, proto.returncode)
2084
2085    def test_subprocess_exec_invalid_args(self):
2086
2087        async def connect(**kwds):
2088            await self.loop.subprocess_exec(
2089                asyncio.SubprocessProtocol,
2090                'pwd', **kwds)
2091
2092        with self.assertRaises(ValueError):
2093            self.loop.run_until_complete(connect(universal_newlines=True))
2094        with self.assertRaises(ValueError):
2095            self.loop.run_until_complete(connect(bufsize=4096))
2096        with self.assertRaises(ValueError):
2097            self.loop.run_until_complete(connect(shell=True))
2098
2099    def test_subprocess_shell_invalid_args(self):
2100
2101        async def connect(cmd=None, **kwds):
2102            if not cmd:
2103                cmd = 'pwd'
2104            await self.loop.subprocess_shell(
2105                asyncio.SubprocessProtocol,
2106                cmd, **kwds)
2107
2108        with self.assertRaises(ValueError):
2109            self.loop.run_until_complete(connect(['ls', '-l']))
2110        with self.assertRaises(ValueError):
2111            self.loop.run_until_complete(connect(universal_newlines=True))
2112        with self.assertRaises(ValueError):
2113            self.loop.run_until_complete(connect(bufsize=4096))
2114        with self.assertRaises(ValueError):
2115            self.loop.run_until_complete(connect(shell=False))
2116
2117
2118class SendfileBase:
2119
2120    # 128 KiB plus small unaligned to buffer chunk
2121    DATA = b"SendfileBaseData" * (1024 * 8 + 1)
2122
2123    # Reduce socket buffer size to test on relative small data sets.
2124    BUF_SIZE = 4 * 1024   # 4 KiB
2125
2126    @classmethod
2127    def setUpClass(cls):
2128        with open(support.TESTFN, 'wb') as fp:
2129            fp.write(cls.DATA)
2130        super().setUpClass()
2131
2132    @classmethod
2133    def tearDownClass(cls):
2134        support.unlink(support.TESTFN)
2135        super().tearDownClass()
2136
2137    def setUp(self):
2138        self.file = open(support.TESTFN, 'rb')
2139        self.addCleanup(self.file.close)
2140        super().setUp()
2141
2142    def run_loop(self, coro):
2143        return self.loop.run_until_complete(coro)
2144
2145
2146class SockSendfileMixin(SendfileBase):
2147
2148    class MyProto(asyncio.Protocol):
2149
2150        def __init__(self, loop):
2151            self.started = False
2152            self.closed = False
2153            self.data = bytearray()
2154            self.fut = loop.create_future()
2155            self.transport = None
2156
2157        def connection_made(self, transport):
2158            self.started = True
2159            self.transport = transport
2160
2161        def data_received(self, data):
2162            self.data.extend(data)
2163
2164        def connection_lost(self, exc):
2165            self.closed = True
2166            self.fut.set_result(None)
2167
2168        async def wait_closed(self):
2169            await self.fut
2170
2171    @classmethod
2172    def setUpClass(cls):
2173        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
2174        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
2175        super().setUpClass()
2176
2177    @classmethod
2178    def tearDownClass(cls):
2179        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
2180        super().tearDownClass()
2181
2182    def make_socket(self, cleanup=True):
2183        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2184        sock.setblocking(False)
2185        if cleanup:
2186            self.addCleanup(sock.close)
2187        return sock
2188
2189    def reduce_receive_buffer_size(self, sock):
2190        # Reduce receive socket buffer size to test on relative
2191        # small data sets.
2192        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.BUF_SIZE)
2193
2194    def reduce_send_buffer_size(self, sock, transport=None):
2195        # Reduce send socket buffer size to test on relative small data sets.
2196
2197        # On macOS, SO_SNDBUF is reset by connect(). So this method
2198        # should be called after the socket is connected.
2199        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.BUF_SIZE)
2200
2201        if transport is not None:
2202            transport.set_write_buffer_limits(high=self.BUF_SIZE)
2203
2204    def prepare_socksendfile(self):
2205        proto = self.MyProto(self.loop)
2206        port = support.find_unused_port()
2207        srv_sock = self.make_socket(cleanup=False)
2208        srv_sock.bind((support.HOST, port))
2209        server = self.run_loop(self.loop.create_server(
2210            lambda: proto, sock=srv_sock))
2211        self.reduce_receive_buffer_size(srv_sock)
2212
2213        sock = self.make_socket()
2214        self.run_loop(self.loop.sock_connect(sock, ('127.0.0.1', port)))
2215        self.reduce_send_buffer_size(sock)
2216
2217        def cleanup():
2218            if proto.transport is not None:
2219                # can be None if the task was cancelled before
2220                # connection_made callback
2221                proto.transport.close()
2222                self.run_loop(proto.wait_closed())
2223
2224            server.close()
2225            self.run_loop(server.wait_closed())
2226
2227        self.addCleanup(cleanup)
2228
2229        return sock, proto
2230
2231    def test_sock_sendfile_success(self):
2232        sock, proto = self.prepare_socksendfile()
2233        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
2234        sock.close()
2235        self.run_loop(proto.wait_closed())
2236
2237        self.assertEqual(ret, len(self.DATA))
2238        self.assertEqual(proto.data, self.DATA)
2239        self.assertEqual(self.file.tell(), len(self.DATA))
2240
2241    def test_sock_sendfile_with_offset_and_count(self):
2242        sock, proto = self.prepare_socksendfile()
2243        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
2244                                                    1000, 2000))
2245        sock.close()
2246        self.run_loop(proto.wait_closed())
2247
2248        self.assertEqual(proto.data, self.DATA[1000:3000])
2249        self.assertEqual(self.file.tell(), 3000)
2250        self.assertEqual(ret, 2000)
2251
2252    def test_sock_sendfile_zero_size(self):
2253        sock, proto = self.prepare_socksendfile()
2254        with tempfile.TemporaryFile() as f:
2255            ret = self.run_loop(self.loop.sock_sendfile(sock, f,
2256                                                        0, None))
2257        sock.close()
2258        self.run_loop(proto.wait_closed())
2259
2260        self.assertEqual(ret, 0)
2261        self.assertEqual(self.file.tell(), 0)
2262
2263    def test_sock_sendfile_mix_with_regular_send(self):
2264        buf = b"mix_regular_send" * (4 * 1024)  # 64 KiB
2265        sock, proto = self.prepare_socksendfile()
2266        self.run_loop(self.loop.sock_sendall(sock, buf))
2267        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
2268        self.run_loop(self.loop.sock_sendall(sock, buf))
2269        sock.close()
2270        self.run_loop(proto.wait_closed())
2271
2272        self.assertEqual(ret, len(self.DATA))
2273        expected = buf + self.DATA + buf
2274        self.assertEqual(proto.data, expected)
2275        self.assertEqual(self.file.tell(), len(self.DATA))
2276
2277
2278class SendfileMixin(SendfileBase):
2279
2280    class MySendfileProto(MyBaseProto):
2281
2282        def __init__(self, loop=None, close_after=0):
2283            super().__init__(loop)
2284            self.data = bytearray()
2285            self.close_after = close_after
2286
2287        def data_received(self, data):
2288            self.data.extend(data)
2289            super().data_received(data)
2290            if self.close_after and self.nbytes >= self.close_after:
2291                self.transport.close()
2292
2293
2294    # Note: sendfile via SSL transport is equal to sendfile fallback
2295
2296    def prepare_sendfile(self, *, is_ssl=False, close_after=0):
2297        port = support.find_unused_port()
2298        srv_proto = self.MySendfileProto(loop=self.loop,
2299                                         close_after=close_after)
2300        if is_ssl:
2301            if not ssl:
2302                self.skipTest("No ssl module")
2303            srv_ctx = test_utils.simple_server_sslcontext()
2304            cli_ctx = test_utils.simple_client_sslcontext()
2305        else:
2306            srv_ctx = None
2307            cli_ctx = None
2308        srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2309        srv_sock.bind((support.HOST, port))
2310        server = self.run_loop(self.loop.create_server(
2311            lambda: srv_proto, sock=srv_sock, ssl=srv_ctx))
2312        self.reduce_receive_buffer_size(srv_sock)
2313
2314        if is_ssl:
2315            server_hostname = support.HOST
2316        else:
2317            server_hostname = None
2318        cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2319        cli_sock.connect((support.HOST, port))
2320
2321        cli_proto = self.MySendfileProto(loop=self.loop)
2322        tr, pr = self.run_loop(self.loop.create_connection(
2323            lambda: cli_proto, sock=cli_sock,
2324            ssl=cli_ctx, server_hostname=server_hostname))
2325        self.reduce_send_buffer_size(cli_sock, transport=tr)
2326
2327        def cleanup():
2328            srv_proto.transport.close()
2329            cli_proto.transport.close()
2330            self.run_loop(srv_proto.done)
2331            self.run_loop(cli_proto.done)
2332
2333            server.close()
2334            self.run_loop(server.wait_closed())
2335
2336        self.addCleanup(cleanup)
2337        return srv_proto, cli_proto
2338
2339    @unittest.skipIf(sys.platform == 'win32', "UDP sockets are not supported")
2340    def test_sendfile_not_supported(self):
2341        tr, pr = self.run_loop(
2342            self.loop.create_datagram_endpoint(
2343                lambda: MyDatagramProto(loop=self.loop),
2344                family=socket.AF_INET))
2345        try:
2346            with self.assertRaisesRegex(RuntimeError, "not supported"):
2347                self.run_loop(
2348                    self.loop.sendfile(tr, self.file))
2349            self.assertEqual(0, self.file.tell())
2350        finally:
2351            # don't use self.addCleanup because it produces resource warning
2352            tr.close()
2353
2354    def test_sendfile(self):
2355        srv_proto, cli_proto = self.prepare_sendfile()
2356        ret = self.run_loop(
2357            self.loop.sendfile(cli_proto.transport, self.file))
2358        cli_proto.transport.close()
2359        self.run_loop(srv_proto.done)
2360        self.assertEqual(ret, len(self.DATA))
2361        self.assertEqual(srv_proto.nbytes, len(self.DATA))
2362        self.assertEqual(srv_proto.data, self.DATA)
2363        self.assertEqual(self.file.tell(), len(self.DATA))
2364
2365    def test_sendfile_force_fallback(self):
2366        srv_proto, cli_proto = self.prepare_sendfile()
2367
2368        def sendfile_native(transp, file, offset, count):
2369            # to raise SendfileNotAvailableError
2370            return base_events.BaseEventLoop._sendfile_native(
2371                self.loop, transp, file, offset, count)
2372
2373        self.loop._sendfile_native = sendfile_native
2374
2375        ret = self.run_loop(
2376            self.loop.sendfile(cli_proto.transport, self.file))
2377        cli_proto.transport.close()
2378        self.run_loop(srv_proto.done)
2379        self.assertEqual(ret, len(self.DATA))
2380        self.assertEqual(srv_proto.nbytes, len(self.DATA))
2381        self.assertEqual(srv_proto.data, self.DATA)
2382        self.assertEqual(self.file.tell(), len(self.DATA))
2383
2384    def test_sendfile_force_unsupported_native(self):
2385        if sys.platform == 'win32':
2386            if isinstance(self.loop, asyncio.ProactorEventLoop):
2387                self.skipTest("Fails on proactor event loop")
2388        srv_proto, cli_proto = self.prepare_sendfile()
2389
2390        def sendfile_native(transp, file, offset, count):
2391            # to raise SendfileNotAvailableError
2392            return base_events.BaseEventLoop._sendfile_native(
2393                self.loop, transp, file, offset, count)
2394
2395        self.loop._sendfile_native = sendfile_native
2396
2397        with self.assertRaisesRegex(events.SendfileNotAvailableError,
2398                                    "not supported"):
2399            self.run_loop(
2400                self.loop.sendfile(cli_proto.transport, self.file,
2401                                   fallback=False))
2402
2403        cli_proto.transport.close()
2404        self.run_loop(srv_proto.done)
2405        self.assertEqual(srv_proto.nbytes, 0)
2406        self.assertEqual(self.file.tell(), 0)
2407
2408    def test_sendfile_ssl(self):
2409        srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
2410        ret = self.run_loop(
2411            self.loop.sendfile(cli_proto.transport, self.file))
2412        cli_proto.transport.close()
2413        self.run_loop(srv_proto.done)
2414        self.assertEqual(ret, len(self.DATA))
2415        self.assertEqual(srv_proto.nbytes, len(self.DATA))
2416        self.assertEqual(srv_proto.data, self.DATA)
2417        self.assertEqual(self.file.tell(), len(self.DATA))
2418
2419    def test_sendfile_for_closing_transp(self):
2420        srv_proto, cli_proto = self.prepare_sendfile()
2421        cli_proto.transport.close()
2422        with self.assertRaisesRegex(RuntimeError, "is closing"):
2423            self.run_loop(self.loop.sendfile(cli_proto.transport, self.file))
2424        self.run_loop(srv_proto.done)
2425        self.assertEqual(srv_proto.nbytes, 0)
2426        self.assertEqual(self.file.tell(), 0)
2427
2428    def test_sendfile_pre_and_post_data(self):
2429        srv_proto, cli_proto = self.prepare_sendfile()
2430        PREFIX = b'PREFIX__' * 1024  # 8 KiB
2431        SUFFIX = b'--SUFFIX' * 1024  # 8 KiB
2432        cli_proto.transport.write(PREFIX)
2433        ret = self.run_loop(
2434            self.loop.sendfile(cli_proto.transport, self.file))
2435        cli_proto.transport.write(SUFFIX)
2436        cli_proto.transport.close()
2437        self.run_loop(srv_proto.done)
2438        self.assertEqual(ret, len(self.DATA))
2439        self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX)
2440        self.assertEqual(self.file.tell(), len(self.DATA))
2441
2442    def test_sendfile_ssl_pre_and_post_data(self):
2443        srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
2444        PREFIX = b'zxcvbnm' * 1024
2445        SUFFIX = b'0987654321' * 1024
2446        cli_proto.transport.write(PREFIX)
2447        ret = self.run_loop(
2448            self.loop.sendfile(cli_proto.transport, self.file))
2449        cli_proto.transport.write(SUFFIX)
2450        cli_proto.transport.close()
2451        self.run_loop(srv_proto.done)
2452        self.assertEqual(ret, len(self.DATA))
2453        self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX)
2454        self.assertEqual(self.file.tell(), len(self.DATA))
2455
2456    def test_sendfile_partial(self):
2457        srv_proto, cli_proto = self.prepare_sendfile()
2458        ret = self.run_loop(
2459            self.loop.sendfile(cli_proto.transport, self.file, 1000, 100))
2460        cli_proto.transport.close()
2461        self.run_loop(srv_proto.done)
2462        self.assertEqual(ret, 100)
2463        self.assertEqual(srv_proto.nbytes, 100)
2464        self.assertEqual(srv_proto.data, self.DATA[1000:1100])
2465        self.assertEqual(self.file.tell(), 1100)
2466
2467    def test_sendfile_ssl_partial(self):
2468        srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
2469        ret = self.run_loop(
2470            self.loop.sendfile(cli_proto.transport, self.file, 1000, 100))
2471        cli_proto.transport.close()
2472        self.run_loop(srv_proto.done)
2473        self.assertEqual(ret, 100)
2474        self.assertEqual(srv_proto.nbytes, 100)
2475        self.assertEqual(srv_proto.data, self.DATA[1000:1100])
2476        self.assertEqual(self.file.tell(), 1100)
2477
2478    def test_sendfile_close_peer_after_receiving(self):
2479        srv_proto, cli_proto = self.prepare_sendfile(
2480            close_after=len(self.DATA))
2481        ret = self.run_loop(
2482            self.loop.sendfile(cli_proto.transport, self.file))
2483        cli_proto.transport.close()
2484        self.run_loop(srv_proto.done)
2485        self.assertEqual(ret, len(self.DATA))
2486        self.assertEqual(srv_proto.nbytes, len(self.DATA))
2487        self.assertEqual(srv_proto.data, self.DATA)
2488        self.assertEqual(self.file.tell(), len(self.DATA))
2489
2490    def test_sendfile_ssl_close_peer_after_receiving(self):
2491        srv_proto, cli_proto = self.prepare_sendfile(
2492            is_ssl=True, close_after=len(self.DATA))
2493        ret = self.run_loop(
2494            self.loop.sendfile(cli_proto.transport, self.file))
2495        self.run_loop(srv_proto.done)
2496        self.assertEqual(ret, len(self.DATA))
2497        self.assertEqual(srv_proto.nbytes, len(self.DATA))
2498        self.assertEqual(srv_proto.data, self.DATA)
2499        self.assertEqual(self.file.tell(), len(self.DATA))
2500
2501    def test_sendfile_close_peer_in_the_middle_of_receiving(self):
2502        srv_proto, cli_proto = self.prepare_sendfile(close_after=1024)
2503        with self.assertRaises(ConnectionError):
2504            self.run_loop(
2505                self.loop.sendfile(cli_proto.transport, self.file))
2506        self.run_loop(srv_proto.done)
2507
2508        self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA),
2509                        srv_proto.nbytes)
2510        self.assertTrue(1024 <= self.file.tell() < len(self.DATA),
2511                        self.file.tell())
2512        self.assertTrue(cli_proto.transport.is_closing())
2513
2514    def test_sendfile_fallback_close_peer_in_the_middle_of_receiving(self):
2515
2516        def sendfile_native(transp, file, offset, count):
2517            # to raise SendfileNotAvailableError
2518            return base_events.BaseEventLoop._sendfile_native(
2519                self.loop, transp, file, offset, count)
2520
2521        self.loop._sendfile_native = sendfile_native
2522
2523        srv_proto, cli_proto = self.prepare_sendfile(close_after=1024)
2524        with self.assertRaises(ConnectionError):
2525            self.run_loop(
2526                self.loop.sendfile(cli_proto.transport, self.file))
2527        self.run_loop(srv_proto.done)
2528
2529        self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA),
2530                        srv_proto.nbytes)
2531        self.assertTrue(1024 <= self.file.tell() < len(self.DATA),
2532                        self.file.tell())
2533
2534    @unittest.skipIf(not hasattr(os, 'sendfile'),
2535                     "Don't have native sendfile support")
2536    def test_sendfile_prevents_bare_write(self):
2537        srv_proto, cli_proto = self.prepare_sendfile()
2538        fut = self.loop.create_future()
2539
2540        async def coro():
2541            fut.set_result(None)
2542            return await self.loop.sendfile(cli_proto.transport, self.file)
2543
2544        t = self.loop.create_task(coro())
2545        self.run_loop(fut)
2546        with self.assertRaisesRegex(RuntimeError,
2547                                    "sendfile is in progress"):
2548            cli_proto.transport.write(b'data')
2549        ret = self.run_loop(t)
2550        self.assertEqual(ret, len(self.DATA))
2551
2552    def test_sendfile_no_fallback_for_fallback_transport(self):
2553        transport = mock.Mock()
2554        transport.is_closing.side_effect = lambda: False
2555        transport._sendfile_compatible = constants._SendfileMode.FALLBACK
2556        with self.assertRaisesRegex(RuntimeError, 'fallback is disabled'):
2557            self.loop.run_until_complete(
2558                self.loop.sendfile(transport, None, fallback=False))
2559
2560
2561if sys.platform == 'win32':
2562
2563    class SelectEventLoopTests(EventLoopTestsMixin,
2564                               SendfileMixin,
2565                               SockSendfileMixin,
2566                               test_utils.TestCase):
2567
2568        def create_event_loop(self):
2569            return asyncio.SelectorEventLoop()
2570
2571    class ProactorEventLoopTests(EventLoopTestsMixin,
2572                                 SendfileMixin,
2573                                 SockSendfileMixin,
2574                                 SubprocessTestsMixin,
2575                                 test_utils.TestCase):
2576
2577        def create_event_loop(self):
2578            return asyncio.ProactorEventLoop()
2579
2580        def test_reader_callback(self):
2581            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2582
2583        def test_reader_callback_cancel(self):
2584            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2585
2586        def test_writer_callback(self):
2587            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2588
2589        def test_writer_callback_cancel(self):
2590            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2591
2592        def test_create_datagram_endpoint(self):
2593            raise unittest.SkipTest(
2594                "IocpEventLoop does not have create_datagram_endpoint()")
2595
2596        def test_remove_fds_after_closing(self):
2597            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2598else:
2599    import selectors
2600
2601    class UnixEventLoopTestsMixin(EventLoopTestsMixin,
2602                                  SendfileMixin,
2603                                  SockSendfileMixin):
2604        def setUp(self):
2605            super().setUp()
2606            watcher = asyncio.SafeChildWatcher()
2607            watcher.attach_loop(self.loop)
2608            asyncio.set_child_watcher(watcher)
2609
2610        def tearDown(self):
2611            asyncio.set_child_watcher(None)
2612            super().tearDown()
2613
2614
2615    if hasattr(selectors, 'KqueueSelector'):
2616        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
2617                                   SubprocessTestsMixin,
2618                                   test_utils.TestCase):
2619
2620            def create_event_loop(self):
2621                return asyncio.SelectorEventLoop(
2622                    selectors.KqueueSelector())
2623
2624            # kqueue doesn't support character devices (PTY) on Mac OS X older
2625            # than 10.9 (Maverick)
2626            @support.requires_mac_ver(10, 9)
2627            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2628            # hangs on OpenBSD 5.5
2629            @unittest.skipIf(sys.platform.startswith('openbsd'),
2630                             'test hangs on OpenBSD')
2631            def test_read_pty_output(self):
2632                super().test_read_pty_output()
2633
2634            # kqueue doesn't support character devices (PTY) on Mac OS X older
2635            # than 10.9 (Maverick)
2636            @support.requires_mac_ver(10, 9)
2637            def test_write_pty(self):
2638                super().test_write_pty()
2639
2640    if hasattr(selectors, 'EpollSelector'):
2641        class EPollEventLoopTests(UnixEventLoopTestsMixin,
2642                                  SubprocessTestsMixin,
2643                                  test_utils.TestCase):
2644
2645            def create_event_loop(self):
2646                return asyncio.SelectorEventLoop(selectors.EpollSelector())
2647
2648    if hasattr(selectors, 'PollSelector'):
2649        class PollEventLoopTests(UnixEventLoopTestsMixin,
2650                                 SubprocessTestsMixin,
2651                                 test_utils.TestCase):
2652
2653            def create_event_loop(self):
2654                return asyncio.SelectorEventLoop(selectors.PollSelector())
2655
2656    # Should always exist.
2657    class SelectEventLoopTests(UnixEventLoopTestsMixin,
2658                               SubprocessTestsMixin,
2659                               test_utils.TestCase):
2660
2661        def create_event_loop(self):
2662            return asyncio.SelectorEventLoop(selectors.SelectSelector())
2663
2664
2665def noop(*args, **kwargs):
2666    pass
2667
2668
2669class HandleTests(test_utils.TestCase):
2670
2671    def setUp(self):
2672        super().setUp()
2673        self.loop = mock.Mock()
2674        self.loop.get_debug.return_value = True
2675
2676    def test_handle(self):
2677        def callback(*args):
2678            return args
2679
2680        args = ()
2681        h = asyncio.Handle(callback, args, self.loop)
2682        self.assertIs(h._callback, callback)
2683        self.assertIs(h._args, args)
2684        self.assertFalse(h.cancelled())
2685
2686        h.cancel()
2687        self.assertTrue(h.cancelled())
2688
2689    def test_callback_with_exception(self):
2690        def callback():
2691            raise ValueError()
2692
2693        self.loop = mock.Mock()
2694        self.loop.call_exception_handler = mock.Mock()
2695
2696        h = asyncio.Handle(callback, (), self.loop)
2697        h._run()
2698
2699        self.loop.call_exception_handler.assert_called_with({
2700            'message': test_utils.MockPattern('Exception in callback.*'),
2701            'exception': mock.ANY,
2702            'handle': h,
2703            'source_traceback': h._source_traceback,
2704        })
2705
2706    def test_handle_weakref(self):
2707        wd = weakref.WeakValueDictionary()
2708        h = asyncio.Handle(lambda: None, (), self.loop)
2709        wd['h'] = h  # Would fail without __weakref__ slot.
2710
2711    def test_handle_repr(self):
2712        self.loop.get_debug.return_value = False
2713
2714        # simple function
2715        h = asyncio.Handle(noop, (1, 2), self.loop)
2716        filename, lineno = test_utils.get_function_source(noop)
2717        self.assertEqual(repr(h),
2718                        '<Handle noop(1, 2) at %s:%s>'
2719                        % (filename, lineno))
2720
2721        # cancelled handle
2722        h.cancel()
2723        self.assertEqual(repr(h),
2724                        '<Handle cancelled>')
2725
2726        # decorated function
2727        cb = asyncio.coroutine(noop)
2728        h = asyncio.Handle(cb, (), self.loop)
2729        self.assertEqual(repr(h),
2730                        '<Handle noop() at %s:%s>'
2731                        % (filename, lineno))
2732
2733        # partial function
2734        cb = functools.partial(noop, 1, 2)
2735        h = asyncio.Handle(cb, (3,), self.loop)
2736        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
2737                 % (re.escape(filename), lineno))
2738        self.assertRegex(repr(h), regex)
2739
2740        # partial function with keyword args
2741        cb = functools.partial(noop, x=1)
2742        h = asyncio.Handle(cb, (2, 3), self.loop)
2743        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
2744                 % (re.escape(filename), lineno))
2745        self.assertRegex(repr(h), regex)
2746
2747        # partial method
2748        if sys.version_info >= (3, 4):
2749            method = HandleTests.test_handle_repr
2750            cb = functools.partialmethod(method)
2751            filename, lineno = test_utils.get_function_source(method)
2752            h = asyncio.Handle(cb, (), self.loop)
2753
2754            cb_regex = r'<function HandleTests.test_handle_repr .*>'
2755            cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
2756            regex = (r'^<Handle %s at %s:%s>$'
2757                     % (cb_regex, re.escape(filename), lineno))
2758            self.assertRegex(repr(h), regex)
2759
2760    def test_handle_repr_debug(self):
2761        self.loop.get_debug.return_value = True
2762
2763        # simple function
2764        create_filename = __file__
2765        create_lineno = sys._getframe().f_lineno + 1
2766        h = asyncio.Handle(noop, (1, 2), self.loop)
2767        filename, lineno = test_utils.get_function_source(noop)
2768        self.assertEqual(repr(h),
2769                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
2770                        % (filename, lineno, create_filename, create_lineno))
2771
2772        # cancelled handle
2773        h.cancel()
2774        self.assertEqual(
2775            repr(h),
2776            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2777            % (filename, lineno, create_filename, create_lineno))
2778
2779        # double cancellation won't overwrite _repr
2780        h.cancel()
2781        self.assertEqual(
2782            repr(h),
2783            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2784            % (filename, lineno, create_filename, create_lineno))
2785
2786    def test_handle_source_traceback(self):
2787        loop = asyncio.get_event_loop_policy().new_event_loop()
2788        loop.set_debug(True)
2789        self.set_event_loop(loop)
2790
2791        def check_source_traceback(h):
2792            lineno = sys._getframe(1).f_lineno - 1
2793            self.assertIsInstance(h._source_traceback, list)
2794            self.assertEqual(h._source_traceback[-1][:3],
2795                             (__file__,
2796                              lineno,
2797                              'test_handle_source_traceback'))
2798
2799        # call_soon
2800        h = loop.call_soon(noop)
2801        check_source_traceback(h)
2802
2803        # call_soon_threadsafe
2804        h = loop.call_soon_threadsafe(noop)
2805        check_source_traceback(h)
2806
2807        # call_later
2808        h = loop.call_later(0, noop)
2809        check_source_traceback(h)
2810
2811        # call_at
2812        h = loop.call_later(0, noop)
2813        check_source_traceback(h)
2814
2815    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
2816                         'No collections.abc.Coroutine')
2817    def test_coroutine_like_object_debug_formatting(self):
2818        # Test that asyncio can format coroutines that are instances of
2819        # collections.abc.Coroutine, but lack cr_core or gi_code attributes
2820        # (such as ones compiled with Cython).
2821
2822        coro = CoroLike()
2823        coro.__name__ = 'AAA'
2824        self.assertTrue(asyncio.iscoroutine(coro))
2825        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2826
2827        coro.__qualname__ = 'BBB'
2828        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
2829
2830        coro.cr_running = True
2831        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
2832
2833        coro.__name__ = coro.__qualname__ = None
2834        self.assertEqual(coroutines._format_coroutine(coro),
2835                         '<CoroLike without __name__>() running')
2836
2837        coro = CoroLike()
2838        coro.__qualname__ = 'CoroLike'
2839        # Some coroutines might not have '__name__', such as
2840        # built-in async_gen.asend().
2841        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')
2842
2843        coro = CoroLike()
2844        coro.__qualname__ = 'AAA'
2845        coro.cr_code = None
2846        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2847
2848
2849class TimerTests(unittest.TestCase):
2850
2851    def setUp(self):
2852        super().setUp()
2853        self.loop = mock.Mock()
2854
2855    def test_hash(self):
2856        when = time.monotonic()
2857        h = asyncio.TimerHandle(when, lambda: False, (),
2858                                mock.Mock())
2859        self.assertEqual(hash(h), hash(when))
2860
2861    def test_when(self):
2862        when = time.monotonic()
2863        h = asyncio.TimerHandle(when, lambda: False, (),
2864                                mock.Mock())
2865        self.assertEqual(when, h.when())
2866
2867    def test_timer(self):
2868        def callback(*args):
2869            return args
2870
2871        args = (1, 2, 3)
2872        when = time.monotonic()
2873        h = asyncio.TimerHandle(when, callback, args, mock.Mock())
2874        self.assertIs(h._callback, callback)
2875        self.assertIs(h._args, args)
2876        self.assertFalse(h.cancelled())
2877
2878        # cancel
2879        h.cancel()
2880        self.assertTrue(h.cancelled())
2881        self.assertIsNone(h._callback)
2882        self.assertIsNone(h._args)
2883
2884        # when cannot be None
2885        self.assertRaises(AssertionError,
2886                          asyncio.TimerHandle, None, callback, args,
2887                          self.loop)
2888
2889    def test_timer_repr(self):
2890        self.loop.get_debug.return_value = False
2891
2892        # simple function
2893        h = asyncio.TimerHandle(123, noop, (), self.loop)
2894        src = test_utils.get_function_source(noop)
2895        self.assertEqual(repr(h),
2896                        '<TimerHandle when=123 noop() at %s:%s>' % src)
2897
2898        # cancelled handle
2899        h.cancel()
2900        self.assertEqual(repr(h),
2901                        '<TimerHandle cancelled when=123>')
2902
2903    def test_timer_repr_debug(self):
2904        self.loop.get_debug.return_value = True
2905
2906        # simple function
2907        create_filename = __file__
2908        create_lineno = sys._getframe().f_lineno + 1
2909        h = asyncio.TimerHandle(123, noop, (), self.loop)
2910        filename, lineno = test_utils.get_function_source(noop)
2911        self.assertEqual(repr(h),
2912                        '<TimerHandle when=123 noop() '
2913                        'at %s:%s created at %s:%s>'
2914                        % (filename, lineno, create_filename, create_lineno))
2915
2916        # cancelled handle
2917        h.cancel()
2918        self.assertEqual(repr(h),
2919                        '<TimerHandle cancelled when=123 noop() '
2920                        'at %s:%s created at %s:%s>'
2921                        % (filename, lineno, create_filename, create_lineno))
2922
2923
2924    def test_timer_comparison(self):
2925        def callback(*args):
2926            return args
2927
2928        when = time.monotonic()
2929
2930        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2931        h2 = asyncio.TimerHandle(when, callback, (), self.loop)
2932        # TODO: Use assertLess etc.
2933        self.assertFalse(h1 < h2)
2934        self.assertFalse(h2 < h1)
2935        self.assertTrue(h1 <= h2)
2936        self.assertTrue(h2 <= h1)
2937        self.assertFalse(h1 > h2)
2938        self.assertFalse(h2 > h1)
2939        self.assertTrue(h1 >= h2)
2940        self.assertTrue(h2 >= h1)
2941        self.assertTrue(h1 == h2)
2942        self.assertFalse(h1 != h2)
2943
2944        h2.cancel()
2945        self.assertFalse(h1 == h2)
2946
2947        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2948        h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
2949        self.assertTrue(h1 < h2)
2950        self.assertFalse(h2 < h1)
2951        self.assertTrue(h1 <= h2)
2952        self.assertFalse(h2 <= h1)
2953        self.assertFalse(h1 > h2)
2954        self.assertTrue(h2 > h1)
2955        self.assertFalse(h1 >= h2)
2956        self.assertTrue(h2 >= h1)
2957        self.assertFalse(h1 == h2)
2958        self.assertTrue(h1 != h2)
2959
2960        h3 = asyncio.Handle(callback, (), self.loop)
2961        self.assertIs(NotImplemented, h1.__eq__(h3))
2962        self.assertIs(NotImplemented, h1.__ne__(h3))
2963
2964
2965class AbstractEventLoopTests(unittest.TestCase):
2966
2967    def test_not_implemented(self):
2968        f = mock.Mock()
2969        loop = asyncio.AbstractEventLoop()
2970        self.assertRaises(
2971            NotImplementedError, loop.run_forever)
2972        self.assertRaises(
2973            NotImplementedError, loop.run_until_complete, None)
2974        self.assertRaises(
2975            NotImplementedError, loop.stop)
2976        self.assertRaises(
2977            NotImplementedError, loop.is_running)
2978        self.assertRaises(
2979            NotImplementedError, loop.is_closed)
2980        self.assertRaises(
2981            NotImplementedError, loop.close)
2982        self.assertRaises(
2983            NotImplementedError, loop.create_task, None)
2984        self.assertRaises(
2985            NotImplementedError, loop.call_later, None, None)
2986        self.assertRaises(
2987            NotImplementedError, loop.call_at, f, f)
2988        self.assertRaises(
2989            NotImplementedError, loop.call_soon, None)
2990        self.assertRaises(
2991            NotImplementedError, loop.time)
2992        self.assertRaises(
2993            NotImplementedError, loop.call_soon_threadsafe, None)
2994        self.assertRaises(
2995            NotImplementedError, loop.set_default_executor, f)
2996        self.assertRaises(
2997            NotImplementedError, loop.add_reader, 1, f)
2998        self.assertRaises(
2999            NotImplementedError, loop.remove_reader, 1)
3000        self.assertRaises(
3001            NotImplementedError, loop.add_writer, 1, f)
3002        self.assertRaises(
3003            NotImplementedError, loop.remove_writer, 1)
3004        self.assertRaises(
3005            NotImplementedError, loop.add_signal_handler, 1, f)
3006        self.assertRaises(
3007            NotImplementedError, loop.remove_signal_handler, 1)
3008        self.assertRaises(
3009            NotImplementedError, loop.remove_signal_handler, 1)
3010        self.assertRaises(
3011            NotImplementedError, loop.set_exception_handler, f)
3012        self.assertRaises(
3013            NotImplementedError, loop.default_exception_handler, f)
3014        self.assertRaises(
3015            NotImplementedError, loop.call_exception_handler, f)
3016        self.assertRaises(
3017            NotImplementedError, loop.get_debug)
3018        self.assertRaises(
3019            NotImplementedError, loop.set_debug, f)
3020
3021    def test_not_implemented_async(self):
3022
3023        async def inner():
3024            f = mock.Mock()
3025            loop = asyncio.AbstractEventLoop()
3026
3027            with self.assertRaises(NotImplementedError):
3028                await loop.run_in_executor(f, f)
3029            with self.assertRaises(NotImplementedError):
3030                await loop.getaddrinfo('localhost', 8080)
3031            with self.assertRaises(NotImplementedError):
3032                await loop.getnameinfo(('localhost', 8080))
3033            with self.assertRaises(NotImplementedError):
3034                await loop.create_connection(f)
3035            with self.assertRaises(NotImplementedError):
3036                await loop.create_server(f)
3037            with self.assertRaises(NotImplementedError):
3038                await loop.create_datagram_endpoint(f)
3039            with self.assertRaises(NotImplementedError):
3040                await loop.sock_recv(f, 10)
3041            with self.assertRaises(NotImplementedError):
3042                await loop.sock_recv_into(f, 10)
3043            with self.assertRaises(NotImplementedError):
3044                await loop.sock_sendall(f, 10)
3045            with self.assertRaises(NotImplementedError):
3046                await loop.sock_connect(f, f)
3047            with self.assertRaises(NotImplementedError):
3048                await loop.sock_accept(f)
3049            with self.assertRaises(NotImplementedError):
3050                await loop.sock_sendfile(f, f)
3051            with self.assertRaises(NotImplementedError):
3052                await loop.sendfile(f, f)
3053            with self.assertRaises(NotImplementedError):
3054                await loop.connect_read_pipe(f, mock.sentinel.pipe)
3055            with self.assertRaises(NotImplementedError):
3056                await loop.connect_write_pipe(f, mock.sentinel.pipe)
3057            with self.assertRaises(NotImplementedError):
3058                await loop.subprocess_shell(f, mock.sentinel)
3059            with self.assertRaises(NotImplementedError):
3060                await loop.subprocess_exec(f)
3061
3062        loop = asyncio.new_event_loop()
3063        loop.run_until_complete(inner())
3064        loop.close()
3065
3066
3067class ProtocolsAbsTests(unittest.TestCase):
3068
3069    def test_empty(self):
3070        f = mock.Mock()
3071        p = asyncio.Protocol()
3072        self.assertIsNone(p.connection_made(f))
3073        self.assertIsNone(p.connection_lost(f))
3074        self.assertIsNone(p.data_received(f))
3075        self.assertIsNone(p.eof_received())
3076
3077        dp = asyncio.DatagramProtocol()
3078        self.assertIsNone(dp.connection_made(f))
3079        self.assertIsNone(dp.connection_lost(f))
3080        self.assertIsNone(dp.error_received(f))
3081        self.assertIsNone(dp.datagram_received(f, f))
3082
3083        sp = asyncio.SubprocessProtocol()
3084        self.assertIsNone(sp.connection_made(f))
3085        self.assertIsNone(sp.connection_lost(f))
3086        self.assertIsNone(sp.pipe_data_received(1, f))
3087        self.assertIsNone(sp.pipe_connection_lost(1, f))
3088        self.assertIsNone(sp.process_exited())
3089
3090
3091class PolicyTests(unittest.TestCase):
3092
3093    def test_event_loop_policy(self):
3094        policy = asyncio.AbstractEventLoopPolicy()
3095        self.assertRaises(NotImplementedError, policy.get_event_loop)
3096        self.assertRaises(NotImplementedError, policy.set_event_loop, object())
3097        self.assertRaises(NotImplementedError, policy.new_event_loop)
3098        self.assertRaises(NotImplementedError, policy.get_child_watcher)
3099        self.assertRaises(NotImplementedError, policy.set_child_watcher,
3100                          object())
3101
3102    def test_get_event_loop(self):
3103        policy = asyncio.DefaultEventLoopPolicy()
3104        self.assertIsNone(policy._local._loop)
3105
3106        loop = policy.get_event_loop()
3107        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
3108
3109        self.assertIs(policy._local._loop, loop)
3110        self.assertIs(loop, policy.get_event_loop())
3111        loop.close()
3112
3113    def test_get_event_loop_calls_set_event_loop(self):
3114        policy = asyncio.DefaultEventLoopPolicy()
3115
3116        with mock.patch.object(
3117                policy, "set_event_loop",
3118                wraps=policy.set_event_loop) as m_set_event_loop:
3119
3120            loop = policy.get_event_loop()
3121
3122            # policy._local._loop must be set through .set_event_loop()
3123            # (the unix DefaultEventLoopPolicy needs this call to attach
3124            # the child watcher correctly)
3125            m_set_event_loop.assert_called_with(loop)
3126
3127        loop.close()
3128
3129    def test_get_event_loop_after_set_none(self):
3130        policy = asyncio.DefaultEventLoopPolicy()
3131        policy.set_event_loop(None)
3132        self.assertRaises(RuntimeError, policy.get_event_loop)
3133
3134    @mock.patch('asyncio.events.threading.current_thread')
3135    def test_get_event_loop_thread(self, m_current_thread):
3136
3137        def f():
3138            policy = asyncio.DefaultEventLoopPolicy()
3139            self.assertRaises(RuntimeError, policy.get_event_loop)
3140
3141        th = threading.Thread(target=f)
3142        th.start()
3143        th.join()
3144
3145    def test_new_event_loop(self):
3146        policy = asyncio.DefaultEventLoopPolicy()
3147
3148        loop = policy.new_event_loop()
3149        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
3150        loop.close()
3151
3152    def test_set_event_loop(self):
3153        policy = asyncio.DefaultEventLoopPolicy()
3154        old_loop = policy.get_event_loop()
3155
3156        self.assertRaises(AssertionError, policy.set_event_loop, object())
3157
3158        loop = policy.new_event_loop()
3159        policy.set_event_loop(loop)
3160        self.assertIs(loop, policy.get_event_loop())
3161        self.assertIsNot(old_loop, policy.get_event_loop())
3162        loop.close()
3163        old_loop.close()
3164
3165    def test_get_event_loop_policy(self):
3166        policy = asyncio.get_event_loop_policy()
3167        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
3168        self.assertIs(policy, asyncio.get_event_loop_policy())
3169
3170    def test_set_event_loop_policy(self):
3171        self.assertRaises(
3172            AssertionError, asyncio.set_event_loop_policy, object())
3173
3174        old_policy = asyncio.get_event_loop_policy()
3175
3176        policy = asyncio.DefaultEventLoopPolicy()
3177        asyncio.set_event_loop_policy(policy)
3178        self.assertIs(policy, asyncio.get_event_loop_policy())
3179        self.assertIsNot(policy, old_policy)
3180
3181
3182class GetEventLoopTestsMixin:
3183
3184    _get_running_loop_impl = None
3185    _set_running_loop_impl = None
3186    get_running_loop_impl = None
3187    get_event_loop_impl = None
3188
3189    def setUp(self):
3190        self._get_running_loop_saved = events._get_running_loop
3191        self._set_running_loop_saved = events._set_running_loop
3192        self.get_running_loop_saved = events.get_running_loop
3193        self.get_event_loop_saved = events.get_event_loop
3194
3195        events._get_running_loop = type(self)._get_running_loop_impl
3196        events._set_running_loop = type(self)._set_running_loop_impl
3197        events.get_running_loop = type(self).get_running_loop_impl
3198        events.get_event_loop = type(self).get_event_loop_impl
3199
3200        asyncio._get_running_loop = type(self)._get_running_loop_impl
3201        asyncio._set_running_loop = type(self)._set_running_loop_impl
3202        asyncio.get_running_loop = type(self).get_running_loop_impl
3203        asyncio.get_event_loop = type(self).get_event_loop_impl
3204
3205        super().setUp()
3206
3207        self.loop = asyncio.new_event_loop()
3208        asyncio.set_event_loop(self.loop)
3209
3210        if sys.platform != 'win32':
3211            watcher = asyncio.SafeChildWatcher()
3212            watcher.attach_loop(self.loop)
3213            asyncio.set_child_watcher(watcher)
3214
3215    def tearDown(self):
3216        try:
3217            if sys.platform != 'win32':
3218                asyncio.set_child_watcher(None)
3219
3220            super().tearDown()
3221        finally:
3222            self.loop.close()
3223            asyncio.set_event_loop(None)
3224
3225            events._get_running_loop = self._get_running_loop_saved
3226            events._set_running_loop = self._set_running_loop_saved
3227            events.get_running_loop = self.get_running_loop_saved
3228            events.get_event_loop = self.get_event_loop_saved
3229
3230            asyncio._get_running_loop = self._get_running_loop_saved
3231            asyncio._set_running_loop = self._set_running_loop_saved
3232            asyncio.get_running_loop = self.get_running_loop_saved
3233            asyncio.get_event_loop = self.get_event_loop_saved
3234
3235    if sys.platform != 'win32':
3236
3237        def test_get_event_loop_new_process(self):
3238            # Issue bpo-32126: The multiprocessing module used by
3239            # ProcessPoolExecutor is not functional when the
3240            # multiprocessing.synchronize module cannot be imported.
3241            support.import_module('multiprocessing.synchronize')
3242
3243            async def main():
3244                pool = concurrent.futures.ProcessPoolExecutor()
3245                result = await self.loop.run_in_executor(
3246                    pool, _test_get_event_loop_new_process__sub_proc)
3247                pool.shutdown()
3248                return result
3249
3250            self.assertEqual(
3251                self.loop.run_until_complete(main()),
3252                'hello')
3253
3254    def test_get_event_loop_returns_running_loop(self):
3255        class TestError(Exception):
3256            pass
3257
3258        class Policy(asyncio.DefaultEventLoopPolicy):
3259            def get_event_loop(self):
3260                raise TestError
3261
3262        old_policy = asyncio.get_event_loop_policy()
3263        try:
3264            asyncio.set_event_loop_policy(Policy())
3265            loop = asyncio.new_event_loop()
3266
3267            with self.assertRaises(TestError):
3268                asyncio.get_event_loop()
3269            asyncio.set_event_loop(None)
3270            with self.assertRaises(TestError):
3271                asyncio.get_event_loop()
3272
3273            with self.assertRaisesRegex(RuntimeError, 'no running'):
3274                self.assertIs(asyncio.get_running_loop(), None)
3275            self.assertIs(asyncio._get_running_loop(), None)
3276
3277            async def func():
3278                self.assertIs(asyncio.get_event_loop(), loop)
3279                self.assertIs(asyncio.get_running_loop(), loop)
3280                self.assertIs(asyncio._get_running_loop(), loop)
3281
3282            loop.run_until_complete(func())
3283
3284            asyncio.set_event_loop(loop)
3285            with self.assertRaises(TestError):
3286                asyncio.get_event_loop()
3287
3288            asyncio.set_event_loop(None)
3289            with self.assertRaises(TestError):
3290                asyncio.get_event_loop()
3291
3292        finally:
3293            asyncio.set_event_loop_policy(old_policy)
3294            if loop is not None:
3295                loop.close()
3296
3297        with self.assertRaisesRegex(RuntimeError, 'no running'):
3298            self.assertIs(asyncio.get_running_loop(), None)
3299
3300        self.assertIs(asyncio._get_running_loop(), None)
3301
3302
3303class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
3304
3305    _get_running_loop_impl = events._py__get_running_loop
3306    _set_running_loop_impl = events._py__set_running_loop
3307    get_running_loop_impl = events._py_get_running_loop
3308    get_event_loop_impl = events._py_get_event_loop
3309
3310
3311try:
3312    import _asyncio  # NoQA
3313except ImportError:
3314    pass
3315else:
3316
3317    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
3318
3319        _get_running_loop_impl = events._c__get_running_loop
3320        _set_running_loop_impl = events._c__set_running_loop
3321        get_running_loop_impl = events._c_get_running_loop
3322        get_event_loop_impl = events._c_get_event_loop
3323
3324
3325class TestServer(unittest.TestCase):
3326
3327    def test_get_loop(self):
3328        loop = asyncio.new_event_loop()
3329        self.addCleanup(loop.close)
3330        proto = MyProto(loop)
3331        server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
3332        self.assertEqual(server.get_loop(), loop)
3333        server.close()
3334        loop.run_until_complete(server.wait_closed())
3335
3336
3337class TestAbstractServer(unittest.TestCase):
3338
3339    def test_close(self):
3340        with self.assertRaises(NotImplementedError):
3341            events.AbstractServer().close()
3342
3343    def test_wait_closed(self):
3344        loop = asyncio.new_event_loop()
3345        self.addCleanup(loop.close)
3346
3347        with self.assertRaises(NotImplementedError):
3348            loop.run_until_complete(events.AbstractServer().wait_closed())
3349
3350    def test_get_loop(self):
3351        with self.assertRaises(NotImplementedError):
3352            events.AbstractServer().get_loop()
3353
3354
3355if __name__ == '__main__':
3356    unittest.main()
3357