1from tornado import gen, ioloop
2from tornado.httpserver import HTTPServer
3from tornado.locks import Event
4from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
5from tornado.web import Application
6import asyncio
7import contextlib
8import gc
9import os
10import platform
11import traceback
12import unittest
13import warnings
14
15
@contextlib.contextmanager
def set_environ(name, value):
    """Temporarily set the environment variable ``name`` to ``value``.

    On exit (normal or via an exception) the previous state is restored:
    the old value is put back if the variable existed beforehand,
    otherwise the variable is removed again.

    Args:
        name: Name of the environment variable to override.
        value: String assigned to the variable for the duration of the
            ``with`` body.
    """
    old_value = os.environ.get(name)
    os.environ[name] = value

    try:
        yield
    finally:
        if old_value is None:
            # pop() rather than del: the body may already have removed the
            # variable, and cleanup must not raise KeyError (which would
            # also mask any exception propagating out of the body).
            os.environ.pop(name, None)
        else:
            os.environ[name] = old_value
28
29
class AsyncTestCaseTest(AsyncTestCase):
    """Exercises the timeout handling of AsyncTestCase.wait()."""

    def test_wait_timeout(self):
        loop = self.io_loop

        # stop() is scheduled almost immediately, so wait() with its
        # default timeout returns without error.
        loop.add_timeout(loop.time() + 0.01, self.stop)
        self.wait()

        # An explicit timeout shorter than the stop() delay must fail.
        loop.add_timeout(loop.time() + 1, self.stop)
        with self.assertRaises(self.failureException):
            self.wait(timeout=0.01)

        # The same failure is expected when the short timeout is supplied
        # through the environment variable instead of an argument.
        loop.add_timeout(loop.time() + 1, self.stop)
        with set_environ("ASYNC_TEST_TIMEOUT", "0.01"), self.assertRaises(
            self.failureException
        ):
            self.wait()

    def test_subsequent_wait_calls(self):
        """
        This test makes sure that a second call to wait()
        clears the first timeout.
        """
        loop = self.io_loop

        # First wait: stop() is due right away, so the wait finishes with
        # most of its 0.1s budget unused.
        loop.add_timeout(loop.time() + 0.0, self.stop)
        self.wait(timeout=0.1)

        # Second wait: 0.2s fits comfortably within its own 0.4s budget,
        # but would overrun the first wait's deadline if that deadline
        # were still armed.
        loop.add_timeout(loop.time() + 0.2, self.stop)
        self.wait(timeout=0.4)
61
62
class LeakTest(AsyncTestCase):
    """Checks that a coroutine still pending at teardown is cleaned up quietly."""

    def tearDown(self):
        super().tearDown()
        # Force a collection now so that any warnings show up at a
        # predictable point.
        gc.collect()

    def test_leaked_coroutine(self):
        # A coroutine that is still blocked when the test ends ("leaked")
        # should be shut down without warnings such as "task was destroyed
        # but it is pending". Nothing is asserted here: a regression would
        # surface as unexpected stderr output detected by runtests.py.
        never_set = Event()

        async def block_on_event():
            try:
                await never_set.wait()
            except asyncio.CancelledError:
                pass

        loop = self.io_loop
        loop.add_callback(block_on_event)
        loop.add_callback(self.stop)
        self.wait()
85
86
class AsyncHTTPTestCaseTest(AsyncHTTPTestCase):
    """Tests how AsyncHTTPTestCase.fetch() treats relative and absolute URLs."""

    def get_app(self):
        return Application()

    def setUp(self):
        super().setUp()
        # Start an additional HTTP server on its own unused port.
        listener, second_port = bind_unused_port()
        second_server = HTTPServer(Application(), **self.get_httpserver_options())
        second_server.add_socket(listener)
        self.second_port = second_port
        self.second_server = second_server

    def tearDown(self):
        self.second_server.stop()
        super().tearDown()

    def test_fetch_segment(self):
        # A bare path is resolved against the main test server.
        segment = "/path"
        fetched = self.fetch(segment)
        self.assertEqual(fetched.request.url, self.get_url(segment))

    def test_fetch_full_http_url(self):
        # An absolute URL must be passed through untouched rather than
        # being rewritten to point at the main test server.
        absolute_url = "http://localhost:%d/path" % self.second_port

        fetched = self.fetch(absolute_url)
        self.assertEqual(fetched.request.url, absolute_url)
117
118
class AsyncTestCaseWrapperTest(unittest.TestCase):
    """Checks how AsyncTestCase reports test methods that return a value."""

    @staticmethod
    def _run_case(case):
        # Run ``case`` against a fresh TestResult and hand the result back.
        outcome = unittest.TestResult()
        case.run(outcome)
        return outcome

    def test_undecorated_generator(self):
        class Test(AsyncTestCase):
            def test_gen(self):
                yield

        outcome = self._run_case(Test("test_gen"))
        self.assertEqual(len(outcome.errors), 1)
        self.assertIn("should be decorated", outcome.errors[0][1])

    @unittest.skipIf(
        platform.python_implementation() == "PyPy",
        "pypy destructor warnings cannot be silenced",
    )
    def test_undecorated_coroutine(self):
        class Test(AsyncTestCase):
            async def test_coro(self):
                pass

        case = Test("test_coro")

        # Running the case leaves the coroutine un-awaited; suppress the
        # resulting "RuntimeWarning: coroutine 'test_coro' was never awaited".
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            outcome = self._run_case(case)

        self.assertEqual(len(outcome.errors), 1)
        self.assertIn("should be decorated", outcome.errors[0][1])

    def test_undecorated_generator_with_skip(self):
        class Test(AsyncTestCase):
            @unittest.skip("don't run this")
            def test_gen(self):
                yield

        outcome = self._run_case(Test("test_gen"))
        # The skip is recorded as a skip, not as an error.
        self.assertEqual(len(outcome.errors), 0)
        self.assertEqual(len(outcome.skipped), 1)

    def test_other_return(self):
        class Test(AsyncTestCase):
            def test_other_return(self):
                return 42

        outcome = self._run_case(Test("test_other_return"))
        self.assertEqual(len(outcome.errors), 1)
        self.assertIn("Return value from test method ignored", outcome.errors[0][1])
173
174
class SetUpTearDownTest(unittest.TestCase):
    def test_set_up_tear_down(self):
        """
        This test makes sure that AsyncTestCase calls super methods for
        setUp and tearDown.

        InheritBoth lists AsyncTestCase before SetUpTearDown in its bases,
        so that super() inside AsyncTestCase resolves to SetUpTearDown.
        """
        calls = []

        class SetUpTearDown(unittest.TestCase):
            def setUp(self):
                calls.append("setUp")

            def tearDown(self):
                calls.append("tearDown")

        class InheritBoth(AsyncTestCase, SetUpTearDown):
            def test(self):
                calls.append("test")

        InheritBoth("test").run(unittest.TestResult())
        # Both hooks of the second base class must have run, around the test.
        self.assertEqual(["setUp", "test", "tearDown"], calls)
202
203
class AsyncHTTPTestCaseSetUpTearDownTest(unittest.TestCase):
    """Checks that AsyncHTTPTestCase drops its app/server references on teardown."""

    def test_tear_down_releases_app_and_http_server(self):
        released_attrs = ("_app", "http_server")

        class SetUpTearDown(AsyncHTTPTestCase):
            def get_app(self):
                return Application()

            def test(self):
                # While the test is running, both attributes are present.
                for attr in released_attrs:
                    self.assertTrue(hasattr(self, attr))

        case = SetUpTearDown("test")
        case.run(unittest.TestResult())

        # After the run (including tearDown) neither attribute remains.
        for attr in released_attrs:
            self.assertFalse(hasattr(case, attr))
220
221
class GenTest(AsyncTestCase):
    """Tests for the ``gen_test`` decorator.

    ``setUp`` clears ``self.finished`` and ``tearDown`` asserts that it was
    set, so every test has to set the flag once its checks have run.
    """

    def setUp(self):
        super().setUp()
        self.finished = False

    def tearDown(self):
        self.assertTrue(self.finished)
        super().tearDown()

    @gen_test
    def test_sync(self):
        # Decorated test whose body never yields.
        self.finished = True

    @gen_test
    def test_async(self):
        # Decorated generator test that yields gen.moment before finishing.
        yield gen.moment
        self.finished = True

    def test_timeout(self):
        # The body sleeps far longer than the timeout given to gen_test.
        @gen_test(timeout=0.1)
        def sleeper(self):
            yield gen.sleep(1)

        # assertRaises is not used here because the formatted traceback
        # has to be inspected while the exception is being handled.
        try:
            sleeper(self)
        except ioloop.TimeoutError:
            # The traceback should point at the sleeping line of the test
            # body, not only at unrelated IOLoop/testing internals.
            formatted = traceback.format_exc()
            self.assertIn("gen.sleep(1)", formatted)
        else:
            self.fail("did not get expected exception")

        self.finished = True

    def test_no_timeout(self):
        # A body that completes within its timeout must simply succeed.
        @gen_test(timeout=1)
        def quick(self):
            yield gen.sleep(0.1)

        quick(self)
        self.finished = True

    def test_timeout_environment_variable(self):
        @gen_test(timeout=0.5)
        def test_long_timeout(self):
            yield gen.sleep(0.25)

        # The decorator's 0.5s timeout is the one in effect here, so the
        # 0.25s sleep completes even though the environment asks for 0.1s.
        with set_environ("ASYNC_TEST_TIMEOUT", "0.1"):
            test_long_timeout(self)

        self.finished = True

    def test_no_timeout_environment_variable(self):
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            yield gen.sleep(1)

        # Here the environment's 0.1s timeout is the one in effect; the 1s
        # sleep still exceeds it, so the call times out.
        with set_environ("ASYNC_TEST_TIMEOUT", "0.1"), self.assertRaises(
            ioloop.TimeoutError
        ):
            test_short_timeout(self)

        self.finished = True

    def test_with_method_args(self):
        # Extra positional arguments are forwarded to the wrapped function.
        @gen_test
        def test_with_args(self, *args):
            self.assertEqual(args, ("test",))
            yield gen.moment

        test_with_args(self, "test")
        self.finished = True

    def test_with_method_kwargs(self):
        # Extra keyword arguments are forwarded to the wrapped function.
        @gen_test
        def test_with_kwargs(self, **kwargs):
            self.assertDictEqual(kwargs, {"test": "test"})
            yield gen.moment

        test_with_kwargs(self, test="test")
        self.finished = True

    def test_native_coroutine(self):
        # gen_test also accepts an ``async def`` function.
        @gen_test
        async def native(self):
            self.finished = True

        native(self)

    def test_native_coroutine_timeout(self):
        # The native coroutine sleeps far longer than its timeout.
        @gen_test(timeout=0.1)
        async def native_sleeper(self):
            await gen.sleep(1)

        try:
            native_sleeper(self)
        except ioloop.TimeoutError:
            self.finished = True
        else:
            self.fail("did not get expected exception")
326
327
class GetNewIOLoopTest(AsyncTestCase):
    """Runs AsyncTestCase on top of an asyncio loop installed by the test.

    ``get_new_ioloop`` is overridden to return the current IOLoop, and
    ``setUp`` installs a fresh asyncio loop as the current one before the
    base class sets itself up.
    """

    def get_new_ioloop(self):
        # Use the current loop instead of creating a new one here.
        return ioloop.IOLoop.current()

    def setUp(self):
        # This simulates the effect of an asyncio test harness like
        # pytest-asyncio.
        self.orig_loop = asyncio.get_event_loop()
        self.new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.new_loop)
        super().setUp()

    def tearDown(self):
        try:
            super().tearDown()
            # AsyncTestCase must not affect the existing asyncio loop.
            self.assertFalse(asyncio.get_event_loop().is_closed())
        finally:
            # Always restore the original loop and release the temporary
            # one, even if the base tearDown or the assertion above fails;
            # otherwise the replaced loop would leak into later tests.
            asyncio.set_event_loop(self.orig_loop)
            self.new_loop.close()

    def test_loop(self):
        # The IOLoop used by the test must wrap the loop installed in setUp.
        self.assertIs(self.io_loop.asyncio_loop, self.new_loop)  # type: ignore
350
351
# Allow this module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()
354