#!/usr/bin/env python

""" Command line test driver. """

from __future__ import unicode_literals
from __future__ import print_function

import argparse
from collections import deque
import datetime
import io
import os
import re
import shlex
import subprocess
import sys
import unicodedata

try:
    from itertools import zip_longest
except ImportError:
    from itertools import izip_longest as zip_longest
from difflib import SequenceMatcher

# Directives can occur at the beginning of a line, or anywhere in a line that does not start with #.
COMMENT_RE = r"^(?:[^#].*)?#\s*"

# A regex showing how to run the file.
RUN_RE = re.compile(COMMENT_RE + r"RUN:\s+(.*)\n")
# A regex capturing commands that must succeed, otherwise the file is skipped.
REQUIRES_RE = re.compile(COMMENT_RE + r"REQUIRES:\s+(.*)\n")

# A regex capturing lines that should be checked against stdout.
CHECK_STDOUT_RE = re.compile(COMMENT_RE + r"CHECK:\s+(.*)\n")

# A regex capturing lines that should be checked against stderr.
CHECK_STDERR_RE = re.compile(COMMENT_RE + r"CHECKERR:\s+(.*)\n")

# Sentinel returned by check_file() when a REQUIRES command fails.
# It is a plain object, hence truthy: callers must test it with `is SKIP`.
SKIP = object()

# Text type of this module's literals: `unicode` on Python 2 (because of
# unicode_literals), `str` on Python 3.
_TEXT_TYPE = type("")

# Characters that get a dedicated backslash escape in escape_string().
_ESCAPES = {
    "\n": "\\n",
    "\\": "\\\\",
    "'": "\\'",
    '"': '\\"',
    "\a": "\\a",
    "\b": "\\b",
    "\f": "\\f",
    "\r": "\\r",
    "\t": "\\t",
    "\v": "\\v",
}


def find_command(program):
    """Locate an executable.

    If `program` has a directory component it is checked as-is; otherwise
    every directory listed in $PATH (os.defpath if $PATH is unset) is tried.

    Returns the path of the executable (a truthy string), or None if there
    is no such executable file.
    """

    def is_executable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory, _ = os.path.split(program)
    if directory:
        return program if is_executable(program) else None
    for directory in os.environ.get("PATH", os.defpath).split(os.pathsep):
        exe = os.path.join(directory, program)
        if is_executable(exe):
            return exe

    return None


class Config(object):
    """ Output settings shared by a whole littlecheck invocation. """

    def __init__(self):
        # Whether to have verbose output.
        self.verbose = False
        # Whether output gets ANSI colorization.
        self.colorize = False
        # Whether to show which file was tested.
        self.progress = False

    def colors(self):
        """ Return a dictionary mapping color names to ANSI escapes """

        def ansic(n):
            # With colorization off every color maps to the empty string,
            # so format strings can use the names unconditionally.
            return "\033[%dm" % n if self.colorize else ""

        return {
            "RESET": ansic(0),
            "BOLD": ansic(1),
            "NORMAL": ansic(39),
            "BLACK": ansic(30),
            "RED": ansic(31),
            "GREEN": ansic(32),
            "YELLOW": ansic(33),
            "BLUE": ansic(34),
            "MAGENTA": ansic(35),
            "CYAN": ansic(36),
            "LIGHTGRAY": ansic(37),
            "DARKGRAY": ansic(90),
            "LIGHTRED": ansic(91),
            "LIGHTGREEN": ansic(92),
            "LIGHTYELLOW": ansic(93),
            "LIGHTBLUE": ansic(94),
            "LIGHTMAGENTA": ansic(95),
            "LIGHTCYAN": ansic(96),
            "WHITE": ansic(97),
        }


def output(*args):
    """Print the concatenation of the string arguments.

    A newline is appended before print() adds its own, so the text is
    followed by an empty line.
    """
    print("".join(args) + "\n")


def esc(m):
    """Return the printable, escaped form of the single character `m`.

    Characters with a conventional backslash escape use it, any other
    character in a Unicode "C" (control/other) category becomes \\xNN, and
    everything else is returned unchanged.
    """
    if m in _ESCAPES:
        return _ESCAPES[m]
    if unicodedata.category(m)[0] == "C":
        return "\\x{:02x}".format(ord(m))
    return m


def escape_string(s):
    """ Return `s` with every character passed through esc(). """
    return "".join(esc(ch) for ch in s)


class CheckerError(Exception):
    """Exception subclass for check line parsing.

    Attributes:
      line: the Line object on which the exception occurred.
    """

    def __init__(self, message, line=None):
        super(CheckerError, self).__init__(message)
        self.line = line


class Line(object):
    """ A line that remembers where it came from. """

    def __init__(self, text, number, file):
        self.text = text
        self.number = number
        self.file = file

    def __hash__(self):
        # HACK: We pass this to the SequenceMatcher, which puts the Checks into a dict.
        # To force it to match the regexes, we return a hash collision intentionally,
        # so it falls back on __eq__().
        #
        # CheckCmd has the same thing.
        return 0

    def __eq__(self, other):
        """Compare with a CheckCmd (regex match) or another Line (same text)."""
        if other is None:
            return False
        if isinstance(other, CheckCmd):
            return bool(other.regex.match(self.text))
        if isinstance(other, Line):
            # We only compare the text here so SequenceMatcher can reshuffle these
            return self.text == other.text
        # Let Python try the reflected comparison instead of blowing up.
        return NotImplemented

    def subline(self, text):
        """ Return a substring of our line with the given text, preserving number and file. """
        return Line(text, self.number, self.file)

    @staticmethod
    def readfile(file, name):
        """ Return one Line (numbered from 1) per item yielded by iterating `file`. """
        return [Line(text, idx + 1, name) for idx, text in enumerate(file)]

    def is_empty_space(self):
        """ Return True if the line is empty or consists only of whitespace. """
        return not self.text or self.text.isspace()

    def escaped_text(self, for_formatting=False):
        """Return the text without trailing newlines, escaped for display.

        With for_formatting=True curly braces are doubled so the result can
        be embedded in a str.format() template.
        """
        ret = escape_string(self.text.rstrip("\n"))
        if for_formatting:
            ret = ret.replace("{", "{{").replace("}", "}}")
        return ret


class RunCmd(object):
    """A command to run on a given Checker.

    Attributes:
        args: Unexpanded shell command as a string.
        line: The Line the command was read from.
    """

    def __init__(self, args, line):
        self.args = args
        self.line = line

    @staticmethod
    def parse(line):
        """Build a RunCmd from `line`.

        Raises CheckerError if the text contains no shell words.
        """
        if not shlex.split(line.text):
            raise CheckerError("Invalid RUN command", line)
        return RunCmd(line.text, line)


class TestFailure(object):
    """A failed comparison between command output and the file's checks.

    Attributes:
        line: the offending output Line, or None if the output ran out.
        check: the CheckCmd that failed, or None if the checks ran out.
        testrun: the TestRun that produced the failure.
        error_annotation_lines: stderr Lines to show alongside, or None.
        diff: a SequenceMatcher over `lines` and `checks`, or None.
        lines: the output Lines that took part in the comparison.
        checks: the CheckCmds that took part in the comparison.
    """

    def __init__(self, line, check, testrun, diff=None, lines=None, checks=None):
        self.line = line
        self.check = check
        self.testrun = testrun
        self.error_annotation_lines = None
        self.diff = diff
        # Fresh lists per instance; a literal [] default would be shared.
        self.lines = [] if lines is None else lines
        self.checks = [] if checks is None else checks

    def message(self):
        """ Return the formatted, human-readable description of the failure. """
        fields = self.testrun.config.colors()
        fields["name"] = self.testrun.name
        fields["subbed_command"] = self.testrun.subbed_command
        if self.line:
            fields.update(
                {
                    "output_file": self.line.file,
                    "output_lineno": self.line.number,
                    "output_line": self.line.escaped_text(),
                }
            )
        if self.check:
            fields.update(
                {
                    "input_file": self.check.line.file,
                    "input_lineno": self.check.line.number,
                    "input_line": self.check.line.escaped_text(),
                    "check_type": self.check.type,
                }
            )
        # In progress mode the file name was already printed.
        filemsg = "" if self.testrun.config.progress else " in {name}"
        # NOTE(review): the whitespace inside the message templates below is
        # reproduced exactly as it appears in the reviewed text; confirm the
        # leading indentation matches the intended layout.
        fmtstrs = ["{RED}Failure{RESET}" + filemsg + ":", ""]
        if self.line and self.check:
            fmtstrs += [
                " The {check_type} on line {input_lineno} wants:",
                " {BOLD}{input_line}{RESET}",
                "",
                " which failed to match line {output_file}:{output_lineno}:",
                " {BOLD}{output_line}{RESET}",
                "",
            ]

        elif self.check:
            fmtstrs += [
                " The {check_type} on line {input_lineno} wants:",
                " {BOLD}{input_line}{RESET}",
                "",
                " but there was no remaining output to match.",
                "",
            ]
        else:
            fmtstrs += [
                " There were no remaining checks left to match {output_file}:{output_lineno}:",
                " {BOLD}{output_line}{RESET}",
                "",
            ]
        if self.error_annotation_lines:
            fields["error_annotation"] = " ".join(
                [x.text for x in self.error_annotation_lines]
            )
            fields["error_annotation_lineno"] = str(
                self.error_annotation_lines[0].number
            )
            if len(self.error_annotation_lines) > 1:
                fields["error_annotation_lineno"] += ":" + str(
                    self.error_annotation_lines[-1].number
                )
            fmtstrs += [
                " additional output on stderr:{error_annotation_lineno}:",
                " {BOLD}{error_annotation}{RESET}",
            ]
        if self.diff:
            fmtstrs += self._context_format_lines()
        fmtstrs += [" when running command:", " {subbed_command}"]
        return "\n".join(fmtstrs).format(**fields)

    def _context_format_lines(self):
        """ Return the format-string lines of the diff-like "Context" section. """
        fmtstrs = [" Context:"]
        lasthi = 0
        lastcheckline = None
        for group in self.diff.get_grouped_opcodes():
            for op, alo, ahi, blo, bhi in group:
                color = "{RED}" if op in ("replace", "delete") else "{BOLD}"
                # We got a new chunk, so we print a marker.
                if alo > lasthi:
                    fmtstrs.append(
                        " [...] from line "
                        + str(self.checks[blo].line.number)
                        + " ("
                        + self.lines[alo].file
                        + ":"
                        + str(self.lines[alo].number)
                        + "):"
                    )
                lasthi = ahi

                # We print one "no more checks" after the last check and then skip any markers
                lastcheck = False
                for a, b in zip_longest(self.lines[alo:ahi], self.checks[blo:bhi]):
                    # Clean up strings for use in a format string - double up the curlies.
                    if a:
                        astr = color + a.escaped_text(for_formatting=True) + "{RESET}"
                    else:
                        astr = ""
                    if b:
                        bstr = (
                            "'{BLUE}"
                            + b.line.escaped_text(for_formatting=True)
                            + "{RESET}'"
                            + " on line "
                            + str(b.line.number)
                        )
                        lastcheckline = b.line.number

                    if op == "equal":
                        fmtstrs.append(" " + astr)
                    elif b and a:
                        fmtstrs.append(
                            " " + astr + " <= does not match " + b.type + " " + bstr
                        )
                    elif b:
                        fmtstrs.append(
                            " " + astr + " <= nothing to match " + b.type + " " + bstr
                        )
                    else:
                        string = " " + astr
                        if bhi == len(self.checks):
                            if not lastcheck:
                                string += " <= no more checks"
                                lastcheck = True
                        elif lastcheckline is not None:
                            string += (
                                " <= no check matches this, previous check on line "
                                + str(lastcheckline)
                            )
                        else:
                            string += " <= no check matches"
                        fmtstrs.append(string)
        fmtstrs.append("")
        return fmtstrs

    def print_message(self):
        """ Print our message to stdout. """
        print(self.message())


def perform_substitution(input_str, subs):
    """Perform the substitutions described by subs to str
    Return the substituted string.
    """
    # Sort our substitutions into a list of tuples (key, value), descending by length.
    # It needs to be descending because we need to try longer substitutions first.
    subs_ordered = sorted(subs.items(), key=lambda s: len(s[0]), reverse=True)

    def subber(m):
        # We get the entire sequence of characters.
        # Replace just the prefix and return it.
        text = m.group(1)
        for key, replacement in subs_ordered:
            if text.startswith(key):
                return replacement + text[len(key) :]
        # No substitution found, so we default to running it as-is,
        # which will end up running it via $PATH.
        return text

    return re.sub(r"%(%|[a-zA-Z0-9_-]+)", subber, input_str)


def runproc(cmd):
    """ Wrapper around subprocess.Popen to save typing """
    PIPE = subprocess.PIPE
    proc = subprocess.Popen(
        cmd,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        shell=True,
        close_fds=True,  # For Python 2.6 as shipped on RHEL 6
    )
    return proc


class TestRun(object):
    """ One RUN command of a file, together with the checks to apply to its output. """

    def __init__(self, name, runcmd, checker, subs, config):
        self.name = name
        self.runcmd = runcmd
        self.subbed_command = perform_substitution(runcmd.args, subs)
        self.checker = checker
        self.subs = subs
        self.config = config

    def check(self, lines, checks):
        """Match output `lines` against `checks` in order.

        Whitespace-only output lines that do not match the current check are
        skipped. Returns None if every check matched and no non-blank output
        is left over, otherwise a TestFailure describing the first problem.
        """
        # Reverse our lines and checks so we can pop off the end.
        lineq = lines[::-1]
        checkq = checks[::-1]
        usedlines = []
        usedchecks = []
        mismatches = []
        while lineq and checkq:
            line = lineq[-1]
            check = checkq[-1]
            if check == line:
                # This line matched this checker, continue on.
                usedlines.append(line)
                usedchecks.append(check)
                lineq.pop()
                checkq.pop()
            elif line.is_empty_space():
                # Skip all whitespace input lines.
                lineq.pop()
            else:
                # Failed to match.
                usedlines.append(line)
                usedchecks.append(check)
                mismatches.append((line, check))
                lineq.pop()
                checkq.pop()

        # Drain empties
        while lineq and lineq[-1].is_empty_space():
            lineq.pop()

        # Store the remaining lines for the diff
        usedlines.extend(
            leftover for leftover in reversed(lineq) if not leftover.is_empty_space()
        )
        # Store remaining checks for the diff
        usedchecks.extend(reversed(checkq))

        # Do a SequenceMatch! This gives us a diff-like thing.
        diff = SequenceMatcher(a=usedlines, b=usedchecks, autojunk=False)
        # If there's a mismatch or still lines or checkers, we have a failure.
        # Otherwise it's success.
        if mismatches:
            first_line, first_check = mismatches[0]
            return TestFailure(
                first_line,
                first_check,
                self,
                diff=diff,
                lines=usedlines,
                checks=usedchecks,
            )
        if lineq:
            return TestFailure(
                lineq[-1], None, self, diff=diff, lines=usedlines, checks=usedchecks
            )
        if checkq:
            return TestFailure(
                None, checkq[-1], self, diff=diff, lines=usedlines, checks=usedchecks
            )
        # Success!
        return None

    def run(self):
        """Run the command. Return a TestFailure, or None.

        Raises CheckerError if the shell reports status 126/127 and the
        command's first word is not an executable we can find.
        """

        def split_by_newlines(s):
            """Decode a string and split it by newlines only,
            retaining the newlines.
            """
            # Undecodable bytes become U+FFFD instead of aborting the run.
            return [
                piece + "\n" for piece in s.decode("utf-8", "replace").split("\n")
            ]

        if self.config.verbose:
            print(self.subbed_command)
        proc = runproc(self.subbed_command)
        stdout, stderr = proc.communicate()
        # HACK: This is quite cheesy: POSIX specifies that sh should return 127 for a missing command.
        # It's also possible that it'll be returned in other situations,
        # most likely when the last command in a shell script doesn't exist.
        # So we check if the command *we execute* exists, and complain then.
        status = proc.returncode
        if status in (126, 127):
            # Only tokenize when needed: shlex rejects some text the shell accepts.
            cmd = shlex.split(self.subbed_command)[0]
            if not find_command(cmd):
                if status == 127:
                    raise CheckerError("Command could not be found: " + cmd)
                raise CheckerError("Command is not executable: " + cmd)

        outlines = [
            Line(text, idx + 1, "stdout")
            for idx, text in enumerate(split_by_newlines(stdout))
        ]
        errlines = [
            Line(text, idx + 1, "stderr")
            for idx, text in enumerate(split_by_newlines(stderr))
        ]
        outfail = self.check(outlines, self.checker.outchecks)
        errfail = self.check(errlines, self.checker.errchecks)
        # It's possible that something going wrong on stdout resulted in new
        # text being printed on stderr. If we have an outfailure, and either
        # non-matching or unmatched stderr text, then annotate the outfail
        # with it.
        if outfail and errfail and errfail.line:
            outfail.error_annotation_lines = errlines[errfail.line.number - 1 :]
            # Trim a trailing newline
            if outfail.error_annotation_lines[-1].text == "\n":
                del outfail.error_annotation_lines[-1]
        return outfail if outfail else errfail


class CheckCmd(object):
    """A single CHECK/CHECKERR directive.

    Attributes:
        line: the Line holding the directive's text.
        type: the directive name ("CHECK" or "CHECKERR").
        regex: compiled pattern an output line must match.
    """

    def __init__(self, line, checktype, regex):
        self.line = line
        self.type = checktype
        self.regex = regex

    def __hash__(self):
        # HACK: We pass this to the SequenceMatcher, which puts the Checks into a dict.
        # To force it to match the regexes, we return a hash collision intentionally,
        # so it falls back on __eq__().
        #
        # Line has the same thing.
        return 0

    def __eq__(self, other):
        # "Magical" comparison with lines and strings.
        # Typically I wouldn't use this, but it allows us to check if a line matches any check in a dict or list via
        # the `in` operator.
        if other is None:
            return False
        if isinstance(other, CheckCmd):
            return self.regex == other.regex
        if isinstance(other, Line):
            return bool(self.regex.match(other.text))
        if isinstance(other, (str, _TEXT_TYPE)):
            return bool(self.regex.match(other))
        # Let Python try the reflected comparison instead of blowing up.
        return NotImplemented

    @staticmethod
    def parse(line, checktype):
        # type: (Line, str) -> CheckCmd
        """Compile the directive text in `line` into a CheckCmd.

        Raises CheckerError if a {{...}} section is not a valid regex.
        """
        # Everything inside {{}} is a regular expression.
        # Everything outside of it is a literal string.
        # Split around {{...}}. Then every odd index will be a regex, and
        # evens will be literals.
        # Note that if {{...}} appears first we will get an empty string in
        # the split array, so the {{...}} matches are always at odd indexes.
        bracket_re = re.compile(
            r"""
            \{\{   # Two open brackets
            (.*?)  # Nongreedy capture
            \}\}   # Two close brackets
            """,
            re.VERBOSE,
        )
        re_strings = []
        for index, piece in enumerate(bracket_re.split(line.text)):
            if index % 2 == 0:
                # piece is a literal string.
                re_strings.append(re.escape(piece))
                continue
            # piece is a regex (found inside {{...}}).
            # Verify the regex can be compiled.
            try:
                re.compile(piece)
            except re.error:
                raise CheckerError("Invalid regular expression: '%s'" % piece, line)
            re_strings.append(piece)
        # Enclose each piece in a non-capturing group.
        # This ensures that lower-precedence operators don't trip up catenation.
        # For example: {{b|c}}d would result in /b|cd/ which is different.
        # Backreferences are assumed to match across the entire string.
        re_strings = ["(?:%s)" % s for s in re_strings]
        # Anchor at beginning and end (allowing arbitrary whitespace), and maybe
        # a terminating newline.
        # We need the anchors because Python's match() matches an arbitrary prefix,
        # not the entire string.
        re_strings = [r"^\s*"] + re_strings + [r"\s*\n?$"]
        full_re = re.compile("".join(re_strings))
        return CheckCmd(line, checktype, full_re)


class Checker(object):
    """The directives found in one test file.

    Attributes:
        name: the name the file was given.
        runcmds: RunCmds from RUN lines (or derived from the shebang).
        requirecmds: RunCmds from REQUIRES lines.
        outchecks: CheckCmds to apply to stdout.
        errchecks: CheckCmds to apply to stderr.
    """

    def __init__(self, name, lines):
        """Collect directives from `lines`.

        Raises CheckerError if a directive is malformed, or if there is no
        RUN line and no usable shebang to fall back on.
        """
        self.name = name

        # Helper to yield subline containing group1 from all matching lines.
        def group1s(regex):
            for line in lines:
                m = regex.match(line.text)
                if m:
                    yield line.subline(m.group(1))

        # Find run commands.
        self.runcmds = [RunCmd.parse(sl) for sl in group1s(RUN_RE)]
        if not self.runcmds:
            # If no RUN command has been given, fall back to the shebang.
            first = lines[0] if lines else None
            if first is None or not first.text.startswith("#!"):
                raise CheckerError("No runlines ('# RUN') found")
            # Remove the "#!" at the beginning, and surrounding whitespace
            # (including the newline at the end, if there is one).
            cmd = first.text[2:].strip()
            # A shebang may carry arguments ("/usr/bin/env prog"); only the
            # interpreter itself has to exist.
            program = cmd.split(None, 1)[0] if cmd else ""
            if not program or not find_command(program):
                raise CheckerError("Command could not be found: " + cmd, first)
            self.runcmds = [RunCmd(cmd + " %s", first)]

        self.requirecmds = [RunCmd.parse(sl) for sl in group1s(REQUIRES_RE)]

        # Find check cmds.
        self.outchecks = [
            CheckCmd.parse(sl, "CHECK") for sl in group1s(CHECK_STDOUT_RE)
        ]
        self.errchecks = [
            CheckCmd.parse(sl, "CHECKERR") for sl in group1s(CHECK_STDERR_RE)
        ]


def check_file(input_file, name, subs, config, failure_handler):
    """Check a single file.

    Returns True on success, False if any RUN command failed its checks,
    or SKIP if a REQUIRES command exited unsuccessfully. `failure_handler`
    is called with each TestFailure.
    """
    success = True
    lines = Line.readfile(input_file, name)
    checker = Checker(name, lines)

    # Run all the REQUIRES lines first,
    # if any of them fail it's a SKIP
    for reqcmd in checker.requirecmds:
        proc = runproc(perform_substitution(reqcmd.args, subs))
        proc.communicate()
        # Any non-zero status counts, including negative ones (killed by a signal).
        if proc.returncode != 0:
            return SKIP

    # Only then run the RUN lines.
    for runcmd in checker.runcmds:
        failure = TestRun(name, runcmd, checker, subs, config).run()
        if failure:
            failure_handler(failure)
            success = False
    return success


def check_path(path, subs, config, failure_handler):
    """ Open `path` as UTF-8 text and run check_file() on it. """
    with io.open(path, encoding="utf-8") as fd:
        return check_file(fd, path, subs, config, failure_handler)


def parse_subs(subs):
    """Given a list of input substitutions like 'foo=bar',
    return a dictionary like {foo:bar}, or exit if invalid.
    """
    result = {}
    for sub in subs:
        if "=" not in sub:
            print("Invalid substitution %s: equal sign not found" % sub)
            sys.exit(1)
        key, val = sub.split("=", 1)
        if not key:
            print("Invalid substitution %s: empty key" % sub)
            sys.exit(1)
        if not val:
            print("Invalid substitution %s: empty value" % sub)
            sys.exit(1)
        result[key] = val
    return result


def get_argparse():
    """ Return a littlecheck argument parser. """
    parser = argparse.ArgumentParser(
        description="littlecheck: command line tool tester."
    )
    parser.add_argument(
        "-s",
        "--substitute",
        type=str,
        help="Add a new substitution for RUN lines. Example: bash=/bin/bash",
        action="append",
        default=[],
    )
    parser.add_argument(
        "-p",
        "--progress",
        action="store_true",
        dest="progress",
        help="Show the files to be checked",
        default=False,
    )
    parser.add_argument("file", nargs="+", help="File to check")
    return parser


def main():
    """ Check every file named on the command line; exit with the failure count. """
    args = get_argparse().parse_args()
    # Default substitution is %% -> %
    def_subs = {"%": "%"}
    def_subs.update(parse_subs(args.substitute))

    failure_count = 0
    config = Config()
    config.colorize = sys.stdout.isatty()
    config.progress = args.progress
    fields = config.colors()

    for path in args.file:
        fields["path"] = path
        if config.progress:
            print("Testing file {path} ... ".format(**fields), end="")
            sys.stdout.flush()
        subs = def_subs.copy()
        subs["s"] = path
        starttime = datetime.datetime.now()
        ret = check_path(path, subs, config, TestFailure.print_message)
        if not ret:
            failure_count += 1
        elif config.progress:
            endtime = datetime.datetime.now()
            duration_ms = round((endtime - starttime).total_seconds() * 1000)
            reason = "ok"
            color = "{GREEN}"
            if ret is SKIP:
                reason = "SKIPPED"
                color = "{BLUE}"
            print(
                (color + "{reason}{RESET} ({duration} ms)").format(
                    duration=duration_ms, reason=reason, **fields
                )
            )
    # Exit statuses are truncated to 8 bits, so 256 failures would look
    # like success; clamp instead.
    sys.exit(min(failure_count, 255))


if __name__ == "__main__":
    main()