1import sys, os
2import imp
3
4helpers_dir = os.getenv("PYCHARM_HELPERS_DIR", sys.path[0])
5if sys.path[0] != helpers_dir:
6    sys.path.insert(0, helpers_dir)
7
8from tcunittest import TeamcityTestResult
9
10from pycharm_run_utils import import_system_module
11from pycharm_run_utils import adjust_sys_path
12from pycharm_run_utils import debug, getModuleName
13
14adjust_sys_path()
15
16re = import_system_module("re")
17inspect = import_system_module("inspect")
18
19try:
20  from attest.reporters import AbstractReporter
21  from attest.collectors import Tests
22  from attest import TestBase
23except:
24  raise NameError("Please, install attests")
25
26class TeamCityReporter(AbstractReporter, TeamcityTestResult):
27  """Teamcity reporter for attests."""
28
29  def __init__(self, prefix):
30    TeamcityTestResult.__init__(self)
31    self.prefix = prefix
32
33  def begin(self, tests):
34    """initialize suite stack and count tests"""
35    self.total = len(tests)
36    self.suite_stack = []
37    self.messages.testCount(self.total)
38
39  def success(self, result):
40    """called when test finished successfully"""
41    suite = self.get_suite_name(result.test)
42    self.start_suite(suite)
43    name = self.get_test_name(result)
44    self.start_test(result, name)
45    self.messages.testFinished(name)
46
47  def failure(self, result):
48    """called when test failed"""
49    suite = self.get_suite_name(result.test)
50    self.start_suite(suite)
51    name = self.get_test_name(result)
52    self.start_test(result, name)
53    exctype, value, tb = result.exc_info
54    error_value = self.find_error_value(tb)
55    if (error_value.startswith("'") or error_value.startswith('"')) and\
56       (error_value.endswith("'") or error_value.endswith('"')):
57      first = self._unescape(self.find_first(error_value))
58      second = self._unescape(self.find_second(error_value))
59    else:
60      first = second = ""
61
62    err = self.formatErr(result.exc_info)
63    if isinstance(result.error, AssertionError):
64      self.messages.testFailed(name, message='Failure',
65        details=err,
66        expected=first, actual=second)
67    else:
68      self.messages.testError(name, message='Error',
69        details=err)
70
71  def finished(self):
72    """called when all tests finished"""
73    self.end_last_suite()
74    for suite in self.suite_stack[::-1]:
75      self.messages.testSuiteFinished(suite)
76
77  def get_test_name(self, result):
78    name = result.test_name
79    ind = name.find("%")    #remove unique module prefix
80    if ind != -1:
81      name = name[:ind]+name[name.find(".", ind):]
82    return name
83
84  def end_last_suite(self):
85    if self.current_suite:
86      self.messages.testSuiteFinished(self.current_suite)
87      self.current_suite = None
88
89  def get_suite_name(self, test):
90    module = inspect.getmodule(test)
91    klass = getattr(test, "im_class", None)
92    file = module.__file__
93    if file.endswith("pyc"):
94      file = file[:-1]
95
96    suite = module.__name__
97    if self.prefix:
98      tmp = file[:-3]
99      ind = tmp.split(self.prefix)[1]
100      suite = ind.replace("/", ".")
101    if klass:
102      suite += "." + klass.__name__
103      lineno = inspect.getsourcelines(klass)
104    else:
105      lineno = ("", 1)
106
107    return (suite, file+":"+str(lineno[1]))
108
109  def start_suite(self, suite_info):
110    """finish previous suite and put current suite
111        to stack"""
112    suite, file = suite_info
113    if suite != self.current_suite:
114      if self.current_suite:
115        if suite.startswith(self.current_suite+"."):
116          self.suite_stack.append(self.current_suite)
117        else:
118          self.messages.testSuiteFinished(self.current_suite)
119          for s in self.suite_stack:
120            if not suite.startswith(s+"."):
121              self.current_suite = s
122              self.messages.testSuiteFinished(self.current_suite)
123            else:
124              break
125      self.current_suite = suite
126      self.messages.testSuiteStarted(self.current_suite, location="file://" + file)
127
128  def start_test(self, result, name):
129    """trying to find test location """
130    real_func = result.test.func_closure[0].cell_contents
131    lineno = inspect.getsourcelines(real_func)
132    file = inspect.getsourcefile(real_func)
133    self.messages.testStarted(name, "file://"+file+":"+str(lineno[1]))
134
135def get_subclasses(module, base_class=TestBase):
136  test_classes = []
137  for name in dir(module):
138    obj = getattr(module, name)
139    try:
140      if issubclass(obj, base_class):
141        test_classes.append(obj)
142    except TypeError:  # If 'obj' is not a class
143      pass
144  return test_classes
145
146def get_module(file_name):
147  baseName = os.path.splitext(os.path.basename(file_name))[0]
148  return imp.load_source(baseName, file_name)
149
150modules = {}
151def getModuleName(prefix, cnt):
152  """ adds unique number to prevent name collisions"""
153  return prefix + "%" + str(cnt)
154
155def loadSource(fileName):
156  baseName = os.path.basename(fileName)
157  moduleName = os.path.splitext(baseName)[0]
158
159  if moduleName in modules:
160    cnt = 2
161    prefix = moduleName
162    while getModuleName(prefix, cnt) in modules:
163      cnt += 1
164    moduleName = getModuleName(prefix, cnt)
165  debug("/ Loading " + fileName + " as " + moduleName)
166  module = imp.load_source(moduleName, fileName)
167  modules[moduleName] = module
168  return module
169
170
171def register_tests_from_module(module, tests):
172  """add tests from module to main test suite"""
173  tests_to_register = []
174
175  for i in dir(module):
176    obj = getattr(module, i)
177    if isinstance(obj, Tests):
178      tests_to_register.append(i)
179
180  for i in tests_to_register:
181    baseName = module.__name__+"."+i
182    tests.register(baseName)
183  test_subclasses = get_subclasses(module)
184  if test_subclasses:
185    for subclass in test_subclasses:
186      tests.register(subclass())
187
188
189def register_tests_from_folder(tests, folder, pattern=None):
190  """add tests from folder to main test suite"""
191  listing = os.listdir(folder)
192  files = listing
193  if pattern: #get files matched given pattern
194    prog_list = [re.compile(pat.strip()) for pat in pattern.split(',')]
195    files = []
196    for fileName in listing:
197      if os.path.isdir(folder+fileName):
198        files.append(fileName)
199      for prog in prog_list:
200        if prog.match(fileName):
201          files.append(fileName)
202
203  if not folder.endswith("/"):
204    folder += "/"
205  for fileName in files:
206    if os.path.isdir(folder+fileName):
207      register_tests_from_folder(tests, folder+fileName, pattern)
208    if not fileName.endswith("py"):
209      continue
210
211    module = loadSource(folder+fileName)
212    register_tests_from_module(module, tests)
213
214def process_args():
215  tests = Tests()
216  prefix = ""
217  if not sys.argv:
218    return
219
220  arg = sys.argv[1].strip()
221  if not len(arg):
222    return
223
224  argument_list = arg.split("::")
225  if len(argument_list) == 1:
226    # From module or folder
227    a_splitted = argument_list[0].split(";")
228    if len(a_splitted) != 1:
229      # means we have pattern to match against
230      if a_splitted[0].endswith("/"):
231        debug("/ from folder " + a_splitted[0] + ". Use pattern: " + a_splitted[1])
232        prefix = a_splitted[0]
233        register_tests_from_folder(tests, a_splitted[0], a_splitted[1])
234    else:
235      if argument_list[0].endswith("/"):
236        debug("/ from folder " + argument_list[0])
237        prefix = a_splitted[0]
238        register_tests_from_folder(tests, argument_list[0])
239      else:
240        debug("/ from file " + argument_list[0])
241        module = get_module(argument_list[0])
242        register_tests_from_module(module, tests)
243
244  elif len(argument_list) == 2:
245    # From testcase
246    debug("/ from test class " + argument_list[1] + " in " + argument_list[0])
247    module = get_module(argument_list[0])
248    klass = getattr(module, argument_list[1])
249    tests.register(klass())
250  else:
251    # From method in class or from function
252    module = get_module(argument_list[0])
253    if argument_list[1] == "":
254      debug("/ from function " + argument_list[2] + " in " + argument_list[0])
255      # test function, not method
256      test = getattr(module, argument_list[2])
257    else:
258      debug("/ from method " + argument_list[2] + " in class " +  argument_list[1] + " in " + argument_list[0])
259      klass = getattr(module, argument_list[1])
260      test = getattr(klass(), argument_list[2])
261    tests.register([test])
262
263  tests.run(reporter=TeamCityReporter(prefix))
264
265if __name__ == "__main__":
266  process_args()