1#!/usr/bin/env python
2"""Run trollius unittests.
3
4Usage:
5  python3 runtests.py [flags] [pattern] ...
6
7Patterns are matched against the fully qualified name of the test,
8including package, module, class and method,
9e.g. 'tests.test_events.PolicyTests.testPolicy'.
10
11For full help, try --help.
12
runtests.py --coverage is equivalent to:
14
15  $(COVERAGE) run --branch runtests.py -v
16  $(COVERAGE) html $(list of files)
17  $(COVERAGE) report -m $(list of files)
18
19"""
20
21# Originally written by Beech Horn (for NDB).
22
23from __future__ import print_function
24import optparse
25import gc
26import logging
27import os
28import random
29import re
30import sys
31import textwrap
32PY2 = (sys.version_info < (3,))
33PY33 = (sys.version_info >= (3, 3))
34if PY33:
35    import importlib.machinery
36else:
37    import imp
38try:
39    import coverage
40except ImportError:
41    coverage = None
42if PY2:
43    sys.exc_clear()
44
45try:
46    import unittest2 as unittest
47    from unittest2.signals import installHandler
48except ImportError:
49    import unittest
50    from unittest.signals import installHandler
51
# Command-line interface of the test runner.  Options are registered in the
# order listed below, which is also the order shown by --help.
ARGS = optparse.OptionParser(
    description="Run all unittests.",
    usage="%prog [options] [pattern] [pattern2 ...]",
)

for _flags, _settings in (
    # Verbosity takes an explicit integer argument (e.g. "-v 2").
    (('-v', '--verbose'),
     dict(type=int, dest='verbose', default=0, help='verbose')),
    # When set, the positional patterns select tests to exclude.
    (('-x',),
     dict(action="store_true", dest='exclude', help='exclude tests')),
    (('-f', '--failfast'),
     dict(action="store_true", default=False, dest='failfast',
          help='Stop on first fail or error')),
    (('--no-ssl',),
     dict(action="store_true", default=False,
          help='Disable the SSL module')),
    (('--no-concurrent',),
     dict(action="store_true", default=False,
          help='Disable the concurrent module')),
    (('-c', '--catch'),
     dict(action="store_true", default=False, dest='catchbreak',
          help='Catch control-C and display results')),
    (('--forever',),
     dict(action="store_true", dest='forever', default=False,
          help='run tests forever to catch sporadic errors')),
    (('--findleaks',),
     dict(action='store_true', dest='findleaks',
          help='detect tests that leak memory')),
    (('-r', '--randomize'),
     dict(action='store_true',
          help='randomize test execution order.')),
    (('--seed',),
     dict(type=int,
          help='random seed to reproduce a previous random run')),
    (('-q',),
     dict(action="store_true", dest='quiet', help='quiet')),
    (('--tests',),
     dict(action="store", dest='testsdir', default='tests',
          help='tests directory')),
    (('--coverage',),
     dict(action="store_true", dest='coverage',
          help='enable html coverage report')),
):
    ARGS.add_option(*_flags, **_settings)

# Do not leave the loop variables behind in the module namespace.
del _flags, _settings
90
91
def load_module(modname, sourcefile):
    """Load the Python source file *sourcefile* as a module named *modname*.

    On Python 3.3 and newer the file is loaded through
    importlib.machinery.SourceFileLoader; older interpreters use
    imp.load_source().  The loaded module object is returned.
    """
    if PY33:
        source_loader = importlib.machinery.SourceFileLoader(modname,
                                                             sourcefile)
        return source_loader.load_module()
    return imp.load_source(modname, sourcefile)
99
100
def load_modules(basedir, suffix='.py'):
    """Import every test module found below *basedir*.

    The directory tree is walked recursively.  A directory containing an
    ``__init__.py`` contributes that file as a package module; every other
    file whose name ends with *suffix* and does not start with '.' or '_'
    contributes a module named after the file with *suffix* removed.

    Modules named 'runtests' are ignored, and 'test_asyncio' is skipped
    (with a message on stderr) on interpreters older than Python 3.3.
    A module raising trollius.test_utils.SkipTest while being imported is
    reported on stderr and left out; any other exception propagates.

    Returns a list of ``(module, sourcefile)`` tuples.
    """
    import trollius.test_utils

    def list_dir(prefix, dirpath):
        # Return (dotted module name, source path) pairs for dirpath.
        found = []

        init_path = os.path.join(dirpath, '__init__.py')
        if os.path.isfile(init_path):
            package = os.path.split(dirpath)[-1]
            found.append(('{0}{1}'.format(prefix, package), init_path))

            prefix = '{0}{1}.'.format(prefix, package)

        # os.listdir() returns entries in arbitrary order; sort them so
        # that modules (and therefore tests) are always discovered in the
        # same order, which is required for --seed to reproduce a run.
        for name in sorted(os.listdir(dirpath)):
            path = os.path.join(dirpath, name)

            if os.path.isdir(path):
                # NOTE(review): the sub-directory name is added to the
                # prefix here and, if the sub-directory is itself a package,
                # once more inside the recursive call -- confirm whether the
                # resulting doubled component is intended.
                found.extend(list_dir('{0}{1}.'.format(prefix, name), path))
            elif (name != '__init__.py' and
                  name.endswith(suffix) and
                  not name.startswith(('.', '_'))):
                # Strip the actual suffix rather than assuming it is three
                # characters long ('.py'); this also copes with suffix=''.
                stem = name[:len(name) - len(suffix)]
                found.append(('{0}{1}'.format(prefix, stem), path))

        return found

    mods = []
    for modname, sourcefile in list_dir('', basedir):
        if modname == 'runtests':
            continue
        if modname == 'test_asyncio' and not PY33:
            print("Skipping '{0}': need at least Python 3.3".format(modname),
                  file=sys.stderr)
            continue
        try:
            mod = load_module(modname, sourcefile)
            mods.append((mod, sourcefile))
        except SyntaxError:
            # A test module that does not even compile is a hard error.
            raise
        except trollius.test_utils.SkipTest as err:
            print("Skipping '{0}': {1}".format(modname, err), file=sys.stderr)

    return mods
144
145
def randomize_tests(tests, seed):
    """Shuffle the tests of the suite *tests* in place.

    *seed* seeds the global random generator before shuffling; when it is
    None a seed is drawn at random.  The seed that was used is printed so
    that the same order can be requested again.
    """
    chosen_seed = random.randrange(10000000) if seed is None else seed
    random.seed(chosen_seed)
    print("Randomize test execution order (seed: %s)" % chosen_seed)
    # The suite keeps its members in the private list ``_tests``.
    members = tests._tests
    random.shuffle(members)
152
153
class TestsFinder(object):
    """Discover test classes in a directory and build filtered suites.

    *testsdir* is the directory handed to load_modules().  *includes* and
    *excludes* are sequences of regular expressions matched (with
    re.search) against the id of every test: when *includes* is non-empty
    only matching tests are kept, and tests matching any of *excludes* are
    dropped.
    """

    def __init__(self, testsdir, includes=(), excludes=()):
        self._testsdir = testsdir
        self._includes = includes
        self._excludes = excludes
        self.find_available_tests()

    def find_available_tests(self):
        """
        Find available test classes without instantiating them.

        Every module attribute whose name ends with 'Tests' is collected.
        Names are visited in sorted order: iterating a bare set of strings
        would give an order that changes from one process to the next
        (hash randomisation), which would make a run started with --seed
        impossible to reproduce.
        """
        self._test_factories = []
        for mod, _ in load_modules(self._testsdir):
            for name in sorted(set(dir(mod))):
                if name.endswith('Tests'):
                    self._test_factories.append(getattr(mod, name))

    @staticmethod
    def _matches_any(test, patterns):
        """Return True if the id of *test* matches one of *patterns*."""
        test_id = test.id()
        return any(re.search(pat, test_id) for pat in patterns)

    def load_tests(self):
        """
        Load test cases from the available test classes and apply
        optional include / exclude filters.

        Returns a unittest.TestSuite holding the selected tests.
        """
        loader = unittest.TestLoader()
        suite = unittest.TestSuite()
        for test_factory in self._test_factories:
            tests = loader.loadTestsFromTestCase(test_factory)
            if self._includes:
                tests = [test for test in tests
                         if self._matches_any(test, self._includes)]
            if self._excludes:
                tests = [test for test in tests
                         if not self._matches_any(test, self._excludes)]
            suite.addTests(tests)
        return suite
194
195
class TestResult(unittest.TextTestResult):
    """Text test result that records uncollectable garbage per test.

    A garbage collection is forced before each test starts and after each
    successful test.  Whatever is left in ``gc.garbage`` after a success is
    attributed to that test and stored in ``self.leaks`` as a
    ``(test description, list of objects)`` tuple.
    """

    def __init__(self, stream, descriptions, verbosity):
        # Explicit two-argument super(): the zero-argument form does not
        # work on Python 2, which this script also supports.
        super(TestResult, self).__init__(stream, descriptions, verbosity)
        self.leaks = []

    def startTest(self, test):
        super(TestResult, self).startTest(test)
        gc.collect()

    def addSuccess(self, test):
        super(TestResult, self).addSuccess(test)
        gc.collect()
        if gc.garbage:
            if self.showAll:
                self.stream.writeln(
                    "    Warning: test created {0} uncollectable "
                    "object(s).".format(len(gc.garbage)))
            # move the uncollectable objects somewhere so we don't see
            # them again
            self.leaks.append((self.getDescription(test), gc.garbage[:]))
            del gc.garbage[:]
218
219
class TestRunner(unittest.TextTestRunner):
    """Text test runner that reports the leaks recorded by TestResult."""

    resultclass = TestResult

    def run(self, test):
        """Run *test* and append a summary of leaking tests to the stream.

        Returns the result object produced by the base runner.
        """
        # Explicit two-argument super(): the zero-argument form does not
        # work on Python 2, which this script also supports.
        result = super(TestRunner, self).run(test)
        if result.leaks:
            self.stream.writeln("{0} tests leaks:".format(len(result.leaks)))
            for name, leaks in result.leaks:
                self.stream.writeln(' '*4 + name + ':')
                for leak in leaks:
                    self.stream.writeln(' '*8 + repr(leak))
        return result
232
233
234def _runtests(args, tests):
235    v = 0 if args.quiet else args.verbose + 1
236    runner_factory = TestRunner if args.findleaks else unittest.TextTestRunner
237    if args.randomize:
238        randomize_tests(tests, args.seed)
239    runner = runner_factory(verbosity=v, failfast=args.failfast)
240    sys.stdout.flush()
241    sys.stderr.flush()
242    return runner.run(tests)
243
244
def runtests():
    """Parse the command line, run the selected tests and exit.

    The positional arguments are regular expressions selecting the tests to
    run, or the tests to leave out when -x is given.  The process exits
    with a non-zero status when a test fails, when --coverage is requested
    but the coverage package is missing, or when the tests directory does
    not exist.  With --forever the tests are repeated until a run fails.
    """
    args, pattern = ARGS.parse_args()

    # A None entry in sys.modules makes later imports of that module fail.
    if args.no_ssl:
        sys.modules['ssl'] = None

    if args.no_concurrent:
        sys.modules['concurrent'] = None

    if args.coverage and coverage is None:
        URL = "bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py"
        print(textwrap.dedent("""
            coverage package is not installed.

            To install coverage3 for Python 3, you need:
              - Setuptools (https://pypi.python.org/pypi/setuptools)

              What worked for me:
              - download {0}
                 * curl -O https://{0}
              - python3 ez_setup.py
              - python3 -m easy_install coverage
        """.format(URL)).strip())
        sys.exit(1)

    testsdir = os.path.abspath(args.testsdir)
    if not os.path.isdir(testsdir):
        print("Tests directory is not found: {0}\n".format(testsdir))
        ARGS.print_help()
        # Report the failure to the caller (shell, CI) instead of exiting
        # with a success status.
        sys.exit(1)

    # Separate lists: never let includes and excludes alias one object.
    if args.exclude:
        includes, excludes = [], pattern
    else:
        includes, excludes = pattern, []

    v = 0 if args.quiet else args.verbose + 1

    if args.coverage:
        # Measure the package under test.
        cov = coverage.coverage(branch=True,
                                source=['trollius'],
                                )
        cov.start()

    # Map the verbosity to a logging level, clamping out-of-range values
    # (a negative --verbose would otherwise leave the level undefined).
    levels = (logging.CRITICAL, logging.ERROR, logging.WARNING,
              logging.INFO, logging.DEBUG)
    level = levels[max(0, min(v, len(levels) - 1))]
    logging.basicConfig(level=level)

    # Use the normalised directory that was validated above; a raw value
    # with a trailing slash would produce an empty package name.
    finder = TestsFinder(testsdir, includes, excludes)
    if args.catchbreak:
        installHandler()
    import trollius.coroutines
    if trollius.coroutines._DEBUG:
        print("Run tests in debug mode")
    else:
        print("Run tests in release mode")
    try:
        tests = finder.load_tests()
        if args.forever:
            while True:
                result = _runtests(args, tests)
                if not result.wasSuccessful():
                    sys.exit(1)
        else:
            result = _runtests(args, tests)
            sys.exit(not result.wasSuccessful())
    finally:
        # Runs on every exit path, including the sys.exit() calls above.
        if args.coverage:
            cov.stop()
            cov.save()
            cov.html_report(directory='htmlcov')
            print("\nCoverage report:")
            cov.report(show_missing=False)
            here = os.path.dirname(os.path.abspath(__file__))
            print("\nFor html report:")
            print("open file://{0}/htmlcov/index.html".format(here))
331
332
333if __name__ == '__main__':
334    runtests()
335