1"""Benchmark some basic import use-cases.
2
3The assumption is made that this benchmark is run in a fresh interpreter and
4thus has no external changes made to import-related attributes in sys.
5
6"""
from test.test_importlib import util
import builtins
import decimal
import imp
import importlib
import importlib.machinery
import importlib.util
import json
import os
import py_compile
import sys
import tabnanny
import timeit
import types
18
19
def bench(name, cleanup=lambda: None, *, seconds=1, repeat=3):
    """Yield an imports/second figure for *name*, once per repetition.

    Each repetition keeps re-importing *name* until at least *seconds* of
    timer time have accumulated; *cleanup* runs after every single import
    (even a failing one) so the next import starts from a known state.
    """
    timer = timeit.Timer("__import__({!r})".format(name))
    for _ in range(repeat):
        elapsed = 0
        executions = 0
        while elapsed < seconds:
            try:
                elapsed += timer.timeit(1)
            finally:
                cleanup()
            executions += 1
        # The last iteration pushed us past the time budget, so it
        # shouldn't be counted.
        if elapsed > seconds:
            executions -= 1
        yield executions // seconds
39
40def from_cache(seconds, repeat):
41    """sys.modules"""
42    name = '<benchmark import>'
43    module = imp.new_module(name)
44    module.__file__ = '<test>'
45    module.__package__ = ''
46    with util.uncache(name):
47        sys.modules[name] = module
48        yield from bench(name, repeat=repeat, seconds=seconds)
49
50
def builtin_mod(seconds, repeat):
    """Built-in module"""
    # Start each run from a cold cache; errno may already be imported.
    mod_name = 'errno'
    sys.modules.pop(mod_name, None)
    # Relying on built-in importer being implicit.
    yield from bench(mod_name, lambda: sys.modules.pop(mod_name),
                     seconds=seconds, repeat=repeat)
59
60
def source_wo_bytecode(seconds, repeat):
    """Source w/o bytecode: small"""
    # Suppress .pyc writing so every import parses the source file.
    sys.dont_write_bytecode = True
    try:
        name = '__importlib_test_benchmark__'
        # Clears out sys.modules and puts an entry at the front of sys.path.
        with util.create_modules(name) as mapping:
            # importlib.util.cache_from_source replaces the deprecated
            # imp.cache_from_source.
            bytecode_path = importlib.util.cache_from_source(mapping[name])
            assert not os.path.exists(bytecode_path)
            sys.meta_path.append(importlib.machinery.PathFinder)
            loader = (importlib.machinery.SourceFileLoader,
                      importlib.machinery.SOURCE_SUFFIXES)
            sys.path_hooks.append(importlib.machinery.FileFinder.path_hook(loader))
            yield from bench(name, lambda: sys.modules.pop(name), repeat=repeat,
                             seconds=seconds)
    finally:
        sys.dont_write_bytecode = False
77
78
def _wo_bytecode(module):
    """Return a 'source w/o bytecode' benchmark for importing *module*."""
    name = module.__name__
    def benchmark_wo_bytecode(seconds, repeat):
        """Source w/o bytecode: {}"""
        # Remove any stale .pyc and forbid writing a new one, so every
        # timed import must parse the source.  importlib.util replaces
        # the deprecated imp.cache_from_source.
        bytecode_path = importlib.util.cache_from_source(module.__file__)
        if os.path.exists(bytecode_path):
            os.unlink(bytecode_path)
        sys.dont_write_bytecode = True
        try:
            yield from bench(name, lambda: sys.modules.pop(name),
                             repeat=repeat, seconds=seconds)
        finally:
            sys.dont_write_bytecode = False

    benchmark_wo_bytecode.__doc__ = benchmark_wo_bytecode.__doc__.format(name)
    return benchmark_wo_bytecode

tabnanny_wo_bytecode = _wo_bytecode(tabnanny)
decimal_wo_bytecode = _wo_bytecode(decimal)
98
99
def source_writing_bytecode(seconds, repeat):
    """Source writing bytecode: small"""
    assert not sys.dont_write_bytecode
    name = '__importlib_test_benchmark__'
    with util.create_modules(name) as mapping:
        sys.meta_path.append(importlib.machinery.PathFinder)
        loader = (importlib.machinery.SourceFileLoader,
                  importlib.machinery.SOURCE_SUFFIXES)
        sys.path_hooks.append(importlib.machinery.FileFinder.path_hook(loader))
        def cleanup():
            # Drop both the cached module and its freshly written bytecode so
            # the next import has to parse the source and rewrite the .pyc.
            # importlib.util replaces the deprecated imp.cache_from_source.
            sys.modules.pop(name)
            os.unlink(importlib.util.cache_from_source(mapping[name]))
        for result in bench(name, cleanup, repeat=repeat, seconds=seconds):
            assert not os.path.exists(importlib.util.cache_from_source(mapping[name]))
            yield result
115
116
def _writing_bytecode(module):
    """Return a benchmark importing *module* while rewriting its bytecode."""
    name = module.__name__
    def writing_bytecode_benchmark(seconds, repeat):
        """Source writing bytecode: {}"""
        assert not sys.dont_write_bytecode
        def cleanup():
            # Remove the cached module and the just-written .pyc so every
            # import both parses source and writes bytecode again.
            # importlib.util replaces the deprecated imp.cache_from_source.
            sys.modules.pop(name)
            os.unlink(importlib.util.cache_from_source(module.__file__))
        yield from bench(name, cleanup, repeat=repeat, seconds=seconds)

    writing_bytecode_benchmark.__doc__ = (
                                writing_bytecode_benchmark.__doc__.format(name))
    return writing_bytecode_benchmark

tabnanny_writing_bytecode = _writing_bytecode(tabnanny)
decimal_writing_bytecode = _writing_bytecode(decimal)
133
134
def source_using_bytecode(seconds, repeat):
    """Source w/ bytecode: small"""
    name = '__importlib_test_benchmark__'
    with util.create_modules(name) as mapping:
        sys.meta_path.append(importlib.machinery.PathFinder)
        loader = (importlib.machinery.SourceFileLoader,
                  importlib.machinery.SOURCE_SUFFIXES)
        sys.path_hooks.append(importlib.machinery.FileFinder.path_hook(loader))
        # Pre-compile once so every timed import loads from bytecode.
        # importlib.util replaces the deprecated imp.cache_from_source.
        py_compile.compile(mapping[name])
        assert os.path.exists(importlib.util.cache_from_source(mapping[name]))
        yield from bench(name, lambda: sys.modules.pop(name), repeat=repeat,
                         seconds=seconds)
147
148
def _using_bytecode(module):
    """Create a 'source w/ bytecode' benchmark for importing *module*."""
    name = module.__name__

    def using_bytecode_benchmark(seconds, repeat):
        """Source w/ bytecode: {}"""
        # Ensure up-to-date bytecode exists before timing any imports.
        py_compile.compile(module.__file__)
        cleanup = lambda: sys.modules.pop(name)
        yield from bench(name, cleanup, seconds=seconds, repeat=repeat)

    using_bytecode_benchmark.__doc__ = using_bytecode_benchmark.__doc__.format(name)
    return using_bytecode_benchmark


tabnanny_using_bytecode = _using_bytecode(tabnanny)
decimal_using_bytecode = _using_bytecode(decimal)
163
164
def main(import_, options):
    """Run the benchmarks with *import_* installed as __import__.

    *options* comes from the argparse parser at the bottom of the file:
    ``source_file`` (optional JSON file of previous results to compare
    against), ``dest_file`` (optional file to dump the new results to), and
    ``benchmark`` (optional docstring of a single benchmark to run).
    """
    if options.source_file:
        with options.source_file:
            prev_results = json.load(options.source_file)
    else:
        prev_results = {}
    # Assign through the builtins module, not __builtins__: the latter is
    # only guaranteed to be the module itself in __main__; in an imported
    # module it may be a plain dict (a CPython implementation detail).
    builtins.__import__ = import_
    benchmarks = (from_cache, builtin_mod,
                  source_writing_bytecode,
                  source_wo_bytecode, source_using_bytecode,
                  tabnanny_writing_bytecode,
                  tabnanny_wo_bytecode, tabnanny_using_bytecode,
                  decimal_writing_bytecode,
                  decimal_wo_bytecode, decimal_using_bytecode,
                )
    if options.benchmark:
        # Benchmarks are selected by their docstring (their display name).
        for b in benchmarks:
            if b.__doc__ == options.benchmark:
                benchmarks = [b]
                break
        else:
            print('Unknown benchmark: {!r}'.format(options.benchmark),
                  file=sys.stderr)
            sys.exit(1)
    seconds = 1
    seconds_plural = 's' if seconds > 1 else ''
    repeat = 3
    header = ('Measuring imports/second over {} second{}, best out of {}\n'
              'Entire benchmark run should take about {} seconds\n'
              'Using {!r} as __import__\n')
    # import_ was just installed as builtins.__import__, so report it directly.
    print(header.format(seconds, seconds_plural, repeat,
                        len(benchmarks) * seconds * repeat, import_))
    new_results = {}
    for benchmark in benchmarks:
        print(benchmark.__doc__, "[", end=' ')
        sys.stdout.flush()
        results = []
        for result in benchmark(seconds=seconds, repeat=repeat):
            results.append(result)
            print(result, end=' ')
            sys.stdout.flush()
        # Every benchmark must restore this flag before finishing.
        assert not sys.dont_write_bytecode
        print("]", "best is", format(max(results), ',d'))
        new_results[benchmark.__doc__] = results
    if prev_results:
        print('\n\nComparing new vs. old\n')
        for benchmark in benchmarks:
            benchmark_name = benchmark.__doc__
            old_result = max(prev_results[benchmark_name])
            new_result = max(new_results[benchmark_name])
            result = '{:,d} vs. {:,d} ({:%})'.format(new_result,
                                                     old_result,
                                                     new_result/old_result)
            print(benchmark_name, ':', result)
    if options.dest_file:
        with options.dest_file:
            json.dump(new_results, options.dest_file, indent=2)
222
223
if __name__ == '__main__':
    import argparse

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-b', '--builtin', dest='builtin',
                            action='store_true', default=False,
                            help="use the built-in __import__")
    arg_parser.add_argument('-r', '--read', dest='source_file',
                            type=argparse.FileType('r'),
                            help='file to read benchmark data from to compare '
                                 'against')
    arg_parser.add_argument('-w', '--write', dest='dest_file',
                            type=argparse.FileType('w'),
                            help='file to write benchmark data to')
    arg_parser.add_argument('--benchmark', dest='benchmark',
                            help='specific benchmark to run')
    options = arg_parser.parse_args()
    # Benchmark importlib's pure-Python __import__ unless -b was given.
    import_ = __import__ if options.builtin else importlib.__import__

    main(import_, options)
245