from tornado import gen, ioloop
from tornado.httpserver import HTTPServer
from tornado.locks import Event
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
from tornado.web import Application
import asyncio
import contextlib
import gc
import os
import platform
import traceback
import unittest
import warnings


@contextlib.contextmanager
def set_environ(name, value):
    """Temporarily set the environment variable ``name`` to ``value``.

    On exit the previous value is restored; if the variable was not set
    before entering, it is removed again.
    """
    old_value = os.environ.get(name)
    os.environ[name] = value

    try:
        yield
    finally:
        if old_value is None:
            # pop() instead of del: if the managed body already removed the
            # variable, cleanup must not raise KeyError (which would also
            # mask any exception propagating out of the body).
            os.environ.pop(name, None)
        else:
            os.environ[name] = old_value


class AsyncTestCaseTest(AsyncTestCase):
    """Tests for the stop()/wait() timeout handling of AsyncTestCase."""

    def test_wait_timeout(self):
        time = self.io_loop.time

        # Accept default 5-second timeout, no error
        self.io_loop.add_timeout(time() + 0.01, self.stop)
        self.wait()

        # Timeout passed to wait()
        self.io_loop.add_timeout(time() + 1, self.stop)
        with self.assertRaises(self.failureException):
            self.wait(timeout=0.01)

        # Timeout set with environment variable
        self.io_loop.add_timeout(time() + 1, self.stop)
        with set_environ("ASYNC_TEST_TIMEOUT", "0.01"):
            with self.assertRaises(self.failureException):
                self.wait()

    def test_subsequent_wait_calls(self):
        """
        This test makes sure that a second call to wait()
        clears the first timeout.
        """
        # The first wait ends with time left on the clock
        self.io_loop.add_timeout(self.io_loop.time() + 0.00, self.stop)
        self.wait(timeout=0.1)
        # The second wait has enough time for itself but would fail if the
        # first wait's deadline were still in effect.
        self.io_loop.add_timeout(self.io_loop.time() + 0.2, self.stop)
        self.wait(timeout=0.4)


class LeakTest(AsyncTestCase):
    """Checks that a coroutine still pending at test end is cleaned up quietly."""

    def tearDown(self):
        super().tearDown()
        # Trigger a gc to make warnings more deterministic.
        gc.collect()

    def test_leaked_coroutine(self):
        # This test verifies that "leaked" coroutines are shut down
        # without triggering warnings like "task was destroyed but it
        # is pending". If this test were to fail, it would fail
        # because runtests.py detected unexpected output to stderr.
        event = Event()

        async def callback():
            # The event is never set, so this coroutine is still waiting
            # when the test finishes.
            try:
                await event.wait()
            except asyncio.CancelledError:
                pass

        self.io_loop.add_callback(callback)
        self.io_loop.add_callback(self.stop)
        self.wait()


class AsyncHTTPTestCaseTest(AsyncHTTPTestCase):
    """Tests for AsyncHTTPTestCase.fetch() with relative and absolute URLs."""

    def setUp(self):
        super().setUp()
        # Bind a second port.
        sock, port = bind_unused_port()
        app = Application()
        server = HTTPServer(app, **self.get_httpserver_options())
        server.add_socket(sock)
        self.second_port = port
        self.second_server = server

    def get_app(self):
        return Application()

    def test_fetch_segment(self):
        # A bare path is expanded to a URL on the main test server.
        path = "/path"
        response = self.fetch(path)
        self.assertEqual(response.request.url, self.get_url(path))

    def test_fetch_full_http_url(self):
        # Ensure that self.fetch() recognizes absolute urls and does
        # not transform them into references to our main test server.
        path = "http://localhost:%d/path" % self.second_port

        response = self.fetch(path)
        self.assertEqual(response.request.url, path)

    def tearDown(self):
        # Stop the extra server before the base class tears down.
        self.second_server.stop()
        super().tearDown()


class AsyncTestCaseWrapperTest(unittest.TestCase):
    """Tests how AsyncTestCase reports test methods with unexpected return values.

    Each test defines a throwaway AsyncTestCase subclass, runs it against a
    private TestResult, and inspects the recorded errors/skips.
    """

    def test_undecorated_generator(self):
        class Test(AsyncTestCase):
            def test_gen(self):
                yield

        test = Test("test_gen")
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(len(result.errors), 1)
        self.assertIn("should be decorated", result.errors[0][1])

    @unittest.skipIf(
        platform.python_implementation() == "PyPy",
        "pypy destructor warnings cannot be silenced",
    )
    def test_undecorated_coroutine(self):
        class Test(AsyncTestCase):
            async def test_coro(self):
                pass

        test = Test("test_coro")
        result = unittest.TestResult()

        # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            test.run(result)

        self.assertEqual(len(result.errors), 1)
        self.assertIn("should be decorated", result.errors[0][1])

    def test_undecorated_generator_with_skip(self):
        class Test(AsyncTestCase):
            @unittest.skip("don't run this")
            def test_gen(self):
                yield

        test = Test("test_gen")
        result = unittest.TestResult()
        test.run(result)
        # A skipped method is reported as a skip, not as an error.
        self.assertEqual(len(result.errors), 0)
        self.assertEqual(len(result.skipped), 1)

    def test_other_return(self):
        class Test(AsyncTestCase):
            def test_other_return(self):
                return 42

        test = Test("test_other_return")
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(len(result.errors), 1)
        self.assertIn("Return value from test method ignored", result.errors[0][1])


class SetUpTearDownTest(unittest.TestCase):
    def test_set_up_tear_down(self):
        """
        This test makes sure that AsyncTestCase calls super methods for
        setUp and tearDown.

        InheritBoth is a subclass of both AsyncTestCase and
        SetUpTearDown, with the ordering so that the super of
        AsyncTestCase will be SetUpTearDown.
        """
        events = []
        result = unittest.TestResult()

        class SetUpTearDown(unittest.TestCase):
            def setUp(self):
                events.append("setUp")

            def tearDown(self):
                events.append("tearDown")

        class InheritBoth(AsyncTestCase, SetUpTearDown):
            def test(self):
                events.append("test")

        InheritBoth("test").run(result)
        expected = ["setUp", "test", "tearDown"]
        self.assertEqual(expected, events)


class AsyncHTTPTestCaseSetUpTearDownTest(unittest.TestCase):
    """Checks that AsyncHTTPTestCase drops its app/server attributes on tearDown."""

    def test_tear_down_releases_app_and_http_server(self):
        result = unittest.TestResult()

        class SetUpTearDown(AsyncHTTPTestCase):
            def get_app(self):
                return Application()

            def test(self):
                # While the test body runs, both attributes are present.
                self.assertTrue(hasattr(self, "_app"))
                self.assertTrue(hasattr(self, "http_server"))

        test = SetUpTearDown("test")
        test.run(result)
        # After the run (and therefore tearDown) they must be gone.
        self.assertFalse(hasattr(test, "_app"))
        self.assertFalse(hasattr(test, "http_server"))


class GenTest(AsyncTestCase):
    """Tests for the @gen_test decorator.

    Every test must set ``self.finished = True``; tearDown asserts it so a
    test body that silently never ran to completion is reported as a failure.
    """

    def setUp(self):
        super().setUp()
        self.finished = False

    def tearDown(self):
        self.assertTrue(self.finished)
        super().tearDown()

    @gen_test
    def test_sync(self):
        self.finished = True

    @gen_test
    def test_async(self):
        yield gen.moment
        self.finished = True

    def test_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        def test(self):
            yield gen.sleep(1)

        # This can't use assertRaises because we need to inspect the
        # exc_info triple (and not just the exception object)
        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            # The stack trace should blame the gen.sleep line, not just
            # unrelated IOLoop/testing internals.
            self.assertIn("gen.sleep(1)", traceback.format_exc())

        self.finished = True

    def test_no_timeout(self):
        # A test that does not exceed its timeout should succeed.
        @gen_test(timeout=1)
        def test(self):
            yield gen.sleep(0.1)

        test(self)
        self.finished = True

    def test_timeout_environment_variable(self):
        @gen_test(timeout=0.5)
        def test_long_timeout(self):
            yield gen.sleep(0.25)

        # Uses provided timeout of 0.5 seconds, doesn't time out.
        with set_environ("ASYNC_TEST_TIMEOUT", "0.1"):
            test_long_timeout(self)

        self.finished = True

    def test_no_timeout_environment_variable(self):
        @gen_test(timeout=0.01)
        def test_short_timeout(self):
            yield gen.sleep(1)

        # Uses environment-variable timeout of 0.1, times out.
        with set_environ("ASYNC_TEST_TIMEOUT", "0.1"):
            with self.assertRaises(ioloop.TimeoutError):
                test_short_timeout(self)

        self.finished = True

    def test_with_method_args(self):
        # Positional arguments are passed through to the wrapped test.
        @gen_test
        def test_with_args(self, *args):
            self.assertEqual(args, ("test",))
            yield gen.moment

        test_with_args(self, "test")
        self.finished = True

    def test_with_method_kwargs(self):
        # Keyword arguments are passed through to the wrapped test.
        @gen_test
        def test_with_kwargs(self, **kwargs):
            self.assertDictEqual(kwargs, {"test": "test"})
            yield gen.moment

        test_with_kwargs(self, test="test")
        self.finished = True

    def test_native_coroutine(self):
        @gen_test
        async def test(self):
            self.finished = True

        test(self)

    def test_native_coroutine_timeout(self):
        # Set a short timeout and exceed it.
        @gen_test(timeout=0.1)
        async def test(self):
            await gen.sleep(1)

        try:
            test(self)
            self.fail("did not get expected exception")
        except ioloop.TimeoutError:
            self.finished = True


class GetNewIOLoopTest(AsyncTestCase):
    """Checks AsyncTestCase when get_new_ioloop() returns the current loop."""

    def get_new_ioloop(self):
        # Use the current loop instead of creating a new one here.
        return ioloop.IOLoop.current()

    def setUp(self):
        # This simulates the effect of an asyncio test harness like
        # pytest-asyncio.
        #
        # asyncio.get_event_loop() emits a DeprecationWarning on Python 3.12+
        # and raises RuntimeError on 3.14+ when no current loop is set.
        # Neither outcome is what this test is about, so tolerate both and
        # remember "no loop" as None.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            try:
                self.orig_loop = asyncio.get_event_loop()
            except RuntimeError:
                self.orig_loop = None
        self.new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.new_loop)
        super().setUp()

    def tearDown(self):
        super().tearDown()
        # AsyncTestCase must not affect the existing asyncio loop.
        self.assertFalse(asyncio.get_event_loop().is_closed())
        # Restore whatever was current before setUp (possibly None).
        asyncio.set_event_loop(self.orig_loop)
        self.new_loop.close()

    def test_loop(self):
        self.assertIs(self.io_loop.asyncio_loop, self.new_loop)  # type: ignore


if __name__ == "__main__":
    unittest.main()