1import errno
2import os
3import pty
4import re
5import select
6import subprocess
7import sys
8import tempfile
9import unittest
10
11from textwrap import dedent
12from bpython import args
13from bpython.config import getpreferredencoding
14from bpython.test import FixLanguageTestCase as TestCase
15
16
17def run_with_tty(command):
18    # based on https://stackoverflow.com/questions/52954248/capture-output-as-a-tty-in-python
19    master_stdout, slave_stdout = pty.openpty()
20    master_stderr, slave_stderr = pty.openpty()
21    master_stdin, slave_stdin = pty.openpty()
22
23    p = subprocess.Popen(
24        command,
25        stdout=slave_stdout,
26        stderr=slave_stderr,
27        stdin=slave_stdin,
28        close_fds=True,
29    )
30    for fd in (slave_stdout, slave_stderr, slave_stdin):
31        os.close(fd)
32
33    readable = [master_stdout, master_stderr]
34    result = {master_stdout: b"", master_stderr: b""}
35    try:
36        while readable:
37            ready, _, _ = select.select(readable, [], [], 1)
38            for fd in ready:
39                try:
40                    data = os.read(fd, 512)
41                except OSError as e:
42                    if e.errno != errno.EIO:
43                        raise
44                    # EIO means EOF on some systems
45                    readable.remove(fd)
46                else:
47                    if not data:  # EOF
48                        readable.remove(fd)
49                    result[fd] += data
50    finally:
51        for fd in (master_stdout, master_stderr, master_stdin):
52            os.close(fd)
53        if p.poll() is None:
54            p.kill()
55        p.wait()
56
57    if p.returncode:
58        raise RuntimeError(f"Subprocess exited with {p.returncode}")
59
60    return (
61        result[master_stdout].decode(getpreferredencoding()),
62        result[master_stderr].decode(getpreferredencoding()),
63    )
64
65
66class TestExecArgs(unittest.TestCase):
67    def test_exec_dunder_file(self):
68        with tempfile.NamedTemporaryFile(mode="w") as f:
69            f.write(
70                dedent(
71                    """\
72                import sys
73                sys.stderr.write(__file__)
74                sys.stderr.flush()"""
75                )
76            )
77            f.flush()
78            _, stderr = run_with_tty(
79                [sys.executable] + ["-m", "bpython.curtsies", f.name]
80            )
81            self.assertEqual(stderr.strip(), f.name)
82
83    def test_exec_nonascii_file(self):
84        with tempfile.NamedTemporaryFile(mode="w") as f:
85            f.write(
86                dedent(
87                    """\
88                # coding: utf-8
89                "你好 # nonascii"
90                """
91                )
92            )
93            f.flush()
94            _, stderr = run_with_tty(
95                [sys.executable, "-m", "bpython.curtsies", f.name],
96            )
97            self.assertEqual(len(stderr), 0)
98
99    def test_exec_nonascii_file_linenums(self):
100        with tempfile.NamedTemporaryFile(mode="w") as f:
101            f.write(
102                dedent(
103                    """\
104                1/0
105                """
106                )
107            )
108            f.flush()
109            _, stderr = run_with_tty(
110                [sys.executable, "-m", "bpython.curtsies", f.name],
111            )
112            self.assertIn("line 1", clean_colors(stderr))
113
114
115def clean_colors(s):
116    return re.sub(r"\x1b[^m]*m", "", s)
117
118
119class TestParse(TestCase):
120    def test_version(self):
121        with self.assertRaises(SystemExit):
122            args.parse(["--version"])
123