1# Utility functions for running tests and reporting the results. 2# 3# Copyright (C) 2007 Lemur Consulting Ltd 4# Copyright (C) 2008 Olly Betts 5# 6# This program is free software; you can redistribute it and/or 7# modify it under the terms of the GNU General Public License as 8# published by the Free Software Foundation; either version 2 of the 9# License, or (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 19# USA 20 21import os as _os 22import os.path as _path 23import sys as _sys 24import traceback as _traceback 25import xapian as _xapian 26 27class TestFail(Exception): 28 pass 29 30class TestRunner(object): 31 def __init__(self): 32 """Initialise the TestRunner. 33 34 """ 35 36 self._out = OutProxy(_sys.stdout) 37 38 # _verbose is an integer, higher meaning more verbose 39 self._verbose = _os.environ.get('VERBOSE', '').lower() 40 if self._verbose in ('', '0', 'no', 'off', 'false'): 41 self._verbose = 0 42 else: 43 try: 44 self._verbose = int(self._verbose) 45 except: 46 self._verbose = 1 47 48 # context is a description of what the test is currently checking 49 self._context = None 50 51 def context(self, context): 52 """Set the context. 53 54 This should be a string describing what a test is checking, and will be 55 displayed if the test fails. 56 57 A test may change the context several times - each call will override 58 subsequent calls. 59 60 Set the context to None to remove display of a specific context message. 61 This is performed automatically at the start of each test. 62 63 """ 64 self._context = context 65 if context is not None and self._verbose > 1: 66 self._out.start_line() 67 self._out.write("Context: %s\n" % context) 68 self._out.flush() 69 70 def expect(self, got, expected, message="Expected equality"): 71 """Function used to check for a particular expected value. 72 73 """ 74 if self._verbose > 2: 75 self._out.start_line() 76 self._out.write("Checking for %r: expecting %r ... " % (message, expected)) 77 self._out.flush() 78 if got != expected: 79 if self._verbose > 2: 80 self._out.write_colour(" #red#failed##") 81 self._out.write(": got %r\n" % got) 82 self._out.flush() 83 raise TestFail("%s: got %r, expected %r" % (message, got, expected)) 84 if self._verbose > 2: 85 self._out.write_colour(" #green#ok##\n") 86 self._out.flush() 87 88 def expect_query(self, query, expected): 89 """Check that the description of a query is as expected. 90 91 """ 92 expected = 'Xapian::Query(' + expected + ')' 93 desc = str(query) 94 if self._verbose > 2: 95 self._out.start_line() 96 self._out.write("Checking str(query): expecting %r ... " % expected) 97 self._out.flush() 98 if desc != expected: 99 if self._verbose > 2: 100 self._out.write_colour(" #red#failed##") 101 self._out.write(": got %r\n" % desc) 102 self._out.flush() 103 raise TestFail("Unexpected str(query): got %r, expected %r" % (desc, expected)) 104 if self._verbose > 2: 105 self._out.write_colour(" #green#ok##\n") 106 self._out.flush() 107 108 def expect_exception(self, expectedclass, expectedmsg, callable, *args): 109 if self._verbose > 2: 110 self._out.start_line() 111 self._out.write("Checking for exception: %s(%r) ... " % (str(expectedclass), expectedmsg)) 112 self._out.flush() 113 try: 114 callable(*args) 115 if self._verbose > 2: 116 self._out.write_colour(" #red#failed##: no exception occurred\n") 117 self._out.flush() 118 raise TestFail("Expected %s(%r) exception" % (str(expectedclass), expectedmsg)) 119 except expectedclass as e: 120 if str(e) != expectedmsg: 121 if self._verbose > 2: 122 self._out.write_colour(" #red#failed##") 123 self._out.write(": exception string not as expected: got '%s'\n" % str(e)) 124 self._out.flush() 125 raise TestFail("Exception string not as expected: got '%s', expected '%s'" % (str(e), expectedmsg)) 126 if e.__class__ != expectedclass: 127 if self._verbose > 2: 128 self._out.write_colour(" #red#failed##") 129 self._out.write(": didn't get right exception class: got '%s'\n" % str(e.__class__)) 130 self._out.flush() 131 raise TestFail("Didn't get right exception class: got '%s', expected '%s'" % (str(e.__class__), str(expectedclass))) 132 if self._verbose > 2: 133 self._out.write_colour(" #green#ok##\n") 134 self._out.flush() 135 136 def report_failure(self, name, msg, show_traceback=True): 137 "Report a test failure, with some useful context." 138 139 orig_tb = _traceback.extract_tb(_sys.exc_info()[2]) 140 tb = orig_tb 141 142 # Move up the traceback until we get to the line in the test 143 # function which caused the failure. 144 while tb[-1][2] != 'test_' + name: 145 tb = tb[:-1] 146 147 # Display the context in the text function. 148 filepath, linenum, functionname, text = tb[-1] 149 filename = _os.path.basename(filepath) 150 151 self._out.ensure_space() 152 self._out.write_colour("#red#FAILED##\n") 153 if self._verbose > 0: 154 if self._context is None: 155 context = '' 156 else: 157 context = ", when %s" % self._context 158 firstline = "%s:%d" % (filename, linenum) 159 self._out.write("\n%s:%s%s\n" % (firstline, msg, context)) 160 161 # Display sourcecode lines 162 lines = open(filepath).readlines() 163 startline = max(linenum - 3, 0) 164 endline = min(linenum + 2, len(lines)) 165 for num in range(startline, endline): 166 if num + 1 == linenum: 167 self._out.write('->') 168 else: 169 self._out.write(' ') 170 self._out.write("%4d %s\n" % (num + 1, lines[num].rstrip())) 171 172 # Display the traceback 173 if show_traceback: 174 self._out.write("Traceback (most recent call last):\n") 175 for line in _traceback.format_list(orig_tb): 176 self._out.write(line.rstrip() + '\n') 177 self._out.write('\n') 178 179 # Display some information about the xapian version and platform 180 self._out.write("Xapian version: %s\n" % _xapian.version_string()) 181 try: 182 import platform 183 platdesc = "%s %s (%s)" % platform.system_alias(platform.system(), 184 platform.release(), 185 platform.version()) 186 self._out.write("Platform: %s\n" % platdesc) 187 except: 188 pass 189 self._out.write('\nWhen reporting this problem, please quote all the preceding lines from\n"%s" onwards.\n\n' % firstline) 190 191 self._out.flush() 192 193 def runtest(self, name, test_fn): 194 """Run a single test. 195 196 """ 197 startline = "Running test: %s..." % name 198 self._out.write(startline) 199 self._out.flush() 200 try: 201 test_fn() 202 if self._verbose > 0 or self._out.plain: 203 self._out.ensure_space() 204 self._out.write_colour("#green#ok##\n") 205 else: 206 self._out.clear_line() 207 self._out.flush() 208 return True 209 except TestFail as e: 210 self.report_failure(name, str(e), show_traceback=False) 211 except _xapian.Error as e: 212 self.report_failure(name, "%s: %s" % (str(e.__class__), str(e))) 213 except Exception as e: 214 self.report_failure(name, "%s: %s" % (str(e.__class__), str(e))) 215 return False 216 217 def runtests(self, namedict, runonly=None): 218 """Run a set of tests. 219 220 Takes a dictionary of name-value pairs and runs all the values which are 221 callables, for which the name begins with "test_". 222 223 Typical usage is to pass "locals()" as the parameter, to run all callables 224 with names starting "test_" in local scope. 225 226 If runonly is supplied, and non-empty, only those tests which appear in 227 runonly will be run. 228 229 """ 230 tests = [] 231 if isinstance(namedict, dict): 232 for name in namedict: 233 if name.startswith('test_'): 234 fn = namedict[name] 235 name = name[5:] 236 if hasattr(fn, '__call__'): 237 tests.append((name, fn)) 238 tests.sort() 239 else: 240 tests = namedict 241 242 if runonly is not None and len(runonly) != 0: 243 oldtests = tests 244 tests = [] 245 for name, fn in oldtests: 246 if name in runonly: 247 tests.append((name, fn)) 248 249 passed, failed = 0, 0 250 for name, fn in tests: 251 self.context(None) 252 if self.runtest(name, fn): 253 passed += 1 254 else: 255 failed += 1 256 if failed: 257 if self._verbose == 0: 258 self._out.write('Re-run with the VERBOSE environment variable set to "1" to see details.\n') 259 self._out.write_colour("#green#%d## tests passed, #red#%d## tests failed\n" % (passed, failed)) 260 return False 261 else: 262 self._out.write_colour("#green#%d## tests passed, no failures\n" % passed) 263 return True 264 265class OutProxy(object): 266 """Proxy output class to make formatting easier. 267 268 Allows colourisation, and keeps track of whether we're mid-line or not. 269 270 """ 271 272 def __init__(self, out): 273 self._out = out 274 self._line_pos = 0 # Position on current line 275 self._had_space = True # True iff we're preceded by whitespace (including newline) 276 self.plain = not self._allow_control_sequences() 277 self._colours = self.get_colour_strings() 278 279 def _allow_control_sequences(self): 280 "Return True if output device allows control sequences." 281 mode = _os.environ.get("XAPIAN_TESTSUITE_OUTPUT", '').lower() 282 if mode in ('', 'auto'): 283 if _sys.platform == 'win32': 284 return False 285 elif not hasattr(self._out, "isatty"): 286 return False 287 else: 288 return self._out.isatty() 289 elif mode == 'plain': 290 return False 291 return True 292 293 def get_colour_strings(self): 294 """Return a mapping of colour names to colour output sequences. 295 296 """ 297 colours = { 298 'red': "\x1b[1m\x1b[31m", 299 'green': "\x1b[1m\x1b[32m", 300 'yellow': "\x1b[1m\x1b[33m", 301 '': "\x1b[0m", 302 } 303 if self.plain: 304 for key in colours: 305 colours[key] = '' 306 return colours 307 308 def _colourise(self, msg): 309 """Apply colours to a message. 310 311 #colourname# will change the text colour, ## will change the colour back. 312 313 """ 314 for colour, val in self._colours.items(): 315 msg = msg.replace('#%s#' % colour, val) 316 return msg 317 318 def clear_line(self): 319 """Clear the current line of output, if possible. 320 321 Otherwise, just move to the start of the next line. 322 323 """ 324 if self._line_pos == 0: 325 return 326 if self.plain: 327 self.write('\n') 328 else: 329 self.write("\r" + " " * self._line_pos + "\r") 330 331 def start_line(self): 332 """Ensure that we're at the start of a line. 333 334 """ 335 if self._line_pos != 0: 336 self.write('\n') 337 338 def ensure_space(self): 339 """Ensure that we're preceded by whitespace. 340 341 """ 342 if not self._had_space: 343 self.write(' ') 344 345 def write(self, msg): 346 """Write the message to the output stream. 347 348 """ 349 if len(msg) == 0: 350 return 351 352 # Adjust the line position counted 353 nlpos = max(msg.rfind('\n'), msg.rfind('\r')) 354 if nlpos >= 0: 355 subline = msg[nlpos + 1:] 356 self._line_pos = len(subline) # Note - doesn't cope with tabs. 357 else: 358 self._line_pos += len(msg) # Note - doesn't cope with tabs. 359 360 # Record whether we ended with whitespace 361 self._had_space = msg[-1].isspace() 362 363 self._out.write(msg) 364 365 def write_colour(self, msg): 366 """Write a message, first substituting markup for colours. 367 368 """ 369 self.write(self._colourise(msg)) 370 371 def flush(self): 372 self._out.flush() 373 374 375_runner = TestRunner() 376context = _runner.context 377expect = _runner.expect 378expect_query = _runner.expect_query 379expect_exception = _runner.expect_exception 380runtests = _runner.runtests 381 382__all__ = ('context', 'expect', 'expect_query', 'expect_exception', 'runtests') 383