1from __future__ import absolute_import, division, print_function
2
3import gc
4import contextlib
5import datetime
6import functools
7import sys
8import textwrap
9import time
10import weakref
11
12from tornado.concurrent import return_future, Future
13from tornado.escape import url_escape
14from tornado.httpclient import AsyncHTTPClient
15from tornado.ioloop import IOLoop
16from tornado.log import app_log
17from tornado import stack_context
18from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
19from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test
20from tornado.web import Application, RequestHandler, asynchronous, HTTPError
21
22from tornado import gen
23
24try:
25    from concurrent import futures
26except ImportError:
27    futures = None
28
29
30class GenEngineTest(AsyncTestCase):
31    def setUp(self):
32        super(GenEngineTest, self).setUp()
33        self.named_contexts = []
34
    def named_context(self, name):
        """Return a context-manager factory that records ``name``.

        While a context created by the returned factory is active, ``name``
        sits on the ``self.named_contexts`` stack; it is removed again when
        the context exits.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                # Contexts must unwind in LIFO order: the entry popped here
                # has to be the one this context pushed.
                self.assertEqual(self.named_contexts.pop(), name)
        return context
44
    def run_gen(self, f):
        """Call ``f()`` (discarding its return value) and return ``self.wait()``."""
        f()
        return self.wait()
48
49    def delay_callback(self, iterations, callback, arg):
50        """Runs callback(arg) after a number of IOLoop iterations."""
51        if iterations == 0:
52            callback(arg)
53        else:
54            self.io_loop.add_callback(functools.partial(
55                self.delay_callback, iterations - 1, callback, arg))
56
    @return_future
    def async_future(self, result, callback):
        # Schedules ``callback(result)`` on the IOLoop.
        # NOTE(review): tests call this with only ``result``, so
        # ``return_future`` presumably supplies ``callback`` and hands a
        # Future back to the caller -- confirm against tornado.concurrent.
        self.io_loop.add_callback(callback, result)
60
    @gen.coroutine
    def async_exception(self, e):
        # Coroutine that yields ``gen.moment`` once and only then raises
        # ``e``, so the exception is raised after the first yield rather
        # than at call time.
        yield gen.moment
        raise e
65
66    def test_no_yield(self):
67        @gen.engine
68        def f():
69            self.stop()
70        self.run_gen(f)
71
72    def test_inline_cb(self):
73        @gen.engine
74        def f():
75            (yield gen.Callback("k1"))()
76            res = yield gen.Wait("k1")
77            self.assertTrue(res is None)
78            self.stop()
79        self.run_gen(f)
80
81    def test_ioloop_cb(self):
82        @gen.engine
83        def f():
84            self.io_loop.add_callback((yield gen.Callback("k1")))
85            yield gen.Wait("k1")
86            self.stop()
87        self.run_gen(f)
88
89    def test_exception_phase1(self):
90        @gen.engine
91        def f():
92            1 / 0
93        self.assertRaises(ZeroDivisionError, self.run_gen, f)
94
95    def test_exception_phase2(self):
96        @gen.engine
97        def f():
98            self.io_loop.add_callback((yield gen.Callback("k1")))
99            yield gen.Wait("k1")
100            1 / 0
101        self.assertRaises(ZeroDivisionError, self.run_gen, f)
102
103    def test_exception_in_task_phase1(self):
104        def fail_task(callback):
105            1 / 0
106
107        @gen.engine
108        def f():
109            try:
110                yield gen.Task(fail_task)
111                raise Exception("did not get expected exception")
112            except ZeroDivisionError:
113                self.stop()
114        self.run_gen(f)
115
116    def test_exception_in_task_phase2(self):
117        # This is the case that requires the use of stack_context in gen.engine
118        def fail_task(callback):
119            self.io_loop.add_callback(lambda: 1 / 0)
120
121        @gen.engine
122        def f():
123            try:
124                yield gen.Task(fail_task)
125                raise Exception("did not get expected exception")
126            except ZeroDivisionError:
127                self.stop()
128        self.run_gen(f)
129
130    def test_with_arg(self):
131        @gen.engine
132        def f():
133            (yield gen.Callback("k1"))(42)
134            res = yield gen.Wait("k1")
135            self.assertEqual(42, res)
136            self.stop()
137        self.run_gen(f)
138
139    def test_with_arg_tuple(self):
140        @gen.engine
141        def f():
142            (yield gen.Callback((1, 2)))((3, 4))
143            res = yield gen.Wait((1, 2))
144            self.assertEqual((3, 4), res)
145            self.stop()
146        self.run_gen(f)
147
148    def test_key_reuse(self):
149        @gen.engine
150        def f():
151            yield gen.Callback("k1")
152            yield gen.Callback("k1")
153            self.stop()
154        self.assertRaises(gen.KeyReuseError, self.run_gen, f)
155
156    def test_key_reuse_tuple(self):
157        @gen.engine
158        def f():
159            yield gen.Callback((1, 2))
160            yield gen.Callback((1, 2))
161            self.stop()
162        self.assertRaises(gen.KeyReuseError, self.run_gen, f)
163
164    def test_key_mismatch(self):
165        @gen.engine
166        def f():
167            yield gen.Callback("k1")
168            yield gen.Wait("k2")
169            self.stop()
170        self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
171
172    def test_key_mismatch_tuple(self):
173        @gen.engine
174        def f():
175            yield gen.Callback((1, 2))
176            yield gen.Wait((2, 3))
177            self.stop()
178        self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
179
180    def test_leaked_callback(self):
181        @gen.engine
182        def f():
183            yield gen.Callback("k1")
184            self.stop()
185        self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
186
187    def test_leaked_callback_tuple(self):
188        @gen.engine
189        def f():
190            yield gen.Callback((1, 2))
191            self.stop()
192        self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
193
    def test_parallel_callback(self):
        # Several callbacks are outstanding at once and are waited on in an
        # order different from the order in which they were registered.
        @gen.engine
        def f():
            # Register keys 0, 1, 2 and schedule each callback on the IOLoop.
            for k in range(3):
                self.io_loop.add_callback((yield gen.Callback(k)))
            yield gen.Wait(1)
            # A fourth callback is registered while 0 and 2 are still pending.
            self.io_loop.add_callback((yield gen.Callback(3)))
            yield gen.Wait(0)
            yield gen.Wait(3)
            yield gen.Wait(2)
            self.stop()
        self.run_gen(f)
206
207    def test_bogus_yield(self):
208        @gen.engine
209        def f():
210            yield 42
211        self.assertRaises(gen.BadYieldError, self.run_gen, f)
212
213    def test_bogus_yield_tuple(self):
214        @gen.engine
215        def f():
216            yield (1, 2)
217        self.assertRaises(gen.BadYieldError, self.run_gen, f)
218
219    def test_reuse(self):
220        @gen.engine
221        def f():
222            self.io_loop.add_callback((yield gen.Callback(0)))
223            yield gen.Wait(0)
224            self.stop()
225        self.run_gen(f)
226        self.run_gen(f)
227
228    def test_task(self):
229        @gen.engine
230        def f():
231            yield gen.Task(self.io_loop.add_callback)
232            self.stop()
233        self.run_gen(f)
234
235    def test_wait_all(self):
236        @gen.engine
237        def f():
238            (yield gen.Callback("k1"))("v1")
239            (yield gen.Callback("k2"))("v2")
240            results = yield gen.WaitAll(["k1", "k2"])
241            self.assertEqual(results, ["v1", "v2"])
242            self.stop()
243        self.run_gen(f)
244
245    def test_exception_in_yield(self):
246        @gen.engine
247        def f():
248            try:
249                yield gen.Wait("k1")
250                raise Exception("did not get expected exception")
251            except gen.UnknownKeyError:
252                pass
253            self.stop()
254        self.run_gen(f)
255
256    def test_resume_after_exception_in_yield(self):
257        @gen.engine
258        def f():
259            try:
260                yield gen.Wait("k1")
261                raise Exception("did not get expected exception")
262            except gen.UnknownKeyError:
263                pass
264            (yield gen.Callback("k2"))("v2")
265            self.assertEqual((yield gen.Wait("k2")), "v2")
266            self.stop()
267        self.run_gen(f)
268
269    def test_orphaned_callback(self):
270        @gen.engine
271        def f():
272            self.orphaned_callback = yield gen.Callback(1)
273        try:
274            self.run_gen(f)
275            raise Exception("did not get expected exception")
276        except gen.LeakedCallbackError:
277            pass
278        self.orphaned_callback()
279
280    def test_none(self):
281        @gen.engine
282        def f():
283            yield None
284            self.stop()
285        self.run_gen(f)
286
287    def test_multi(self):
288        @gen.engine
289        def f():
290            (yield gen.Callback("k1"))("v1")
291            (yield gen.Callback("k2"))("v2")
292            results = yield [gen.Wait("k1"), gen.Wait("k2")]
293            self.assertEqual(results, ["v1", "v2"])
294            self.stop()
295        self.run_gen(f)
296
297    def test_multi_dict(self):
298        @gen.engine
299        def f():
300            (yield gen.Callback("k1"))("v1")
301            (yield gen.Callback("k2"))("v2")
302            results = yield dict(foo=gen.Wait("k1"), bar=gen.Wait("k2"))
303            self.assertEqual(results, dict(foo="v1", bar="v2"))
304            self.stop()
305        self.run_gen(f)
306
307    # The following tests explicitly run with both gen.Multi
308    # and gen.multi_future (Task returns a Future, so it can be used
309    # with either).
310    def test_multi_yieldpoint_delayed(self):
311        @gen.engine
312        def f():
313            # callbacks run at different times
314            responses = yield gen.Multi([
315                gen.Task(self.delay_callback, 3, arg="v1"),
316                gen.Task(self.delay_callback, 1, arg="v2"),
317            ])
318            self.assertEqual(responses, ["v1", "v2"])
319            self.stop()
320        self.run_gen(f)
321
322    def test_multi_yieldpoint_dict_delayed(self):
323        @gen.engine
324        def f():
325            # callbacks run at different times
326            responses = yield gen.Multi(dict(
327                foo=gen.Task(self.delay_callback, 3, arg="v1"),
328                bar=gen.Task(self.delay_callback, 1, arg="v2"),
329            ))
330            self.assertEqual(responses, dict(foo="v1", bar="v2"))
331            self.stop()
332        self.run_gen(f)
333
334    def test_multi_future_delayed(self):
335        @gen.engine
336        def f():
337            # callbacks run at different times
338            responses = yield gen.multi_future([
339                gen.Task(self.delay_callback, 3, arg="v1"),
340                gen.Task(self.delay_callback, 1, arg="v2"),
341            ])
342            self.assertEqual(responses, ["v1", "v2"])
343            self.stop()
344        self.run_gen(f)
345
346    def test_multi_future_dict_delayed(self):
347        @gen.engine
348        def f():
349            # callbacks run at different times
350            responses = yield gen.multi_future(dict(
351                foo=gen.Task(self.delay_callback, 3, arg="v1"),
352                bar=gen.Task(self.delay_callback, 1, arg="v2"),
353            ))
354            self.assertEqual(responses, dict(foo="v1", bar="v2"))
355            self.stop()
356        self.run_gen(f)
357
358    @skipOnTravis
359    @gen_test
360    def test_multi_performance(self):
361        # Yielding a list used to have quadratic performance; make
362        # sure a large list stays reasonable.  On my laptop a list of
363        # 2000 used to take 1.8s, now it takes 0.12.
364        start = time.time()
365        yield [gen.Task(self.io_loop.add_callback) for i in range(2000)]
366        end = time.time()
367        self.assertLess(end - start, 1.0)
368
369    @gen_test
370    def test_multi_empty(self):
371        # Empty lists or dicts should return the same type.
372        x = yield []
373        self.assertTrue(isinstance(x, list))
374        y = yield {}
375        self.assertTrue(isinstance(y, dict))
376
377    @gen_test
378    def test_multi_mixed_types(self):
379        # A YieldPoint (Wait) and Future (Task) can be combined
380        # (and use the YieldPoint codepath)
381        (yield gen.Callback("k1"))("v1")
382        responses = yield [gen.Wait("k1"),
383                           gen.Task(self.delay_callback, 3, arg="v2")]
384        self.assertEqual(responses, ["v1", "v2"])
385
386    @gen_test
387    def test_future(self):
388        result = yield self.async_future(1)
389        self.assertEqual(result, 1)
390
391    @gen_test
392    def test_multi_future(self):
393        results = yield [self.async_future(1), self.async_future(2)]
394        self.assertEqual(results, [1, 2])
395
396    @gen_test
397    def test_multi_future_duplicate(self):
398        f = self.async_future(2)
399        results = yield [self.async_future(1), f, self.async_future(3), f]
400        self.assertEqual(results, [1, 2, 3, 2])
401
402    @gen_test
403    def test_multi_dict_future(self):
404        results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
405        self.assertEqual(results, dict(foo=1, bar=2))
406
407    @gen_test
408    def test_multi_exceptions(self):
409        with ExpectLog(app_log, "Multiple exceptions in yield list"):
410            with self.assertRaises(RuntimeError) as cm:
411                yield gen.Multi([self.async_exception(RuntimeError("error 1")),
412                                 self.async_exception(RuntimeError("error 2"))])
413        self.assertEqual(str(cm.exception), "error 1")
414
415        # With only one exception, no error is logged.
416        with self.assertRaises(RuntimeError):
417            yield gen.Multi([self.async_exception(RuntimeError("error 1")),
418                             self.async_future(2)])
419
420        # Exception logging may be explicitly quieted.
421        with self.assertRaises(RuntimeError):
422            yield gen.Multi([self.async_exception(RuntimeError("error 1")),
423                             self.async_exception(RuntimeError("error 2"))],
424                            quiet_exceptions=RuntimeError)
425
426    @gen_test
427    def test_multi_future_exceptions(self):
428        with ExpectLog(app_log, "Multiple exceptions in yield list"):
429            with self.assertRaises(RuntimeError) as cm:
430                yield [self.async_exception(RuntimeError("error 1")),
431                       self.async_exception(RuntimeError("error 2"))]
432        self.assertEqual(str(cm.exception), "error 1")
433
434        # With only one exception, no error is logged.
435        with self.assertRaises(RuntimeError):
436            yield [self.async_exception(RuntimeError("error 1")),
437                   self.async_future(2)]
438
439        # Exception logging may be explicitly quieted.
440        with self.assertRaises(RuntimeError):
441            yield gen.multi_future(
442                [self.async_exception(RuntimeError("error 1")),
443                 self.async_exception(RuntimeError("error 2"))],
444                quiet_exceptions=RuntimeError)
445
446    def test_arguments(self):
447        @gen.engine
448        def f():
449            (yield gen.Callback("noargs"))()
450            self.assertEqual((yield gen.Wait("noargs")), None)
451            (yield gen.Callback("1arg"))(42)
452            self.assertEqual((yield gen.Wait("1arg")), 42)
453
454            (yield gen.Callback("kwargs"))(value=42)
455            result = yield gen.Wait("kwargs")
456            self.assertTrue(isinstance(result, gen.Arguments))
457            self.assertEqual(((), dict(value=42)), result)
458            self.assertEqual(dict(value=42), result.kwargs)
459
460            (yield gen.Callback("2args"))(42, 43)
461            result = yield gen.Wait("2args")
462            self.assertTrue(isinstance(result, gen.Arguments))
463            self.assertEqual(((42, 43), {}), result)
464            self.assertEqual((42, 43), result.args)
465
466            def task_func(callback):
467                callback(None, error="foo")
468            result = yield gen.Task(task_func)
469            self.assertTrue(isinstance(result, gen.Arguments))
470            self.assertEqual(((None,), dict(error="foo")), result)
471
472            self.stop()
473        self.run_gen(f)
474
475    def test_stack_context_leak(self):
476        # regression test: repeated invocations of a gen-based
477        # function should not result in accumulated stack_contexts
478        def _stack_depth():
479            head = stack_context._state.contexts[1]
480            length = 0
481
482            while head is not None:
483                length += 1
484                head = head.old_contexts[1]
485
486            return length
487
488        @gen.engine
489        def inner(callback):
490            yield gen.Task(self.io_loop.add_callback)
491            callback()
492
493        @gen.engine
494        def outer():
495            for i in range(10):
496                yield gen.Task(inner)
497
498            stack_increase = _stack_depth() - initial_stack_depth
499            self.assertTrue(stack_increase <= 2)
500            self.stop()
501        initial_stack_depth = _stack_depth()
502        self.run_gen(outer)
503
504    def test_stack_context_leak_exception(self):
505        # same as previous, but with a function that exits with an exception
506        @gen.engine
507        def inner(callback):
508            yield gen.Task(self.io_loop.add_callback)
509            1 / 0
510
511        @gen.engine
512        def outer():
513            for i in range(10):
514                try:
515                    yield gen.Task(inner)
516                except ZeroDivisionError:
517                    pass
518            stack_increase = len(stack_context._state.contexts) - initial_stack_depth
519            self.assertTrue(stack_increase <= 2)
520            self.stop()
521        initial_stack_depth = len(stack_context._state.contexts)
522        self.run_gen(outer)
523
    def function_with_stack_context(self, callback):
        # Helper that runs ``callback`` via the IOLoop from inside a
        # StackContext named 'a'.
        #
        # Technically this function should stack_context.wrap its callback
        # upon entry.  However, it is very common for this step to be
        # omitted.
        def step2():
            # step2 was scheduled inside the 'a' context, so that context
            # is active here; ``callback`` is scheduled from within it.
            self.assertEqual(self.named_contexts, ['a'])
            self.io_loop.add_callback(callback)

        with stack_context.StackContext(self.named_context('a')):
            self.io_loop.add_callback(step2)
534
535    @gen_test
536    def test_wait_transfer_stack_context(self):
537        # Wait should not pick up contexts from where callback was invoked,
538        # even if that function improperly fails to wrap its callback.
539        cb = yield gen.Callback('k1')
540        self.function_with_stack_context(cb)
541        self.assertEqual(self.named_contexts, [])
542        yield gen.Wait('k1')
543        self.assertEqual(self.named_contexts, [])
544
545    @gen_test
546    def test_task_transfer_stack_context(self):
547        yield gen.Task(self.function_with_stack_context)
548        self.assertEqual(self.named_contexts, [])
549
550    def test_raise_after_stop(self):
551        # This pattern will be used in the following tests so make sure
552        # the exception propagates as expected.
553        @gen.engine
554        def f():
555            self.stop()
556            1 / 0
557
558        with self.assertRaises(ZeroDivisionError):
559            self.run_gen(f)
560
561    def test_sync_raise_return(self):
562        # gen.Return is allowed in @gen.engine, but it may not be used
563        # to return a value.
564        @gen.engine
565        def f():
566            self.stop(42)
567            raise gen.Return()
568
569        result = self.run_gen(f)
570        self.assertEqual(result, 42)
571
572    def test_async_raise_return(self):
573        @gen.engine
574        def f():
575            yield gen.Task(self.io_loop.add_callback)
576            self.stop(42)
577            raise gen.Return()
578
579        result = self.run_gen(f)
580        self.assertEqual(result, 42)
581
582    def test_sync_raise_return_value(self):
583        @gen.engine
584        def f():
585            raise gen.Return(42)
586
587        with self.assertRaises(gen.ReturnValueIgnoredError):
588            self.run_gen(f)
589
590    def test_sync_raise_return_value_tuple(self):
591        @gen.engine
592        def f():
593            raise gen.Return((1, 2))
594
595        with self.assertRaises(gen.ReturnValueIgnoredError):
596            self.run_gen(f)
597
598    def test_async_raise_return_value(self):
599        @gen.engine
600        def f():
601            yield gen.Task(self.io_loop.add_callback)
602            raise gen.Return(42)
603
604        with self.assertRaises(gen.ReturnValueIgnoredError):
605            self.run_gen(f)
606
607    def test_async_raise_return_value_tuple(self):
608        @gen.engine
609        def f():
610            yield gen.Task(self.io_loop.add_callback)
611            raise gen.Return((1, 2))
612
613        with self.assertRaises(gen.ReturnValueIgnoredError):
614            self.run_gen(f)
615
616    def test_return_value(self):
617        # It is an error to apply @gen.engine to a function that returns
618        # a value.
619        @gen.engine
620        def f():
621            return 42
622
623        with self.assertRaises(gen.ReturnValueIgnoredError):
624            self.run_gen(f)
625
626    def test_return_value_tuple(self):
627        # It is an error to apply @gen.engine to a function that returns
628        # a value.
629        @gen.engine
630        def f():
631            return (1, 2)
632
633        with self.assertRaises(gen.ReturnValueIgnoredError):
634            self.run_gen(f)
635
636    @skipNotCPython
637    def test_task_refcounting(self):
638        # On CPython, tasks and their arguments should be released immediately
639        # without waiting for garbage collection.
640        @gen.engine
641        def f():
642            class Foo(object):
643                pass
644            arg = Foo()
645            self.arg_ref = weakref.ref(arg)
646            task = gen.Task(self.io_loop.add_callback, arg=arg)
647            self.task_ref = weakref.ref(task)
648            yield task
649            self.stop()
650
651        self.run_gen(f)
652        self.assertIs(self.arg_ref(), None)
653        self.assertIs(self.task_ref(), None)
654
655
656class GenCoroutineTest(AsyncTestCase):
657    def setUp(self):
658        # Stray StopIteration exceptions can lead to tests exiting prematurely,
659        # so we need explicit checks here to make sure the tests run all
660        # the way through.
661        self.finished = False
662        super(GenCoroutineTest, self).setUp()
663
664    def tearDown(self):
665        super(GenCoroutineTest, self).tearDown()
666        assert self.finished
667
668    def test_attributes(self):
669        self.finished = True
670
671        def f():
672            yield gen.moment
673
674        coro = gen.coroutine(f)
675        self.assertEqual(coro.__name__, f.__name__)
676        self.assertEqual(coro.__module__, f.__module__)
677        self.assertIs(coro.__wrapped__, f)
678
679    def test_is_coroutine_function(self):
680        self.finished = True
681
682        def f():
683            yield gen.moment
684
685        coro = gen.coroutine(f)
686        self.assertFalse(gen.is_coroutine_function(f))
687        self.assertTrue(gen.is_coroutine_function(coro))
688        self.assertFalse(gen.is_coroutine_function(coro()))
689
690    @gen_test
691    def test_sync_gen_return(self):
692        @gen.coroutine
693        def f():
694            raise gen.Return(42)
695        result = yield f()
696        self.assertEqual(result, 42)
697        self.finished = True
698
699    @gen_test
700    def test_async_gen_return(self):
701        @gen.coroutine
702        def f():
703            yield gen.Task(self.io_loop.add_callback)
704            raise gen.Return(42)
705        result = yield f()
706        self.assertEqual(result, 42)
707        self.finished = True
708
709    @gen_test
710    def test_sync_return(self):
711        @gen.coroutine
712        def f():
713            return 42
714        result = yield f()
715        self.assertEqual(result, 42)
716        self.finished = True
717
718    @skipBefore33
719    @gen_test
720    def test_async_return(self):
721        namespace = exec_test(globals(), locals(), """
722        @gen.coroutine
723        def f():
724            yield gen.Task(self.io_loop.add_callback)
725            return 42
726        """)
727        result = yield namespace['f']()
728        self.assertEqual(result, 42)
729        self.finished = True
730
731    @skipBefore33
732    @gen_test
733    def test_async_early_return(self):
734        # A yield statement exists but is not executed, which means
735        # this function "returns" via an exception.  This exception
736        # doesn't happen before the exception handling is set up.
737        namespace = exec_test(globals(), locals(), """
738        @gen.coroutine
739        def f():
740            if True:
741                return 42
742            yield gen.Task(self.io_loop.add_callback)
743        """)
744        result = yield namespace['f']()
745        self.assertEqual(result, 42)
746        self.finished = True
747
748    @skipBefore35
749    @gen_test
750    def test_async_await(self):
751        # This test verifies that an async function can await a
752        # yield-based gen.coroutine, and that a gen.coroutine
753        # (the test method itself) can yield an async function.
754        namespace = exec_test(globals(), locals(), """
755        async def f():
756            await gen.Task(self.io_loop.add_callback)
757            return 42
758        """)
759        result = yield namespace['f']()
760        self.assertEqual(result, 42)
761        self.finished = True
762
763    @skipBefore35
764    @gen_test
765    def test_asyncio_sleep_zero(self):
766        # asyncio.sleep(0) turns into a special case (equivalent to
767        # `yield None`)
768        namespace = exec_test(globals(), locals(), """
769        async def f():
770            import asyncio
771            await asyncio.sleep(0)
772            return 42
773        """)
774        result = yield namespace['f']()
775        self.assertEqual(result, 42)
776        self.finished = True
777
778    @skipBefore35
779    @gen_test
780    def test_async_await_mixed_multi_native_future(self):
781        namespace = exec_test(globals(), locals(), """
782        async def f1():
783            await gen.Task(self.io_loop.add_callback)
784            return 42
785        """)
786
787        @gen.coroutine
788        def f2():
789            yield gen.Task(self.io_loop.add_callback)
790            raise gen.Return(43)
791
792        results = yield [namespace['f1'](), f2()]
793        self.assertEqual(results, [42, 43])
794        self.finished = True
795
796    @skipBefore35
797    @gen_test
798    def test_async_await_mixed_multi_native_yieldpoint(self):
799        namespace = exec_test(globals(), locals(), """
800        async def f1():
801            await gen.Task(self.io_loop.add_callback)
802            return 42
803        """)
804
805        @gen.coroutine
806        def f2():
807            yield gen.Task(self.io_loop.add_callback)
808            raise gen.Return(43)
809
810        f2(callback=(yield gen.Callback('cb')))
811        results = yield [namespace['f1'](), gen.Wait('cb')]
812        self.assertEqual(results, [42, 43])
813        self.finished = True
814
815    @skipBefore35
816    @gen_test
817    def test_async_with_timeout(self):
818        namespace = exec_test(globals(), locals(), """
819        async def f1():
820            return 42
821        """)
822
823        result = yield gen.with_timeout(datetime.timedelta(hours=1),
824                                        namespace['f1']())
825        self.assertEqual(result, 42)
826        self.finished = True
827
828    @gen_test
829    def test_sync_return_no_value(self):
830        @gen.coroutine
831        def f():
832            return
833        result = yield f()
834        self.assertEqual(result, None)
835        self.finished = True
836
837    @gen_test
838    def test_async_return_no_value(self):
839        # Without a return value we don't need python 3.3.
840        @gen.coroutine
841        def f():
842            yield gen.Task(self.io_loop.add_callback)
843            return
844        result = yield f()
845        self.assertEqual(result, None)
846        self.finished = True
847
    @gen_test
    def test_sync_raise(self):
        @gen.coroutine
        def f():
            1 / 0
        # The exception is raised when the future is yielded
        # (or equivalently when its result method is called),
        # not when the function itself is called.
        future = f()
        with self.assertRaises(ZeroDivisionError):
            yield future
        self.finished = True
860
861    @gen_test
862    def test_async_raise(self):
863        @gen.coroutine
864        def f():
865            yield gen.Task(self.io_loop.add_callback)
866            1 / 0
867        future = f()
868        with self.assertRaises(ZeroDivisionError):
869            yield future
870        self.finished = True
871
872    @gen_test
873    def test_pass_callback(self):
874        @gen.coroutine
875        def f():
876            raise gen.Return(42)
877        result = yield gen.Task(f)
878        self.assertEqual(result, 42)
879        self.finished = True
880
881    @gen_test
882    def test_replace_yieldpoint_exception(self):
883        # Test exception handling: a coroutine can catch one exception
884        # raised by a yield point and raise a different one.
885        @gen.coroutine
886        def f1():
887            1 / 0
888
889        @gen.coroutine
890        def f2():
891            try:
892                yield f1()
893            except ZeroDivisionError:
894                raise KeyError()
895
896        future = f2()
897        with self.assertRaises(KeyError):
898            yield future
899        self.finished = True
900
901    @gen_test
902    def test_swallow_yieldpoint_exception(self):
903        # Test exception handling: a coroutine can catch an exception
904        # raised by a yield point and not raise a different one.
905        @gen.coroutine
906        def f1():
907            1 / 0
908
909        @gen.coroutine
910        def f2():
911            try:
912                yield f1()
913            except ZeroDivisionError:
914                raise gen.Return(42)
915
916        result = yield f2()
917        self.assertEqual(result, 42)
918        self.finished = True
919
    @gen_test
    def test_replace_context_exception(self):
        # Test exception handling: exceptions thrown into the stack context
        # can be caught and replaced.
        # Note that this test and the following are for behavior that is
        # not really supported any more: coroutines no longer create a
        # stack context automatically; but one is created after the first
        # YieldPoint (i.e. not a Future).
        @gen.coroutine
        def f2():
            # Yield a Callback/Wait pair (YieldPoints, not Futures) before
            # anything else; per the note above this is what causes the
            # stack context to exist for the rest of the coroutine.
            (yield gen.Callback(1))()
            yield gen.Wait(1)
            # Schedule a callback on the IOLoop that raises
            # ZeroDivisionError when it runs.
            self.io_loop.add_callback(lambda: 1 / 0)
            try:
                # Wait on a timeout ten seconds in the future; the test
                # expects the ZeroDivisionError to be delivered here instead.
                yield gen.Task(self.io_loop.add_timeout,
                               self.io_loop.time() + 10)
            except ZeroDivisionError:
                # Replace the caught exception with a different type.
                raise KeyError()

        future = f2()
        # The caller must see the replacement exception.
        with self.assertRaises(KeyError):
            yield future
        self.finished = True
943
    @gen_test
    def test_swallow_context_exception(self):
        # Test exception handling: exceptions thrown into the stack context
        # can be caught and ignored.
        @gen.coroutine
        def f2():
            # Yield a Callback/Wait pair before anything else.
            # NOTE(review): presumably this is what sets up the stack
            # context this test relies on -- confirm against tornado.gen.
            (yield gen.Callback(1))()
            yield gen.Wait(1)
            # Schedule a callback on the IOLoop that raises
            # ZeroDivisionError when it runs.
            self.io_loop.add_callback(lambda: 1 / 0)
            try:
                # Wait on a timeout ten seconds in the future; the test
                # expects the ZeroDivisionError to be delivered here instead.
                yield gen.Task(self.io_loop.add_timeout,
                               self.io_loop.time() + 10)
            except ZeroDivisionError:
                # Swallow the exception and finish with a normal result.
                raise gen.Return(42)

        result = yield f2()
        self.assertEqual(result, 42)
        self.finished = True
962
963    @gen_test
964    def test_moment(self):
965        calls = []
966
967        @gen.coroutine
968        def f(name, yieldable):
969            for i in range(5):
970                calls.append(name)
971                yield yieldable
972        # First, confirm the behavior without moment: each coroutine
973        # monopolizes the event loop until it finishes.
974        immediate = Future()
975        immediate.set_result(None)
976        yield [f('a', immediate), f('b', immediate)]
977        self.assertEqual(''.join(calls), 'aaaaabbbbb')
978
979        # With moment, they take turns.
980        calls = []
981        yield [f('a', gen.moment), f('b', gen.moment)]
982        self.assertEqual(''.join(calls), 'ababababab')
983        self.finished = True
984
985        calls = []
986        yield [f('a', gen.moment), f('b', immediate)]
987        self.assertEqual(''.join(calls), 'abbbbbaaaa')
988
989    @gen_test
990    def test_sleep(self):
991        yield gen.sleep(0.01)
992        self.finished = True
993
994    @skipBefore33
995    @gen_test
996    def test_py3_leak_exception_context(self):
997        class LeakedException(Exception):
998            pass
999
1000        @gen.coroutine
1001        def inner(iteration):
1002            raise LeakedException(iteration)
1003
1004        try:
1005            yield inner(1)
1006        except LeakedException as e:
1007            self.assertEqual(str(e), "1")
1008            self.assertIsNone(e.__context__)
1009
1010        try:
1011            yield inner(2)
1012        except LeakedException as e:
1013            self.assertEqual(str(e), "2")
1014            self.assertIsNone(e.__context__)
1015
1016        self.finished = True
1017
    @skipNotCPython
    def test_coroutine_refcounting(self):
        # On CPython, tasks and their arguments should be released immediately
        # without waiting for garbage collection.
        @gen.coroutine
        def inner():
            class Foo(object):
                pass
            # ``local_var`` is only held by this coroutine's frame; the test
            # keeps nothing but a weak reference to it.
            local_var = Foo()
            self.local_ref = weakref.ref(local_var)
            # Yield once on another coroutine's result before failing, so
            # the error is raised after this coroutine has been resumed.
            yield gen.coroutine(lambda: None)()
            raise ValueError('Some error')

        @gen.coroutine
        def inner2():
            # Catch and discard the error raised by ``inner``.
            try:
                yield inner()
            except ValueError:
                pass

        # Drive the coroutines to completion on the test's IOLoop.
        self.io_loop.run_sync(inner2, timeout=3)

        # No gc.collect() call is made here: the weak reference must
        # already be dead once run_sync returns.
        self.assertIs(self.local_ref(), None)
        self.finished = True
1042
1043
class GenSequenceHandler(RequestHandler):
    """Emit "1", "2", "3", waiting on a Callback/Wait pair before each chunk.

    Variant decorated with ``@asynchronous`` and ``@gen.engine``.
    """
    @asynchronous
    @gen.engine
    def get(self):
        loop = self.io_loop = self.request.connection.stream.io_loop

        first_cb = yield gen.Callback("k1")
        loop.add_callback(first_cb)
        yield gen.Wait("k1")
        self.write("1")

        second_cb = yield gen.Callback("k2")
        loop.add_callback(second_cb)
        yield gen.Wait("k2")
        self.write("2")

        # reuse an old key
        reused_cb = yield gen.Callback("k1")
        loop.add_callback(reused_cb)
        yield gen.Wait("k1")
        self.finish("3")
1059
1060
class GenCoroutineSequenceHandler(RequestHandler):
    """Emit "1", "2", "3", waiting on a Callback/Wait pair before each chunk.

    Variant decorated with ``@gen.coroutine`` only.
    """
    @gen.coroutine
    def get(self):
        loop = self.io_loop = self.request.connection.stream.io_loop

        first_cb = yield gen.Callback("k1")
        loop.add_callback(first_cb)
        yield gen.Wait("k1")
        self.write("1")

        second_cb = yield gen.Callback("k2")
        loop.add_callback(second_cb)
        yield gen.Wait("k2")
        self.write("2")

        # reuse an old key
        reused_cb = yield gen.Callback("k1")
        loop.add_callback(reused_cb)
        yield gen.Wait("k1")
        self.finish("3")
1075
1076
class GenCoroutineUnfinishedSequenceHandler(RequestHandler):
    """Write "1", "2", "3" but never call ``finish()`` explicitly.

    Variant decorated with ``@asynchronous`` and ``@gen.coroutine``.
    """
    @asynchronous
    @gen.coroutine
    def get(self):
        loop = self.io_loop = self.request.connection.stream.io_loop

        first_cb = yield gen.Callback("k1")
        loop.add_callback(first_cb)
        yield gen.Wait("k1")
        self.write("1")

        second_cb = yield gen.Callback("k2")
        loop.add_callback(second_cb)
        yield gen.Wait("k2")
        self.write("2")

        # reuse an old key
        reused_cb = yield gen.Callback("k1")
        loop.add_callback(reused_cb)
        yield gen.Wait("k1")
        # just write, don't finish
        self.write("3")
1093
1094
class GenTaskHandler(RequestHandler):
    """Fetch the URL given in the ``url`` argument and echo its body."""
    @asynchronous
    @gen.engine
    def get(self):
        # Build the client on the IOLoop that is serving this request.
        client = AsyncHTTPClient(
            io_loop=self.request.connection.stream.io_loop)
        target_url = self.get_argument('url')
        response = yield gen.Task(client.fetch, target_url)
        # Surface any fetch error as an exception.
        response.rethrow()
        payload = b"got response: " + response.body
        self.finish(payload)
1104
1105
class GenExceptionHandler(RequestHandler):
    """Raise an exception after yielding once to the IOLoop."""
    @asynchronous
    @gen.engine
    def get(self):
        # This test depends on the order of the two decorators.
        loop = self.request.connection.stream.io_loop
        one_iteration = gen.Task(loop.add_callback)
        yield one_iteration
        raise Exception("oops")
1114
1115
class GenCoroutineExceptionHandler(RequestHandler):
    """Raise an exception after yielding once to the IOLoop."""
    @gen.coroutine
    def get(self):
        # Counterpart of GenExceptionHandler with @gen.coroutine as the
        # only decorator (no @asynchronous).
        loop = self.request.connection.stream.io_loop
        one_iteration = gen.Task(loop.add_callback)
        yield one_iteration
        raise Exception("oops")
1123
1124
class GenYieldExceptionHandler(RequestHandler):
    """Respond 'ok' if an error raised on the IOLoop reaches the yield."""
    @asynchronous
    @gen.engine
    def get(self):
        loop = self.request.connection.stream.io_loop
        # Test the interaction of the two stack_contexts.

        def fail_task(callback):
            # Never invokes ``callback``; only schedules a failing call.
            loop.add_callback(lambda: 1 / 0)

        try:
            yield gen.Task(fail_task)
        except ZeroDivisionError:
            self.finish('ok')
        else:
            raise Exception("did not get expected exception")
1139
1140
1141# "Undecorated" here refers to the absence of @asynchronous.
class UndecoratedCoroutinesHandler(RequestHandler):
    """Collect chunks across coroutine ``prepare`` and ``get`` methods."""
    @gen.coroutine
    def prepare(self):
        self.chunks = []
        one_iteration = gen.Task(IOLoop.current().add_callback)
        yield one_iteration
        self.chunks.append('1')

    @gen.coroutine
    def get(self):
        self.chunks.append('2')
        one_iteration = gen.Task(IOLoop.current().add_callback)
        yield one_iteration
        self.chunks.append('3')
        another_iteration = gen.Task(IOLoop.current().add_callback)
        yield another_iteration
        # Emit the chunks in the order they were appended.
        body = ''.join(self.chunks)
        self.write(body)
1156
1157
class AsyncPrepareErrorHandler(RequestHandler):
    """Raise HTTPError(403) from a coroutine ``prepare``."""
    @gen.coroutine
    def prepare(self):
        # Yield to the IOLoop once so the error is raised asynchronously.
        one_iteration = gen.Task(IOLoop.current().add_callback)
        yield one_iteration
        raise HTTPError(403)

    def get(self):
        self.finish('ok')
1166
1167
class NativeCoroutineHandler(RequestHandler):
    # ``get`` is written with ``async def``/``await`` syntax, so its source
    # is kept in a string and only exec'd (inside this class body) when the
    # version check passes; otherwise the class defines no ``get`` here.
    # sys.version_info has more than two components, so the ``>`` comparison
    # against (3, 5) is also true on 3.5.0 itself.
    if sys.version_info > (3, 5):
        exec(textwrap.dedent("""
        async def get(self):
            await gen.Task(IOLoop.current().add_callback)
            self.write("ok")
        """))
1175
1176
class GenWebTest(AsyncHTTPTestCase):
    """Fetch each gen-based handler over HTTP and check its response."""
    def get_app(self):
        # One route per handler under test.
        routes = [
            ('/sequence', GenSequenceHandler),
            ('/coroutine_sequence', GenCoroutineSequenceHandler),
            ('/coroutine_unfinished_sequence',
             GenCoroutineUnfinishedSequenceHandler),
            ('/task', GenTaskHandler),
            ('/exception', GenExceptionHandler),
            ('/coroutine_exception', GenCoroutineExceptionHandler),
            ('/yield_exception', GenYieldExceptionHandler),
            ('/undecorated_coroutine', UndecoratedCoroutinesHandler),
            ('/async_prepare_error', AsyncPrepareErrorHandler),
            ('/native_coroutine', NativeCoroutineHandler),
        ]
        return Application(routes)

    def test_sequence_handler(self):
        body = self.fetch('/sequence').body
        self.assertEqual(body, b"123")

    def test_coroutine_sequence_handler(self):
        body = self.fetch('/coroutine_sequence').body
        self.assertEqual(body, b"123")

    def test_coroutine_unfinished_sequence_handler(self):
        body = self.fetch('/coroutine_unfinished_sequence').body
        self.assertEqual(body, b"123")

    def test_task_handler(self):
        # Have the task handler fetch the sequence handler of this server.
        inner_url = url_escape(self.get_url('/sequence'))
        response = self.fetch('/task?url=%s' % inner_url)
        self.assertEqual(response.body, b"got response: 123")

    def test_exception_handler(self):
        # Make sure we get an error and not a timeout
        with ExpectLog(app_log, "Uncaught exception GET /exception"):
            response = self.fetch('/exception')
        status = response.code
        self.assertEqual(500, status)

    def test_coroutine_exception_handler(self):
        # Make sure we get an error and not a timeout
        with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
            response = self.fetch('/coroutine_exception')
        status = response.code
        self.assertEqual(500, status)

    def test_yield_exception_handler(self):
        body = self.fetch('/yield_exception').body
        self.assertEqual(body, b'ok')

    def test_undecorated_coroutines(self):
        body = self.fetch('/undecorated_coroutine').body
        self.assertEqual(body, b'123')

    def test_async_prepare_error_handler(self):
        status = self.fetch('/async_prepare_error').code
        self.assertEqual(status, 403)

    @skipBefore35
    def test_native_coroutine_handler(self):
        response = self.fetch('/native_coroutine')
        status, body = response.code, response.body
        self.assertEqual(status, 200)
        self.assertEqual(body, b'ok')
1238
1239
class WithTimeoutTest(AsyncTestCase):
    """Exercises gen.with_timeout with Tornado and concurrent futures."""
    @gen_test
    def test_timeout(self):
        # A Future that is never resolved must hit the short timeout.
        with self.assertRaises(gen.TimeoutError):
            short = datetime.timedelta(seconds=0.1)
            never_resolved = Future()
            yield gen.with_timeout(short, never_resolved)

    @gen_test
    def test_completes_before_timeout(self):
        pending = Future()

        def resolve():
            pending.set_result('asdf')

        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), resolve)
        far_away = datetime.timedelta(seconds=3600)
        outcome = yield gen.with_timeout(far_away, pending,
                                         io_loop=self.io_loop)
        self.assertEqual(outcome, 'asdf')

    @gen_test
    def test_fails_before_timeout(self):
        pending = Future()

        def fail():
            pending.set_exception(ZeroDivisionError())

        self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), fail)
        # The Future's own exception must propagate, not a TimeoutError.
        with self.assertRaises(ZeroDivisionError):
            far_away = datetime.timedelta(seconds=3600)
            yield gen.with_timeout(far_away, pending, io_loop=self.io_loop)

    @gen_test
    def test_already_resolved(self):
        resolved = Future()
        resolved.set_result('asdf')
        far_away = datetime.timedelta(seconds=3600)
        outcome = yield gen.with_timeout(far_away, resolved,
                                         io_loop=self.io_loop)
        self.assertEqual(outcome, 'asdf')

    @unittest.skipIf(futures is None, 'futures module not present')
    @gen_test
    def test_timeout_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            with self.assertRaises(gen.TimeoutError):
                # The deadline is "now", while the submitted job sleeps.
                deadline = self.io_loop.time()
                sleeping = executor.submit(time.sleep, 0.1)
                yield gen.with_timeout(deadline, sleeping)

    @unittest.skipIf(futures is None, 'futures module not present')
    @gen_test
    def test_completed_concurrent_future(self):
        with futures.ThreadPoolExecutor(1) as executor:
            far_away = datetime.timedelta(seconds=3600)
            trivial = executor.submit(lambda: None)
            yield gen.with_timeout(far_away, trivial)
1288
1289
class WaitIteratorTest(AsyncTestCase):
    """Tests for gen.WaitIterator with positional and keyword futures."""
    @gen_test
    def test_empty_iterator(self):
        g = gen.WaitIterator()
        self.assertTrue(g.done(), 'empty generator iterated')

        # Passing both positional and keyword arguments is expected to be
        # rejected; ``g`` keeps referring to the empty iterator above.
        with self.assertRaises(ValueError):
            gen.WaitIterator(False, bar=False)

        self.assertEqual(g.current_index, None, "bad nil current index")
        self.assertEqual(g.current_future, None, "bad nil current future")

    @gen_test
    def test_already_done(self):
        f1 = Future()
        f2 = Future()
        f3 = Future()
        f1.set_result(24)
        f2.set_result(42)
        f3.set_result(84)

        g = gen.WaitIterator(f1, f2, f3)
        # Order is not guaranteed, but the current implementation
        # preserves ordering of already-done Futures.
        expected = [(0, f1, 24), (1, f2, 42), (2, f3, 84)]
        i = 0
        while not g.done():
            r = yield g.next()
            self.assertLess(i, len(expected), "too many results yielded")
            index, future, value = expected[i]
            self.assertEqual(g.current_index, index)
            self.assertIs(g.current_future, future)
            self.assertEqual(r, value)
            i += 1
        # Guard against the loop passing vacuously or stopping early.
        self.assertEqual(i, len(expected), "not every future was yielded")

        self.assertEqual(g.current_index, None, "bad nil current index")
        self.assertEqual(g.current_future, None, "bad nil current future")

        dg = gen.WaitIterator(f1=f1, f2=f2)
        expected_by_key = {"f1": (f1, 24), "f2": (f2, 42)}
        seen_keys = []

        while not dg.done():
            dr = yield dg.next()
            if dg.current_index not in expected_by_key:
                self.fail("got bad WaitIterator index {}".format(
                    dg.current_index))
            future, value = expected_by_key[dg.current_index]
            self.assertIs(dg.current_future, future,
                          "WaitIterator dict status incorrect")
            self.assertEqual(dr, value,
                             "WaitIterator dict status incorrect")
            seen_keys.append(dg.current_index)

        # Every keyword future must have been yielded exactly once.
        self.assertEqual(sorted(seen_keys), ["f1", "f2"])

        self.assertEqual(dg.current_index, None, "bad nil current index")
        self.assertEqual(dg.current_future, None, "bad nil current future")

    def finish_coroutines(self, iteration, futures):
        # Resolves the given futures over successive IOLoop callbacks:
        # futures[2] on iteration 3, futures[0] (with an exception) on
        # iteration 5, then futures[1] and futures[3] on iteration 8.
        if iteration == 3:
            futures[2].set_result(24)
        elif iteration == 5:
            futures[0].set_exception(ZeroDivisionError())
        elif iteration == 8:
            futures[1].set_result(42)
            futures[3].set_result(84)

        # Reschedule until the last step above has run.
        if iteration < 8:
            self.io_loop.add_callback(self.finish_coroutines, iteration + 1, futures)

    @gen_test
    def test_iterator(self):
        futures = [Future(), Future(), Future(), Future()]

        self.finish_coroutines(0, futures)

        g = gen.WaitIterator(*futures)

        i = 0
        while not g.done():
            try:
                r = yield g.next()
            except ZeroDivisionError:
                # The failing future is the second one to complete.
                self.assertEqual(i, 1, 'exception on wrong iteration')
                self.assertIs(g.current_future, futures[0],
                              'exception future invalid')
            else:
                if i == 0:
                    self.assertEqual(r, 24, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 2, 'wrong index')
                elif i == 2:
                    self.assertEqual(r, 42, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 1, 'wrong index')
                elif i == 3:
                    self.assertEqual(r, 84, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 3, 'wrong index')
                else:
                    self.fail("didn't expect iteration %d" % i)
            i += 1

        # All four futures must have been consumed.
        self.assertEqual(i, len(futures), 'wrong number of iterations')

    @skipBefore35
    @gen_test
    def test_iterator_async_await(self):
        # Recreate the previous test with py35 syntax. It's a little clunky
        # because of the way the previous test handles an exception on
        # a single iteration.
        # The exec'd source below reads ``self``, ``futures`` and ``gen``
        # from the namespaces passed to exec_test, so those names must
        # keep their spelling here.
        futures = [Future(), Future(), Future(), Future()]
        self.finish_coroutines(0, futures)
        self.finished = False

        namespace = exec_test(globals(), locals(), """
        async def f():
            i = 0
            g = gen.WaitIterator(*futures)
            try:
                async for r in g:
                    if i == 0:
                        self.assertEqual(r, 24, 'iterator value incorrect')
                        self.assertEqual(g.current_index, 2, 'wrong index')
                    else:
                        raise Exception("expected exception on iteration 1")
                    i += 1
            except ZeroDivisionError:
                i += 1
            async for r in g:
                if i == 2:
                    self.assertEqual(r, 42, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 1, 'wrong index')
                elif i == 3:
                    self.assertEqual(r, 84, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 3, 'wrong index')
                else:
                    raise Exception("didn't expect iteration %d" % i)
                i += 1
            self.finished = True
        """)
        yield namespace['f']()
        self.assertTrue(self.finished)

    @gen_test
    def test_no_ref(self):
        # In this usage, there is no direct hard reference to the
        # WaitIterator itself, only the Future it returns. Since
        # WaitIterator uses weak references internally to improve GC
        # performance, this used to cause problems.
        yield gen.with_timeout(datetime.timedelta(seconds=0.1),
                               gen.WaitIterator(gen.sleep(0)).next())
1439
1440
class RunnerGCTest(AsyncTestCase):
    """Github issue 1769: Runner objects can get GCed unexpectedly"""
    @gen_test
    def test_gc(self):
        """Runners shouldn't GC if future is alive"""
        # Create the weakref
        # A one-element list is used so the nested functions below can
        # rebind the stored value without ``nonlocal``.
        weakref_scope = [None]

        def callback():
            # Force a full collection, then resolve the Future through the
            # weak reference; this only works if the Future is still alive.
            gc.collect(2)
            weakref_scope[0]().set_result(123)

        @gen.coroutine
        def tester():
            # ``fut`` is a local of this coroutine; the test itself keeps
            # only a weak reference to it.
            fut = Future()
            weakref_scope[0] = weakref.ref(fut)
            self.io_loop.add_callback(callback)
            yield fut

        # The coroutine must finish within the timeout, i.e. the callback
        # above must have been able to resolve the Future.
        yield gen.with_timeout(
            datetime.timedelta(seconds=0.2),
            tester()
        )
1464
1465
# Allow this test module to be run directly as a script. ``unittest`` here is
# the name imported from tornado.test.util at the top of the file.
if __name__ == '__main__':
    unittest.main()
1468