from __future__ import absolute_import, division, print_function

import gc
import contextlib
import datetime
import functools
import sys
import textwrap
import time
import weakref

from tornado.concurrent import return_future, Future
from tornado.escape import url_escape
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
from tornado.log import app_log
from tornado import stack_context
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test
from tornado.web import Application, RequestHandler, asynchronous, HTTPError

from tornado import gen

# concurrent.futures is optional; tests that need it are skipped when
# ``futures`` is None.
try:
    from concurrent import futures
except ImportError:
    futures = None


class GenEngineTest(AsyncTestCase):
    """Tests built mostly around ``@gen.engine`` functions.

    Most tests define an inner generator ``f`` that calls ``self.stop()``
    when it is done and drive it with ``run_gen``.
    """

    def setUp(self):
        super(GenEngineTest, self).setUp()
        # Stack of names pushed/popped by the context managers created in
        # named_context(); tests assert on its contents.
        self.named_contexts = []

    def named_context(self, name):
        """Returns a context manager factory that tracks ``name``.

        On entry ``name`` is appended to ``self.named_contexts``; on exit
        the last entry is popped and asserted to be ``name``.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                self.assertEqual(self.named_contexts.pop(), name)
        return context

    def run_gen(self, f):
        """Calls ``f()`` and returns the result of ``self.wait()``."""
        f()
        return self.wait()

    def delay_callback(self, iterations, callback, arg):
        """Runs callback(arg) after a number of IOLoop iterations."""
        if iterations == 0:
            callback(arg)
        else:
            # Re-schedule ourselves with one fewer iteration remaining.
            self.io_loop.add_callback(functools.partial(
                self.delay_callback, iterations - 1, callback, arg))

    @return_future
    def async_future(self, result, callback):
        # Hands ``result`` to ``callback`` from an IOLoop callback rather
        # than inline.
        self.io_loop.add_callback(callback, result)

    @gen.coroutine
    def async_exception(self, e):
        # Yields once before raising, so the exception is not raised
        # until the coroutine has been resumed.
        yield gen.moment
        raise e

    def test_no_yield(self):
        # A gen.engine function that never yields.
        @gen.engine
        def f():
            self.stop()
        self.run_gen(f)

    def test_inline_cb(self):
        # The callback is invoked inline, before Wait is yielded, and
        # with no arguments; Wait then produces None.
        @gen.engine
        def f():
            (yield gen.Callback("k1"))()
            res = yield gen.Wait("k1")
            self.assertTrue(res is None)
            self.stop()
        self.run_gen(f)

    def test_ioloop_cb(self):
        # Same as the inline case, but the callback is run by the IOLoop.
        @gen.engine
        def f():
            self.io_loop.add_callback((yield gen.Callback("k1")))
            yield gen.Wait("k1")
            self.stop()
        self.run_gen(f)

    def test_exception_phase1(self):
        # f raises immediately; it contains no yield at all.
        @gen.engine
        def f():
            1 / 0
        self.assertRaises(ZeroDivisionError, self.run_gen, f)

    def test_exception_phase2(self):
        # f raises only after it has been resumed from a Wait.
        @gen.engine
        def f():
            self.io_loop.add_callback((yield gen.Callback("k1")))
            yield gen.Wait("k1")
            1 / 0
        self.assertRaises(ZeroDivisionError, self.run_gen, f)

    def test_exception_in_task_phase1(self):
        # The task function raises while it is being called; the
        # generator must see the exception at the yield.
        def fail_task(callback):
            1 / 0

        @gen.engine
        def f():
            try:
                yield gen.Task(fail_task)
                raise Exception("did not get expected exception")
            except ZeroDivisionError:
                self.stop()
        self.run_gen(f)

    def test_exception_in_task_phase2(self):
        # This is the case that requires the use of stack_context in gen.engine
        def fail_task(callback):
            # The failure happens later, inside an IOLoop callback, and
            # ``callback`` itself is never invoked.
            self.io_loop.add_callback(lambda: 1 / 0)

        @gen.engine
        def f():
            try:
                yield gen.Task(fail_task)
                raise Exception("did not get expected exception")
            except ZeroDivisionError:
                self.stop()
        self.run_gen(f)

    def test_with_arg(self):
        # A single positional argument passed to the callback becomes
        # the result of the matching Wait.
        @gen.engine
        def f():
            (yield gen.Callback("k1"))(42)
            res = yield gen.Wait("k1")
            self.assertEqual(42, res)
            self.stop()
        self.run_gen(f)

    def test_with_arg_tuple(self):
        # Tuples work both as the key and as the callback argument.
        @gen.engine
        def f():
            (yield gen.Callback((1, 2)))((3, 4))
            res = yield gen.Wait((1, 2))
            self.assertEqual((3, 4), res)
            self.stop()
        self.run_gen(f)

    def test_key_reuse(self):
        # Registering the same key twice without waiting is an error.
        @gen.engine
        def f():
            yield gen.Callback("k1")
            yield gen.Callback("k1")
            self.stop()
        self.assertRaises(gen.KeyReuseError, self.run_gen, f)

    def test_key_reuse_tuple(self):
        # Same as test_key_reuse, with a tuple key.
        @gen.engine
        def f():
            yield gen.Callback((1, 2))
            yield gen.Callback((1, 2))
            self.stop()
        self.assertRaises(gen.KeyReuseError, self.run_gen, f)

    def test_key_mismatch(self):
        # Waiting on a key that was never registered is an error.
        @gen.engine
        def f():
            yield gen.Callback("k1")
            yield gen.Wait("k2")
            self.stop()
        self.assertRaises(gen.UnknownKeyError, self.run_gen, f)

    def test_key_mismatch_tuple(self):
        # Same as test_key_mismatch, with tuple keys.
        @gen.engine
        def f():
            yield gen.Callback((1, 2))
            yield gen.Wait((2, 3))
            self.stop()
        self.assertRaises(gen.UnknownKeyError, self.run_gen, f)

    def test_leaked_callback(self):
        # A Callback that is never waited on is reported when f ends.
        @gen.engine
        def f():
            yield gen.Callback("k1")
            self.stop()
        self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)

    def test_leaked_callback_tuple(self):
        # Same as test_leaked_callback, with a tuple key.
        @gen.engine
        def f():
            yield gen.Callback((1, 2))
            self.stop()
        self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)

    def test_parallel_callback(self):
        # Several callbacks are outstanding at once and are waited on in
        # an order different from the order they were registered.
        @gen.engine
        def f():
            for k in range(3):
                self.io_loop.add_callback((yield gen.Callback(k)))
            yield gen.Wait(1)
            self.io_loop.add_callback((yield gen.Callback(3)))
            yield gen.Wait(0)
            yield gen.Wait(3)
            yield gen.Wait(2)
            self.stop()
        self.run_gen(f)

    def test_bogus_yield(self):
        # Yielding an arbitrary value is rejected.
        @gen.engine
        def f():
            yield 42
        self.assertRaises(gen.BadYieldError, self.run_gen, f)

    def test_bogus_yield_tuple(self):
        # Same as test_bogus_yield, with a tuple.
        @gen.engine
        def f():
            yield (1, 2)
        self.assertRaises(gen.BadYieldError, self.run_gen, f)

    def test_reuse(self):
        # The same decorated function can be run more than once.
        @gen.engine
        def f():
            self.io_loop.add_callback((yield gen.Callback(0)))
            yield gen.Wait(0)
            self.stop()
        self.run_gen(f)
        self.run_gen(f)

    def test_task(self):
        # gen.Task supplies the callback argument to add_callback.
        @gen.engine
        def f():
            yield gen.Task(self.io_loop.add_callback)
            self.stop()
        self.run_gen(f)

    def test_wait_all(self):
        # WaitAll returns the results in the order the keys were given.
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield gen.WaitAll(["k1", "k2"])
            self.assertEqual(results, ["v1", "v2"])
            self.stop()
        self.run_gen(f)

    def test_exception_in_yield(self):
        # The error for an unknown key is raised at the yield, where the
        # generator can catch it.
        @gen.engine
        def f():
            try:
                yield gen.Wait("k1")
                raise Exception("did not get expected exception")
            except gen.UnknownKeyError:
                pass
            self.stop()
        self.run_gen(f)

    def test_resume_after_exception_in_yield(self):
        # After catching the error the generator can keep yielding.
        @gen.engine
        def f():
            try:
                yield gen.Wait("k1")
                raise Exception("did not get expected exception")
            except gen.UnknownKeyError:
                pass
            (yield gen.Callback("k2"))("v2")
            self.assertEqual((yield gen.Wait("k2")), "v2")
            self.stop()
        self.run_gen(f)

    def test_orphaned_callback(self):
        # The callback outlives the generator that created it; calling
        # it afterwards must not fail.
        @gen.engine
        def f():
            self.orphaned_callback = yield gen.Callback(1)
        try:
            self.run_gen(f)
            raise Exception("did not get expected exception")
        except gen.LeakedCallbackError:
            pass
        self.orphaned_callback()

    def test_none(self):
        # Yielding None is accepted.
        @gen.engine
        def f():
            yield None
            self.stop()
        self.run_gen(f)

    def test_multi(self):
        # Yielding a list of Waits produces a list of results.
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield [gen.Wait("k1"), gen.Wait("k2")]
            self.assertEqual(results, ["v1", "v2"])
            self.stop()
        self.run_gen(f)

    def test_multi_dict(self):
        # Yielding a dict of Waits produces a dict with the same keys.
        @gen.engine
        def f():
            (yield gen.Callback("k1"))("v1")
            (yield gen.Callback("k2"))("v2")
            results = yield dict(foo=gen.Wait("k1"), bar=gen.Wait("k2"))
            self.assertEqual(results, dict(foo="v1", bar="v2"))
            self.stop()
        self.run_gen(f)

    # The following tests explicitly run with both gen.Multi
    # and gen.multi_future (Task returns a Future, so it can be used
    # with either).
310 def test_multi_yieldpoint_delayed(self): 311 @gen.engine 312 def f(): 313 # callbacks run at different times 314 responses = yield gen.Multi([ 315 gen.Task(self.delay_callback, 3, arg="v1"), 316 gen.Task(self.delay_callback, 1, arg="v2"), 317 ]) 318 self.assertEqual(responses, ["v1", "v2"]) 319 self.stop() 320 self.run_gen(f) 321 322 def test_multi_yieldpoint_dict_delayed(self): 323 @gen.engine 324 def f(): 325 # callbacks run at different times 326 responses = yield gen.Multi(dict( 327 foo=gen.Task(self.delay_callback, 3, arg="v1"), 328 bar=gen.Task(self.delay_callback, 1, arg="v2"), 329 )) 330 self.assertEqual(responses, dict(foo="v1", bar="v2")) 331 self.stop() 332 self.run_gen(f) 333 334 def test_multi_future_delayed(self): 335 @gen.engine 336 def f(): 337 # callbacks run at different times 338 responses = yield gen.multi_future([ 339 gen.Task(self.delay_callback, 3, arg="v1"), 340 gen.Task(self.delay_callback, 1, arg="v2"), 341 ]) 342 self.assertEqual(responses, ["v1", "v2"]) 343 self.stop() 344 self.run_gen(f) 345 346 def test_multi_future_dict_delayed(self): 347 @gen.engine 348 def f(): 349 # callbacks run at different times 350 responses = yield gen.multi_future(dict( 351 foo=gen.Task(self.delay_callback, 3, arg="v1"), 352 bar=gen.Task(self.delay_callback, 1, arg="v2"), 353 )) 354 self.assertEqual(responses, dict(foo="v1", bar="v2")) 355 self.stop() 356 self.run_gen(f) 357 358 @skipOnTravis 359 @gen_test 360 def test_multi_performance(self): 361 # Yielding a list used to have quadratic performance; make 362 # sure a large list stays reasonable. On my laptop a list of 363 # 2000 used to take 1.8s, now it takes 0.12. 364 start = time.time() 365 yield [gen.Task(self.io_loop.add_callback) for i in range(2000)] 366 end = time.time() 367 self.assertLess(end - start, 1.0) 368 369 @gen_test 370 def test_multi_empty(self): 371 # Empty lists or dicts should return the same type. 
372 x = yield [] 373 self.assertTrue(isinstance(x, list)) 374 y = yield {} 375 self.assertTrue(isinstance(y, dict)) 376 377 @gen_test 378 def test_multi_mixed_types(self): 379 # A YieldPoint (Wait) and Future (Task) can be combined 380 # (and use the YieldPoint codepath) 381 (yield gen.Callback("k1"))("v1") 382 responses = yield [gen.Wait("k1"), 383 gen.Task(self.delay_callback, 3, arg="v2")] 384 self.assertEqual(responses, ["v1", "v2"]) 385 386 @gen_test 387 def test_future(self): 388 result = yield self.async_future(1) 389 self.assertEqual(result, 1) 390 391 @gen_test 392 def test_multi_future(self): 393 results = yield [self.async_future(1), self.async_future(2)] 394 self.assertEqual(results, [1, 2]) 395 396 @gen_test 397 def test_multi_future_duplicate(self): 398 f = self.async_future(2) 399 results = yield [self.async_future(1), f, self.async_future(3), f] 400 self.assertEqual(results, [1, 2, 3, 2]) 401 402 @gen_test 403 def test_multi_dict_future(self): 404 results = yield dict(foo=self.async_future(1), bar=self.async_future(2)) 405 self.assertEqual(results, dict(foo=1, bar=2)) 406 407 @gen_test 408 def test_multi_exceptions(self): 409 with ExpectLog(app_log, "Multiple exceptions in yield list"): 410 with self.assertRaises(RuntimeError) as cm: 411 yield gen.Multi([self.async_exception(RuntimeError("error 1")), 412 self.async_exception(RuntimeError("error 2"))]) 413 self.assertEqual(str(cm.exception), "error 1") 414 415 # With only one exception, no error is logged. 416 with self.assertRaises(RuntimeError): 417 yield gen.Multi([self.async_exception(RuntimeError("error 1")), 418 self.async_future(2)]) 419 420 # Exception logging may be explicitly quieted. 
421 with self.assertRaises(RuntimeError): 422 yield gen.Multi([self.async_exception(RuntimeError("error 1")), 423 self.async_exception(RuntimeError("error 2"))], 424 quiet_exceptions=RuntimeError) 425 426 @gen_test 427 def test_multi_future_exceptions(self): 428 with ExpectLog(app_log, "Multiple exceptions in yield list"): 429 with self.assertRaises(RuntimeError) as cm: 430 yield [self.async_exception(RuntimeError("error 1")), 431 self.async_exception(RuntimeError("error 2"))] 432 self.assertEqual(str(cm.exception), "error 1") 433 434 # With only one exception, no error is logged. 435 with self.assertRaises(RuntimeError): 436 yield [self.async_exception(RuntimeError("error 1")), 437 self.async_future(2)] 438 439 # Exception logging may be explicitly quieted. 440 with self.assertRaises(RuntimeError): 441 yield gen.multi_future( 442 [self.async_exception(RuntimeError("error 1")), 443 self.async_exception(RuntimeError("error 2"))], 444 quiet_exceptions=RuntimeError) 445 446 def test_arguments(self): 447 @gen.engine 448 def f(): 449 (yield gen.Callback("noargs"))() 450 self.assertEqual((yield gen.Wait("noargs")), None) 451 (yield gen.Callback("1arg"))(42) 452 self.assertEqual((yield gen.Wait("1arg")), 42) 453 454 (yield gen.Callback("kwargs"))(value=42) 455 result = yield gen.Wait("kwargs") 456 self.assertTrue(isinstance(result, gen.Arguments)) 457 self.assertEqual(((), dict(value=42)), result) 458 self.assertEqual(dict(value=42), result.kwargs) 459 460 (yield gen.Callback("2args"))(42, 43) 461 result = yield gen.Wait("2args") 462 self.assertTrue(isinstance(result, gen.Arguments)) 463 self.assertEqual(((42, 43), {}), result) 464 self.assertEqual((42, 43), result.args) 465 466 def task_func(callback): 467 callback(None, error="foo") 468 result = yield gen.Task(task_func) 469 self.assertTrue(isinstance(result, gen.Arguments)) 470 self.assertEqual(((None,), dict(error="foo")), result) 471 472 self.stop() 473 self.run_gen(f) 474 475 def test_stack_context_leak(self): 476 # 
regression test: repeated invocations of a gen-based 477 # function should not result in accumulated stack_contexts 478 def _stack_depth(): 479 head = stack_context._state.contexts[1] 480 length = 0 481 482 while head is not None: 483 length += 1 484 head = head.old_contexts[1] 485 486 return length 487 488 @gen.engine 489 def inner(callback): 490 yield gen.Task(self.io_loop.add_callback) 491 callback() 492 493 @gen.engine 494 def outer(): 495 for i in range(10): 496 yield gen.Task(inner) 497 498 stack_increase = _stack_depth() - initial_stack_depth 499 self.assertTrue(stack_increase <= 2) 500 self.stop() 501 initial_stack_depth = _stack_depth() 502 self.run_gen(outer) 503 504 def test_stack_context_leak_exception(self): 505 # same as previous, but with a function that exits with an exception 506 @gen.engine 507 def inner(callback): 508 yield gen.Task(self.io_loop.add_callback) 509 1 / 0 510 511 @gen.engine 512 def outer(): 513 for i in range(10): 514 try: 515 yield gen.Task(inner) 516 except ZeroDivisionError: 517 pass 518 stack_increase = len(stack_context._state.contexts) - initial_stack_depth 519 self.assertTrue(stack_increase <= 2) 520 self.stop() 521 initial_stack_depth = len(stack_context._state.contexts) 522 self.run_gen(outer) 523 524 def function_with_stack_context(self, callback): 525 # Technically this function should stack_context.wrap its callback 526 # upon entry. However, it is very common for this step to be 527 # omitted. 528 def step2(): 529 self.assertEqual(self.named_contexts, ['a']) 530 self.io_loop.add_callback(callback) 531 532 with stack_context.StackContext(self.named_context('a')): 533 self.io_loop.add_callback(step2) 534 535 @gen_test 536 def test_wait_transfer_stack_context(self): 537 # Wait should not pick up contexts from where callback was invoked, 538 # even if that function improperly fails to wrap its callback. 
539 cb = yield gen.Callback('k1') 540 self.function_with_stack_context(cb) 541 self.assertEqual(self.named_contexts, []) 542 yield gen.Wait('k1') 543 self.assertEqual(self.named_contexts, []) 544 545 @gen_test 546 def test_task_transfer_stack_context(self): 547 yield gen.Task(self.function_with_stack_context) 548 self.assertEqual(self.named_contexts, []) 549 550 def test_raise_after_stop(self): 551 # This pattern will be used in the following tests so make sure 552 # the exception propagates as expected. 553 @gen.engine 554 def f(): 555 self.stop() 556 1 / 0 557 558 with self.assertRaises(ZeroDivisionError): 559 self.run_gen(f) 560 561 def test_sync_raise_return(self): 562 # gen.Return is allowed in @gen.engine, but it may not be used 563 # to return a value. 564 @gen.engine 565 def f(): 566 self.stop(42) 567 raise gen.Return() 568 569 result = self.run_gen(f) 570 self.assertEqual(result, 42) 571 572 def test_async_raise_return(self): 573 @gen.engine 574 def f(): 575 yield gen.Task(self.io_loop.add_callback) 576 self.stop(42) 577 raise gen.Return() 578 579 result = self.run_gen(f) 580 self.assertEqual(result, 42) 581 582 def test_sync_raise_return_value(self): 583 @gen.engine 584 def f(): 585 raise gen.Return(42) 586 587 with self.assertRaises(gen.ReturnValueIgnoredError): 588 self.run_gen(f) 589 590 def test_sync_raise_return_value_tuple(self): 591 @gen.engine 592 def f(): 593 raise gen.Return((1, 2)) 594 595 with self.assertRaises(gen.ReturnValueIgnoredError): 596 self.run_gen(f) 597 598 def test_async_raise_return_value(self): 599 @gen.engine 600 def f(): 601 yield gen.Task(self.io_loop.add_callback) 602 raise gen.Return(42) 603 604 with self.assertRaises(gen.ReturnValueIgnoredError): 605 self.run_gen(f) 606 607 def test_async_raise_return_value_tuple(self): 608 @gen.engine 609 def f(): 610 yield gen.Task(self.io_loop.add_callback) 611 raise gen.Return((1, 2)) 612 613 with self.assertRaises(gen.ReturnValueIgnoredError): 614 self.run_gen(f) 615 616 def 
test_return_value(self): 617 # It is an error to apply @gen.engine to a function that returns 618 # a value. 619 @gen.engine 620 def f(): 621 return 42 622 623 with self.assertRaises(gen.ReturnValueIgnoredError): 624 self.run_gen(f) 625 626 def test_return_value_tuple(self): 627 # It is an error to apply @gen.engine to a function that returns 628 # a value. 629 @gen.engine 630 def f(): 631 return (1, 2) 632 633 with self.assertRaises(gen.ReturnValueIgnoredError): 634 self.run_gen(f) 635 636 @skipNotCPython 637 def test_task_refcounting(self): 638 # On CPython, tasks and their arguments should be released immediately 639 # without waiting for garbage collection. 640 @gen.engine 641 def f(): 642 class Foo(object): 643 pass 644 arg = Foo() 645 self.arg_ref = weakref.ref(arg) 646 task = gen.Task(self.io_loop.add_callback, arg=arg) 647 self.task_ref = weakref.ref(task) 648 yield task 649 self.stop() 650 651 self.run_gen(f) 652 self.assertIs(self.arg_ref(), None) 653 self.assertIs(self.task_ref(), None) 654 655 656class GenCoroutineTest(AsyncTestCase): 657 def setUp(self): 658 # Stray StopIteration exceptions can lead to tests exiting prematurely, 659 # so we need explicit checks here to make sure the tests run all 660 # the way through. 
661 self.finished = False 662 super(GenCoroutineTest, self).setUp() 663 664 def tearDown(self): 665 super(GenCoroutineTest, self).tearDown() 666 assert self.finished 667 668 def test_attributes(self): 669 self.finished = True 670 671 def f(): 672 yield gen.moment 673 674 coro = gen.coroutine(f) 675 self.assertEqual(coro.__name__, f.__name__) 676 self.assertEqual(coro.__module__, f.__module__) 677 self.assertIs(coro.__wrapped__, f) 678 679 def test_is_coroutine_function(self): 680 self.finished = True 681 682 def f(): 683 yield gen.moment 684 685 coro = gen.coroutine(f) 686 self.assertFalse(gen.is_coroutine_function(f)) 687 self.assertTrue(gen.is_coroutine_function(coro)) 688 self.assertFalse(gen.is_coroutine_function(coro())) 689 690 @gen_test 691 def test_sync_gen_return(self): 692 @gen.coroutine 693 def f(): 694 raise gen.Return(42) 695 result = yield f() 696 self.assertEqual(result, 42) 697 self.finished = True 698 699 @gen_test 700 def test_async_gen_return(self): 701 @gen.coroutine 702 def f(): 703 yield gen.Task(self.io_loop.add_callback) 704 raise gen.Return(42) 705 result = yield f() 706 self.assertEqual(result, 42) 707 self.finished = True 708 709 @gen_test 710 def test_sync_return(self): 711 @gen.coroutine 712 def f(): 713 return 42 714 result = yield f() 715 self.assertEqual(result, 42) 716 self.finished = True 717 718 @skipBefore33 719 @gen_test 720 def test_async_return(self): 721 namespace = exec_test(globals(), locals(), """ 722 @gen.coroutine 723 def f(): 724 yield gen.Task(self.io_loop.add_callback) 725 return 42 726 """) 727 result = yield namespace['f']() 728 self.assertEqual(result, 42) 729 self.finished = True 730 731 @skipBefore33 732 @gen_test 733 def test_async_early_return(self): 734 # A yield statement exists but is not executed, which means 735 # this function "returns" via an exception. This exception 736 # doesn't happen before the exception handling is set up. 
737 namespace = exec_test(globals(), locals(), """ 738 @gen.coroutine 739 def f(): 740 if True: 741 return 42 742 yield gen.Task(self.io_loop.add_callback) 743 """) 744 result = yield namespace['f']() 745 self.assertEqual(result, 42) 746 self.finished = True 747 748 @skipBefore35 749 @gen_test 750 def test_async_await(self): 751 # This test verifies that an async function can await a 752 # yield-based gen.coroutine, and that a gen.coroutine 753 # (the test method itself) can yield an async function. 754 namespace = exec_test(globals(), locals(), """ 755 async def f(): 756 await gen.Task(self.io_loop.add_callback) 757 return 42 758 """) 759 result = yield namespace['f']() 760 self.assertEqual(result, 42) 761 self.finished = True 762 763 @skipBefore35 764 @gen_test 765 def test_asyncio_sleep_zero(self): 766 # asyncio.sleep(0) turns into a special case (equivalent to 767 # `yield None`) 768 namespace = exec_test(globals(), locals(), """ 769 async def f(): 770 import asyncio 771 await asyncio.sleep(0) 772 return 42 773 """) 774 result = yield namespace['f']() 775 self.assertEqual(result, 42) 776 self.finished = True 777 778 @skipBefore35 779 @gen_test 780 def test_async_await_mixed_multi_native_future(self): 781 namespace = exec_test(globals(), locals(), """ 782 async def f1(): 783 await gen.Task(self.io_loop.add_callback) 784 return 42 785 """) 786 787 @gen.coroutine 788 def f2(): 789 yield gen.Task(self.io_loop.add_callback) 790 raise gen.Return(43) 791 792 results = yield [namespace['f1'](), f2()] 793 self.assertEqual(results, [42, 43]) 794 self.finished = True 795 796 @skipBefore35 797 @gen_test 798 def test_async_await_mixed_multi_native_yieldpoint(self): 799 namespace = exec_test(globals(), locals(), """ 800 async def f1(): 801 await gen.Task(self.io_loop.add_callback) 802 return 42 803 """) 804 805 @gen.coroutine 806 def f2(): 807 yield gen.Task(self.io_loop.add_callback) 808 raise gen.Return(43) 809 810 f2(callback=(yield gen.Callback('cb'))) 811 results = 
yield [namespace['f1'](), gen.Wait('cb')] 812 self.assertEqual(results, [42, 43]) 813 self.finished = True 814 815 @skipBefore35 816 @gen_test 817 def test_async_with_timeout(self): 818 namespace = exec_test(globals(), locals(), """ 819 async def f1(): 820 return 42 821 """) 822 823 result = yield gen.with_timeout(datetime.timedelta(hours=1), 824 namespace['f1']()) 825 self.assertEqual(result, 42) 826 self.finished = True 827 828 @gen_test 829 def test_sync_return_no_value(self): 830 @gen.coroutine 831 def f(): 832 return 833 result = yield f() 834 self.assertEqual(result, None) 835 self.finished = True 836 837 @gen_test 838 def test_async_return_no_value(self): 839 # Without a return value we don't need python 3.3. 840 @gen.coroutine 841 def f(): 842 yield gen.Task(self.io_loop.add_callback) 843 return 844 result = yield f() 845 self.assertEqual(result, None) 846 self.finished = True 847 848 @gen_test 849 def test_sync_raise(self): 850 @gen.coroutine 851 def f(): 852 1 / 0 853 # The exception is raised when the future is yielded 854 # (or equivalently when its result method is called), 855 # not when the function itself is called). 856 future = f() 857 with self.assertRaises(ZeroDivisionError): 858 yield future 859 self.finished = True 860 861 @gen_test 862 def test_async_raise(self): 863 @gen.coroutine 864 def f(): 865 yield gen.Task(self.io_loop.add_callback) 866 1 / 0 867 future = f() 868 with self.assertRaises(ZeroDivisionError): 869 yield future 870 self.finished = True 871 872 @gen_test 873 def test_pass_callback(self): 874 @gen.coroutine 875 def f(): 876 raise gen.Return(42) 877 result = yield gen.Task(f) 878 self.assertEqual(result, 42) 879 self.finished = True 880 881 @gen_test 882 def test_replace_yieldpoint_exception(self): 883 # Test exception handling: a coroutine can catch one exception 884 # raised by a yield point and raise a different one. 
885 @gen.coroutine 886 def f1(): 887 1 / 0 888 889 @gen.coroutine 890 def f2(): 891 try: 892 yield f1() 893 except ZeroDivisionError: 894 raise KeyError() 895 896 future = f2() 897 with self.assertRaises(KeyError): 898 yield future 899 self.finished = True 900 901 @gen_test 902 def test_swallow_yieldpoint_exception(self): 903 # Test exception handling: a coroutine can catch an exception 904 # raised by a yield point and not raise a different one. 905 @gen.coroutine 906 def f1(): 907 1 / 0 908 909 @gen.coroutine 910 def f2(): 911 try: 912 yield f1() 913 except ZeroDivisionError: 914 raise gen.Return(42) 915 916 result = yield f2() 917 self.assertEqual(result, 42) 918 self.finished = True 919 920 @gen_test 921 def test_replace_context_exception(self): 922 # Test exception handling: exceptions thrown into the stack context 923 # can be caught and replaced. 924 # Note that this test and the following are for behavior that is 925 # not really supported any more: coroutines no longer create a 926 # stack context automatically; but one is created after the first 927 # YieldPoint (i.e. not a Future). 928 @gen.coroutine 929 def f2(): 930 (yield gen.Callback(1))() 931 yield gen.Wait(1) 932 self.io_loop.add_callback(lambda: 1 / 0) 933 try: 934 yield gen.Task(self.io_loop.add_timeout, 935 self.io_loop.time() + 10) 936 except ZeroDivisionError: 937 raise KeyError() 938 939 future = f2() 940 with self.assertRaises(KeyError): 941 yield future 942 self.finished = True 943 944 @gen_test 945 def test_swallow_context_exception(self): 946 # Test exception handling: exceptions thrown into the stack context 947 # can be caught and ignored. 
948 @gen.coroutine 949 def f2(): 950 (yield gen.Callback(1))() 951 yield gen.Wait(1) 952 self.io_loop.add_callback(lambda: 1 / 0) 953 try: 954 yield gen.Task(self.io_loop.add_timeout, 955 self.io_loop.time() + 10) 956 except ZeroDivisionError: 957 raise gen.Return(42) 958 959 result = yield f2() 960 self.assertEqual(result, 42) 961 self.finished = True 962 963 @gen_test 964 def test_moment(self): 965 calls = [] 966 967 @gen.coroutine 968 def f(name, yieldable): 969 for i in range(5): 970 calls.append(name) 971 yield yieldable 972 # First, confirm the behavior without moment: each coroutine 973 # monopolizes the event loop until it finishes. 974 immediate = Future() 975 immediate.set_result(None) 976 yield [f('a', immediate), f('b', immediate)] 977 self.assertEqual(''.join(calls), 'aaaaabbbbb') 978 979 # With moment, they take turns. 980 calls = [] 981 yield [f('a', gen.moment), f('b', gen.moment)] 982 self.assertEqual(''.join(calls), 'ababababab') 983 self.finished = True 984 985 calls = [] 986 yield [f('a', gen.moment), f('b', immediate)] 987 self.assertEqual(''.join(calls), 'abbbbbaaaa') 988 989 @gen_test 990 def test_sleep(self): 991 yield gen.sleep(0.01) 992 self.finished = True 993 994 @skipBefore33 995 @gen_test 996 def test_py3_leak_exception_context(self): 997 class LeakedException(Exception): 998 pass 999 1000 @gen.coroutine 1001 def inner(iteration): 1002 raise LeakedException(iteration) 1003 1004 try: 1005 yield inner(1) 1006 except LeakedException as e: 1007 self.assertEqual(str(e), "1") 1008 self.assertIsNone(e.__context__) 1009 1010 try: 1011 yield inner(2) 1012 except LeakedException as e: 1013 self.assertEqual(str(e), "2") 1014 self.assertIsNone(e.__context__) 1015 1016 self.finished = True 1017 1018 @skipNotCPython 1019 def test_coroutine_refcounting(self): 1020 # On CPython, tasks and their arguments should be released immediately 1021 # without waiting for garbage collection. 
1022 @gen.coroutine 1023 def inner(): 1024 class Foo(object): 1025 pass 1026 local_var = Foo() 1027 self.local_ref = weakref.ref(local_var) 1028 yield gen.coroutine(lambda: None)() 1029 raise ValueError('Some error') 1030 1031 @gen.coroutine 1032 def inner2(): 1033 try: 1034 yield inner() 1035 except ValueError: 1036 pass 1037 1038 self.io_loop.run_sync(inner2, timeout=3) 1039 1040 self.assertIs(self.local_ref(), None) 1041 self.finished = True 1042 1043 1044class GenSequenceHandler(RequestHandler): 1045 @asynchronous 1046 @gen.engine 1047 def get(self): 1048 self.io_loop = self.request.connection.stream.io_loop 1049 self.io_loop.add_callback((yield gen.Callback("k1"))) 1050 yield gen.Wait("k1") 1051 self.write("1") 1052 self.io_loop.add_callback((yield gen.Callback("k2"))) 1053 yield gen.Wait("k2") 1054 self.write("2") 1055 # reuse an old key 1056 self.io_loop.add_callback((yield gen.Callback("k1"))) 1057 yield gen.Wait("k1") 1058 self.finish("3") 1059 1060 1061class GenCoroutineSequenceHandler(RequestHandler): 1062 @gen.coroutine 1063 def get(self): 1064 self.io_loop = self.request.connection.stream.io_loop 1065 self.io_loop.add_callback((yield gen.Callback("k1"))) 1066 yield gen.Wait("k1") 1067 self.write("1") 1068 self.io_loop.add_callback((yield gen.Callback("k2"))) 1069 yield gen.Wait("k2") 1070 self.write("2") 1071 # reuse an old key 1072 self.io_loop.add_callback((yield gen.Callback("k1"))) 1073 yield gen.Wait("k1") 1074 self.finish("3") 1075 1076 1077class GenCoroutineUnfinishedSequenceHandler(RequestHandler): 1078 @asynchronous 1079 @gen.coroutine 1080 def get(self): 1081 self.io_loop = self.request.connection.stream.io_loop 1082 self.io_loop.add_callback((yield gen.Callback("k1"))) 1083 yield gen.Wait("k1") 1084 self.write("1") 1085 self.io_loop.add_callback((yield gen.Callback("k2"))) 1086 yield gen.Wait("k2") 1087 self.write("2") 1088 # reuse an old key 1089 self.io_loop.add_callback((yield gen.Callback("k1"))) 1090 yield gen.Wait("k1") 1091 # just 
write, don't finish 1092 self.write("3") 1093 1094 1095class GenTaskHandler(RequestHandler): 1096 @asynchronous 1097 @gen.engine 1098 def get(self): 1099 io_loop = self.request.connection.stream.io_loop 1100 client = AsyncHTTPClient(io_loop=io_loop) 1101 response = yield gen.Task(client.fetch, self.get_argument('url')) 1102 response.rethrow() 1103 self.finish(b"got response: " + response.body) 1104 1105 1106class GenExceptionHandler(RequestHandler): 1107 @asynchronous 1108 @gen.engine 1109 def get(self): 1110 # This test depends on the order of the two decorators. 1111 io_loop = self.request.connection.stream.io_loop 1112 yield gen.Task(io_loop.add_callback) 1113 raise Exception("oops") 1114 1115 1116class GenCoroutineExceptionHandler(RequestHandler): 1117 @gen.coroutine 1118 def get(self): 1119 # This test depends on the order of the two decorators. 1120 io_loop = self.request.connection.stream.io_loop 1121 yield gen.Task(io_loop.add_callback) 1122 raise Exception("oops") 1123 1124 1125class GenYieldExceptionHandler(RequestHandler): 1126 @asynchronous 1127 @gen.engine 1128 def get(self): 1129 io_loop = self.request.connection.stream.io_loop 1130 # Test the interaction of the two stack_contexts. 1131 1132 def fail_task(callback): 1133 io_loop.add_callback(lambda: 1 / 0) 1134 try: 1135 yield gen.Task(fail_task) 1136 raise Exception("did not get expected exception") 1137 except ZeroDivisionError: 1138 self.finish('ok') 1139 1140 1141# "Undecorated" here refers to the absence of @asynchronous. 
class UndecoratedCoroutinesHandler(RequestHandler):
    """Both prepare() and get() are coroutines; the response is "123"."""

    @gen.coroutine
    def prepare(self):
        self.chunks = []
        yield gen.Task(IOLoop.current().add_callback)
        self.chunks.append('1')

    @gen.coroutine
    def get(self):
        self.chunks.append('2')
        yield gen.Task(IOLoop.current().add_callback)
        self.chunks.append('3')
        yield gen.Task(IOLoop.current().add_callback)
        self.write(''.join(self.chunks))


class AsyncPrepareErrorHandler(RequestHandler):
    """prepare() raises HTTPError(403) after one yield."""

    @gen.coroutine
    def prepare(self):
        yield gen.Task(IOLoop.current().add_callback)
        raise HTTPError(403)

    def get(self):
        self.finish('ok')


class NativeCoroutineHandler(RequestHandler):
    # The method is defined through exec() so that the "async def"
    # source is only compiled when the version check passes.
    if sys.version_info > (3, 5):
        exec(textwrap.dedent("""
        async def get(self):
            await gen.Task(IOLoop.current().add_callback)
            self.write("ok")
        """))


class GenWebTest(AsyncHTTPTestCase):
    """Runs the handlers above through a real HTTP fetch."""

    def get_app(self):
        return Application([
            ('/sequence', GenSequenceHandler),
            ('/coroutine_sequence', GenCoroutineSequenceHandler),
            ('/coroutine_unfinished_sequence',
             GenCoroutineUnfinishedSequenceHandler),
            ('/task', GenTaskHandler),
            ('/exception', GenExceptionHandler),
            ('/coroutine_exception', GenCoroutineExceptionHandler),
            ('/yield_exception', GenYieldExceptionHandler),
            ('/undecorated_coroutine', UndecoratedCoroutinesHandler),
            ('/async_prepare_error', AsyncPrepareErrorHandler),
            ('/native_coroutine', NativeCoroutineHandler),
        ])

    def test_sequence_handler(self):
        response = self.fetch('/sequence')
        self.assertEqual(response.body, b"123")

    def test_coroutine_sequence_handler(self):
        response = self.fetch('/coroutine_sequence')
        self.assertEqual(response.body, b"123")

    def test_coroutine_unfinished_sequence_handler(self):
        # The handler only writes "3" and never calls finish(); the full
        # body is still expected.
        response = self.fetch('/coroutine_unfinished_sequence')
        self.assertEqual(response.body, b"123")

    def test_task_handler(self):
        # /task fetches /sequence from this same test server.
        response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
        self.assertEqual(response.body, b"got response: 123")

    def test_exception_handler(self):
        # Make sure we get an error and not a timeout
        with ExpectLog(app_log, "Uncaught exception GET /exception"):
            response = self.fetch('/exception')
        self.assertEqual(500, response.code)

    def test_coroutine_exception_handler(self):
        # Make sure we get an error and not a timeout
        with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
            response = self.fetch('/coroutine_exception')
        self.assertEqual(500, response.code)

    def test_yield_exception_handler(self):
        response = self.fetch('/yield_exception')
        self.assertEqual(response.body, b'ok')

    def test_undecorated_coroutines(self):
        response = self.fetch('/undecorated_coroutine')
        self.assertEqual(response.body, b'123')

    def test_async_prepare_error_handler(self):
        response = self.fetch('/async_prepare_error')
        self.assertEqual(response.code, 403)

    @skipBefore35
    def test_native_coroutine_handler(self):
        response = self.fetch('/native_coroutine')
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, b'ok')


class WithTimeoutTest(AsyncTestCase):
    """Tests for ``gen.with_timeout``."""

    @gen_test
    def test_timeout(self):
        # The Future is never resolved, so the timeout must fire.
        with self.assertRaises(gen.TimeoutError):
            yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                                   Future())

    @gen_test
    def test_completes_before_timeout(self):
        # The Future is resolved after 0.1s, long before the timeout.
        future = Future()
        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
                                 lambda: future.set_result('asdf'))
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')

    @gen_test
    def test_fails_before_timeout(self):
        # The Future's own exception is raised, not a TimeoutError.
        future = Future()
        self.io_loop.add_timeout(
            datetime.timedelta(seconds=0.1),
            lambda: future.set_exception(ZeroDivisionError()))
        with self.assertRaises(ZeroDivisionError):
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   future, io_loop=self.io_loop)

    @gen_test
    def test_already_resolved(self):
        # The Future already has a result when with_timeout is called.
        future = Future()
        future.set_result('asdf')
        result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                        future, io_loop=self.io_loop)
        self.assertEqual(result, 'asdf')

    @unittest.skipIf(futures is None, 'futures module not present')
    @gen_test
    def test_timeout_concurrent_future(self):
        # The deadline is the current IOLoop time, while the submitted
        # job sleeps for 0.1s.
        with futures.ThreadPoolExecutor(1) as executor:
            with self.assertRaises(gen.TimeoutError):
                yield gen.with_timeout(self.io_loop.time(),
                                       executor.submit(time.sleep, 0.1))

    @unittest.skipIf(futures is None, 'futures module not present')
    @gen_test
    def test_completed_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            yield gen.with_timeout(datetime.timedelta(seconds=3600),
                                   executor.submit(lambda: None))


class WaitIteratorTest(AsyncTestCase):
    @gen_test
    def test_empty_iterator(self):
        g = gen.WaitIterator()
        self.assertTrue(g.done(), 'empty generator iterated')

        # Mixing positional and keyword arguments is rejected; ``g``
        # still refers to the empty iterator afterwards.
        with self.assertRaises(ValueError):
            g = gen.WaitIterator(False, bar=False)

        self.assertEqual(g.current_index, None, "bad nil current index")
        self.assertEqual(g.current_future, None, "bad nil current future")

    @gen_test
    def test_already_done(self):
        f1 = Future()
        f2 = Future()
        f3 = Future()
        f1.set_result(24)
        f2.set_result(42)
        f3.set_result(84)

        g = gen.WaitIterator(f1, f2, f3)
        i = 0
        while not g.done():
            r = yield g.next()
            # Order is not guaranteed, but the current implementation
            # preserves ordering of already-done Futures.
1317 if i == 0: 1318 self.assertEqual(g.current_index, 0) 1319 self.assertIs(g.current_future, f1) 1320 self.assertEqual(r, 24) 1321 elif i == 1: 1322 self.assertEqual(g.current_index, 1) 1323 self.assertIs(g.current_future, f2) 1324 self.assertEqual(r, 42) 1325 elif i == 2: 1326 self.assertEqual(g.current_index, 2) 1327 self.assertIs(g.current_future, f3) 1328 self.assertEqual(r, 84) 1329 i += 1 1330 1331 self.assertEqual(g.current_index, None, "bad nil current index") 1332 self.assertEqual(g.current_future, None, "bad nil current future") 1333 1334 dg = gen.WaitIterator(f1=f1, f2=f2) 1335 1336 while not dg.done(): 1337 dr = yield dg.next() 1338 if dg.current_index == "f1": 1339 self.assertTrue(dg.current_future == f1 and dr == 24, 1340 "WaitIterator dict status incorrect") 1341 elif dg.current_index == "f2": 1342 self.assertTrue(dg.current_future == f2 and dr == 42, 1343 "WaitIterator dict status incorrect") 1344 else: 1345 self.fail("got bad WaitIterator index {}".format( 1346 dg.current_index)) 1347 1348 i += 1 1349 1350 self.assertEqual(dg.current_index, None, "bad nil current index") 1351 self.assertEqual(dg.current_future, None, "bad nil current future") 1352 1353 def finish_coroutines(self, iteration, futures): 1354 if iteration == 3: 1355 futures[2].set_result(24) 1356 elif iteration == 5: 1357 futures[0].set_exception(ZeroDivisionError()) 1358 elif iteration == 8: 1359 futures[1].set_result(42) 1360 futures[3].set_result(84) 1361 1362 if iteration < 8: 1363 self.io_loop.add_callback(self.finish_coroutines, iteration + 1, futures) 1364 1365 @gen_test 1366 def test_iterator(self): 1367 futures = [Future(), Future(), Future(), Future()] 1368 1369 self.finish_coroutines(0, futures) 1370 1371 g = gen.WaitIterator(*futures) 1372 1373 i = 0 1374 while not g.done(): 1375 try: 1376 r = yield g.next() 1377 except ZeroDivisionError: 1378 self.assertIs(g.current_future, futures[0], 1379 'exception future invalid') 1380 else: 1381 if i == 0: 1382 self.assertEqual(r, 
24, 'iterator value incorrect') 1383 self.assertEqual(g.current_index, 2, 'wrong index') 1384 elif i == 2: 1385 self.assertEqual(r, 42, 'iterator value incorrect') 1386 self.assertEqual(g.current_index, 1, 'wrong index') 1387 elif i == 3: 1388 self.assertEqual(r, 84, 'iterator value incorrect') 1389 self.assertEqual(g.current_index, 3, 'wrong index') 1390 i += 1 1391 1392 @skipBefore35 1393 @gen_test 1394 def test_iterator_async_await(self): 1395 # Recreate the previous test with py35 syntax. It's a little clunky 1396 # because of the way the previous test handles an exception on 1397 # a single iteration. 1398 futures = [Future(), Future(), Future(), Future()] 1399 self.finish_coroutines(0, futures) 1400 self.finished = False 1401 1402 namespace = exec_test(globals(), locals(), """ 1403 async def f(): 1404 i = 0 1405 g = gen.WaitIterator(*futures) 1406 try: 1407 async for r in g: 1408 if i == 0: 1409 self.assertEqual(r, 24, 'iterator value incorrect') 1410 self.assertEqual(g.current_index, 2, 'wrong index') 1411 else: 1412 raise Exception("expected exception on iteration 1") 1413 i += 1 1414 except ZeroDivisionError: 1415 i += 1 1416 async for r in g: 1417 if i == 2: 1418 self.assertEqual(r, 42, 'iterator value incorrect') 1419 self.assertEqual(g.current_index, 1, 'wrong index') 1420 elif i == 3: 1421 self.assertEqual(r, 84, 'iterator value incorrect') 1422 self.assertEqual(g.current_index, 3, 'wrong index') 1423 else: 1424 raise Exception("didn't expect iteration %d" % i) 1425 i += 1 1426 self.finished = True 1427 """) 1428 yield namespace['f']() 1429 self.assertTrue(self.finished) 1430 1431 @gen_test 1432 def test_no_ref(self): 1433 # In this usage, there is no direct hard reference to the 1434 # WaitIterator itself, only the Future it returns. Since 1435 # WaitIterator uses weak references internally to improve GC 1436 # performance, this used to cause problems. 
1437 yield gen.with_timeout(datetime.timedelta(seconds=0.1), 1438 gen.WaitIterator(gen.sleep(0)).next()) 1439 1440 1441class RunnerGCTest(AsyncTestCase): 1442 """Github issue 1769: Runner objects can get GCed unexpectedly""" 1443 @gen_test 1444 def test_gc(self): 1445 """Runners shouldn't GC if future is alive""" 1446 # Create the weakref 1447 weakref_scope = [None] 1448 1449 def callback(): 1450 gc.collect(2) 1451 weakref_scope[0]().set_result(123) 1452 1453 @gen.coroutine 1454 def tester(): 1455 fut = Future() 1456 weakref_scope[0] = weakref.ref(fut) 1457 self.io_loop.add_callback(callback) 1458 yield fut 1459 1460 yield gen.with_timeout( 1461 datetime.timedelta(seconds=0.2), 1462 tester() 1463 ) 1464 1465 1466if __name__ == '__main__': 1467 unittest.main() 1468