1# Python module for parsing and generating the Subunit protocol
2# (Samba-specific)
3# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 3 of the License, or
8# (at your option) any later version.
9
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18from __future__ import print_function
19__all__ = ['parse_results']
20
21import datetime
22import re
23import sys
24import os
25from samba import subunit
26from samba.subunit.run import TestProtocolClient
27from samba.subunit import iso8601
28import unittest
29from samba.compat import binary_type
30
31
32VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
33                     'knownfail', 'error', 'xfail', 'skip-testsuite',
34                     'testsuite-failure', 'testsuite-xfail',
35                     'testsuite-success', 'testsuite-error',
36                     'uxsuccess', 'testsuite-uxsuccess'])
37
38
39class TestsuiteEnabledTestResult(unittest.TestResult):
40
41    def start_testsuite(self, name):
42        raise NotImplementedError(self.start_testsuite)
43
44
45def parse_results(msg_ops, statistics, fh):
46    exitcode = 0
47    open_tests = {}
48
49    while fh:
50        l = fh.readline()
51        if l == "":
52            break
53        parts = l.split(None, 1)
54        if not len(parts) == 2 or not l.startswith(parts[0]):
55            msg_ops.output_msg(l)
56            continue
57        command = parts[0].rstrip(":")
58        arg = parts[1]
59        if command in ("test", "testing"):
60            msg_ops.control_msg(l)
61            name = arg.rstrip()
62            test = subunit.RemotedTestCase(name)
63            if name in open_tests:
64                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
65            msg_ops.startTest(test)
66            open_tests[name] = test
67        elif command == "time":
68            msg_ops.control_msg(l)
69            try:
70                dt = iso8601.parse_date(arg.rstrip("\n"))
71            except TypeError as e:
72                print("Unable to parse time line: %s" % arg.rstrip("\n"))
73            else:
74                msg_ops.time(dt)
75        elif command in VALID_RESULTS:
76            msg_ops.control_msg(l)
77            result = command
78            grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
79            (testname, hasreason) = (grp.group(1), grp.group(2))
80            if hasreason:
81                reason = ""
82                # reason may be specified in next lines
83                terminated = False
84                while fh:
85                    l = fh.readline()
86                    if l == "":
87                        break
88                    msg_ops.control_msg(l)
89                    if l[-2:] == "]\n":
90                        reason += l[:-2]
91                        terminated = True
92                        break
93                    else:
94                        reason += l
95
96                if isinstance(reason, binary_type):
97                    remote_error = subunit.RemoteError(reason.decode("utf-8"))
98                else:
99                    remote_error = subunit.RemoteError(reason)
100
101                if not terminated:
102                    statistics['TESTS_ERROR'] += 1
103                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
104                    return 1
105            else:
106                reason = None
107                remote_error = subunit.RemoteError(u"No reason specified")
108            if result in ("success", "successful"):
109                try:
110                    test = open_tests.pop(testname)
111                except KeyError:
112                    statistics['TESTS_ERROR'] += 1
113                    exitcode = 1
114                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
115                else:
116                    statistics['TESTS_EXPECTED_OK'] += 1
117                    msg_ops.addSuccess(test)
118            elif result in ("xfail", "knownfail"):
119                try:
120                    test = open_tests.pop(testname)
121                except KeyError:
122                    statistics['TESTS_ERROR'] += 1
123                    exitcode = 1
124                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
125                else:
126                    statistics['TESTS_EXPECTED_FAIL'] += 1
127                    msg_ops.addExpectedFailure(test, remote_error)
128            elif result in ("uxsuccess", ):
129                try:
130                    test = open_tests.pop(testname)
131                except KeyError:
132                    statistics['TESTS_ERROR'] += 1
133                    exitcode = 1
134                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
135                else:
136                    statistics['TESTS_UNEXPECTED_OK'] += 1
137                    msg_ops.addUnexpectedSuccess(test)
138                    exitcode = 1
139            elif result in ("failure", "fail"):
140                try:
141                    test = open_tests.pop(testname)
142                except KeyError:
143                    statistics['TESTS_ERROR'] += 1
144                    exitcode = 1
145                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
146                else:
147                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
148                    exitcode = 1
149                    msg_ops.addFailure(test, remote_error)
150            elif result == "skip":
151                statistics['TESTS_SKIP'] += 1
152                # Allow tests to be skipped without prior announcement of test
153                try:
154                    test = open_tests.pop(testname)
155                except KeyError:
156                    test = subunit.RemotedTestCase(testname)
157                msg_ops.addSkip(test, reason)
158            elif result == "error":
159                statistics['TESTS_ERROR'] += 1
160                exitcode = 1
161                try:
162                    test = open_tests.pop(testname)
163                except KeyError:
164                    test = subunit.RemotedTestCase(testname)
165                msg_ops.addError(test, remote_error)
166            elif result == "skip-testsuite":
167                msg_ops.skip_testsuite(testname)
168            elif result == "testsuite-success":
169                msg_ops.end_testsuite(testname, "success", reason)
170            elif result == "testsuite-failure":
171                msg_ops.end_testsuite(testname, "failure", reason)
172                exitcode = 1
173            elif result == "testsuite-xfail":
174                msg_ops.end_testsuite(testname, "xfail", reason)
175            elif result == "testsuite-uxsuccess":
176                msg_ops.end_testsuite(testname, "uxsuccess", reason)
177                exitcode = 1
178            elif result == "testsuite-error":
179                msg_ops.end_testsuite(testname, "error", reason)
180                exitcode = 1
181            else:
182                raise AssertionError("Recognized but unhandled result %r" %
183                                     result)
184        elif command == "testsuite":
185            msg_ops.start_testsuite(arg.strip())
186        elif command == "progress":
187            arg = arg.strip()
188            if arg == "pop":
189                msg_ops.progress(None, subunit.PROGRESS_POP)
190            elif arg == "push":
191                msg_ops.progress(None, subunit.PROGRESS_PUSH)
192            elif arg[0] in '+-':
193                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
194            else:
195                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
196        else:
197            msg_ops.output_msg(l)
198
199    while open_tests:
200        test = subunit.RemotedTestCase(open_tests.popitem()[1])
201        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
202        statistics['TESTS_ERROR'] += 1
203        exitcode = 1
204
205    return exitcode
206
207
208class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
209
210    def progress(self, count, whence):
211        if whence == subunit.PROGRESS_POP:
212            self._stream.write("progress: pop\n")
213        elif whence == subunit.PROGRESS_PUSH:
214            self._stream.write("progress: push\n")
215        elif whence == subunit.PROGRESS_SET:
216            self._stream.write("progress: %d\n" % count)
217        elif whence == subunit.PROGRESS_CUR:
218            raise NotImplementedError
219
220    # The following are Samba extensions:
221    def start_testsuite(self, name):
222        self._stream.write("testsuite: %s\n" % name)
223
224    def skip_testsuite(self, name, reason=None):
225        if reason:
226            self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
227        else:
228            self._stream.write("skip-testsuite: %s\n" % name)
229
230    def end_testsuite(self, name, result, reason=None):
231        if reason:
232            self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
233        else:
234            self._stream.write("testsuite-%s: %s\n" % (result, name))
235
236    def output_msg(self, msg):
237        self._stream.write(msg)
238
239
240def read_test_regexes(*names):
241    ret = {}
242    files = []
243    for name in names:
244        # if we are given a directory, we read all the files it contains
245        # (except the ones that end with "~").
246        if os.path.isdir(name):
247            files.extend([os.path.join(name, x)
248                          for x in os.listdir(name)
249                          if x[-1] != '~'])
250        else:
251            files.append(name)
252
253    for filename in files:
254        f = open(filename, 'r')
255        try:
256            for l in f:
257                l = l.strip()
258                if l == "" or l[0] == "#":
259                    continue
260                if "#" in l:
261                    (regex, reason) = l.split("#", 1)
262                    ret[regex.strip()] = reason.strip()
263                else:
264                    ret[l] = None
265        finally:
266            f.close()
267    return ret
268
269
270def find_in_list(regexes, fullname):
271    for regex, reason in regexes.items():
272        if re.match(regex, fullname):
273            if reason is None:
274                return ""
275            return reason
276    return None
277
278
279class ImmediateFail(Exception):
280    """Raised to abort immediately."""
281
282    def __init__(self):
283        super(ImmediateFail, self).__init__("test failed and fail_immediately set")
284
285
286class FilterOps(unittest.TestResult):
287
288    def control_msg(self, msg):
289        pass  # We regenerate control messages, so ignore this
290
291    def time(self, time):
292        self._ops.time(time)
293
294    def progress(self, delta, whence):
295        self._ops.progress(delta, whence)
296
297    def output_msg(self, msg):
298        if self.output is None:
299            sys.stdout.write(msg)
300        else:
301            self.output += msg
302
303    def startTest(self, test):
304        self.seen_output = True
305        test = self._add_prefix(test)
306        if self.strip_ok_output:
307            self.output = ""
308
309        self._ops.startTest(test)
310
311    def _add_prefix(self, test):
312        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
313
314    def addError(self, test, err=None):
315        test = self._add_prefix(test)
316        self.error_added += 1
317        self.total_error += 1
318        self._ops.addError(test, err)
319        self.output = None
320        if self.fail_immediately:
321            raise ImmediateFail()
322
323    def addSkip(self, test, reason=None):
324        self.seen_output = True
325        test = self._add_prefix(test)
326        self._ops.addSkip(test, reason)
327        self.output = None
328
329    def addExpectedFailure(self, test, err=None):
330        test = self._add_prefix(test)
331        self._ops.addExpectedFailure(test, err)
332        self.output = None
333
334    def addUnexpectedSuccess(self, test):
335        test = self._add_prefix(test)
336        self.uxsuccess_added += 1
337        self.total_uxsuccess += 1
338        self._ops.addUnexpectedSuccess(test)
339        if self.output:
340            self._ops.output_msg(self.output)
341        self.output = None
342        if self.fail_immediately:
343            raise ImmediateFail()
344
345    def addFailure(self, test, err=None):
346        test = self._add_prefix(test)
347        xfail_reason = find_in_list(self.expected_failures, test.id())
348        if xfail_reason is None:
349            xfail_reason = find_in_list(self.flapping, test.id())
350        if xfail_reason is not None:
351            self.xfail_added += 1
352            self.total_xfail += 1
353            self._ops.addExpectedFailure(test, err)
354        else:
355            self.fail_added += 1
356            self.total_fail += 1
357            self._ops.addFailure(test, err)
358            if self.output:
359                self._ops.output_msg(self.output)
360            if self.fail_immediately:
361                raise ImmediateFail()
362        self.output = None
363
364    def addSuccess(self, test):
365        test = self._add_prefix(test)
366        xfail_reason = find_in_list(self.expected_failures, test.id())
367        if xfail_reason is not None:
368            self.uxsuccess_added += 1
369            self.total_uxsuccess += 1
370            self._ops.addUnexpectedSuccess(test)
371            if self.output:
372                self._ops.output_msg(self.output)
373            if self.fail_immediately:
374                raise ImmediateFail()
375        else:
376            self._ops.addSuccess(test)
377        self.output = None
378
379    def skip_testsuite(self, name, reason=None):
380        self._ops.skip_testsuite(name, reason)
381
382    def start_testsuite(self, name):
383        self._ops.start_testsuite(name)
384        self.error_added = 0
385        self.fail_added = 0
386        self.xfail_added = 0
387        self.uxsuccess_added = 0
388
389    def end_testsuite(self, name, result, reason=None):
390        xfail = False
391
392        if self.xfail_added > 0:
393            xfail = True
394        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
395            xfail = False
396
397        if xfail and result in ("fail", "failure"):
398            result = "xfail"
399
400        if self.uxsuccess_added > 0 and result != "uxsuccess":
401            result = "uxsuccess"
402            if reason is None:
403                reason = "Subunit/Filter Reason"
404            reason += "\n uxsuccess[%d]" % self.uxsuccess_added
405
406        if self.fail_added > 0 and result != "failure":
407            result = "failure"
408            if reason is None:
409                reason = "Subunit/Filter Reason"
410            reason += "\n failures[%d]" % self.fail_added
411
412        if self.error_added > 0 and result != "error":
413            result = "error"
414            if reason is None:
415                reason = "Subunit/Filter Reason"
416            reason += "\n errors[%d]" % self.error_added
417
418        self._ops.end_testsuite(name, result, reason)
419        if result not in ("success", "xfail"):
420            if self.output:
421                self._ops.output_msg(self.output)
422            if self.fail_immediately:
423                raise ImmediateFail()
424        self.output = None
425
426    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
427                 strip_ok_output=False, fail_immediately=False,
428                 flapping=None):
429        self._ops = out
430        self.seen_output = False
431        self.output = None
432        self.prefix = prefix
433        self.suffix = suffix
434        if expected_failures is not None:
435            self.expected_failures = expected_failures
436        else:
437            self.expected_failures = {}
438        if flapping is not None:
439            self.flapping = flapping
440        else:
441            self.flapping = {}
442        self.strip_ok_output = strip_ok_output
443        self.xfail_added = 0
444        self.fail_added = 0
445        self.uxsuccess_added = 0
446        self.total_xfail = 0
447        self.total_error = 0
448        self.total_fail = 0
449        self.total_uxsuccess = 0
450        self.error_added = 0
451        self.fail_immediately = fail_immediately
452
453
454class PerfFilterOps(unittest.TestResult):
455
456    def progress(self, delta, whence):
457        pass
458
459    def output_msg(self, msg):
460        pass
461
462    def control_msg(self, msg):
463        pass
464
465    def skip_testsuite(self, name, reason=None):
466        self._ops.skip_testsuite(name, reason)
467
468    def start_testsuite(self, name):
469        self.suite_has_time = False
470
471    def end_testsuite(self, name, result, reason=None):
472        pass
473
474    def _add_prefix(self, test):
475        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
476
477    def time(self, time):
478        self.latest_time = time
479        #self._ops.output_msg("found time %s\n" % time)
480        self.suite_has_time = True
481
482    def get_time(self):
483        if self.suite_has_time:
484            return self.latest_time
485        return datetime.datetime.utcnow()
486
487    def startTest(self, test):
488        self.seen_output = True
489        test = self._add_prefix(test)
490        self.starts[test.id()] = self.get_time()
491
492    def addSuccess(self, test):
493        test = self._add_prefix(test)
494        tid = test.id()
495        if tid not in self.starts:
496            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
497        delta = self.get_time() - self.starts[tid]
498        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
499
500    def addFailure(self, test, err=''):
501        tid = test.id()
502        delta = self.get_time() - self.starts[tid]
503        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
504                             (tid, delta.total_seconds(), err))
505
506    def addError(self, test, err=''):
507        tid = test.id()
508        delta = self.get_time() - self.starts[tid]
509        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
510                             (tid, delta.total_seconds(), err))
511
512    def __init__(self, out, prefix='', suffix=''):
513        self._ops = out
514        self.prefix = prefix or ''
515        self.suffix = suffix or ''
516        self.starts = {}
517        self.seen_output = False
518        self.suite_has_time = False
519
520
521class PlainFormatter(TestsuiteEnabledTestResult):
522
523    def __init__(self, verbose, immediate, statistics,
524                 totaltests=None):
525        super(PlainFormatter, self).__init__()
526        self.verbose = verbose
527        self.immediate = immediate
528        self.statistics = statistics
529        self.start_time = None
530        self.test_output = {}
531        self.suitesfailed = []
532        self.suites_ok = 0
533        self.skips = {}
534        self.index = 0
535        self.name = None
536        self._progress_level = 0
537        self.totalsuites = totaltests
538        self.last_time = None
539
540    @staticmethod
541    def _format_time(delta):
542        minutes, seconds = divmod(delta.seconds, 60)
543        hours, minutes = divmod(minutes, 60)
544        ret = ""
545        if hours:
546            ret += "%dh" % hours
547        if minutes:
548            ret += "%dm" % minutes
549        ret += "%ds" % seconds
550        return ret
551
552    def progress(self, offset, whence):
553        if whence == subunit.PROGRESS_POP:
554            self._progress_level -= 1
555        elif whence == subunit.PROGRESS_PUSH:
556            self._progress_level += 1
557        elif whence == subunit.PROGRESS_SET:
558            if self._progress_level == 0:
559                self.totalsuites = offset
560        elif whence == subunit.PROGRESS_CUR:
561            raise NotImplementedError
562
563    def time(self, dt):
564        if self.start_time is None:
565            self.start_time = dt
566        self.last_time = dt
567
568    def start_testsuite(self, name):
569        self.index += 1
570        self.name = name
571
572        if not self.verbose:
573            self.test_output[name] = ""
574
575        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
576                       self.statistics['TESTS_EXPECTED_FAIL'] +
577                       self.statistics['TESTS_ERROR'] +
578                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
579                       self.statistics['TESTS_UNEXPECTED_OK'])
580
581        out = "[%d(%d)" % (self.index, total_tests)
582        if self.totalsuites is not None:
583            out += "/%d" % self.totalsuites
584        if self.start_time is not None:
585            out += " at " + self._format_time(self.last_time - self.start_time)
586        if self.suitesfailed:
587            out += ", %d errors" % (len(self.suitesfailed),)
588        out += "] %s" % name
589        if self.immediate:
590            sys.stdout.write(out + "\n")
591        else:
592            sys.stdout.write(out + ": ")
593
594    def output_msg(self, output):
595        if self.verbose:
596            sys.stdout.write(output)
597        elif self.name is not None:
598            self.test_output[self.name] += output
599        else:
600            sys.stdout.write(output)
601
602    def control_msg(self, output):
603        pass
604
605    def end_testsuite(self, name, result, reason):
606        out = ""
607        unexpected = False
608
609        if name not in self.test_output:
610            print("no output for name[%s]" % name)
611
612        if result in ("success", "xfail"):
613            self.suites_ok += 1
614        else:
615            self.output_msg("ERROR: Testsuite[%s]\n" % name)
616            if reason is not None:
617                self.output_msg("REASON: %s\n" % (reason,))
618            self.suitesfailed.append(name)
619            if self.immediate and not self.verbose and name in self.test_output:
620                out += self.test_output[name]
621            unexpected = True
622
623        if not self.immediate:
624            if not unexpected:
625                out += " ok\n"
626            else:
627                out += " " + result.upper() + "\n"
628
629        sys.stdout.write(out)
630
631    def startTest(self, test):
632        pass
633
634    def addSuccess(self, test):
635        self.end_test(test.id(), "success", False)
636
637    def addError(self, test, err=None):
638        self.end_test(test.id(), "error", True, err)
639
640    def addFailure(self, test, err=None):
641        self.end_test(test.id(), "failure", True, err)
642
643    def addSkip(self, test, reason=None):
644        self.end_test(test.id(), "skip", False, reason)
645
646    def addExpectedFailure(self, test, err=None):
647        self.end_test(test.id(), "xfail", False, err)
648
649    def addUnexpectedSuccess(self, test):
650        self.end_test(test.id(), "uxsuccess", True)
651
652    def end_test(self, testname, result, unexpected, err=None):
653        if not unexpected:
654            self.test_output[self.name] = ""
655            if not self.immediate:
656                sys.stdout.write({
657                    'failure': 'f',
658                    'xfail': 'X',
659                    'skip': 's',
660                    'success': '.'}.get(result, "?(%s)" % result))
661            return
662
663        if self.name not in self.test_output:
664            self.test_output[self.name] = ""
665
666        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
667        if err is not None:
668            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
669
670        if self.immediate and not self.verbose:
671            sys.stdout.write(self.test_output[self.name])
672            self.test_output[self.name] = ""
673
674        if not self.immediate:
675            sys.stdout.write({
676                'error': 'E',
677               'failure': 'F',
678               'uxsuccess': 'U',
679               'success': 'S'}.get(result, "?"))
680
681    def write_summary(self, path):
682        f = open(path, 'w+')
683
684        if self.suitesfailed:
685            f.write("= Failed tests =\n")
686
687            for suite in self.suitesfailed:
688                f.write("== %s ==\n" % suite)
689                if suite in self.test_output:
690                    f.write(self.test_output[suite] + "\n\n")
691
692            f.write("\n")
693
694        if not self.immediate and not self.verbose:
695            for suite in self.suitesfailed:
696                print("=" * 78)
697                print("FAIL: %s" % suite)
698                if suite in self.test_output:
699                    print(self.test_output[suite])
700                print("")
701
702        f.write("= Skipped tests =\n")
703        for reason in self.skips.keys():
704            f.write(reason + "\n")
705            for name in self.skips[reason]:
706                f.write("\t%s\n" % name)
707            f.write("\n")
708        f.close()
709
710        if (not self.suitesfailed and
711            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
712            not self.statistics['TESTS_UNEXPECTED_OK'] and
713            not self.statistics['TESTS_ERROR']):
714            ok = (self.statistics['TESTS_EXPECTED_OK'] +
715                  self.statistics['TESTS_EXPECTED_FAIL'])
716            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
717        else:
718            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
719                self.statistics['TESTS_UNEXPECTED_FAIL'],
720                self.statistics['TESTS_ERROR'],
721                self.statistics['TESTS_UNEXPECTED_OK'],
722                len(self.suitesfailed)))
723
724    def skip_testsuite(self, name, reason="UNKNOWN"):
725        self.skips.setdefault(reason, []).append(name)
726        if self.totalsuites:
727            self.totalsuites -= 1
728