1import sys 2import traceback 3import datetime 4import unittest 5 6from tcmessages import TeamcityServiceMessages 7from tcunittest import strclass 8from tcunittest import TeamcityTestResult 9 10_ERROR_TEST_NAME = "ERROR" 11 12try: 13 from nose.failure import Failure 14 from nose.util import isclass # backwards compat 15 from nose.config import Config 16 from nose.result import TextTestResult 17 from nose import SkipTest 18 from nose.plugins.errorclass import ErrorClassPlugin 19except (Exception, ): 20 e = sys.exc_info()[1] 21 raise NameError( 22 "Something went wrong, do you have nosetest installed? I got this error: %s" % e) 23 24class TeamcityPlugin(ErrorClassPlugin, TextTestResult, TeamcityTestResult): 25 """ 26 TeamcityTest plugin for nose tests 27 """ 28 name = "TeamcityPlugin" 29 enabled = True 30 31 def __init__(self, stream=sys.stderr, descriptions=None, verbosity=1, 32 config=None, errorClasses=None): 33 super(TeamcityPlugin, self).__init__() 34 35 if errorClasses is None: 36 errorClasses = {} 37 38 self.errorClasses = errorClasses 39 if config is None: 40 config = Config() 41 self.config = config 42 self.output = stream 43 self.messages = TeamcityServiceMessages(self.output, 44 prepend_linebreak=True) 45 self.messages.testMatrixEntered() 46 self.current_suite = None 47 TextTestResult.__init__(self, stream, descriptions, verbosity, config, 48 errorClasses) 49 TeamcityTestResult.__init__(self, stream) 50 51 def configure(self, options, conf): 52 if not self.can_configure: 53 return 54 self.conf = conf 55 56 57 def _is_failure(self, test): 58 try: 59 return isinstance(test.test, Failure) 60 except AttributeError: 61 return False 62 63 def addError(self, test, err): 64 exctype, value, tb = err 65 err = self.formatErr(err) 66 if self._is_failure(test): 67 self.messages.testError(_ERROR_TEST_NAME, message='Error', details=err, duration=self.__getDuration(test)) 68 return 69 70 if exctype == SkipTest: 71 self.messages.testIgnored(self.getTestName(test), message='Skip') 72 else: 73 self.messages.testError(self.getTestName(test), message='Error', details=err, duration=self.__getDuration(test)) 74 75 def formatErr(self, err): 76 exctype, value, tb = err 77 if isinstance(value, str): 78 try: 79 value = exctype(value) 80 except TypeError: 81 pass 82 return ''.join(traceback.format_exception(exctype, value, tb)) 83 84 def is_gen(self, test): 85 if hasattr(test, "test") and hasattr(test.test, "descriptor"): 86 if test.test.descriptor is not None: 87 return True 88 return False 89 90 91 def getTestName(self, test): 92 if hasattr(test, "error_context"): 93 return test.error_context 94 test_name_full = str(test) 95 if self.is_gen(test): 96 return test_name_full 97 98 ind_1 = test_name_full.rfind('(') 99 if ind_1 != -1: 100 return test_name_full[:ind_1] 101 return test_name_full 102 103 104 def addFailure(self, test, err): 105 err = self.formatErr(err) 106 107 self.messages.testFailed(self.getTestName(test), 108 message='Failure', details=err) 109 110 111 def addSkip(self, test, reason): 112 self.messages.testIgnored(self.getTestName(test), message=reason) 113 114 115 def _getSuite(self, test): 116 if hasattr(test, "suite"): 117 suite = strclass(test.suite) 118 suite_location = test.suite.location 119 location = test.suite.abs_location 120 if hasattr(test, "lineno"): 121 location = location + ":" + str(test.lineno) 122 else: 123 location = location + ":" + str(test.test.lineno) 124 else: 125 suite = strclass(test.__class__) 126 suite_location = "python_nosetestid://" + suite 127 try: 128 from nose.util import func_lineno 129 130 if hasattr(test.test, "descriptor") and test.test.descriptor: 131 suite_location = "file://" + self.test_address( 132 test.test.descriptor) 133 location = suite_location + ":" + str( 134 func_lineno(test.test.descriptor)) 135 else: 136 suite_location = "file://" + self.test_address( 137 test.test.test) 138 location = "file://" + self.test_address( 139 test.test.test) + ":" + str(func_lineno(test.test.test)) 140 except: 141 test_id = test.id() 142 suite_id = test_id[:test_id.rfind(".")] 143 suite_location = "python_nosetestid://" + str(suite_id) 144 location = "python_nosetestid://" + str(test_id) 145 return (location, suite_location) 146 147 148 def test_address(self, test): 149 if hasattr(test, "address"): 150 return test.address()[0] 151 t = type(test) 152 file = None 153 import types, os 154 155 if (t == types.FunctionType or issubclass(t, type) or t == type 156 or isclass(test)): 157 module = getattr(test, '__module__', None) 158 if module is not None: 159 m = sys.modules[module] 160 file = getattr(m, '__file__', None) 161 if file is not None: 162 file = os.path.abspath(file) 163 if file.endswith("pyc"): 164 file = file[:-1] 165 return file 166 raise TypeError("I don't know what %s is (%s)" % (test, t)) 167 168 169 def getSuiteName(self, test): 170 test_name_full = str(test) 171 172 ind_1 = test_name_full.rfind('(') 173 if self.is_gen(test) and ind_1 != -1: 174 ind = test_name_full[:ind_1].rfind('.') 175 if ind != -1: 176 return test_name_full[:ind] 177 178 if ind_1 != -1: 179 return test_name_full[ind_1 + 1: -1] 180 ind = test_name_full.rfind('.') 181 if ind != -1: 182 return test_name_full[:test_name_full.rfind(".")] 183 return test_name_full 184 185 186 def startTest(self, test): 187 location, suite_location = self._getSuite(test) 188 if self._is_failure(test): 189 self.messages.testStarted(_ERROR_TEST_NAME, location=suite_location) 190 return 191 suite = self.getSuiteName(test) 192 if suite != self.current_suite: 193 if self.current_suite: 194 self.messages.testSuiteFinished(self.current_suite) 195 self.current_suite = suite 196 self.messages.testSuiteStarted(self.current_suite, 197 location=suite_location) 198 setattr(test, "startTime", datetime.datetime.now()) 199 self.messages.testStarted(self.getTestName(test), location=location) 200 201 202 def stopTest(self, test): 203 duration = self.__getDuration(test) 204 if self._is_failure(test): 205 return # Finish reported by testError 206 self.messages.testFinished(self.getTestName(test), 207 duration=int(duration)) 208 209 def __getDuration(self, test): 210 start = getattr(test, "startTime", datetime.datetime.now()) 211 d = datetime.datetime.now() - start 212 duration = d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000 213 return duration 214 215 def finalize(self, result): 216 if self.current_suite: 217 self.messages.testSuiteFinished(self.current_suite) 218 self.current_suite = None 219 220 221class TeamcityNoseRunner(unittest.TextTestRunner): 222 """Test runner that supports teamcity output 223 """ 224 225 def __init__(self, stream=sys.stdout, descriptions=1, verbosity=1, 226 config=None): 227 if config is None: 228 config = Config() 229 self.config = config 230 231 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity) 232 233 234 def _makeResult(self): 235 return TeamcityPlugin(self.stream, 236 self.descriptions, 237 self.verbosity, 238 self.config) 239 240 def run(self, test): 241 """Overrides to provide plugin hooks and defer all output to 242 the test result class. 243 """ 244 #for 2.5 compat 245 plugins = self.config.plugins 246 plugins.configure(self.config.options, self.config) 247 plugins.begin() 248 wrapper = plugins.prepareTest(test) 249 if wrapper is not None: 250 test = wrapper 251 252 # plugins can decorate or capture the output stream 253 wrapped = self.config.plugins.setOutputStream(self.stream) 254 if wrapped is not None: 255 self.stream = wrapped 256 257 result = self._makeResult() 258 test(result) 259 result.endLastSuite() 260 plugins.finalize(result) 261 262 return result