1# Utility functions for running tests and reporting the results.
2#
3# Copyright (C) 2007 Lemur Consulting Ltd
4# Copyright (C) 2008 Olly Betts
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU General Public License as
8# published by the Free Software Foundation; either version 2 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
19# USA
20
21import os as _os
22import os.path as _path
23import sys as _sys
24import traceback as _traceback
25import xapian as _xapian
26
27class TestFail(Exception):
28    pass
29
30class TestRunner(object):
31    def __init__(self):
32        """Initialise the TestRunner.
33
34        """
35
36        self._out = OutProxy(_sys.stdout)
37
38        # _verbose is an integer, higher meaning more verbose
39        self._verbose = _os.environ.get('VERBOSE', '').lower()
40        if self._verbose in ('', '0', 'no', 'off', 'false'):
41            self._verbose = 0
42        else:
43            try:
44                self._verbose = int(self._verbose)
45            except:
46                self._verbose = 1
47
48        # context is a description of what the test is currently checking
49        self._context = None
50
51    def context(self, context):
52        """Set the context.
53
54        This should be a string describing what a test is checking, and will be
55        displayed if the test fails.
56
57        A test may change the context several times - each call will override
58        subsequent calls.
59
60        Set the context to None to remove display of a specific context message.
61        This is performed automatically at the start of each test.
62
63        """
64        self._context = context
65        if context is not None and self._verbose > 1:
66            self._out.start_line()
67            self._out.write("Context: %s\n" % context)
68            self._out.flush()
69
70    def expect(self, got, expected, message="Expected equality"):
71        """Function used to check for a particular expected value.
72
73        """
74        if self._verbose > 2:
75            self._out.start_line()
76            self._out.write("Checking for %r: expecting %r ... " % (message, expected))
77            self._out.flush()
78        if got != expected:
79            if self._verbose > 2:
80                self._out.write_colour(" #red#failed##")
81                self._out.write(": got %r\n" % got)
82                self._out.flush()
83            raise TestFail("%s: got %r, expected %r" % (message, got, expected))
84        if self._verbose > 2:
85            self._out.write_colour(" #green#ok##\n")
86            self._out.flush()
87
88    def expect_query(self, query, expected):
89        """Check that the description of a query is as expected.
90
91        """
92        expected = 'Xapian::Query(' + expected + ')'
93        desc = str(query)
94        if self._verbose > 2:
95            self._out.start_line()
96            self._out.write("Checking str(query): expecting %r ... " % expected)
97            self._out.flush()
98        if desc != expected:
99            if self._verbose > 2:
100                self._out.write_colour(" #red#failed##")
101                self._out.write(": got %r\n" % desc)
102                self._out.flush()
103            raise TestFail("Unexpected str(query): got %r, expected %r" % (desc, expected))
104        if self._verbose > 2:
105            self._out.write_colour(" #green#ok##\n")
106            self._out.flush()
107
108    def expect_exception(self, expectedclass, expectedmsg, callable, *args):
109        if self._verbose > 2:
110            self._out.start_line()
111            self._out.write("Checking for exception: %s(%r) ... " % (str(expectedclass), expectedmsg))
112            self._out.flush()
113        try:
114            callable(*args)
115            if self._verbose > 2:
116                self._out.write_colour(" #red#failed##: no exception occurred\n")
117                self._out.flush()
118            raise TestFail("Expected %s(%r) exception" % (str(expectedclass), expectedmsg))
119        except expectedclass as e:
120            if str(e) != expectedmsg:
121                if self._verbose > 2:
122                    self._out.write_colour(" #red#failed##")
123                    self._out.write(": exception string not as expected: got '%s'\n" % str(e))
124                    self._out.flush()
125                raise TestFail("Exception string not as expected: got '%s', expected '%s'" % (str(e), expectedmsg))
126            if e.__class__ != expectedclass:
127                if self._verbose > 2:
128                    self._out.write_colour(" #red#failed##")
129                    self._out.write(": didn't get right exception class: got '%s'\n" % str(e.__class__))
130                    self._out.flush()
131                raise TestFail("Didn't get right exception class: got '%s', expected '%s'" % (str(e.__class__), str(expectedclass)))
132        if self._verbose > 2:
133            self._out.write_colour(" #green#ok##\n")
134            self._out.flush()
135
136    def report_failure(self, name, msg, show_traceback=True):
137        "Report a test failure, with some useful context."
138
139        orig_tb = _traceback.extract_tb(_sys.exc_info()[2])
140        tb = orig_tb
141
142        # Move up the traceback until we get to the line in the test
143        # function which caused the failure.
144        while tb[-1][2] != 'test_' + name:
145            tb = tb[:-1]
146
147        # Display the context in the text function.
148        filepath, linenum, functionname, text = tb[-1]
149        filename = _os.path.basename(filepath)
150
151        self._out.ensure_space()
152        self._out.write_colour("#red#FAILED##\n")
153        if self._verbose > 0:
154            if self._context is None:
155                context = ''
156            else:
157                context = ", when %s" % self._context
158            firstline = "%s:%d" % (filename, linenum)
159            self._out.write("\n%s:%s%s\n" % (firstline, msg, context))
160
161            # Display sourcecode lines
162            lines = open(filepath).readlines()
163            startline = max(linenum - 3, 0)
164            endline = min(linenum + 2, len(lines))
165            for num in range(startline, endline):
166                if num + 1 == linenum:
167                    self._out.write('->')
168                else:
169                    self._out.write('  ')
170                self._out.write("%4d %s\n" % (num + 1, lines[num].rstrip()))
171
172            # Display the traceback
173            if show_traceback:
174                self._out.write("Traceback (most recent call last):\n")
175                for line in _traceback.format_list(orig_tb):
176                    self._out.write(line.rstrip() + '\n')
177                self._out.write('\n')
178
179            # Display some information about the xapian version and platform
180            self._out.write("Xapian version: %s\n" % _xapian.version_string())
181            try:
182                import platform
183                platdesc = "%s %s (%s)" % platform.system_alias(platform.system(),
184                                                                platform.release(),
185                                                                platform.version())
186                self._out.write("Platform: %s\n" % platdesc)
187            except:
188                pass
189            self._out.write('\nWhen reporting this problem, please quote all the preceding lines from\n"%s" onwards.\n\n' % firstline)
190
191        self._out.flush()
192
193    def runtest(self, name, test_fn):
194        """Run a single test.
195
196        """
197        startline = "Running test: %s..." % name
198        self._out.write(startline)
199        self._out.flush()
200        try:
201            test_fn()
202            if self._verbose > 0 or self._out.plain:
203                self._out.ensure_space()
204                self._out.write_colour("#green#ok##\n")
205            else:
206                self._out.clear_line()
207            self._out.flush()
208            return True
209        except TestFail as e:
210            self.report_failure(name, str(e), show_traceback=False)
211        except _xapian.Error as e:
212            self.report_failure(name, "%s: %s" % (str(e.__class__), str(e)))
213        except Exception as e:
214            self.report_failure(name, "%s: %s" % (str(e.__class__), str(e)))
215        return False
216
217    def runtests(self, namedict, runonly=None):
218        """Run a set of tests.
219
220        Takes a dictionary of name-value pairs and runs all the values which are
221        callables, for which the name begins with "test_".
222
223        Typical usage is to pass "locals()" as the parameter, to run all callables
224        with names starting "test_" in local scope.
225
226        If runonly is supplied, and non-empty, only those tests which appear in
227        runonly will be run.
228
229        """
230        tests = []
231        if isinstance(namedict, dict):
232            for name in namedict:
233                if name.startswith('test_'):
234                    fn = namedict[name]
235                    name = name[5:]
236                    if hasattr(fn, '__call__'):
237                        tests.append((name, fn))
238            tests.sort()
239        else:
240            tests = namedict
241
242        if runonly is not None and len(runonly) != 0:
243            oldtests = tests
244            tests = []
245            for name, fn in oldtests:
246                if name in runonly:
247                    tests.append((name, fn))
248
249        passed, failed = 0, 0
250        for name, fn in tests:
251            self.context(None)
252            if self.runtest(name, fn):
253                passed += 1
254            else:
255                failed += 1
256        if failed:
257            if self._verbose == 0:
258                self._out.write('Re-run with the VERBOSE environment variable set to "1" to see details.\n')
259            self._out.write_colour("#green#%d## tests passed, #red#%d## tests failed\n" % (passed, failed))
260            return False
261        else:
262            self._out.write_colour("#green#%d## tests passed, no failures\n" % passed)
263            return True
264
265class OutProxy(object):
266    """Proxy output class to make formatting easier.
267
268    Allows colourisation, and keeps track of whether we're mid-line or not.
269
270    """
271
272    def __init__(self, out):
273        self._out = out
274        self._line_pos = 0 # Position on current line
275        self._had_space = True # True iff we're preceded by whitespace (including newline)
276        self.plain = not self._allow_control_sequences()
277        self._colours = self.get_colour_strings()
278
279    def _allow_control_sequences(self):
280        "Return True if output device allows control sequences."
281        mode = _os.environ.get("XAPIAN_TESTSUITE_OUTPUT", '').lower()
282        if mode in ('', 'auto'):
283            if _sys.platform == 'win32':
284                return False
285            elif not hasattr(self._out, "isatty"):
286                return False
287            else:
288                return self._out.isatty()
289        elif mode == 'plain':
290            return False
291        return True
292
293    def get_colour_strings(self):
294        """Return a mapping of colour names to colour output sequences.
295
296        """
297        colours = {
298            'red': "\x1b[1m\x1b[31m",
299            'green': "\x1b[1m\x1b[32m",
300            'yellow': "\x1b[1m\x1b[33m",
301            '': "\x1b[0m",
302        }
303        if self.plain:
304            for key in colours:
305                colours[key] = ''
306        return colours
307
308    def _colourise(self, msg):
309        """Apply colours to a message.
310
311        #colourname# will change the text colour, ## will change the colour back.
312
313        """
314        for colour, val in self._colours.items():
315            msg = msg.replace('#%s#' % colour, val)
316        return msg
317
318    def clear_line(self):
319        """Clear the current line of output, if possible.
320
321        Otherwise, just move to the start of the next line.
322
323        """
324        if self._line_pos == 0:
325            return
326        if self.plain:
327            self.write('\n')
328        else:
329            self.write("\r" + " " * self._line_pos + "\r")
330
331    def start_line(self):
332        """Ensure that we're at the start of a line.
333
334        """
335        if self._line_pos != 0:
336            self.write('\n')
337
338    def ensure_space(self):
339        """Ensure that we're preceded by whitespace.
340
341        """
342        if not self._had_space:
343            self.write(' ')
344
345    def write(self, msg):
346        """Write the message to the output stream.
347
348        """
349        if len(msg) == 0:
350            return
351
352        # Adjust the line position counted
353        nlpos = max(msg.rfind('\n'), msg.rfind('\r'))
354        if nlpos >= 0:
355            subline = msg[nlpos + 1:]
356            self._line_pos = len(subline) # Note - doesn't cope with tabs.
357        else:
358            self._line_pos += len(msg) # Note - doesn't cope with tabs.
359
360        # Record whether we ended with whitespace
361        self._had_space = msg[-1].isspace()
362
363        self._out.write(msg)
364
365    def write_colour(self, msg):
366        """Write a message, first substituting markup for colours.
367
368        """
369        self.write(self._colourise(msg))
370
371    def flush(self):
372        self._out.flush()
373
374
375_runner = TestRunner()
376context = _runner.context
377expect = _runner.expect
378expect_query = _runner.expect_query
379expect_exception = _runner.expect_exception
380runtests = _runner.runtests
381
382__all__ = ('context', 'expect', 'expect_query', 'expect_exception', 'runtests')
383