1#!/usr/bin/env python
2
3""" Command line test driver. """
4
5from __future__ import unicode_literals
6from __future__ import print_function
7
8import argparse
9from collections import deque
10import datetime
11import io
12import re
13import shlex
14import subprocess
15import sys
16
17try:
18    from itertools import zip_longest
19except ImportError:
20    from itertools import izip_longest as zip_longest
21from difflib import SequenceMatcher
22
# Prefix shared by every directive regex. A directive may sit at the very
# beginning of a line, or after other content on a line that does not itself
# start with "#".
COMMENT_RE = r"^(?:[^#].*)?#\s*"

# Tail shared by every directive regex: at least one whitespace character,
# the payload (captured as group 1), then an optional newline.
# The newline is optional so that a directive on the last line of a file that
# lacks a trailing newline is still recognized. The whitespace class excludes
# "\n" so that a bare directive with no payload (e.g. "# CHECK:") keeps being
# ignored instead of turning into an empty check.
_DIRECTIVE_TAIL = r"[^\S\n]+(.*)\n?$"

# A regex showing how to run the file.
RUN_RE = re.compile(COMMENT_RE + r"RUN:" + _DIRECTIVE_TAIL)
# A regex capturing commands that must succeed for the file to be tested at all.
REQUIRES_RE = re.compile(COMMENT_RE + r"REQUIRES:" + _DIRECTIVE_TAIL)

# A regex capturing lines that should be checked against stdout.
CHECK_STDOUT_RE = re.compile(COMMENT_RE + r"CHECK:" + _DIRECTIVE_TAIL)

# A regex capturing lines that should be checked against stderr.
CHECK_STDERR_RE = re.compile(COMMENT_RE + r"CHECKERR:" + _DIRECTIVE_TAIL)

# Unique sentinel returned when a file is skipped rather than passed or failed.
SKIP = object()
37
def find_command(program):
    """Locate an executable file for the given program name.

    If program contains a directory component it is checked as given;
    otherwise each directory listed in the PATH environment variable is
    searched in order. When PATH is unset, os.defpath is searched instead.

    Returns the path of the executable (program itself when it names a
    directory), or None if no executable regular file was found. The result
    is therefore always usable as a truth value.
    """
    import os

    def is_executable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    if os.path.dirname(program):
        # An explicit location was given: do not consult PATH.
        return program if is_executable(program) else None

    # A missing PATH must not crash the lookup with a KeyError.
    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        exe = os.path.join(directory, program)
        if is_executable(exe):
            return exe

    return None
50
class Config(object):
    """Options controlling how a run reports its results.

    Attributes:
        verbose: whether to have verbose output.
        colorize: whether output gets ANSI colorization.
        progress: whether to show which file was tested.
    """

    # Color name paired with the numeric parameter of its ANSI escape.
    _SGR_CODES = (
        ("RESET", 0),
        ("BOLD", 1),
        ("NORMAL", 39),
        ("BLACK", 30),
        ("RED", 31),
        ("GREEN", 32),
        ("YELLOW", 33),
        ("BLUE", 34),
        ("MAGENTA", 35),
        ("CYAN", 36),
        ("LIGHTGRAY", 37),
        ("DARKGRAY", 90),
        ("LIGHTRED", 91),
        ("LIGHTGREEN", 92),
        ("LIGHTYELLOW", 93),
        ("LIGHTBLUE", 94),
        ("LIGHTMAGENTA", 95),
        ("LIGHTCYAN", 96),
        ("WHITE", 97),
    )

    def __init__(self):
        self.verbose = False
        self.colorize = False
        self.progress = False

    def colors(self):
        """Return a new dictionary mapping color names to ANSI escapes.

        Every value is the empty string when colorization is disabled, so the
        names can be used in format strings unconditionally.
        """
        palette = {}
        for color_name, code in self._SGR_CODES:
            if self.colorize:
                palette[color_name] = "\033[%dm" % code
            else:
                palette[color_name] = ""
        return palette
87
88
def output(*args):
    """Print the given strings joined together, followed by an empty line."""
    text = "".join(args)
    print(text, end="\n\n")
91
92
93import unicodedata
94
95
def esc(m):
    """Return a printable representation of the single character m.

    Quotes, the backslash and the common control characters become their
    backslash escapes; any other character in a Unicode "C" (control/other)
    category becomes a hexadecimal escape; everything else is returned as is.
    """
    named_escapes = {
        "\n": "\\n",
        "\\": "\\\\",
        "'": "\\'",
        '"': '\\"',
        "\a": "\\a",
        "\b": "\\b",
        "\f": "\\f",
        "\r": "\\r",
        "\t": "\\t",
        "\v": "\\v",
    }
    named = named_escapes.get(m)
    if named is not None:
        return named
    is_control = unicodedata.category(m).startswith("C")
    return "\\x{:02x}".format(ord(m)) if is_control else m
115
116
def escape_string(s):
    """Return s with every character passed through esc()."""
    return "".join(map(esc, s))
119
120
class CheckerError(Exception):
    """Error raised for problems with a test file or its commands.

    Attributes:
      line: the Line object on which the exception occurred, or None when
        no line was given.
    """

    def __init__(self, message, line=None):
        Exception.__init__(self, message)
        self.line = line
131
132
class Line(object):
    """A piece of text tagged with the file and line number it came from."""

    def __init__(self, text, number, file):
        self.text, self.number, self.file = text, number, file

    def __hash__(self):
        # HACK: Lines are handed to SequenceMatcher, which stores its items
        # in a dict. Every Line (and every CheckCmd) deliberately hashes to
        # the same value so that lookups always fall through to __eq__(),
        # which is where the regex matching happens.
        return 0

    def __eq__(self, other):
        """Compare against None, a CheckCmd or another Line.

        Against a CheckCmd the result is whatever the check's regex returns
        for our text (a match object or None). Against a Line only the text
        is compared, so SequenceMatcher can reshuffle lines regardless of
        where they came from. Any other type raises NotImplementedError.
        """
        if other is None:
            return False
        if isinstance(other, CheckCmd):
            return other.regex.match(self.text)
        if not isinstance(other, Line):
            raise NotImplementedError
        return self.text == other.text

    def subline(self, text):
        """ Return a new Line holding text, with our number and file. """
        return Line(text, self.number, self.file)

    @staticmethod
    def readfile(file, name):
        """Return one Line per item of file, numbered from 1 and tagged with name."""
        result = []
        for number, text in enumerate(file, 1):
            result.append(Line(text, number, name))
        return result

    def is_empty_space(self):
        """Return True if the text is empty or consists only of whitespace."""
        if not self.text:
            return True
        return self.text.isspace()

    def escaped_text(self, for_formatting=False):
        """Return the text without trailing newlines, escaped for display.

        With for_formatting set, curly braces are doubled as well so the
        result can be embedded in a str.format() template.
        """
        escaped = escape_string(self.text.rstrip("\n"))
        if not for_formatting:
            return escaped
        return escaped.replace("{", "{{").replace("}", "}}")
177
178
class RunCmd(object):
    """A command to run on a given Checker.

    Attributes:
        args: Unexpanded shell command as a string.
        line: The line object the command was taken from.
    """

    def __init__(self, args, line):
        self.args = args
        self.line = line

    @staticmethod
    def parse(line):
        """Build a RunCmd from line, whose text is the unexpanded command.

        Raises CheckerError if the text contains no words at all.
        """
        words = shlex.split(line.text)
        if words:
            return RunCmd(line.text, line)
        raise CheckerError("Invalid RUN command", line)
195
196
class TestFailure(object):
    """A failed check, together with everything needed to report it.

    Attributes:
        line: the output line that failed to match, or None if the output
            ran out before the checks did.
        check: the check that failed, or None if the checks ran out before
            the output did.
        testrun: the run that failed; supplies name, subbed_command and
            config for the report.
        error_annotation_lines: optional list of lines to show as
            "additional output on stderr"; None unless assigned later.
        diff: optional object whose get_grouped_opcodes() describes how
            lines and checks line up; used for the "Context" section.
        lines: the output lines the diff opcodes index into.
        checks: the checks the diff opcodes index into.
    """

    def __init__(self, line, check, testrun, diff=None, lines=None, checks=None):
        self.line = line
        self.check = check
        self.testrun = testrun
        self.error_annotation_lines = None
        self.diff = diff
        # Use None as the default so instances never share one list object.
        self.lines = [] if lines is None else lines
        self.checks = [] if checks is None else checks

    def message(self):
        """Return the complete, human-readable failure report as a string."""
        # The report is assembled as a list of format strings and rendered
        # once at the end, so color names and fields can be used anywhere.
        fields = self.testrun.config.colors()
        fields["name"] = self.testrun.name
        fields["subbed_command"] = self.testrun.subbed_command
        if self.line:
            fields.update(
                {
                    "output_file": self.line.file,
                    "output_lineno": self.line.number,
                    "output_line": self.line.escaped_text(),
                }
            )
        if self.check:
            fields.update(
                {
                    "input_file": self.check.line.file,
                    "input_lineno": self.check.line.number,
                    "input_line": self.check.line.escaped_text(),
                    "check_type": self.check.type,
                }
            )
        # In progress mode the file name was already printed, so omit it.
        filemsg = "" if self.testrun.config.progress else " in {name}"
        fmtstrs = ["{RED}Failure{RESET}" + filemsg + ":", ""]
        if self.line and self.check:
            fmtstrs += [
                "  The {check_type} on line {input_lineno} wants:",
                "    {BOLD}{input_line}{RESET}",
                "",
                "  which failed to match line {output_file}:{output_lineno}:",
                "    {BOLD}{output_line}{RESET}",
                "",
            ]

        elif self.check:
            fmtstrs += [
                "  The {check_type} on line {input_lineno} wants:",
                "    {BOLD}{input_line}{RESET}",
                "",
                "  but there was no remaining output to match.",
                "",
            ]
        else:
            fmtstrs += [
                "  There were no remaining checks left to match {output_file}:{output_lineno}:",
                "    {BOLD}{output_line}{RESET}",
                "",
            ]
        if self.error_annotation_lines:
            fields["error_annotation"] = "    ".join(
                [x.text for x in self.error_annotation_lines]
            )
            fields["error_annotation_lineno"] = str(
                self.error_annotation_lines[0].number
            )
            if len(self.error_annotation_lines) > 1:
                fields["error_annotation_lineno"] += ":" + str(
                    self.error_annotation_lines[-1].number
                )
            fmtstrs += [
                "  additional output on stderr:{error_annotation_lineno}:",
                "    {BOLD}{error_annotation}{RESET}",
            ]
        if self.diff:
            fmtstrs += ["  Context:"]
            lasthi = 0
            lastcheckline = None
            for d in self.diff.get_grouped_opcodes():
                for op, alo, ahi, blo, bhi in d:
                    color = "{BOLD}"
                    if op == "replace" or op == "delete":
                        color = "{RED}"
                    # We got a new chunk, so we print a marker.
                    if alo > lasthi:
                        fmtstrs += [
                            "    [...] from line "
                            + str(self.checks[blo].line.number)
                            + " ("
                            + self.lines[alo].file
                            + ":"
                            + str(self.lines[alo].number)
                            + "):"
                        ]
                    lasthi = ahi

                    # We print one "no more checks" after the last check and then skip any markers
                    lastcheck = False
                    for a, b in zip_longest(self.lines[alo:ahi], self.checks[blo:bhi]):
                        # Clean up strings for use in a format string - double up the curlies.
                        astr = (
                            color + a.escaped_text(for_formatting=True) + "{RESET}"
                            if a
                            else ""
                        )
                        # Only meaningful (and only used) when there is a check.
                        bstr = ""
                        if b:
                            bstr = (
                                "'{BLUE}"
                                + b.line.escaped_text(for_formatting=True)
                                + "{RESET}'"
                                + " on line "
                                + str(b.line.number)
                            )
                            lastcheckline = b.line.number

                        if op == "equal":
                            fmtstrs += ["    " + astr]
                        elif b and a:
                            fmtstrs += [
                                "    "
                                + astr
                                + " <= does not match "
                                + b.type
                                + " "
                                + bstr
                            ]
                        elif b:
                            fmtstrs += [
                                "    "
                                + astr
                                + " <= nothing to match "
                                + b.type
                                + " "
                                + bstr
                            ]
                        else:
                            string = "    " + astr
                            if bhi == len(self.checks):
                                if not lastcheck:
                                    string += " <= no more checks"
                                    lastcheck = True
                            elif lastcheckline is not None:
                                string += (
                                    " <= no check matches this, previous check on line "
                                    + str(lastcheckline)
                                )
                            else:
                                string += " <= no check matches"
                            fmtstrs.append(string)
            fmtstrs.append("")
        fmtstrs += ["  when running command:", "    {subbed_command}"]
        return "\n".join(fmtstrs).format(**fields)

    def print_message(self):
        """ Print our message to stdout. """
        print(self.message())
351
352
def perform_substitution(input_str, subs):
    """Expand %-directives in input_str using the mapping subs.

    A directive is "%" followed by either another "%" or a run of letters,
    digits, "_" and "-". The longest key of subs that is a prefix of the
    text after the "%" is replaced by its value; the rest of the run is kept.
    If no key matches, the "%" is dropped and the text is kept unchanged.
    Return the substituted string.
    """
    # Longest keys first, so a key is never shadowed by one of its prefixes.
    by_length = sorted(subs.items(), key=lambda item: len(item[0]), reverse=True)

    def expand(match):
        word = match.group(1)
        hit = next(
            ((key, value) for key, value in by_length if word.startswith(key)),
            None,
        )
        if hit is None:
            # No substitution found: leave the word as it is, which will end
            # up running it via $PATH.
            return word
        key, value = hit
        return value + word[len(key) :]

    directive_re = re.compile(r"%(%|[a-zA-Z0-9_-]+)")
    return directive_re.sub(expand, input_str)
373
374
def runproc(cmd):
    """Start cmd through the shell and return the subprocess.Popen object.

    stdin, stdout and stderr of the child are all connected to pipes.
    """
    popen_options = dict(
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
        close_fds=True,  # For Python 2.6 as shipped on RHEL 6
    )
    return subprocess.Popen(cmd, **popen_options)
387
388
class TestRun(object):
    """One command of a test file: runs it and verifies its output.

    Attributes:
        name: name of the test, used in failure reports.
        runcmd: the command object this run was created from.
        subbed_command: runcmd.args after substitution; the string that is
            actually executed.
        checker: supplies the outchecks and errchecks to verify against.
        subs: the substitutions that were applied.
        config: supplies the verbose flag (and is read by failure reports).
    """

    def __init__(self, name, runcmd, checker, subs, config):
        self.name = name
        self.runcmd = runcmd
        self.subbed_command = perform_substitution(runcmd.args, subs)
        self.checker = checker
        self.subs = subs
        self.config = config

    def check(self, lines, checks):
        """Match output lines against checks, in order.

        Whitespace-only lines that do not match the current check are
        skipped. Returns None if every remaining line was matched by exactly
        one check and nothing is left over, otherwise a TestFailure for the
        first mismatch, the first leftover line, or the first leftover check.
        """
        # Reverse our lines and checks so we can pop off the end.
        lineq = lines[::-1]
        checkq = checks[::-1]
        usedlines = []
        usedchecks = []
        mismatches = []
        while lineq and checkq:
            line = lineq[-1]
            check = checkq[-1]
            # The check goes on the left so that its __eq__ does the matching.
            if check == line:
                # This line matched this checker, continue on.
                usedlines.append(line)
                usedchecks.append(check)
                lineq.pop()
                checkq.pop()
            elif line.is_empty_space():
                # Skip all whitespace input lines.
                lineq.pop()
            else:
                # Failed to match: record it, but keep going so the diff
                # covers the rest of the output as well.
                usedlines.append(line)
                usedchecks.append(check)
                mismatches.append((line, check))
                lineq.pop()
                checkq.pop()

        # Drain empties
        while lineq and lineq[-1].is_empty_space():
            lineq.pop()

        # Store the remaining lines and checks for the diff.
        usedlines.extend(rest for rest in lineq[::-1] if not rest.is_empty_space())
        usedchecks.extend(checkq[::-1])

        # Do a SequenceMatch! This gives us a diff-like thing.
        diff = SequenceMatcher(a=usedlines, b=usedchecks, autojunk=False)
        # If there's a mismatch or still lines or checkers, we have a failure.
        # Otherwise it's success.
        if mismatches:
            return TestFailure(
                mismatches[0][0],
                mismatches[0][1],
                self,
                diff=diff,
                lines=usedlines,
                checks=usedchecks,
            )
        elif lineq:
            return TestFailure(
                lineq[-1], None, self, diff=diff, lines=usedlines, checks=usedchecks
            )
        elif checkq:
            return TestFailure(
                None, checkq[-1], self, diff=diff, lines=usedlines, checks=usedchecks
            )
        else:
            # Success!
            return None

    def run(self):
        """Run the command. Return a TestFailure, or None.

        Raises CheckerError if the shell reports status 127 or 126 and the
        first word of the command cannot be found as an executable.
        """

        def split_by_newlines(s):
            """Decode a string and split it by newlines only,
            retaining the newlines.
            """
            # Undecodable bytes are replaced instead of raising, so that
            # binary output produces a readable failure rather than a crash.
            text = s.decode("utf-8", "replace")
            return [piece + "\n" for piece in text.split("\n")]

        if self.config.verbose:
            print(self.subbed_command)
        proc = runproc(self.subbed_command)
        stdout, stderr = proc.communicate()
        # HACK: This is quite cheesy: POSIX specifies that sh should return 127 for a missing command.
        # It's also possible that it'll be returned in other situations,
        # most likely when the last command in a shell script doesn't exist.
        # So we check if the command *we execute* exists, and complain then.
        status = proc.returncode
        if status in (126, 127):
            # Only parse the command when it is needed: a command the shell
            # ran successfully need not be parseable by shlex.
            try:
                cmd = shlex.split(self.subbed_command)[0]
            except (ValueError, IndexError):
                cmd = None
            if cmd is not None and not find_command(cmd):
                if status == 127:
                    raise CheckerError("Command could not be found: " + cmd)
                raise CheckerError("Command is not executable: " + cmd)

        outlines = [
            Line(text, idx + 1, "stdout")
            for idx, text in enumerate(split_by_newlines(stdout))
        ]
        errlines = [
            Line(text, idx + 1, "stderr")
            for idx, text in enumerate(split_by_newlines(stderr))
        ]
        outfail = self.check(outlines, self.checker.outchecks)
        errfail = self.check(errlines, self.checker.errchecks)
        # It's possible that something going wrong on stdout resulted in new
        # text being printed on stderr. If we have an outfailure, and either
        # non-matching or unmatched stderr text, then annotate the outfail
        # with it.
        if outfail and errfail and errfail.line:
            outfail.error_annotation_lines = errlines[errfail.line.number - 1 :]
            # Trim a trailing newline
            if outfail.error_annotation_lines[-1].text == "\n":
                del outfail.error_annotation_lines[-1]
        return outfail if outfail else errfail
507
508
class CheckCmd(object):
    """A single expectation: a compiled regex plus where it came from.

    Attributes:
        line: the line object the check was parsed from.
        type: the kind of check, as passed in by the creator.
        regex: compiled regex an output line has to match.
    """

    def __init__(self, line, checktype, regex):
        self.line = line
        self.type = checktype
        self.regex = regex

    def __hash__(self):
        # HACK: Checks are handed to SequenceMatcher, which stores its items
        # in a dict. Every CheckCmd (and every Line) deliberately hashes to
        # the same value so that lookups always fall through to __eq__(),
        # which is where the regex matching happens.
        return 0

    def __eq__(self, other):
        """ "Magical" comparison with checks, lines and strings.

        Two checks compare by regex. A Line or a string is matched against
        our regex and the result of the match (match object or None) is
        returned, which makes `in` work on containers of checks. Any other
        type raises NotImplementedError.
        """
        if other is None:
            return False
        if isinstance(other, CheckCmd):
            return self.regex == other.regex
        if isinstance(other, Line):
            subject = other.text
        elif isinstance(other, str):
            subject = other
        else:
            raise NotImplementedError
        return self.regex.match(subject)

    @staticmethod
    def parse(line, checktype):
        # type: (Line, str) -> CheckCmd
        """Compile the text of line into a CheckCmd of the given type.

        Text inside {{...}} is taken as a regular expression, everything
        else as a literal string. Raises CheckerError if one of the embedded
        regular expressions does not compile.
        """
        bracket_re = re.compile(
            r"""
                \{\{   # Two open brackets
                (.*?)  # Nongreedy capture
                \}\}   # Two close brackets
            """,
            re.VERBOSE,
        )
        # Splitting on a pattern with one group interleaves the results:
        # even indexes hold literal text (possibly empty, e.g. when the text
        # starts with {{...}}), odd indexes hold the captured regexes.
        fragments = []
        for index, piece in enumerate(bracket_re.split(line.text)):
            if index % 2 == 0:
                fragment = re.escape(piece)
            else:
                # Verify the regex can be compiled before using it.
                try:
                    re.compile(piece)
                except re.error:
                    raise CheckerError("Invalid regular expression: '%s'" % piece, line)
                fragment = piece
            # Wrap each piece in a non-capturing group so lower-precedence
            # operators stay contained: {{b|c}}d must not become /b|cd/.
            # Backreferences are assumed to match across the entire string.
            fragments.append("(?:%s)" % fragment)
        # Anchor at both ends, allowing surrounding whitespace and maybe a
        # terminating newline. The anchors are needed because match() accepts
        # an arbitrary prefix, not only the entire string.
        pattern = r"^\s*" + "".join(fragments) + r"\s*\n?$"
        return CheckCmd(line, checktype, re.compile(pattern))
582
583
class Checker(object):
    """The directives found in one test file.

    Attributes:
        name: name of the file.
        runcmds: commands to run, from RUN lines or else from the shebang.
        requirecmds: commands from REQUIRES lines.
        outchecks: checks from CHECK lines, applied to stdout.
        errchecks: checks from CHECKERR lines, applied to stderr.
    """

    def __init__(self, name, lines):
        """Collect all directives from lines.

        Raises CheckerError if there is no RUN line and no usable shebang.
        """
        self.name = name
        # Helper to yield subline containing group1 from all matching lines.
        def group1s(regex):
            for line in lines:
                m = regex.match(line.text)
                if m:
                    yield line.subline(m.group(1))

        # Find run commands.
        self.runcmds = [RunCmd.parse(sl) for sl in group1s(RUN_RE)]
        if not self.runcmds:
            # If no RUN command has been given, fall back to the shebang.
            # An empty file has no first line and hence no shebang either.
            first = lines[0] if lines else None
            if first is None or not first.text.startswith("#!"):
                raise CheckerError("No runlines ('# RUN') found")
            # Remove the "#!" at the beginning, and the newline at the end
            # (if there is one).
            cmd = first.text[2:].rstrip("\n")
            # The shebang may carry arguments ("#!/usr/bin/env fish"), so
            # only its first word names the program to look for.
            try:
                words = shlex.split(cmd)
            except ValueError:
                words = []
            if not words or not find_command(words[0]):
                raise CheckerError("Command could not be found: " + cmd)
            self.runcmds = [RunCmd(cmd + " %s", first)]

        self.requirecmds = [RunCmd.parse(sl) for sl in group1s(REQUIRES_RE)]

        # Find check cmds.
        self.outchecks = [
            CheckCmd.parse(sl, "CHECK") for sl in group1s(CHECK_STDOUT_RE)
        ]
        self.errchecks = [
            CheckCmd.parse(sl, "CHECKERR") for sl in group1s(CHECK_STDERR_RE)
        ]
616
617
def check_file(input_file, name, subs, config, failure_handler):
    """Check a single file.

    input_file is iterated to obtain the lines of the test, name is used to
    label them. Every failure is passed to failure_handler.

    Returns True if all RUN commands passed, False if any of them failed,
    or SKIP if one of the REQUIRES commands did not exit successfully.
    """
    lines = Line.readfile(input_file, name)
    checker = Checker(name, lines)

    # Run all the REQUIRES lines first,
    # if any of them fail it's a SKIP
    for reqcmd in checker.requirecmds:
        proc = runproc(perform_substitution(reqcmd.args, subs))
        # The output is of no interest, but the pipes have to be drained
        # and the process waited for.
        proc.communicate()
        # Anything but a clean exit counts as failure; a process killed by
        # a signal is reported with a negative return code.
        if proc.returncode != 0:
            return SKIP

    # Only then run the RUN lines.
    success = True
    for runcmd in checker.runcmds:
        failure = TestRun(name, runcmd, checker, subs, config).run()
        if failure:
            failure_handler(failure)
            success = False
    return success
642
643
def check_path(path, subs, config, failure_handler):
    """Open path as UTF-8 text, check it, and return what check_file returned."""
    with io.open(path, encoding="utf-8") as source:
        outcome = check_file(source, path, subs, config, failure_handler)
    return outcome
647
648
def parse_subs(subs):
    """Turn substitutions given as 'foo=bar' strings into a dictionary.

    Each item is split at its first equal sign. If an item has no equal
    sign, an empty key or an empty value, a message is printed and the
    program exits with status 1.
    """
    parsed = {}
    for sub in subs:
        key, equals, val = sub.partition("=")
        if not equals:
            print("Invalid substitution %s: equal sign not found" % sub)
            sys.exit(1)
        if not key:
            print("Invalid substitution %s: empty key" % sub)
            sys.exit(1)
        if not val:
            print("Invalid substitution %s: empty value" % sub)
            sys.exit(1)
        parsed[key] = val
    return parsed
668
669
def get_argparse():
    """ Return a littlecheck argument parser. """
    parser = argparse.ArgumentParser(
        description="littlecheck: command line tool tester."
    )

    # -s may be given several times; the values accumulate in a list.
    substitute_options = dict(
        type=str,
        help="Add a new substitution for RUN lines. Example: bash=/bin/bash",
        action="append",
        default=[],
    )
    parser.add_argument("-s", "--substitute", **substitute_options)

    progress_options = dict(
        action="store_true",
        dest="progress",
        help="Show the files to be checked",
        default=False,
    )
    parser.add_argument("-p", "--progress", **progress_options)

    # At least one file is required.
    parser.add_argument("file", nargs="+", help="File to check")
    return parser
693
694
def main():
    """Check every file named on the command line.

    Exits with the number of files that failed (capped at 255), so the
    exit status is 0 only if no file failed.
    """
    args = get_argparse().parse_args()
    # Default substitution is %% -> %
    def_subs = {"%": "%"}
    def_subs.update(parse_subs(args.substitute))

    failure_count = 0
    config = Config()
    # Only colorize when writing to a terminal.
    config.colorize = sys.stdout.isatty()
    config.progress = args.progress
    fields = config.colors()

    for path in args.file:
        fields["path"] = path
        if config.progress:
            print("Testing file {path} ... ".format(**fields), end="")
            sys.stdout.flush()
        subs = def_subs.copy()
        # %s expands to the file being tested.
        subs["s"] = path
        starttime = datetime.datetime.now()
        ret = check_path(path, subs, config, TestFailure.print_message)
        if not ret:
            failure_count += 1
        elif config.progress:
            endtime = datetime.datetime.now()
            duration_ms = round((endtime - starttime).total_seconds() * 1000)
            reason = "ok"
            color = "{GREEN}"
            if ret is SKIP:
                reason = "SKIPPED"
                color = "{BLUE}"
            print(
                (color + "{reason}{RESET} ({duration} ms)").format(
                    duration=duration_ms, reason=reason, **fields
                )
            )
    # Only the low 8 bits of an exit status reach the parent process, so an
    # unclamped count of 256 failures would be reported as success.
    sys.exit(min(failure_count, 255))
732
733
734if __name__ == "__main__":
735    main()
736