1# Testoob, Python Testing Out Of (The) Box
2# Copyright (C) 2005-2006 The Testoob Team
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"convenience functions for running tests"
17
18from __future__ import generators
19
20import time
21
22###############################################################################
23# apply_runner
24###############################################################################
25from testoob.extracting import suite_iter as _suite_iter
26from testoob.extracting import full_extractor as _full_extractor
27
28def apply_decorators(callable, decorators):
29    "Wrap the callable in all the decorators"
30    result = callable
31    for decorator in decorators:
32        result = decorator(result)
33    return result
34
35class TestLoop(object):
36    "Runs the suites"
37    def __init__(self,
38            suites, runner, interval=None, stop_on_fail=False,
39            extraction_decorators=None, fixture_decorators=None):
40
41        from fixture_decorators import BaseFixture
42        self.suites = suites
43        self.runner = runner
44        self.interval = interval
45        self.stop_on_fail = stop_on_fail
46        self.extraction_decorators = extraction_decorators or []
47        self.fixture_decorators = fixture_decorators or [BaseFixture]
48
49        self.runner.reporter.setParameters(num_tests = self.num_tests)
50
51    def _all_fixtures(self):
52        for suite in _suite_iter(self.suites):
53            for fixture in self.test_extractor(suite):
54                yield fixture
55    all_fixtures = property(_all_fixtures)
56
57    def _num_tests(self):
58        result = 0
59        for suite in _suite_iter(self.suites):
60            result += len(list(self.test_extractor(suite)))
61        return result
62    num_tests = property(_num_tests)
63
64    test_extractor = property(
65        lambda self: apply_decorators(_full_extractor, self.extraction_decorators)
66    )
67
68    def _run_fixture(self, fixture):
69        decorated_fixture = apply_decorators(fixture, self.fixture_decorators)
70        if hasattr(self, "not_first") and self.interval is not None:
71            time.sleep(self.interval)
72        self.not_first = True
73        self.last_result = self.runner.run(decorated_fixture)
74
75    def _handle_interrupt(self, fixture):
76        from fixture_decorators import get_interrupterd_fixture
77        if hasattr(self, "last_interrupt") and (time.time() - self.last_interrupt < 1):
78            # Two interrupts in less than a second, cause all
79            # future tests to skip
80            self.fixture_decorators = [get_interrupterd_fixture()]
81        self.last_interrupt = time.time()
82
83        # Run the current test again with InterruptedFixture decorator
84        # So it'll be added to the skipped tests' list.
85        decorated_fixture = apply_decorators(fixture, [get_interrupterd_fixture(True)])
86        self.runner.run(decorated_fixture)
87
88    def _run_all_fixtures(self):
89        for fixture in self.all_fixtures:
90            try:
91                self._run_fixture(fixture)
92                if self.stop_on_fail and not self.last_result:
93                    return
94            except KeyboardInterrupt, e:
95                self._handle_interrupt(fixture)
96
97    def run(self):
98        self.runner.reporter.start()
99        self._run_all_fixtures()
100        self.runner.done()
101        return self.runner.isSuccessful()
102
103###############################################################################
104# run
105###############################################################################
106def run(suite=None, suites=None, **kwargs):
107    "Convenience frontend for text_run_suites"
108    if suite is None and suites is None:
109        raise TypeError("either suite or suites must be specified")
110    if suite is not None and suites is not None:
111        raise TypeError("only one of suite or suites may be specified")
112
113    if suites is None:
114        suites = [suite]
115
116    return run_suites(suites, **kwargs)
117
118def _apply_debug(reporter, runDebug):
119    if runDebug is None:
120        return reporter
121
122    def replace(reporter, flavor, methodname):
123        original = getattr(reporter, methodname)
124        def replacement(test, err):
125            runDebug(test, err, flavor, reporter, original)
126        setattr(reporter, methodname, replacement)
127
128    replace(reporter, "error", "addError")
129    replace(reporter, "failure", "addFailure")
130
131    return reporter
132
133def _create_reporter_proxy(reporters, runDebug, threads):
134    from testoob.reporting import ReporterProxy
135    result = ReporterProxy(threads)
136    for reporter in reporters:
137        result.add_observer(_apply_debug(reporter, runDebug))
138    return result
139
140def run_suites(suites, reporters, runner=None, runDebug=None, threads=None, **kwargs):
141    "Run the test suites"
142    if runner is None:
143        from simplerunner import SimpleRunner
144        runner = SimpleRunner()
145    runner.reporter = _create_reporter_proxy(reporters, runDebug, threads=threads)
146
147    return TestLoop(suites=suites, runner=runner, **kwargs).run()
148
149###############################################################################
150# text_run
151###############################################################################
152def text_run(*args, **kwargs):
153    """
154    Run suites with a TextStreamReporter.
155    """
156    from testoob.utils import _pop
157
158    kwargs.setdefault("reporters", [])
159
160    import sys
161    from testoob.reporting import TextStreamReporter
162    reporter_class = _pop(kwargs, "reporter_class", TextStreamReporter)
163
164    from testoob.reporting.options import silent
165    if not silent:
166        kwargs["reporters"].append(reporter_class(stream=sys.stderr))
167
168    # Always have at least one base reporter, so isSuccessful always works
169    if len(kwargs["reporters"]) == 0:
170        from testoob.reporting.base import BaseReporter
171        kwargs["reporters"].append(BaseReporter())
172
173    from testoob.reporting.options import coverage
174    for reporter in kwargs["reporters"]:
175        reporter.setCoverageInfo(*coverage)
176
177    return run(*args, **kwargs)
178