1# Python module for parsing and generating the Subunit protocol 2# (Samba-specific) 3# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org> 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 3 of the License, or 8# (at your option) any later version. 9 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18from __future__ import print_function 19__all__ = ['parse_results'] 20 21import datetime 22import re 23import sys 24import os 25from samba import subunit 26from samba.subunit.run import TestProtocolClient 27from samba.subunit import iso8601 28import unittest 29from samba.compat import binary_type 30 31 32VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip', 33 'knownfail', 'error', 'xfail', 'skip-testsuite', 34 'testsuite-failure', 'testsuite-xfail', 35 'testsuite-success', 'testsuite-error', 36 'uxsuccess', 'testsuite-uxsuccess']) 37 38 39class TestsuiteEnabledTestResult(unittest.TestResult): 40 41 def start_testsuite(self, name): 42 raise NotImplementedError(self.start_testsuite) 43 44 45def parse_results(msg_ops, statistics, fh): 46 exitcode = 0 47 open_tests = {} 48 49 while fh: 50 l = fh.readline() 51 if l == "": 52 break 53 parts = l.split(None, 1) 54 if not len(parts) == 2 or not l.startswith(parts[0]): 55 msg_ops.output_msg(l) 56 continue 57 command = parts[0].rstrip(":") 58 arg = parts[1] 59 if command in ("test", "testing"): 60 msg_ops.control_msg(l) 61 name = arg.rstrip() 62 test = subunit.RemotedTestCase(name) 63 if name in open_tests: 64 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running")) 65 msg_ops.startTest(test) 66 open_tests[name] = test 67 elif command == "time": 68 msg_ops.control_msg(l) 69 try: 70 dt = iso8601.parse_date(arg.rstrip("\n")) 71 except TypeError as e: 72 print("Unable to parse time line: %s" % arg.rstrip("\n")) 73 else: 74 msg_ops.time(dt) 75 elif command in VALID_RESULTS: 76 msg_ops.control_msg(l) 77 result = command 78 grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg) 79 (testname, hasreason) = (grp.group(1), grp.group(2)) 80 if hasreason: 81 reason = "" 82 # reason may be specified in next lines 83 terminated = False 84 while fh: 85 l = fh.readline() 86 if l == "": 87 break 88 msg_ops.control_msg(l) 89 if l[-2:] == "]\n": 90 reason += l[:-2] 91 terminated = True 92 break 93 else: 94 reason += l 95 96 if isinstance(reason, binary_type): 97 remote_error = subunit.RemoteError(reason.decode("utf-8")) 98 else: 99 remote_error = subunit.RemoteError(reason) 100 101 if not terminated: 102 statistics['TESTS_ERROR'] += 1 103 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result)) 104 return 1 105 else: 106 reason = None 107 remote_error = subunit.RemoteError(u"No reason specified") 108 if result in ("success", "successful"): 109 try: 110 test = open_tests.pop(testname) 111 except KeyError: 112 statistics['TESTS_ERROR'] += 1 113 exitcode = 1 114 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started")) 115 else: 116 statistics['TESTS_EXPECTED_OK'] += 1 117 msg_ops.addSuccess(test) 118 elif result in ("xfail", "knownfail"): 119 try: 120 test = open_tests.pop(testname) 121 except KeyError: 122 statistics['TESTS_ERROR'] += 1 123 exitcode = 1 124 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started")) 125 else: 126 statistics['TESTS_EXPECTED_FAIL'] += 1 127 msg_ops.addExpectedFailure(test, remote_error) 128 elif result in ("uxsuccess", ): 129 try: 130 test = open_tests.pop(testname) 131 except KeyError: 132 statistics['TESTS_ERROR'] += 1 133 exitcode = 1 134 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started")) 135 else: 136 statistics['TESTS_UNEXPECTED_OK'] += 1 137 msg_ops.addUnexpectedSuccess(test) 138 exitcode = 1 139 elif result in ("failure", "fail"): 140 try: 141 test = open_tests.pop(testname) 142 except KeyError: 143 statistics['TESTS_ERROR'] += 1 144 exitcode = 1 145 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started")) 146 else: 147 statistics['TESTS_UNEXPECTED_FAIL'] += 1 148 exitcode = 1 149 msg_ops.addFailure(test, remote_error) 150 elif result == "skip": 151 statistics['TESTS_SKIP'] += 1 152 # Allow tests to be skipped without prior announcement of test 153 try: 154 test = open_tests.pop(testname) 155 except KeyError: 156 test = subunit.RemotedTestCase(testname) 157 msg_ops.addSkip(test, reason) 158 elif result == "error": 159 statistics['TESTS_ERROR'] += 1 160 exitcode = 1 161 try: 162 test = open_tests.pop(testname) 163 except KeyError: 164 test = subunit.RemotedTestCase(testname) 165 msg_ops.addError(test, remote_error) 166 elif result == "skip-testsuite": 167 msg_ops.skip_testsuite(testname) 168 elif result == "testsuite-success": 169 msg_ops.end_testsuite(testname, "success", reason) 170 elif result == "testsuite-failure": 171 msg_ops.end_testsuite(testname, "failure", reason) 172 exitcode = 1 173 elif result == "testsuite-xfail": 174 msg_ops.end_testsuite(testname, "xfail", reason) 175 elif result == "testsuite-uxsuccess": 176 msg_ops.end_testsuite(testname, "uxsuccess", reason) 177 exitcode = 1 178 elif result == "testsuite-error": 179 msg_ops.end_testsuite(testname, "error", reason) 180 exitcode = 1 181 else: 182 raise AssertionError("Recognized but unhandled result %r" % 183 result) 184 elif command == "testsuite": 185 msg_ops.start_testsuite(arg.strip()) 186 elif command == "progress": 187 arg = arg.strip() 188 if arg == "pop": 189 msg_ops.progress(None, subunit.PROGRESS_POP) 190 elif arg == "push": 191 msg_ops.progress(None, subunit.PROGRESS_PUSH) 192 elif arg[0] in '+-': 193 msg_ops.progress(int(arg), subunit.PROGRESS_CUR) 194 else: 195 msg_ops.progress(int(arg), subunit.PROGRESS_SET) 196 else: 197 msg_ops.output_msg(l) 198 199 while open_tests: 200 test = subunit.RemotedTestCase(open_tests.popitem()[1]) 201 msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!")) 202 statistics['TESTS_ERROR'] += 1 203 exitcode = 1 204 205 return exitcode 206 207 208class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult): 209 210 def progress(self, count, whence): 211 if whence == subunit.PROGRESS_POP: 212 self._stream.write("progress: pop\n") 213 elif whence == subunit.PROGRESS_PUSH: 214 self._stream.write("progress: push\n") 215 elif whence == subunit.PROGRESS_SET: 216 self._stream.write("progress: %d\n" % count) 217 elif whence == subunit.PROGRESS_CUR: 218 raise NotImplementedError 219 220 # The following are Samba extensions: 221 def start_testsuite(self, name): 222 self._stream.write("testsuite: %s\n" % name) 223 224 def skip_testsuite(self, name, reason=None): 225 if reason: 226 self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason)) 227 else: 228 self._stream.write("skip-testsuite: %s\n" % name) 229 230 def end_testsuite(self, name, result, reason=None): 231 if reason: 232 self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason)) 233 else: 234 self._stream.write("testsuite-%s: %s\n" % (result, name)) 235 236 def output_msg(self, msg): 237 self._stream.write(msg) 238 239 240def read_test_regexes(*names): 241 ret = {} 242 files = [] 243 for name in names: 244 # if we are given a directory, we read all the files it contains 245 # (except the ones that end with "~"). 246 if os.path.isdir(name): 247 files.extend([os.path.join(name, x) 248 for x in os.listdir(name) 249 if x[-1] != '~']) 250 else: 251 files.append(name) 252 253 for filename in files: 254 f = open(filename, 'r') 255 try: 256 for l in f: 257 l = l.strip() 258 if l == "" or l[0] == "#": 259 continue 260 if "#" in l: 261 (regex, reason) = l.split("#", 1) 262 ret[regex.strip()] = reason.strip() 263 else: 264 ret[l] = None 265 finally: 266 f.close() 267 return ret 268 269 270def find_in_list(regexes, fullname): 271 for regex, reason in regexes.items(): 272 if re.match(regex, fullname): 273 if reason is None: 274 return "" 275 return reason 276 return None 277 278 279class ImmediateFail(Exception): 280 """Raised to abort immediately.""" 281 282 def __init__(self): 283 super(ImmediateFail, self).__init__("test failed and fail_immediately set") 284 285 286class FilterOps(unittest.TestResult): 287 288 def control_msg(self, msg): 289 pass # We regenerate control messages, so ignore this 290 291 def time(self, time): 292 self._ops.time(time) 293 294 def progress(self, delta, whence): 295 self._ops.progress(delta, whence) 296 297 def output_msg(self, msg): 298 if self.output is None: 299 sys.stdout.write(msg) 300 else: 301 self.output += msg 302 303 def startTest(self, test): 304 self.seen_output = True 305 test = self._add_prefix(test) 306 if self.strip_ok_output: 307 self.output = "" 308 309 self._ops.startTest(test) 310 311 def _add_prefix(self, test): 312 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix) 313 314 def addError(self, test, err=None): 315 test = self._add_prefix(test) 316 self.error_added += 1 317 self.total_error += 1 318 self._ops.addError(test, err) 319 self.output = None 320 if self.fail_immediately: 321 raise ImmediateFail() 322 323 def addSkip(self, test, reason=None): 324 self.seen_output = True 325 test = self._add_prefix(test) 326 self._ops.addSkip(test, reason) 327 self.output = None 328 329 def addExpectedFailure(self, test, err=None): 330 test = self._add_prefix(test) 331 self._ops.addExpectedFailure(test, err) 332 self.output = None 333 334 def addUnexpectedSuccess(self, test): 335 test = self._add_prefix(test) 336 self.uxsuccess_added += 1 337 self.total_uxsuccess += 1 338 self._ops.addUnexpectedSuccess(test) 339 if self.output: 340 self._ops.output_msg(self.output) 341 self.output = None 342 if self.fail_immediately: 343 raise ImmediateFail() 344 345 def addFailure(self, test, err=None): 346 test = self._add_prefix(test) 347 xfail_reason = find_in_list(self.expected_failures, test.id()) 348 if xfail_reason is None: 349 xfail_reason = find_in_list(self.flapping, test.id()) 350 if xfail_reason is not None: 351 self.xfail_added += 1 352 self.total_xfail += 1 353 self._ops.addExpectedFailure(test, err) 354 else: 355 self.fail_added += 1 356 self.total_fail += 1 357 self._ops.addFailure(test, err) 358 if self.output: 359 self._ops.output_msg(self.output) 360 if self.fail_immediately: 361 raise ImmediateFail() 362 self.output = None 363 364 def addSuccess(self, test): 365 test = self._add_prefix(test) 366 xfail_reason = find_in_list(self.expected_failures, test.id()) 367 if xfail_reason is not None: 368 self.uxsuccess_added += 1 369 self.total_uxsuccess += 1 370 self._ops.addUnexpectedSuccess(test) 371 if self.output: 372 self._ops.output_msg(self.output) 373 if self.fail_immediately: 374 raise ImmediateFail() 375 else: 376 self._ops.addSuccess(test) 377 self.output = None 378 379 def skip_testsuite(self, name, reason=None): 380 self._ops.skip_testsuite(name, reason) 381 382 def start_testsuite(self, name): 383 self._ops.start_testsuite(name) 384 self.error_added = 0 385 self.fail_added = 0 386 self.xfail_added = 0 387 self.uxsuccess_added = 0 388 389 def end_testsuite(self, name, result, reason=None): 390 xfail = False 391 392 if self.xfail_added > 0: 393 xfail = True 394 if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0: 395 xfail = False 396 397 if xfail and result in ("fail", "failure"): 398 result = "xfail" 399 400 if self.uxsuccess_added > 0 and result != "uxsuccess": 401 result = "uxsuccess" 402 if reason is None: 403 reason = "Subunit/Filter Reason" 404 reason += "\n uxsuccess[%d]" % self.uxsuccess_added 405 406 if self.fail_added > 0 and result != "failure": 407 result = "failure" 408 if reason is None: 409 reason = "Subunit/Filter Reason" 410 reason += "\n failures[%d]" % self.fail_added 411 412 if self.error_added > 0 and result != "error": 413 result = "error" 414 if reason is None: 415 reason = "Subunit/Filter Reason" 416 reason += "\n errors[%d]" % self.error_added 417 418 self._ops.end_testsuite(name, result, reason) 419 if result not in ("success", "xfail"): 420 if self.output: 421 self._ops.output_msg(self.output) 422 if self.fail_immediately: 423 raise ImmediateFail() 424 self.output = None 425 426 def __init__(self, out, prefix=None, suffix=None, expected_failures=None, 427 strip_ok_output=False, fail_immediately=False, 428 flapping=None): 429 self._ops = out 430 self.seen_output = False 431 self.output = None 432 self.prefix = prefix 433 self.suffix = suffix 434 if expected_failures is not None: 435 self.expected_failures = expected_failures 436 else: 437 self.expected_failures = {} 438 if flapping is not None: 439 self.flapping = flapping 440 else: 441 self.flapping = {} 442 self.strip_ok_output = strip_ok_output 443 self.xfail_added = 0 444 self.fail_added = 0 445 self.uxsuccess_added = 0 446 self.total_xfail = 0 447 self.total_error = 0 448 self.total_fail = 0 449 self.total_uxsuccess = 0 450 self.error_added = 0 451 self.fail_immediately = fail_immediately 452 453 454class PerfFilterOps(unittest.TestResult): 455 456 def progress(self, delta, whence): 457 pass 458 459 def output_msg(self, msg): 460 pass 461 462 def control_msg(self, msg): 463 pass 464 465 def skip_testsuite(self, name, reason=None): 466 self._ops.skip_testsuite(name, reason) 467 468 def start_testsuite(self, name): 469 self.suite_has_time = False 470 471 def end_testsuite(self, name, result, reason=None): 472 pass 473 474 def _add_prefix(self, test): 475 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix) 476 477 def time(self, time): 478 self.latest_time = time 479 #self._ops.output_msg("found time %s\n" % time) 480 self.suite_has_time = True 481 482 def get_time(self): 483 if self.suite_has_time: 484 return self.latest_time 485 return datetime.datetime.utcnow() 486 487 def startTest(self, test): 488 self.seen_output = True 489 test = self._add_prefix(test) 490 self.starts[test.id()] = self.get_time() 491 492 def addSuccess(self, test): 493 test = self._add_prefix(test) 494 tid = test.id() 495 if tid not in self.starts: 496 self._ops.addError(test, "%s succeeded without ever starting!" % tid) 497 delta = self.get_time() - self.starts[tid] 498 self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds())) 499 500 def addFailure(self, test, err=''): 501 tid = test.id() 502 delta = self.get_time() - self.starts[tid] 503 self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" % 504 (tid, delta.total_seconds(), err)) 505 506 def addError(self, test, err=''): 507 tid = test.id() 508 delta = self.get_time() - self.starts[tid] 509 self._ops.output_msg("error: %s failed after %f seconds (%s)\n" % 510 (tid, delta.total_seconds(), err)) 511 512 def __init__(self, out, prefix='', suffix=''): 513 self._ops = out 514 self.prefix = prefix or '' 515 self.suffix = suffix or '' 516 self.starts = {} 517 self.seen_output = False 518 self.suite_has_time = False 519 520 521class PlainFormatter(TestsuiteEnabledTestResult): 522 523 def __init__(self, verbose, immediate, statistics, 524 totaltests=None): 525 super(PlainFormatter, self).__init__() 526 self.verbose = verbose 527 self.immediate = immediate 528 self.statistics = statistics 529 self.start_time = None 530 self.test_output = {} 531 self.suitesfailed = [] 532 self.suites_ok = 0 533 self.skips = {} 534 self.index = 0 535 self.name = None 536 self._progress_level = 0 537 self.totalsuites = totaltests 538 self.last_time = None 539 540 @staticmethod 541 def _format_time(delta): 542 minutes, seconds = divmod(delta.seconds, 60) 543 hours, minutes = divmod(minutes, 60) 544 ret = "" 545 if hours: 546 ret += "%dh" % hours 547 if minutes: 548 ret += "%dm" % minutes 549 ret += "%ds" % seconds 550 return ret 551 552 def progress(self, offset, whence): 553 if whence == subunit.PROGRESS_POP: 554 self._progress_level -= 1 555 elif whence == subunit.PROGRESS_PUSH: 556 self._progress_level += 1 557 elif whence == subunit.PROGRESS_SET: 558 if self._progress_level == 0: 559 self.totalsuites = offset 560 elif whence == subunit.PROGRESS_CUR: 561 raise NotImplementedError 562 563 def time(self, dt): 564 if self.start_time is None: 565 self.start_time = dt 566 self.last_time = dt 567 568 def start_testsuite(self, name): 569 self.index += 1 570 self.name = name 571 572 if not self.verbose: 573 self.test_output[name] = "" 574 575 total_tests = (self.statistics['TESTS_EXPECTED_OK'] + 576 self.statistics['TESTS_EXPECTED_FAIL'] + 577 self.statistics['TESTS_ERROR'] + 578 self.statistics['TESTS_UNEXPECTED_FAIL'] + 579 self.statistics['TESTS_UNEXPECTED_OK']) 580 581 out = "[%d(%d)" % (self.index, total_tests) 582 if self.totalsuites is not None: 583 out += "/%d" % self.totalsuites 584 if self.start_time is not None: 585 out += " at " + self._format_time(self.last_time - self.start_time) 586 if self.suitesfailed: 587 out += ", %d errors" % (len(self.suitesfailed),) 588 out += "] %s" % name 589 if self.immediate: 590 sys.stdout.write(out + "\n") 591 else: 592 sys.stdout.write(out + ": ") 593 594 def output_msg(self, output): 595 if self.verbose: 596 sys.stdout.write(output) 597 elif self.name is not None: 598 self.test_output[self.name] += output 599 else: 600 sys.stdout.write(output) 601 602 def control_msg(self, output): 603 pass 604 605 def end_testsuite(self, name, result, reason): 606 out = "" 607 unexpected = False 608 609 if name not in self.test_output: 610 print("no output for name[%s]" % name) 611 612 if result in ("success", "xfail"): 613 self.suites_ok += 1 614 else: 615 self.output_msg("ERROR: Testsuite[%s]\n" % name) 616 if reason is not None: 617 self.output_msg("REASON: %s\n" % (reason,)) 618 self.suitesfailed.append(name) 619 if self.immediate and not self.verbose and name in self.test_output: 620 out += self.test_output[name] 621 unexpected = True 622 623 if not self.immediate: 624 if not unexpected: 625 out += " ok\n" 626 else: 627 out += " " + result.upper() + "\n" 628 629 sys.stdout.write(out) 630 631 def startTest(self, test): 632 pass 633 634 def addSuccess(self, test): 635 self.end_test(test.id(), "success", False) 636 637 def addError(self, test, err=None): 638 self.end_test(test.id(), "error", True, err) 639 640 def addFailure(self, test, err=None): 641 self.end_test(test.id(), "failure", True, err) 642 643 def addSkip(self, test, reason=None): 644 self.end_test(test.id(), "skip", False, reason) 645 646 def addExpectedFailure(self, test, err=None): 647 self.end_test(test.id(), "xfail", False, err) 648 649 def addUnexpectedSuccess(self, test): 650 self.end_test(test.id(), "uxsuccess", True) 651 652 def end_test(self, testname, result, unexpected, err=None): 653 if not unexpected: 654 self.test_output[self.name] = "" 655 if not self.immediate: 656 sys.stdout.write({ 657 'failure': 'f', 658 'xfail': 'X', 659 'skip': 's', 660 'success': '.'}.get(result, "?(%s)" % result)) 661 return 662 663 if self.name not in self.test_output: 664 self.test_output[self.name] = "" 665 666 self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname) 667 if err is not None: 668 self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip() 669 670 if self.immediate and not self.verbose: 671 sys.stdout.write(self.test_output[self.name]) 672 self.test_output[self.name] = "" 673 674 if not self.immediate: 675 sys.stdout.write({ 676 'error': 'E', 677 'failure': 'F', 678 'uxsuccess': 'U', 679 'success': 'S'}.get(result, "?")) 680 681 def write_summary(self, path): 682 f = open(path, 'w+') 683 684 if self.suitesfailed: 685 f.write("= Failed tests =\n") 686 687 for suite in self.suitesfailed: 688 f.write("== %s ==\n" % suite) 689 if suite in self.test_output: 690 f.write(self.test_output[suite] + "\n\n") 691 692 f.write("\n") 693 694 if not self.immediate and not self.verbose: 695 for suite in self.suitesfailed: 696 print("=" * 78) 697 print("FAIL: %s" % suite) 698 if suite in self.test_output: 699 print(self.test_output[suite]) 700 print("") 701 702 f.write("= Skipped tests =\n") 703 for reason in self.skips.keys(): 704 f.write(reason + "\n") 705 for name in self.skips[reason]: 706 f.write("\t%s\n" % name) 707 f.write("\n") 708 f.close() 709 710 if (not self.suitesfailed and 711 not self.statistics['TESTS_UNEXPECTED_FAIL'] and 712 not self.statistics['TESTS_UNEXPECTED_OK'] and 713 not self.statistics['TESTS_ERROR']): 714 ok = (self.statistics['TESTS_EXPECTED_OK'] + 715 self.statistics['TESTS_EXPECTED_FAIL']) 716 print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok)) 717 else: 718 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % ( 719 self.statistics['TESTS_UNEXPECTED_FAIL'], 720 self.statistics['TESTS_ERROR'], 721 self.statistics['TESTS_UNEXPECTED_OK'], 722 len(self.suitesfailed))) 723 724 def skip_testsuite(self, name, reason="UNKNOWN"): 725 self.skips.setdefault(reason, []).append(name) 726 if self.totalsuites: 727 self.totalsuites -= 1 728