1import difflib
2import os
3import re
4import shlex
5import subprocess
6import time
7from typing import Callable, Iterable, Iterator, List, Optional, Tuple
8
9import pexpect
10import pytest
11
12PS1 = "/@"
13MAGIC_MARK = "__MaGiC-maRKz!__"
14
15
16def find_unique_completion_pair(
17    items: Iterable[str],
18) -> Optional[Tuple[str, str]]:
19    result = None
20    bestscore = 0
21    sitems = sorted(set(items))
22    for i in range(len(sitems)):
23        cur = sitems[i]
24        curlen = len(cur)
25        prv = sitems[i - 1] if i != 0 else ""
26        prvlen = len(prv)
27        nxt = sitems[i + 1] if i < len(sitems) - 1 else ""
28        nxtlen = len(nxt)
29        diffprv = prv == ""
30        diffnxt = nxt == ""
31        # Analyse each item of the list and look for the minimum length of the
32        # partial prefix which is distinct from both nxt and prv. The list
33        # is sorted so the prefix will be unique in the entire list.
34        for j in range(curlen):
35            curchar = cur[j]
36            if not diffprv and (j >= prvlen or prv[j] != curchar):
37                diffprv = True
38            if not diffnxt and (j >= nxtlen or nxt[j] != curchar):
39                diffnxt = True
40            if diffprv and diffnxt:
41                break
42        # At the end of the loop, j is the index of last character of
43        # the unique partial prefix. The length is one plus that.
44        parlen = j + 1
45        if parlen >= curlen:
46            continue
47        # Try to find the most "readable pair"; look for a long pair where
48        # part is about half of full.
49        if parlen < curlen / 2:
50            parlen = int(curlen / 2)
51        score = curlen - parlen
52        if score > bestscore:
53            bestscore = score
54            result = (cur[:parlen], cur)
55    return result
56
57
58@pytest.fixture(scope="class")
59def output_sort_uniq(bash: pexpect.spawn) -> Callable[[str], List[str]]:
60    def _output_sort_uniq(command: str) -> List[str]:
61        return sorted(
62            set(  # weed out possible duplicates
63                assert_bash_exec(bash, command, want_output=True).split()
64            )
65        )
66
67    return _output_sort_uniq
68
69
70@pytest.fixture(scope="class")
71def part_full_user(
72    bash: pexpect.spawn, output_sort_uniq: Callable[[str], List[str]]
73) -> Optional[Tuple[str, str]]:
74    res = output_sort_uniq("compgen -u")
75    pair = find_unique_completion_pair(res)
76    if not pair:
77        pytest.skip("No suitable test user found")
78    return pair
79
80
81@pytest.fixture(scope="class")
82def part_full_group(
83    bash: pexpect.spawn, output_sort_uniq: Callable[[str], List[str]]
84) -> Optional[Tuple[str, str]]:
85    res = output_sort_uniq("compgen -g")
86    pair = find_unique_completion_pair(res)
87    if not pair:
88        pytest.skip("No suitable test user found")
89    return pair
90
91
92@pytest.fixture(scope="class")
93def hosts(bash: pexpect.spawn) -> List[str]:
94    output = assert_bash_exec(bash, "compgen -A hostname", want_output=True)
95    return sorted(set(output.split() + _avahi_hosts(bash)))
96
97
98@pytest.fixture(scope="class")
99def avahi_hosts(bash: pexpect.spawn) -> List[str]:
100    return _avahi_hosts(bash)
101
102
103def _avahi_hosts(bash: pexpect.spawn) -> List[str]:
104    output = assert_bash_exec(
105        bash,
106        "! type avahi-browse &>/dev/null || "
107        "avahi-browse -cpr _workstation._tcp 2>/dev/null "
108        "| command grep ^= | cut -d';' -f7",
109        want_output=None,
110    )
111    return sorted(set(output.split()))
112
113
114@pytest.fixture(scope="class")
115def known_hosts(bash: pexpect.spawn) -> List[str]:
116    output = assert_bash_exec(
117        bash,
118        '_known_hosts_real ""; '
119        r'printf "%s\n" "${COMPREPLY[@]}"; unset COMPREPLY',
120        want_output=True,
121    )
122    return sorted(set(output.split()))
123
124
125@pytest.fixture(scope="class")
126def user_home(bash: pexpect.spawn) -> Tuple[str, str]:
127    user = assert_bash_exec(
128        bash, 'id -un 2>/dev/null || echo "$USER"', want_output=True
129    ).strip()
130    home = assert_bash_exec(bash, 'echo "$HOME"', want_output=True).strip()
131    return (user, home)
132
133
134def partialize(
135    bash: pexpect.spawn, items: Iterable[str]
136) -> Tuple[str, List[str]]:
137    """
138    Get list of items starting with the first char of first of items.
139
140    Disregard items starting with a COMP_WORDBREAKS character
141    (e.g. a colon ~ IPv6 address), they are special cases requiring
142    special tests.
143    """
144    first_char = None
145    comp_wordbreaks = assert_bash_exec(
146        bash,
147        'printf "%s" "$COMP_WORDBREAKS"',
148        want_output=True,
149        want_newline=False,
150    )
151    partial_items = []
152    for item in sorted(items):
153        if first_char is None:
154            if item[0] not in comp_wordbreaks:
155                first_char = item[0]
156                partial_items.append(item)
157        elif item.startswith(first_char):
158            partial_items.append(item)
159        else:
160            break
161    if first_char is None:
162        pytest.skip("Could not generate partial items list from %s" % items)
163        # superfluous/dead code to assist mypy; pytest.skip always raises
164        assert first_char is not None
165    return first_char, partial_items
166
167
168@pytest.fixture(scope="class")
169def bash(request) -> pexpect.spawn:
170
171    logfile = None
172    if os.environ.get("BASHCOMP_TEST_LOGFILE"):
173        logfile = open(os.environ["BASHCOMP_TEST_LOGFILE"], "w")
174    testdir = os.path.abspath(
175        os.path.join(os.path.dirname(__file__), os.pardir)
176    )
177    env = os.environ.copy()
178    env.update(
179        dict(
180            SRCDIR=testdir,  # TODO needed at least by bashrc
181            SRCDIRABS=testdir,  # TODO needed?
182            PS1=PS1,
183            INPUTRC="%s/config/inputrc" % testdir,
184            TERM="dumb",
185            LC_COLLATE="C",  # to match Python's default locale unaware sort
186        )
187    )
188
189    fixturesdir = os.path.join(testdir, "fixtures")
190    os.chdir(fixturesdir)
191
192    # Start bash
193    bash = pexpect.spawn(
194        "%s --norc" % os.environ.get("BASHCOMP_TEST_BASH", "bash"),
195        maxread=os.environ.get("BASHCOMP_TEST_PEXPECT_MAXREAD", 20000),
196        logfile=logfile,
197        cwd=fixturesdir,
198        env=env,
199        encoding="utf-8",  # TODO? or native or...?
200        # FIXME: Tests shouldn't depend on dimensions, but it's difficult to
201        # expect robustly enough for Bash to wrap lines anywhere (e.g. inside
202        # MAGIC_MARK).  Increase window width to reduce wrapping.
203        dimensions=(24, 160),
204        # TODO? codec_errors="replace",
205    )
206    bash.expect_exact(PS1)
207
208    # Load bashrc and bash_completion
209    assert_bash_exec(bash, "source '%s/config/bashrc'" % testdir)
210    assert_bash_exec(bash, "source '%s/../bash_completion'" % testdir)
211
212    # Use command name from marker if set, or grab from test filename
213    cmd = None  # type: Optional[str]
214    cmd_found = False
215    marker = request.node.get_closest_marker("bashcomp")
216    if marker:
217        cmd = marker.kwargs.get("cmd")
218        cmd_found = "cmd" in marker.kwargs
219        # Run pre-test commands, early so they're usable in skipif
220        for pre_cmd in marker.kwargs.get("pre_cmds", []):
221            assert_bash_exec(bash, pre_cmd)
222        # Process skip and xfail conditions
223        skipif = marker.kwargs.get("skipif")
224        if skipif:
225            try:
226                assert_bash_exec(bash, skipif, want_output=None)
227            except AssertionError:
228                pass
229            else:
230                bash.close()
231                pytest.skip(skipif)
232        xfail = marker.kwargs.get("xfail")
233        if xfail:
234            try:
235                assert_bash_exec(bash, xfail, want_output=None)
236            except AssertionError:
237                pass
238            else:
239                pytest.xfail(xfail)
240    if not cmd_found:
241        match = re.search(
242            r"^test_(.+)\.py$", os.path.basename(str(request.fspath))
243        )
244        if match:
245            cmd = match.group(1)
246
247    request.cls.cmd = cmd
248
249    if (cmd_found and cmd is None) or is_testable(bash, cmd):
250        before_env = get_env(bash)
251        yield bash
252        # Not exactly sure why, but some errors leave bash in state where
253        # getting the env here would fail and trash our test output. So
254        # reset to a good state first (Ctrl+C, expect prompt).
255        bash.sendintr()
256        bash.expect_exact(PS1)
257        diff_env(
258            before_env,
259            get_env(bash),
260            marker.kwargs.get("ignore_env") if marker else "",
261        )
262
263    if marker:
264        for post_cmd in marker.kwargs.get("post_cmds", []):
265            assert_bash_exec(bash, post_cmd)
266
267    # Clean up
268    bash.close()
269    if logfile:
270        logfile.close()
271
272
273def is_testable(bash: pexpect.spawn, cmd: Optional[str]) -> bool:
274    if not cmd:
275        pytest.fail("Could not resolve name of command to test")
276        return False
277    if not load_completion_for(bash, cmd):
278        pytest.skip("No completion for command %s" % cmd)
279    return True
280
281
282def is_bash_type(bash: pexpect.spawn, cmd: Optional[str]) -> bool:
283    if not cmd:
284        return False
285    typecmd = "type %s &>/dev/null && echo -n 0 || echo -n 1" % cmd
286    bash.sendline(typecmd)
287    bash.expect_exact(typecmd + "\r\n")
288    result = bash.expect_exact(["0", "1"]) == 0
289    bash.expect_exact(PS1)
290    return result
291
292
293def load_completion_for(bash: pexpect.spawn, cmd: str) -> bool:
294    try:
295        # Allow __load_completion to fail so we can test completions
296        # that are directly loaded in bash_completion without a separate file.
297        assert_bash_exec(bash, "__load_completion %s || :" % cmd)
298        assert_bash_exec(bash, "complete -p %s &>/dev/null" % cmd)
299    except AssertionError:
300        return False
301    return True
302
303
304def assert_bash_exec(
305    bash: pexpect.spawn,
306    cmd: str,
307    want_output: Optional[bool] = False,
308    want_newline=True,
309) -> str:
310    """
311    :param want_output: if None, don't care if got output or not
312    """
313
314    # Send command
315    bash.sendline(cmd)
316    bash.expect_exact(cmd)
317
318    # Find prompt, output is before it
319    bash.expect_exact("%s%s" % ("\r\n" if want_newline else "", PS1))
320    output = bash.before
321
322    # Retrieve exit status
323    echo = "echo $?"
324    bash.sendline(echo)
325    got = bash.expect(
326        [
327            r"^%s\r\n(\d+)\r\n%s" % (re.escape(echo), re.escape(PS1)),
328            PS1,
329            pexpect.EOF,
330            pexpect.TIMEOUT,
331        ]
332    )
333    status = bash.match.group(1) if got == 0 else "unknown"
334
335    assert status == "0", 'Error running "%s": exit status=%s, output="%s"' % (
336        cmd,
337        status,
338        output,
339    )
340    if want_output is not None:
341        if output:
342            assert want_output, (
343                'Unexpected output from "%s": exit status=%s, output="%s"'
344                % (cmd, status, output)
345            )
346        else:
347            assert not want_output, (
348                'Expected output from "%s": exit status=%s, output="%s"'
349                % (cmd, status, output)
350            )
351
352    return output
353
354
355def get_env(bash: pexpect.spawn) -> List[str]:
356    return (
357        assert_bash_exec(
358            bash,
359            "{ (set -o posix ; set); declare -F; shopt -p; set -o; }",
360            want_output=True,
361        )
362        .strip()
363        .splitlines()
364    )
365
366
367def diff_env(before: List[str], after: List[str], ignore: str):
368    diff = [
369        x
370        for x in difflib.unified_diff(before, after, n=0, lineterm="")
371        # Remove unified diff markers:
372        if not re.search(r"^(---|\+\+\+|@@ )", x)
373        # Ignore variables expected to change:
374        and not re.search("^[-+](_|PPID|BASH_REMATCH|OLDPWD)=", x)
375        # Ignore likely completion functions added by us:
376        and not re.search(r"^\+declare -f _.+", x)
377        # ...and additional specified things:
378        and not re.search(ignore or "^$", x)
379    ]
380    # For some reason, COMP_WORDBREAKS gets added to the list after
381    # saving. Remove its changes, and note that it may take two lines.
382    for i in range(0, len(diff)):
383        if re.match("^[-+]COMP_WORDBREAKS=", diff[i]):
384            if i < len(diff) and not re.match(r"^\+[\w]+=", diff[i + 1]):
385                del diff[i + 1]
386            del diff[i]
387            break
388    assert not diff, "Environment should not be modified"
389
390
391class CompletionResult(Iterable[str]):
392    """
393    Class to hold completion results.
394    """
395
396    def __init__(self, output: Optional[str] = None):
397        """
398        :param output: All completion output as-is.
399        """
400        self.output = output or ""
401
402    def endswith(self, suffix: str) -> bool:
403        return self.output.endswith(suffix)
404
405    def startswith(self, prefix: str) -> bool:
406        return self.output.startswith(prefix)
407
408    def _items(self) -> List[str]:
409        return [x.strip() for x in self.output.strip().splitlines()]
410
411    def __eq__(self, expected: object) -> bool:
412        """
413        Returns True if completion contains expected items, and no others.
414
415        Defining __eq__ this way is quite ugly, but facilitates concise
416        testing code.
417        """
418        if isinstance(expected, str):
419            expiter = [expected]  # type: Iterable
420        elif not isinstance(expected, Iterable):
421            return False
422        else:
423            expiter = expected
424        return self._items() == expiter
425
426    def __contains__(self, item: str) -> bool:
427        return item in self._items()
428
429    def __iter__(self) -> Iterator[str]:
430        return iter(self._items())
431
432    def __len__(self) -> int:
433        return len(self._items())
434
435    def __repr__(self) -> str:
436        return "<CompletionResult %s>" % self._items()
437
438
439def assert_complete(
440    bash: pexpect.spawn, cmd: str, **kwargs
441) -> CompletionResult:
442    skipif = kwargs.get("skipif")
443    if skipif:
444        try:
445            assert_bash_exec(bash, skipif, want_output=None)
446        except AssertionError:
447            pass
448        else:
449            pytest.skip(skipif)
450    xfail = kwargs.get("xfail")
451    if xfail:
452        try:
453            assert_bash_exec(bash, xfail, want_output=None)
454        except AssertionError:
455            pass
456        else:
457            pytest.xfail(xfail)
458    cwd = kwargs.get("cwd")
459    if cwd:
460        assert_bash_exec(bash, "cd '%s'" % cwd)
461    env_prefix = "_BASHCOMP_TEST_"
462    env = kwargs.get("env", {})
463    if env:
464        # Back up environment and apply new one
465        assert_bash_exec(
466            bash,
467            " ".join('%s%s="${%s-}"' % (env_prefix, k, k) for k in env.keys()),
468        )
469        assert_bash_exec(
470            bash,
471            "export %s" % " ".join("%s=%s" % (k, v) for k, v in env.items()),
472        )
473    try:
474        bash.send(cmd + "\t")
475        # Sleep a bit if requested, to avoid `.*` matching too early
476        time.sleep(kwargs.get("sleep_after_tab", 0))
477        bash.expect_exact(cmd)
478        bash.send(MAGIC_MARK)
479        got = bash.expect(
480            [
481                # 0: multiple lines, result in .before
482                r"\r\n" + re.escape(PS1 + cmd) + ".*" + re.escape(MAGIC_MARK),
483                # 1: no completion
484                r"^" + re.escape(MAGIC_MARK),
485                # 2: on same line, result in .match
486                r"^([^\r]+)%s$" % re.escape(MAGIC_MARK),
487                pexpect.EOF,
488                pexpect.TIMEOUT,
489            ]
490        )
491        if got == 0:
492            output = bash.before
493            if output.endswith(MAGIC_MARK):
494                output = bash.before[: -len(MAGIC_MARK)]
495            result = CompletionResult(output)
496        elif got == 2:
497            output = bash.match.group(1)
498            result = CompletionResult(output)
499        else:
500            # TODO: warn about EOF/TIMEOUT?
501            result = CompletionResult()
502    finally:
503        bash.sendintr()
504        bash.expect_exact(PS1)
505        if env:
506            # Restore environment, and clean up backup
507            # TODO: Test with declare -p if a var was set, backup only if yes, and
508            #       similarly restore only backed up vars. Should remove some need
509            #       for ignore_env.
510            assert_bash_exec(
511                bash,
512                "export %s"
513                % " ".join(
514                    '%s="$%s%s"' % (k, env_prefix, k) for k in env.keys()
515                ),
516            )
517            assert_bash_exec(
518                bash,
519                "unset -v %s"
520                % " ".join("%s%s" % (env_prefix, k) for k in env.keys()),
521            )
522        if cwd:
523            assert_bash_exec(bash, "cd - >/dev/null")
524    return result
525
526
527@pytest.fixture
528def completion(request, bash: pexpect.spawn) -> CompletionResult:
529    marker = request.node.get_closest_marker("complete")
530    if not marker:
531        return CompletionResult()
532    for pre_cmd in marker.kwargs.get("pre_cmds", []):
533        assert_bash_exec(bash, pre_cmd)
534    cmd = getattr(request.cls, "cmd", None)
535    if marker.kwargs.get("require_longopt"):
536        # longopt completions require both command presence and that it
537        # responds something useful to --help
538        if "require_cmd" not in marker.kwargs:
539            marker.kwargs["require_cmd"] = True
540        if "xfail" not in marker.kwargs:
541            marker.kwargs["xfail"] = (
542                "! %s --help &>/dev/null || "
543                "! %s --help 2>&1 | command grep -qF -- --help"
544            ) % ((cmd,) * 2)
545    if marker.kwargs.get("require_cmd") and not is_bash_type(bash, cmd):
546        pytest.skip("Command not found")
547
548    if "trail" in marker.kwargs:
549        return assert_complete_at_point(
550            bash, cmd=marker.args[0], trail=marker.kwargs["trail"]
551        )
552
553    return assert_complete(bash, marker.args[0], **marker.kwargs)
554
555
556def assert_complete_at_point(
557    bash: pexpect.spawn, cmd: str, trail: str
558) -> CompletionResult:
559    # TODO: merge to assert_complete
560    fullcmd = "%s%s%s" % (
561        cmd,
562        trail,
563        "\002" * len(trail),
564    )  # \002 = ^B = cursor left
565    bash.send(fullcmd + "\t")
566    bash.send(MAGIC_MARK)
567    bash.expect_exact(fullcmd.replace("\002", "\b"))
568
569    got = bash.expect_exact(
570        [
571            # 0: multiple lines, result in .before
572            PS1 + fullcmd.replace("\002", "\b"),
573            # 1: no completion
574            MAGIC_MARK,
575            pexpect.EOF,
576            pexpect.TIMEOUT,
577        ]
578    )
579    if got == 0:
580        output = bash.before
581        result = CompletionResult(output)
582
583        # At this point, something weird happens. For most test setups, as
584        # expected (pun intended!), MAGIC_MARK follows as is. But for some
585        # others (e.g. CentOS 6, Ubuntu 14 test containers), we get MAGIC_MARK
586        # one character a time, followed each time by trail and the corresponding
587        # number of \b's. Don't know why, but accept it until/if someone finds out.
588        # Or just be fine with it indefinitely, the visible and practical end
589        # result on a terminal is the same anyway.
590        repeat = "(%s%s)?" % (re.escape(trail), "\b" * len(trail))
591        fullexpected = "".join(
592            "%s%s" % (re.escape(x), repeat) for x in MAGIC_MARK
593        )
594        bash.expect(fullexpected)
595    else:
596        # TODO: warn about EOF/TIMEOUT?
597        result = CompletionResult()
598
599    return result
600
601
602def in_container() -> bool:
603    try:
604        container = subprocess.check_output(
605            "virt-what || systemd-detect-virt --container",
606            stderr=subprocess.DEVNULL,
607            shell=True,
608        ).strip()
609    except subprocess.CalledProcessError:
610        container = b""
611    if container and container != b"none":
612        return True
613    if os.path.exists("/.dockerenv"):
614        return True
615    try:
616        with open("/proc/1/environ", "rb") as f:
617            # LXC, others?
618            if any(
619                x.startswith(b"container=") for x in f.readline().split(b"\0")
620            ):
621                return True
622    except OSError:
623        pass
624    return False
625
626
627class TestUnitBase:
628    def _test_unit(
629        self, func, bash, comp_words, comp_cword, comp_line, comp_point, arg=""
630    ):
631        assert_bash_exec(
632            bash,
633            "COMP_WORDS=%s COMP_CWORD=%d COMP_LINE=%s COMP_POINT=%d"
634            % (comp_words, comp_cword, shlex.quote(comp_line), comp_point),
635        )
636        output = assert_bash_exec(bash, func % arg, want_output=True)
637        return output.strip()
638