1# Testoob, Python Testing Out Of (The) Box 2# Copyright (C) 2005-2006 The Testoob Team 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"convenience functions for running tests" 17 18from __future__ import generators 19 20import time 21 22############################################################################### 23# apply_runner 24############################################################################### 25from testoob.extracting import suite_iter as _suite_iter 26from testoob.extracting import full_extractor as _full_extractor 27 28def apply_decorators(callable, decorators): 29 "Wrap the callable in all the decorators" 30 result = callable 31 for decorator in decorators: 32 result = decorator(result) 33 return result 34 35class TestLoop(object): 36 "Runs the suites" 37 def __init__(self, 38 suites, runner, interval=None, stop_on_fail=False, 39 extraction_decorators=None, fixture_decorators=None): 40 41 from fixture_decorators import BaseFixture 42 self.suites = suites 43 self.runner = runner 44 self.interval = interval 45 self.stop_on_fail = stop_on_fail 46 self.extraction_decorators = extraction_decorators or [] 47 self.fixture_decorators = fixture_decorators or [BaseFixture] 48 49 self.runner.reporter.setParameters(num_tests = self.num_tests) 50 51 def _all_fixtures(self): 52 for suite in _suite_iter(self.suites): 53 for fixture in self.test_extractor(suite): 54 yield fixture 55 all_fixtures = property(_all_fixtures) 56 57 def _num_tests(self): 58 result = 0 59 for suite in _suite_iter(self.suites): 60 result += len(list(self.test_extractor(suite))) 61 return result 62 num_tests = property(_num_tests) 63 64 test_extractor = property( 65 lambda self: apply_decorators(_full_extractor, self.extraction_decorators) 66 ) 67 68 def _run_fixture(self, fixture): 69 decorated_fixture = apply_decorators(fixture, self.fixture_decorators) 70 if hasattr(self, "not_first") and self.interval is not None: 71 time.sleep(self.interval) 72 self.not_first = True 73 self.last_result = self.runner.run(decorated_fixture) 74 75 def _handle_interrupt(self, fixture): 76 from fixture_decorators import get_interrupterd_fixture 77 if hasattr(self, "last_interrupt") and (time.time() - self.last_interrupt < 1): 78 # Two interrupts in less than a second, cause all 79 # future tests to skip 80 self.fixture_decorators = [get_interrupterd_fixture()] 81 self.last_interrupt = time.time() 82 83 # Run the current test again with InterruptedFixture decorator 84 # So it'll be added to the skipped tests' list. 85 decorated_fixture = apply_decorators(fixture, [get_interrupterd_fixture(True)]) 86 self.runner.run(decorated_fixture) 87 88 def _run_all_fixtures(self): 89 for fixture in self.all_fixtures: 90 try: 91 self._run_fixture(fixture) 92 if self.stop_on_fail and not self.last_result: 93 return 94 except KeyboardInterrupt, e: 95 self._handle_interrupt(fixture) 96 97 def run(self): 98 self.runner.reporter.start() 99 self._run_all_fixtures() 100 self.runner.done() 101 return self.runner.isSuccessful() 102 103############################################################################### 104# run 105############################################################################### 106def run(suite=None, suites=None, **kwargs): 107 "Convenience frontend for text_run_suites" 108 if suite is None and suites is None: 109 raise TypeError("either suite or suites must be specified") 110 if suite is not None and suites is not None: 111 raise TypeError("only one of suite or suites may be specified") 112 113 if suites is None: 114 suites = [suite] 115 116 return run_suites(suites, **kwargs) 117 118def _apply_debug(reporter, runDebug): 119 if runDebug is None: 120 return reporter 121 122 def replace(reporter, flavor, methodname): 123 original = getattr(reporter, methodname) 124 def replacement(test, err): 125 runDebug(test, err, flavor, reporter, original) 126 setattr(reporter, methodname, replacement) 127 128 replace(reporter, "error", "addError") 129 replace(reporter, "failure", "addFailure") 130 131 return reporter 132 133def _create_reporter_proxy(reporters, runDebug, threads): 134 from testoob.reporting import ReporterProxy 135 result = ReporterProxy(threads) 136 for reporter in reporters: 137 result.add_observer(_apply_debug(reporter, runDebug)) 138 return result 139 140def run_suites(suites, reporters, runner=None, runDebug=None, threads=None, **kwargs): 141 "Run the test suites" 142 if runner is None: 143 from simplerunner import SimpleRunner 144 runner = SimpleRunner() 145 runner.reporter = _create_reporter_proxy(reporters, runDebug, threads=threads) 146 147 return TestLoop(suites=suites, runner=runner, **kwargs).run() 148 149############################################################################### 150# text_run 151############################################################################### 152def text_run(*args, **kwargs): 153 """ 154 Run suites with a TextStreamReporter. 155 """ 156 from testoob.utils import _pop 157 158 kwargs.setdefault("reporters", []) 159 160 import sys 161 from testoob.reporting import TextStreamReporter 162 reporter_class = _pop(kwargs, "reporter_class", TextStreamReporter) 163 164 from testoob.reporting.options import silent 165 if not silent: 166 kwargs["reporters"].append(reporter_class(stream=sys.stderr)) 167 168 # Always have at least one base reporter, so isSuccessful always works 169 if len(kwargs["reporters"]) == 0: 170 from testoob.reporting.base import BaseReporter 171 kwargs["reporters"].append(BaseReporter()) 172 173 from testoob.reporting.options import coverage 174 for reporter in kwargs["reporters"]: 175 reporter.setCoverageInfo(*coverage) 176 177 return run(*args, **kwargs) 178