1import asyncio
2import time
3import threading
4import unittest
5
6from test import support
7from test.test_asyncio import utils as test_utils
8from test.test_asyncio import functional as func_tests
9
10
def tearDownModule():
    """Module-level unittest fixture: reset asyncio's event loop policy.

    Passes ``None`` to ``asyncio.set_event_loop_policy`` once the tests in
    this module have run, so no policy installed during the run is left
    behind for other test modules.
    """
    no_policy = None
    asyncio.set_event_loop_policy(no_policy)
13
14
class BaseStartServer(func_tests.FunctionalTestCaseMixin):
    """Loop-independent test of ``asyncio.start_server``.

    Concrete test classes in this module combine this mixin with
    ``unittest.TestCase`` and override ``new_loop`` to choose the event
    loop implementation.
    """

    def new_loop(self):
        """Return the event loop under test; subclasses must override."""
        raise NotImplementedError

    def _assert_server_closed(self, server):
        """Check that *server* holds no sockets and refuses to serve again."""
        self.assertEqual(server.sockets, ())

        self.assertIsNone(server._sockets)
        self.assertIsNone(server._waiters)
        self.assertFalse(server.is_serving())

        # A closed server must reject a new serve_forever() call.
        with self.assertRaisesRegex(RuntimeError, r'is closed'):
            self.loop.run_until_complete(server.serve_forever())

    def test_start_server_1(self):
        """Serve one TCP connection, cancel serving, verify the shutdown.

        The server is created with ``start_serving=False`` and is only
        started through ``serve_forever()``.  A client sends one
        newline-terminated message; the connection handler cancels the
        serving task and answers with a single byte.  Afterwards the server
        must be fully closed.
        """
        payload = b'1' * 1024 * 5 + b'\n'

        def wait_until_serving():
            # Sleep first, then check: at most ten polls 0.2 seconds apart.
            for _ in range(10):
                time.sleep(0.2)
                if server.is_serving():
                    return
            raise RuntimeError

        def run_client(sock):
            # 'addr' is bound in the enclosing scope before the client runs.
            wait_until_serving()

            sock.settimeout(2)
            sock.connect(addr)
            sock.send(payload)
            sock.recv_all(1)
            sock.close()

        async def handle_connection(reader, writer):
            await reader.readline()
            # Stop serve_forever() before replying to the client.
            serving_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def run_server(instance):
            async with instance:
                await instance.serve_forever()

        # Passing loop= is expected to emit a DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            pending_server = asyncio.start_server(
                handle_connection, support.HOSTv4, 0,
                loop=self.loop, start_serving=False)
            server = self.loop.run_until_complete(pending_server)

        self.assertFalse(server.is_serving())

        serving_task = self.loop.create_task(run_server(server))

        # Port 0 was requested, so read the real address from the socket.
        addr = server.sockets[0].getsockname()
        with self.assertRaises(asyncio.CancelledError), \
                self.tcp_client(run_client):
            self.loop.run_until_complete(serving_task)

        self._assert_server_closed(server)
69
70
class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the start-server tests on ``asyncio.SelectorEventLoop``.

    Adds a Unix-domain-socket test on top of the inherited TCP test.
    """

    def new_loop(self):
        """Return a new ``asyncio.SelectorEventLoop`` instance."""
        return asyncio.SelectorEventLoop()

    @support.skip_unless_bind_unix_socket
    def test_start_unix_server_1(self):
        """Serve one Unix-socket connection, cancel serving, verify shutdown.

        The server is created with ``start_serving=False`` and started
        explicitly with ``start_serving()``.  The ``started`` event tells the
        client when it may connect.  The connection handler cancels the
        serving task and answers with a single byte.  Afterwards the server
        must be fully closed.

        Raises (inside the client):
            RuntimeError: if the server did not signal that it is serving
                within 5 seconds.
        """
        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
        # Set by main() once the server is serving; awaited by the client.
        started = threading.Event()

        def client(sock, addr):
            sock.settimeout(2)
            # Event.wait() returns False on timeout.  Fail with a clear
            # error instead of connecting to a server that is not serving,
            # as the TCP test does when its server fails to start.
            if not started.wait(5):
                raise RuntimeError(
                    'server did not start serving within 5 seconds')
            sock.connect(addr)
            sock.send(HELLO_MSG)
            sock.recv_all(1)
            sock.close()

        async def serve(reader, writer):
            await reader.readline()
            # Stop serve_forever() before replying to the client.
            main_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def main(srv):
            async with srv:
                self.assertFalse(srv.is_serving())
                await srv.start_serving()
                self.assertTrue(srv.is_serving())
                # Let the client proceed now that the server is serving.
                started.set()
                await srv.serve_forever()

        with test_utils.unix_socket_path() as addr:
            # Passing loop= is expected to emit a DeprecationWarning.
            with self.assertWarns(DeprecationWarning):
                srv = self.loop.run_until_complete(asyncio.start_unix_server(
                    serve, addr, loop=self.loop, start_serving=False))

            main_task = self.loop.create_task(main(srv))

            with self.assertRaises(asyncio.CancelledError):
                with self.unix_client(lambda sock: client(sock, addr)):
                    self.loop.run_until_complete(main_task)

            self.assertEqual(srv.sockets, ())

            self.assertIsNone(srv._sockets)
            self.assertIsNone(srv._waiters)
            self.assertFalse(srv.is_serving())

            # A closed server must reject a new serve_forever() call.
            with self.assertRaisesRegex(RuntimeError, r'is closed'):
                self.loop.run_until_complete(srv.serve_forever())
123
124
@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the inherited start-server test on ``asyncio.ProactorEventLoop``.

    The class is skipped when ``asyncio`` does not expose
    ``ProactorEventLoop``.
    """

    def new_loop(self):
        """Return a new ``asyncio.ProactorEventLoop`` instance."""
        proactor_loop = asyncio.ProactorEventLoop()
        return proactor_loop
130
131
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
134