1import unittest
2from unittest import mock
3from test import support
4from test.support import import_helper
5from test.support import os_helper
6from test.support import warnings_helper
7import subprocess
8import sys
9import signal
10import io
11import itertools
12import os
13import errno
14import tempfile
15import time
16import traceback
17import types
18import selectors
19import sysconfig
20import select
21import shutil
22import threading
23import gc
24import textwrap
25import json
26import pathlib
27from test.support.os_helper import FakePath
28
29try:
30    import _testcapi
31except ImportError:
32    _testcapi = None
33
34try:
35    import pwd
36except ImportError:
37    pwd = None
38try:
39    import grp
40except ImportError:
41    grp = None
42
# fcntl is only available on POSIX platforms; tests that need it check for
# ``fcntl`` being None.  Catch ImportError specifically (as done for pwd and
# grp above) so that unrelated errors such as KeyboardInterrupt are not
# silently swallowed.
try:
    import fcntl
except ImportError:
    fcntl = None
47
# Skip this whole module when support.PGO is set: SkipTest is raised while
# the module is being imported, before any test class is defined.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")
50
# True when the tests run on native Windows.
mswindows = sys.platform == "win32"

#
# Depends on the following external programs: Python
#

# Snippet of Python source that switches sys.stdout to binary mode on
# Windows; it is empty on every other platform.
SETBINARY = (
    'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
    if mswindows else ''
)

NONEXISTING_CMD = ('nonexisting_i_hope',)
# Ignore errors that indicate the command was not found
NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)

# Command that exits with status 0; setUpModule() may replace it.
ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
68
69
def setUpModule():
    """Point ZERO_RETURN_CMD at the system ``true`` program when usable.

    The replacement only happens if ``true`` is found on PATH, is
    executable, and actually exits with status 0 when run.  Otherwise the
    module-level default is left alone.
    """
    global ZERO_RETURN_CMD
    true_path = shutil.which('true')
    if true_path is None:
        return
    if not os.access(true_path, os.X_OK):
        return
    if subprocess.run([true_path]).returncode != 0:
        return
    ZERO_RETURN_CMD = (true_path,)  # Faster than Python startup.
78
79
class BaseTestCase(unittest.TestCase):
    """Shared fixture that reaps child processes around every test."""

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        support.reap_children()

    def _drain_active_children(self):
        # Wait for every Popen instance still listed in subprocess._active,
        # let subprocess prune the list, then require that it is empty.
        for leftover in subprocess._active:
            leftover.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

    def tearDown(self):
        # subprocess._active is not used on Windows and is set to None.
        if not mswindows:
            self._drain_active_children()
        self.doCleanups()
        support.reap_children()
97
98
class PopenTestException(Exception):
    """Exception raised on purpose by PopenExecuteChildRaises."""
101
102
class PopenExecuteChildRaises(subprocess.Popen):
    """Popen variant whose _execute_child always fails.

    Used to test that subprocess.PIPE file handles are cleaned up when
    _execute_child raises.
    """

    def _execute_child(self, *_args, **_kwargs):
        # Ignore every argument: the only job here is to fail.
        forced_error = PopenTestException("Forced Exception for Test")
        raise forced_error
109
110
111class ProcessTestCase(BaseTestCase):
112
113    def test_io_buffered_by_default(self):
114        p = subprocess.Popen(ZERO_RETURN_CMD,
115                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
116                             stderr=subprocess.PIPE)
117        try:
118            self.assertIsInstance(p.stdin, io.BufferedIOBase)
119            self.assertIsInstance(p.stdout, io.BufferedIOBase)
120            self.assertIsInstance(p.stderr, io.BufferedIOBase)
121        finally:
122            p.stdin.close()
123            p.stdout.close()
124            p.stderr.close()
125            p.wait()
126
127    def test_io_unbuffered_works(self):
128        p = subprocess.Popen(ZERO_RETURN_CMD,
129                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
130                             stderr=subprocess.PIPE, bufsize=0)
131        try:
132            self.assertIsInstance(p.stdin, io.RawIOBase)
133            self.assertIsInstance(p.stdout, io.RawIOBase)
134            self.assertIsInstance(p.stderr, io.RawIOBase)
135        finally:
136            p.stdin.close()
137            p.stdout.close()
138            p.stderr.close()
139            p.wait()
140
141    def test_call_seq(self):
142        # call() function with sequence argument
143        rc = subprocess.call([sys.executable, "-c",
144                              "import sys; sys.exit(47)"])
145        self.assertEqual(rc, 47)
146
147    def test_call_timeout(self):
148        # call() function with timeout argument; we want to test that the child
149        # process gets killed when the timeout expires.  If the child isn't
150        # killed, this call will deadlock since subprocess.call waits for the
151        # child.
152        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
153                          [sys.executable, "-c", "while True: pass"],
154                          timeout=0.1)
155
156    def test_check_call_zero(self):
157        # check_call() function with zero return code
158        rc = subprocess.check_call(ZERO_RETURN_CMD)
159        self.assertEqual(rc, 0)
160
161    def test_check_call_nonzero(self):
162        # check_call() function with non-zero return code
163        with self.assertRaises(subprocess.CalledProcessError) as c:
164            subprocess.check_call([sys.executable, "-c",
165                                   "import sys; sys.exit(47)"])
166        self.assertEqual(c.exception.returncode, 47)
167
168    def test_check_output(self):
169        # check_output() function with zero return code
170        output = subprocess.check_output(
171                [sys.executable, "-c", "print('BDFL')"])
172        self.assertIn(b'BDFL', output)
173
174        with self.assertRaisesRegex(ValueError,
175                "stdout argument not allowed, it will be overridden"):
176            subprocess.check_output([], stdout=None)
177
178        with self.assertRaisesRegex(ValueError,
179                "check argument not allowed, it will be overridden"):
180            subprocess.check_output([], check=False)
181
182    def test_check_output_nonzero(self):
183        # check_call() function with non-zero return code
184        with self.assertRaises(subprocess.CalledProcessError) as c:
185            subprocess.check_output(
186                    [sys.executable, "-c", "import sys; sys.exit(5)"])
187        self.assertEqual(c.exception.returncode, 5)
188
189    def test_check_output_stderr(self):
190        # check_output() function stderr redirected to stdout
191        output = subprocess.check_output(
192                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
193                stderr=subprocess.STDOUT)
194        self.assertIn(b'BDFL', output)
195
196    def test_check_output_stdin_arg(self):
197        # check_output() can be called with stdin set to a file
198        tf = tempfile.TemporaryFile()
199        self.addCleanup(tf.close)
200        tf.write(b'pear')
201        tf.seek(0)
202        output = subprocess.check_output(
203                [sys.executable, "-c",
204                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
205                stdin=tf)
206        self.assertIn(b'PEAR', output)
207
208    def test_check_output_input_arg(self):
209        # check_output() can be called with input set to a string
210        output = subprocess.check_output(
211                [sys.executable, "-c",
212                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
213                input=b'pear')
214        self.assertIn(b'PEAR', output)
215
216    def test_check_output_input_none(self):
217        """input=None has a legacy meaning of input='' on check_output."""
218        output = subprocess.check_output(
219                [sys.executable, "-c",
220                 "import sys; print('XX' if sys.stdin.read() else '')"],
221                input=None)
222        self.assertNotIn(b'XX', output)
223
224    def test_check_output_input_none_text(self):
225        output = subprocess.check_output(
226                [sys.executable, "-c",
227                 "import sys; print('XX' if sys.stdin.read() else '')"],
228                input=None, text=True)
229        self.assertNotIn('XX', output)
230
231    def test_check_output_input_none_universal_newlines(self):
232        output = subprocess.check_output(
233                [sys.executable, "-c",
234                 "import sys; print('XX' if sys.stdin.read() else '')"],
235                input=None, universal_newlines=True)
236        self.assertNotIn('XX', output)
237
238    def test_check_output_stdout_arg(self):
239        # check_output() refuses to accept 'stdout' argument
240        with self.assertRaises(ValueError) as c:
241            output = subprocess.check_output(
242                    [sys.executable, "-c", "print('will not be run')"],
243                    stdout=sys.stdout)
244            self.fail("Expected ValueError when stdout arg supplied.")
245        self.assertIn('stdout', c.exception.args[0])
246
247    def test_check_output_stdin_with_input_arg(self):
248        # check_output() refuses to accept 'stdin' with 'input'
249        tf = tempfile.TemporaryFile()
250        self.addCleanup(tf.close)
251        tf.write(b'pear')
252        tf.seek(0)
253        with self.assertRaises(ValueError) as c:
254            output = subprocess.check_output(
255                    [sys.executable, "-c", "print('will not be run')"],
256                    stdin=tf, input=b'hare')
257            self.fail("Expected ValueError when stdin and input args supplied.")
258        self.assertIn('stdin', c.exception.args[0])
259        self.assertIn('input', c.exception.args[0])
260
261    def test_check_output_timeout(self):
262        # check_output() function with timeout arg
263        with self.assertRaises(subprocess.TimeoutExpired) as c:
264            output = subprocess.check_output(
265                    [sys.executable, "-c",
266                     "import sys, time\n"
267                     "sys.stdout.write('BDFL')\n"
268                     "sys.stdout.flush()\n"
269                     "time.sleep(3600)"],
270                    # Some heavily loaded buildbots (sparc Debian 3.x) require
271                    # this much time to start and print.
272                    timeout=3)
273            self.fail("Expected TimeoutExpired.")
274        self.assertEqual(c.exception.output, b'BDFL')
275
276    def test_call_kwargs(self):
277        # call() function with keyword args
278        newenv = os.environ.copy()
279        newenv["FRUIT"] = "banana"
280        rc = subprocess.call([sys.executable, "-c",
281                              'import sys, os;'
282                              'sys.exit(os.getenv("FRUIT")=="banana")'],
283                             env=newenv)
284        self.assertEqual(rc, 1)
285
286    def test_invalid_args(self):
287        # Popen() called with invalid arguments should raise TypeError
288        # but Popen.__del__ should not complain (issue #12085)
289        with support.captured_stderr() as s:
290            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
291            argcount = subprocess.Popen.__init__.__code__.co_argcount
292            too_many_args = [0] * (argcount + 1)
293            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
294        self.assertEqual(s.getvalue(), '')
295
296    def test_stdin_none(self):
297        # .stdin is None when not redirected
298        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
299                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
300        self.addCleanup(p.stdout.close)
301        self.addCleanup(p.stderr.close)
302        p.wait()
303        self.assertEqual(p.stdin, None)
304
305    def test_stdout_none(self):
306        # .stdout is None when not redirected, and the child's stdout will
307        # be inherited from the parent.  In order to test this we run a
308        # subprocess in a subprocess:
309        # this_test
310        #   \-- subprocess created by this test (parent)
311        #          \-- subprocess created by the parent subprocess (child)
312        # The parent doesn't specify stdout, so the child will use the
313        # parent's stdout.  This test checks that the message printed by the
314        # child goes to the parent stdout.  The parent also checks that the
315        # child's stdout is None.  See #11963.
316        code = ('import sys; from subprocess import Popen, PIPE;'
317                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
318                '          stdin=PIPE, stderr=PIPE);'
319                'p.wait(); assert p.stdout is None;')
320        p = subprocess.Popen([sys.executable, "-c", code],
321                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
322        self.addCleanup(p.stdout.close)
323        self.addCleanup(p.stderr.close)
324        out, err = p.communicate()
325        self.assertEqual(p.returncode, 0, err)
326        self.assertEqual(out.rstrip(), b'test_stdout_none')
327
328    def test_stderr_none(self):
329        # .stderr is None when not redirected
330        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
331                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
332        self.addCleanup(p.stdout.close)
333        self.addCleanup(p.stdin.close)
334        p.wait()
335        self.assertEqual(p.stderr, None)
336
337    def _assert_python(self, pre_args, **kwargs):
338        # We include sys.exit() to prevent the test runner from hanging
339        # whenever python is found.
340        args = pre_args + ["import sys; sys.exit(47)"]
341        p = subprocess.Popen(args, **kwargs)
342        p.wait()
343        self.assertEqual(47, p.returncode)
344
345    def test_executable(self):
346        # Check that the executable argument works.
347        #
348        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
349        # determine where its standard library is, so we need the directory
350        # of args[0] to be valid for the Popen() call to Python to succeed.
351        # See also issue #16170 and issue #7774.
352        doesnotexist = os.path.join(os.path.dirname(sys.executable),
353                                    "doesnotexist")
354        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
355
356    def test_bytes_executable(self):
357        doesnotexist = os.path.join(os.path.dirname(sys.executable),
358                                    "doesnotexist")
359        self._assert_python([doesnotexist, "-c"],
360                            executable=os.fsencode(sys.executable))
361
362    def test_pathlike_executable(self):
363        doesnotexist = os.path.join(os.path.dirname(sys.executable),
364                                    "doesnotexist")
365        self._assert_python([doesnotexist, "-c"],
366                            executable=FakePath(sys.executable))
367
368    def test_executable_takes_precedence(self):
369        # Check that the executable argument takes precedence over args[0].
370        #
371        # Verify first that the call succeeds without the executable arg.
372        pre_args = [sys.executable, "-c"]
373        self._assert_python(pre_args)
374        self.assertRaises(NONEXISTING_ERRORS,
375                          self._assert_python, pre_args,
376                          executable=NONEXISTING_CMD[0])
377
378    @unittest.skipIf(mswindows, "executable argument replaces shell")
379    def test_executable_replaces_shell(self):
380        # Check that the executable argument replaces the default shell
381        # when shell=True.
382        self._assert_python([], executable=sys.executable, shell=True)
383
384    @unittest.skipIf(mswindows, "executable argument replaces shell")
385    def test_bytes_executable_replaces_shell(self):
386        self._assert_python([], executable=os.fsencode(sys.executable),
387                            shell=True)
388
389    @unittest.skipIf(mswindows, "executable argument replaces shell")
390    def test_pathlike_executable_replaces_shell(self):
391        self._assert_python([], executable=FakePath(sys.executable),
392                            shell=True)
393
394    # For use in the test_cwd* tests below.
395    def _normalize_cwd(self, cwd):
396        # Normalize an expected cwd (for Tru64 support).
397        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
398        # strings.  See bug #1063571.
399        with os_helper.change_cwd(cwd):
400            return os.getcwd()
401
402    # For use in the test_cwd* tests below.
403    def _split_python_path(self):
404        # Return normalized (python_dir, python_base).
405        python_path = os.path.realpath(sys.executable)
406        return os.path.split(python_path)
407
408    # For use in the test_cwd* tests below.
409    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
410        # Invoke Python via Popen, and assert that (1) the call succeeds,
411        # and that (2) the current working directory of the child process
412        # matches *expected_cwd*.
413        p = subprocess.Popen([python_arg, "-c",
414                              "import os, sys; "
415                              "buf = sys.stdout.buffer; "
416                              "buf.write(os.getcwd().encode()); "
417                              "buf.flush(); "
418                              "sys.exit(47)"],
419                              stdout=subprocess.PIPE,
420                              **kwargs)
421        self.addCleanup(p.stdout.close)
422        p.wait()
423        self.assertEqual(47, p.returncode)
424        normcase = os.path.normcase
425        self.assertEqual(normcase(expected_cwd),
426                         normcase(p.stdout.read().decode()))
427
428    def test_cwd(self):
429        # Check that cwd changes the cwd for the child process.
430        temp_dir = tempfile.gettempdir()
431        temp_dir = self._normalize_cwd(temp_dir)
432        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
433
434    def test_cwd_with_bytes(self):
435        temp_dir = tempfile.gettempdir()
436        temp_dir = self._normalize_cwd(temp_dir)
437        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))
438
439    def test_cwd_with_pathlike(self):
440        temp_dir = tempfile.gettempdir()
441        temp_dir = self._normalize_cwd(temp_dir)
442        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))
443
444    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
445    def test_cwd_with_relative_arg(self):
446        # Check that Popen looks for args[0] relative to cwd if args[0]
447        # is relative.
448        python_dir, python_base = self._split_python_path()
449        rel_python = os.path.join(os.curdir, python_base)
450        with os_helper.temp_cwd() as wrong_dir:
451            # Before calling with the correct cwd, confirm that the call fails
452            # without cwd and with the wrong cwd.
453            self.assertRaises(FileNotFoundError, subprocess.Popen,
454                              [rel_python])
455            self.assertRaises(FileNotFoundError, subprocess.Popen,
456                              [rel_python], cwd=wrong_dir)
457            python_dir = self._normalize_cwd(python_dir)
458            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
459
460    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
461    def test_cwd_with_relative_executable(self):
462        # Check that Popen looks for executable relative to cwd if executable
463        # is relative (and that executable takes precedence over args[0]).
464        python_dir, python_base = self._split_python_path()
465        rel_python = os.path.join(os.curdir, python_base)
466        doesntexist = "somethingyoudonthave"
467        with os_helper.temp_cwd() as wrong_dir:
468            # Before calling with the correct cwd, confirm that the call fails
469            # without cwd and with the wrong cwd.
470            self.assertRaises(FileNotFoundError, subprocess.Popen,
471                              [doesntexist], executable=rel_python)
472            self.assertRaises(FileNotFoundError, subprocess.Popen,
473                              [doesntexist], executable=rel_python,
474                              cwd=wrong_dir)
475            python_dir = self._normalize_cwd(python_dir)
476            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
477                             cwd=python_dir)
478
479    def test_cwd_with_absolute_arg(self):
480        # Check that Popen can find the executable when the cwd is wrong
481        # if args[0] is an absolute path.
482        python_dir, python_base = self._split_python_path()
483        abs_python = os.path.join(python_dir, python_base)
484        rel_python = os.path.join(os.curdir, python_base)
485        with os_helper.temp_dir() as wrong_dir:
486            # Before calling with an absolute path, confirm that using a
487            # relative path fails.
488            self.assertRaises(FileNotFoundError, subprocess.Popen,
489                              [rel_python], cwd=wrong_dir)
490            wrong_dir = self._normalize_cwd(wrong_dir)
491            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
492
493    @unittest.skipIf(sys.base_prefix != sys.prefix,
494                     'Test is not venv-compatible')
495    def test_executable_with_cwd(self):
496        python_dir, python_base = self._split_python_path()
497        python_dir = self._normalize_cwd(python_dir)
498        self._assert_cwd(python_dir, "somethingyoudonthave",
499                         executable=sys.executable, cwd=python_dir)
500
501    @unittest.skipIf(sys.base_prefix != sys.prefix,
502                     'Test is not venv-compatible')
503    @unittest.skipIf(sysconfig.is_python_build(),
504                     "need an installed Python. See #7774")
505    def test_executable_without_cwd(self):
506        # For a normal installation, it should work without 'cwd'
507        # argument.  For test runs in the build directory, see #7774.
508        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
509                         executable=sys.executable)
510
511    def test_stdin_pipe(self):
512        # stdin redirection
513        p = subprocess.Popen([sys.executable, "-c",
514                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
515                        stdin=subprocess.PIPE)
516        p.stdin.write(b"pear")
517        p.stdin.close()
518        p.wait()
519        self.assertEqual(p.returncode, 1)
520
521    def test_stdin_filedes(self):
522        # stdin is set to open file descriptor
523        tf = tempfile.TemporaryFile()
524        self.addCleanup(tf.close)
525        d = tf.fileno()
526        os.write(d, b"pear")
527        os.lseek(d, 0, 0)
528        p = subprocess.Popen([sys.executable, "-c",
529                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
530                         stdin=d)
531        p.wait()
532        self.assertEqual(p.returncode, 1)
533
534    def test_stdin_fileobj(self):
535        # stdin is set to open file object
536        tf = tempfile.TemporaryFile()
537        self.addCleanup(tf.close)
538        tf.write(b"pear")
539        tf.seek(0)
540        p = subprocess.Popen([sys.executable, "-c",
541                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
542                         stdin=tf)
543        p.wait()
544        self.assertEqual(p.returncode, 1)
545
546    def test_stdout_pipe(self):
547        # stdout redirection
548        p = subprocess.Popen([sys.executable, "-c",
549                          'import sys; sys.stdout.write("orange")'],
550                         stdout=subprocess.PIPE)
551        with p:
552            self.assertEqual(p.stdout.read(), b"orange")
553
554    def test_stdout_filedes(self):
555        # stdout is set to open file descriptor
556        tf = tempfile.TemporaryFile()
557        self.addCleanup(tf.close)
558        d = tf.fileno()
559        p = subprocess.Popen([sys.executable, "-c",
560                          'import sys; sys.stdout.write("orange")'],
561                         stdout=d)
562        p.wait()
563        os.lseek(d, 0, 0)
564        self.assertEqual(os.read(d, 1024), b"orange")
565
566    def test_stdout_fileobj(self):
567        # stdout is set to open file object
568        tf = tempfile.TemporaryFile()
569        self.addCleanup(tf.close)
570        p = subprocess.Popen([sys.executable, "-c",
571                          'import sys; sys.stdout.write("orange")'],
572                         stdout=tf)
573        p.wait()
574        tf.seek(0)
575        self.assertEqual(tf.read(), b"orange")
576
577    def test_stderr_pipe(self):
578        # stderr redirection
579        p = subprocess.Popen([sys.executable, "-c",
580                          'import sys; sys.stderr.write("strawberry")'],
581                         stderr=subprocess.PIPE)
582        with p:
583            self.assertEqual(p.stderr.read(), b"strawberry")
584
585    def test_stderr_filedes(self):
586        # stderr is set to open file descriptor
587        tf = tempfile.TemporaryFile()
588        self.addCleanup(tf.close)
589        d = tf.fileno()
590        p = subprocess.Popen([sys.executable, "-c",
591                          'import sys; sys.stderr.write("strawberry")'],
592                         stderr=d)
593        p.wait()
594        os.lseek(d, 0, 0)
595        self.assertEqual(os.read(d, 1024), b"strawberry")
596
597    def test_stderr_fileobj(self):
598        # stderr is set to open file object
599        tf = tempfile.TemporaryFile()
600        self.addCleanup(tf.close)
601        p = subprocess.Popen([sys.executable, "-c",
602                          'import sys; sys.stderr.write("strawberry")'],
603                         stderr=tf)
604        p.wait()
605        tf.seek(0)
606        self.assertEqual(tf.read(), b"strawberry")
607
608    def test_stderr_redirect_with_no_stdout_redirect(self):
609        # test stderr=STDOUT while stdout=None (not set)
610
611        # - grandchild prints to stderr
612        # - child redirects grandchild's stderr to its stdout
613        # - the parent should get grandchild's stderr in child's stdout
614        p = subprocess.Popen([sys.executable, "-c",
615                              'import sys, subprocess;'
616                              'rc = subprocess.call([sys.executable, "-c",'
617                              '    "import sys;"'
618                              '    "sys.stderr.write(\'42\')"],'
619                              '    stderr=subprocess.STDOUT);'
620                              'sys.exit(rc)'],
621                             stdout=subprocess.PIPE,
622                             stderr=subprocess.PIPE)
623        stdout, stderr = p.communicate()
624        #NOTE: stdout should get stderr from grandchild
625        self.assertEqual(stdout, b'42')
626        self.assertEqual(stderr, b'') # should be empty
627        self.assertEqual(p.returncode, 0)
628
629    def test_stdout_stderr_pipe(self):
630        # capture stdout and stderr to the same pipe
631        p = subprocess.Popen([sys.executable, "-c",
632                              'import sys;'
633                              'sys.stdout.write("apple");'
634                              'sys.stdout.flush();'
635                              'sys.stderr.write("orange")'],
636                             stdout=subprocess.PIPE,
637                             stderr=subprocess.STDOUT)
638        with p:
639            self.assertEqual(p.stdout.read(), b"appleorange")
640
641    def test_stdout_stderr_file(self):
642        # capture stdout and stderr to the same open file
643        tf = tempfile.TemporaryFile()
644        self.addCleanup(tf.close)
645        p = subprocess.Popen([sys.executable, "-c",
646                              'import sys;'
647                              'sys.stdout.write("apple");'
648                              'sys.stdout.flush();'
649                              'sys.stderr.write("orange")'],
650                             stdout=tf,
651                             stderr=tf)
652        p.wait()
653        tf.seek(0)
654        self.assertEqual(tf.read(), b"appleorange")
655
656    def test_stdout_filedes_of_stdout(self):
657        # stdout is set to 1 (#1531862).
658        # To avoid printing the text on stdout, we do something similar to
659        # test_stdout_none (see above).  The parent subprocess calls the child
660        # subprocess passing stdout=1, and this test uses stdout=PIPE in
661        # order to capture and check the output of the parent. See #11963.
662        code = ('import sys, subprocess; '
663                'rc = subprocess.call([sys.executable, "-c", '
664                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
665                     'b\'test with stdout=1\'))"], stdout=1); '
666                'assert rc == 18')
667        p = subprocess.Popen([sys.executable, "-c", code],
668                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
669        self.addCleanup(p.stdout.close)
670        self.addCleanup(p.stderr.close)
671        out, err = p.communicate()
672        self.assertEqual(p.returncode, 0, err)
673        self.assertEqual(out.rstrip(), b'test with stdout=1')
674
675    def test_stdout_devnull(self):
676        p = subprocess.Popen([sys.executable, "-c",
677                              'for i in range(10240):'
678                              'print("x" * 1024)'],
679                              stdout=subprocess.DEVNULL)
680        p.wait()
681        self.assertEqual(p.stdout, None)
682
683    def test_stderr_devnull(self):
684        p = subprocess.Popen([sys.executable, "-c",
685                              'import sys\n'
686                              'for i in range(10240):'
687                              'sys.stderr.write("x" * 1024)'],
688                              stderr=subprocess.DEVNULL)
689        p.wait()
690        self.assertEqual(p.stderr, None)
691
692    def test_stdin_devnull(self):
693        p = subprocess.Popen([sys.executable, "-c",
694                              'import sys;'
695                              'sys.stdin.read(1)'],
696                              stdin=subprocess.DEVNULL)
697        p.wait()
698        self.assertEqual(p.stdin, None)
699
700    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
701                         'fcntl.F_GETPIPE_SZ required for test.')
702    def test_pipesizes(self):
703        test_pipe_r, test_pipe_w = os.pipe()
704        try:
705            # Get the default pipesize with F_GETPIPE_SZ
706            pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
707        finally:
708            os.close(test_pipe_r)
709            os.close(test_pipe_w)
710        pipesize = pipesize_default // 2
711        if pipesize < 512:  # the POSIX minimum
712            raise unittest.SkitTest(
713                'default pipesize too small to perform test.')
714        p = subprocess.Popen(
715            [sys.executable, "-c",
716             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
717             'sys.stderr.write("error!")'],
718            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
719            stderr=subprocess.PIPE, pipesize=pipesize)
720        try:
721            for fifo in [p.stdin, p.stdout, p.stderr]:
722                self.assertEqual(
723                    fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
724                    pipesize)
725            # Windows pipe size can be acquired via GetNamedPipeInfoFunction
726            # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo
727            # However, this function is not yet in _winapi.
728            p.stdin.write(b"pear")
729            p.stdin.close()
730            p.stdout.close()
731            p.stderr.close()
732        finally:
733            p.kill()
734            p.wait()
735
736    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
737                         'fcntl.F_GETPIPE_SZ required for test.')
738    def test_pipesize_default(self):
739        p = subprocess.Popen(
740            [sys.executable, "-c",
741             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
742             'sys.stderr.write("error!")'],
743            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
744            stderr=subprocess.PIPE, pipesize=-1)
745        try:
746            fp_r, fp_w = os.pipe()
747            try:
748                default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ)
749                for fifo in [p.stdin, p.stdout, p.stderr]:
750                    self.assertEqual(
751                        fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
752                        default_pipesize)
753            finally:
754                os.close(fp_r)
755                os.close(fp_w)
756            # On other platforms we cannot test the pipe size (yet). But above
757            # code using pipesize=-1 should not crash.
758            p.stdin.close()
759            p.stdout.close()
760            p.stderr.close()
761        finally:
762            p.kill()
763            p.wait()
764
765    def test_env(self):
766        newenv = os.environ.copy()
767        newenv["FRUIT"] = "orange"
768        with subprocess.Popen([sys.executable, "-c",
769                               'import sys,os;'
770                               'sys.stdout.write(os.getenv("FRUIT"))'],
771                              stdout=subprocess.PIPE,
772                              env=newenv) as p:
773            stdout, stderr = p.communicate()
774            self.assertEqual(stdout, b"orange")
775
776    # Windows requires at least the SYSTEMROOT environment variable to start
777    # Python
778    @unittest.skipIf(sys.platform == 'win32',
779                     'cannot test an empty env on Windows')
780    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1,
781                     'The Python shared library cannot be loaded '
782                     'with an empty environment.')
783    def test_empty_env(self):
784        """Verify that env={} is as empty as possible."""
785
786        def is_env_var_to_ignore(n):
787            """Determine if an environment variable is under our control."""
788            # This excludes some __CF_* and VERSIONER_* keys MacOS insists
789            # on adding even when the environment in exec is empty.
790            # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
791            return ('VERSIONER' in n or '__CF' in n or  # MacOS
792                    n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo
793                    n == 'LC_CTYPE') # Locale coercion triggered
794
795        with subprocess.Popen([sys.executable, "-c",
796                               'import os; print(list(os.environ.keys()))'],
797                              stdout=subprocess.PIPE, env={}) as p:
798            stdout, stderr = p.communicate()
799            child_env_names = eval(stdout.strip())
800            self.assertIsInstance(child_env_names, list)
801            child_env_names = [k for k in child_env_names
802                               if not is_env_var_to_ignore(k)]
803            self.assertEqual(child_env_names, [])
804
805    def test_invalid_cmd(self):
806        # null character in the command name
807        cmd = sys.executable + '\0'
808        with self.assertRaises(ValueError):
809            subprocess.Popen([cmd, "-c", "pass"])
810
811        # null character in the command argument
812        with self.assertRaises(ValueError):
813            subprocess.Popen([sys.executable, "-c", "pass#\0"])
814
815    def test_invalid_env(self):
816        # null character in the environment variable name
817        newenv = os.environ.copy()
818        newenv["FRUIT\0VEGETABLE"] = "cabbage"
819        with self.assertRaises(ValueError):
820            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
821
822        # null character in the environment variable value
823        newenv = os.environ.copy()
824        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
825        with self.assertRaises(ValueError):
826            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
827
828        # equal character in the environment variable name
829        newenv = os.environ.copy()
830        newenv["FRUIT=ORANGE"] = "lemon"
831        with self.assertRaises(ValueError):
832            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
833
834        # equal character in the environment variable value
835        newenv = os.environ.copy()
836        newenv["FRUIT"] = "orange=lemon"
837        with subprocess.Popen([sys.executable, "-c",
838                               'import sys, os;'
839                               'sys.stdout.write(os.getenv("FRUIT"))'],
840                              stdout=subprocess.PIPE,
841                              env=newenv) as p:
842            stdout, stderr = p.communicate()
843            self.assertEqual(stdout, b"orange=lemon")
844
845    def test_communicate_stdin(self):
846        p = subprocess.Popen([sys.executable, "-c",
847                              'import sys;'
848                              'sys.exit(sys.stdin.read() == "pear")'],
849                             stdin=subprocess.PIPE)
850        p.communicate(b"pear")
851        self.assertEqual(p.returncode, 1)
852
853    def test_communicate_stdout(self):
854        p = subprocess.Popen([sys.executable, "-c",
855                              'import sys; sys.stdout.write("pineapple")'],
856                             stdout=subprocess.PIPE)
857        (stdout, stderr) = p.communicate()
858        self.assertEqual(stdout, b"pineapple")
859        self.assertEqual(stderr, None)
860
861    def test_communicate_stderr(self):
862        p = subprocess.Popen([sys.executable, "-c",
863                              'import sys; sys.stderr.write("pineapple")'],
864                             stderr=subprocess.PIPE)
865        (stdout, stderr) = p.communicate()
866        self.assertEqual(stdout, None)
867        self.assertEqual(stderr, b"pineapple")
868
869    def test_communicate(self):
870        p = subprocess.Popen([sys.executable, "-c",
871                              'import sys,os;'
872                              'sys.stderr.write("pineapple");'
873                              'sys.stdout.write(sys.stdin.read())'],
874                             stdin=subprocess.PIPE,
875                             stdout=subprocess.PIPE,
876                             stderr=subprocess.PIPE)
877        self.addCleanup(p.stdout.close)
878        self.addCleanup(p.stderr.close)
879        self.addCleanup(p.stdin.close)
880        (stdout, stderr) = p.communicate(b"banana")
881        self.assertEqual(stdout, b"banana")
882        self.assertEqual(stderr, b"pineapple")
883
884    def test_communicate_timeout(self):
885        p = subprocess.Popen([sys.executable, "-c",
886                              'import sys,os,time;'
887                              'sys.stderr.write("pineapple\\n");'
888                              'time.sleep(1);'
889                              'sys.stderr.write("pear\\n");'
890                              'sys.stdout.write(sys.stdin.read())'],
891                             universal_newlines=True,
892                             stdin=subprocess.PIPE,
893                             stdout=subprocess.PIPE,
894                             stderr=subprocess.PIPE)
895        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
896                          timeout=0.3)
897        # Make sure we can keep waiting for it, and that we get the whole output
898        # after it completes.
899        (stdout, stderr) = p.communicate()
900        self.assertEqual(stdout, "banana")
901        self.assertEqual(stderr.encode(), b"pineapple\npear\n")
902
903    def test_communicate_timeout_large_output(self):
904        # Test an expiring timeout while the child is outputting lots of data.
905        p = subprocess.Popen([sys.executable, "-c",
906                              'import sys,os,time;'
907                              'sys.stdout.write("a" * (64 * 1024));'
908                              'time.sleep(0.2);'
909                              'sys.stdout.write("a" * (64 * 1024));'
910                              'time.sleep(0.2);'
911                              'sys.stdout.write("a" * (64 * 1024));'
912                              'time.sleep(0.2);'
913                              'sys.stdout.write("a" * (64 * 1024));'],
914                             stdout=subprocess.PIPE)
915        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
916        (stdout, _) = p.communicate()
917        self.assertEqual(len(stdout), 4 * 64 * 1024)
918
919    # Test for the fd leak reported in http://bugs.python.org/issue2791.
920    def test_communicate_pipe_fd_leak(self):
921        for stdin_pipe in (False, True):
922            for stdout_pipe in (False, True):
923                for stderr_pipe in (False, True):
924                    options = {}
925                    if stdin_pipe:
926                        options['stdin'] = subprocess.PIPE
927                    if stdout_pipe:
928                        options['stdout'] = subprocess.PIPE
929                    if stderr_pipe:
930                        options['stderr'] = subprocess.PIPE
931                    if not options:
932                        continue
933                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
934                    p.communicate()
935                    if p.stdin is not None:
936                        self.assertTrue(p.stdin.closed)
937                    if p.stdout is not None:
938                        self.assertTrue(p.stdout.closed)
939                    if p.stderr is not None:
940                        self.assertTrue(p.stderr.closed)
941
942    def test_communicate_returns(self):
943        # communicate() should return None if no redirection is active
944        p = subprocess.Popen([sys.executable, "-c",
945                              "import sys; sys.exit(47)"])
946        (stdout, stderr) = p.communicate()
947        self.assertEqual(stdout, None)
948        self.assertEqual(stderr, None)
949
950    def test_communicate_pipe_buf(self):
951        # communicate() with writes larger than pipe_buf
952        # This test will probably deadlock rather than fail, if
953        # communicate() does not work properly.
954        x, y = os.pipe()
955        os.close(x)
956        os.close(y)
957        p = subprocess.Popen([sys.executable, "-c",
958                              'import sys,os;'
959                              'sys.stdout.write(sys.stdin.read(47));'
960                              'sys.stderr.write("x" * %d);'
961                              'sys.stdout.write(sys.stdin.read())' %
962                              support.PIPE_MAX_SIZE],
963                             stdin=subprocess.PIPE,
964                             stdout=subprocess.PIPE,
965                             stderr=subprocess.PIPE)
966        self.addCleanup(p.stdout.close)
967        self.addCleanup(p.stderr.close)
968        self.addCleanup(p.stdin.close)
969        string_to_write = b"a" * support.PIPE_MAX_SIZE
970        (stdout, stderr) = p.communicate(string_to_write)
971        self.assertEqual(stdout, string_to_write)
972
973    def test_writes_before_communicate(self):
974        # stdin.write before communicate()
975        p = subprocess.Popen([sys.executable, "-c",
976                              'import sys,os;'
977                              'sys.stdout.write(sys.stdin.read())'],
978                             stdin=subprocess.PIPE,
979                             stdout=subprocess.PIPE,
980                             stderr=subprocess.PIPE)
981        self.addCleanup(p.stdout.close)
982        self.addCleanup(p.stderr.close)
983        self.addCleanup(p.stdin.close)
984        p.stdin.write(b"banana")
985        (stdout, stderr) = p.communicate(b"split")
986        self.assertEqual(stdout, b"bananasplit")
987        self.assertEqual(stderr, b"")
988
989    def test_universal_newlines_and_text(self):
990        args = [
991            sys.executable, "-c",
992            'import sys,os;' + SETBINARY +
993            'buf = sys.stdout.buffer;'
994            'buf.write(sys.stdin.readline().encode());'
995            'buf.flush();'
996            'buf.write(b"line2\\n");'
997            'buf.flush();'
998            'buf.write(sys.stdin.read().encode());'
999            'buf.flush();'
1000            'buf.write(b"line4\\n");'
1001            'buf.flush();'
1002            'buf.write(b"line5\\r\\n");'
1003            'buf.flush();'
1004            'buf.write(b"line6\\r");'
1005            'buf.flush();'
1006            'buf.write(b"\\nline7");'
1007            'buf.flush();'
1008            'buf.write(b"\\nline8");']
1009
1010        for extra_kwarg in ('universal_newlines', 'text'):
1011            p = subprocess.Popen(args, **{'stdin': subprocess.PIPE,
1012                                          'stdout': subprocess.PIPE,
1013                                          extra_kwarg: True})
1014            with p:
1015                p.stdin.write("line1\n")
1016                p.stdin.flush()
1017                self.assertEqual(p.stdout.readline(), "line1\n")
1018                p.stdin.write("line3\n")
1019                p.stdin.close()
1020                self.addCleanup(p.stdout.close)
1021                self.assertEqual(p.stdout.readline(),
1022                                 "line2\n")
1023                self.assertEqual(p.stdout.read(6),
1024                                 "line3\n")
1025                self.assertEqual(p.stdout.read(),
1026                                 "line4\nline5\nline6\nline7\nline8")
1027
1028    def test_universal_newlines_communicate(self):
1029        # universal newlines through communicate()
1030        p = subprocess.Popen([sys.executable, "-c",
1031                              'import sys,os;' + SETBINARY +
1032                              'buf = sys.stdout.buffer;'
1033                              'buf.write(b"line2\\n");'
1034                              'buf.flush();'
1035                              'buf.write(b"line4\\n");'
1036                              'buf.flush();'
1037                              'buf.write(b"line5\\r\\n");'
1038                              'buf.flush();'
1039                              'buf.write(b"line6\\r");'
1040                              'buf.flush();'
1041                              'buf.write(b"\\nline7");'
1042                              'buf.flush();'
1043                              'buf.write(b"\\nline8");'],
1044                             stderr=subprocess.PIPE,
1045                             stdout=subprocess.PIPE,
1046                             universal_newlines=1)
1047        self.addCleanup(p.stdout.close)
1048        self.addCleanup(p.stderr.close)
1049        (stdout, stderr) = p.communicate()
1050        self.assertEqual(stdout,
1051                         "line2\nline4\nline5\nline6\nline7\nline8")
1052
1053    def test_universal_newlines_communicate_stdin(self):
1054        # universal newlines through communicate(), with only stdin
1055        p = subprocess.Popen([sys.executable, "-c",
1056                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1057                               s = sys.stdin.readline()
1058                               assert s == "line1\\n", repr(s)
1059                               s = sys.stdin.read()
1060                               assert s == "line3\\n", repr(s)
1061                              ''')],
1062                             stdin=subprocess.PIPE,
1063                             universal_newlines=1)
1064        (stdout, stderr) = p.communicate("line1\nline3\n")
1065        self.assertEqual(p.returncode, 0)
1066
1067    def test_universal_newlines_communicate_input_none(self):
1068        # Test communicate(input=None) with universal newlines.
1069        #
1070        # We set stdout to PIPE because, as of this writing, a different
1071        # code path is tested when the number of pipes is zero or one.
1072        p = subprocess.Popen(ZERO_RETURN_CMD,
1073                             stdin=subprocess.PIPE,
1074                             stdout=subprocess.PIPE,
1075                             universal_newlines=True)
1076        p.communicate()
1077        self.assertEqual(p.returncode, 0)
1078
1079    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
1080        # universal newlines through communicate(), with stdin, stdout, stderr
1081        p = subprocess.Popen([sys.executable, "-c",
1082                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1083                               s = sys.stdin.buffer.readline()
1084                               sys.stdout.buffer.write(s)
1085                               sys.stdout.buffer.write(b"line2\\r")
1086                               sys.stderr.buffer.write(b"eline2\\n")
1087                               s = sys.stdin.buffer.read()
1088                               sys.stdout.buffer.write(s)
1089                               sys.stdout.buffer.write(b"line4\\n")
1090                               sys.stdout.buffer.write(b"line5\\r\\n")
1091                               sys.stderr.buffer.write(b"eline6\\r")
1092                               sys.stderr.buffer.write(b"eline7\\r\\nz")
1093                              ''')],
1094                             stdin=subprocess.PIPE,
1095                             stderr=subprocess.PIPE,
1096                             stdout=subprocess.PIPE,
1097                             universal_newlines=True)
1098        self.addCleanup(p.stdout.close)
1099        self.addCleanup(p.stderr.close)
1100        (stdout, stderr) = p.communicate("line1\nline3\n")
1101        self.assertEqual(p.returncode, 0)
1102        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
1103        # Python debug build push something like "[42442 refs]\n"
1104        # to stderr at exit of subprocess.
1105        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
1106
1107    def test_universal_newlines_communicate_encodings(self):
1108        # Check that universal newlines mode works for various encodings,
1109        # in particular for encodings in the UTF-16 and UTF-32 families.
1110        # See issue #15595.
1111        #
1112        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
1113        # without, and UTF-16 and UTF-32.
1114        for encoding in ['utf-16', 'utf-32-be']:
1115            code = ("import sys; "
1116                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
1117                    encoding)
1118            args = [sys.executable, '-c', code]
1119            # We set stdin to be non-None because, as of this writing,
1120            # a different code path is used when the number of pipes is
1121            # zero or one.
1122            popen = subprocess.Popen(args,
1123                                     stdin=subprocess.PIPE,
1124                                     stdout=subprocess.PIPE,
1125                                     encoding=encoding)
1126            stdout, stderr = popen.communicate(input='')
1127            self.assertEqual(stdout, '1\n2\n3\n4')
1128
1129    def test_communicate_errors(self):
1130        for errors, expected in [
1131            ('ignore', ''),
1132            ('replace', '\ufffd\ufffd'),
1133            ('surrogateescape', '\udc80\udc80'),
1134            ('backslashreplace', '\\x80\\x80'),
1135        ]:
1136            code = ("import sys; "
1137                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
1138            args = [sys.executable, '-c', code]
1139            # We set stdin to be non-None because, as of this writing,
1140            # a different code path is used when the number of pipes is
1141            # zero or one.
1142            popen = subprocess.Popen(args,
1143                                     stdin=subprocess.PIPE,
1144                                     stdout=subprocess.PIPE,
1145                                     encoding='utf-8',
1146                                     errors=errors)
1147            stdout, stderr = popen.communicate(input='')
1148            self.assertEqual(stdout, '[{}]'.format(expected))
1149
1150    def test_no_leaking(self):
1151        # Make sure we leak no resources
1152        if not mswindows:
1153            max_handles = 1026 # too much for most UNIX systems
1154        else:
1155            max_handles = 2050 # too much for (at least some) Windows setups
1156        handles = []
1157        tmpdir = tempfile.mkdtemp()
1158        try:
1159            for i in range(max_handles):
1160                try:
1161                    tmpfile = os.path.join(tmpdir, os_helper.TESTFN)
1162                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
1163                except OSError as e:
1164                    if e.errno != errno.EMFILE:
1165                        raise
1166                    break
1167            else:
1168                self.skipTest("failed to reach the file descriptor limit "
1169                    "(tried %d)" % max_handles)
1170            # Close a couple of them (should be enough for a subprocess)
1171            for i in range(10):
1172                os.close(handles.pop())
1173            # Loop creating some subprocesses. If one of them leaks some fds,
1174            # the next loop iteration will fail by reaching the max fd limit.
1175            for i in range(15):
1176                p = subprocess.Popen([sys.executable, "-c",
1177                                      "import sys;"
1178                                      "sys.stdout.write(sys.stdin.read())"],
1179                                     stdin=subprocess.PIPE,
1180                                     stdout=subprocess.PIPE,
1181                                     stderr=subprocess.PIPE)
1182                data = p.communicate(b"lime")[0]
1183                self.assertEqual(data, b"lime")
1184        finally:
1185            for h in handles:
1186                os.close(h)
1187            shutil.rmtree(tmpdir)
1188
1189    def test_list2cmdline(self):
1190        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
1191                         '"a b c" d e')
1192        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
1193                         'ab\\"c \\ d')
1194        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
1195                         'ab\\"c " \\\\" d')
1196        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
1197                         'a\\\\\\b "de fg" h')
1198        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
1199                         'a\\\\\\"b c d')
1200        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1201                         '"a\\\\b c" d e')
1202        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1203                         '"a\\\\b\\ c" d e')
1204        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1205                         'ab ""')
1206
1207    def test_poll(self):
1208        p = subprocess.Popen([sys.executable, "-c",
1209                              "import os; os.read(0, 1)"],
1210                             stdin=subprocess.PIPE)
1211        self.addCleanup(p.stdin.close)
1212        self.assertIsNone(p.poll())
1213        os.write(p.stdin.fileno(), b'A')
1214        p.wait()
1215        # Subsequent invocations should just return the returncode
1216        self.assertEqual(p.poll(), 0)
1217
1218    def test_wait(self):
1219        p = subprocess.Popen(ZERO_RETURN_CMD)
1220        self.assertEqual(p.wait(), 0)
1221        # Subsequent invocations should just return the returncode
1222        self.assertEqual(p.wait(), 0)
1223
1224    def test_wait_timeout(self):
1225        p = subprocess.Popen([sys.executable,
1226                              "-c", "import time; time.sleep(0.3)"])
1227        with self.assertRaises(subprocess.TimeoutExpired) as c:
1228            p.wait(timeout=0.0001)
1229        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1230        self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
1231
1232    def test_invalid_bufsize(self):
1233        # an invalid type of the bufsize argument should raise
1234        # TypeError.
1235        with self.assertRaises(TypeError):
1236            subprocess.Popen(ZERO_RETURN_CMD, "orange")
1237
1238    def test_bufsize_is_none(self):
1239        # bufsize=None should be the same as bufsize=0.
1240        p = subprocess.Popen(ZERO_RETURN_CMD, None)
1241        self.assertEqual(p.wait(), 0)
1242        # Again with keyword arg
1243        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
1244        self.assertEqual(p.wait(), 0)
1245
    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
        """Write *line* to a bufsize=1 child and check what it echoes back.

        The child copies one line from its stdin to its stdout.  After the
        write, the stdin descriptor is closed directly with os.close(), i.e.
        without flushing the file object's buffer.

        line -- data written to the child's stdin.
        expected -- value the first line read from the child's stdout must
                    equal.
        universal_newlines -- forwarded unchanged to Popen.
        """
        # subprocess may deadlock with bufsize=1, see issue #21332
        with subprocess.Popen([sys.executable, "-c", "import sys;"
                               "sys.stdout.write(sys.stdin.readline());"
                               "sys.stdout.flush()"],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.DEVNULL,
                              bufsize=1,
                              universal_newlines=universal_newlines) as p:
            p.stdin.write(line) # expect that it flushes the line in text mode
            os.close(p.stdin.fileno()) # close it without flushing the buffer
            read_line = p.stdout.readline()
            # The descriptor was already closed above, so closing the file
            # object may fail with OSError; that failure is ignored here.
            with support.SuppressCrashReport():
                try:
                    p.stdin.close()
                except OSError:
                    pass
            # NOTE(review): presumably cleared so that leaving the "with"
            # block does not try to close stdin again -- confirm.
            p.stdin = None
        self.assertEqual(p.returncode, 0)
        self.assertEqual(read_line, expected)
1267
1268    def test_bufsize_equal_one_text_mode(self):
1269        # line is flushed in text mode with bufsize=1.
1270        # we should get the full line in return
1271        line = "line\n"
1272        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1273
1274    def test_bufsize_equal_one_binary_mode(self):
1275        # line is not flushed in binary mode with bufsize=1.
1276        # we should get empty response
1277        line = b'line' + os.linesep.encode() # assume ascii-based locale
1278        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
1279            self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1280
1281    def test_leaking_fds_on_error(self):
1282        # see bug #5179: Popen leaks file descriptors to PIPEs if
1283        # the child fails to execute; this will eventually exhaust
1284        # the maximum number of open fds. 1024 seems a very common
1285        # value for that limit, but Windows has 2048, so we loop
1286        # 1024 times (each call leaked two fds).
1287        for i in range(1024):
1288            with self.assertRaises(NONEXISTING_ERRORS):
1289                subprocess.Popen(NONEXISTING_CMD,
1290                                 stdout=subprocess.PIPE,
1291                                 stderr=subprocess.PIPE)
1292
1293    def test_nonexisting_with_pipes(self):
1294        # bpo-30121: Popen with pipes must close properly pipes on error.
1295        # Previously, os.close() was called with a Windows handle which is not
1296        # a valid file descriptor.
1297        #
1298        # Run the test in a subprocess to control how the CRT reports errors
1299        # and to get stderr content.
1300        try:
1301            import msvcrt
1302            msvcrt.CrtSetReportMode
1303        except (AttributeError, ImportError):
1304            self.skipTest("need msvcrt.CrtSetReportMode")
1305
1306        code = textwrap.dedent(f"""
1307            import msvcrt
1308            import subprocess
1309
1310            cmd = {NONEXISTING_CMD!r}
1311
1312            for report_type in [msvcrt.CRT_WARN,
1313                                msvcrt.CRT_ERROR,
1314                                msvcrt.CRT_ASSERT]:
1315                msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
1316                msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
1317
1318            try:
1319                subprocess.Popen(cmd,
1320                                 stdout=subprocess.PIPE,
1321                                 stderr=subprocess.PIPE)
1322            except OSError:
1323                pass
1324        """)
1325        cmd = [sys.executable, "-c", code]
1326        proc = subprocess.Popen(cmd,
1327                                stderr=subprocess.PIPE,
1328                                universal_newlines=True)
1329        with proc:
1330            stderr = proc.communicate()[1]
1331        self.assertEqual(stderr, "")
1332        self.assertEqual(proc.returncode, 0)
1333
    def test_double_close_on_error(self):
        """A failing Popen must not close a descriptor twice.

        While Popen fails on a nonexistent command, a second thread keeps
        creating pipes.  Afterwards every descriptor that thread obtained is
        closed; an OSError from one of those closes fails the test.
        """
        # Issue #18851
        fds = []
        def open_fds():
            # Create 20 pipes (40 descriptors), pausing briefly between them.
            for i in range(20):
                fds.extend(os.pipe())
                time.sleep(0.001)
        t = threading.Thread(target=open_fds)
        t.start()
        try:
            with self.assertRaises(EnvironmentError):
                subprocess.Popen(NONEXISTING_CMD,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        finally:
            t.join()
            # Close everything first and re-raise only afterwards, so one
            # failing close does not leave the remaining descriptors open.
            exc = None
            for fd in fds:
                # If a double close occurred, some of those fds will
                # already have been closed by mistake, and os.close()
                # here will raise.
                try:
                    os.close(fd)
                except OSError as e:
                    exc = e
            if exc is not None:
                raise exc
1362
    def test_threadsafe_wait(self):
        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
        # The child sleeps for 12 seconds, far longer than the 0.2 second
        # delay before the timer thread kills it.
        proc = subprocess.Popen([sys.executable, '-c',
                                 'import time; time.sleep(12)'])
        self.assertEqual(proc.returncode, None)
        # (label, value) records appended by the timer thread and compared
        # against the expected sequence at the end of the test.
        results = []

        def kill_proc_timer_thread():
            results.append(('thread-start-poll-result', proc.poll()))
            # terminate it from the thread and wait for the result.
            proc.kill()
            proc.wait()
            results.append(('thread-after-kill-and-wait', proc.returncode))
            # this wait should be a no-op given the above.
            proc.wait()
            results.append(('thread-after-second-wait', proc.returncode))

        # This is a timing sensitive test, the failure mode is
        # triggered when both the main thread and this thread are in
        # the wait() call at once.  The delay here is to allow the
        # main thread to most likely be blocked in its wait() call.
        t = threading.Timer(0.2, kill_proc_timer_thread)
        t.start()

        if mswindows:
            expected_errorcode = 1
        else:
            # Should be -9 because of the proc.kill() from the thread.
            expected_errorcode = -9

        # Wait for the process to finish; the thread should kill it
        # long before it finishes on its own.  Supplying a timeout
        # triggers a different code path for better coverage.
        proc.wait(timeout=support.SHORT_TIMEOUT)
        self.assertEqual(proc.returncode, expected_errorcode,
                         msg="unexpected result in wait from main thread")

        # This should be a no-op with no change in returncode.
        proc.wait()
        self.assertEqual(proc.returncode, expected_errorcode,
                         msg="unexpected result in second main wait.")

        # Make sure the timer thread has recorded all of its results.
        t.join()
        # Ensure that all of the thread results are as expected.
        # When a race condition occurs in wait(), the returncode could
        # be set by the wrong thread that doesn't actually have it
        # leading to an incorrect value.
        self.assertEqual([('thread-start-poll-result', None),
                          ('thread-after-kill-and-wait', expected_errorcode),
                          ('thread-after-second-wait', expected_errorcode)],
                         results)
1414
1415    def test_issue8780(self):
1416        # Ensure that stdout is inherited from the parent
1417        # if stdout=PIPE is not used
1418        code = ';'.join((
1419            'import subprocess, sys',
1420            'retcode = subprocess.call('
1421                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1422            'assert retcode == 0'))
1423        output = subprocess.check_output([sys.executable, '-c', code])
1424        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1425
1426    def test_handles_closed_on_exception(self):
1427        # If CreateProcess exits with an error, ensure the
1428        # duplicate output handles are released
1429        ifhandle, ifname = tempfile.mkstemp()
1430        ofhandle, ofname = tempfile.mkstemp()
1431        efhandle, efname = tempfile.mkstemp()
1432        try:
1433            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1434              stderr=efhandle)
1435        except OSError:
1436            os.close(ifhandle)
1437            os.remove(ifname)
1438            os.close(ofhandle)
1439            os.remove(ofname)
1440            os.close(efhandle)
1441            os.remove(efname)
1442        self.assertFalse(os.path.exists(ifname))
1443        self.assertFalse(os.path.exists(ofname))
1444        self.assertFalse(os.path.exists(efname))
1445
1446    def test_communicate_epipe(self):
1447        # Issue 10963: communicate() should hide EPIPE
1448        p = subprocess.Popen(ZERO_RETURN_CMD,
1449                             stdin=subprocess.PIPE,
1450                             stdout=subprocess.PIPE,
1451                             stderr=subprocess.PIPE)
1452        self.addCleanup(p.stdout.close)
1453        self.addCleanup(p.stderr.close)
1454        self.addCleanup(p.stdin.close)
1455        p.communicate(b"x" * 2**20)
1456
1457    def test_repr(self):
1458        path_cmd = pathlib.Path("my-tool.py")
1459        pathlib_cls = path_cmd.__class__.__name__
1460
1461        cases = [
1462            ("ls", True, 123, "<Popen: returncode: 123 args: 'ls'>"),
1463            ('a' * 100, True, 0,
1464             "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"),
1465            (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"),
1466            (["ls", '--my-opts', 'a' * 100], False, None,
1467             "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"),
1468            (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>")
1469        ]
1470        with unittest.mock.patch.object(subprocess.Popen, '_execute_child'):
1471            for cmd, shell, code, sx in cases:
1472                p = subprocess.Popen(cmd, shell=shell)
1473                p.returncode = code
1474                self.assertEqual(repr(p), sx)
1475
1476    def test_communicate_epipe_only_stdin(self):
1477        # Issue 10963: communicate() should hide EPIPE
1478        p = subprocess.Popen(ZERO_RETURN_CMD,
1479                             stdin=subprocess.PIPE)
1480        self.addCleanup(p.stdin.close)
1481        p.wait()
1482        p.communicate(b"x" * 2**20)
1483
1484    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1485                         "Requires signal.SIGUSR1")
1486    @unittest.skipUnless(hasattr(os, 'kill'),
1487                         "Requires os.kill")
1488    @unittest.skipUnless(hasattr(os, 'getppid'),
1489                         "Requires os.getppid")
1490    def test_communicate_eintr(self):
1491        # Issue #12493: communicate() should handle EINTR
1492        def handler(signum, frame):
1493            pass
1494        old_handler = signal.signal(signal.SIGUSR1, handler)
1495        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1496
1497        args = [sys.executable, "-c",
1498                'import os, signal;'
1499                'os.kill(os.getppid(), signal.SIGUSR1)']
1500        for stream in ('stdout', 'stderr'):
1501            kw = {stream: subprocess.PIPE}
1502            with subprocess.Popen(args, **kw) as process:
1503                # communicate() will be interrupted by SIGUSR1
1504                process.communicate()
1505
1506
1507    # This test is Linux-ish specific for simplicity to at least have
1508    # some coverage.  It is not a platform specific bug.
1509    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1510                         "Linux specific")
1511    def test_failed_child_execute_fd_leak(self):
1512        """Test for the fork() failure fd leak reported in issue16327."""
1513        fd_directory = '/proc/%d/fd' % os.getpid()
1514        fds_before_popen = os.listdir(fd_directory)
1515        with self.assertRaises(PopenTestException):
1516            PopenExecuteChildRaises(
1517                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
1518                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1519
1520        # NOTE: This test doesn't verify that the real _execute_child
1521        # does not close the file descriptors itself on the way out
1522        # during an exception.  Code inspection has confirmed that.
1523
1524        fds_after_exception = os.listdir(fd_directory)
1525        self.assertEqual(fds_before_popen, fds_after_exception)
1526
1527    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1528    def test_file_not_found_includes_filename(self):
1529        with self.assertRaises(FileNotFoundError) as c:
1530            subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
1531        self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
1532
1533    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1534    def test_file_not_found_with_bad_cwd(self):
1535        with self.assertRaises(FileNotFoundError) as c:
1536            subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
1537        self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
1538
1539    def test_class_getitems(self):
1540        self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias)
1541        self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias)
1542
class RunFuncTestCase(BaseTestCase):
    # Tests for subprocess.run() and the CompletedProcess object it returns.

    def run_python(self, code, **kwargs):
        """Run Python code in a subprocess using subprocess.run"""
        argv = [sys.executable, "-c", code]
        return subprocess.run(argv, **kwargs)

    def test_returncode(self):
        # run() records a non-zero exit status without raising;
        # check_returncode() turns it into CalledProcessError.
        cp = self.run_python("import sys; sys.exit(47)")
        self.assertEqual(cp.returncode, 47)
        with self.assertRaises(subprocess.CalledProcessError):
            cp.check_returncode()

    def test_check(self):
        # check=True raises CalledProcessError carrying the exit status.
        with self.assertRaises(subprocess.CalledProcessError) as c:
            self.run_python("import sys; sys.exit(47)", check=True)
        self.assertEqual(c.exception.returncode, 47)

    def test_check_zero(self):
        # check=True shouldn't raise when returncode is zero
        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
        self.assertEqual(cp.returncode, 0)

    def test_timeout(self):
        # run() function with timeout argument; we want to test that the child
        # process gets killed when the timeout expires.  If the child isn't
        # killed, this call will deadlock since subprocess.run waits for the
        # child.
        with self.assertRaises(subprocess.TimeoutExpired):
            self.run_python("while True: pass", timeout=0.0001)

    def test_capture_stdout(self):
        # capture stdout with zero return code
        cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
        self.assertIn(b'BDFL', cp.stdout)

    def test_capture_stderr(self):
        # capture stderr through a pipe
        cp = self.run_python("import sys; sys.stderr.write('BDFL')",
                             stderr=subprocess.PIPE)
        self.assertIn(b'BDFL', cp.stderr)

    def test_check_output_stdin_arg(self):
        # run() can be called with stdin set to a file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        cp = self.run_python(
                "import sys; sys.stdout.write(sys.stdin.read().upper())",
                stdin=tf, stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', cp.stdout)

    def test_check_output_input_arg(self):
        # run() can be called with input set to a bytes object
        cp = self.run_python(
                "import sys; sys.stdout.write(sys.stdin.read().upper())",
                input=b'pear', stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', cp.stdout)

    def test_check_output_stdin_with_input_arg(self):
        # run() refuses to accept 'stdin' with 'input'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        with self.assertRaises(ValueError,
              msg="Expected ValueError when stdin and input args supplied.") as c:
            self.run_python("print('will not be run')",
                            stdin=tf, input=b'hare')
        self.assertIn('stdin', c.exception.args[0])
        self.assertIn('input', c.exception.args[0])

    def test_check_output_timeout(self):
        # TimeoutExpired must carry the output captured before the timeout.
        with self.assertRaises(subprocess.TimeoutExpired) as c:
            self.run_python((
                     "import sys, time\n"
                     "sys.stdout.write('BDFL')\n"
                     "sys.stdout.flush()\n"
                     "time.sleep(3600)"),
                    # Some heavily loaded buildbots (sparc Debian 3.x) require
                    # this much time to start and print.
                    timeout=3, stdout=subprocess.PIPE)
        self.assertEqual(c.exception.output, b'BDFL')
        # output is aliased to stdout
        self.assertEqual(c.exception.stdout, b'BDFL')

    def test_run_kwargs(self):
        # Extra keyword arguments (env here) are passed through to the child.
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        cp = self.run_python(('import sys, os;'
                      'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
                             env=newenv)
        self.assertEqual(cp.returncode, 33)

    def test_run_with_pathlike_path(self):
        # bpo-31961: test run(pathlike_object)
        # the name of a command that can be run without
        # any arguments that exit fast
        prog = 'tree.com' if mswindows else 'ls'
        path = shutil.which(prog)
        if path is None:
            self.skipTest(f'{prog} required for this test')
        path = FakePath(path)
        res = subprocess.run(path, stdout=subprocess.DEVNULL)
        self.assertEqual(res.returncode, 0)
        # A path-like object is rejected when combined with shell=True.
        with self.assertRaises(TypeError):
            subprocess.run(path, stdout=subprocess.DEVNULL, shell=True)

    def test_run_with_bytes_path_and_arguments(self):
        # bpo-31961: test run([bytes_object, b'additional arguments'])
        path = os.fsencode(sys.executable)
        args = [path, '-c', b'import sys; sys.exit(57)']
        res = subprocess.run(args)
        self.assertEqual(res.returncode, 57)

    def test_run_with_pathlike_path_and_arguments(self):
        # bpo-31961: test run([pathlike_object, 'additional arguments'])
        path = FakePath(sys.executable)
        args = [path, '-c', 'import sys; sys.exit(57)']
        res = subprocess.run(args)
        self.assertEqual(res.returncode, 57)

    def test_capture_output(self):
        # capture_output=True collects both stdout and stderr.
        cp = self.run_python(("import sys;"
                              "sys.stdout.write('BDFL'); "
                              "sys.stderr.write('FLUFL')"),
                             capture_output=True)
        self.assertIn(b'BDFL', cp.stdout)
        self.assertIn(b'FLUFL', cp.stderr)

    def test_stdout_with_capture_output_arg(self):
        # run() refuses to accept 'stdout' with 'capture_output'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        with self.assertRaises(ValueError,
            msg=("Expected ValueError when stdout and capture_output "
                 "args supplied.")) as c:
            self.run_python("print('will not be run')",
                            capture_output=True, stdout=tf)
        self.assertIn('stdout', c.exception.args[0])
        self.assertIn('capture_output', c.exception.args[0])

    def test_stderr_with_capture_output_arg(self):
        # run() refuses to accept 'stderr' with 'capture_output'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        with self.assertRaises(ValueError,
            msg=("Expected ValueError when stderr and capture_output "
                 "args supplied.")) as c:
            self.run_python("print('will not be run')",
                            capture_output=True, stderr=tf)
        self.assertIn('stderr', c.exception.args[0])
        self.assertIn('capture_output', c.exception.args[0])

    # This test _might_ wind up a bit fragile on loaded build+test machines
    # as it depends on the timing with wide enough margins for normal situations
    # but does assert that it happened "soon enough" to believe the right thing
    # happened.
    @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command")
    def test_run_with_shell_timeout_and_capture_output(self):
        """Output capturing after a timeout mustn't hang forever on open filehandles."""
        before_secs = time.monotonic()
        try:
            subprocess.run('sleep 3', shell=True, timeout=0.1,
                           capture_output=True)  # New session unspecified.
        except subprocess.TimeoutExpired:
            after_secs = time.monotonic()
            stacks = traceback.format_exc()  # assertRaises doesn't give this.
        else:
            self.fail("TimeoutExpired not raised.")
        self.assertLess(after_secs - before_secs, 1.5,
                        msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
                        f"{stacks}```")
1716
1717
1718def _get_test_grp_name():
1719    for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):
1720        if grp:
1721            try:
1722                grp.getgrnam(name_group)
1723            except KeyError:
1724                continue
1725            return name_group
1726    else:
1727        raise unittest.SkipTest('No identified group name to use for this test on this platform.')
1728
1729
1730@unittest.skipIf(mswindows, "POSIX specific tests")
1731class POSIXProcessTestCase(BaseTestCase):
1732
1733    def setUp(self):
1734        super().setUp()
1735        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1736
1737    def _get_chdir_exception(self):
1738        try:
1739            os.chdir(self._nonexistent_dir)
1740        except OSError as e:
1741            # This avoids hard coding the errno value or the OS perror()
1742            # string and instead capture the exception that we want to see
1743            # below for comparison.
1744            desired_exception = e
1745        else:
1746            self.fail("chdir to nonexistent directory %s succeeded." %
1747                      self._nonexistent_dir)
1748        return desired_exception
1749
1750    def test_exception_cwd(self):
1751        """Test error in the child raised in the parent for a bad cwd."""
1752        desired_exception = self._get_chdir_exception()
1753        try:
1754            p = subprocess.Popen([sys.executable, "-c", ""],
1755                                 cwd=self._nonexistent_dir)
1756        except OSError as e:
1757            # Test that the child process chdir failure actually makes
1758            # it up to the parent process as the correct exception.
1759            self.assertEqual(desired_exception.errno, e.errno)
1760            self.assertEqual(desired_exception.strerror, e.strerror)
1761            self.assertEqual(desired_exception.filename, e.filename)
1762        else:
1763            self.fail("Expected OSError: %s" % desired_exception)
1764
1765    def test_exception_bad_executable(self):
1766        """Test error in the child raised in the parent for a bad executable."""
1767        desired_exception = self._get_chdir_exception()
1768        try:
1769            p = subprocess.Popen([sys.executable, "-c", ""],
1770                                 executable=self._nonexistent_dir)
1771        except OSError as e:
1772            # Test that the child process exec failure actually makes
1773            # it up to the parent process as the correct exception.
1774            self.assertEqual(desired_exception.errno, e.errno)
1775            self.assertEqual(desired_exception.strerror, e.strerror)
1776            self.assertEqual(desired_exception.filename, e.filename)
1777        else:
1778            self.fail("Expected OSError: %s" % desired_exception)
1779
1780    def test_exception_bad_args_0(self):
1781        """Test error in the child raised in the parent for a bad args[0]."""
1782        desired_exception = self._get_chdir_exception()
1783        try:
1784            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1785        except OSError as e:
1786            # Test that the child process exec failure actually makes
1787            # it up to the parent process as the correct exception.
1788            self.assertEqual(desired_exception.errno, e.errno)
1789            self.assertEqual(desired_exception.strerror, e.strerror)
1790            self.assertEqual(desired_exception.filename, e.filename)
1791        else:
1792            self.fail("Expected OSError: %s" % desired_exception)
1793
1794    # We mock the __del__ method for Popen in the next two tests
1795    # because it does cleanup based on the pid returned by fork_exec
1796    # along with issuing a resource warning if it still exists. Since
1797    # we don't actually spawn a process in these tests we can forego
1798    # the destructor. An alternative would be to set _child_created to
1799    # False before the destructor is called but there is no easy way
1800    # to do that
1801    class PopenNoDestructor(subprocess.Popen):
1802        def __del__(self):
1803            pass
1804
1805    @mock.patch("subprocess._posixsubprocess.fork_exec")
1806    def test_exception_errpipe_normal(self, fork_exec):
1807        """Test error passing done through errpipe_write in the good case"""
1808        def proper_error(*args):
1809            errpipe_write = args[13]
1810            # Write the hex for the error code EISDIR: 'is a directory'
1811            err_code = '{:x}'.format(errno.EISDIR).encode()
1812            os.write(errpipe_write, b"OSError:" + err_code + b":")
1813            return 0
1814
1815        fork_exec.side_effect = proper_error
1816
1817        with mock.patch("subprocess.os.waitpid",
1818                        side_effect=ChildProcessError):
1819            with self.assertRaises(IsADirectoryError):
1820                self.PopenNoDestructor(["non_existent_command"])
1821
1822    @mock.patch("subprocess._posixsubprocess.fork_exec")
1823    def test_exception_errpipe_bad_data(self, fork_exec):
1824        """Test error passing done through errpipe_write where its not
1825        in the expected format"""
1826        error_data = b"\xFF\x00\xDE\xAD"
1827        def bad_error(*args):
1828            errpipe_write = args[13]
1829            # Anything can be in the pipe, no assumptions should
1830            # be made about its encoding, so we'll write some
1831            # arbitrary hex bytes to test it out
1832            os.write(errpipe_write, error_data)
1833            return 0
1834
1835        fork_exec.side_effect = bad_error
1836
1837        with mock.patch("subprocess.os.waitpid",
1838                        side_effect=ChildProcessError):
1839            with self.assertRaises(subprocess.SubprocessError) as e:
1840                self.PopenNoDestructor(["non_existent_command"])
1841
1842        self.assertIn(repr(error_data), str(e.exception))
1843
1844    @unittest.skipIf(not os.path.exists('/proc/self/status'),
1845                     "need /proc/self/status")
1846    def test_restore_signals(self):
1847        # Blindly assume that cat exists on systems with /proc/self/status...
1848        default_proc_status = subprocess.check_output(
1849                ['cat', '/proc/self/status'],
1850                restore_signals=False)
1851        for line in default_proc_status.splitlines():
1852            if line.startswith(b'SigIgn'):
1853                default_sig_ign_mask = line
1854                break
1855        else:
1856            self.skipTest("SigIgn not found in /proc/self/status.")
1857        restored_proc_status = subprocess.check_output(
1858                ['cat', '/proc/self/status'],
1859                restore_signals=True)
1860        for line in restored_proc_status.splitlines():
1861            if line.startswith(b'SigIgn'):
1862                restored_sig_ign_mask = line
1863                break
1864        self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
1865                            msg="restore_signals=True should've unblocked "
1866                            "SIGPIPE and friends.")
1867
1868    def test_start_new_session(self):
1869        # For code coverage of calling setsid().  We don't care if we get an
1870        # EPERM error from it depending on the test execution environment, that
1871        # still indicates that it was called.
1872        try:
1873            output = subprocess.check_output(
1874                    [sys.executable, "-c", "import os; print(os.getsid(0))"],
1875                    start_new_session=True)
1876        except OSError as e:
1877            if e.errno != errno.EPERM:
1878                raise
1879        else:
1880            parent_sid = os.getsid(0)
1881            child_sid = int(output)
1882            self.assertNotEqual(parent_sid, child_sid)
1883
1884    @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform')
1885    def test_user(self):
1886        # For code coverage of the user parameter.  We don't care if we get an
1887        # EPERM error from it depending on the test execution environment, that
1888        # still indicates that it was called.
1889
1890        uid = os.geteuid()
1891        test_users = [65534 if uid != 65534 else 65533, uid]
1892        name_uid = "nobody" if sys.platform != 'darwin' else "unknown"
1893
1894        if pwd is not None:
1895            try:
1896                pwd.getpwnam(name_uid)
1897                test_users.append(name_uid)
1898            except KeyError:
1899                # unknown user name
1900                name_uid = None
1901
1902        for user in test_users:
1903            # posix_spawn() may be used with close_fds=False
1904            for close_fds in (False, True):
1905                with self.subTest(user=user, close_fds=close_fds):
1906                    try:
1907                        output = subprocess.check_output(
1908                                [sys.executable, "-c",
1909                                 "import os; print(os.getuid())"],
1910                                user=user,
1911                                close_fds=close_fds)
1912                    except PermissionError:  # (EACCES, EPERM)
1913                        pass
1914                    except OSError as e:
1915                        if e.errno not in (errno.EACCES, errno.EPERM):
1916                            raise
1917                    else:
1918                        if isinstance(user, str):
1919                            user_uid = pwd.getpwnam(user).pw_uid
1920                        else:
1921                            user_uid = user
1922                        child_user = int(output)
1923                        self.assertEqual(child_user, user_uid)
1924
1925        with self.assertRaises(ValueError):
1926            subprocess.check_call(ZERO_RETURN_CMD, user=-1)
1927
1928        with self.assertRaises(OverflowError):
1929            subprocess.check_call(ZERO_RETURN_CMD,
1930                                  cwd=os.curdir, env=os.environ, user=2**64)
1931
1932        if pwd is None and name_uid is not None:
1933            with self.assertRaises(ValueError):
1934                subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)
1935
1936    @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
1937    def test_user_error(self):
1938        with self.assertRaises(ValueError):
1939            subprocess.check_call(ZERO_RETURN_CMD, user=65535)
1940
1941    @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
1942    def test_group(self):
1943        gid = os.getegid()
1944        group_list = [65534 if gid != 65534 else 65533]
1945        name_group = _get_test_grp_name()
1946
1947        if grp is not None:
1948            group_list.append(name_group)
1949
1950        for group in group_list + [gid]:
1951            # posix_spawn() may be used with close_fds=False
1952            for close_fds in (False, True):
1953                with self.subTest(group=group, close_fds=close_fds):
1954                    try:
1955                        output = subprocess.check_output(
1956                                [sys.executable, "-c",
1957                                 "import os; print(os.getgid())"],
1958                                group=group,
1959                                close_fds=close_fds)
1960                    except PermissionError:  # (EACCES, EPERM)
1961                        pass
1962                    else:
1963                        if isinstance(group, str):
1964                            group_gid = grp.getgrnam(group).gr_gid
1965                        else:
1966                            group_gid = group
1967
1968                        child_group = int(output)
1969                        self.assertEqual(child_group, group_gid)
1970
1971        # make sure we bomb on negative values
1972        with self.assertRaises(ValueError):
1973            subprocess.check_call(ZERO_RETURN_CMD, group=-1)
1974
1975        with self.assertRaises(OverflowError):
1976            subprocess.check_call(ZERO_RETURN_CMD,
1977                                  cwd=os.curdir, env=os.environ, group=2**64)
1978
1979        if grp is None:
1980            with self.assertRaises(ValueError):
1981                subprocess.check_call(ZERO_RETURN_CMD, group=name_group)
1982
1983    @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
1984    def test_group_error(self):
1985        with self.assertRaises(ValueError):
1986            subprocess.check_call(ZERO_RETURN_CMD, group=65535)
1987
1988    @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
1989    def test_extra_groups(self):
1990        gid = os.getegid()
1991        group_list = [65534 if gid != 65534 else 65533]
1992        name_group = _get_test_grp_name()
1993        perm_error = False
1994
1995        if grp is not None:
1996            group_list.append(name_group)
1997
1998        try:
1999            output = subprocess.check_output(
2000                    [sys.executable, "-c",
2001                     "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"],
2002                    extra_groups=group_list)
2003        except OSError as ex:
2004            if ex.errno != errno.EPERM:
2005                raise
2006            perm_error = True
2007
2008        else:
2009            parent_groups = os.getgroups()
2010            child_groups = json.loads(output)
2011
2012            if grp is not None:
2013                desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g
2014                                for g in group_list]
2015            else:
2016                desired_gids = group_list
2017
2018            if perm_error:
2019                self.assertEqual(set(child_groups), set(parent_groups))
2020            else:
2021                self.assertEqual(set(desired_gids), set(child_groups))
2022
2023        # make sure we bomb on negative values
2024        with self.assertRaises(ValueError):
2025            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])
2026
2027        with self.assertRaises(ValueError):
2028            subprocess.check_call(ZERO_RETURN_CMD,
2029                                  cwd=os.curdir, env=os.environ,
2030                                  extra_groups=[2**64])
2031
2032        if grp is None:
2033            with self.assertRaises(ValueError):
2034                subprocess.check_call(ZERO_RETURN_CMD,
2035                                      extra_groups=[name_group])
2036
2037    @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
2038    def test_extra_groups_error(self):
2039        with self.assertRaises(ValueError):
2040            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
2041
2042    @unittest.skipIf(mswindows or not hasattr(os, 'umask'),
2043                     'POSIX umask() is not available.')
2044    def test_umask(self):
2045        tmpdir = None
2046        try:
2047            tmpdir = tempfile.mkdtemp()
2048            name = os.path.join(tmpdir, "beans")
2049            # We set an unusual umask in the child so as a unique mode
2050            # for us to test the child's touched file for.
2051            subprocess.check_call(
2052                    [sys.executable, "-c", f"open({name!r}, 'w').close()"],
2053                    umask=0o053)
2054            # Ignore execute permissions entirely in our test,
2055            # filesystems could be mounted to ignore or force that.
2056            st_mode = os.stat(name).st_mode & 0o666
2057            expected_mode = 0o624
2058            self.assertEqual(expected_mode, st_mode,
2059                             msg=f'{oct(expected_mode)} != {oct(st_mode)}')
2060        finally:
2061            if tmpdir is not None:
2062                shutil.rmtree(tmpdir)
2063
2064    def test_run_abort(self):
2065        # returncode handles signal termination
2066        with support.SuppressCrashReport():
2067            p = subprocess.Popen([sys.executable, "-c",
2068                                  'import os; os.abort()'])
2069            p.wait()
2070        self.assertEqual(-p.returncode, signal.SIGABRT)
2071
2072    def test_CalledProcessError_str_signal(self):
2073        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
2074        error_string = str(err)
2075        # We're relying on the repr() of the signal.Signals intenum to provide
2076        # the word signal, the signal name and the numeric value.
2077        self.assertIn("signal", error_string.lower())
2078        # We're not being specific about the signal name as some signals have
2079        # multiple names and which name is revealed can vary.
2080        self.assertIn("SIG", error_string)
2081        self.assertIn(str(signal.SIGABRT), error_string)
2082
2083    def test_CalledProcessError_str_unknown_signal(self):
2084        err = subprocess.CalledProcessError(-9876543, "fake cmd")
2085        error_string = str(err)
2086        self.assertIn("unknown signal 9876543.", error_string)
2087
2088    def test_CalledProcessError_str_non_zero(self):
2089        err = subprocess.CalledProcessError(2, "fake cmd")
2090        error_string = str(err)
2091        self.assertIn("non-zero exit status 2.", error_string)
2092
2093    def test_preexec(self):
2094        # DISCLAIMER: Setting environment variables is *not* a good use
2095        # of a preexec_fn.  This is merely a test.
2096        p = subprocess.Popen([sys.executable, "-c",
2097                              'import sys,os;'
2098                              'sys.stdout.write(os.getenv("FRUIT"))'],
2099                             stdout=subprocess.PIPE,
2100                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
2101        with p:
2102            self.assertEqual(p.stdout.read(), b"apple")
2103
2104    def test_preexec_exception(self):
2105        def raise_it():
2106            raise ValueError("What if two swallows carried a coconut?")
2107        try:
2108            p = subprocess.Popen([sys.executable, "-c", ""],
2109                                 preexec_fn=raise_it)
2110        except subprocess.SubprocessError as e:
2111            self.assertTrue(
2112                    subprocess._posixsubprocess,
2113                    "Expected a ValueError from the preexec_fn")
2114        except ValueError as e:
2115            self.assertIn("coconut", e.args[0])
2116        else:
2117            self.fail("Exception raised by preexec_fn did not make it "
2118                      "to the parent process.")
2119
2120    class _TestExecuteChildPopen(subprocess.Popen):
2121        """Used to test behavior at the end of _execute_child."""
2122        def __init__(self, testcase, *args, **kwargs):
2123            self._testcase = testcase
2124            subprocess.Popen.__init__(self, *args, **kwargs)
2125
2126        def _execute_child(self, *args, **kwargs):
2127            try:
2128                subprocess.Popen._execute_child(self, *args, **kwargs)
2129            finally:
2130                # Open a bunch of file descriptors and verify that
2131                # none of them are the same as the ones the Popen
2132                # instance is using for stdin/stdout/stderr.
2133                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
2134                               for _ in range(8)]
2135                try:
2136                    for fd in devzero_fds:
2137                        self._testcase.assertNotIn(
2138                                fd, (self.stdin.fileno(), self.stdout.fileno(),
2139                                     self.stderr.fileno()),
2140                                msg="At least one fd was closed early.")
2141                finally:
2142                    for fd in devzero_fds:
2143                        os.close(fd)
2144
2145    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
2146    def test_preexec_errpipe_does_not_double_close_pipes(self):
2147        """Issue16140: Don't double close pipes on preexec error."""
2148
2149        def raise_it():
2150            raise subprocess.SubprocessError(
2151                    "force the _execute_child() errpipe_data path.")
2152
2153        with self.assertRaises(subprocess.SubprocessError):
2154            self._TestExecuteChildPopen(
2155                        self, ZERO_RETURN_CMD,
2156                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2157                        stderr=subprocess.PIPE, preexec_fn=raise_it)
2158
2159    def test_preexec_gc_module_failure(self):
2160        # This tests the code that disables garbage collection if the child
2161        # process will execute any Python.
2162        enabled = gc.isenabled()
2163        try:
2164            gc.disable()
2165            self.assertFalse(gc.isenabled())
2166            subprocess.call([sys.executable, '-c', ''],
2167                            preexec_fn=lambda: None)
2168            self.assertFalse(gc.isenabled(),
2169                             "Popen enabled gc when it shouldn't.")
2170
2171            gc.enable()
2172            self.assertTrue(gc.isenabled())
2173            subprocess.call([sys.executable, '-c', ''],
2174                            preexec_fn=lambda: None)
2175            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
2176        finally:
2177            if not enabled:
2178                gc.disable()
2179
2180    @unittest.skipIf(
2181        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
2182    def test_preexec_fork_failure(self):
2183        # The internal code did not preserve the previous exception when
2184        # re-enabling garbage collection
2185        try:
2186            from resource import getrlimit, setrlimit, RLIMIT_NPROC
2187        except ImportError as err:
2188            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
2189        limits = getrlimit(RLIMIT_NPROC)
2190        [_, hard] = limits
2191        setrlimit(RLIMIT_NPROC, (0, hard))
2192        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
2193        try:
2194            subprocess.call([sys.executable, '-c', ''],
2195                            preexec_fn=lambda: None)
2196        except BlockingIOError:
2197            # Forking should raise EAGAIN, translated to BlockingIOError
2198            pass
2199        else:
2200            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
2201
2202    def test_args_string(self):
2203        # args is a string
2204        fd, fname = tempfile.mkstemp()
2205        # reopen in text mode
2206        with open(fd, "w", errors="surrogateescape") as fobj:
2207            fobj.write("#!%s\n" % support.unix_shell)
2208            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2209                       sys.executable)
2210        os.chmod(fname, 0o700)
2211        p = subprocess.Popen(fname)
2212        p.wait()
2213        os.remove(fname)
2214        self.assertEqual(p.returncode, 47)
2215
2216    def test_invalid_args(self):
2217        # invalid arguments should raise ValueError
2218        self.assertRaises(ValueError, subprocess.call,
2219                          [sys.executable, "-c",
2220                           "import sys; sys.exit(47)"],
2221                          startupinfo=47)
2222        self.assertRaises(ValueError, subprocess.call,
2223                          [sys.executable, "-c",
2224                           "import sys; sys.exit(47)"],
2225                          creationflags=47)
2226
2227    def test_shell_sequence(self):
2228        # Run command through the shell (sequence)
2229        newenv = os.environ.copy()
2230        newenv["FRUIT"] = "apple"
2231        p = subprocess.Popen(["echo $FRUIT"], shell=1,
2232                             stdout=subprocess.PIPE,
2233                             env=newenv)
2234        with p:
2235            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2236
2237    def test_shell_string(self):
2238        # Run command through the shell (string)
2239        newenv = os.environ.copy()
2240        newenv["FRUIT"] = "apple"
2241        p = subprocess.Popen("echo $FRUIT", shell=1,
2242                             stdout=subprocess.PIPE,
2243                             env=newenv)
2244        with p:
2245            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2246
2247    def test_call_string(self):
2248        # call() function with string argument on UNIX
2249        fd, fname = tempfile.mkstemp()
2250        # reopen in text mode
2251        with open(fd, "w", errors="surrogateescape") as fobj:
2252            fobj.write("#!%s\n" % support.unix_shell)
2253            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2254                       sys.executable)
2255        os.chmod(fname, 0o700)
2256        rc = subprocess.call(fname)
2257        os.remove(fname)
2258        self.assertEqual(rc, 47)
2259
2260    def test_specific_shell(self):
2261        # Issue #9265: Incorrect name passed as arg[0].
2262        shells = []
2263        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
2264            for name in ['bash', 'ksh']:
2265                sh = os.path.join(prefix, name)
2266                if os.path.isfile(sh):
2267                    shells.append(sh)
2268        if not shells: # Will probably work for any shell but csh.
2269            self.skipTest("bash or ksh required for this test")
2270        sh = '/bin/sh'
2271        if os.path.isfile(sh) and not os.path.islink(sh):
2272            # Test will fail if /bin/sh is a symlink to csh.
2273            shells.append(sh)
2274        for sh in shells:
2275            p = subprocess.Popen("echo $0", executable=sh, shell=True,
2276                                 stdout=subprocess.PIPE)
2277            with p:
2278                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
2279
2280    def _kill_process(self, method, *args):
2281        # Do not inherit file handles from the parent.
2282        # It should fix failures on some platforms.
2283        # Also set the SIGINT handler to the default to make sure it's not
2284        # being ignored (some tests rely on that.)
2285        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
2286        try:
2287            p = subprocess.Popen([sys.executable, "-c", """if 1:
2288                                 import sys, time
2289                                 sys.stdout.write('x\\n')
2290                                 sys.stdout.flush()
2291                                 time.sleep(30)
2292                                 """],
2293                                 close_fds=True,
2294                                 stdin=subprocess.PIPE,
2295                                 stdout=subprocess.PIPE,
2296                                 stderr=subprocess.PIPE)
2297        finally:
2298            signal.signal(signal.SIGINT, old_handler)
2299        # Wait for the interpreter to be completely initialized before
2300        # sending any signal.
2301        p.stdout.read(1)
2302        getattr(p, method)(*args)
2303        return p
2304
2305    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
2306                     "Due to known OS bug (issue #16762)")
2307    def _kill_dead_process(self, method, *args):
2308        # Do not inherit file handles from the parent.
2309        # It should fix failures on some platforms.
2310        p = subprocess.Popen([sys.executable, "-c", """if 1:
2311                             import sys, time
2312                             sys.stdout.write('x\\n')
2313                             sys.stdout.flush()
2314                             """],
2315                             close_fds=True,
2316                             stdin=subprocess.PIPE,
2317                             stdout=subprocess.PIPE,
2318                             stderr=subprocess.PIPE)
2319        # Wait for the interpreter to be completely initialized before
2320        # sending any signal.
2321        p.stdout.read(1)
2322        # The process should end after this
2323        time.sleep(1)
2324        # This shouldn't raise even though the child is now dead
2325        getattr(p, method)(*args)
2326        p.communicate()
2327
2328    def test_send_signal(self):
2329        p = self._kill_process('send_signal', signal.SIGINT)
2330        _, stderr = p.communicate()
2331        self.assertIn(b'KeyboardInterrupt', stderr)
2332        self.assertNotEqual(p.wait(), 0)
2333
2334    def test_kill(self):
2335        p = self._kill_process('kill')
2336        _, stderr = p.communicate()
2337        self.assertEqual(stderr, b'')
2338        self.assertEqual(p.wait(), -signal.SIGKILL)
2339
2340    def test_terminate(self):
2341        p = self._kill_process('terminate')
2342        _, stderr = p.communicate()
2343        self.assertEqual(stderr, b'')
2344        self.assertEqual(p.wait(), -signal.SIGTERM)
2345
2346    def test_send_signal_dead(self):
2347        # Sending a signal to a dead process
2348        self._kill_dead_process('send_signal', signal.SIGINT)
2349
2350    def test_kill_dead(self):
2351        # Killing a dead process
2352        self._kill_dead_process('kill')
2353
2354    def test_terminate_dead(self):
2355        # Terminating a dead process
2356        self._kill_dead_process('terminate')
2357
2358    def _save_fds(self, save_fds):
2359        fds = []
2360        for fd in save_fds:
2361            inheritable = os.get_inheritable(fd)
2362            saved = os.dup(fd)
2363            fds.append((fd, saved, inheritable))
2364        return fds
2365
2366    def _restore_fds(self, fds):
2367        for fd, saved, inheritable in fds:
2368            os.dup2(saved, fd, inheritable=inheritable)
2369            os.close(saved)
2370
2371    def check_close_std_fds(self, fds):
2372        # Issue #9905: test that subprocess pipes still work properly with
2373        # some standard fds closed
2374        stdin = 0
2375        saved_fds = self._save_fds(fds)
2376        for fd, saved, inheritable in saved_fds:
2377            if fd == 0:
2378                stdin = saved
2379                break
2380        try:
2381            for fd in fds:
2382                os.close(fd)
2383            out, err = subprocess.Popen([sys.executable, "-c",
2384                              'import sys;'
2385                              'sys.stdout.write("apple");'
2386                              'sys.stdout.flush();'
2387                              'sys.stderr.write("orange")'],
2388                       stdin=stdin,
2389                       stdout=subprocess.PIPE,
2390                       stderr=subprocess.PIPE).communicate()
2391            self.assertEqual(out, b'apple')
2392            self.assertEqual(err, b'orange')
2393        finally:
2394            self._restore_fds(saved_fds)
2395
2396    def test_close_fd_0(self):
2397        self.check_close_std_fds([0])
2398
2399    def test_close_fd_1(self):
2400        self.check_close_std_fds([1])
2401
2402    def test_close_fd_2(self):
2403        self.check_close_std_fds([2])
2404
2405    def test_close_fds_0_1(self):
2406        self.check_close_std_fds([0, 1])
2407
2408    def test_close_fds_0_2(self):
2409        self.check_close_std_fds([0, 2])
2410
2411    def test_close_fds_1_2(self):
2412        self.check_close_std_fds([1, 2])
2413
2414    def test_close_fds_0_1_2(self):
2415        # Issue #10806: test that subprocess pipes still work properly with
2416        # all standard fds closed.
2417        self.check_close_std_fds([0, 1, 2])
2418
2419    def test_small_errpipe_write_fd(self):
2420        """Issue #15798: Popen should work when stdio fds are available."""
2421        new_stdin = os.dup(0)
2422        new_stdout = os.dup(1)
2423        try:
2424            os.close(0)
2425            os.close(1)
2426
2427            # Side test: if errpipe_write fails to have its CLOEXEC
2428            # flag set this should cause the parent to think the exec
2429            # failed.  Extremely unlikely: everyone supports CLOEXEC.
2430            subprocess.Popen([
2431                    sys.executable, "-c",
2432                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
2433        finally:
2434            # Restore original stdin and stdout
2435            os.dup2(new_stdin, 0)
2436            os.dup2(new_stdout, 1)
2437            os.close(new_stdin)
2438            os.close(new_stdout)
2439
2440    def test_remapping_std_fds(self):
2441        # open up some temporary files
2442        temps = [tempfile.mkstemp() for i in range(3)]
2443        try:
2444            temp_fds = [fd for fd, fname in temps]
2445
2446            # unlink the files -- we won't need to reopen them
2447            for fd, fname in temps:
2448                os.unlink(fname)
2449
2450            # write some data to what will become stdin, and rewind
2451            os.write(temp_fds[1], b"STDIN")
2452            os.lseek(temp_fds[1], 0, 0)
2453
2454            # move the standard file descriptors out of the way
2455            saved_fds = self._save_fds(range(3))
2456            try:
2457                # duplicate the file objects over the standard fd's
2458                for fd, temp_fd in enumerate(temp_fds):
2459                    os.dup2(temp_fd, fd)
2460
2461                # now use those files in the "wrong" order, so that subprocess
2462                # has to rearrange them in the child
2463                p = subprocess.Popen([sys.executable, "-c",
2464                    'import sys; got = sys.stdin.read();'
2465                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2466                    stdin=temp_fds[1],
2467                    stdout=temp_fds[2],
2468                    stderr=temp_fds[0])
2469                p.wait()
2470            finally:
2471                self._restore_fds(saved_fds)
2472
2473            for fd in temp_fds:
2474                os.lseek(fd, 0, 0)
2475
2476            out = os.read(temp_fds[2], 1024)
2477            err = os.read(temp_fds[0], 1024).strip()
2478            self.assertEqual(out, b"got STDIN")
2479            self.assertEqual(err, b"err")
2480
2481        finally:
2482            for fd in temp_fds:
2483                os.close(fd)
2484
2485    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
2486        # open up some temporary files
2487        temps = [tempfile.mkstemp() for i in range(3)]
2488        temp_fds = [fd for fd, fname in temps]
2489        try:
2490            # unlink the files -- we won't need to reopen them
2491            for fd, fname in temps:
2492                os.unlink(fname)
2493
2494            # save a copy of the standard file descriptors
2495            saved_fds = self._save_fds(range(3))
2496            try:
2497                # duplicate the temp files over the standard fd's 0, 1, 2
2498                for fd, temp_fd in enumerate(temp_fds):
2499                    os.dup2(temp_fd, fd)
2500
2501                # write some data to what will become stdin, and rewind
2502                os.write(stdin_no, b"STDIN")
2503                os.lseek(stdin_no, 0, 0)
2504
2505                # now use those files in the given order, so that subprocess
2506                # has to rearrange them in the child
2507                p = subprocess.Popen([sys.executable, "-c",
2508                    'import sys; got = sys.stdin.read();'
2509                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2510                    stdin=stdin_no,
2511                    stdout=stdout_no,
2512                    stderr=stderr_no)
2513                p.wait()
2514
2515                for fd in temp_fds:
2516                    os.lseek(fd, 0, 0)
2517
2518                out = os.read(stdout_no, 1024)
2519                err = os.read(stderr_no, 1024).strip()
2520            finally:
2521                self._restore_fds(saved_fds)
2522
2523            self.assertEqual(out, b"got STDIN")
2524            self.assertEqual(err, b"err")
2525
2526        finally:
2527            for fd in temp_fds:
2528                os.close(fd)
2529
2530    # When duping fds, if there arises a situation where one of the fds is
2531    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
2532    # This tests all combinations of this.
2533    def test_swap_fds(self):
2534        self.check_swap_fds(0, 1, 2)
2535        self.check_swap_fds(0, 2, 1)
2536        self.check_swap_fds(1, 0, 2)
2537        self.check_swap_fds(1, 2, 0)
2538        self.check_swap_fds(2, 0, 1)
2539        self.check_swap_fds(2, 1, 0)
2540
2541    def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
2542        saved_fds = self._save_fds(range(3))
2543        try:
2544            for from_fd in from_fds:
2545                with tempfile.TemporaryFile() as f:
2546                    os.dup2(f.fileno(), from_fd)
2547
2548            fd_to_close = (set(range(3)) - set(from_fds)).pop()
2549            os.close(fd_to_close)
2550
2551            arg_names = ['stdin', 'stdout', 'stderr']
2552            kwargs = {}
2553            for from_fd, to_fd in zip(from_fds, to_fds):
2554                kwargs[arg_names[to_fd]] = from_fd
2555
2556            code = textwrap.dedent(r'''
2557                import os, sys
2558                skipped_fd = int(sys.argv[1])
2559                for fd in range(3):
2560                    if fd != skipped_fd:
2561                        os.write(fd, str(fd).encode('ascii'))
2562            ''')
2563
2564            skipped_fd = (set(range(3)) - set(to_fds)).pop()
2565
2566            rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)],
2567                                 **kwargs)
2568            self.assertEqual(rc, 0)
2569
2570            for from_fd, to_fd in zip(from_fds, to_fds):
2571                os.lseek(from_fd, 0, os.SEEK_SET)
2572                read_bytes = os.read(from_fd, 1024)
2573                read_fds = list(map(int, read_bytes.decode('ascii')))
2574                msg = textwrap.dedent(f"""
2575                    When testing {from_fds} to {to_fds} redirection,
2576                    parent descriptor {from_fd} got redirected
2577                    to descriptor(s) {read_fds} instead of descriptor {to_fd}.
2578                """)
2579                self.assertEqual([to_fd], read_fds, msg)
2580        finally:
2581            self._restore_fds(saved_fds)
2582
2583    # Check that subprocess can remap std fds correctly even
2584    # if one of them is closed (#32844).
2585    def test_swap_std_fds_with_one_closed(self):
2586        for from_fds in itertools.combinations(range(3), 2):
2587            for to_fds in itertools.permutations(range(3), 2):
2588                self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
2589
2590    def test_surrogates_error_message(self):
2591        def prepare():
2592            raise ValueError("surrogate:\uDCff")
2593
2594        try:
2595            subprocess.call(
2596                ZERO_RETURN_CMD,
2597                preexec_fn=prepare)
2598        except ValueError as err:
2599            # Pure Python implementations keeps the message
2600            self.assertIsNone(subprocess._posixsubprocess)
2601            self.assertEqual(str(err), "surrogate:\uDCff")
2602        except subprocess.SubprocessError as err:
2603            # _posixsubprocess uses a default message
2604            self.assertIsNotNone(subprocess._posixsubprocess)
2605            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
2606        else:
2607            self.fail("Expected ValueError or subprocess.SubprocessError")
2608
2609    def test_undecodable_env(self):
2610        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
2611            encoded_value = value.encode("ascii", "surrogateescape")
2612
2613            # test str with surrogates
2614            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
2615            env = os.environ.copy()
2616            env[key] = value
2617            # Use C locale to get ASCII for the locale encoding to force
2618            # surrogate-escaping of \xFF in the child process
2619            env['LC_ALL'] = 'C'
2620            decoded_value = value
2621            stdout = subprocess.check_output(
2622                [sys.executable, "-c", script],
2623                env=env)
2624            stdout = stdout.rstrip(b'\n\r')
2625            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2626
2627            # test bytes
2628            key = key.encode("ascii", "surrogateescape")
2629            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2630            env = os.environ.copy()
2631            env[key] = encoded_value
2632            stdout = subprocess.check_output(
2633                [sys.executable, "-c", script],
2634                env=env)
2635            stdout = stdout.rstrip(b'\n\r')
2636            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2637
2638    def test_bytes_program(self):
2639        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
2640        args = list(ZERO_RETURN_CMD[1:])
2641        path, program = os.path.split(ZERO_RETURN_CMD[0])
2642        program = os.fsencode(program)
2643
2644        # absolute bytes path
2645        exitcode = subprocess.call([abs_program]+args)
2646        self.assertEqual(exitcode, 0)
2647
2648        # absolute bytes path as a string
2649        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
2650        exitcode = subprocess.call(cmd, shell=True)
2651        self.assertEqual(exitcode, 0)
2652
2653        # bytes program, unicode PATH
2654        env = os.environ.copy()
2655        env["PATH"] = path
2656        exitcode = subprocess.call([program]+args, env=env)
2657        self.assertEqual(exitcode, 0)
2658
2659        # bytes program, bytes PATH
2660        envb = os.environb.copy()
2661        envb[b"PATH"] = os.fsencode(path)
2662        exitcode = subprocess.call([program]+args, env=envb)
2663        self.assertEqual(exitcode, 0)
2664
2665    def test_pipe_cloexec(self):
2666        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2667        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2668
2669        p1 = subprocess.Popen([sys.executable, sleeper],
2670                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2671                              stderr=subprocess.PIPE, close_fds=False)
2672
2673        self.addCleanup(p1.communicate, b'')
2674
2675        p2 = subprocess.Popen([sys.executable, fd_status],
2676                              stdout=subprocess.PIPE, close_fds=False)
2677
2678        output, error = p2.communicate()
2679        result_fds = set(map(int, output.split(b',')))
2680        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2681                            p1.stderr.fileno()])
2682
2683        self.assertFalse(result_fds & unwanted_fds,
2684                         "Expected no fds from %r to be open in child, "
2685                         "found %r" %
2686                              (unwanted_fds, result_fds & unwanted_fds))
2687
2688    def test_pipe_cloexec_real_tools(self):
2689        qcat = support.findfile("qcat.py", subdir="subprocessdata")
2690        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2691
2692        subdata = b'zxcvbn'
2693        data = subdata * 4 + b'\n'
2694
2695        p1 = subprocess.Popen([sys.executable, qcat],
2696                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2697                              close_fds=False)
2698
2699        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2700                              stdin=p1.stdout, stdout=subprocess.PIPE,
2701                              close_fds=False)
2702
2703        self.addCleanup(p1.wait)
2704        self.addCleanup(p2.wait)
2705        def kill_p1():
2706            try:
2707                p1.terminate()
2708            except ProcessLookupError:
2709                pass
2710        def kill_p2():
2711            try:
2712                p2.terminate()
2713            except ProcessLookupError:
2714                pass
2715        self.addCleanup(kill_p1)
2716        self.addCleanup(kill_p2)
2717
2718        p1.stdin.write(data)
2719        p1.stdin.close()
2720
2721        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2722
2723        self.assertTrue(readfiles, "The child hung")
2724        self.assertEqual(p2.stdout.read(), data)
2725
2726        p1.stdout.close()
2727        p2.stdout.close()
2728
2729    def test_close_fds(self):
2730        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2731
2732        fds = os.pipe()
2733        self.addCleanup(os.close, fds[0])
2734        self.addCleanup(os.close, fds[1])
2735
2736        open_fds = set(fds)
2737        # add a bunch more fds
2738        for _ in range(9):
2739            fd = os.open(os.devnull, os.O_RDONLY)
2740            self.addCleanup(os.close, fd)
2741            open_fds.add(fd)
2742
2743        for fd in open_fds:
2744            os.set_inheritable(fd, True)
2745
2746        p = subprocess.Popen([sys.executable, fd_status],
2747                             stdout=subprocess.PIPE, close_fds=False)
2748        output, ignored = p.communicate()
2749        remaining_fds = set(map(int, output.split(b',')))
2750
2751        self.assertEqual(remaining_fds & open_fds, open_fds,
2752                         "Some fds were closed")
2753
2754        p = subprocess.Popen([sys.executable, fd_status],
2755                             stdout=subprocess.PIPE, close_fds=True)
2756        output, ignored = p.communicate()
2757        remaining_fds = set(map(int, output.split(b',')))
2758
2759        self.assertFalse(remaining_fds & open_fds,
2760                         "Some fds were left open")
2761        self.assertIn(1, remaining_fds, "Subprocess failed")
2762
2763        # Keep some of the fd's we opened open in the subprocess.
2764        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2765        fds_to_keep = set(open_fds.pop() for _ in range(8))
2766        p = subprocess.Popen([sys.executable, fd_status],
2767                             stdout=subprocess.PIPE, close_fds=True,
2768                             pass_fds=fds_to_keep)
2769        output, ignored = p.communicate()
2770        remaining_fds = set(map(int, output.split(b',')))
2771
2772        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
2773                         "Some fds not in pass_fds were left open")
2774        self.assertIn(1, remaining_fds, "Subprocess failed")
2775
2776
    @unittest.skipIf(sys.platform.startswith("freebsd") and
                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                     "Requires fdescfs mounted on /dev/fd on FreeBSD.")
    def test_close_fds_when_max_fd_is_lowered(self):
        """Confirm that issue21618 is fixed (may fail under valgrind).

        Fds that were opened before RLIMIT_NOFILE was lowered, and that
        are numbered above the new limit, must still be closed in a
        child started with close_fds=True.
        """
        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

        # This launches the meat of the test in a child process to
        # avoid messing with the larger unittest process's maximum
        # number of file descriptors.
        #  This process launches:
        #  +--> Process that lowers its RLIMIT_NOFILE after setting up
        #    a bunch of high open fds above the new lower rlimit.
        #    Those are reported via stdout before launching a new
        #    process with close_fds=False to run the actual test:
        #    +--> The TEST: This one launches a fd_status.py
        #      subprocess with close_fds=True so we can find out if
        #      any of the fds above the lowered rlimit are still open.
        #
        # The outer template is filled in here with "%" (fd_status path);
        # the inner template is filled in by the child with str.format()
        # (max_fd), so the two substitution styles must not be mixed up.
        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
        '''
        import os, resource, subprocess, sys, textwrap
        open_fds = set()
        # Add a bunch more fds to pass down.
        for _ in range(40):
            fd = os.open(os.devnull, os.O_RDONLY)
            open_fds.add(fd)

        # Leave a two pairs of low ones available for use by the
        # internal child error pipe and the stdout pipe.
        # We also leave 10 more open as some Python buildbots run into
        # "too many open files" errors during the test if we do not.
        for fd in sorted(open_fds)[:14]:
            os.close(fd)
            open_fds.remove(fd)

        for fd in open_fds:
            #self.addCleanup(os.close, fd)
            os.set_inheritable(fd, True)

        max_fd_open = max(open_fds)

        # Communicate the open_fds to the parent unittest.TestCase process.
        print(','.join(map(str, sorted(open_fds))))
        sys.stdout.flush()

        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            # 29 is lower than the highest fds we are leaving open.
            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
            # Launch a new Python interpreter with our low fd rlim_cur that
            # inherits open fds above that limit.  It then uses subprocess
            # with close_fds=True to get a report of open fds in the child.
            # An explicit list of fds to check is passed to fd_status.py as
            # letting fd_status rely on its default logic would miss the
            # fds above rlim_cur as it normally only checks up to that limit.
            subprocess.Popen(
                [sys.executable, '-c',
                 textwrap.dedent("""
                     import subprocess, sys
                     subprocess.Popen([sys.executable, %r] +
                                      [str(x) for x in range({max_fd})],
                                      close_fds=True).wait()
                     """.format(max_fd=max_fd_open+1))],
                close_fds=False).wait()
        finally:
            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
        ''' % fd_status)], stdout=subprocess.PIPE)

        output, unused_stderr = p.communicate()
        # Line 1 is the fd list printed by the intermediate process above;
        # line 2 is expected to be the report written by fd_status.py, which
        # shares the same stdout pipe.
        output_lines = output.splitlines()
        self.assertEqual(len(output_lines), 2,
                         msg="expected exactly two lines of output:\n%r" % output)
        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

        # None of the deliberately opened high fds may be reported as
        # still open in the close_fds=True grandchild.
        self.assertFalse(remaining_fds & opened_fds,
                         msg="Some fds were left open.")
2854
2855
2856    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2857    # descriptor of a pipe closed in the parent process is valid in the
2858    # child process according to fstat(), but the mode of the file
2859    # descriptor is invalid, and read or write raise an error.
2860    @support.requires_mac_ver(10, 5)
2861    def test_pass_fds(self):
2862        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2863
2864        open_fds = set()
2865
2866        for x in range(5):
2867            fds = os.pipe()
2868            self.addCleanup(os.close, fds[0])
2869            self.addCleanup(os.close, fds[1])
2870            os.set_inheritable(fds[0], True)
2871            os.set_inheritable(fds[1], True)
2872            open_fds.update(fds)
2873
2874        for fd in open_fds:
2875            p = subprocess.Popen([sys.executable, fd_status],
2876                                 stdout=subprocess.PIPE, close_fds=True,
2877                                 pass_fds=(fd, ))
2878            output, ignored = p.communicate()
2879
2880            remaining_fds = set(map(int, output.split(b',')))
2881            to_be_closed = open_fds - {fd}
2882
2883            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2884            self.assertFalse(remaining_fds & to_be_closed,
2885                             "fd to be closed passed")
2886
2887            # pass_fds overrides close_fds with a warning.
2888            with self.assertWarns(RuntimeWarning) as context:
2889                self.assertFalse(subprocess.call(
2890                        ZERO_RETURN_CMD,
2891                        close_fds=False, pass_fds=(fd, )))
2892            self.assertIn('overriding close_fds', str(context.warning))
2893
2894    def test_pass_fds_inheritable(self):
2895        script = support.findfile("fd_status.py", subdir="subprocessdata")
2896
2897        inheritable, non_inheritable = os.pipe()
2898        self.addCleanup(os.close, inheritable)
2899        self.addCleanup(os.close, non_inheritable)
2900        os.set_inheritable(inheritable, True)
2901        os.set_inheritable(non_inheritable, False)
2902        pass_fds = (inheritable, non_inheritable)
2903        args = [sys.executable, script]
2904        args += list(map(str, pass_fds))
2905
2906        p = subprocess.Popen(args,
2907                             stdout=subprocess.PIPE, close_fds=True,
2908                             pass_fds=pass_fds)
2909        output, ignored = p.communicate()
2910        fds = set(map(int, output.split(b',')))
2911
2912        # the inheritable file descriptor must be inherited, so its inheritable
2913        # flag must be set in the child process after fork() and before exec()
2914        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2915
2916        # inheritable flag must not be changed in the parent process
2917        self.assertEqual(os.get_inheritable(inheritable), True)
2918        self.assertEqual(os.get_inheritable(non_inheritable), False)
2919
2920
2921    # bpo-32270: Ensure that descriptors specified in pass_fds
2922    # are inherited even if they are used in redirections.
2923    # Contributed by @izbyshev.
2924    def test_pass_fds_redirected(self):
2925        """Regression test for https://bugs.python.org/issue32270."""
2926        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2927        pass_fds = []
2928        for _ in range(2):
2929            fd = os.open(os.devnull, os.O_RDWR)
2930            self.addCleanup(os.close, fd)
2931            pass_fds.append(fd)
2932
2933        stdout_r, stdout_w = os.pipe()
2934        self.addCleanup(os.close, stdout_r)
2935        self.addCleanup(os.close, stdout_w)
2936        pass_fds.insert(1, stdout_w)
2937
2938        with subprocess.Popen([sys.executable, fd_status],
2939                              stdin=pass_fds[0],
2940                              stdout=pass_fds[1],
2941                              stderr=pass_fds[2],
2942                              close_fds=True,
2943                              pass_fds=pass_fds):
2944            output = os.read(stdout_r, 1024)
2945        fds = {int(num) for num in output.split(b',')}
2946
2947        self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}")
2948
2949
2950    def test_stdout_stdin_are_single_inout_fd(self):
2951        with io.open(os.devnull, "r+") as inout:
2952            p = subprocess.Popen(ZERO_RETURN_CMD,
2953                                 stdout=inout, stdin=inout)
2954            p.wait()
2955
2956    def test_stdout_stderr_are_single_inout_fd(self):
2957        with io.open(os.devnull, "r+") as inout:
2958            p = subprocess.Popen(ZERO_RETURN_CMD,
2959                                 stdout=inout, stderr=inout)
2960            p.wait()
2961
2962    def test_stderr_stdin_are_single_inout_fd(self):
2963        with io.open(os.devnull, "r+") as inout:
2964            p = subprocess.Popen(ZERO_RETURN_CMD,
2965                                 stderr=inout, stdin=inout)
2966            p.wait()
2967
2968    def test_wait_when_sigchild_ignored(self):
2969        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2970        sigchild_ignore = support.findfile("sigchild_ignore.py",
2971                                           subdir="subprocessdata")
2972        p = subprocess.Popen([sys.executable, sigchild_ignore],
2973                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2974        stdout, stderr = p.communicate()
2975        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2976                         " non-zero with this error:\n%s" %
2977                         stderr.decode('utf-8'))
2978
2979    def test_select_unbuffered(self):
2980        # Issue #11459: bufsize=0 should really set the pipes as
2981        # unbuffered (and therefore let select() work properly).
2982        select = import_helper.import_module("select")
2983        p = subprocess.Popen([sys.executable, "-c",
2984                              'import sys;'
2985                              'sys.stdout.write("apple")'],
2986                             stdout=subprocess.PIPE,
2987                             bufsize=0)
2988        f = p.stdout
2989        self.addCleanup(f.close)
2990        try:
2991            self.assertEqual(f.read(4), b"appl")
2992            self.assertIn(f, select.select([f], [], [], 0.0)[0])
2993        finally:
2994            p.wait()
2995
    def test_zombie_fast_process_del(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, it wouldn't be added to subprocess._active, and would
        # remain a zombie.
        # spawn a Popen, and delete its reference before it exits
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Only the pipe objects' close methods are registered, so these
        # cleanups do not keep the Popen object itself alive.
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Remember the object's identity so it can be looked up in
        # subprocess._active once our own reference is gone.
        ident = id(p)
        # NOTE(review): pid is not used anywhere below in this test.
        pid = p.pid
        # Dropping the last reference while the child is still sleeping is
        # expected to emit a ResourceWarning, which is captured here.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            p = None

        if mswindows:
            # subprocess._active is not used on Windows and is set to None.
            self.assertIsNone(subprocess._active)
        else:
            # check that p is in the active processes list
            self.assertIn(ident, [id(o) for o in subprocess._active])
3019
    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Only the pipe objects' close methods are registered, so these
        # cleanups do not keep the Popen object itself alive.
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Keep the identity and the pid: both are needed after the Popen
        # object has been released.
        ident = id(p)
        pid = p.pid
        # Dropping the last reference while the child is still running is
        # expected to emit a ResourceWarning, which is captured here.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            p = None
            support.gc_collect()  # For PyPy or other GCs.

        # Kill the child by signal only after its Popen object is gone.
        os.kill(pid, signal.SIGKILL)
        if mswindows:
            # subprocess._active is not used on Windows and is set to None.
            self.assertIsNone(subprocess._active)
        else:
            # check that p is in the active processes list
            self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        with self.assertRaises(OSError):
            with subprocess.Popen(NONEXISTING_CMD,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        # (a second waitpid() on an already reaped pid fails with OSError).
        self.assertRaises(OSError, os.waitpid, pid, 0)
        if mswindows:
            # subprocess._active is not used on Windows and is set to None.
            self.assertIsNone(subprocess._active)
        else:
            self.assertNotIn(ident, [id(o) for o in subprocess._active])
3062
3063    def test_close_fds_after_preexec(self):
3064        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
3065
3066        # this FD is used as dup2() target by preexec_fn, and should be closed
3067        # in the child process
3068        fd = os.dup(1)
3069        self.addCleanup(os.close, fd)
3070
3071        p = subprocess.Popen([sys.executable, fd_status],
3072                             stdout=subprocess.PIPE, close_fds=True,
3073                             preexec_fn=lambda: os.dup2(1, fd))
3074        output, ignored = p.communicate()
3075
3076        remaining_fds = set(map(int, output.split(b',')))
3077
3078        self.assertNotIn(fd, remaining_fds)
3079
    @support.cpython_only
    def test_fork_exec(self):
        # Issue #22290: fork_exec() must not crash on memory allocation failure
        # or other errors
        import _posixsubprocess
        # Remember the collector state so the finally clause can restore it.
        gc_enabled = gc.isenabled()
        try:
            # Use a preexec function and enable the garbage collector
            # to force fork_exec() to re-enable the garbage collector
            # on error.
            func = lambda: None
            gc.enable()

            # Each row replaces exactly one of the four arguments with the
            # integer 123, which is the wrong type for that position.
            for args, exe_list, cwd, env_list in (
                (123,      [b"exe"], None, [b"env"]),
                ([b"arg"], 123,      None, [b"env"]),
                ([b"arg"], [b"exe"], 123,  [b"env"]),
                ([b"arg"], [b"exe"], None, 123),
            ):
                with self.assertRaises(TypeError) as err:
                    _posixsubprocess.fork_exec(
                        args, exe_list,
                        True, (), cwd, env_list,
                        -1, -1, -1, -1,
                        1, 2, 3, 4,
                        True, True,
                        False, [], 0, -1,
                        func)
                # Attempt to prevent
                # "TypeError: fork_exec() takes exactly N arguments (M given)"
                # from passing the test.  More refactoring to have us start
                # with a valid *args list, confirm a good call with that works
                # before mutating it in various ways to ensure that bad calls
                # with individual arg type errors raise a TypeError would be
                # ideal.  Saving that for a future PR...
                self.assertNotIn('takes exactly', str(err.exception))
        finally:
            # Undo the gc.enable() above if the collector was off on entry.
            if not gc_enabled:
                gc.disable()
3119
    @support.cpython_only
    def test_fork_exec_sorted_fd_sanity_check(self):
        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
        import _posixsubprocess
        class BadInt:
            # Converts to int successfully exactly once per instance; any
            # later int() conversion raises ValueError.
            first = True
            def __init__(self, value):
                self.value = value
            def __int__(self):
                if self.first:
                    self.first = False
                    return self.value
                raise ValueError

        # Remember the collector state so the finally clause can restore it.
        gc_enabled = gc.isenabled()
        try:
            gc.enable()

            # Every tuple below is an invalid fds_to_keep value and must be
            # rejected with a ValueError that mentions 'fds_to_keep'.
            for fds_to_keep in (
                (-1, 2, 3, 4, 5),  # Negative number.
                ('str', 4),  # Not an int.
                (18, 23, 42, 2**63),  # Out of range.
                (5, 4),  # Not sorted.
                (6, 7, 7, 8),  # Duplicate.
                (BadInt(1), BadInt(2)),  # int() works once, then raises.
            ):
                with self.assertRaises(
                        ValueError,
                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
                    _posixsubprocess.fork_exec(
                        [b"false"], [b"false"],
                        True, fds_to_keep, None, [b"env"],
                        -1, -1, -1, -1,
                        1, 2, 3, 4,
                        True, True,
                        None, None, None, -1,
                        None)
                self.assertIn('fds_to_keep', str(c.exception))
        finally:
            # Undo the gc.enable() above if the collector was off on entry.
            if not gc_enabled:
                gc.disable()
3161
3162    def test_communicate_BrokenPipeError_stdin_close(self):
3163        # By not setting stdout or stderr or a timeout we force the fast path
3164        # that just calls _stdin_write() internally due to our mock.
3165        proc = subprocess.Popen(ZERO_RETURN_CMD)
3166        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3167            mock_proc_stdin.close.side_effect = BrokenPipeError
3168            proc.communicate()  # Should swallow BrokenPipeError from close.
3169            mock_proc_stdin.close.assert_called_with()
3170
3171    def test_communicate_BrokenPipeError_stdin_write(self):
3172        # By not setting stdout or stderr or a timeout we force the fast path
3173        # that just calls _stdin_write() internally due to our mock.
3174        proc = subprocess.Popen(ZERO_RETURN_CMD)
3175        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3176            mock_proc_stdin.write.side_effect = BrokenPipeError
3177            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
3178            mock_proc_stdin.write.assert_called_once_with(b'stuff')
3179            mock_proc_stdin.close.assert_called_once_with()
3180
3181    def test_communicate_BrokenPipeError_stdin_flush(self):
3182        # Setting stdin and stdout forces the ._communicate() code path.
3183        # python -h exits faster than python -c pass (but spams stdout).
3184        proc = subprocess.Popen([sys.executable, '-h'],
3185                                stdin=subprocess.PIPE,
3186                                stdout=subprocess.PIPE)
3187        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
3188                open(os.devnull, 'wb') as dev_null:
3189            mock_proc_stdin.flush.side_effect = BrokenPipeError
3190            # because _communicate registers a selector using proc.stdin...
3191            mock_proc_stdin.fileno.return_value = dev_null.fileno()
3192            # _communicate() should swallow BrokenPipeError from flush.
3193            proc.communicate(b'stuff')
3194            mock_proc_stdin.flush.assert_called_once_with()
3195
3196    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
3197        # Setting stdin and stdout forces the ._communicate() code path.
3198        # python -h exits faster than python -c pass (but spams stdout).
3199        proc = subprocess.Popen([sys.executable, '-h'],
3200                                stdin=subprocess.PIPE,
3201                                stdout=subprocess.PIPE)
3202        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3203            mock_proc_stdin.close.side_effect = BrokenPipeError
3204            # _communicate() should swallow BrokenPipeError from close.
3205            proc.communicate(timeout=999)
3206            mock_proc_stdin.close.assert_called_once_with()
3207
3208    @unittest.skipUnless(_testcapi is not None
3209                         and hasattr(_testcapi, 'W_STOPCODE'),
3210                         'need _testcapi.W_STOPCODE')
3211    def test_stopped(self):
3212        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
3213        args = ZERO_RETURN_CMD
3214        proc = subprocess.Popen(args)
3215
3216        # Wait until the real process completes to avoid zombie process
3217        support.wait_process(proc.pid, exitcode=0)
3218
3219        status = _testcapi.W_STOPCODE(3)
3220        with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)):
3221            returncode = proc.wait()
3222
3223        self.assertEqual(returncode, -3)
3224
3225    def test_send_signal_race(self):
3226        # bpo-38630: send_signal() must poll the process exit status to reduce
3227        # the risk of sending the signal to the wrong process.
3228        proc = subprocess.Popen(ZERO_RETURN_CMD)
3229
3230        # wait until the process completes without using the Popen APIs.
3231        support.wait_process(proc.pid, exitcode=0)
3232
3233        # returncode is still None but the process completed.
3234        self.assertIsNone(proc.returncode)
3235
3236        with mock.patch("os.kill") as mock_kill:
3237            proc.send_signal(signal.SIGTERM)
3238
3239        # send_signal() didn't call os.kill() since the process already
3240        # completed.
3241        mock_kill.assert_not_called()
3242
3243        # Don't check the returncode value: the test reads the exit status,
3244        # so Popen failed to read it and uses a default returncode instead.
3245        self.assertIsNotNone(proc.returncode)
3246
3247    def test_send_signal_race2(self):
3248        # bpo-40550: the process might exist between the returncode check and
3249        # the kill operation
3250        p = subprocess.Popen([sys.executable, '-c', 'exit(1)'])
3251
3252        # wait for process to exit
3253        while not p.returncode:
3254            p.poll()
3255
3256        with mock.patch.object(p, 'poll', new=lambda: None):
3257            p.returncode = None
3258            p.send_signal(signal.SIGTERM)
3259        p.kill()
3260
3261    def test_communicate_repeated_call_after_stdout_close(self):
3262        proc = subprocess.Popen([sys.executable, '-c',
3263                                 'import os, time; os.close(1), time.sleep(2)'],
3264                                stdout=subprocess.PIPE)
3265        while True:
3266            try:
3267                proc.communicate(timeout=0.1)
3268                return
3269            except subprocess.TimeoutExpired:
3270                pass
3271
3272
3273@unittest.skipUnless(mswindows, "Windows specific tests")
3274class Win32ProcessTestCase(BaseTestCase):
3275
3276    def test_startupinfo(self):
3277        # startupinfo argument
3278        # We uses hardcoded constants, because we do not want to
3279        # depend on win32all.
3280        STARTF_USESHOWWINDOW = 1
3281        SW_MAXIMIZE = 3
3282        startupinfo = subprocess.STARTUPINFO()
3283        startupinfo.dwFlags = STARTF_USESHOWWINDOW
3284        startupinfo.wShowWindow = SW_MAXIMIZE
3285        # Since Python is a console process, it won't be affected
3286        # by wShowWindow, but the argument should be silently
3287        # ignored
3288        subprocess.call(ZERO_RETURN_CMD,
3289                        startupinfo=startupinfo)
3290
3291    def test_startupinfo_keywords(self):
3292        # startupinfo argument
3293        # We use hardcoded constants, because we do not want to
3294        # depend on win32all.
3295        STARTF_USERSHOWWINDOW = 1
3296        SW_MAXIMIZE = 3
3297        startupinfo = subprocess.STARTUPINFO(
3298            dwFlags=STARTF_USERSHOWWINDOW,
3299            wShowWindow=SW_MAXIMIZE
3300        )
3301        # Since Python is a console process, it won't be affected
3302        # by wShowWindow, but the argument should be silently
3303        # ignored
3304        subprocess.call(ZERO_RETURN_CMD,
3305                        startupinfo=startupinfo)
3306
3307    def test_startupinfo_copy(self):
3308        # bpo-34044: Popen must not modify input STARTUPINFO structure
3309        startupinfo = subprocess.STARTUPINFO()
3310        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
3311        startupinfo.wShowWindow = subprocess.SW_HIDE
3312
3313        # Call Popen() twice with the same startupinfo object to make sure
3314        # that it's not modified
3315        for _ in range(2):
3316            cmd = ZERO_RETURN_CMD
3317            with open(os.devnull, 'w') as null:
3318                proc = subprocess.Popen(cmd,
3319                                        stdout=null,
3320                                        stderr=subprocess.STDOUT,
3321                                        startupinfo=startupinfo)
3322                with proc:
3323                    proc.communicate()
3324                self.assertEqual(proc.returncode, 0)
3325
3326            self.assertEqual(startupinfo.dwFlags,
3327                             subprocess.STARTF_USESHOWWINDOW)
3328            self.assertIsNone(startupinfo.hStdInput)
3329            self.assertIsNone(startupinfo.hStdOutput)
3330            self.assertIsNone(startupinfo.hStdError)
3331            self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE)
3332            self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []})
3333
3334    def test_creationflags(self):
3335        # creationflags argument
3336        CREATE_NEW_CONSOLE = 16
3337        sys.stderr.write("    a DOS box should flash briefly ...\n")
3338        subprocess.call(sys.executable +
3339                        ' -c "import time; time.sleep(0.25)"',
3340                        creationflags=CREATE_NEW_CONSOLE)
3341
3342    def test_invalid_args(self):
3343        # invalid arguments should raise ValueError
3344        self.assertRaises(ValueError, subprocess.call,
3345                          [sys.executable, "-c",
3346                           "import sys; sys.exit(47)"],
3347                          preexec_fn=lambda: 1)
3348
3349    @support.cpython_only
3350    def test_issue31471(self):
3351        # There shouldn't be an assertion failure in Popen() in case the env
3352        # argument has a bad keys() method.
3353        class BadEnv(dict):
3354            keys = None
3355        with self.assertRaises(TypeError):
3356            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
3357
3358    def test_close_fds(self):
3359        # close file descriptors
3360        rc = subprocess.call([sys.executable, "-c",
3361                              "import sys; sys.exit(47)"],
3362                              close_fds=True)
3363        self.assertEqual(rc, 47)
3364
3365    def test_close_fds_with_stdio(self):
3366        import msvcrt
3367
3368        fds = os.pipe()
3369        self.addCleanup(os.close, fds[0])
3370        self.addCleanup(os.close, fds[1])
3371
3372        handles = []
3373        for fd in fds:
3374            os.set_inheritable(fd, True)
3375            handles.append(msvcrt.get_osfhandle(fd))
3376
3377        p = subprocess.Popen([sys.executable, "-c",
3378                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3379                             stdout=subprocess.PIPE, close_fds=False)
3380        stdout, stderr = p.communicate()
3381        self.assertEqual(p.returncode, 0)
3382        int(stdout.strip())  # Check that stdout is an integer
3383
3384        p = subprocess.Popen([sys.executable, "-c",
3385                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3386                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
3387        stdout, stderr = p.communicate()
3388        self.assertEqual(p.returncode, 1)
3389        self.assertIn(b"OSError", stderr)
3390
3391        # The same as the previous call, but with an empty handle_list
3392        handle_list = []
3393        startupinfo = subprocess.STARTUPINFO()
3394        startupinfo.lpAttributeList = {"handle_list": handle_list}
3395        p = subprocess.Popen([sys.executable, "-c",
3396                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3397                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3398                             startupinfo=startupinfo, close_fds=True)
3399        stdout, stderr = p.communicate()
3400        self.assertEqual(p.returncode, 1)
3401        self.assertIn(b"OSError", stderr)
3402
3403        # Check for a warning due to using handle_list and close_fds=False
3404        with warnings_helper.check_warnings((".*overriding close_fds",
3405                                             RuntimeWarning)):
3406            startupinfo = subprocess.STARTUPINFO()
3407            startupinfo.lpAttributeList = {"handle_list": handles[:]}
3408            p = subprocess.Popen([sys.executable, "-c",
3409                                  "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3410                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3411                                 startupinfo=startupinfo, close_fds=False)
3412            stdout, stderr = p.communicate()
3413            self.assertEqual(p.returncode, 0)
3414
3415    def test_empty_attribute_list(self):
3416        startupinfo = subprocess.STARTUPINFO()
3417        startupinfo.lpAttributeList = {}
3418        subprocess.call(ZERO_RETURN_CMD,
3419                        startupinfo=startupinfo)
3420
3421    def test_empty_handle_list(self):
3422        startupinfo = subprocess.STARTUPINFO()
3423        startupinfo.lpAttributeList = {"handle_list": []}
3424        subprocess.call(ZERO_RETURN_CMD,
3425                        startupinfo=startupinfo)
3426
3427    def test_shell_sequence(self):
3428        # Run command through the shell (sequence)
3429        newenv = os.environ.copy()
3430        newenv["FRUIT"] = "physalis"
3431        p = subprocess.Popen(["set"], shell=1,
3432                             stdout=subprocess.PIPE,
3433                             env=newenv)
3434        with p:
3435            self.assertIn(b"physalis", p.stdout.read())
3436
3437    def test_shell_string(self):
3438        # Run command through the shell (string)
3439        newenv = os.environ.copy()
3440        newenv["FRUIT"] = "physalis"
3441        p = subprocess.Popen("set", shell=1,
3442                             stdout=subprocess.PIPE,
3443                             env=newenv)
3444        with p:
3445            self.assertIn(b"physalis", p.stdout.read())
3446
3447    def test_shell_encodings(self):
3448        # Run command through the shell (string)
3449        for enc in ['ansi', 'oem']:
3450            newenv = os.environ.copy()
3451            newenv["FRUIT"] = "physalis"
3452            p = subprocess.Popen("set", shell=1,
3453                                 stdout=subprocess.PIPE,
3454                                 env=newenv,
3455                                 encoding=enc)
3456            with p:
3457                self.assertIn("physalis", p.stdout.read(), enc)
3458
3459    def test_call_string(self):
3460        # call() function with string argument on Windows
3461        rc = subprocess.call(sys.executable +
3462                             ' -c "import sys; sys.exit(47)"')
3463        self.assertEqual(rc, 47)
3464
3465    def _kill_process(self, method, *args):
3466        # Some win32 buildbot raises EOFError if stdin is inherited
3467        p = subprocess.Popen([sys.executable, "-c", """if 1:
3468                             import sys, time
3469                             sys.stdout.write('x\\n')
3470                             sys.stdout.flush()
3471                             time.sleep(30)
3472                             """],
3473                             stdin=subprocess.PIPE,
3474                             stdout=subprocess.PIPE,
3475                             stderr=subprocess.PIPE)
3476        with p:
3477            # Wait for the interpreter to be completely initialized before
3478            # sending any signal.
3479            p.stdout.read(1)
3480            getattr(p, method)(*args)
3481            _, stderr = p.communicate()
3482            self.assertEqual(stderr, b'')
3483            returncode = p.wait()
3484        self.assertNotEqual(returncode, 0)
3485
3486    def _kill_dead_process(self, method, *args):
3487        p = subprocess.Popen([sys.executable, "-c", """if 1:
3488                             import sys, time
3489                             sys.stdout.write('x\\n')
3490                             sys.stdout.flush()
3491                             sys.exit(42)
3492                             """],
3493                             stdin=subprocess.PIPE,
3494                             stdout=subprocess.PIPE,
3495                             stderr=subprocess.PIPE)
3496        with p:
3497            # Wait for the interpreter to be completely initialized before
3498            # sending any signal.
3499            p.stdout.read(1)
3500            # The process should end after this
3501            time.sleep(1)
3502            # This shouldn't raise even though the child is now dead
3503            getattr(p, method)(*args)
3504            _, stderr = p.communicate()
3505            self.assertEqual(stderr, b'')
3506            rc = p.wait()
3507        self.assertEqual(rc, 42)
3508
3509    def test_send_signal(self):
3510        self._kill_process('send_signal', signal.SIGTERM)
3511
3512    def test_kill(self):
3513        self._kill_process('kill')
3514
3515    def test_terminate(self):
3516        self._kill_process('terminate')
3517
3518    def test_send_signal_dead(self):
3519        self._kill_dead_process('send_signal', signal.SIGTERM)
3520
3521    def test_kill_dead(self):
3522        self._kill_dead_process('kill')
3523
3524    def test_terminate_dead(self):
3525        self._kill_dead_process('terminate')
3526
class MiscTests(unittest.TestCase):
    """Tests for ^C handling, getoutput()/getstatusoutput() and __all__."""

    class RecordingPopen(subprocess.Popen):
        """A Popen that saves a reference to each instance for testing."""
        # Class-level registry shared by all instances: whoever uses this
        # class is responsible for draining the list afterwards.
        instances_created = []

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.instances_created.append(self)

    @mock.patch.object(subprocess.Popen, "_communicate")
    def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate,
                                        **kwargs):
        """Fake a SIGINT happening during Popen._communicate() and ._wait().

        This avoids the need to actually try and get test environments to send
        and receive signals reliably across platforms.  The net effect of a ^C
        happening during a blocking subprocess execution which we want to clean
        up from is a KeyboardInterrupt coming out of communicate() or wait().

        popener is the callable under test; it is invoked with the child's
        argv, stdout=subprocess.DEVNULL and any extra **kwargs.
        mock__communicate is supplied by the mock.patch.object decorator.
        """

        mock__communicate.side_effect = KeyboardInterrupt
        try:
            with mock.patch.object(subprocess.Popen, "_wait") as mock__wait:
                # We patch out _wait() as no signal was involved so the
                # child process isn't actually going to exit rapidly.
                mock__wait.side_effect = KeyboardInterrupt
                with mock.patch.object(subprocess, "Popen",
                                       self.RecordingPopen):
                    with self.assertRaises(KeyboardInterrupt):
                        popener([sys.executable, "-c",
                                 "import time\ntime.sleep(9)\nimport sys\n"
                                 "sys.stderr.write('\\n!runaway child!\\n')"],
                                stdout=subprocess.DEVNULL, **kwargs)
                for call in mock__wait.call_args_list[1:]:
                    self.assertNotEqual(
                            call, mock.call(timeout=None),
                            "no open-ended wait() after the first allowed: "
                            f"{mock__wait.call_args_list}")
                sigint_calls = [
                    call for call in mock__wait.call_args_list
                    if call == mock.call(timeout=0.25)  # from Popen.__init__
                ]
                self.assertLessEqual(mock__wait.call_count, 2,
                                     msg=mock__wait.call_args_list)
                self.assertEqual(len(sigint_calls), 1,
                                 msg=mock__wait.call_args_list)
        finally:
            # cleanup the forgotten (due to our mocks) child process.
            # Drain the shared registry first so that nothing leaks into
            # later tests, and tolerate an empty registry so that a failure
            # raised before any Popen was created is not masked by an
            # IndexError from this cleanup.
            registry = self.RecordingPopen.instances_created
            processes = registry[:]
            del registry[:]
            for process in processes:
                process.kill()
                process.wait()
            # Only a single child is expected to have been spawned.
            self.assertLessEqual(len(processes), 1, processes)

    def test_call_keyboardinterrupt_no_kill(self):
        self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282)

    def test_run_keyboardinterrupt_no_kill(self):
        self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282)

    def test_context_manager_keyboardinterrupt_no_kill(self):
        def popen_via_context_manager(*args, **kwargs):
            with subprocess.Popen(*args, **kwargs) as unused_process:
                raise KeyboardInterrupt  # Test how __exit__ handles ^C.
        self._test_keyboardinterrupt_no_kill(popen_via_context_manager)

    def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # A fresh temporary directory is empty and under our exclusive
        # control; from that, we can invent a pathname that we _know_
        # won't exist.  Reading it is guaranteed to fail.
        with tempfile.TemporaryDirectory() as tmpdir:
            name = os.path.join(tmpdir, "foo")
            status, _ = subprocess.getstatusoutput(
                ("type " if mswindows else "cat ") + name)
            self.assertNotEqual(status, 0)

    def test__all__(self):
        """Ensure that __all__ is populated properly."""
        intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"}
        exported = set(subprocess.__all__)
        # Every public, non-module name in the module namespace is a
        # candidate for __all__.
        possible_exports = {
            name for name, value in vars(subprocess).items()
            if not name.startswith('_')
            and not isinstance(value, types.ModuleType)
        }
        self.assertEqual(exported, possible_exports - intentionally_excluded)
3625
3626
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
                     "Test needs selectors.PollSelector")
class ProcessTestCaseNoPoll(ProcessTestCase):
    # Runs the inherited ProcessTestCase tests with subprocess switched
    # over to selectors.SelectSelector for the duration of each test.

    def setUp(self):
        # Remember the selector currently in use so tearDown can put it
        # back, then swap in SelectSelector before the base-class setup.
        previous = subprocess._PopenSelector
        self.orig_selector = previous
        subprocess._PopenSelector = selectors.SelectSelector
        super().setUp()

    def tearDown(self):
        # Restore the selector first, then run the base-class teardown.
        subprocess._PopenSelector = self.orig_selector
        super().tearDown()
3638
3639
@unittest.skipUnless(mswindows, "Windows-specific tests")
class CommandsWithSpaces(BaseTestCase):
    """Run a script whose file name and argument both contain a space.

    Each test launches the temporary script created in setUp() and checks
    that the child saw exactly two argv entries: the script path and
    'ab cd'.
    """

    def setUp(self):
        super().setUp()
        fd, fname = tempfile.mkstemp(".py", "te st")
        # The child lower-cases its argv before printing, so keep the
        # lower-cased path around for the comparison in with_spaces().
        self.fname = fname.lower()
        # Wrap the descriptor in a file object so that it is closed even if
        # the write fails, and so that the whole script is written out.
        with os.fdopen(fd, "wb") as script:
            script.write(
                b"import sys;"
                b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
            )

    def tearDown(self):
        os.remove(self.fname)
        super().tearDown()

    def with_spaces(self, *args, **kwargs):
        """Run Popen(*args, **kwargs) and check the argv the child reports.

        stdout is always redirected to a pipe (overriding any caller
        value); the output is decoded as "mbcs" and compared with the
        expected argument count and lower-cased argv list.
        """
        kwargs['stdout'] = subprocess.PIPE
        with subprocess.Popen(*args, **kwargs) as p:
            self.assertEqual(
                p.stdout.read().decode("mbcs"),
                "2 [%r, 'ab cd']" % self.fname
            )

    def test_shell_string_with_spaces(self):
        # Popen with a string command containing spaces, through the shell
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                                             "ab cd"), shell=1)

    def test_shell_sequence_with_spaces(self):
        # Popen with a sequence command containing spaces, through the shell
        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)

    def test_noshell_string_with_spaces(self):
        # Popen with a string command containing spaces, without the shell
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                                             "ab cd"))

    def test_noshell_sequence_with_spaces(self):
        # Popen with a sequence command containing spaces, without the shell
        self.with_spaces([sys.executable, self.fname, "ab cd"])
3682
3683
class ContextManagerTests(BaseTestCase):
    # Checks on Popen objects used through the "with" statement.

    def test_pipe(self):
        child_code = ("import sys;"
                      "sys.stdout.write('stdout');"
                      "sys.stderr.write('stderr');")
        with subprocess.Popen([sys.executable, "-c", child_code],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            self.assertEqual(proc.stdout.read(), b"stdout")
            self.assertEqual(proc.stderr.read(), b"stderr")

        # Both pipes are expected to be closed once the block is left.
        for stream in (proc.stdout, proc.stderr):
            self.assertTrue(stream.closed)

    def test_returncode(self):
        argv = [sys.executable, "-c", "import sys; sys.exit(100)"]
        with subprocess.Popen(argv) as proc:
            pass
        # __exit__ calls wait(), so the returncode should be set
        self.assertEqual(proc.returncode, 100)

    def test_communicate_stdin(self):
        # The child exits with the result of comparing its stdin to
        # 'context'; a match is expected to surface as exit status 1.
        child_code = ("import sys;"
                      "sys.exit(sys.stdin.read() == 'context')")
        with subprocess.Popen([sys.executable, "-c", child_code],
                              stdin=subprocess.PIPE) as proc:
            proc.communicate(b"context")
            self.assertEqual(proc.returncode, 1)

    def test_invalid_args(self):
        # Starting a command that cannot be found must raise one of the
        # "not found" errors before the with-body is ever entered.
        with self.assertRaises(NONEXISTING_ERRORS):
            with subprocess.Popen(NONEXISTING_CMD,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE):
                pass

    def test_broken_pipe_cleanup(self):
        """Broken pipe error should not prevent wait() (Issue 21619)"""
        payload_size = support.PIPE_MAX_SIZE
        proc = subprocess.Popen(ZERO_RETURN_CMD,
                                stdin=subprocess.PIPE,
                                bufsize=payload_size * 2)
        proc = proc.__enter__()
        # Prepare to send enough data to overflow any OS pipe buffering and
        # guarantee a broken pipe error. Data is held in BufferedWriter
        # buffer until closed.
        proc.stdin.write(b'x' * payload_size)
        self.assertIsNone(proc.returncode)
        # EPIPE expected under POSIX; EINVAL under Windows
        with self.assertRaises(OSError):
            proc.__exit__(None, None, None)
        self.assertEqual(proc.returncode, 0)
        self.assertTrue(proc.stdin.closed)
3735        self.assertTrue(proc.stdin.closed)
3736
3737
if __name__ == '__main__':
    # Run this module's tests when it is executed as a script.
    unittest.main()
3740