1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import subprocess
10import py_compile
11import zipfile
12
13from importlib.util import source_from_cache
14from test import support
15from test.support.import_helper import make_legacy_pyc
16
17
18# Cached result of the expensive test performed in the function below.
19__cached_interp_requires_environment = None
20
21
22def interpreter_requires_environment():
23    """
24    Returns True if our sys.executable interpreter requires environment
25    variables in order to be able to run at all.
26
27    This is designed to be used with @unittest.skipIf() to annotate tests
28    that need to use an assert_python*() function to launch an isolated
29    mode (-I) or no environment mode (-E) sub-interpreter process.
30
31    A normal build & test does not run into this situation but it can happen
32    when trying to run the standard library test suite from an interpreter that
33    doesn't have an obvious home with Python's current home finding logic.
34
35    Setting PYTHONHOME is one way to get most of the testsuite to run in that
36    situation.  PYTHONPATH or PYTHONUSERSITE are other common environment
37    variables that might impact whether or not the interpreter can start.
38    """
39    global __cached_interp_requires_environment
40    if __cached_interp_requires_environment is None:
41        # If PYTHONHOME is set, assume that we need it
42        if 'PYTHONHOME' in os.environ:
43            __cached_interp_requires_environment = True
44            return True
45
46        # Try running an interpreter with -E to see if it works or not.
47        try:
48            subprocess.check_call([sys.executable, '-E',
49                                   '-c', 'import sys; sys.exit(0)'])
50        except subprocess.CalledProcessError:
51            __cached_interp_requires_environment = True
52        else:
53            __cached_interp_requires_environment = False
54
55    return __cached_interp_requires_environment
56
57
58class _PythonRunResult(collections.namedtuple("_PythonRunResult",
59                                          ("rc", "out", "err"))):
60    """Helper for reporting Python subprocess run results"""
61    def fail(self, cmd_line):
62        """Provide helpful details about failed subcommand runs"""
63        # Limit to 80 lines to ASCII characters
64        maxlen = 80 * 100
65        out, err = self.out, self.err
66        if len(out) > maxlen:
67            out = b'(... truncated stdout ...)' + out[-maxlen:]
68        if len(err) > maxlen:
69            err = b'(... truncated stderr ...)' + err[-maxlen:]
70        out = out.decode('ascii', 'replace').rstrip()
71        err = err.decode('ascii', 'replace').rstrip()
72        raise AssertionError("Process return code is %d\n"
73                             "command line: %r\n"
74                             "\n"
75                             "stdout:\n"
76                             "---\n"
77                             "%s\n"
78                             "---\n"
79                             "\n"
80                             "stderr:\n"
81                             "---\n"
82                             "%s\n"
83                             "---"
84                             % (self.rc, cmd_line,
85                                out,
86                                err))
87
88
89# Executing the interpreter in a subprocess
90def run_python_until_end(*args, **env_vars):
91    env_required = interpreter_requires_environment()
92    cwd = env_vars.pop('__cwd', None)
93    if '__isolated' in env_vars:
94        isolated = env_vars.pop('__isolated')
95    else:
96        isolated = not env_vars and not env_required
97    cmd_line = [sys.executable, '-X', 'faulthandler']
98    if isolated:
99        # isolated mode: ignore Python environment variables, ignore user
100        # site-packages, and don't add the current directory to sys.path
101        cmd_line.append('-I')
102    elif not env_vars and not env_required:
103        # ignore Python environment variables
104        cmd_line.append('-E')
105
106    # But a special flag that can be set to override -- in this case, the
107    # caller is responsible to pass the full environment.
108    if env_vars.pop('__cleanenv', None):
109        env = {}
110        if sys.platform == 'win32':
111            # Windows requires at least the SYSTEMROOT environment variable to
112            # start Python.
113            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
114
115        # Other interesting environment variables, not copied currently:
116        # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
117    else:
118        # Need to preserve the original environment, for in-place testing of
119        # shared library builds.
120        env = os.environ.copy()
121
122    # set TERM='' unless the TERM environment variable is passed explicitly
123    # see issues #11390 and #18300
124    if 'TERM' not in env_vars:
125        env['TERM'] = ''
126
127    env.update(env_vars)
128    cmd_line.extend(args)
129    proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
130                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
131                         env=env, cwd=cwd)
132    with proc:
133        try:
134            out, err = proc.communicate()
135        finally:
136            proc.kill()
137            subprocess._cleanup()
138    rc = proc.returncode
139    return _PythonRunResult(rc, out, err), cmd_line
140
141
142def _assert_python(expected_success, /, *args, **env_vars):
143    res, cmd_line = run_python_until_end(*args, **env_vars)
144    if (res.rc and expected_success) or (not res.rc and not expected_success):
145        res.fail(cmd_line)
146    return res
147
148
149def assert_python_ok(*args, **env_vars):
150    """
151    Assert that running the interpreter with `args` and optional environment
152    variables `env_vars` succeeds (rc == 0) and return a (return code, stdout,
153    stderr) tuple.
154
155    If the __cleanenv keyword is set, env_vars is used as a fresh environment.
156
157    Python is started in isolated mode (command line option -I),
158    except if the __isolated keyword is set to False.
159    """
160    return _assert_python(True, *args, **env_vars)
161
162
163def assert_python_failure(*args, **env_vars):
164    """
165    Assert that running the interpreter with `args` and optional environment
166    variables `env_vars` fails (rc != 0) and return a (return code, stdout,
167    stderr) tuple.
168
169    See assert_python_ok() for more options.
170    """
171    return _assert_python(False, *args, **env_vars)
172
173
174def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
175    """Run a Python subprocess with the given arguments.
176
177    kw is extra keyword args to pass to subprocess.Popen. Returns a Popen
178    object.
179    """
180    cmd_line = [sys.executable]
181    if not interpreter_requires_environment():
182        cmd_line.append('-E')
183    cmd_line.extend(args)
184    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
185    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
186    # that.  References:
187    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
188    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
189    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
190    env = kw.setdefault('env', dict(os.environ))
191    env['TERM'] = 'vt100'
192    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
193                            stdout=stdout, stderr=stderr,
194                            **kw)
195
196
197def kill_python(p):
198    """Run the given Popen process until completion and return stdout."""
199    p.stdin.close()
200    data = p.stdout.read()
201    p.stdout.close()
202    # try to cleanup the child so we don't appear to leak when running
203    # with regrtest -R.
204    p.wait()
205    subprocess._cleanup()
206    return data
207
208
209def make_script(script_dir, script_basename, source, omit_suffix=False):
210    script_filename = script_basename
211    if not omit_suffix:
212        script_filename += os.extsep + 'py'
213    script_name = os.path.join(script_dir, script_filename)
214    # The script should be encoded to UTF-8, the default string encoding
215    with open(script_name, 'w', encoding='utf-8') as script_file:
216        script_file.write(source)
217    importlib.invalidate_caches()
218    return script_name
219
220
221def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
222    zip_filename = zip_basename+os.extsep+'zip'
223    zip_name = os.path.join(zip_dir, zip_filename)
224    with zipfile.ZipFile(zip_name, 'w') as zip_file:
225        if name_in_zip is None:
226            parts = script_name.split(os.sep)
227            if len(parts) >= 2 and parts[-2] == '__pycache__':
228                legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
229                name_in_zip = os.path.basename(legacy_pyc)
230                script_name = legacy_pyc
231            else:
232                name_in_zip = os.path.basename(script_name)
233        zip_file.write(script_name, name_in_zip)
234    #if test.support.verbose:
235    #    with zipfile.ZipFile(zip_name, 'r') as zip_file:
236    #        print 'Contents of %r:' % zip_name
237    #        zip_file.printdir()
238    return zip_name, os.path.join(zip_name, name_in_zip)
239
240
241def make_pkg(pkg_dir, init_source=''):
242    os.mkdir(pkg_dir)
243    make_script(pkg_dir, '__init__', init_source)
244
245
246def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
247                 source, depth=1, compiled=False):
248    unlink = []
249    init_name = make_script(zip_dir, '__init__', '')
250    unlink.append(init_name)
251    init_basename = os.path.basename(init_name)
252    script_name = make_script(zip_dir, script_basename, source)
253    unlink.append(script_name)
254    if compiled:
255        init_name = py_compile.compile(init_name, doraise=True)
256        script_name = py_compile.compile(script_name, doraise=True)
257        unlink.extend((init_name, script_name))
258    pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
259    script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
260    zip_filename = zip_basename+os.extsep+'zip'
261    zip_name = os.path.join(zip_dir, zip_filename)
262    with zipfile.ZipFile(zip_name, 'w') as zip_file:
263        for name in pkg_names:
264            init_name_in_zip = os.path.join(name, init_basename)
265            zip_file.write(init_name, init_name_in_zip)
266        zip_file.write(script_name, script_name_in_zip)
267    for name in unlink:
268        os.unlink(name)
269    #if test.support.verbose:
270    #    with zipfile.ZipFile(zip_name, 'r') as zip_file:
271    #        print 'Contents of %r:' % zip_name
272    #        zip_file.printdir()
273    return zip_name, os.path.join(zip_name, script_name_in_zip)
274
275
276def run_test_script(script):
277    # use -u to try to get the full output if the test hangs or crash
278    if support.verbose:
279        def title(text):
280            return f"===== {text} ======"
281
282        name = f"script {os.path.basename(script)}"
283        print()
284        print(title(name), flush=True)
285        # In verbose mode, the child process inherit stdout and stdout,
286        # to see output in realtime and reduce the risk of losing output.
287        args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
288        proc = subprocess.run(args)
289        print(title(f"{name} completed: exit code {proc.returncode}"),
290              flush=True)
291        if proc.returncode:
292            raise AssertionError(f"{name} failed")
293    else:
294        assert_python_ok("-u", script, "-v")
295