1import sys
2import traceback
3import datetime
4import unittest
5
6from tcmessages import TeamcityServiceMessages
7from tcunittest import strclass
8from tcunittest import TeamcityTestResult
9
10_ERROR_TEST_NAME = "ERROR"
11
12try:
13  from nose.failure import Failure
14  from nose.util import isclass # backwards compat
15  from nose.config import Config
16  from nose.result import TextTestResult
17  from nose import SkipTest
18  from nose.plugins.errorclass import ErrorClassPlugin
19except (Exception, ):
20  e = sys.exc_info()[1]
21  raise NameError(
22          "Something went wrong, do you have nosetest installed? I got this error: %s" % e)
23
class TeamcityPlugin(ErrorClassPlugin, TextTestResult, TeamcityTestResult):
  """
  Nose plugin / test result that reports test progress through
  TeamcityServiceMessages.

  Every result hook (startTest, stopTest, addError, addFailure, addSkip)
  is translated into the matching call on ``self.messages``. Suites are
  tracked through ``self.current_suite`` so that a suite is closed when
  a test from a different suite starts.
  """
  name = "TeamcityPlugin"
  enabled = True

  def __init__(self, stream=sys.stderr, descriptions=None, verbosity=1,
               config=None, errorClasses=None):
    """
    Set up the message writer and initialise both result base classes.

    stream       -- output stream handed to TeamcityServiceMessages and
                    to the base result classes
    descriptions -- forwarded to TextTestResult
    verbosity    -- forwarded to TextTestResult
    config       -- nose Config; a fresh Config() is used when None
    errorClasses -- error class mapping; an empty dict is used when None
    """
    super(TeamcityPlugin, self).__init__()

    if errorClasses is None:
      errorClasses = {}

    self.errorClasses = errorClasses
    if config is None:
      config = Config()
    self.config = config
    self.output = stream
    self.messages = TeamcityServiceMessages(self.output,
                                            prepend_linebreak=True)
    self.messages.testMatrixEntered()
    self.current_suite = None
    # The base initialisers are invoked explicitly, after the attributes
    # above are set; the order is kept as-is on purpose.
    TextTestResult.__init__(self, stream, descriptions, verbosity, config,
                            errorClasses)
    TeamcityTestResult.__init__(self, stream)

  def configure(self, options, conf):
    """Remember the nose configuration unless configuring is disabled."""
    if not self.can_configure:
      return
    self.conf = conf

  def _is_failure(self, test):
    """Return True when the wrapped ``test.test`` is a nose Failure."""
    try:
      return isinstance(test.test, Failure)
    except AttributeError:
      # Objects without a ``test`` attribute are not nose Failure wrappers.
      return False

  def addError(self, test, err):
    """
    Report an error for ``test``.

    ``err`` is an ``(exctype, value, traceback)`` triple. nose Failure
    tests are reported under the _ERROR_TEST_NAME name, SkipTest (and
    its subclasses) as ignored tests, and anything else as an error of
    the test itself.
    """
    exctype = err[0]
    details = self.formatErr(err)
    if self._is_failure(test):
      self.messages.testError(_ERROR_TEST_NAME, message='Error',
                              details=details,
                              duration=self.__getDuration(test))
      return

    try:
      # issubclass (rather than ==) also recognises SkipTest subclasses.
      is_skip = issubclass(exctype, SkipTest)
    except TypeError:
      # exctype is not a class (e.g. None), so it cannot be a skip.
      is_skip = False

    if is_skip:
      self.messages.testIgnored(self.getTestName(test), message='Skip')
    else:
      self.messages.testError(self.getTestName(test), message='Error',
                              details=details,
                              duration=self.__getDuration(test))

  def formatErr(self, err):
    """
    Format an ``(exctype, value, traceback)`` triple as a single string.

    A plain string ``value`` is first wrapped in ``exctype`` when the
    exception type accepts it.
    """
    exctype, value, tb = err
    if isinstance(value, str):
      try:
        value = exctype(value)
      except TypeError:
        pass
    return ''.join(traceback.format_exception(exctype, value, tb))

  def is_gen(self, test):
    """Return True when ``test.test`` has a non-None ``descriptor``."""
    if hasattr(test, "test") and hasattr(test.test, "descriptor"):
      if test.test.descriptor is not None:
        return True
    return False

  def getTestName(self, test):
    """
    Return the name under which ``test`` is reported.

    ``test.error_context`` wins when present. Otherwise ``str(test)`` is
    used: unchanged for tests with a descriptor (see is_gen), and cut
    before the last '(' for all others.
    """
    if hasattr(test, "error_context"):
      return test.error_context
    test_name_full = str(test)
    if self.is_gen(test):
      return test_name_full

    ind_1 = test_name_full.rfind('(')
    if ind_1 != -1:
      return test_name_full[:ind_1]
    return test_name_full

  def addFailure(self, test, err):
    """Report a failed test together with its formatted traceback."""
    err = self.formatErr(err)

    self.messages.testFailed(self.getTestName(test),
                             message='Failure', details=err)

  def addSkip(self, test, reason):
    """Report ``test`` as ignored, using ``reason`` as the message."""
    self.messages.testIgnored(self.getTestName(test), message=reason)

  def _getSuite(self, test):
    """
    Return ``(location, suite_location)`` for ``test``.

    Tests carrying a ``suite`` attribute use that suite's ``location``
    and ``abs_location`` plus a line number. For other tests a
    ``file://`` location is built from the test's source file and
    ``func_lineno``; if that fails for any reason, both values fall back
    to ``python_nosetestid://`` URLs derived from ``test.id()``.
    """
    if hasattr(test, "suite"):
      suite_location = test.suite.location
      location = test.suite.abs_location
      if hasattr(test, "lineno"):
        location = location + ":" + str(test.lineno)
      else:
        location = location + ":" + str(test.test.lineno)
      return (location, suite_location)

    try:
      from nose.util import func_lineno

      if hasattr(test.test, "descriptor") and test.test.descriptor:
        target = test.test.descriptor
      else:
        target = test.test.test
      # test_address may return None; the concatenation then raises
      # TypeError and the fallback below is used.
      suite_location = "file://" + self.test_address(target)
      location = suite_location + ":" + str(func_lineno(target))
    except Exception:
      # Best effort: any failure while resolving a file location falls
      # back to id-based locations. KeyboardInterrupt and SystemExit are
      # deliberately not caught.
      test_id = test.id()
      suite_id = test_id[:test_id.rfind(".")]
      suite_location = "python_nosetestid://" + str(suite_id)
      location = "python_nosetestid://" + str(test_id)
    return (location, suite_location)

  def test_address(self, test):
    """
    Return the source file path for ``test``.

    Objects with an ``address()`` method return its first element.
    Functions and classes are resolved through ``sys.modules``; a
    trailing ``.pyc``/``.pyo`` is mapped to ``.py``. Returns None when
    the file cannot be determined.

    Raises TypeError for objects that are neither functions nor classes
    and have no ``address()`` method.
    """
    if hasattr(test, "address"):
      return test.address()[0]
    import os
    import types

    t = type(test)
    if (t == types.FunctionType or issubclass(t, type) or t == type
        or isclass(test)):
      path = None
      module_name = getattr(test, '__module__', None)
      if module_name is not None:
        # .get() avoids a KeyError for modules missing from sys.modules.
        module = sys.modules.get(module_name)
        path = getattr(module, '__file__', None)
        if path is not None:
          path = os.path.abspath(path)
      # Guard against None: the module or its __file__ may be unknown.
      if path is not None and path.endswith((".pyc", ".pyo")):
        path = path[:-1]
      return path
    raise TypeError("I don't know what %s is (%s)" % (test, t))

  def getSuiteName(self, test):
    """
    Derive the suite name from ``str(test)``.

    For tests with a descriptor (see is_gen) the part before the last
    '.' that precedes the last '(' is used. Otherwise the text between
    the last '(' and the final character is returned, e.g.
    ``"test_x (pkg.Case)"`` gives ``"pkg.Case"``. Names without '(' are
    cut at their last '.', or returned unchanged when they have none.
    """
    test_name_full = str(test)

    ind_1 = test_name_full.rfind('(')
    if self.is_gen(test) and ind_1 != -1:
      ind = test_name_full[:ind_1].rfind('.')
      if ind != -1:
        return test_name_full[:ind]

    if ind_1 != -1:
      return test_name_full[ind_1 + 1: -1]
    ind = test_name_full.rfind('.')
    if ind != -1:
      return test_name_full[:ind]
    return test_name_full

  def startTest(self, test):
    """
    Report the start of ``test``.

    Closes the current suite and opens a new one when the suite name
    changes, and stores the start time on the test as ``startTime``.
    nose Failure tests are reported under _ERROR_TEST_NAME without any
    suite bookkeeping.
    """
    location, suite_location = self._getSuite(test)
    if self._is_failure(test):
      self.messages.testStarted(_ERROR_TEST_NAME, location=suite_location)
      return
    suite = self.getSuiteName(test)
    if suite != self.current_suite:
      if self.current_suite:
        self.messages.testSuiteFinished(self.current_suite)
      self.current_suite = suite
      self.messages.testSuiteStarted(self.current_suite,
                                     location=suite_location)
    setattr(test, "startTime", datetime.datetime.now())
    self.messages.testStarted(self.getTestName(test), location=location)

  def stopTest(self, test):
    """Report the end of ``test`` with its duration in milliseconds."""
    duration = self.__getDuration(test)
    if self._is_failure(test):
      return  # Finish reported by testError
    self.messages.testFinished(self.getTestName(test),
                               duration=int(duration))

  def __getDuration(self, test):
    """
    Return whole milliseconds elapsed since ``test.startTime``.

    Returns 0 when the test has no recorded start time. Floor division
    keeps the result an int on both Python 2 and Python 3.
    """
    start = getattr(test, "startTime", None)
    if start is None:
      return 0
    d = datetime.datetime.now() - start
    return d.microseconds // 1000 + d.seconds * 1000 + d.days * 86400000

  def finalize(self, result):
    """Close the suite that is still open, if any."""
    if self.current_suite:
      self.messages.testSuiteFinished(self.current_suite)
      self.current_suite = None
220
class TeamcityNoseRunner(unittest.TextTestRunner):
  """Text test runner whose result objects are TeamcityPlugin instances.
  """

  def __init__(self, stream=sys.stdout, descriptions=1, verbosity=1,
               config=None):
    """Store the nose config (a fresh Config() when none is given) and
    initialise the underlying TextTestRunner.
    """
    self.config = Config() if config is None else config
    unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

  def _makeResult(self):
    """Build the TeamcityPlugin result used for a run."""
    return TeamcityPlugin(self.stream, self.descriptions, self.verbosity,
                          self.config)

  def run(self, test):
    """Run ``test`` with the configured plugins hooked in and return the
    result object; all reporting is left to the result class.
    """
    manager = self.config.plugins
    manager.configure(self.config.options, self.config)
    manager.begin()

    # A plugin may substitute its own wrapper for the test.
    prepared = manager.prepareTest(test)
    if prepared is not None:
      test = prepared

    # A plugin may decorate or capture the output stream.
    replacement = self.config.plugins.setOutputStream(self.stream)
    if replacement is not None:
      self.stream = replacement

    outcome = self._makeResult()
    test(outcome)
    outcome.endLastSuite()
    manager.finalize(outcome)

    return outcome