1import unittest
2import yappi
3import asyncio
4import threading
5from utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io
6
7
8@asyncio.coroutine
9def async_sleep(sec):
10    yield from asyncio.sleep(sec)
11
12
13class SingleThreadTests(YappiUnitTestCase):
14
15    def test_recursive_coroutine(self):
16
17        @asyncio.coroutine
18        def a(n):
19            if n <= 0:
20                return
21            yield from async_sleep(0.1)
22            burn_cpu(0.1)
23            yield from a(n - 1)
24            yield from a(n - 2)
25
26        yappi.set_clock_type("cpu")
27        yappi.start()
28        asyncio.get_event_loop().run_until_complete(a(3))
29        yappi.stop()
30
31        r1 = '''
32        ..p/yappi/tests/test_asyncio.py:11 a  9/1    0.000124  0.400667  0.044519
33        ../yappi/tests/utils.py:126 burn_cpu  4      0.000000  0.400099  0.100025
34        async_sleep                           4      0.000000  0.000444  0.000111
35        '''
36        stats = yappi.get_func_stats()
37        self.assert_traces_almost_equal(r1, stats)
38
39    def test_basic_old_style(self):
40
41        @asyncio.coroutine
42        def a():
43            yield from async_sleep(0.1)
44            burn_io(0.1)
45            yield from async_sleep(0.1)
46            burn_io(0.1)
47            yield from async_sleep(0.1)
48            burn_cpu(0.3)
49
50        yappi.set_clock_type("wall")
51        yappi.start(builtins=True)
52        asyncio.get_event_loop().run_until_complete(a())
53        asyncio.get_event_loop().run_until_complete(a())
54        yappi.stop()
55
56        r1 = '''
57        ..p/yappi/tests/test_asyncio.py:43 a  2      0.000118  1.604049  0.802024
58        async_sleep                           6      0.000000  0.603239  0.100540
59        ../yappi/tests/utils.py:126 burn_cpu  2      0.576313  0.600026  0.300013
60        ..p/yappi/tests/utils.py:135 burn_io  4      0.000025  0.400666  0.100166
61        time.sleep                            4      0.400641  0.400641  0.100160
62        '''
63        stats = yappi.get_func_stats()
64
65        self.assert_traces_almost_equal(r1, stats)
66
67        yappi.clear_stats()
68        yappi.set_clock_type("cpu")
69        yappi.start(builtins=True)
70        asyncio.get_event_loop().run_until_complete(a())
71        asyncio.get_event_loop().run_until_complete(a())
72        yappi.stop()
73        stats = yappi.get_func_stats()
74        r1 = '''
75        ..p/yappi/tests/test_asyncio.py:43 a  2      0.000117  0.601170  0.300585
76        ../yappi/tests/utils.py:126 burn_cpu  2      0.000000  0.600047  0.300024
77        async_sleep                           6      0.000159  0.000801  0.000134
78        time.sleep                            4      0.000169  0.000169  0.000042
79        '''
80        self.assert_traces_almost_equal(r1, stats)
81
82
83class MultiThreadTests(YappiUnitTestCase):
84
85    def test_basic(self):
86
87        @asyncio.coroutine
88        def a():
89            yield from async_sleep(0.3)
90            burn_cpu(0.4)
91
92        @asyncio.coroutine
93        def b():
94            yield from a()
95
96        @asyncio.coroutine
97        def recursive_a(n):
98            if not n:
99                return
100            burn_io(0.3)
101            yield from async_sleep(0.3)
102            yield from recursive_a(n - 1)
103
104        tlocal = threading.local()
105
106        def tag_cbk():
107            try:
108                return tlocal._tag
109            except:
110                return -1
111
112        yappi.set_clock_type("wall")
113        tlocal._tag = 0
114        yappi.set_tag_callback(tag_cbk)
115
116        def _thread_event_loop(loop, tag):
117            tlocal._tag = tag
118            asyncio.set_event_loop(loop)
119            loop.run_forever()
120
121        _TCOUNT = 3
122        _ctag = 1
123
124        ts = []
125        for i in range(_TCOUNT):
126            _loop = asyncio.new_event_loop()
127            t = threading.Thread(target=_thread_event_loop, args=(_loop, _ctag))
128            t._loop = _loop
129            t.start()
130
131            ts.append(t)
132            _ctag += 1
133
134        @asyncio.coroutine
135        def stop_loop():
136            asyncio.get_event_loop().stop()
137
138        @asyncio.coroutine
139        def driver():
140            futs = []
141            fut = asyncio.run_coroutine_threadsafe(a(), ts[0]._loop)
142            futs.append(fut)
143            fut = asyncio.run_coroutine_threadsafe(recursive_a(5), ts[1]._loop)
144            futs.append(fut)
145            fut = asyncio.run_coroutine_threadsafe(b(), ts[2]._loop)
146            futs.append(fut)
147            for fut in futs:
148                fut.result()
149
150            # stop asyncio loops in threads
151            for t in ts:
152                asyncio.run_coroutine_threadsafe(stop_loop(), t._loop)
153
154        yappi.start()
155        asyncio.get_event_loop().run_until_complete(driver())
156        yappi.stop()
157        traces = yappi.get_func_stats()
158        t1 = '''
159        tests/test_asyncio.py:137 driver      1      0.000061  3.744064  3.744064
160        tests/test_asyncio.py:96 recursive_a  6/1    0.000188  3.739663  0.623277
161        tests/test_asyncio.py:8 async_sleep   7      0.000085  2.375271  0.339324
162        tests/utils.py:135 burn_io            5      0.000044  1.700000  0.437400
163        tests/test_asyncio.py:87 a            2      0.000019  1.600000  0.921138
164        tests/utils.py:126 burn_cpu           2      0.800000  0.800000  0.509730
165        tests/test_asyncio.py:92 b            1      0.000005  0.800000  0.921055
166        '''
167        self.assert_traces_almost_equal(t1, traces)
168
169        traces = yappi.get_func_stats(filter={'tag': 2})
170        t1 = '''
171        tests/test_asyncio.py:96 recursive_a  6/1    0.000211  3.720011  0.620002
172        tests/utils.py:135 burn_io            5      0.000079  1.700000  0.431813
173        async_sleep                           5      0.000170  1.560735  0.312147
174        '''
175        self.assert_traces_almost_equal(t1, traces)
176
177
178if __name__ == '__main__':
179    unittest.main()
180