1import sys, os 2import imp 3 4helpers_dir = os.getenv("PYCHARM_HELPERS_DIR", sys.path[0]) 5if sys.path[0] != helpers_dir: 6 sys.path.insert(0, helpers_dir) 7 8from tcunittest import TeamcityTestResult 9 10from pycharm_run_utils import import_system_module 11from pycharm_run_utils import adjust_sys_path 12from pycharm_run_utils import debug, getModuleName 13 14adjust_sys_path() 15 16re = import_system_module("re") 17inspect = import_system_module("inspect") 18 19try: 20 from attest.reporters import AbstractReporter 21 from attest.collectors import Tests 22 from attest import TestBase 23except: 24 raise NameError("Please, install attests") 25 26class TeamCityReporter(AbstractReporter, TeamcityTestResult): 27 """Teamcity reporter for attests.""" 28 29 def __init__(self, prefix): 30 TeamcityTestResult.__init__(self) 31 self.prefix = prefix 32 33 def begin(self, tests): 34 """initialize suite stack and count tests""" 35 self.total = len(tests) 36 self.suite_stack = [] 37 self.messages.testCount(self.total) 38 39 def success(self, result): 40 """called when test finished successfully""" 41 suite = self.get_suite_name(result.test) 42 self.start_suite(suite) 43 name = self.get_test_name(result) 44 self.start_test(result, name) 45 self.messages.testFinished(name) 46 47 def failure(self, result): 48 """called when test failed""" 49 suite = self.get_suite_name(result.test) 50 self.start_suite(suite) 51 name = self.get_test_name(result) 52 self.start_test(result, name) 53 exctype, value, tb = result.exc_info 54 error_value = self.find_error_value(tb) 55 if (error_value.startswith("'") or error_value.startswith('"')) and\ 56 (error_value.endswith("'") or error_value.endswith('"')): 57 first = self._unescape(self.find_first(error_value)) 58 second = self._unescape(self.find_second(error_value)) 59 else: 60 first = second = "" 61 62 err = self.formatErr(result.exc_info) 63 if isinstance(result.error, AssertionError): 64 self.messages.testFailed(name, message='Failure', 65 details=err, 66 expected=first, actual=second) 67 else: 68 self.messages.testError(name, message='Error', 69 details=err) 70 71 def finished(self): 72 """called when all tests finished""" 73 self.end_last_suite() 74 for suite in self.suite_stack[::-1]: 75 self.messages.testSuiteFinished(suite) 76 77 def get_test_name(self, result): 78 name = result.test_name 79 ind = name.find("%") #remove unique module prefix 80 if ind != -1: 81 name = name[:ind]+name[name.find(".", ind):] 82 return name 83 84 def end_last_suite(self): 85 if self.current_suite: 86 self.messages.testSuiteFinished(self.current_suite) 87 self.current_suite = None 88 89 def get_suite_name(self, test): 90 module = inspect.getmodule(test) 91 klass = getattr(test, "im_class", None) 92 file = module.__file__ 93 if file.endswith("pyc"): 94 file = file[:-1] 95 96 suite = module.__name__ 97 if self.prefix: 98 tmp = file[:-3] 99 ind = tmp.split(self.prefix)[1] 100 suite = ind.replace("/", ".") 101 if klass: 102 suite += "." + klass.__name__ 103 lineno = inspect.getsourcelines(klass) 104 else: 105 lineno = ("", 1) 106 107 return (suite, file+":"+str(lineno[1])) 108 109 def start_suite(self, suite_info): 110 """finish previous suite and put current suite 111 to stack""" 112 suite, file = suite_info 113 if suite != self.current_suite: 114 if self.current_suite: 115 if suite.startswith(self.current_suite+"."): 116 self.suite_stack.append(self.current_suite) 117 else: 118 self.messages.testSuiteFinished(self.current_suite) 119 for s in self.suite_stack: 120 if not suite.startswith(s+"."): 121 self.current_suite = s 122 self.messages.testSuiteFinished(self.current_suite) 123 else: 124 break 125 self.current_suite = suite 126 self.messages.testSuiteStarted(self.current_suite, location="file://" + file) 127 128 def start_test(self, result, name): 129 """trying to find test location """ 130 real_func = result.test.func_closure[0].cell_contents 131 lineno = inspect.getsourcelines(real_func) 132 file = inspect.getsourcefile(real_func) 133 self.messages.testStarted(name, "file://"+file+":"+str(lineno[1])) 134 135def get_subclasses(module, base_class=TestBase): 136 test_classes = [] 137 for name in dir(module): 138 obj = getattr(module, name) 139 try: 140 if issubclass(obj, base_class): 141 test_classes.append(obj) 142 except TypeError: # If 'obj' is not a class 143 pass 144 return test_classes 145 146def get_module(file_name): 147 baseName = os.path.splitext(os.path.basename(file_name))[0] 148 return imp.load_source(baseName, file_name) 149 150modules = {} 151def getModuleName(prefix, cnt): 152 """ adds unique number to prevent name collisions""" 153 return prefix + "%" + str(cnt) 154 155def loadSource(fileName): 156 baseName = os.path.basename(fileName) 157 moduleName = os.path.splitext(baseName)[0] 158 159 if moduleName in modules: 160 cnt = 2 161 prefix = moduleName 162 while getModuleName(prefix, cnt) in modules: 163 cnt += 1 164 moduleName = getModuleName(prefix, cnt) 165 debug("/ Loading " + fileName + " as " + moduleName) 166 module = imp.load_source(moduleName, fileName) 167 modules[moduleName] = module 168 return module 169 170 171def register_tests_from_module(module, tests): 172 """add tests from module to main test suite""" 173 tests_to_register = [] 174 175 for i in dir(module): 176 obj = getattr(module, i) 177 if isinstance(obj, Tests): 178 tests_to_register.append(i) 179 180 for i in tests_to_register: 181 baseName = module.__name__+"."+i 182 tests.register(baseName) 183 test_subclasses = get_subclasses(module) 184 if test_subclasses: 185 for subclass in test_subclasses: 186 tests.register(subclass()) 187 188 189def register_tests_from_folder(tests, folder, pattern=None): 190 """add tests from folder to main test suite""" 191 listing = os.listdir(folder) 192 files = listing 193 if pattern: #get files matched given pattern 194 prog_list = [re.compile(pat.strip()) for pat in pattern.split(',')] 195 files = [] 196 for fileName in listing: 197 if os.path.isdir(folder+fileName): 198 files.append(fileName) 199 for prog in prog_list: 200 if prog.match(fileName): 201 files.append(fileName) 202 203 if not folder.endswith("/"): 204 folder += "/" 205 for fileName in files: 206 if os.path.isdir(folder+fileName): 207 register_tests_from_folder(tests, folder+fileName, pattern) 208 if not fileName.endswith("py"): 209 continue 210 211 module = loadSource(folder+fileName) 212 register_tests_from_module(module, tests) 213 214def process_args(): 215 tests = Tests() 216 prefix = "" 217 if not sys.argv: 218 return 219 220 arg = sys.argv[1].strip() 221 if not len(arg): 222 return 223 224 argument_list = arg.split("::") 225 if len(argument_list) == 1: 226 # From module or folder 227 a_splitted = argument_list[0].split(";") 228 if len(a_splitted) != 1: 229 # means we have pattern to match against 230 if a_splitted[0].endswith("/"): 231 debug("/ from folder " + a_splitted[0] + ". Use pattern: " + a_splitted[1]) 232 prefix = a_splitted[0] 233 register_tests_from_folder(tests, a_splitted[0], a_splitted[1]) 234 else: 235 if argument_list[0].endswith("/"): 236 debug("/ from folder " + argument_list[0]) 237 prefix = a_splitted[0] 238 register_tests_from_folder(tests, argument_list[0]) 239 else: 240 debug("/ from file " + argument_list[0]) 241 module = get_module(argument_list[0]) 242 register_tests_from_module(module, tests) 243 244 elif len(argument_list) == 2: 245 # From testcase 246 debug("/ from test class " + argument_list[1] + " in " + argument_list[0]) 247 module = get_module(argument_list[0]) 248 klass = getattr(module, argument_list[1]) 249 tests.register(klass()) 250 else: 251 # From method in class or from function 252 module = get_module(argument_list[0]) 253 if argument_list[1] == "": 254 debug("/ from function " + argument_list[2] + " in " + argument_list[0]) 255 # test function, not method 256 test = getattr(module, argument_list[2]) 257 else: 258 debug("/ from method " + argument_list[2] + " in class " + argument_list[1] + " in " + argument_list[0]) 259 klass = getattr(module, argument_list[1]) 260 test = getattr(klass(), argument_list[2]) 261 tests.register([test]) 262 263 tests.run(reporter=TeamCityReporter(prefix)) 264 265if __name__ == "__main__": 266 process_args()