1# Utility functions for running tests and reporting the results.
2#
3# Copyright (C) 2007 Lemur Consulting Ltd
4# Copyright (C) 2008,2011 Olly Betts
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU General Public License as
8# published by the Free Software Foundation; either version 2 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
19# USA
20
21import gc
22import os as _os
23import os.path as _path
24import sys as _sys
25import traceback as _traceback
26import xapian as _xapian
27
class TestFail(Exception):
    """Exception raised by the expect* helpers when a check does not hold."""
30
31class TestRunner(object):
32    def __init__(self):
33        """Initialise the TestRunner.
34
35        """
36
37        self._out = OutProxy(_sys.stdout)
38
39        # _verbose is an integer, higher meaning more verbose
40        self._verbose = _os.environ.get('VERBOSE', '').lower()
41        if self._verbose in ('', '0', 'no', 'off', 'false'):
42            self._verbose = 0
43        else:
44            try:
45                self._verbose = int(self._verbose)
46            except:
47                self._verbose = 1
48
49        # context is a description of what the test is currently checking
50        self._context = None
51
52    def context(self, context):
53        """Set the context.
54
55        This should be a string describing what a test is checking, and will be
56        displayed if the test fails.
57
58        A test may change the context several times - each call will override
59        subsequent calls.
60
61        Set the context to None to remove display of a specific context message.
62        This is performed automatically at the start of each test.
63
64        """
65        self._context = context
66        if context is not None and self._verbose > 1:
67            self._out.start_line()
68            self._out.write("Context: %s\n" % context)
69            self._out.flush()
70
71    def expect(self, got, expected, message="Expected equality"):
72        """Function used to check for a particular expected value.
73
74        """
75        if self._verbose > 2:
76            self._out.start_line()
77            self._out.write("Checking for %r: expecting %r ... " % (message, expected))
78            self._out.flush()
79        if got != expected:
80            if self._verbose > 2:
81                self._out.write_colour(" #red#failed##")
82                self._out.write(": got %r\n" % got)
83                self._out.flush()
84            raise TestFail("%s: got %r, expected %r" % (message, got, expected))
85        if self._verbose > 2:
86            self._out.write_colour(" #green#ok##\n")
87            self._out.flush()
88
89    def expect_query(self, query, expected):
90        """Check that the description of a query is as expected.
91
92        """
93        expected = 'Query(' + expected + ')'
94        desc = str(query)
95        if self._verbose > 2:
96            self._out.start_line()
97            self._out.write("Checking str(query): expecting %r ... " % expected)
98            self._out.flush()
99        if desc != expected:
100            if self._verbose > 2:
101                self._out.write_colour(" #red#failed##")
102                self._out.write(": got %r\n" % desc)
103                self._out.flush()
104            raise TestFail("Unexpected str(query): got %r, expected %r" % (desc, expected))
105        if self._verbose > 2:
106            self._out.write_colour(" #green#ok##\n")
107            self._out.flush()
108
109    def expect_exception(self, expectedclass, expectedmsg, code, *args):
110        """Check that an exception is raised.
111
112         - expectedclass is the class of the exception to check for.
113         - expectedmsg is the message to check for (which can be a string or
114           a callable), or None to skip checking the message.
115         - code is the thing to call.
116         - args are the arguments to pass to it.
117
118        """
119        if self._verbose > 2:
120            self._out.start_line()
121            self._out.write("Checking for exception: %s(%r) ... " % (str(expectedclass), expectedmsg))
122            self._out.flush()
123        try:
124            code(*args)
125            if self._verbose > 2:
126                self._out.write_colour(" #red#failed##: no exception occurred\n")
127                self._out.flush()
128            raise TestFail("Expected %s(%r) exception" % (str(expectedclass), expectedmsg))
129        except expectedclass as e:
130            if expectedmsg is None:
131                pass
132            elif isinstance(expectedmsg, str):
133                if str(e) != expectedmsg:
134                    if self._verbose > 2:
135                        self._out.write_colour(" #red#failed##")
136                        self._out.write(": exception string not as expected: got '%s'\n" % str(e))
137                        self._out.flush()
138                    raise TestFail("Exception string not as expected: got '%s', expected '%s'" % (str(e), expectedmsg))
139            elif callable(expectedmsg):
140                if not expectedmsg(str(e)):
141                    if self._verbose > 2:
142                        self._out.write_colour(" #red#failed##")
143                        self._out.write(": exception string not as expected: got '%s'\n" % str(e))
144                        self._out.flush()
145                    raise TestFail("Exception string not as expected: got '%s', expected pattern '%s'" % (str(e), expectedmsg.pattern))
146            else:
147                raise TestFail("Unexpected expectedmsg: %r" % (expectedmsg,))
148            if e.__class__ != expectedclass:
149                if self._verbose > 2:
150                    self._out.write_colour(" #red#failed##")
151                    self._out.write(": didn't get right exception class: got '%s'\n" % str(e.__class__))
152                    self._out.flush()
153                raise TestFail("Didn't get right exception class: got '%s', expected '%s'" % (str(e.__class__), str(expectedclass)))
154        if self._verbose > 2:
155            self._out.write_colour(" #green#ok##\n")
156            self._out.flush()
157
158    def report_failure(self, name, msg, show_traceback=True):
159        "Report a test failure, with some useful context."
160
161        tb = _traceback.extract_tb(_sys.exc_info()[2])
162
163        # Move up the traceback until we get to the line in the test
164        # function which caused the failure.
165        for line in range(1, len(tb) + 1):
166            if tb[-line][2] == 'test_' + name:
167                break
168
169        # Display the context in the text function.
170        filepath, linenum, functionname, text = tb[-line]
171        filename = _os.path.basename(filepath)
172
173        self._out.ensure_space()
174        self._out.write_colour("#red#FAILED##\n")
175        if self._verbose > 0:
176            if self._context is None:
177                context = ''
178            else:
179                context = ", when %s" % self._context
180            firstline = "%s:%d" % (filename, linenum)
181            self._out.write("\n%s:%s%s\n" % (firstline, msg, context))
182
183            # Display sourcecode lines
184            lines = open(filepath).readlines()
185            startline = max(linenum - 3, 0)
186            endline = min(linenum + 2, len(lines))
187            for num in range(startline, endline):
188                if num + 1 == linenum:
189                    self._out.write('->')
190                else:
191                    self._out.write('  ')
192                self._out.write("%4d %s\n" % (num + 1, lines[num].rstrip()))
193
194            # Display the traceback
195            if show_traceback:
196                self._out.write("Traceback (most recent call last):\n")
197                for line in _traceback.format_list(tb):
198                    self._out.write(line.rstrip() + '\n')
199                self._out.write('\n')
200
201            # Display some information about the xapian version and platform
202            self._out.write("Xapian version: %s\n" % _xapian.version_string())
203            try:
204                import platform
205                platdesc = "%s %s (%s)" % platform.system_alias(platform.system(),
206                                                                platform.release(),
207                                                                platform.version())
208                self._out.write("Platform: %s\n" % platdesc)
209            except:
210                pass
211            self._out.write('\nWhen reporting this problem, please quote all the preceding lines from\n"%s" onwards.\n\n' % firstline)
212
213        self._out.flush()
214
215    def gc_object_count(self):
216        # Python 2.7 doesn't seem to free all objects even for a full
217        # collection, so collect repeatedly until no further objects get freed.
218        old_count, count = len(gc.get_objects()), 0
219        while True:
220            gc.collect()
221            count = len(gc.get_objects())
222            if count == old_count:
223                return count
224            old_count = count
225
226    def runtest(self, name, test_fn):
227        """Run a single test.
228
229        """
230        startline = "Running test: %s..." % name
231        self._out.write(startline)
232        self._out.flush()
233        try:
234            object_count = self.gc_object_count()
235            test_fn()
236            object_count = self.gc_object_count() - object_count
237            if object_count != 0:
238                # Maybe some lazily initialised object got initialised for the
239                # first time, so rerun the test.
240                self._out.ensure_space()
241                msg = "#yellow#possible leak (%d), rerunning## " % object_count
242                self._out.write_colour(msg)
243                object_count = self.gc_object_count()
244                test_fn()
245                expect(self.gc_object_count(), object_count)
246                self._out.write_colour("#green#ok##\n")
247
248            if self._verbose > 0 or self._out.plain:
249                self._out.ensure_space()
250                self._out.write_colour("#green#ok##\n")
251            else:
252                self._out.clear_line()
253            self._out.flush()
254            return True
255        except TestFail as e:
256            self.report_failure(name, str(e), show_traceback=False)
257        except _xapian.Error as e:
258            self.report_failure(name, "%s: %s" % (str(e.__class__), str(e)))
259        except Exception as e:
260            self.report_failure(name, "%s: %s" % (str(e.__class__), str(e)))
261        return False
262
263    def runtests(self, namedict, runonly=None):
264        """Run a set of tests.
265
266        Takes a dictionary of name-value pairs and runs all the values which are
267        callables, for which the name begins with "test_".
268
269        Typical usage is to pass "locals()" as the parameter, to run all callables
270        with names starting "test_" in local scope.
271
272        If runonly is supplied, and non-empty, only those tests which appear in
273        runonly will be run.
274
275        """
276        tests = []
277        if isinstance(namedict, dict):
278            for name in namedict:
279                if name.startswith('test_'):
280                    fn = namedict[name]
281                    name = name[5:]
282                    if hasattr(fn, '__call__'):
283                        tests.append((name, fn))
284            tests.sort()
285        else:
286            tests = namedict
287
288        if runonly is not None and len(runonly) != 0:
289            oldtests = tests
290            tests = []
291            for name, fn in oldtests:
292                if name in runonly:
293                    tests.append((name, fn))
294
295        passed, failed = 0, 0
296        for name, fn in tests:
297            self.context(None)
298            if self.runtest(name, fn):
299                passed += 1
300            else:
301                failed += 1
302        if failed:
303            if self._verbose == 0:
304                self._out.write('Re-run with the environment variable VERBOSE=1 to see details.\n')
305                self._out.write('E.g. make check VERBOSE=1\n')
306            self._out.write_colour("#green#%d## tests passed, #red#%d## tests failed\n" % (passed, failed))
307            return False
308        else:
309            self._out.write_colour("#green#%d## tests passed, no failures\n" % passed)
310            return True
311
class OutProxy(object):
    """Proxy output class to make formatting easier.

    Allows colourisation, and keeps track of whether we're mid-line or not.

    The "plain" attribute is True when control sequences are not used; in that
    case colour markup is simply removed from messages.

    """

    def __init__(self, out):
        """Wrap the file-like object "out" (it must provide write and flush)."""
        self._out = out
        self._line_pos = 0 # Position on current line
        self._had_space = True # True iff we're preceded by whitespace (including newline)
        self.plain = not self._allow_control_sequences()
        self._colours = self.get_colour_strings()

    def _allow_control_sequences(self):
        """Return True if output device allows control sequences.

        The XAPIAN_TESTSUITE_OUTPUT environment variable selects the mode:
        empty or "auto" enables control sequences only when not on win32 and
        the output reports itself as a tty; "plain" disables them; any other
        value enables them.

        """
        mode = _os.environ.get("XAPIAN_TESTSUITE_OUTPUT", '').lower()
        if mode in ('', 'auto'):
            if _sys.platform == 'win32':
                return False
            elif not hasattr(self._out, "isatty"):
                return False
            else:
                return self._out.isatty()
        elif mode == 'plain':
            return False
        return True

    def get_colour_strings(self):
        """Return a mapping of colour names to colour output sequences.

        The empty name maps to the sequence which resets the colour.  All the
        sequences are empty strings when output is plain.

        """
        colours = {
            'red': "\x1b[1m\x1b[31m",
            'green': "\x1b[1m\x1b[32m",
            'yellow': "\x1b[1m\x1b[33m",
            '': "\x1b[0m",
        }
        if self.plain:
            for key in colours:
                colours[key] = ''
        return colours

    def _substitute_markup(self, msg, replacement_for):
        """Replace each #colourname# tag in msg with replacement_for(name).

        Longer names are handled first, so the "##" reset tag is processed
        last and can't consume the boundary between two adjacent tags,
        whatever order the dictionary iterates in.

        """
        for colour in sorted(self._colours, key=len, reverse=True):
            msg = msg.replace('#%s#' % colour, replacement_for(colour))
        return msg

    def _colourise(self, msg):
        """Apply colours to a message.

        #colourname# will change the text colour, ## will change the colour back.

        """
        return self._substitute_markup(msg, self._colours.__getitem__)

    def _track(self, text):
        """Update the line position and trailing-whitespace state.

        "text" is the visible text being written (without control sequences).

        """
        if len(text) == 0:
            return

        # Adjust the line position counted
        nlpos = max(text.rfind('\n'), text.rfind('\r'))
        if nlpos >= 0:
            subline = text[nlpos + 1:]
            self._line_pos = len(subline) # Note - doesn't cope with tabs.
        else:
            self._line_pos += len(text) # Note - doesn't cope with tabs.

        # Record whether we ended with whitespace
        self._had_space = text[-1].isspace()

    def clear_line(self):
        """Clear the current line of output, if possible.

        Otherwise, just move to the start of the next line.

        """
        if self._line_pos == 0:
            return
        if self.plain:
            self.write('\n')
        else:
            self.write("\r" + " " * self._line_pos + "\r")

    def start_line(self):
        """Ensure that we're at the start of a line.

        """
        if self._line_pos != 0:
            self.write('\n')

    def ensure_space(self):
        """Ensure that we're preceded by whitespace.

        """
        if not self._had_space:
            self.write(' ')

    def write(self, msg):
        """Write the message to the output stream.

        Empty messages are ignored.

        """
        if len(msg) == 0:
            return
        self._track(msg)
        self._out.write(msg)

    def write_colour(self, msg):
        """Write a message, first substituting markup for colours.

        The line position is tracked using the text with the markup removed,
        so that colour control sequences (which take up no space on screen)
        aren't counted.

        """
        coloured = self._colourise(msg)
        if len(coloured) == 0:
            return
        self._track(self._substitute_markup(msg, lambda colour: ''))
        self._out.write(coloured)

    def flush(self):
        """Flush the underlying output stream."""
        self._out.flush()
420
421
# A single shared TestRunner instance; its bound methods are exposed below as
# module-level functions.
_runner = TestRunner()
context = _runner.context
expect = _runner.expect
expect_query = _runner.expect_query
expect_exception = _runner.expect_exception
runtests = _runner.runtests

# The public names exported by "from <module> import *".
__all__ = ('TestFail', 'context', 'expect', 'expect_query', 'expect_exception', 'runtests')
430