1# Utility functions for running tests and reporting the results. 2# 3# Copyright (C) 2007 Lemur Consulting Ltd 4# Copyright (C) 2008,2011 Olly Betts 5# 6# This program is free software; you can redistribute it and/or 7# modify it under the terms of the GNU General Public License as 8# published by the Free Software Foundation; either version 2 of the 9# License, or (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 19# USA 20 21import gc 22import os as _os 23import os.path as _path 24import sys as _sys 25import traceback as _traceback 26import xapian as _xapian 27 28class TestFail(Exception): 29 pass 30 31class TestRunner(object): 32 def __init__(self): 33 """Initialise the TestRunner. 34 35 """ 36 37 self._out = OutProxy(_sys.stdout) 38 39 # _verbose is an integer, higher meaning more verbose 40 self._verbose = _os.environ.get('VERBOSE', '').lower() 41 if self._verbose in ('', '0', 'no', 'off', 'false'): 42 self._verbose = 0 43 else: 44 try: 45 self._verbose = int(self._verbose) 46 except: 47 self._verbose = 1 48 49 # context is a description of what the test is currently checking 50 self._context = None 51 52 def context(self, context): 53 """Set the context. 54 55 This should be a string describing what a test is checking, and will be 56 displayed if the test fails. 57 58 A test may change the context several times - each call will override 59 subsequent calls. 60 61 Set the context to None to remove display of a specific context message. 62 This is performed automatically at the start of each test. 63 64 """ 65 self._context = context 66 if context is not None and self._verbose > 1: 67 self._out.start_line() 68 self._out.write("Context: %s\n" % context) 69 self._out.flush() 70 71 def expect(self, got, expected, message="Expected equality"): 72 """Function used to check for a particular expected value. 73 74 """ 75 if self._verbose > 2: 76 self._out.start_line() 77 self._out.write("Checking for %r: expecting %r ... " % (message, expected)) 78 self._out.flush() 79 if got != expected: 80 if self._verbose > 2: 81 self._out.write_colour(" #red#failed##") 82 self._out.write(": got %r\n" % got) 83 self._out.flush() 84 raise TestFail("%s: got %r, expected %r" % (message, got, expected)) 85 if self._verbose > 2: 86 self._out.write_colour(" #green#ok##\n") 87 self._out.flush() 88 89 def expect_query(self, query, expected): 90 """Check that the description of a query is as expected. 91 92 """ 93 expected = 'Query(' + expected + ')' 94 desc = str(query) 95 if self._verbose > 2: 96 self._out.start_line() 97 self._out.write("Checking str(query): expecting %r ... " % expected) 98 self._out.flush() 99 if desc != expected: 100 if self._verbose > 2: 101 self._out.write_colour(" #red#failed##") 102 self._out.write(": got %r\n" % desc) 103 self._out.flush() 104 raise TestFail("Unexpected str(query): got %r, expected %r" % (desc, expected)) 105 if self._verbose > 2: 106 self._out.write_colour(" #green#ok##\n") 107 self._out.flush() 108 109 def expect_exception(self, expectedclass, expectedmsg, code, *args): 110 """Check that an exception is raised. 111 112 - expectedclass is the class of the exception to check for. 113 - expectedmsg is the message to check for (which can be a string or 114 a callable), or None to skip checking the message. 115 - code is the thing to call. 116 - args are the arguments to pass to it. 117 118 """ 119 if self._verbose > 2: 120 self._out.start_line() 121 self._out.write("Checking for exception: %s(%r) ... " % (str(expectedclass), expectedmsg)) 122 self._out.flush() 123 try: 124 code(*args) 125 if self._verbose > 2: 126 self._out.write_colour(" #red#failed##: no exception occurred\n") 127 self._out.flush() 128 raise TestFail("Expected %s(%r) exception" % (str(expectedclass), expectedmsg)) 129 except expectedclass as e: 130 if expectedmsg is None: 131 pass 132 elif isinstance(expectedmsg, str): 133 if str(e) != expectedmsg: 134 if self._verbose > 2: 135 self._out.write_colour(" #red#failed##") 136 self._out.write(": exception string not as expected: got '%s'\n" % str(e)) 137 self._out.flush() 138 raise TestFail("Exception string not as expected: got '%s', expected '%s'" % (str(e), expectedmsg)) 139 elif callable(expectedmsg): 140 if not expectedmsg(str(e)): 141 if self._verbose > 2: 142 self._out.write_colour(" #red#failed##") 143 self._out.write(": exception string not as expected: got '%s'\n" % str(e)) 144 self._out.flush() 145 raise TestFail("Exception string not as expected: got '%s', expected pattern '%s'" % (str(e), expectedmsg.pattern)) 146 else: 147 raise TestFail("Unexpected expectedmsg: %r" % (expectedmsg,)) 148 if e.__class__ != expectedclass: 149 if self._verbose > 2: 150 self._out.write_colour(" #red#failed##") 151 self._out.write(": didn't get right exception class: got '%s'\n" % str(e.__class__)) 152 self._out.flush() 153 raise TestFail("Didn't get right exception class: got '%s', expected '%s'" % (str(e.__class__), str(expectedclass))) 154 if self._verbose > 2: 155 self._out.write_colour(" #green#ok##\n") 156 self._out.flush() 157 158 def report_failure(self, name, msg, show_traceback=True): 159 "Report a test failure, with some useful context." 160 161 tb = _traceback.extract_tb(_sys.exc_info()[2]) 162 163 # Move up the traceback until we get to the line in the test 164 # function which caused the failure. 165 for line in range(1, len(tb) + 1): 166 if tb[-line][2] == 'test_' + name: 167 break 168 169 # Display the context in the text function. 170 filepath, linenum, functionname, text = tb[-line] 171 filename = _os.path.basename(filepath) 172 173 self._out.ensure_space() 174 self._out.write_colour("#red#FAILED##\n") 175 if self._verbose > 0: 176 if self._context is None: 177 context = '' 178 else: 179 context = ", when %s" % self._context 180 firstline = "%s:%d" % (filename, linenum) 181 self._out.write("\n%s:%s%s\n" % (firstline, msg, context)) 182 183 # Display sourcecode lines 184 lines = open(filepath).readlines() 185 startline = max(linenum - 3, 0) 186 endline = min(linenum + 2, len(lines)) 187 for num in range(startline, endline): 188 if num + 1 == linenum: 189 self._out.write('->') 190 else: 191 self._out.write(' ') 192 self._out.write("%4d %s\n" % (num + 1, lines[num].rstrip())) 193 194 # Display the traceback 195 if show_traceback: 196 self._out.write("Traceback (most recent call last):\n") 197 for line in _traceback.format_list(tb): 198 self._out.write(line.rstrip() + '\n') 199 self._out.write('\n') 200 201 # Display some information about the xapian version and platform 202 self._out.write("Xapian version: %s\n" % _xapian.version_string()) 203 try: 204 import platform 205 platdesc = "%s %s (%s)" % platform.system_alias(platform.system(), 206 platform.release(), 207 platform.version()) 208 self._out.write("Platform: %s\n" % platdesc) 209 except: 210 pass 211 self._out.write('\nWhen reporting this problem, please quote all the preceding lines from\n"%s" onwards.\n\n' % firstline) 212 213 self._out.flush() 214 215 def gc_object_count(self): 216 # Python 2.7 doesn't seem to free all objects even for a full 217 # collection, so collect repeatedly until no further objects get freed. 218 old_count, count = len(gc.get_objects()), 0 219 while True: 220 gc.collect() 221 count = len(gc.get_objects()) 222 if count == old_count: 223 return count 224 old_count = count 225 226 def runtest(self, name, test_fn): 227 """Run a single test. 228 229 """ 230 startline = "Running test: %s..." % name 231 self._out.write(startline) 232 self._out.flush() 233 try: 234 object_count = self.gc_object_count() 235 test_fn() 236 object_count = self.gc_object_count() - object_count 237 if object_count != 0: 238 # Maybe some lazily initialised object got initialised for the 239 # first time, so rerun the test. 240 self._out.ensure_space() 241 msg = "#yellow#possible leak (%d), rerunning## " % object_count 242 self._out.write_colour(msg) 243 object_count = self.gc_object_count() 244 test_fn() 245 expect(self.gc_object_count(), object_count) 246 self._out.write_colour("#green#ok##\n") 247 248 if self._verbose > 0 or self._out.plain: 249 self._out.ensure_space() 250 self._out.write_colour("#green#ok##\n") 251 else: 252 self._out.clear_line() 253 self._out.flush() 254 return True 255 except TestFail as e: 256 self.report_failure(name, str(e), show_traceback=False) 257 except _xapian.Error as e: 258 self.report_failure(name, "%s: %s" % (str(e.__class__), str(e))) 259 except Exception as e: 260 self.report_failure(name, "%s: %s" % (str(e.__class__), str(e))) 261 return False 262 263 def runtests(self, namedict, runonly=None): 264 """Run a set of tests. 265 266 Takes a dictionary of name-value pairs and runs all the values which are 267 callables, for which the name begins with "test_". 268 269 Typical usage is to pass "locals()" as the parameter, to run all callables 270 with names starting "test_" in local scope. 271 272 If runonly is supplied, and non-empty, only those tests which appear in 273 runonly will be run. 274 275 """ 276 tests = [] 277 if isinstance(namedict, dict): 278 for name in namedict: 279 if name.startswith('test_'): 280 fn = namedict[name] 281 name = name[5:] 282 if hasattr(fn, '__call__'): 283 tests.append((name, fn)) 284 tests.sort() 285 else: 286 tests = namedict 287 288 if runonly is not None and len(runonly) != 0: 289 oldtests = tests 290 tests = [] 291 for name, fn in oldtests: 292 if name in runonly: 293 tests.append((name, fn)) 294 295 passed, failed = 0, 0 296 for name, fn in tests: 297 self.context(None) 298 if self.runtest(name, fn): 299 passed += 1 300 else: 301 failed += 1 302 if failed: 303 if self._verbose == 0: 304 self._out.write('Re-run with the environment variable VERBOSE=1 to see details.\n') 305 self._out.write('E.g. make check VERBOSE=1\n') 306 self._out.write_colour("#green#%d## tests passed, #red#%d## tests failed\n" % (passed, failed)) 307 return False 308 else: 309 self._out.write_colour("#green#%d## tests passed, no failures\n" % passed) 310 return True 311 312class OutProxy(object): 313 """Proxy output class to make formatting easier. 314 315 Allows colourisation, and keeps track of whether we're mid-line or not. 316 317 """ 318 319 def __init__(self, out): 320 self._out = out 321 self._line_pos = 0 # Position on current line 322 self._had_space = True # True iff we're preceded by whitespace (including newline) 323 self.plain = not self._allow_control_sequences() 324 self._colours = self.get_colour_strings() 325 326 def _allow_control_sequences(self): 327 "Return True if output device allows control sequences." 328 mode = _os.environ.get("XAPIAN_TESTSUITE_OUTPUT", '').lower() 329 if mode in ('', 'auto'): 330 if _sys.platform == 'win32': 331 return False 332 elif not hasattr(self._out, "isatty"): 333 return False 334 else: 335 return self._out.isatty() 336 elif mode == 'plain': 337 return False 338 return True 339 340 def get_colour_strings(self): 341 """Return a mapping of colour names to colour output sequences. 342 343 """ 344 colours = { 345 'red': "\x1b[1m\x1b[31m", 346 'green': "\x1b[1m\x1b[32m", 347 'yellow': "\x1b[1m\x1b[33m", 348 '': "\x1b[0m", 349 } 350 if self.plain: 351 for key in colours: 352 colours[key] = '' 353 return colours 354 355 def _colourise(self, msg): 356 """Apply colours to a message. 357 358 #colourname# will change the text colour, ## will change the colour back. 359 360 """ 361 for colour, val in self._colours.items(): 362 msg = msg.replace('#%s#' % colour, val) 363 return msg 364 365 def clear_line(self): 366 """Clear the current line of output, if possible. 367 368 Otherwise, just move to the start of the next line. 369 370 """ 371 if self._line_pos == 0: 372 return 373 if self.plain: 374 self.write('\n') 375 else: 376 self.write("\r" + " " * self._line_pos + "\r") 377 378 def start_line(self): 379 """Ensure that we're at the start of a line. 380 381 """ 382 if self._line_pos != 0: 383 self.write('\n') 384 385 def ensure_space(self): 386 """Ensure that we're preceded by whitespace. 387 388 """ 389 if not self._had_space: 390 self.write(' ') 391 392 def write(self, msg): 393 """Write the message to the output stream. 394 395 """ 396 if len(msg) == 0: 397 return 398 399 # Adjust the line position counted 400 nlpos = max(msg.rfind('\n'), msg.rfind('\r')) 401 if nlpos >= 0: 402 subline = msg[nlpos + 1:] 403 self._line_pos = len(subline) # Note - doesn't cope with tabs. 404 else: 405 self._line_pos += len(msg) # Note - doesn't cope with tabs. 406 407 # Record whether we ended with whitespace 408 self._had_space = msg[-1].isspace() 409 410 self._out.write(msg) 411 412 def write_colour(self, msg): 413 """Write a message, first substituting markup for colours. 414 415 """ 416 self.write(self._colourise(msg)) 417 418 def flush(self): 419 self._out.flush() 420 421 422_runner = TestRunner() 423context = _runner.context 424expect = _runner.expect 425expect_query = _runner.expect_query 426expect_exception = _runner.expect_exception 427runtests = _runner.runtests 428 429__all__ = ('TestFail', 'context', 'expect', 'expect_query', 'expect_exception', 'runtests') 430