1import unittest
2from unittest import mock
3from test import support
4import subprocess
5import sys
6import signal
7import io
8import itertools
9import os
10import errno
11import tempfile
12import time
13import traceback
14import selectors
15import sysconfig
16import select
17import shutil
18import threading
19import gc
20import textwrap
21from test.support import FakePath
22
23try:
24    import _testcapi
25except ImportError:
26    _testcapi = None
27
28
29if support.PGO:
30    raise unittest.SkipTest("test is not helpful for PGO")
31
32mswindows = (sys.platform == "win32")
33
34#
35# Depends on the following external programs: Python
36#
37
38if mswindows:
39    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
40                                                'os.O_BINARY);')
41else:
42    SETBINARY = ''
43
44NONEXISTING_CMD = ('nonexisting_i_hope',)
45# Ignore errors that indicate the command was not found
46NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)
47
48ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
49
50
51def setUpModule():
52    shell_true = shutil.which('true')
53    if shell_true is None:
54        return
55    if (os.access(shell_true, os.X_OK) and
56        subprocess.run([shell_true]).returncode == 0):
57        global ZERO_RETURN_CMD
58        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.
59
60
61class BaseTestCase(unittest.TestCase):
62    def setUp(self):
63        # Try to minimize the number of children we have so this test
64        # doesn't crash on some buildbots (Alphas in particular).
65        support.reap_children()
66
67    def tearDown(self):
68        if not mswindows:
69            # subprocess._active is not used on Windows and is set to None.
70            for inst in subprocess._active:
71                inst.wait()
72            subprocess._cleanup()
73            self.assertFalse(
74                subprocess._active, "subprocess._active not empty"
75            )
76        self.doCleanups()
77        support.reap_children()
78
79    def assertStderrEqual(self, stderr, expected, msg=None):
80        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
81        # shutdown time.  That frustrates tests trying to check stderr produced
82        # from a spawned Python process.
83        actual = support.strip_python_stderr(stderr)
84        # strip_python_stderr also strips whitespace, so we do too.
85        expected = expected.strip()
86        self.assertEqual(actual, expected, msg)
87
88
89class PopenTestException(Exception):
90    pass
91
92
93class PopenExecuteChildRaises(subprocess.Popen):
94    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
95    _execute_child fails.
96    """
97    def _execute_child(self, *args, **kwargs):
98        raise PopenTestException("Forced Exception for Test")
99
100
101class ProcessTestCase(BaseTestCase):
102
103    def test_io_buffered_by_default(self):
104        p = subprocess.Popen(ZERO_RETURN_CMD,
105                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
106                             stderr=subprocess.PIPE)
107        try:
108            self.assertIsInstance(p.stdin, io.BufferedIOBase)
109            self.assertIsInstance(p.stdout, io.BufferedIOBase)
110            self.assertIsInstance(p.stderr, io.BufferedIOBase)
111        finally:
112            p.stdin.close()
113            p.stdout.close()
114            p.stderr.close()
115            p.wait()
116
117    def test_io_unbuffered_works(self):
118        p = subprocess.Popen(ZERO_RETURN_CMD,
119                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
120                             stderr=subprocess.PIPE, bufsize=0)
121        try:
122            self.assertIsInstance(p.stdin, io.RawIOBase)
123            self.assertIsInstance(p.stdout, io.RawIOBase)
124            self.assertIsInstance(p.stderr, io.RawIOBase)
125        finally:
126            p.stdin.close()
127            p.stdout.close()
128            p.stderr.close()
129            p.wait()
130
131    def test_call_seq(self):
132        # call() function with sequence argument
133        rc = subprocess.call([sys.executable, "-c",
134                              "import sys; sys.exit(47)"])
135        self.assertEqual(rc, 47)
136
137    def test_call_timeout(self):
138        # call() function with timeout argument; we want to test that the child
139        # process gets killed when the timeout expires.  If the child isn't
140        # killed, this call will deadlock since subprocess.call waits for the
141        # child.
142        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
143                          [sys.executable, "-c", "while True: pass"],
144                          timeout=0.1)
145
146    def test_check_call_zero(self):
147        # check_call() function with zero return code
148        rc = subprocess.check_call(ZERO_RETURN_CMD)
149        self.assertEqual(rc, 0)
150
151    def test_check_call_nonzero(self):
152        # check_call() function with non-zero return code
153        with self.assertRaises(subprocess.CalledProcessError) as c:
154            subprocess.check_call([sys.executable, "-c",
155                                   "import sys; sys.exit(47)"])
156        self.assertEqual(c.exception.returncode, 47)
157
158    def test_check_output(self):
159        # check_output() function with zero return code
160        output = subprocess.check_output(
161                [sys.executable, "-c", "print('BDFL')"])
162        self.assertIn(b'BDFL', output)
163
164    def test_check_output_nonzero(self):
165        # check_call() function with non-zero return code
166        with self.assertRaises(subprocess.CalledProcessError) as c:
167            subprocess.check_output(
168                    [sys.executable, "-c", "import sys; sys.exit(5)"])
169        self.assertEqual(c.exception.returncode, 5)
170
171    def test_check_output_stderr(self):
172        # check_output() function stderr redirected to stdout
173        output = subprocess.check_output(
174                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
175                stderr=subprocess.STDOUT)
176        self.assertIn(b'BDFL', output)
177
178    def test_check_output_stdin_arg(self):
179        # check_output() can be called with stdin set to a file
180        tf = tempfile.TemporaryFile()
181        self.addCleanup(tf.close)
182        tf.write(b'pear')
183        tf.seek(0)
184        output = subprocess.check_output(
185                [sys.executable, "-c",
186                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
187                stdin=tf)
188        self.assertIn(b'PEAR', output)
189
190    def test_check_output_input_arg(self):
191        # check_output() can be called with input set to a string
192        output = subprocess.check_output(
193                [sys.executable, "-c",
194                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
195                input=b'pear')
196        self.assertIn(b'PEAR', output)
197
198    def test_check_output_input_none(self):
199        """input=None has a legacy meaning of input='' on check_output."""
200        output = subprocess.check_output(
201                [sys.executable, "-c",
202                 "import sys; print('XX' if sys.stdin.read() else '')"],
203                input=None)
204        self.assertNotIn(b'XX', output)
205
206    def test_check_output_input_none_text(self):
207        output = subprocess.check_output(
208                [sys.executable, "-c",
209                 "import sys; print('XX' if sys.stdin.read() else '')"],
210                input=None, text=True)
211        self.assertNotIn('XX', output)
212
213    def test_check_output_input_none_universal_newlines(self):
214        output = subprocess.check_output(
215                [sys.executable, "-c",
216                 "import sys; print('XX' if sys.stdin.read() else '')"],
217                input=None, universal_newlines=True)
218        self.assertNotIn('XX', output)
219
220    def test_check_output_stdout_arg(self):
221        # check_output() refuses to accept 'stdout' argument
222        with self.assertRaises(ValueError) as c:
223            output = subprocess.check_output(
224                    [sys.executable, "-c", "print('will not be run')"],
225                    stdout=sys.stdout)
226            self.fail("Expected ValueError when stdout arg supplied.")
227        self.assertIn('stdout', c.exception.args[0])
228
229    def test_check_output_stdin_with_input_arg(self):
230        # check_output() refuses to accept 'stdin' with 'input'
231        tf = tempfile.TemporaryFile()
232        self.addCleanup(tf.close)
233        tf.write(b'pear')
234        tf.seek(0)
235        with self.assertRaises(ValueError) as c:
236            output = subprocess.check_output(
237                    [sys.executable, "-c", "print('will not be run')"],
238                    stdin=tf, input=b'hare')
239            self.fail("Expected ValueError when stdin and input args supplied.")
240        self.assertIn('stdin', c.exception.args[0])
241        self.assertIn('input', c.exception.args[0])
242
243    def test_check_output_timeout(self):
244        # check_output() function with timeout arg
245        with self.assertRaises(subprocess.TimeoutExpired) as c:
246            output = subprocess.check_output(
247                    [sys.executable, "-c",
248                     "import sys, time\n"
249                     "sys.stdout.write('BDFL')\n"
250                     "sys.stdout.flush()\n"
251                     "time.sleep(3600)"],
252                    # Some heavily loaded buildbots (sparc Debian 3.x) require
253                    # this much time to start and print.
254                    timeout=3)
255            self.fail("Expected TimeoutExpired.")
256        self.assertEqual(c.exception.output, b'BDFL')
257
258    def test_call_kwargs(self):
259        # call() function with keyword args
260        newenv = os.environ.copy()
261        newenv["FRUIT"] = "banana"
262        rc = subprocess.call([sys.executable, "-c",
263                              'import sys, os;'
264                              'sys.exit(os.getenv("FRUIT")=="banana")'],
265                             env=newenv)
266        self.assertEqual(rc, 1)
267
268    def test_invalid_args(self):
269        # Popen() called with invalid arguments should raise TypeError
270        # but Popen.__del__ should not complain (issue #12085)
271        with support.captured_stderr() as s:
272            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
273            argcount = subprocess.Popen.__init__.__code__.co_argcount
274            too_many_args = [0] * (argcount + 1)
275            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
276        self.assertEqual(s.getvalue(), '')
277
278    def test_stdin_none(self):
279        # .stdin is None when not redirected
280        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
281                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
282        self.addCleanup(p.stdout.close)
283        self.addCleanup(p.stderr.close)
284        p.wait()
285        self.assertEqual(p.stdin, None)
286
287    def test_stdout_none(self):
288        # .stdout is None when not redirected, and the child's stdout will
289        # be inherited from the parent.  In order to test this we run a
290        # subprocess in a subprocess:
291        # this_test
292        #   \-- subprocess created by this test (parent)
293        #          \-- subprocess created by the parent subprocess (child)
294        # The parent doesn't specify stdout, so the child will use the
295        # parent's stdout.  This test checks that the message printed by the
296        # child goes to the parent stdout.  The parent also checks that the
297        # child's stdout is None.  See #11963.
298        code = ('import sys; from subprocess import Popen, PIPE;'
299                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
300                '          stdin=PIPE, stderr=PIPE);'
301                'p.wait(); assert p.stdout is None;')
302        p = subprocess.Popen([sys.executable, "-c", code],
303                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
304        self.addCleanup(p.stdout.close)
305        self.addCleanup(p.stderr.close)
306        out, err = p.communicate()
307        self.assertEqual(p.returncode, 0, err)
308        self.assertEqual(out.rstrip(), b'test_stdout_none')
309
310    def test_stderr_none(self):
311        # .stderr is None when not redirected
312        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
313                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
314        self.addCleanup(p.stdout.close)
315        self.addCleanup(p.stdin.close)
316        p.wait()
317        self.assertEqual(p.stderr, None)
318
319    def _assert_python(self, pre_args, **kwargs):
320        # We include sys.exit() to prevent the test runner from hanging
321        # whenever python is found.
322        args = pre_args + ["import sys; sys.exit(47)"]
323        p = subprocess.Popen(args, **kwargs)
324        p.wait()
325        self.assertEqual(47, p.returncode)
326
327    def test_executable(self):
328        # Check that the executable argument works.
329        #
330        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
331        # determine where its standard library is, so we need the directory
332        # of args[0] to be valid for the Popen() call to Python to succeed.
333        # See also issue #16170 and issue #7774.
334        doesnotexist = os.path.join(os.path.dirname(sys.executable),
335                                    "doesnotexist")
336        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
337
338    def test_bytes_executable(self):
339        doesnotexist = os.path.join(os.path.dirname(sys.executable),
340                                    "doesnotexist")
341        self._assert_python([doesnotexist, "-c"],
342                            executable=os.fsencode(sys.executable))
343
344    def test_pathlike_executable(self):
345        doesnotexist = os.path.join(os.path.dirname(sys.executable),
346                                    "doesnotexist")
347        self._assert_python([doesnotexist, "-c"],
348                            executable=FakePath(sys.executable))
349
350    def test_executable_takes_precedence(self):
351        # Check that the executable argument takes precedence over args[0].
352        #
353        # Verify first that the call succeeds without the executable arg.
354        pre_args = [sys.executable, "-c"]
355        self._assert_python(pre_args)
356        self.assertRaises(NONEXISTING_ERRORS,
357                          self._assert_python, pre_args,
358                          executable=NONEXISTING_CMD[0])
359
360    @unittest.skipIf(mswindows, "executable argument replaces shell")
361    def test_executable_replaces_shell(self):
362        # Check that the executable argument replaces the default shell
363        # when shell=True.
364        self._assert_python([], executable=sys.executable, shell=True)
365
366    @unittest.skipIf(mswindows, "executable argument replaces shell")
367    def test_bytes_executable_replaces_shell(self):
368        self._assert_python([], executable=os.fsencode(sys.executable),
369                            shell=True)
370
371    @unittest.skipIf(mswindows, "executable argument replaces shell")
372    def test_pathlike_executable_replaces_shell(self):
373        self._assert_python([], executable=FakePath(sys.executable),
374                            shell=True)
375
376    # For use in the test_cwd* tests below.
377    def _normalize_cwd(self, cwd):
378        # Normalize an expected cwd (for Tru64 support).
379        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
380        # strings.  See bug #1063571.
381        with support.change_cwd(cwd):
382            return os.getcwd()
383
384    # For use in the test_cwd* tests below.
385    def _split_python_path(self):
386        # Return normalized (python_dir, python_base).
387        python_path = os.path.realpath(sys.executable)
388        return os.path.split(python_path)
389
390    # For use in the test_cwd* tests below.
391    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
392        # Invoke Python via Popen, and assert that (1) the call succeeds,
393        # and that (2) the current working directory of the child process
394        # matches *expected_cwd*.
395        p = subprocess.Popen([python_arg, "-c",
396                              "import os, sys; "
397                              "buf = sys.stdout.buffer; "
398                              "buf.write(os.getcwd().encode()); "
399                              "buf.flush(); "
400                              "sys.exit(47)"],
401                              stdout=subprocess.PIPE,
402                              **kwargs)
403        self.addCleanup(p.stdout.close)
404        p.wait()
405        self.assertEqual(47, p.returncode)
406        normcase = os.path.normcase
407        self.assertEqual(normcase(expected_cwd),
408                         normcase(p.stdout.read().decode()))
409
410    def test_cwd(self):
411        # Check that cwd changes the cwd for the child process.
412        temp_dir = tempfile.gettempdir()
413        temp_dir = self._normalize_cwd(temp_dir)
414        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
415
416    def test_cwd_with_bytes(self):
417        temp_dir = tempfile.gettempdir()
418        temp_dir = self._normalize_cwd(temp_dir)
419        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))
420
421    def test_cwd_with_pathlike(self):
422        temp_dir = tempfile.gettempdir()
423        temp_dir = self._normalize_cwd(temp_dir)
424        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))
425
426    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
427    def test_cwd_with_relative_arg(self):
428        # Check that Popen looks for args[0] relative to cwd if args[0]
429        # is relative.
430        python_dir, python_base = self._split_python_path()
431        rel_python = os.path.join(os.curdir, python_base)
432        with support.temp_cwd() as wrong_dir:
433            # Before calling with the correct cwd, confirm that the call fails
434            # without cwd and with the wrong cwd.
435            self.assertRaises(FileNotFoundError, subprocess.Popen,
436                              [rel_python])
437            self.assertRaises(FileNotFoundError, subprocess.Popen,
438                              [rel_python], cwd=wrong_dir)
439            python_dir = self._normalize_cwd(python_dir)
440            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
441
442    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
443    def test_cwd_with_relative_executable(self):
444        # Check that Popen looks for executable relative to cwd if executable
445        # is relative (and that executable takes precedence over args[0]).
446        python_dir, python_base = self._split_python_path()
447        rel_python = os.path.join(os.curdir, python_base)
448        doesntexist = "somethingyoudonthave"
449        with support.temp_cwd() as wrong_dir:
450            # Before calling with the correct cwd, confirm that the call fails
451            # without cwd and with the wrong cwd.
452            self.assertRaises(FileNotFoundError, subprocess.Popen,
453                              [doesntexist], executable=rel_python)
454            self.assertRaises(FileNotFoundError, subprocess.Popen,
455                              [doesntexist], executable=rel_python,
456                              cwd=wrong_dir)
457            python_dir = self._normalize_cwd(python_dir)
458            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
459                             cwd=python_dir)
460
461    def test_cwd_with_absolute_arg(self):
462        # Check that Popen can find the executable when the cwd is wrong
463        # if args[0] is an absolute path.
464        python_dir, python_base = self._split_python_path()
465        abs_python = os.path.join(python_dir, python_base)
466        rel_python = os.path.join(os.curdir, python_base)
467        with support.temp_dir() as wrong_dir:
468            # Before calling with an absolute path, confirm that using a
469            # relative path fails.
470            self.assertRaises(FileNotFoundError, subprocess.Popen,
471                              [rel_python], cwd=wrong_dir)
472            wrong_dir = self._normalize_cwd(wrong_dir)
473            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
474
475    @unittest.skipIf(sys.base_prefix != sys.prefix,
476                     'Test is not venv-compatible')
477    def test_executable_with_cwd(self):
478        python_dir, python_base = self._split_python_path()
479        python_dir = self._normalize_cwd(python_dir)
480        self._assert_cwd(python_dir, "somethingyoudonthave",
481                         executable=sys.executable, cwd=python_dir)
482
483    @unittest.skipIf(sys.base_prefix != sys.prefix,
484                     'Test is not venv-compatible')
485    @unittest.skipIf(sysconfig.is_python_build(),
486                     "need an installed Python. See #7774")
487    def test_executable_without_cwd(self):
488        # For a normal installation, it should work without 'cwd'
489        # argument.  For test runs in the build directory, see #7774.
490        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
491                         executable=sys.executable)
492
493    def test_stdin_pipe(self):
494        # stdin redirection
495        p = subprocess.Popen([sys.executable, "-c",
496                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
497                        stdin=subprocess.PIPE)
498        p.stdin.write(b"pear")
499        p.stdin.close()
500        p.wait()
501        self.assertEqual(p.returncode, 1)
502
503    def test_stdin_filedes(self):
504        # stdin is set to open file descriptor
505        tf = tempfile.TemporaryFile()
506        self.addCleanup(tf.close)
507        d = tf.fileno()
508        os.write(d, b"pear")
509        os.lseek(d, 0, 0)
510        p = subprocess.Popen([sys.executable, "-c",
511                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
512                         stdin=d)
513        p.wait()
514        self.assertEqual(p.returncode, 1)
515
516    def test_stdin_fileobj(self):
517        # stdin is set to open file object
518        tf = tempfile.TemporaryFile()
519        self.addCleanup(tf.close)
520        tf.write(b"pear")
521        tf.seek(0)
522        p = subprocess.Popen([sys.executable, "-c",
523                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
524                         stdin=tf)
525        p.wait()
526        self.assertEqual(p.returncode, 1)
527
528    def test_stdout_pipe(self):
529        # stdout redirection
530        p = subprocess.Popen([sys.executable, "-c",
531                          'import sys; sys.stdout.write("orange")'],
532                         stdout=subprocess.PIPE)
533        with p:
534            self.assertEqual(p.stdout.read(), b"orange")
535
536    def test_stdout_filedes(self):
537        # stdout is set to open file descriptor
538        tf = tempfile.TemporaryFile()
539        self.addCleanup(tf.close)
540        d = tf.fileno()
541        p = subprocess.Popen([sys.executable, "-c",
542                          'import sys; sys.stdout.write("orange")'],
543                         stdout=d)
544        p.wait()
545        os.lseek(d, 0, 0)
546        self.assertEqual(os.read(d, 1024), b"orange")
547
548    def test_stdout_fileobj(self):
549        # stdout is set to open file object
550        tf = tempfile.TemporaryFile()
551        self.addCleanup(tf.close)
552        p = subprocess.Popen([sys.executable, "-c",
553                          'import sys; sys.stdout.write("orange")'],
554                         stdout=tf)
555        p.wait()
556        tf.seek(0)
557        self.assertEqual(tf.read(), b"orange")
558
559    def test_stderr_pipe(self):
560        # stderr redirection
561        p = subprocess.Popen([sys.executable, "-c",
562                          'import sys; sys.stderr.write("strawberry")'],
563                         stderr=subprocess.PIPE)
564        with p:
565            self.assertStderrEqual(p.stderr.read(), b"strawberry")
566
567    def test_stderr_filedes(self):
568        # stderr is set to open file descriptor
569        tf = tempfile.TemporaryFile()
570        self.addCleanup(tf.close)
571        d = tf.fileno()
572        p = subprocess.Popen([sys.executable, "-c",
573                          'import sys; sys.stderr.write("strawberry")'],
574                         stderr=d)
575        p.wait()
576        os.lseek(d, 0, 0)
577        self.assertStderrEqual(os.read(d, 1024), b"strawberry")
578
579    def test_stderr_fileobj(self):
580        # stderr is set to open file object
581        tf = tempfile.TemporaryFile()
582        self.addCleanup(tf.close)
583        p = subprocess.Popen([sys.executable, "-c",
584                          'import sys; sys.stderr.write("strawberry")'],
585                         stderr=tf)
586        p.wait()
587        tf.seek(0)
588        self.assertStderrEqual(tf.read(), b"strawberry")
589
590    def test_stderr_redirect_with_no_stdout_redirect(self):
591        # test stderr=STDOUT while stdout=None (not set)
592
593        # - grandchild prints to stderr
594        # - child redirects grandchild's stderr to its stdout
595        # - the parent should get grandchild's stderr in child's stdout
596        p = subprocess.Popen([sys.executable, "-c",
597                              'import sys, subprocess;'
598                              'rc = subprocess.call([sys.executable, "-c",'
599                              '    "import sys;"'
600                              '    "sys.stderr.write(\'42\')"],'
601                              '    stderr=subprocess.STDOUT);'
602                              'sys.exit(rc)'],
603                             stdout=subprocess.PIPE,
604                             stderr=subprocess.PIPE)
605        stdout, stderr = p.communicate()
606        #NOTE: stdout should get stderr from grandchild
607        self.assertStderrEqual(stdout, b'42')
608        self.assertStderrEqual(stderr, b'') # should be empty
609        self.assertEqual(p.returncode, 0)
610
611    def test_stdout_stderr_pipe(self):
612        # capture stdout and stderr to the same pipe
613        p = subprocess.Popen([sys.executable, "-c",
614                              'import sys;'
615                              'sys.stdout.write("apple");'
616                              'sys.stdout.flush();'
617                              'sys.stderr.write("orange")'],
618                             stdout=subprocess.PIPE,
619                             stderr=subprocess.STDOUT)
620        with p:
621            self.assertStderrEqual(p.stdout.read(), b"appleorange")
622
623    def test_stdout_stderr_file(self):
624        # capture stdout and stderr to the same open file
625        tf = tempfile.TemporaryFile()
626        self.addCleanup(tf.close)
627        p = subprocess.Popen([sys.executable, "-c",
628                              'import sys;'
629                              'sys.stdout.write("apple");'
630                              'sys.stdout.flush();'
631                              'sys.stderr.write("orange")'],
632                             stdout=tf,
633                             stderr=tf)
634        p.wait()
635        tf.seek(0)
636        self.assertStderrEqual(tf.read(), b"appleorange")
637
638    def test_stdout_filedes_of_stdout(self):
639        # stdout is set to 1 (#1531862).
640        # To avoid printing the text on stdout, we do something similar to
641        # test_stdout_none (see above).  The parent subprocess calls the child
642        # subprocess passing stdout=1, and this test uses stdout=PIPE in
643        # order to capture and check the output of the parent. See #11963.
644        code = ('import sys, subprocess; '
645                'rc = subprocess.call([sys.executable, "-c", '
646                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
647                     'b\'test with stdout=1\'))"], stdout=1); '
648                'assert rc == 18')
649        p = subprocess.Popen([sys.executable, "-c", code],
650                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
651        self.addCleanup(p.stdout.close)
652        self.addCleanup(p.stderr.close)
653        out, err = p.communicate()
654        self.assertEqual(p.returncode, 0, err)
655        self.assertEqual(out.rstrip(), b'test with stdout=1')
656
657    def test_stdout_devnull(self):
658        p = subprocess.Popen([sys.executable, "-c",
659                              'for i in range(10240):'
660                              'print("x" * 1024)'],
661                              stdout=subprocess.DEVNULL)
662        p.wait()
663        self.assertEqual(p.stdout, None)
664
665    def test_stderr_devnull(self):
666        p = subprocess.Popen([sys.executable, "-c",
667                              'import sys\n'
668                              'for i in range(10240):'
669                              'sys.stderr.write("x" * 1024)'],
670                              stderr=subprocess.DEVNULL)
671        p.wait()
672        self.assertEqual(p.stderr, None)
673
674    def test_stdin_devnull(self):
675        p = subprocess.Popen([sys.executable, "-c",
676                              'import sys;'
677                              'sys.stdin.read(1)'],
678                              stdin=subprocess.DEVNULL)
679        p.wait()
680        self.assertEqual(p.stdin, None)
681
682    def test_env(self):
683        newenv = os.environ.copy()
684        newenv["FRUIT"] = "orange"
685        with subprocess.Popen([sys.executable, "-c",
686                               'import sys,os;'
687                               'sys.stdout.write(os.getenv("FRUIT"))'],
688                              stdout=subprocess.PIPE,
689                              env=newenv) as p:
690            stdout, stderr = p.communicate()
691            self.assertEqual(stdout, b"orange")
692
693    # Windows requires at least the SYSTEMROOT environment variable to start
694    # Python
695    @unittest.skipIf(sys.platform == 'win32',
696                     'cannot test an empty env on Windows')
697    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1,
698                     'The Python shared library cannot be loaded '
699                     'with an empty environment.')
700    def test_empty_env(self):
701        """Verify that env={} is as empty as possible."""
702
703        def is_env_var_to_ignore(n):
704            """Determine if an environment variable is under our control."""
705            # This excludes some __CF_* and VERSIONER_* keys MacOS insists
706            # on adding even when the environment in exec is empty.
707            # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
708            return ('VERSIONER' in n or '__CF' in n or  # MacOS
709                    n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo
710                    n == 'LC_CTYPE') # Locale coercion triggered
711
712        with subprocess.Popen([sys.executable, "-c",
713                               'import os; print(list(os.environ.keys()))'],
714                              stdout=subprocess.PIPE, env={}) as p:
715            stdout, stderr = p.communicate()
716            child_env_names = eval(stdout.strip())
717            self.assertIsInstance(child_env_names, list)
718            child_env_names = [k for k in child_env_names
719                               if not is_env_var_to_ignore(k)]
720            self.assertEqual(child_env_names, [])
721
722    def test_invalid_cmd(self):
723        # null character in the command name
724        cmd = sys.executable + '\0'
725        with self.assertRaises(ValueError):
726            subprocess.Popen([cmd, "-c", "pass"])
727
728        # null character in the command argument
729        with self.assertRaises(ValueError):
730            subprocess.Popen([sys.executable, "-c", "pass#\0"])
731
732    def test_invalid_env(self):
733        # null character in the environment variable name
734        newenv = os.environ.copy()
735        newenv["FRUIT\0VEGETABLE"] = "cabbage"
736        with self.assertRaises(ValueError):
737            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
738
739        # null character in the environment variable value
740        newenv = os.environ.copy()
741        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
742        with self.assertRaises(ValueError):
743            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
744
745        # equal character in the environment variable name
746        newenv = os.environ.copy()
747        newenv["FRUIT=ORANGE"] = "lemon"
748        with self.assertRaises(ValueError):
749            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
750
751        # equal character in the environment variable value
752        newenv = os.environ.copy()
753        newenv["FRUIT"] = "orange=lemon"
754        with subprocess.Popen([sys.executable, "-c",
755                               'import sys, os;'
756                               'sys.stdout.write(os.getenv("FRUIT"))'],
757                              stdout=subprocess.PIPE,
758                              env=newenv) as p:
759            stdout, stderr = p.communicate()
760            self.assertEqual(stdout, b"orange=lemon")
761
762    def test_communicate_stdin(self):
763        p = subprocess.Popen([sys.executable, "-c",
764                              'import sys;'
765                              'sys.exit(sys.stdin.read() == "pear")'],
766                             stdin=subprocess.PIPE)
767        p.communicate(b"pear")
768        self.assertEqual(p.returncode, 1)
769
770    def test_communicate_stdout(self):
771        p = subprocess.Popen([sys.executable, "-c",
772                              'import sys; sys.stdout.write("pineapple")'],
773                             stdout=subprocess.PIPE)
774        (stdout, stderr) = p.communicate()
775        self.assertEqual(stdout, b"pineapple")
776        self.assertEqual(stderr, None)
777
778    def test_communicate_stderr(self):
779        p = subprocess.Popen([sys.executable, "-c",
780                              'import sys; sys.stderr.write("pineapple")'],
781                             stderr=subprocess.PIPE)
782        (stdout, stderr) = p.communicate()
783        self.assertEqual(stdout, None)
784        self.assertStderrEqual(stderr, b"pineapple")
785
786    def test_communicate(self):
787        p = subprocess.Popen([sys.executable, "-c",
788                              'import sys,os;'
789                              'sys.stderr.write("pineapple");'
790                              'sys.stdout.write(sys.stdin.read())'],
791                             stdin=subprocess.PIPE,
792                             stdout=subprocess.PIPE,
793                             stderr=subprocess.PIPE)
794        self.addCleanup(p.stdout.close)
795        self.addCleanup(p.stderr.close)
796        self.addCleanup(p.stdin.close)
797        (stdout, stderr) = p.communicate(b"banana")
798        self.assertEqual(stdout, b"banana")
799        self.assertStderrEqual(stderr, b"pineapple")
800
801    def test_communicate_timeout(self):
802        p = subprocess.Popen([sys.executable, "-c",
803                              'import sys,os,time;'
804                              'sys.stderr.write("pineapple\\n");'
805                              'time.sleep(1);'
806                              'sys.stderr.write("pear\\n");'
807                              'sys.stdout.write(sys.stdin.read())'],
808                             universal_newlines=True,
809                             stdin=subprocess.PIPE,
810                             stdout=subprocess.PIPE,
811                             stderr=subprocess.PIPE)
812        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
813                          timeout=0.3)
814        # Make sure we can keep waiting for it, and that we get the whole output
815        # after it completes.
816        (stdout, stderr) = p.communicate()
817        self.assertEqual(stdout, "banana")
818        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
819
820    def test_communicate_timeout_large_output(self):
821        # Test an expiring timeout while the child is outputting lots of data.
822        p = subprocess.Popen([sys.executable, "-c",
823                              'import sys,os,time;'
824                              'sys.stdout.write("a" * (64 * 1024));'
825                              'time.sleep(0.2);'
826                              'sys.stdout.write("a" * (64 * 1024));'
827                              'time.sleep(0.2);'
828                              'sys.stdout.write("a" * (64 * 1024));'
829                              'time.sleep(0.2);'
830                              'sys.stdout.write("a" * (64 * 1024));'],
831                             stdout=subprocess.PIPE)
832        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
833        (stdout, _) = p.communicate()
834        self.assertEqual(len(stdout), 4 * 64 * 1024)
835
836    # Test for the fd leak reported in http://bugs.python.org/issue2791.
837    def test_communicate_pipe_fd_leak(self):
838        for stdin_pipe in (False, True):
839            for stdout_pipe in (False, True):
840                for stderr_pipe in (False, True):
841                    options = {}
842                    if stdin_pipe:
843                        options['stdin'] = subprocess.PIPE
844                    if stdout_pipe:
845                        options['stdout'] = subprocess.PIPE
846                    if stderr_pipe:
847                        options['stderr'] = subprocess.PIPE
848                    if not options:
849                        continue
850                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
851                    p.communicate()
852                    if p.stdin is not None:
853                        self.assertTrue(p.stdin.closed)
854                    if p.stdout is not None:
855                        self.assertTrue(p.stdout.closed)
856                    if p.stderr is not None:
857                        self.assertTrue(p.stderr.closed)
858
859    def test_communicate_returns(self):
860        # communicate() should return None if no redirection is active
861        p = subprocess.Popen([sys.executable, "-c",
862                              "import sys; sys.exit(47)"])
863        (stdout, stderr) = p.communicate()
864        self.assertEqual(stdout, None)
865        self.assertEqual(stderr, None)
866
867    def test_communicate_pipe_buf(self):
868        # communicate() with writes larger than pipe_buf
869        # This test will probably deadlock rather than fail, if
870        # communicate() does not work properly.
871        x, y = os.pipe()
872        os.close(x)
873        os.close(y)
874        p = subprocess.Popen([sys.executable, "-c",
875                              'import sys,os;'
876                              'sys.stdout.write(sys.stdin.read(47));'
877                              'sys.stderr.write("x" * %d);'
878                              'sys.stdout.write(sys.stdin.read())' %
879                              support.PIPE_MAX_SIZE],
880                             stdin=subprocess.PIPE,
881                             stdout=subprocess.PIPE,
882                             stderr=subprocess.PIPE)
883        self.addCleanup(p.stdout.close)
884        self.addCleanup(p.stderr.close)
885        self.addCleanup(p.stdin.close)
886        string_to_write = b"a" * support.PIPE_MAX_SIZE
887        (stdout, stderr) = p.communicate(string_to_write)
888        self.assertEqual(stdout, string_to_write)
889
890    def test_writes_before_communicate(self):
891        # stdin.write before communicate()
892        p = subprocess.Popen([sys.executable, "-c",
893                              'import sys,os;'
894                              'sys.stdout.write(sys.stdin.read())'],
895                             stdin=subprocess.PIPE,
896                             stdout=subprocess.PIPE,
897                             stderr=subprocess.PIPE)
898        self.addCleanup(p.stdout.close)
899        self.addCleanup(p.stderr.close)
900        self.addCleanup(p.stdin.close)
901        p.stdin.write(b"banana")
902        (stdout, stderr) = p.communicate(b"split")
903        self.assertEqual(stdout, b"bananasplit")
904        self.assertStderrEqual(stderr, b"")
905
906    def test_universal_newlines_and_text(self):
907        args = [
908            sys.executable, "-c",
909            'import sys,os;' + SETBINARY +
910            'buf = sys.stdout.buffer;'
911            'buf.write(sys.stdin.readline().encode());'
912            'buf.flush();'
913            'buf.write(b"line2\\n");'
914            'buf.flush();'
915            'buf.write(sys.stdin.read().encode());'
916            'buf.flush();'
917            'buf.write(b"line4\\n");'
918            'buf.flush();'
919            'buf.write(b"line5\\r\\n");'
920            'buf.flush();'
921            'buf.write(b"line6\\r");'
922            'buf.flush();'
923            'buf.write(b"\\nline7");'
924            'buf.flush();'
925            'buf.write(b"\\nline8");']
926
927        for extra_kwarg in ('universal_newlines', 'text'):
928            p = subprocess.Popen(args, **{'stdin': subprocess.PIPE,
929                                          'stdout': subprocess.PIPE,
930                                          extra_kwarg: True})
931            with p:
932                p.stdin.write("line1\n")
933                p.stdin.flush()
934                self.assertEqual(p.stdout.readline(), "line1\n")
935                p.stdin.write("line3\n")
936                p.stdin.close()
937                self.addCleanup(p.stdout.close)
938                self.assertEqual(p.stdout.readline(),
939                                 "line2\n")
940                self.assertEqual(p.stdout.read(6),
941                                 "line3\n")
942                self.assertEqual(p.stdout.read(),
943                                 "line4\nline5\nline6\nline7\nline8")
944
945    def test_universal_newlines_communicate(self):
946        # universal newlines through communicate()
947        p = subprocess.Popen([sys.executable, "-c",
948                              'import sys,os;' + SETBINARY +
949                              'buf = sys.stdout.buffer;'
950                              'buf.write(b"line2\\n");'
951                              'buf.flush();'
952                              'buf.write(b"line4\\n");'
953                              'buf.flush();'
954                              'buf.write(b"line5\\r\\n");'
955                              'buf.flush();'
956                              'buf.write(b"line6\\r");'
957                              'buf.flush();'
958                              'buf.write(b"\\nline7");'
959                              'buf.flush();'
960                              'buf.write(b"\\nline8");'],
961                             stderr=subprocess.PIPE,
962                             stdout=subprocess.PIPE,
963                             universal_newlines=1)
964        self.addCleanup(p.stdout.close)
965        self.addCleanup(p.stderr.close)
966        (stdout, stderr) = p.communicate()
967        self.assertEqual(stdout,
968                         "line2\nline4\nline5\nline6\nline7\nline8")
969
970    def test_universal_newlines_communicate_stdin(self):
971        # universal newlines through communicate(), with only stdin
972        p = subprocess.Popen([sys.executable, "-c",
973                              'import sys,os;' + SETBINARY + textwrap.dedent('''
974                               s = sys.stdin.readline()
975                               assert s == "line1\\n", repr(s)
976                               s = sys.stdin.read()
977                               assert s == "line3\\n", repr(s)
978                              ''')],
979                             stdin=subprocess.PIPE,
980                             universal_newlines=1)
981        (stdout, stderr) = p.communicate("line1\nline3\n")
982        self.assertEqual(p.returncode, 0)
983
984    def test_universal_newlines_communicate_input_none(self):
985        # Test communicate(input=None) with universal newlines.
986        #
987        # We set stdout to PIPE because, as of this writing, a different
988        # code path is tested when the number of pipes is zero or one.
989        p = subprocess.Popen(ZERO_RETURN_CMD,
990                             stdin=subprocess.PIPE,
991                             stdout=subprocess.PIPE,
992                             universal_newlines=True)
993        p.communicate()
994        self.assertEqual(p.returncode, 0)
995
996    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
997        # universal newlines through communicate(), with stdin, stdout, stderr
998        p = subprocess.Popen([sys.executable, "-c",
999                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1000                               s = sys.stdin.buffer.readline()
1001                               sys.stdout.buffer.write(s)
1002                               sys.stdout.buffer.write(b"line2\\r")
1003                               sys.stderr.buffer.write(b"eline2\\n")
1004                               s = sys.stdin.buffer.read()
1005                               sys.stdout.buffer.write(s)
1006                               sys.stdout.buffer.write(b"line4\\n")
1007                               sys.stdout.buffer.write(b"line5\\r\\n")
1008                               sys.stderr.buffer.write(b"eline6\\r")
1009                               sys.stderr.buffer.write(b"eline7\\r\\nz")
1010                              ''')],
1011                             stdin=subprocess.PIPE,
1012                             stderr=subprocess.PIPE,
1013                             stdout=subprocess.PIPE,
1014                             universal_newlines=True)
1015        self.addCleanup(p.stdout.close)
1016        self.addCleanup(p.stderr.close)
1017        (stdout, stderr) = p.communicate("line1\nline3\n")
1018        self.assertEqual(p.returncode, 0)
1019        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
1020        # Python debug build push something like "[42442 refs]\n"
1021        # to stderr at exit of subprocess.
1022        # Don't use assertStderrEqual because it strips CR and LF from output.
1023        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
1024
1025    def test_universal_newlines_communicate_encodings(self):
1026        # Check that universal newlines mode works for various encodings,
1027        # in particular for encodings in the UTF-16 and UTF-32 families.
1028        # See issue #15595.
1029        #
1030        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
1031        # without, and UTF-16 and UTF-32.
1032        for encoding in ['utf-16', 'utf-32-be']:
1033            code = ("import sys; "
1034                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
1035                    encoding)
1036            args = [sys.executable, '-c', code]
1037            # We set stdin to be non-None because, as of this writing,
1038            # a different code path is used when the number of pipes is
1039            # zero or one.
1040            popen = subprocess.Popen(args,
1041                                     stdin=subprocess.PIPE,
1042                                     stdout=subprocess.PIPE,
1043                                     encoding=encoding)
1044            stdout, stderr = popen.communicate(input='')
1045            self.assertEqual(stdout, '1\n2\n3\n4')
1046
1047    def test_communicate_errors(self):
1048        for errors, expected in [
1049            ('ignore', ''),
1050            ('replace', '\ufffd\ufffd'),
1051            ('surrogateescape', '\udc80\udc80'),
1052            ('backslashreplace', '\\x80\\x80'),
1053        ]:
1054            code = ("import sys; "
1055                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
1056            args = [sys.executable, '-c', code]
1057            # We set stdin to be non-None because, as of this writing,
1058            # a different code path is used when the number of pipes is
1059            # zero or one.
1060            popen = subprocess.Popen(args,
1061                                     stdin=subprocess.PIPE,
1062                                     stdout=subprocess.PIPE,
1063                                     encoding='utf-8',
1064                                     errors=errors)
1065            stdout, stderr = popen.communicate(input='')
1066            self.assertEqual(stdout, '[{}]'.format(expected))
1067
1068    def test_no_leaking(self):
1069        # Make sure we leak no resources
1070        if not mswindows:
1071            max_handles = 1026 # too much for most UNIX systems
1072        else:
1073            max_handles = 2050 # too much for (at least some) Windows setups
1074        handles = []
1075        tmpdir = tempfile.mkdtemp()
1076        try:
1077            for i in range(max_handles):
1078                try:
1079                    tmpfile = os.path.join(tmpdir, support.TESTFN)
1080                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
1081                except OSError as e:
1082                    if e.errno != errno.EMFILE:
1083                        raise
1084                    break
1085            else:
1086                self.skipTest("failed to reach the file descriptor limit "
1087                    "(tried %d)" % max_handles)
1088            # Close a couple of them (should be enough for a subprocess)
1089            for i in range(10):
1090                os.close(handles.pop())
1091            # Loop creating some subprocesses. If one of them leaks some fds,
1092            # the next loop iteration will fail by reaching the max fd limit.
1093            for i in range(15):
1094                p = subprocess.Popen([sys.executable, "-c",
1095                                      "import sys;"
1096                                      "sys.stdout.write(sys.stdin.read())"],
1097                                     stdin=subprocess.PIPE,
1098                                     stdout=subprocess.PIPE,
1099                                     stderr=subprocess.PIPE)
1100                data = p.communicate(b"lime")[0]
1101                self.assertEqual(data, b"lime")
1102        finally:
1103            for h in handles:
1104                os.close(h)
1105            shutil.rmtree(tmpdir)
1106
1107    def test_list2cmdline(self):
1108        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
1109                         '"a b c" d e')
1110        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
1111                         'ab\\"c \\ d')
1112        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
1113                         'ab\\"c " \\\\" d')
1114        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
1115                         'a\\\\\\b "de fg" h')
1116        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
1117                         'a\\\\\\"b c d')
1118        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1119                         '"a\\\\b c" d e')
1120        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1121                         '"a\\\\b\\ c" d e')
1122        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1123                         'ab ""')
1124
1125    def test_poll(self):
1126        p = subprocess.Popen([sys.executable, "-c",
1127                              "import os; os.read(0, 1)"],
1128                             stdin=subprocess.PIPE)
1129        self.addCleanup(p.stdin.close)
1130        self.assertIsNone(p.poll())
1131        os.write(p.stdin.fileno(), b'A')
1132        p.wait()
1133        # Subsequent invocations should just return the returncode
1134        self.assertEqual(p.poll(), 0)
1135
1136    def test_wait(self):
1137        p = subprocess.Popen(ZERO_RETURN_CMD)
1138        self.assertEqual(p.wait(), 0)
1139        # Subsequent invocations should just return the returncode
1140        self.assertEqual(p.wait(), 0)
1141
1142    def test_wait_timeout(self):
1143        p = subprocess.Popen([sys.executable,
1144                              "-c", "import time; time.sleep(0.3)"])
1145        with self.assertRaises(subprocess.TimeoutExpired) as c:
1146            p.wait(timeout=0.0001)
1147        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1148        # Some heavily loaded buildbots (sparc Debian 3.x) require this much
1149        # time to start.
1150        self.assertEqual(p.wait(timeout=3), 0)
1151
1152    def test_invalid_bufsize(self):
1153        # an invalid type of the bufsize argument should raise
1154        # TypeError.
1155        with self.assertRaises(TypeError):
1156            subprocess.Popen(ZERO_RETURN_CMD, "orange")
1157
1158    def test_bufsize_is_none(self):
1159        # bufsize=None should be the same as bufsize=0.
1160        p = subprocess.Popen(ZERO_RETURN_CMD, None)
1161        self.assertEqual(p.wait(), 0)
1162        # Again with keyword arg
1163        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
1164        self.assertEqual(p.wait(), 0)
1165
1166    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1167        # subprocess may deadlock with bufsize=1, see issue #21332
1168        with subprocess.Popen([sys.executable, "-c", "import sys;"
1169                               "sys.stdout.write(sys.stdin.readline());"
1170                               "sys.stdout.flush()"],
1171                              stdin=subprocess.PIPE,
1172                              stdout=subprocess.PIPE,
1173                              stderr=subprocess.DEVNULL,
1174                              bufsize=1,
1175                              universal_newlines=universal_newlines) as p:
1176            p.stdin.write(line) # expect that it flushes the line in text mode
1177            os.close(p.stdin.fileno()) # close it without flushing the buffer
1178            read_line = p.stdout.readline()
1179            with support.SuppressCrashReport():
1180                try:
1181                    p.stdin.close()
1182                except OSError:
1183                    pass
1184            p.stdin = None
1185        self.assertEqual(p.returncode, 0)
1186        self.assertEqual(read_line, expected)
1187
1188    def test_bufsize_equal_one_text_mode(self):
1189        # line is flushed in text mode with bufsize=1.
1190        # we should get the full line in return
1191        line = "line\n"
1192        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1193
1194    def test_bufsize_equal_one_binary_mode(self):
1195        # line is not flushed in binary mode with bufsize=1.
1196        # we should get empty response
1197        line = b'line' + os.linesep.encode() # assume ascii-based locale
1198        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
1199            self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1200
1201    def test_leaking_fds_on_error(self):
1202        # see bug #5179: Popen leaks file descriptors to PIPEs if
1203        # the child fails to execute; this will eventually exhaust
1204        # the maximum number of open fds. 1024 seems a very common
1205        # value for that limit, but Windows has 2048, so we loop
1206        # 1024 times (each call leaked two fds).
1207        for i in range(1024):
1208            with self.assertRaises(NONEXISTING_ERRORS):
1209                subprocess.Popen(NONEXISTING_CMD,
1210                                 stdout=subprocess.PIPE,
1211                                 stderr=subprocess.PIPE)
1212
1213    def test_nonexisting_with_pipes(self):
1214        # bpo-30121: Popen with pipes must close properly pipes on error.
1215        # Previously, os.close() was called with a Windows handle which is not
1216        # a valid file descriptor.
1217        #
1218        # Run the test in a subprocess to control how the CRT reports errors
1219        # and to get stderr content.
1220        try:
1221            import msvcrt
1222            msvcrt.CrtSetReportMode
1223        except (AttributeError, ImportError):
1224            self.skipTest("need msvcrt.CrtSetReportMode")
1225
1226        code = textwrap.dedent(f"""
1227            import msvcrt
1228            import subprocess
1229
1230            cmd = {NONEXISTING_CMD!r}
1231
1232            for report_type in [msvcrt.CRT_WARN,
1233                                msvcrt.CRT_ERROR,
1234                                msvcrt.CRT_ASSERT]:
1235                msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
1236                msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
1237
1238            try:
1239                subprocess.Popen(cmd,
1240                                 stdout=subprocess.PIPE,
1241                                 stderr=subprocess.PIPE)
1242            except OSError:
1243                pass
1244        """)
1245        cmd = [sys.executable, "-c", code]
1246        proc = subprocess.Popen(cmd,
1247                                stderr=subprocess.PIPE,
1248                                universal_newlines=True)
1249        with proc:
1250            stderr = proc.communicate()[1]
1251        self.assertEqual(stderr, "")
1252        self.assertEqual(proc.returncode, 0)
1253
1254    def test_double_close_on_error(self):
1255        # Issue #18851
1256        fds = []
1257        def open_fds():
1258            for i in range(20):
1259                fds.extend(os.pipe())
1260                time.sleep(0.001)
1261        t = threading.Thread(target=open_fds)
1262        t.start()
1263        try:
1264            with self.assertRaises(EnvironmentError):
1265                subprocess.Popen(NONEXISTING_CMD,
1266                                 stdin=subprocess.PIPE,
1267                                 stdout=subprocess.PIPE,
1268                                 stderr=subprocess.PIPE)
1269        finally:
1270            t.join()
1271            exc = None
1272            for fd in fds:
1273                # If a double close occurred, some of those fds will
1274                # already have been closed by mistake, and os.close()
1275                # here will raise.
1276                try:
1277                    os.close(fd)
1278                except OSError as e:
1279                    exc = e
1280            if exc is not None:
1281                raise exc
1282
1283    def test_threadsafe_wait(self):
1284        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
1285        proc = subprocess.Popen([sys.executable, '-c',
1286                                 'import time; time.sleep(12)'])
1287        self.assertEqual(proc.returncode, None)
1288        results = []
1289
1290        def kill_proc_timer_thread():
1291            results.append(('thread-start-poll-result', proc.poll()))
1292            # terminate it from the thread and wait for the result.
1293            proc.kill()
1294            proc.wait()
1295            results.append(('thread-after-kill-and-wait', proc.returncode))
1296            # this wait should be a no-op given the above.
1297            proc.wait()
1298            results.append(('thread-after-second-wait', proc.returncode))
1299
1300        # This is a timing sensitive test, the failure mode is
1301        # triggered when both the main thread and this thread are in
1302        # the wait() call at once.  The delay here is to allow the
1303        # main thread to most likely be blocked in its wait() call.
1304        t = threading.Timer(0.2, kill_proc_timer_thread)
1305        t.start()
1306
1307        if mswindows:
1308            expected_errorcode = 1
1309        else:
1310            # Should be -9 because of the proc.kill() from the thread.
1311            expected_errorcode = -9
1312
1313        # Wait for the process to finish; the thread should kill it
1314        # long before it finishes on its own.  Supplying a timeout
1315        # triggers a different code path for better coverage.
1316        proc.wait(timeout=20)
1317        self.assertEqual(proc.returncode, expected_errorcode,
1318                         msg="unexpected result in wait from main thread")
1319
1320        # This should be a no-op with no change in returncode.
1321        proc.wait()
1322        self.assertEqual(proc.returncode, expected_errorcode,
1323                         msg="unexpected result in second main wait.")
1324
1325        t.join()
1326        # Ensure that all of the thread results are as expected.
1327        # When a race condition occurs in wait(), the returncode could
1328        # be set by the wrong thread that doesn't actually have it
1329        # leading to an incorrect value.
1330        self.assertEqual([('thread-start-poll-result', None),
1331                          ('thread-after-kill-and-wait', expected_errorcode),
1332                          ('thread-after-second-wait', expected_errorcode)],
1333                         results)
1334
1335    def test_issue8780(self):
1336        # Ensure that stdout is inherited from the parent
1337        # if stdout=PIPE is not used
1338        code = ';'.join((
1339            'import subprocess, sys',
1340            'retcode = subprocess.call('
1341                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1342            'assert retcode == 0'))
1343        output = subprocess.check_output([sys.executable, '-c', code])
1344        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1345
1346    def test_handles_closed_on_exception(self):
1347        # If CreateProcess exits with an error, ensure the
1348        # duplicate output handles are released
1349        ifhandle, ifname = tempfile.mkstemp()
1350        ofhandle, ofname = tempfile.mkstemp()
1351        efhandle, efname = tempfile.mkstemp()
1352        try:
1353            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1354              stderr=efhandle)
1355        except OSError:
1356            os.close(ifhandle)
1357            os.remove(ifname)
1358            os.close(ofhandle)
1359            os.remove(ofname)
1360            os.close(efhandle)
1361            os.remove(efname)
1362        self.assertFalse(os.path.exists(ifname))
1363        self.assertFalse(os.path.exists(ofname))
1364        self.assertFalse(os.path.exists(efname))
1365
1366    def test_communicate_epipe(self):
1367        # Issue 10963: communicate() should hide EPIPE
1368        p = subprocess.Popen(ZERO_RETURN_CMD,
1369                             stdin=subprocess.PIPE,
1370                             stdout=subprocess.PIPE,
1371                             stderr=subprocess.PIPE)
1372        self.addCleanup(p.stdout.close)
1373        self.addCleanup(p.stderr.close)
1374        self.addCleanup(p.stdin.close)
1375        p.communicate(b"x" * 2**20)
1376
1377    def test_communicate_epipe_only_stdin(self):
1378        # Issue 10963: communicate() should hide EPIPE
1379        p = subprocess.Popen(ZERO_RETURN_CMD,
1380                             stdin=subprocess.PIPE)
1381        self.addCleanup(p.stdin.close)
1382        p.wait()
1383        p.communicate(b"x" * 2**20)
1384
1385    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1386                         "Requires signal.SIGUSR1")
1387    @unittest.skipUnless(hasattr(os, 'kill'),
1388                         "Requires os.kill")
1389    @unittest.skipUnless(hasattr(os, 'getppid'),
1390                         "Requires os.getppid")
1391    def test_communicate_eintr(self):
1392        # Issue #12493: communicate() should handle EINTR
1393        def handler(signum, frame):
1394            pass
1395        old_handler = signal.signal(signal.SIGUSR1, handler)
1396        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1397
1398        args = [sys.executable, "-c",
1399                'import os, signal;'
1400                'os.kill(os.getppid(), signal.SIGUSR1)']
1401        for stream in ('stdout', 'stderr'):
1402            kw = {stream: subprocess.PIPE}
1403            with subprocess.Popen(args, **kw) as process:
1404                # communicate() will be interrupted by SIGUSR1
1405                process.communicate()
1406
1407
1408    # This test is Linux-ish specific for simplicity to at least have
1409    # some coverage.  It is not a platform specific bug.
1410    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1411                         "Linux specific")
1412    def test_failed_child_execute_fd_leak(self):
1413        """Test for the fork() failure fd leak reported in issue16327."""
1414        fd_directory = '/proc/%d/fd' % os.getpid()
1415        fds_before_popen = os.listdir(fd_directory)
1416        with self.assertRaises(PopenTestException):
1417            PopenExecuteChildRaises(
1418                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
1419                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1420
1421        # NOTE: This test doesn't verify that the real _execute_child
1422        # does not close the file descriptors itself on the way out
1423        # during an exception.  Code inspection has confirmed that.
1424
1425        fds_after_exception = os.listdir(fd_directory)
1426        self.assertEqual(fds_before_popen, fds_after_exception)
1427
1428    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1429    def test_file_not_found_includes_filename(self):
1430        with self.assertRaises(FileNotFoundError) as c:
1431            subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
1432        self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
1433
1434    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1435    def test_file_not_found_with_bad_cwd(self):
1436        with self.assertRaises(FileNotFoundError) as c:
1437            subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
1438        self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
1439
1440
1441class RunFuncTestCase(BaseTestCase):
1442    def run_python(self, code, **kwargs):
1443        """Run Python code in a subprocess using subprocess.run"""
1444        argv = [sys.executable, "-c", code]
1445        return subprocess.run(argv, **kwargs)
1446
1447    def test_returncode(self):
1448        # call() function with sequence argument
1449        cp = self.run_python("import sys; sys.exit(47)")
1450        self.assertEqual(cp.returncode, 47)
1451        with self.assertRaises(subprocess.CalledProcessError):
1452            cp.check_returncode()
1453
1454    def test_check(self):
1455        with self.assertRaises(subprocess.CalledProcessError) as c:
1456            self.run_python("import sys; sys.exit(47)", check=True)
1457        self.assertEqual(c.exception.returncode, 47)
1458
1459    def test_check_zero(self):
1460        # check_returncode shouldn't raise when returncode is zero
1461        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
1462        self.assertEqual(cp.returncode, 0)
1463
1464    def test_timeout(self):
1465        # run() function with timeout argument; we want to test that the child
1466        # process gets killed when the timeout expires.  If the child isn't
1467        # killed, this call will deadlock since subprocess.run waits for the
1468        # child.
1469        with self.assertRaises(subprocess.TimeoutExpired):
1470            self.run_python("while True: pass", timeout=0.0001)
1471
1472    def test_capture_stdout(self):
1473        # capture stdout with zero return code
1474        cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
1475        self.assertIn(b'BDFL', cp.stdout)
1476
1477    def test_capture_stderr(self):
1478        cp = self.run_python("import sys; sys.stderr.write('BDFL')",
1479                             stderr=subprocess.PIPE)
1480        self.assertIn(b'BDFL', cp.stderr)
1481
1482    def test_check_output_stdin_arg(self):
1483        # run() can be called with stdin set to a file
1484        tf = tempfile.TemporaryFile()
1485        self.addCleanup(tf.close)
1486        tf.write(b'pear')
1487        tf.seek(0)
1488        cp = self.run_python(
1489                 "import sys; sys.stdout.write(sys.stdin.read().upper())",
1490                stdin=tf, stdout=subprocess.PIPE)
1491        self.assertIn(b'PEAR', cp.stdout)
1492
1493    def test_check_output_input_arg(self):
1494        # check_output() can be called with input set to a string
1495        cp = self.run_python(
1496                "import sys; sys.stdout.write(sys.stdin.read().upper())",
1497                input=b'pear', stdout=subprocess.PIPE)
1498        self.assertIn(b'PEAR', cp.stdout)
1499
1500    def test_check_output_stdin_with_input_arg(self):
1501        # run() refuses to accept 'stdin' with 'input'
1502        tf = tempfile.TemporaryFile()
1503        self.addCleanup(tf.close)
1504        tf.write(b'pear')
1505        tf.seek(0)
1506        with self.assertRaises(ValueError,
1507              msg="Expected ValueError when stdin and input args supplied.") as c:
1508            output = self.run_python("print('will not be run')",
1509                                     stdin=tf, input=b'hare')
1510        self.assertIn('stdin', c.exception.args[0])
1511        self.assertIn('input', c.exception.args[0])
1512
1513    def test_check_output_timeout(self):
1514        with self.assertRaises(subprocess.TimeoutExpired) as c:
1515            cp = self.run_python((
1516                     "import sys, time\n"
1517                     "sys.stdout.write('BDFL')\n"
1518                     "sys.stdout.flush()\n"
1519                     "time.sleep(3600)"),
1520                    # Some heavily loaded buildbots (sparc Debian 3.x) require
1521                    # this much time to start and print.
1522                    timeout=3, stdout=subprocess.PIPE)
1523        self.assertEqual(c.exception.output, b'BDFL')
1524        # output is aliased to stdout
1525        self.assertEqual(c.exception.stdout, b'BDFL')
1526
1527    def test_run_kwargs(self):
1528        newenv = os.environ.copy()
1529        newenv["FRUIT"] = "banana"
1530        cp = self.run_python(('import sys, os;'
1531                      'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
1532                             env=newenv)
1533        self.assertEqual(cp.returncode, 33)
1534
1535    def test_run_with_pathlike_path(self):
1536        # bpo-31961: test run(pathlike_object)
1537        # the name of a command that can be run without
1538        # any argumenets that exit fast
1539        prog = 'tree.com' if mswindows else 'ls'
1540        path = shutil.which(prog)
1541        if path is None:
1542            self.skipTest(f'{prog} required for this test')
1543        path = FakePath(path)
1544        res = subprocess.run(path, stdout=subprocess.DEVNULL)
1545        self.assertEqual(res.returncode, 0)
1546        with self.assertRaises(TypeError):
1547            subprocess.run(path, stdout=subprocess.DEVNULL, shell=True)
1548
1549    def test_run_with_bytes_path_and_arguments(self):
1550        # bpo-31961: test run([bytes_object, b'additional arguments'])
1551        path = os.fsencode(sys.executable)
1552        args = [path, '-c', b'import sys; sys.exit(57)']
1553        res = subprocess.run(args)
1554        self.assertEqual(res.returncode, 57)
1555
1556    def test_run_with_pathlike_path_and_arguments(self):
1557        # bpo-31961: test run([pathlike_object, 'additional arguments'])
1558        path = FakePath(sys.executable)
1559        args = [path, '-c', 'import sys; sys.exit(57)']
1560        res = subprocess.run(args)
1561        self.assertEqual(res.returncode, 57)
1562
1563    def test_capture_output(self):
1564        cp = self.run_python(("import sys;"
1565                              "sys.stdout.write('BDFL'); "
1566                              "sys.stderr.write('FLUFL')"),
1567                             capture_output=True)
1568        self.assertIn(b'BDFL', cp.stdout)
1569        self.assertIn(b'FLUFL', cp.stderr)
1570
1571    def test_stdout_with_capture_output_arg(self):
1572        # run() refuses to accept 'stdout' with 'capture_output'
1573        tf = tempfile.TemporaryFile()
1574        self.addCleanup(tf.close)
1575        with self.assertRaises(ValueError,
1576            msg=("Expected ValueError when stdout and capture_output "
1577                 "args supplied.")) as c:
1578            output = self.run_python("print('will not be run')",
1579                                      capture_output=True, stdout=tf)
1580        self.assertIn('stdout', c.exception.args[0])
1581        self.assertIn('capture_output', c.exception.args[0])
1582
1583    def test_stderr_with_capture_output_arg(self):
1584        # run() refuses to accept 'stderr' with 'capture_output'
1585        tf = tempfile.TemporaryFile()
1586        self.addCleanup(tf.close)
1587        with self.assertRaises(ValueError,
1588            msg=("Expected ValueError when stderr and capture_output "
1589                 "args supplied.")) as c:
1590            output = self.run_python("print('will not be run')",
1591                                      capture_output=True, stderr=tf)
1592        self.assertIn('stderr', c.exception.args[0])
1593        self.assertIn('capture_output', c.exception.args[0])
1594
1595    # This test _might_ wind up a bit fragile on loaded build+test machines
1596    # as it depends on the timing with wide enough margins for normal situations
1597    # but does assert that it happened "soon enough" to believe the right thing
1598    # happened.
1599    @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command")
1600    def test_run_with_shell_timeout_and_capture_output(self):
1601        """Output capturing after a timeout mustn't hang forever on open filehandles."""
1602        before_secs = time.monotonic()
1603        try:
1604            subprocess.run('sleep 3', shell=True, timeout=0.1,
1605                           capture_output=True)  # New session unspecified.
1606        except subprocess.TimeoutExpired as exc:
1607            after_secs = time.monotonic()
1608            stacks = traceback.format_exc()  # assertRaises doesn't give this.
1609        else:
1610            self.fail("TimeoutExpired not raised.")
1611        self.assertLess(after_secs - before_secs, 1.5,
1612                        msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
1613                        f"{stacks}```")
1614
1615
1616@unittest.skipIf(mswindows, "POSIX specific tests")
1617class POSIXProcessTestCase(BaseTestCase):
1618
1619    def setUp(self):
1620        super().setUp()
1621        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1622
1623    def _get_chdir_exception(self):
1624        try:
1625            os.chdir(self._nonexistent_dir)
1626        except OSError as e:
1627            # This avoids hard coding the errno value or the OS perror()
1628            # string and instead capture the exception that we want to see
1629            # below for comparison.
1630            desired_exception = e
1631        else:
1632            self.fail("chdir to nonexistent directory %s succeeded." %
1633                      self._nonexistent_dir)
1634        return desired_exception
1635
1636    def test_exception_cwd(self):
1637        """Test error in the child raised in the parent for a bad cwd."""
1638        desired_exception = self._get_chdir_exception()
1639        try:
1640            p = subprocess.Popen([sys.executable, "-c", ""],
1641                                 cwd=self._nonexistent_dir)
1642        except OSError as e:
1643            # Test that the child process chdir failure actually makes
1644            # it up to the parent process as the correct exception.
1645            self.assertEqual(desired_exception.errno, e.errno)
1646            self.assertEqual(desired_exception.strerror, e.strerror)
1647            self.assertEqual(desired_exception.filename, e.filename)
1648        else:
1649            self.fail("Expected OSError: %s" % desired_exception)
1650
1651    def test_exception_bad_executable(self):
1652        """Test error in the child raised in the parent for a bad executable."""
1653        desired_exception = self._get_chdir_exception()
1654        try:
1655            p = subprocess.Popen([sys.executable, "-c", ""],
1656                                 executable=self._nonexistent_dir)
1657        except OSError as e:
1658            # Test that the child process exec failure actually makes
1659            # it up to the parent process as the correct exception.
1660            self.assertEqual(desired_exception.errno, e.errno)
1661            self.assertEqual(desired_exception.strerror, e.strerror)
1662            self.assertEqual(desired_exception.filename, e.filename)
1663        else:
1664            self.fail("Expected OSError: %s" % desired_exception)
1665
1666    def test_exception_bad_args_0(self):
1667        """Test error in the child raised in the parent for a bad args[0]."""
1668        desired_exception = self._get_chdir_exception()
1669        try:
1670            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1671        except OSError as e:
1672            # Test that the child process exec failure actually makes
1673            # it up to the parent process as the correct exception.
1674            self.assertEqual(desired_exception.errno, e.errno)
1675            self.assertEqual(desired_exception.strerror, e.strerror)
1676            self.assertEqual(desired_exception.filename, e.filename)
1677        else:
1678            self.fail("Expected OSError: %s" % desired_exception)
1679
1680    # We mock the __del__ method for Popen in the next two tests
1681    # because it does cleanup based on the pid returned by fork_exec
1682    # along with issuing a resource warning if it still exists. Since
1683    # we don't actually spawn a process in these tests we can forego
1684    # the destructor. An alternative would be to set _child_created to
1685    # False before the destructor is called but there is no easy way
1686    # to do that
1687    class PopenNoDestructor(subprocess.Popen):
1688        def __del__(self):
1689            pass
1690
1691    @mock.patch("subprocess._posixsubprocess.fork_exec")
1692    def test_exception_errpipe_normal(self, fork_exec):
1693        """Test error passing done through errpipe_write in the good case"""
1694        def proper_error(*args):
1695            errpipe_write = args[13]
1696            # Write the hex for the error code EISDIR: 'is a directory'
1697            err_code = '{:x}'.format(errno.EISDIR).encode()
1698            os.write(errpipe_write, b"OSError:" + err_code + b":")
1699            return 0
1700
1701        fork_exec.side_effect = proper_error
1702
1703        with mock.patch("subprocess.os.waitpid",
1704                        side_effect=ChildProcessError):
1705            with self.assertRaises(IsADirectoryError):
1706                self.PopenNoDestructor(["non_existent_command"])
1707
1708    @mock.patch("subprocess._posixsubprocess.fork_exec")
1709    def test_exception_errpipe_bad_data(self, fork_exec):
1710        """Test error passing done through errpipe_write where its not
1711        in the expected format"""
1712        error_data = b"\xFF\x00\xDE\xAD"
1713        def bad_error(*args):
1714            errpipe_write = args[13]
1715            # Anything can be in the pipe, no assumptions should
1716            # be made about its encoding, so we'll write some
1717            # arbitrary hex bytes to test it out
1718            os.write(errpipe_write, error_data)
1719            return 0
1720
1721        fork_exec.side_effect = bad_error
1722
1723        with mock.patch("subprocess.os.waitpid",
1724                        side_effect=ChildProcessError):
1725            with self.assertRaises(subprocess.SubprocessError) as e:
1726                self.PopenNoDestructor(["non_existent_command"])
1727
1728        self.assertIn(repr(error_data), str(e.exception))
1729
1730    @unittest.skipIf(not os.path.exists('/proc/self/status'),
1731                     "need /proc/self/status")
1732    def test_restore_signals(self):
1733        # Blindly assume that cat exists on systems with /proc/self/status...
1734        default_proc_status = subprocess.check_output(
1735                ['cat', '/proc/self/status'],
1736                restore_signals=False)
1737        for line in default_proc_status.splitlines():
1738            if line.startswith(b'SigIgn'):
1739                default_sig_ign_mask = line
1740                break
1741        else:
1742            self.skipTest("SigIgn not found in /proc/self/status.")
1743        restored_proc_status = subprocess.check_output(
1744                ['cat', '/proc/self/status'],
1745                restore_signals=True)
1746        for line in restored_proc_status.splitlines():
1747            if line.startswith(b'SigIgn'):
1748                restored_sig_ign_mask = line
1749                break
1750        self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
1751                            msg="restore_signals=True should've unblocked "
1752                            "SIGPIPE and friends.")
1753
1754    def test_start_new_session(self):
1755        # For code coverage of calling setsid().  We don't care if we get an
1756        # EPERM error from it depending on the test execution environment, that
1757        # still indicates that it was called.
1758        try:
1759            output = subprocess.check_output(
1760                    [sys.executable, "-c", "import os; print(os.getsid(0))"],
1761                    start_new_session=True)
1762        except OSError as e:
1763            if e.errno != errno.EPERM:
1764                raise
1765        else:
1766            parent_sid = os.getsid(0)
1767            child_sid = int(output)
1768            self.assertNotEqual(parent_sid, child_sid)
1769
1770    def test_run_abort(self):
1771        # returncode handles signal termination
1772        with support.SuppressCrashReport():
1773            p = subprocess.Popen([sys.executable, "-c",
1774                                  'import os; os.abort()'])
1775            p.wait()
1776        self.assertEqual(-p.returncode, signal.SIGABRT)
1777
1778    def test_CalledProcessError_str_signal(self):
1779        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
1780        error_string = str(err)
1781        # We're relying on the repr() of the signal.Signals intenum to provide
1782        # the word signal, the signal name and the numeric value.
1783        self.assertIn("signal", error_string.lower())
1784        # We're not being specific about the signal name as some signals have
1785        # multiple names and which name is revealed can vary.
1786        self.assertIn("SIG", error_string)
1787        self.assertIn(str(signal.SIGABRT), error_string)
1788
1789    def test_CalledProcessError_str_unknown_signal(self):
1790        err = subprocess.CalledProcessError(-9876543, "fake cmd")
1791        error_string = str(err)
1792        self.assertIn("unknown signal 9876543.", error_string)
1793
1794    def test_CalledProcessError_str_non_zero(self):
1795        err = subprocess.CalledProcessError(2, "fake cmd")
1796        error_string = str(err)
1797        self.assertIn("non-zero exit status 2.", error_string)
1798
1799    def test_preexec(self):
1800        # DISCLAIMER: Setting environment variables is *not* a good use
1801        # of a preexec_fn.  This is merely a test.
1802        p = subprocess.Popen([sys.executable, "-c",
1803                              'import sys,os;'
1804                              'sys.stdout.write(os.getenv("FRUIT"))'],
1805                             stdout=subprocess.PIPE,
1806                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
1807        with p:
1808            self.assertEqual(p.stdout.read(), b"apple")
1809
1810    def test_preexec_exception(self):
1811        def raise_it():
1812            raise ValueError("What if two swallows carried a coconut?")
1813        try:
1814            p = subprocess.Popen([sys.executable, "-c", ""],
1815                                 preexec_fn=raise_it)
1816        except subprocess.SubprocessError as e:
1817            self.assertTrue(
1818                    subprocess._posixsubprocess,
1819                    "Expected a ValueError from the preexec_fn")
1820        except ValueError as e:
1821            self.assertIn("coconut", e.args[0])
1822        else:
1823            self.fail("Exception raised by preexec_fn did not make it "
1824                      "to the parent process.")
1825
1826    class _TestExecuteChildPopen(subprocess.Popen):
1827        """Used to test behavior at the end of _execute_child."""
1828        def __init__(self, testcase, *args, **kwargs):
1829            self._testcase = testcase
1830            subprocess.Popen.__init__(self, *args, **kwargs)
1831
1832        def _execute_child(self, *args, **kwargs):
1833            try:
1834                subprocess.Popen._execute_child(self, *args, **kwargs)
1835            finally:
1836                # Open a bunch of file descriptors and verify that
1837                # none of them are the same as the ones the Popen
1838                # instance is using for stdin/stdout/stderr.
1839                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
1840                               for _ in range(8)]
1841                try:
1842                    for fd in devzero_fds:
1843                        self._testcase.assertNotIn(
1844                                fd, (self.stdin.fileno(), self.stdout.fileno(),
1845                                     self.stderr.fileno()),
1846                                msg="At least one fd was closed early.")
1847                finally:
1848                    for fd in devzero_fds:
1849                        os.close(fd)
1850
1851    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
1852    def test_preexec_errpipe_does_not_double_close_pipes(self):
1853        """Issue16140: Don't double close pipes on preexec error."""
1854
1855        def raise_it():
1856            raise subprocess.SubprocessError(
1857                    "force the _execute_child() errpipe_data path.")
1858
1859        with self.assertRaises(subprocess.SubprocessError):
1860            self._TestExecuteChildPopen(
1861                        self, ZERO_RETURN_CMD,
1862                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1863                        stderr=subprocess.PIPE, preexec_fn=raise_it)
1864
1865    def test_preexec_gc_module_failure(self):
1866        # This tests the code that disables garbage collection if the child
1867        # process will execute any Python.
1868        def raise_runtime_error():
1869            raise RuntimeError("this shouldn't escape")
1870        enabled = gc.isenabled()
1871        orig_gc_disable = gc.disable
1872        orig_gc_isenabled = gc.isenabled
1873        try:
1874            gc.disable()
1875            self.assertFalse(gc.isenabled())
1876            subprocess.call([sys.executable, '-c', ''],
1877                            preexec_fn=lambda: None)
1878            self.assertFalse(gc.isenabled(),
1879                             "Popen enabled gc when it shouldn't.")
1880
1881            gc.enable()
1882            self.assertTrue(gc.isenabled())
1883            subprocess.call([sys.executable, '-c', ''],
1884                            preexec_fn=lambda: None)
1885            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
1886
1887            gc.disable = raise_runtime_error
1888            self.assertRaises(RuntimeError, subprocess.Popen,
1889                              [sys.executable, '-c', ''],
1890                              preexec_fn=lambda: None)
1891
1892            del gc.isenabled  # force an AttributeError
1893            self.assertRaises(AttributeError, subprocess.Popen,
1894                              [sys.executable, '-c', ''],
1895                              preexec_fn=lambda: None)
1896        finally:
1897            gc.disable = orig_gc_disable
1898            gc.isenabled = orig_gc_isenabled
1899            if not enabled:
1900                gc.disable()
1901
1902    @unittest.skipIf(
1903        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
1904    def test_preexec_fork_failure(self):
1905        # The internal code did not preserve the previous exception when
1906        # re-enabling garbage collection
1907        try:
1908            from resource import getrlimit, setrlimit, RLIMIT_NPROC
1909        except ImportError as err:
1910            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
1911        limits = getrlimit(RLIMIT_NPROC)
1912        [_, hard] = limits
1913        setrlimit(RLIMIT_NPROC, (0, hard))
1914        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
1915        try:
1916            subprocess.call([sys.executable, '-c', ''],
1917                            preexec_fn=lambda: None)
1918        except BlockingIOError:
1919            # Forking should raise EAGAIN, translated to BlockingIOError
1920            pass
1921        else:
1922            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
1923
1924    def test_args_string(self):
1925        # args is a string
1926        fd, fname = tempfile.mkstemp()
1927        # reopen in text mode
1928        with open(fd, "w", errors="surrogateescape") as fobj:
1929            fobj.write("#!%s\n" % support.unix_shell)
1930            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1931                       sys.executable)
1932        os.chmod(fname, 0o700)
1933        p = subprocess.Popen(fname)
1934        p.wait()
1935        os.remove(fname)
1936        self.assertEqual(p.returncode, 47)
1937
1938    def test_invalid_args(self):
1939        # invalid arguments should raise ValueError
1940        self.assertRaises(ValueError, subprocess.call,
1941                          [sys.executable, "-c",
1942                           "import sys; sys.exit(47)"],
1943                          startupinfo=47)
1944        self.assertRaises(ValueError, subprocess.call,
1945                          [sys.executable, "-c",
1946                           "import sys; sys.exit(47)"],
1947                          creationflags=47)
1948
1949    def test_shell_sequence(self):
1950        # Run command through the shell (sequence)
1951        newenv = os.environ.copy()
1952        newenv["FRUIT"] = "apple"
1953        p = subprocess.Popen(["echo $FRUIT"], shell=1,
1954                             stdout=subprocess.PIPE,
1955                             env=newenv)
1956        with p:
1957            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1958
1959    def test_shell_string(self):
1960        # Run command through the shell (string)
1961        newenv = os.environ.copy()
1962        newenv["FRUIT"] = "apple"
1963        p = subprocess.Popen("echo $FRUIT", shell=1,
1964                             stdout=subprocess.PIPE,
1965                             env=newenv)
1966        with p:
1967            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1968
1969    def test_call_string(self):
1970        # call() function with string argument on UNIX
1971        fd, fname = tempfile.mkstemp()
1972        # reopen in text mode
1973        with open(fd, "w", errors="surrogateescape") as fobj:
1974            fobj.write("#!%s\n" % support.unix_shell)
1975            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1976                       sys.executable)
1977        os.chmod(fname, 0o700)
1978        rc = subprocess.call(fname)
1979        os.remove(fname)
1980        self.assertEqual(rc, 47)
1981
1982    def test_specific_shell(self):
1983        # Issue #9265: Incorrect name passed as arg[0].
1984        shells = []
1985        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
1986            for name in ['bash', 'ksh']:
1987                sh = os.path.join(prefix, name)
1988                if os.path.isfile(sh):
1989                    shells.append(sh)
1990        if not shells: # Will probably work for any shell but csh.
1991            self.skipTest("bash or ksh required for this test")
1992        sh = '/bin/sh'
1993        if os.path.isfile(sh) and not os.path.islink(sh):
1994            # Test will fail if /bin/sh is a symlink to csh.
1995            shells.append(sh)
1996        for sh in shells:
1997            p = subprocess.Popen("echo $0", executable=sh, shell=True,
1998                                 stdout=subprocess.PIPE)
1999            with p:
2000                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
2001
2002    def _kill_process(self, method, *args):
2003        # Do not inherit file handles from the parent.
2004        # It should fix failures on some platforms.
2005        # Also set the SIGINT handler to the default to make sure it's not
2006        # being ignored (some tests rely on that.)
2007        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
2008        try:
2009            p = subprocess.Popen([sys.executable, "-c", """if 1:
2010                                 import sys, time
2011                                 sys.stdout.write('x\\n')
2012                                 sys.stdout.flush()
2013                                 time.sleep(30)
2014                                 """],
2015                                 close_fds=True,
2016                                 stdin=subprocess.PIPE,
2017                                 stdout=subprocess.PIPE,
2018                                 stderr=subprocess.PIPE)
2019        finally:
2020            signal.signal(signal.SIGINT, old_handler)
2021        # Wait for the interpreter to be completely initialized before
2022        # sending any signal.
2023        p.stdout.read(1)
2024        getattr(p, method)(*args)
2025        return p
2026
2027    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
2028                     "Due to known OS bug (issue #16762)")
2029    def _kill_dead_process(self, method, *args):
2030        # Do not inherit file handles from the parent.
2031        # It should fix failures on some platforms.
2032        p = subprocess.Popen([sys.executable, "-c", """if 1:
2033                             import sys, time
2034                             sys.stdout.write('x\\n')
2035                             sys.stdout.flush()
2036                             """],
2037                             close_fds=True,
2038                             stdin=subprocess.PIPE,
2039                             stdout=subprocess.PIPE,
2040                             stderr=subprocess.PIPE)
2041        # Wait for the interpreter to be completely initialized before
2042        # sending any signal.
2043        p.stdout.read(1)
2044        # The process should end after this
2045        time.sleep(1)
2046        # This shouldn't raise even though the child is now dead
2047        getattr(p, method)(*args)
2048        p.communicate()
2049
2050    def test_send_signal(self):
2051        p = self._kill_process('send_signal', signal.SIGINT)
2052        _, stderr = p.communicate()
2053        self.assertIn(b'KeyboardInterrupt', stderr)
2054        self.assertNotEqual(p.wait(), 0)
2055
2056    def test_kill(self):
2057        p = self._kill_process('kill')
2058        _, stderr = p.communicate()
2059        self.assertStderrEqual(stderr, b'')
2060        self.assertEqual(p.wait(), -signal.SIGKILL)
2061
2062    def test_terminate(self):
2063        p = self._kill_process('terminate')
2064        _, stderr = p.communicate()
2065        self.assertStderrEqual(stderr, b'')
2066        self.assertEqual(p.wait(), -signal.SIGTERM)
2067
2068    def test_send_signal_dead(self):
2069        # Sending a signal to a dead process
2070        self._kill_dead_process('send_signal', signal.SIGINT)
2071
2072    def test_kill_dead(self):
2073        # Killing a dead process
2074        self._kill_dead_process('kill')
2075
2076    def test_terminate_dead(self):
2077        # Terminating a dead process
2078        self._kill_dead_process('terminate')
2079
2080    def _save_fds(self, save_fds):
2081        fds = []
2082        for fd in save_fds:
2083            inheritable = os.get_inheritable(fd)
2084            saved = os.dup(fd)
2085            fds.append((fd, saved, inheritable))
2086        return fds
2087
2088    def _restore_fds(self, fds):
2089        for fd, saved, inheritable in fds:
2090            os.dup2(saved, fd, inheritable=inheritable)
2091            os.close(saved)
2092
2093    def check_close_std_fds(self, fds):
2094        # Issue #9905: test that subprocess pipes still work properly with
2095        # some standard fds closed
2096        stdin = 0
2097        saved_fds = self._save_fds(fds)
2098        for fd, saved, inheritable in saved_fds:
2099            if fd == 0:
2100                stdin = saved
2101                break
2102        try:
2103            for fd in fds:
2104                os.close(fd)
2105            out, err = subprocess.Popen([sys.executable, "-c",
2106                              'import sys;'
2107                              'sys.stdout.write("apple");'
2108                              'sys.stdout.flush();'
2109                              'sys.stderr.write("orange")'],
2110                       stdin=stdin,
2111                       stdout=subprocess.PIPE,
2112                       stderr=subprocess.PIPE).communicate()
2113            err = support.strip_python_stderr(err)
2114            self.assertEqual((out, err), (b'apple', b'orange'))
2115        finally:
2116            self._restore_fds(saved_fds)
2117
2118    def test_close_fd_0(self):
2119        self.check_close_std_fds([0])
2120
2121    def test_close_fd_1(self):
2122        self.check_close_std_fds([1])
2123
2124    def test_close_fd_2(self):
2125        self.check_close_std_fds([2])
2126
2127    def test_close_fds_0_1(self):
2128        self.check_close_std_fds([0, 1])
2129
2130    def test_close_fds_0_2(self):
2131        self.check_close_std_fds([0, 2])
2132
2133    def test_close_fds_1_2(self):
2134        self.check_close_std_fds([1, 2])
2135
2136    def test_close_fds_0_1_2(self):
2137        # Issue #10806: test that subprocess pipes still work properly with
2138        # all standard fds closed.
2139        self.check_close_std_fds([0, 1, 2])
2140
2141    def test_small_errpipe_write_fd(self):
2142        """Issue #15798: Popen should work when stdio fds are available."""
2143        new_stdin = os.dup(0)
2144        new_stdout = os.dup(1)
2145        try:
2146            os.close(0)
2147            os.close(1)
2148
2149            # Side test: if errpipe_write fails to have its CLOEXEC
2150            # flag set this should cause the parent to think the exec
2151            # failed.  Extremely unlikely: everyone supports CLOEXEC.
2152            subprocess.Popen([
2153                    sys.executable, "-c",
2154                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
2155        finally:
2156            # Restore original stdin and stdout
2157            os.dup2(new_stdin, 0)
2158            os.dup2(new_stdout, 1)
2159            os.close(new_stdin)
2160            os.close(new_stdout)
2161
2162    def test_remapping_std_fds(self):
2163        # open up some temporary files
2164        temps = [tempfile.mkstemp() for i in range(3)]
2165        try:
2166            temp_fds = [fd for fd, fname in temps]
2167
2168            # unlink the files -- we won't need to reopen them
2169            for fd, fname in temps:
2170                os.unlink(fname)
2171
2172            # write some data to what will become stdin, and rewind
2173            os.write(temp_fds[1], b"STDIN")
2174            os.lseek(temp_fds[1], 0, 0)
2175
2176            # move the standard file descriptors out of the way
2177            saved_fds = self._save_fds(range(3))
2178            try:
2179                # duplicate the file objects over the standard fd's
2180                for fd, temp_fd in enumerate(temp_fds):
2181                    os.dup2(temp_fd, fd)
2182
2183                # now use those files in the "wrong" order, so that subprocess
2184                # has to rearrange them in the child
2185                p = subprocess.Popen([sys.executable, "-c",
2186                    'import sys; got = sys.stdin.read();'
2187                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2188                    stdin=temp_fds[1],
2189                    stdout=temp_fds[2],
2190                    stderr=temp_fds[0])
2191                p.wait()
2192            finally:
2193                self._restore_fds(saved_fds)
2194
2195            for fd in temp_fds:
2196                os.lseek(fd, 0, 0)
2197
2198            out = os.read(temp_fds[2], 1024)
2199            err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
2200            self.assertEqual(out, b"got STDIN")
2201            self.assertEqual(err, b"err")
2202
2203        finally:
2204            for fd in temp_fds:
2205                os.close(fd)
2206
2207    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
2208        # open up some temporary files
2209        temps = [tempfile.mkstemp() for i in range(3)]
2210        temp_fds = [fd for fd, fname in temps]
2211        try:
2212            # unlink the files -- we won't need to reopen them
2213            for fd, fname in temps:
2214                os.unlink(fname)
2215
2216            # save a copy of the standard file descriptors
2217            saved_fds = self._save_fds(range(3))
2218            try:
2219                # duplicate the temp files over the standard fd's 0, 1, 2
2220                for fd, temp_fd in enumerate(temp_fds):
2221                    os.dup2(temp_fd, fd)
2222
2223                # write some data to what will become stdin, and rewind
2224                os.write(stdin_no, b"STDIN")
2225                os.lseek(stdin_no, 0, 0)
2226
2227                # now use those files in the given order, so that subprocess
2228                # has to rearrange them in the child
2229                p = subprocess.Popen([sys.executable, "-c",
2230                    'import sys; got = sys.stdin.read();'
2231                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2232                    stdin=stdin_no,
2233                    stdout=stdout_no,
2234                    stderr=stderr_no)
2235                p.wait()
2236
2237                for fd in temp_fds:
2238                    os.lseek(fd, 0, 0)
2239
2240                out = os.read(stdout_no, 1024)
2241                err = support.strip_python_stderr(os.read(stderr_no, 1024))
2242            finally:
2243                self._restore_fds(saved_fds)
2244
2245            self.assertEqual(out, b"got STDIN")
2246            self.assertEqual(err, b"err")
2247
2248        finally:
2249            for fd in temp_fds:
2250                os.close(fd)
2251
2252    # When duping fds, if there arises a situation where one of the fds is
2253    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
2254    # This tests all combinations of this.
2255    def test_swap_fds(self):
2256        self.check_swap_fds(0, 1, 2)
2257        self.check_swap_fds(0, 2, 1)
2258        self.check_swap_fds(1, 0, 2)
2259        self.check_swap_fds(1, 2, 0)
2260        self.check_swap_fds(2, 0, 1)
2261        self.check_swap_fds(2, 1, 0)
2262
2263    def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
2264        saved_fds = self._save_fds(range(3))
2265        try:
2266            for from_fd in from_fds:
2267                with tempfile.TemporaryFile() as f:
2268                    os.dup2(f.fileno(), from_fd)
2269
2270            fd_to_close = (set(range(3)) - set(from_fds)).pop()
2271            os.close(fd_to_close)
2272
2273            arg_names = ['stdin', 'stdout', 'stderr']
2274            kwargs = {}
2275            for from_fd, to_fd in zip(from_fds, to_fds):
2276                kwargs[arg_names[to_fd]] = from_fd
2277
2278            code = textwrap.dedent(r'''
2279                import os, sys
2280                skipped_fd = int(sys.argv[1])
2281                for fd in range(3):
2282                    if fd != skipped_fd:
2283                        os.write(fd, str(fd).encode('ascii'))
2284            ''')
2285
2286            skipped_fd = (set(range(3)) - set(to_fds)).pop()
2287
2288            rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)],
2289                                 **kwargs)
2290            self.assertEqual(rc, 0)
2291
2292            for from_fd, to_fd in zip(from_fds, to_fds):
2293                os.lseek(from_fd, 0, os.SEEK_SET)
2294                read_bytes = os.read(from_fd, 1024)
2295                read_fds = list(map(int, read_bytes.decode('ascii')))
2296                msg = textwrap.dedent(f"""
2297                    When testing {from_fds} to {to_fds} redirection,
2298                    parent descriptor {from_fd} got redirected
2299                    to descriptor(s) {read_fds} instead of descriptor {to_fd}.
2300                """)
2301                self.assertEqual([to_fd], read_fds, msg)
2302        finally:
2303            self._restore_fds(saved_fds)
2304
2305    # Check that subprocess can remap std fds correctly even
2306    # if one of them is closed (#32844).
2307    def test_swap_std_fds_with_one_closed(self):
2308        for from_fds in itertools.combinations(range(3), 2):
2309            for to_fds in itertools.permutations(range(3), 2):
2310                self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
2311
2312    def test_surrogates_error_message(self):
2313        def prepare():
2314            raise ValueError("surrogate:\uDCff")
2315
2316        try:
2317            subprocess.call(
2318                ZERO_RETURN_CMD,
2319                preexec_fn=prepare)
2320        except ValueError as err:
2321            # Pure Python implementations keeps the message
2322            self.assertIsNone(subprocess._posixsubprocess)
2323            self.assertEqual(str(err), "surrogate:\uDCff")
2324        except subprocess.SubprocessError as err:
2325            # _posixsubprocess uses a default message
2326            self.assertIsNotNone(subprocess._posixsubprocess)
2327            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
2328        else:
2329            self.fail("Expected ValueError or subprocess.SubprocessError")
2330
2331    def test_undecodable_env(self):
2332        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
2333            encoded_value = value.encode("ascii", "surrogateescape")
2334
2335            # test str with surrogates
2336            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
2337            env = os.environ.copy()
2338            env[key] = value
2339            # Use C locale to get ASCII for the locale encoding to force
2340            # surrogate-escaping of \xFF in the child process
2341            env['LC_ALL'] = 'C'
2342            decoded_value = value
2343            stdout = subprocess.check_output(
2344                [sys.executable, "-c", script],
2345                env=env)
2346            stdout = stdout.rstrip(b'\n\r')
2347            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2348
2349            # test bytes
2350            key = key.encode("ascii", "surrogateescape")
2351            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2352            env = os.environ.copy()
2353            env[key] = encoded_value
2354            stdout = subprocess.check_output(
2355                [sys.executable, "-c", script],
2356                env=env)
2357            stdout = stdout.rstrip(b'\n\r')
2358            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2359
2360    def test_bytes_program(self):
2361        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
2362        args = list(ZERO_RETURN_CMD[1:])
2363        path, program = os.path.split(ZERO_RETURN_CMD[0])
2364        program = os.fsencode(program)
2365
2366        # absolute bytes path
2367        exitcode = subprocess.call([abs_program]+args)
2368        self.assertEqual(exitcode, 0)
2369
2370        # absolute bytes path as a string
2371        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
2372        exitcode = subprocess.call(cmd, shell=True)
2373        self.assertEqual(exitcode, 0)
2374
2375        # bytes program, unicode PATH
2376        env = os.environ.copy()
2377        env["PATH"] = path
2378        exitcode = subprocess.call([program]+args, env=env)
2379        self.assertEqual(exitcode, 0)
2380
2381        # bytes program, bytes PATH
2382        envb = os.environb.copy()
2383        envb[b"PATH"] = os.fsencode(path)
2384        exitcode = subprocess.call([program]+args, env=envb)
2385        self.assertEqual(exitcode, 0)
2386
2387    def test_pipe_cloexec(self):
2388        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2389        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2390
2391        p1 = subprocess.Popen([sys.executable, sleeper],
2392                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2393                              stderr=subprocess.PIPE, close_fds=False)
2394
2395        self.addCleanup(p1.communicate, b'')
2396
2397        p2 = subprocess.Popen([sys.executable, fd_status],
2398                              stdout=subprocess.PIPE, close_fds=False)
2399
2400        output, error = p2.communicate()
2401        result_fds = set(map(int, output.split(b',')))
2402        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2403                            p1.stderr.fileno()])
2404
2405        self.assertFalse(result_fds & unwanted_fds,
2406                         "Expected no fds from %r to be open in child, "
2407                         "found %r" %
2408                              (unwanted_fds, result_fds & unwanted_fds))
2409
2410    def test_pipe_cloexec_real_tools(self):
2411        qcat = support.findfile("qcat.py", subdir="subprocessdata")
2412        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2413
2414        subdata = b'zxcvbn'
2415        data = subdata * 4 + b'\n'
2416
2417        p1 = subprocess.Popen([sys.executable, qcat],
2418                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2419                              close_fds=False)
2420
2421        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2422                              stdin=p1.stdout, stdout=subprocess.PIPE,
2423                              close_fds=False)
2424
2425        self.addCleanup(p1.wait)
2426        self.addCleanup(p2.wait)
2427        def kill_p1():
2428            try:
2429                p1.terminate()
2430            except ProcessLookupError:
2431                pass
2432        def kill_p2():
2433            try:
2434                p2.terminate()
2435            except ProcessLookupError:
2436                pass
2437        self.addCleanup(kill_p1)
2438        self.addCleanup(kill_p2)
2439
2440        p1.stdin.write(data)
2441        p1.stdin.close()
2442
2443        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2444
2445        self.assertTrue(readfiles, "The child hung")
2446        self.assertEqual(p2.stdout.read(), data)
2447
2448        p1.stdout.close()
2449        p2.stdout.close()
2450
2451    def test_close_fds(self):
2452        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2453
2454        fds = os.pipe()
2455        self.addCleanup(os.close, fds[0])
2456        self.addCleanup(os.close, fds[1])
2457
2458        open_fds = set(fds)
2459        # add a bunch more fds
2460        for _ in range(9):
2461            fd = os.open(os.devnull, os.O_RDONLY)
2462            self.addCleanup(os.close, fd)
2463            open_fds.add(fd)
2464
2465        for fd in open_fds:
2466            os.set_inheritable(fd, True)
2467
2468        p = subprocess.Popen([sys.executable, fd_status],
2469                             stdout=subprocess.PIPE, close_fds=False)
2470        output, ignored = p.communicate()
2471        remaining_fds = set(map(int, output.split(b',')))
2472
2473        self.assertEqual(remaining_fds & open_fds, open_fds,
2474                         "Some fds were closed")
2475
2476        p = subprocess.Popen([sys.executable, fd_status],
2477                             stdout=subprocess.PIPE, close_fds=True)
2478        output, ignored = p.communicate()
2479        remaining_fds = set(map(int, output.split(b',')))
2480
2481        self.assertFalse(remaining_fds & open_fds,
2482                         "Some fds were left open")
2483        self.assertIn(1, remaining_fds, "Subprocess failed")
2484
2485        # Keep some of the fd's we opened open in the subprocess.
2486        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2487        fds_to_keep = set(open_fds.pop() for _ in range(8))
2488        p = subprocess.Popen([sys.executable, fd_status],
2489                             stdout=subprocess.PIPE, close_fds=True,
2490                             pass_fds=fds_to_keep)
2491        output, ignored = p.communicate()
2492        remaining_fds = set(map(int, output.split(b',')))
2493
2494        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
2495                         "Some fds not in pass_fds were left open")
2496        self.assertIn(1, remaining_fds, "Subprocess failed")
2497
2498
2499    @unittest.skipIf(sys.platform.startswith("freebsd") and
2500                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
2501                     "Requires fdescfs mounted on /dev/fd on FreeBSD.")
2502    def test_close_fds_when_max_fd_is_lowered(self):
2503        """Confirm that issue21618 is fixed (may fail under valgrind)."""
2504        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2505
2506        # This launches the meat of the test in a child process to
2507        # avoid messing with the larger unittest processes maximum
2508        # number of file descriptors.
2509        #  This process launches:
2510        #  +--> Process that lowers its RLIMIT_NOFILE aftr setting up
2511        #    a bunch of high open fds above the new lower rlimit.
2512        #    Those are reported via stdout before launching a new
2513        #    process with close_fds=False to run the actual test:
2514        #    +--> The TEST: This one launches a fd_status.py
2515        #      subprocess with close_fds=True so we can find out if
2516        #      any of the fds above the lowered rlimit are still open.
2517        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
2518        '''
2519        import os, resource, subprocess, sys, textwrap
2520        open_fds = set()
2521        # Add a bunch more fds to pass down.
2522        for _ in range(40):
2523            fd = os.open(os.devnull, os.O_RDONLY)
2524            open_fds.add(fd)
2525
2526        # Leave a two pairs of low ones available for use by the
2527        # internal child error pipe and the stdout pipe.
2528        # We also leave 10 more open as some Python buildbots run into
2529        # "too many open files" errors during the test if we do not.
2530        for fd in sorted(open_fds)[:14]:
2531            os.close(fd)
2532            open_fds.remove(fd)
2533
2534        for fd in open_fds:
2535            #self.addCleanup(os.close, fd)
2536            os.set_inheritable(fd, True)
2537
2538        max_fd_open = max(open_fds)
2539
2540        # Communicate the open_fds to the parent unittest.TestCase process.
2541        print(','.join(map(str, sorted(open_fds))))
2542        sys.stdout.flush()
2543
2544        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
2545        try:
2546            # 29 is lower than the highest fds we are leaving open.
2547            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
2548            # Launch a new Python interpreter with our low fd rlim_cur that
2549            # inherits open fds above that limit.  It then uses subprocess
2550            # with close_fds=True to get a report of open fds in the child.
2551            # An explicit list of fds to check is passed to fd_status.py as
2552            # letting fd_status rely on its default logic would miss the
2553            # fds above rlim_cur as it normally only checks up to that limit.
2554            subprocess.Popen(
2555                [sys.executable, '-c',
2556                 textwrap.dedent("""
2557                     import subprocess, sys
2558                     subprocess.Popen([sys.executable, %r] +
2559                                      [str(x) for x in range({max_fd})],
2560                                      close_fds=True).wait()
2561                     """.format(max_fd=max_fd_open+1))],
2562                close_fds=False).wait()
2563        finally:
2564            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
2565        ''' % fd_status)], stdout=subprocess.PIPE)
2566
2567        output, unused_stderr = p.communicate()
2568        output_lines = output.splitlines()
2569        self.assertEqual(len(output_lines), 2,
2570                         msg="expected exactly two lines of output:\n%r" % output)
2571        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
2572        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
2573
2574        self.assertFalse(remaining_fds & opened_fds,
2575                         msg="Some fds were left open.")
2576
2577
2578    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2579    # descriptor of a pipe closed in the parent process is valid in the
2580    # child process according to fstat(), but the mode of the file
2581    # descriptor is invalid, and read or write raise an error.
2582    @support.requires_mac_ver(10, 5)
2583    def test_pass_fds(self):
2584        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2585
2586        open_fds = set()
2587
2588        for x in range(5):
2589            fds = os.pipe()
2590            self.addCleanup(os.close, fds[0])
2591            self.addCleanup(os.close, fds[1])
2592            os.set_inheritable(fds[0], True)
2593            os.set_inheritable(fds[1], True)
2594            open_fds.update(fds)
2595
2596        for fd in open_fds:
2597            p = subprocess.Popen([sys.executable, fd_status],
2598                                 stdout=subprocess.PIPE, close_fds=True,
2599                                 pass_fds=(fd, ))
2600            output, ignored = p.communicate()
2601
2602            remaining_fds = set(map(int, output.split(b',')))
2603            to_be_closed = open_fds - {fd}
2604
2605            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2606            self.assertFalse(remaining_fds & to_be_closed,
2607                             "fd to be closed passed")
2608
2609            # pass_fds overrides close_fds with a warning.
2610            with self.assertWarns(RuntimeWarning) as context:
2611                self.assertFalse(subprocess.call(
2612                        ZERO_RETURN_CMD,
2613                        close_fds=False, pass_fds=(fd, )))
2614            self.assertIn('overriding close_fds', str(context.warning))
2615
2616    def test_pass_fds_inheritable(self):
2617        script = support.findfile("fd_status.py", subdir="subprocessdata")
2618
2619        inheritable, non_inheritable = os.pipe()
2620        self.addCleanup(os.close, inheritable)
2621        self.addCleanup(os.close, non_inheritable)
2622        os.set_inheritable(inheritable, True)
2623        os.set_inheritable(non_inheritable, False)
2624        pass_fds = (inheritable, non_inheritable)
2625        args = [sys.executable, script]
2626        args += list(map(str, pass_fds))
2627
2628        p = subprocess.Popen(args,
2629                             stdout=subprocess.PIPE, close_fds=True,
2630                             pass_fds=pass_fds)
2631        output, ignored = p.communicate()
2632        fds = set(map(int, output.split(b',')))
2633
2634        # the inheritable file descriptor must be inherited, so its inheritable
2635        # flag must be set in the child process after fork() and before exec()
2636        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2637
2638        # inheritable flag must not be changed in the parent process
2639        self.assertEqual(os.get_inheritable(inheritable), True)
2640        self.assertEqual(os.get_inheritable(non_inheritable), False)
2641
2642
2643    # bpo-32270: Ensure that descriptors specified in pass_fds
2644    # are inherited even if they are used in redirections.
2645    # Contributed by @izbyshev.
2646    def test_pass_fds_redirected(self):
2647        """Regression test for https://bugs.python.org/issue32270."""
2648        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2649        pass_fds = []
2650        for _ in range(2):
2651            fd = os.open(os.devnull, os.O_RDWR)
2652            self.addCleanup(os.close, fd)
2653            pass_fds.append(fd)
2654
2655        stdout_r, stdout_w = os.pipe()
2656        self.addCleanup(os.close, stdout_r)
2657        self.addCleanup(os.close, stdout_w)
2658        pass_fds.insert(1, stdout_w)
2659
2660        with subprocess.Popen([sys.executable, fd_status],
2661                              stdin=pass_fds[0],
2662                              stdout=pass_fds[1],
2663                              stderr=pass_fds[2],
2664                              close_fds=True,
2665                              pass_fds=pass_fds):
2666            output = os.read(stdout_r, 1024)
2667        fds = {int(num) for num in output.split(b',')}
2668
2669        self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}")
2670
2671
2672    def test_stdout_stdin_are_single_inout_fd(self):
2673        with io.open(os.devnull, "r+") as inout:
2674            p = subprocess.Popen(ZERO_RETURN_CMD,
2675                                 stdout=inout, stdin=inout)
2676            p.wait()
2677
2678    def test_stdout_stderr_are_single_inout_fd(self):
2679        with io.open(os.devnull, "r+") as inout:
2680            p = subprocess.Popen(ZERO_RETURN_CMD,
2681                                 stdout=inout, stderr=inout)
2682            p.wait()
2683
2684    def test_stderr_stdin_are_single_inout_fd(self):
2685        with io.open(os.devnull, "r+") as inout:
2686            p = subprocess.Popen(ZERO_RETURN_CMD,
2687                                 stderr=inout, stdin=inout)
2688            p.wait()
2689
2690    def test_wait_when_sigchild_ignored(self):
2691        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2692        sigchild_ignore = support.findfile("sigchild_ignore.py",
2693                                           subdir="subprocessdata")
2694        p = subprocess.Popen([sys.executable, sigchild_ignore],
2695                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2696        stdout, stderr = p.communicate()
2697        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2698                         " non-zero with this error:\n%s" %
2699                         stderr.decode('utf-8'))
2700
2701    def test_select_unbuffered(self):
2702        # Issue #11459: bufsize=0 should really set the pipes as
2703        # unbuffered (and therefore let select() work properly).
2704        select = support.import_module("select")
2705        p = subprocess.Popen([sys.executable, "-c",
2706                              'import sys;'
2707                              'sys.stdout.write("apple")'],
2708                             stdout=subprocess.PIPE,
2709                             bufsize=0)
2710        f = p.stdout
2711        self.addCleanup(f.close)
2712        try:
2713            self.assertEqual(f.read(4), b"appl")
2714            self.assertIn(f, select.select([f], [], [], 0.0)[0])
2715        finally:
2716            p.wait()
2717
2718    def test_zombie_fast_process_del(self):
2719        # Issue #12650: on Unix, if Popen.__del__() was called before the
2720        # process exited, it wouldn't be added to subprocess._active, and would
2721        # remain a zombie.
2722        # spawn a Popen, and delete its reference before it exits
2723        p = subprocess.Popen([sys.executable, "-c",
2724                              'import sys, time;'
2725                              'time.sleep(0.2)'],
2726                             stdout=subprocess.PIPE,
2727                             stderr=subprocess.PIPE)
2728        self.addCleanup(p.stdout.close)
2729        self.addCleanup(p.stderr.close)
2730        ident = id(p)
2731        pid = p.pid
2732        with support.check_warnings(('', ResourceWarning)):
2733            p = None
2734
2735        if mswindows:
2736            # subprocess._active is not used on Windows and is set to None.
2737            self.assertIsNone(subprocess._active)
2738        else:
2739            # check that p is in the active processes list
2740            self.assertIn(ident, [id(o) for o in subprocess._active])
2741
2742    def test_leak_fast_process_del_killed(self):
2743        # Issue #12650: on Unix, if Popen.__del__() was called before the
2744        # process exited, and the process got killed by a signal, it would never
2745        # be removed from subprocess._active, which triggered a FD and memory
2746        # leak.
2747        # spawn a Popen, delete its reference and kill it
2748        p = subprocess.Popen([sys.executable, "-c",
2749                              'import time;'
2750                              'time.sleep(3)'],
2751                             stdout=subprocess.PIPE,
2752                             stderr=subprocess.PIPE)
2753        self.addCleanup(p.stdout.close)
2754        self.addCleanup(p.stderr.close)
2755        ident = id(p)
2756        pid = p.pid
2757        with support.check_warnings(('', ResourceWarning)):
2758            p = None
2759
2760        os.kill(pid, signal.SIGKILL)
2761        if mswindows:
2762            # subprocess._active is not used on Windows and is set to None.
2763            self.assertIsNone(subprocess._active)
2764        else:
2765            # check that p is in the active processes list
2766            self.assertIn(ident, [id(o) for o in subprocess._active])
2767
2768        # let some time for the process to exit, and create a new Popen: this
2769        # should trigger the wait() of p
2770        time.sleep(0.2)
2771        with self.assertRaises(OSError):
2772            with subprocess.Popen(NONEXISTING_CMD,
2773                                  stdout=subprocess.PIPE,
2774                                  stderr=subprocess.PIPE) as proc:
2775                pass
2776        # p should have been wait()ed on, and removed from the _active list
2777        self.assertRaises(OSError, os.waitpid, pid, 0)
2778        if mswindows:
2779            # subprocess._active is not used on Windows and is set to None.
2780            self.assertIsNone(subprocess._active)
2781        else:
2782            self.assertNotIn(ident, [id(o) for o in subprocess._active])
2783
2784    def test_close_fds_after_preexec(self):
2785        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2786
2787        # this FD is used as dup2() target by preexec_fn, and should be closed
2788        # in the child process
2789        fd = os.dup(1)
2790        self.addCleanup(os.close, fd)
2791
2792        p = subprocess.Popen([sys.executable, fd_status],
2793                             stdout=subprocess.PIPE, close_fds=True,
2794                             preexec_fn=lambda: os.dup2(1, fd))
2795        output, ignored = p.communicate()
2796
2797        remaining_fds = set(map(int, output.split(b',')))
2798
2799        self.assertNotIn(fd, remaining_fds)
2800
2801    @support.cpython_only
2802    def test_fork_exec(self):
2803        # Issue #22290: fork_exec() must not crash on memory allocation failure
2804        # or other errors
2805        import _posixsubprocess
2806        gc_enabled = gc.isenabled()
2807        try:
2808            # Use a preexec function and enable the garbage collector
2809            # to force fork_exec() to re-enable the garbage collector
2810            # on error.
2811            func = lambda: None
2812            gc.enable()
2813
2814            for args, exe_list, cwd, env_list in (
2815                (123,      [b"exe"], None, [b"env"]),
2816                ([b"arg"], 123,      None, [b"env"]),
2817                ([b"arg"], [b"exe"], 123,  [b"env"]),
2818                ([b"arg"], [b"exe"], None, 123),
2819            ):
2820                with self.assertRaises(TypeError):
2821                    _posixsubprocess.fork_exec(
2822                        args, exe_list,
2823                        True, (), cwd, env_list,
2824                        -1, -1, -1, -1,
2825                        1, 2, 3, 4,
2826                        True, True, func)
2827        finally:
2828            if not gc_enabled:
2829                gc.disable()
2830
2831    @support.cpython_only
2832    def test_fork_exec_sorted_fd_sanity_check(self):
2833        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
2834        import _posixsubprocess
2835        class BadInt:
2836            first = True
2837            def __init__(self, value):
2838                self.value = value
2839            def __int__(self):
2840                if self.first:
2841                    self.first = False
2842                    return self.value
2843                raise ValueError
2844
2845        gc_enabled = gc.isenabled()
2846        try:
2847            gc.enable()
2848
2849            for fds_to_keep in (
2850                (-1, 2, 3, 4, 5),  # Negative number.
2851                ('str', 4),  # Not an int.
2852                (18, 23, 42, 2**63),  # Out of range.
2853                (5, 4),  # Not sorted.
2854                (6, 7, 7, 8),  # Duplicate.
2855                (BadInt(1), BadInt(2)),
2856            ):
2857                with self.assertRaises(
2858                        ValueError,
2859                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
2860                    _posixsubprocess.fork_exec(
2861                        [b"false"], [b"false"],
2862                        True, fds_to_keep, None, [b"env"],
2863                        -1, -1, -1, -1,
2864                        1, 2, 3, 4,
2865                        True, True, None)
2866                self.assertIn('fds_to_keep', str(c.exception))
2867        finally:
2868            if not gc_enabled:
2869                gc.disable()
2870
2871    def test_communicate_BrokenPipeError_stdin_close(self):
2872        # By not setting stdout or stderr or a timeout we force the fast path
2873        # that just calls _stdin_write() internally due to our mock.
2874        proc = subprocess.Popen(ZERO_RETURN_CMD)
2875        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2876            mock_proc_stdin.close.side_effect = BrokenPipeError
2877            proc.communicate()  # Should swallow BrokenPipeError from close.
2878            mock_proc_stdin.close.assert_called_with()
2879
2880    def test_communicate_BrokenPipeError_stdin_write(self):
2881        # By not setting stdout or stderr or a timeout we force the fast path
2882        # that just calls _stdin_write() internally due to our mock.
2883        proc = subprocess.Popen(ZERO_RETURN_CMD)
2884        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2885            mock_proc_stdin.write.side_effect = BrokenPipeError
2886            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
2887            mock_proc_stdin.write.assert_called_once_with(b'stuff')
2888            mock_proc_stdin.close.assert_called_once_with()
2889
2890    def test_communicate_BrokenPipeError_stdin_flush(self):
2891        # Setting stdin and stdout forces the ._communicate() code path.
2892        # python -h exits faster than python -c pass (but spams stdout).
2893        proc = subprocess.Popen([sys.executable, '-h'],
2894                                stdin=subprocess.PIPE,
2895                                stdout=subprocess.PIPE)
2896        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
2897                open(os.devnull, 'wb') as dev_null:
2898            mock_proc_stdin.flush.side_effect = BrokenPipeError
2899            # because _communicate registers a selector using proc.stdin...
2900            mock_proc_stdin.fileno.return_value = dev_null.fileno()
2901            # _communicate() should swallow BrokenPipeError from flush.
2902            proc.communicate(b'stuff')
2903            mock_proc_stdin.flush.assert_called_once_with()
2904
2905    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
2906        # Setting stdin and stdout forces the ._communicate() code path.
2907        # python -h exits faster than python -c pass (but spams stdout).
2908        proc = subprocess.Popen([sys.executable, '-h'],
2909                                stdin=subprocess.PIPE,
2910                                stdout=subprocess.PIPE)
2911        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2912            mock_proc_stdin.close.side_effect = BrokenPipeError
2913            # _communicate() should swallow BrokenPipeError from close.
2914            proc.communicate(timeout=999)
2915            mock_proc_stdin.close.assert_called_once_with()
2916
2917    @unittest.skipUnless(_testcapi is not None
2918                         and hasattr(_testcapi, 'W_STOPCODE'),
2919                         'need _testcapi.W_STOPCODE')
2920    def test_stopped(self):
2921        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
2922        args = ZERO_RETURN_CMD
2923        proc = subprocess.Popen(args)
2924
2925        # Wait until the real process completes to avoid zombie process
2926        pid = proc.pid
2927        pid, status = os.waitpid(pid, 0)
2928        self.assertEqual(status, 0)
2929
2930        status = _testcapi.W_STOPCODE(3)
2931        with mock.patch('subprocess.os.waitpid', return_value=(pid, status)):
2932            returncode = proc.wait()
2933
2934        self.assertEqual(returncode, -3)
2935
2936    def test_communicate_repeated_call_after_stdout_close(self):
2937        proc = subprocess.Popen([sys.executable, '-c',
2938                                 'import os, time; os.close(1), time.sleep(2)'],
2939                                stdout=subprocess.PIPE)
2940        while True:
2941            try:
2942                proc.communicate(timeout=0.1)
2943                return
2944            except subprocess.TimeoutExpired:
2945                pass
2946
2947
2948@unittest.skipUnless(mswindows, "Windows specific tests")
2949class Win32ProcessTestCase(BaseTestCase):
2950
2951    def test_startupinfo(self):
2952        # startupinfo argument
2953        # We uses hardcoded constants, because we do not want to
2954        # depend on win32all.
2955        STARTF_USESHOWWINDOW = 1
2956        SW_MAXIMIZE = 3
2957        startupinfo = subprocess.STARTUPINFO()
2958        startupinfo.dwFlags = STARTF_USESHOWWINDOW
2959        startupinfo.wShowWindow = SW_MAXIMIZE
2960        # Since Python is a console process, it won't be affected
2961        # by wShowWindow, but the argument should be silently
2962        # ignored
2963        subprocess.call(ZERO_RETURN_CMD,
2964                        startupinfo=startupinfo)
2965
2966    def test_startupinfo_keywords(self):
2967        # startupinfo argument
2968        # We use hardcoded constants, because we do not want to
2969        # depend on win32all.
2970        STARTF_USERSHOWWINDOW = 1
2971        SW_MAXIMIZE = 3
2972        startupinfo = subprocess.STARTUPINFO(
2973            dwFlags=STARTF_USERSHOWWINDOW,
2974            wShowWindow=SW_MAXIMIZE
2975        )
2976        # Since Python is a console process, it won't be affected
2977        # by wShowWindow, but the argument should be silently
2978        # ignored
2979        subprocess.call(ZERO_RETURN_CMD,
2980                        startupinfo=startupinfo)
2981
2982    def test_startupinfo_copy(self):
2983        # bpo-34044: Popen must not modify input STARTUPINFO structure
2984        startupinfo = subprocess.STARTUPINFO()
2985        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
2986        startupinfo.wShowWindow = subprocess.SW_HIDE
2987
2988        # Call Popen() twice with the same startupinfo object to make sure
2989        # that it's not modified
2990        for _ in range(2):
2991            cmd = ZERO_RETURN_CMD
2992            with open(os.devnull, 'w') as null:
2993                proc = subprocess.Popen(cmd,
2994                                        stdout=null,
2995                                        stderr=subprocess.STDOUT,
2996                                        startupinfo=startupinfo)
2997                with proc:
2998                    proc.communicate()
2999                self.assertEqual(proc.returncode, 0)
3000
3001            self.assertEqual(startupinfo.dwFlags,
3002                             subprocess.STARTF_USESHOWWINDOW)
3003            self.assertIsNone(startupinfo.hStdInput)
3004            self.assertIsNone(startupinfo.hStdOutput)
3005            self.assertIsNone(startupinfo.hStdError)
3006            self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE)
3007            self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []})
3008
3009    def test_creationflags(self):
3010        # creationflags argument
3011        CREATE_NEW_CONSOLE = 16
3012        sys.stderr.write("    a DOS box should flash briefly ...\n")
3013        subprocess.call(sys.executable +
3014                        ' -c "import time; time.sleep(0.25)"',
3015                        creationflags=CREATE_NEW_CONSOLE)
3016
3017    def test_invalid_args(self):
3018        # invalid arguments should raise ValueError
3019        self.assertRaises(ValueError, subprocess.call,
3020                          [sys.executable, "-c",
3021                           "import sys; sys.exit(47)"],
3022                          preexec_fn=lambda: 1)
3023
3024    @support.cpython_only
3025    def test_issue31471(self):
3026        # There shouldn't be an assertion failure in Popen() in case the env
3027        # argument has a bad keys() method.
3028        class BadEnv(dict):
3029            keys = None
3030        with self.assertRaises(TypeError):
3031            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
3032
3033    def test_close_fds(self):
3034        # close file descriptors
3035        rc = subprocess.call([sys.executable, "-c",
3036                              "import sys; sys.exit(47)"],
3037                              close_fds=True)
3038        self.assertEqual(rc, 47)
3039
3040    def test_close_fds_with_stdio(self):
3041        import msvcrt
3042
3043        fds = os.pipe()
3044        self.addCleanup(os.close, fds[0])
3045        self.addCleanup(os.close, fds[1])
3046
3047        handles = []
3048        for fd in fds:
3049            os.set_inheritable(fd, True)
3050            handles.append(msvcrt.get_osfhandle(fd))
3051
3052        p = subprocess.Popen([sys.executable, "-c",
3053                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3054                             stdout=subprocess.PIPE, close_fds=False)
3055        stdout, stderr = p.communicate()
3056        self.assertEqual(p.returncode, 0)
3057        int(stdout.strip())  # Check that stdout is an integer
3058
3059        p = subprocess.Popen([sys.executable, "-c",
3060                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3061                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
3062        stdout, stderr = p.communicate()
3063        self.assertEqual(p.returncode, 1)
3064        self.assertIn(b"OSError", stderr)
3065
3066        # The same as the previous call, but with an empty handle_list
3067        handle_list = []
3068        startupinfo = subprocess.STARTUPINFO()
3069        startupinfo.lpAttributeList = {"handle_list": handle_list}
3070        p = subprocess.Popen([sys.executable, "-c",
3071                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3072                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3073                             startupinfo=startupinfo, close_fds=True)
3074        stdout, stderr = p.communicate()
3075        self.assertEqual(p.returncode, 1)
3076        self.assertIn(b"OSError", stderr)
3077
3078        # Check for a warning due to using handle_list and close_fds=False
3079        with support.check_warnings((".*overriding close_fds", RuntimeWarning)):
3080            startupinfo = subprocess.STARTUPINFO()
3081            startupinfo.lpAttributeList = {"handle_list": handles[:]}
3082            p = subprocess.Popen([sys.executable, "-c",
3083                                  "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3084                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3085                                 startupinfo=startupinfo, close_fds=False)
3086            stdout, stderr = p.communicate()
3087            self.assertEqual(p.returncode, 0)
3088
3089    def test_empty_attribute_list(self):
3090        startupinfo = subprocess.STARTUPINFO()
3091        startupinfo.lpAttributeList = {}
3092        subprocess.call(ZERO_RETURN_CMD,
3093                        startupinfo=startupinfo)
3094
3095    def test_empty_handle_list(self):
3096        startupinfo = subprocess.STARTUPINFO()
3097        startupinfo.lpAttributeList = {"handle_list": []}
3098        subprocess.call(ZERO_RETURN_CMD,
3099                        startupinfo=startupinfo)
3100
3101    def test_shell_sequence(self):
3102        # Run command through the shell (sequence)
3103        newenv = os.environ.copy()
3104        newenv["FRUIT"] = "physalis"
3105        p = subprocess.Popen(["set"], shell=1,
3106                             stdout=subprocess.PIPE,
3107                             env=newenv)
3108        with p:
3109            self.assertIn(b"physalis", p.stdout.read())
3110
3111    def test_shell_string(self):
3112        # Run command through the shell (string)
3113        newenv = os.environ.copy()
3114        newenv["FRUIT"] = "physalis"
3115        p = subprocess.Popen("set", shell=1,
3116                             stdout=subprocess.PIPE,
3117                             env=newenv)
3118        with p:
3119            self.assertIn(b"physalis", p.stdout.read())
3120
3121    def test_shell_encodings(self):
3122        # Run command through the shell (string)
3123        for enc in ['ansi', 'oem']:
3124            newenv = os.environ.copy()
3125            newenv["FRUIT"] = "physalis"
3126            p = subprocess.Popen("set", shell=1,
3127                                 stdout=subprocess.PIPE,
3128                                 env=newenv,
3129                                 encoding=enc)
3130            with p:
3131                self.assertIn("physalis", p.stdout.read(), enc)
3132
3133    def test_call_string(self):
3134        # call() function with string argument on Windows
3135        rc = subprocess.call(sys.executable +
3136                             ' -c "import sys; sys.exit(47)"')
3137        self.assertEqual(rc, 47)
3138
3139    def _kill_process(self, method, *args):
3140        # Some win32 buildbot raises EOFError if stdin is inherited
3141        p = subprocess.Popen([sys.executable, "-c", """if 1:
3142                             import sys, time
3143                             sys.stdout.write('x\\n')
3144                             sys.stdout.flush()
3145                             time.sleep(30)
3146                             """],
3147                             stdin=subprocess.PIPE,
3148                             stdout=subprocess.PIPE,
3149                             stderr=subprocess.PIPE)
3150        with p:
3151            # Wait for the interpreter to be completely initialized before
3152            # sending any signal.
3153            p.stdout.read(1)
3154            getattr(p, method)(*args)
3155            _, stderr = p.communicate()
3156            self.assertStderrEqual(stderr, b'')
3157            returncode = p.wait()
3158        self.assertNotEqual(returncode, 0)
3159
3160    def _kill_dead_process(self, method, *args):
3161        p = subprocess.Popen([sys.executable, "-c", """if 1:
3162                             import sys, time
3163                             sys.stdout.write('x\\n')
3164                             sys.stdout.flush()
3165                             sys.exit(42)
3166                             """],
3167                             stdin=subprocess.PIPE,
3168                             stdout=subprocess.PIPE,
3169                             stderr=subprocess.PIPE)
3170        with p:
3171            # Wait for the interpreter to be completely initialized before
3172            # sending any signal.
3173            p.stdout.read(1)
3174            # The process should end after this
3175            time.sleep(1)
3176            # This shouldn't raise even though the child is now dead
3177            getattr(p, method)(*args)
3178            _, stderr = p.communicate()
3179            self.assertStderrEqual(stderr, b'')
3180            rc = p.wait()
3181        self.assertEqual(rc, 42)
3182
3183    def test_send_signal(self):
3184        self._kill_process('send_signal', signal.SIGTERM)
3185
3186    def test_kill(self):
3187        self._kill_process('kill')
3188
3189    def test_terminate(self):
3190        self._kill_process('terminate')
3191
3192    def test_send_signal_dead(self):
3193        self._kill_dead_process('send_signal', signal.SIGTERM)
3194
3195    def test_kill_dead(self):
3196        self._kill_dead_process('kill')
3197
3198    def test_terminate_dead(self):
3199        self._kill_dead_process('terminate')
3200
3201class MiscTests(unittest.TestCase):
3202
3203    class RecordingPopen(subprocess.Popen):
3204        """A Popen that saves a reference to each instance for testing."""
3205        instances_created = []
3206
3207        def __init__(self, *args, **kwargs):
3208            super().__init__(*args, **kwargs)
3209            self.instances_created.append(self)
3210
3211    @mock.patch.object(subprocess.Popen, "_communicate")
3212    def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate,
3213                                        **kwargs):
3214        """Fake a SIGINT happening during Popen._communicate() and ._wait().
3215
3216        This avoids the need to actually try and get test environments to send
3217        and receive signals reliably across platforms.  The net effect of a ^C
3218        happening during a blocking subprocess execution which we want to clean
3219        up from is a KeyboardInterrupt coming out of communicate() or wait().
3220        """
3221
3222        mock__communicate.side_effect = KeyboardInterrupt
3223        try:
3224            with mock.patch.object(subprocess.Popen, "_wait") as mock__wait:
3225                # We patch out _wait() as no signal was involved so the
3226                # child process isn't actually going to exit rapidly.
3227                mock__wait.side_effect = KeyboardInterrupt
3228                with mock.patch.object(subprocess, "Popen",
3229                                       self.RecordingPopen):
3230                    with self.assertRaises(KeyboardInterrupt):
3231                        popener([sys.executable, "-c",
3232                                 "import time\ntime.sleep(9)\nimport sys\n"
3233                                 "sys.stderr.write('\\n!runaway child!\\n')"],
3234                                stdout=subprocess.DEVNULL, **kwargs)
3235                for call in mock__wait.call_args_list[1:]:
3236                    self.assertNotEqual(
3237                            call, mock.call(timeout=None),
3238                            "no open-ended wait() after the first allowed: "
3239                            f"{mock__wait.call_args_list}")
3240                sigint_calls = []
3241                for call in mock__wait.call_args_list:
3242                    if call == mock.call(timeout=0.25):  # from Popen.__init__
3243                        sigint_calls.append(call)
3244                self.assertLessEqual(mock__wait.call_count, 2,
3245                                     msg=mock__wait.call_args_list)
3246                self.assertEqual(len(sigint_calls), 1,
3247                                 msg=mock__wait.call_args_list)
3248        finally:
3249            # cleanup the forgotten (due to our mocks) child process
3250            process = self.RecordingPopen.instances_created.pop()
3251            process.kill()
3252            process.wait()
3253            self.assertEqual([], self.RecordingPopen.instances_created)
3254
3255    def test_call_keyboardinterrupt_no_kill(self):
3256        self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282)
3257
3258    def test_run_keyboardinterrupt_no_kill(self):
3259        self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282)
3260
3261    def test_context_manager_keyboardinterrupt_no_kill(self):
3262        def popen_via_context_manager(*args, **kwargs):
3263            with subprocess.Popen(*args, **kwargs) as unused_process:
3264                raise KeyboardInterrupt  # Test how __exit__ handles ^C.
3265        self._test_keyboardinterrupt_no_kill(popen_via_context_manager)
3266
3267    def test_getoutput(self):
3268        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
3269        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
3270                         (0, 'xyzzy'))
3271
3272        # we use mkdtemp in the next line to create an empty directory
3273        # under our exclusive control; from that, we can invent a pathname
3274        # that we _know_ won't exist.  This is guaranteed to fail.
3275        dir = None
3276        try:
3277            dir = tempfile.mkdtemp()
3278            name = os.path.join(dir, "foo")
3279            status, output = subprocess.getstatusoutput(
3280                ("type " if mswindows else "cat ") + name)
3281            self.assertNotEqual(status, 0)
3282        finally:
3283            if dir is not None:
3284                os.rmdir(dir)
3285
3286    def test__all__(self):
3287        """Ensure that __all__ is populated properly."""
3288        intentionally_excluded = {"list2cmdline", "Handle"}
3289        exported = set(subprocess.__all__)
3290        possible_exports = set()
3291        import types
3292        for name, value in subprocess.__dict__.items():
3293            if name.startswith('_'):
3294                continue
3295            if isinstance(value, (types.ModuleType,)):
3296                continue
3297            possible_exports.add(name)
3298        self.assertEqual(exported, possible_exports - intentionally_excluded)
3299
3300
3301@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
3302                     "Test needs selectors.PollSelector")
3303class ProcessTestCaseNoPoll(ProcessTestCase):
3304    def setUp(self):
3305        self.orig_selector = subprocess._PopenSelector
3306        subprocess._PopenSelector = selectors.SelectSelector
3307        ProcessTestCase.setUp(self)
3308
3309    def tearDown(self):
3310        subprocess._PopenSelector = self.orig_selector
3311        ProcessTestCase.tearDown(self)
3312
3313
3314@unittest.skipUnless(mswindows, "Windows-specific tests")
3315class CommandsWithSpaces (BaseTestCase):
3316
3317    def setUp(self):
3318        super().setUp()
3319        f, fname = tempfile.mkstemp(".py", "te st")
3320        self.fname = fname.lower ()
3321        os.write(f, b"import sys;"
3322                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
3323        )
3324        os.close(f)
3325
3326    def tearDown(self):
3327        os.remove(self.fname)
3328        super().tearDown()
3329
3330    def with_spaces(self, *args, **kwargs):
3331        kwargs['stdout'] = subprocess.PIPE
3332        p = subprocess.Popen(*args, **kwargs)
3333        with p:
3334            self.assertEqual(
3335              p.stdout.read ().decode("mbcs"),
3336              "2 [%r, 'ab cd']" % self.fname
3337            )
3338
3339    def test_shell_string_with_spaces(self):
3340        # call() function with string argument with spaces on Windows
3341        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3342                                             "ab cd"), shell=1)
3343
3344    def test_shell_sequence_with_spaces(self):
3345        # call() function with sequence argument with spaces on Windows
3346        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
3347
3348    def test_noshell_string_with_spaces(self):
3349        # call() function with string argument with spaces on Windows
3350        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3351                             "ab cd"))
3352
3353    def test_noshell_sequence_with_spaces(self):
3354        # call() function with sequence argument with spaces on Windows
3355        self.with_spaces([sys.executable, self.fname, "ab cd"])
3356
3357
3358class ContextManagerTests(BaseTestCase):
3359
3360    def test_pipe(self):
3361        with subprocess.Popen([sys.executable, "-c",
3362                               "import sys;"
3363                               "sys.stdout.write('stdout');"
3364                               "sys.stderr.write('stderr');"],
3365                              stdout=subprocess.PIPE,
3366                              stderr=subprocess.PIPE) as proc:
3367            self.assertEqual(proc.stdout.read(), b"stdout")
3368            self.assertStderrEqual(proc.stderr.read(), b"stderr")
3369
3370        self.assertTrue(proc.stdout.closed)
3371        self.assertTrue(proc.stderr.closed)
3372
3373    def test_returncode(self):
3374        with subprocess.Popen([sys.executable, "-c",
3375                               "import sys; sys.exit(100)"]) as proc:
3376            pass
3377        # __exit__ calls wait(), so the returncode should be set
3378        self.assertEqual(proc.returncode, 100)
3379
3380    def test_communicate_stdin(self):
3381        with subprocess.Popen([sys.executable, "-c",
3382                              "import sys;"
3383                              "sys.exit(sys.stdin.read() == 'context')"],
3384                             stdin=subprocess.PIPE) as proc:
3385            proc.communicate(b"context")
3386            self.assertEqual(proc.returncode, 1)
3387
3388    def test_invalid_args(self):
3389        with self.assertRaises(NONEXISTING_ERRORS):
3390            with subprocess.Popen(NONEXISTING_CMD,
3391                                  stdout=subprocess.PIPE,
3392                                  stderr=subprocess.PIPE) as proc:
3393                pass
3394
3395    def test_broken_pipe_cleanup(self):
3396        """Broken pipe error should not prevent wait() (Issue 21619)"""
3397        proc = subprocess.Popen(ZERO_RETURN_CMD,
3398                                stdin=subprocess.PIPE,
3399                                bufsize=support.PIPE_MAX_SIZE*2)
3400        proc = proc.__enter__()
3401        # Prepare to send enough data to overflow any OS pipe buffering and
3402        # guarantee a broken pipe error. Data is held in BufferedWriter
3403        # buffer until closed.
3404        proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
3405        self.assertIsNone(proc.returncode)
3406        # EPIPE expected under POSIX; EINVAL under Windows
3407        self.assertRaises(OSError, proc.__exit__, None, None, None)
3408        self.assertEqual(proc.returncode, 0)
3409        self.assertTrue(proc.stdin.closed)
3410
3411
3412if __name__ == "__main__":
3413    unittest.main()
3414