1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for implementations of L{IReactorProcess}.
6
@var properEnv: A copy of L{os.environ}, with native L{str} keys/values, in
    which C{PYTHONPATH} has been replaced by the entries of L{sys.path}
    joined with L{os.pathsep}.
9"""
10
11
12import io
13import os
14import signal
15import subprocess
16import sys
17import threading
18from unittest import skipIf
19
20import hamcrest
21
22import twisted
23from twisted.internet import utils
24from twisted.internet.defer import Deferred, inlineCallbacks, succeed
25from twisted.internet.error import ProcessDone, ProcessTerminated
26from twisted.internet.interfaces import IProcessTransport, IReactorProcess
27from twisted.internet.protocol import ProcessProtocol
28from twisted.internet.test.reactormixins import ReactorBuilder
29from twisted.python.compat import networkString
30from twisted.python.filepath import FilePath, _asFilesystemBytes
31from twisted.python.log import err, msg
32from twisted.python.runtime import platform
33from twisted.trial.unittest import TestCase
34
# Get the current Python executable as a bytestring.
pyExe = FilePath(sys.executable)._asBytesPath()
# Two levels above twisted/__init__.py: the directory that contains the
# "twisted" package itself.
twistedRoot = FilePath(twisted.__file__).parent().parent()

# Flag and reason handed to skipIf by the UID/GID-changing tests.
_uidgidSkip = False
_uidgidSkipReason = ""
# Environment passed to child processes: the parent's environment with
# PYTHONPATH overwritten by the parent's sys.path.
properEnv = dict(os.environ)
properEnv["PYTHONPATH"] = os.pathsep.join(sys.path)
try:
    # If either import fails the platform is treated as non-POSIX (see the
    # except branch below).
    import resource as _resource

    from twisted.internet import process as _process

    if os.getuid() != 0:
        _uidgidSkip = True
        _uidgidSkipReason = "Cannot change UID/GID except as root"
except ImportError:
    # "resource is None" / "process is None" is what the rest of this module
    # checks to detect the missing POSIX support.
    resource = None
    process = None
    _uidgidSkip = True
    _uidgidSkipReason = "Cannot change UID/GID on Windows"
else:
    # Only publish the public names once both imports have succeeded.
    resource = _resource
    process = _process
59
60
def onlyOnPOSIX(testMethod):
    """
    Decorator marking a test as skipped when the C{resource} module could not
    be imported, i.e. when not running on a POSIX platform.

    @param testMethod: The test function being decorated.

    @return: C{testMethod} itself, with a C{skip} attribute set when the test
        does not apply.
    """
    posixAvailable = resource is not None
    if not posixAvailable:
        setattr(testMethod, "skip", "Test only applies to POSIX platforms.")
    return testMethod
72
73
class _ShutdownCallbackProcessProtocol(ProcessProtocol):
    """
    An L{IProcessProtocol} which records what the child process writes and
    fires a L{Deferred} once the process has ended.

    @ivar whenFinished: The L{Deferred} which is fired with L{None} from
        C{processEnded}.

    @ivar received: A C{dict} mapping file descriptors to lists of bytes
        received from the child process on those file descriptors.
    """

    def __init__(self, whenFinished):
        self.received = {}
        self.whenFinished = whenFinished

    def childDataReceived(self, fd, bytes):
        # Lazily create the per-descriptor list the first time data shows up.
        chunks = self.received.get(fd)
        if chunks is None:
            chunks = self.received[fd] = []
        chunks.append(bytes)

    def processEnded(self, reason):
        # The reason is deliberately dropped; waiters only learn "it ended".
        self.whenFinished.callback(None)
92
93
class ProcessTestsBuilderBase(ReactorBuilder):
    """
    Base class for L{IReactorProcess} tests which defines some tests which
    can be applied to PTY or non-PTY uses of C{spawnProcess}.

    Subclasses are expected to set the C{usePTY} attribute to C{True} or
    C{False}.
    """

    # Interfaces associated with these tests; only L{IReactorProcess} here.
    # NOTE(review): presumably L{ReactorBuilder} skips reactors which do not
    # provide these -- confirm against reactormixins.
    requiredInterfaces = [IReactorProcess]
104
105    def test_processTransportInterface(self):
106        """
107        L{IReactorProcess.spawnProcess} connects the protocol passed to it
108        to a transport which provides L{IProcessTransport}.
109        """
110        ended = Deferred()
111        protocol = _ShutdownCallbackProcessProtocol(ended)
112
113        reactor = self.buildReactor()
114        transport = reactor.spawnProcess(
115            protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY
116        )
117
118        # The transport is available synchronously, so we can check it right
119        # away (unlike many transport-based tests).  This is convenient even
120        # though it's probably not how the spawnProcess interface should really
121        # work.
122        # We're not using verifyObject here because part of
123        # IProcessTransport is a lie - there are no getHost or getPeer
124        # methods.  See #1124.
125        self.assertTrue(IProcessTransport.providedBy(transport))
126
127        # Let the process run and exit so we don't leave a zombie around.
128        ended.addCallback(lambda ignored: reactor.stop())
129        self.runReactor(reactor)
130
131    def _writeTest(self, write):
132        """
133        Helper for testing L{IProcessTransport} write functionality.  This
134        method spawns a child process and gives C{write} a chance to write some
135        bytes to it.  It then verifies that the bytes were actually written to
136        it (by relying on the child process to echo them back).
137
138        @param write: A two-argument callable.  This is invoked with a process
139            transport and some bytes to write to it.
140        """
141        reactor = self.buildReactor()
142
143        ended = Deferred()
144        protocol = _ShutdownCallbackProcessProtocol(ended)
145
146        bytesToSend = b"hello, world" + networkString(os.linesep)
147        program = b"import sys\n" b"sys.stdout.write(sys.stdin.readline())\n"
148
149        def startup():
150            transport = reactor.spawnProcess(protocol, pyExe, [pyExe, b"-c", program])
151            try:
152                write(transport, bytesToSend)
153            except BaseException:
154                err(None, "Unhandled exception while writing")
155                transport.signalProcess("KILL")
156
157        reactor.callWhenRunning(startup)
158
159        ended.addCallback(lambda ignored: reactor.stop())
160
161        self.runReactor(reactor)
162        self.assertEqual(bytesToSend, b"".join(protocol.received[1]))
163
164    def test_write(self):
165        """
166        L{IProcessTransport.write} writes the specified C{bytes} to the standard
167        input of the child process.
168        """
169
170        def write(transport, bytesToSend):
171            transport.write(bytesToSend)
172
173        self._writeTest(write)
174
175    def test_writeSequence(self):
176        """
177        L{IProcessTransport.writeSequence} writes the specified C{list} of
178        C{bytes} to the standard input of the child process.
179        """
180
181        def write(transport, bytesToSend):
182            transport.writeSequence([bytesToSend])
183
184        self._writeTest(write)
185
186    def test_writeToChild(self):
187        """
188        L{IProcessTransport.writeToChild} writes the specified C{bytes} to the
189        specified file descriptor of the child process.
190        """
191
192        def write(transport, bytesToSend):
193            transport.writeToChild(0, bytesToSend)
194
195        self._writeTest(write)
196
197    def test_writeToChildBadFileDescriptor(self):
198        """
199        L{IProcessTransport.writeToChild} raises L{KeyError} if passed a file
200        descriptor which is was not set up by L{IReactorProcess.spawnProcess}.
201        """
202
203        def write(transport, bytesToSend):
204            try:
205                self.assertRaises(KeyError, transport.writeToChild, 13, bytesToSend)
206            finally:
207                # Just get the process to exit so the test can complete
208                transport.write(bytesToSend)
209
210        self._writeTest(write)
211
212    @skipIf(
213        getattr(signal, "SIGCHLD", None) is None,
214        "Platform lacks SIGCHLD, early-spawnProcess test can't work.",
215    )
216    def test_spawnProcessEarlyIsReaped(self):
217        """
218        If, before the reactor is started with L{IReactorCore.run}, a
219        process is started with L{IReactorProcess.spawnProcess} and
220        terminates, the process is reaped once the reactor is started.
221        """
222        reactor = self.buildReactor()
223
224        # Create the process with no shared file descriptors, so that there
225        # are no other events for the reactor to notice and "cheat" with.
226        # We want to be sure it's really dealing with the process exiting,
227        # not some associated event.
228        if self.usePTY:
229            childFDs = None
230        else:
231            childFDs = {}
232
233        # Arrange to notice the SIGCHLD.
234        signaled = threading.Event()
235
236        def handler(*args):
237            signaled.set()
238
239        signal.signal(signal.SIGCHLD, handler)
240
241        # Start a process - before starting the reactor!
242        ended = Deferred()
243        reactor.spawnProcess(
244            _ShutdownCallbackProcessProtocol(ended),
245            pyExe,
246            [pyExe, b"-c", b""],
247            usePTY=self.usePTY,
248            childFDs=childFDs,
249        )
250
251        # Wait for the SIGCHLD (which might have been delivered before we got
252        # here, but that's okay because the signal handler was installed above,
253        # before we could have gotten it).
254        signaled.wait(120)
255        if not signaled.isSet():
256            self.fail("Timed out waiting for child process to exit.")
257
258        # Capture the processEnded callback.
259        result = []
260        ended.addCallback(result.append)
261
262        if result:
263            # The synchronous path through spawnProcess / Process.__init__ /
264            # registerReapProcessHandler was encountered.  There's no reason to
265            # start the reactor, because everything is done already.
266            return
267
268        # Otherwise, though, start the reactor so it can tell us the process
269        # exited.
270        ended.addCallback(lambda ignored: reactor.stop())
271        self.runReactor(reactor)
272
273        # Make sure the reactor stopped because the Deferred fired.
274        self.assertTrue(result)
275
276    def test_processExitedWithSignal(self):
277        """
278        The C{reason} argument passed to L{IProcessProtocol.processExited} is a
279        L{ProcessTerminated} instance if the child process exits with a signal.
280        """
281        sigName = "TERM"
282        sigNum = getattr(signal, "SIG" + sigName)
283        exited = Deferred()
284        source = (
285            b"import sys\n"
286            # Talk so the parent process knows the process is running.  This is
287            # necessary because ProcessProtocol.makeConnection may be called
288            # before this process is exec'd.  It would be unfortunate if we
289            # SIGTERM'd the Twisted process while it was on its way to doing
290            # the exec.
291            b"sys.stdout.write('x')\n"
292            b"sys.stdout.flush()\n"
293            b"sys.stdin.read()\n"
294        )
295
296        class Exiter(ProcessProtocol):
297            def childDataReceived(self, fd, data):
298                msg("childDataReceived(%d, %r)" % (fd, data))
299                self.transport.signalProcess(sigName)
300
301            def childConnectionLost(self, fd):
302                msg("childConnectionLost(%d)" % (fd,))
303
304            def processExited(self, reason):
305                msg(f"processExited({reason!r})")
306                # Protect the Deferred from the failure so that it follows
307                # the callback chain.  This doesn't use the errback chain
308                # because it wants to make sure reason is a Failure.  An
309                # Exception would also make an errback-based test pass, and
310                # that would be wrong.
311                exited.callback([reason])
312
313            def processEnded(self, reason):
314                msg(f"processEnded({reason!r})")
315
316        reactor = self.buildReactor()
317        reactor.callWhenRunning(
318            reactor.spawnProcess,
319            Exiter(),
320            pyExe,
321            [pyExe, b"-c", source],
322            usePTY=self.usePTY,
323        )
324
325        def cbExited(args):
326            (failure,) = args
327            # Trapping implicitly verifies that it's a Failure (rather than
328            # an exception) and explicitly makes sure it's the right type.
329            failure.trap(ProcessTerminated)
330            err = failure.value
331            if platform.isWindows():
332                # Windows can't really /have/ signals, so it certainly can't
333                # report them as the reason for termination.  Maybe there's
334                # something better we could be doing here, anyway?  Hard to
335                # say.  Anyway, this inconsistency between different platforms
336                # is extremely unfortunate and I would remove it if I
337                # could. -exarkun
338                self.assertIsNone(err.signal)
339                self.assertEqual(err.exitCode, 1)
340            else:
341                self.assertEqual(err.signal, sigNum)
342                self.assertIsNone(err.exitCode)
343
344        exited.addCallback(cbExited)
345        exited.addErrback(err)
346        exited.addCallback(lambda ign: reactor.stop())
347
348        self.runReactor(reactor)
349
350    def test_systemCallUninterruptedByChildExit(self):
351        """
352        If a child process exits while a system call is in progress, the system
353        call should not be interfered with.  In particular, it should not fail
354        with EINTR.
355
356        Older versions of Twisted installed a SIGCHLD handler on POSIX without
357        using the feature exposed by the SA_RESTART flag to sigaction(2).  The
358        most noticeable problem this caused was for blocking reads and writes to
359        sometimes fail with EINTR.
360        """
361        reactor = self.buildReactor()
362        result = []
363
364        def f():
365            try:
366                exe = pyExe.decode(sys.getfilesystemencoding())
367
368                subprocess.Popen([exe, "-c", "import time; time.sleep(0.1)"])
369                f2 = subprocess.Popen(
370                    [exe, "-c", ("import time; time.sleep(0.5);" "print('Foo')")],
371                    stdout=subprocess.PIPE,
372                )
373                # The read call below will blow up with an EINTR from the
374                # SIGCHLD from the first process exiting if we install a
375                # SIGCHLD handler without SA_RESTART.  (which we used to do)
376                with f2.stdout:
377                    result.append(f2.stdout.read())
378            finally:
379                reactor.stop()
380
381        reactor.callWhenRunning(f)
382        self.runReactor(reactor)
383        self.assertEqual(result, [b"Foo" + os.linesep.encode("ascii")])
384
385    @onlyOnPOSIX
386    def test_openFileDescriptors(self):
387        """
388        Processes spawned with spawnProcess() close all extraneous file
389        descriptors in the parent.  They do have a stdin, stdout, and stderr
390        open.
391        """
392
393        # To test this, we are going to open a file descriptor in the parent
394        # that is unlikely to be opened in the child, then verify that it's not
395        # open in the child.
396        source = networkString(
397            """
398import sys
399sys.path.insert(0, '{}')
400from twisted.internet import process
401sys.stdout.write(repr(process._listOpenFDs()))
402sys.stdout.flush()""".format(
403                twistedRoot.path
404            )
405        )
406
407        r, w = os.pipe()
408        self.addCleanup(os.close, r)
409        self.addCleanup(os.close, w)
410
411        # The call to "os.listdir()" (in _listOpenFDs's implementation) opens a
412        # file descriptor (with "opendir"), which shows up in _listOpenFDs's
413        # result.  And speaking of "random" file descriptors, the code required
414        # for _listOpenFDs itself imports logger, which imports random, which
415        # (depending on your Python version) might leave /dev/urandom open.
416
417        # More generally though, even if we were to use an extremely minimal C
418        # program, the operating system would be within its rights to open file
419        # descriptors we might not know about in the C library's
420        # initialization; things like debuggers, profilers, or nsswitch plugins
421        # might open some and this test should pass in those environments.
422
423        # Although some of these file descriptors aren't predictable, we should
424        # at least be able to select a very large file descriptor which is very
425        # unlikely to be opened automatically in the subprocess.  (Apply a
426        # fudge factor to avoid hard-coding something too near a limit
427        # condition like the maximum possible file descriptor, which a library
428        # might at least hypothetically select.)
429
430        fudgeFactor = 17
431        unlikelyFD = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - fudgeFactor
432
433        os.dup2(w, unlikelyFD)
434        self.addCleanup(os.close, unlikelyFD)
435
436        output = io.BytesIO()
437
438        class GatheringProtocol(ProcessProtocol):
439            outReceived = output.write
440
441            def processEnded(self, reason):
442                reactor.stop()
443
444        reactor = self.buildReactor()
445
446        reactor.callWhenRunning(
447            reactor.spawnProcess,
448            GatheringProtocol(),
449            pyExe,
450            [pyExe, b"-Wignore", b"-c", source],
451            usePTY=self.usePTY,
452        )
453
454        self.runReactor(reactor)
455        reportedChildFDs = set(eval(output.getvalue()))
456
457        stdFDs = [0, 1, 2]
458
459        # Unfortunately this assertion is still not *entirely* deterministic,
460        # since hypothetically, any library could open any file descriptor at
461        # any time.  See comment above.
462        self.assertEqual(
463            reportedChildFDs.intersection(set(stdFDs + [unlikelyFD])), set(stdFDs)
464        )
465
466    @onlyOnPOSIX
467    def test_errorDuringExec(self):
468        """
469        When L{os.execvpe} raises an exception, it will format that exception
470        on stderr as UTF-8, regardless of system encoding information.
471        """
472
473        def execvpe(*args, **kw):
474            # Ensure that real traceback formatting has some non-ASCII in it,
475            # by forcing the filename of the last frame to contain non-ASCII.
476            filename = "<\N{SNOWMAN}>"
477            if not isinstance(filename, str):
478                filename = filename.encode("utf-8")
479            codeobj = compile("1/0", filename, "single")
480            eval(codeobj)
481
482        self.patch(os, "execvpe", execvpe)
483        self.patch(sys, "getfilesystemencoding", lambda: "ascii")
484
485        reactor = self.buildReactor()
486        output = io.BytesIO()
487
488        @reactor.callWhenRunning
489        def whenRunning():
490            class TracebackCatcher(ProcessProtocol):
491                errReceived = output.write
492
493                def processEnded(self, reason):
494                    reactor.stop()
495
496            reactor.spawnProcess(TracebackCatcher(), pyExe, [pyExe, b"-c", b""])
497
498        self.runReactor(reactor, timeout=30)
499        self.assertIn("\N{SNOWMAN}".encode(), output.getvalue())
500
501    def test_timelyProcessExited(self):
502        """
503        If a spawned process exits, C{processExited} will be called in a
504        timely manner.
505        """
506        reactor = self.buildReactor()
507
508        class ExitingProtocol(ProcessProtocol):
509            exited = False
510
511            def processExited(protoSelf, reason):
512                protoSelf.exited = True
513                reactor.stop()
514                self.assertEqual(reason.value.exitCode, 0)
515
516        protocol = ExitingProtocol()
517        reactor.callWhenRunning(
518            reactor.spawnProcess,
519            protocol,
520            pyExe,
521            [pyExe, b"-c", b"raise SystemExit(0)"],
522            usePTY=self.usePTY,
523        )
524
525        # This will timeout if processExited isn't called:
526        self.runReactor(reactor, timeout=30)
527        self.assertTrue(protocol.exited)
528
529    def _changeIDTest(self, which):
530        """
531        Launch a child process, using either the C{uid} or C{gid} argument to
532        L{IReactorProcess.spawnProcess} to change either its UID or GID to a
533        different value.  If the child process reports this hasn't happened,
534        raise an exception to fail the test.
535
536        @param which: Either C{b"uid"} or C{b"gid"}.
537        """
538        program = ["import os", f"raise SystemExit(os.get{which}() != 1)"]
539
540        container = []
541
542        class CaptureExitStatus(ProcessProtocol):
543            def processEnded(self, reason):
544                container.append(reason)
545                reactor.stop()
546
547        reactor = self.buildReactor()
548        protocol = CaptureExitStatus()
549        reactor.callWhenRunning(
550            reactor.spawnProcess,
551            protocol,
552            pyExe,
553            [pyExe, "-c", "\n".join(program)],
554            **{which: 1},
555        )
556
557        self.runReactor(reactor)
558
559        self.assertEqual(0, container[0].value.exitCode)
560
561    @skipIf(_uidgidSkip, _uidgidSkipReason)
562    def test_changeUID(self):
563        """
564        If a value is passed for L{IReactorProcess.spawnProcess}'s C{uid}, the
565        child process is run with that UID.
566        """
567        self._changeIDTest("uid")
568
569    @skipIf(_uidgidSkip, _uidgidSkipReason)
570    def test_changeGID(self):
571        """
572        If a value is passed for L{IReactorProcess.spawnProcess}'s C{gid}, the
573        child process is run with that GID.
574        """
575        self._changeIDTest("gid")
576
577    def test_processExitedRaises(self):
578        """
579        If L{IProcessProtocol.processExited} raises an exception, it is logged.
580        """
581        # Ideally we wouldn't need to poke the process module; see
582        # https://twistedmatrix.com/trac/ticket/6889
583        reactor = self.buildReactor()
584
585        class TestException(Exception):
586            pass
587
588        class Protocol(ProcessProtocol):
589            def processExited(self, reason):
590                reactor.stop()
591                raise TestException("processedExited raised")
592
593        protocol = Protocol()
594        transport = reactor.spawnProcess(
595            protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY
596        )
597        self.runReactor(reactor)
598
599        # Manually clean-up broken process handler.
600        # Only required if the test fails on systems that support
601        # the process module.
602        if process is not None:
603            for pid, handler in list(process.reapProcessHandlers.items()):
604                if handler is not transport:
605                    continue
606                process.unregisterReapProcessHandler(pid, handler)
607                self.fail(
608                    "After processExited raised, transport was left in"
609                    " reapProcessHandlers"
610                )
611
612        self.assertEqual(1, len(self.flushLoggedErrors(TestException)))
613
614
class ProcessTestsBuilder(ProcessTestsBuilderBase):
    """
    Builder defining tests relating to L{IReactorProcess} for child processes
    which do not have a PTY.
    """

    # Passed as the usePTY argument of spawnProcess by the tests.
    usePTY = False

    # Module run with "python -m" by test_processEnded and
    # test_processExited.
    keepStdioOpenProgram = b"twisted.internet.test.process_helper"
    # Final command line argument handed to that module.
    if platform.isWindows():
        keepStdioOpenArg = b"windows"
    else:
        # Just a value that doesn't equal "windows"
        keepStdioOpenArg = b""
629
630    # Define this test here because PTY-using processes only have stdin and
631    # stdout and the test would need to be different for that to work.
632    def test_childConnectionLost(self):
633        """
634        L{IProcessProtocol.childConnectionLost} is called each time a file
635        descriptor associated with a child process is closed.
636        """
637        connected = Deferred()
638        lost = {0: Deferred(), 1: Deferred(), 2: Deferred()}
639
640        class Closer(ProcessProtocol):
641            def makeConnection(self, transport):
642                connected.callback(transport)
643
644            def childConnectionLost(self, childFD):
645                lost[childFD].callback(None)
646
647        target = b"twisted.internet.test.process_loseconnection"
648
649        reactor = self.buildReactor()
650        reactor.callWhenRunning(
651            reactor.spawnProcess,
652            Closer(),
653            pyExe,
654            [pyExe, b"-m", target],
655            env=properEnv,
656            usePTY=self.usePTY,
657        )
658
659        def cbConnected(transport):
660            transport.write(b"2\n")
661            return lost[2].addCallback(lambda ign: transport)
662
663        connected.addCallback(cbConnected)
664
665        def lostSecond(transport):
666            transport.write(b"1\n")
667            return lost[1].addCallback(lambda ign: transport)
668
669        connected.addCallback(lostSecond)
670
671        def lostFirst(transport):
672            transport.write(b"\n")
673
674        connected.addCallback(lostFirst)
675        connected.addErrback(err)
676
677        def cbEnded(ignored):
678            reactor.stop()
679
680        connected.addCallback(cbEnded)
681
682        self.runReactor(reactor)
683
684    # This test is here because PTYProcess never delivers childConnectionLost.
685    def test_processEnded(self):
686        """
687        L{IProcessProtocol.processEnded} is called after the child process
688        exits and L{IProcessProtocol.childConnectionLost} is called for each of
689        its file descriptors.
690        """
691        ended = Deferred()
692        lost = []
693
694        class Ender(ProcessProtocol):
695            def childDataReceived(self, fd, data):
696                msg("childDataReceived(%d, %r)" % (fd, data))
697                self.transport.loseConnection()
698
699            def childConnectionLost(self, childFD):
700                msg("childConnectionLost(%d)" % (childFD,))
701                lost.append(childFD)
702
703            def processExited(self, reason):
704                msg(f"processExited({reason!r})")
705
706            def processEnded(self, reason):
707                msg(f"processEnded({reason!r})")
708                ended.callback([reason])
709
710        reactor = self.buildReactor()
711        reactor.callWhenRunning(
712            reactor.spawnProcess,
713            Ender(),
714            pyExe,
715            [pyExe, b"-m", self.keepStdioOpenProgram, b"child", self.keepStdioOpenArg],
716            env=properEnv,
717            usePTY=self.usePTY,
718        )
719
720        def cbEnded(args):
721            (failure,) = args
722            failure.trap(ProcessDone)
723            self.assertEqual(set(lost), {0, 1, 2})
724
725        ended.addCallback(cbEnded)
726
727        ended.addErrback(err)
728        ended.addCallback(lambda ign: reactor.stop())
729
730        self.runReactor(reactor)
731
732    # This test is here because PTYProcess.loseConnection does not actually
733    # close the file descriptors to the child process.  This test needs to be
734    # written fairly differently for PTYProcess.
735    def test_processExited(self):
736        """
737        L{IProcessProtocol.processExited} is called when the child process
738        exits, even if file descriptors associated with the child are still
739        open.
740        """
741        exited = Deferred()
742        allLost = Deferred()
743        lost = []
744
745        class Waiter(ProcessProtocol):
746            def childDataReceived(self, fd, data):
747                msg("childDataReceived(%d, %r)" % (fd, data))
748
749            def childConnectionLost(self, childFD):
750                msg("childConnectionLost(%d)" % (childFD,))
751                lost.append(childFD)
752                if len(lost) == 3:
753                    allLost.callback(None)
754
755            def processExited(self, reason):
756                msg(f"processExited({reason!r})")
757                # See test_processExitedWithSignal
758                exited.callback([reason])
759                self.transport.loseConnection()
760
761        reactor = self.buildReactor()
762        reactor.callWhenRunning(
763            reactor.spawnProcess,
764            Waiter(),
765            pyExe,
766            [
767                pyExe,
768                b"-u",
769                b"-m",
770                self.keepStdioOpenProgram,
771                b"child",
772                self.keepStdioOpenArg,
773            ],
774            env=properEnv,
775            usePTY=self.usePTY,
776        )
777
778        def cbExited(args):
779            (failure,) = args
780            failure.trap(ProcessDone)
781            msg(f"cbExited; lost = {lost}")
782            self.assertEqual(lost, [])
783            return allLost
784
785        exited.addCallback(cbExited)
786
787        def cbAllLost(ignored):
788            self.assertEqual(set(lost), {0, 1, 2})
789
790        exited.addCallback(cbAllLost)
791
792        exited.addErrback(err)
793        exited.addCallback(lambda ign: reactor.stop())
794
795        self.runReactor(reactor)
796
797    def makeSourceFile(self, sourceLines):
798        """
799        Write the given list of lines to a text file and return the absolute
800        path to it.
801        """
802        script = _asFilesystemBytes(self.mktemp())
803        with open(script, "wt") as scriptFile:
804            scriptFile.write(os.linesep.join(sourceLines) + os.linesep)
805        return os.path.abspath(script)
806
807    def test_shebang(self):
808        """
809        Spawning a process with an executable which is a script starting
810        with an interpreter definition line (#!) uses that interpreter to
811        evaluate the script.
812        """
813        shebangOutput = b"this is the shebang output"
814
815        scriptFile = self.makeSourceFile(
816            [
817                "#!{}".format(pyExe.decode("ascii")),
818                "import sys",
819                "sys.stdout.write('{}')".format(shebangOutput.decode("ascii")),
820                "sys.stdout.flush()",
821            ]
822        )
823        os.chmod(scriptFile, 0o700)
824
825        reactor = self.buildReactor()
826
827        def cbProcessExited(args):
828            out, err, code = args
829            msg("cbProcessExited((%r, %r, %d))" % (out, err, code))
830            self.assertEqual(out, shebangOutput)
831            self.assertEqual(err, b"")
832            self.assertEqual(code, 0)
833
834        def shutdown(passthrough):
835            reactor.stop()
836            return passthrough
837
838        def start():
839            d = utils.getProcessOutputAndValue(scriptFile, reactor=reactor)
840            d.addBoth(shutdown)
841            d.addCallback(cbProcessExited)
842            d.addErrback(err)
843
844        reactor.callWhenRunning(start)
845        self.runReactor(reactor)
846
847    def test_pauseAndResumeProducing(self):
848        """
849        Pause producing and then resume producing.
850        """
851
852        def pauseAndResume(reactor):
853            try:
854                protocol = ProcessProtocol()
855                transport = reactor.spawnProcess(
856                    protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY
857                )
858                transport.pauseProducing()
859                transport.resumeProducing()
860            finally:
861                reactor.stop()
862
863        reactor = self.buildReactor()
864        reactor.callWhenRunning(pauseAndResume, reactor)
865        self.runReactor(reactor)
866
867    def test_processCommandLineArguments(self):
868        """
869        Arguments given to spawnProcess are passed to the child process as
870        originally intended.
871        """
872        us = b"twisted.internet.test.process_cli"
873
874        args = [b"hello", b'"', b" \t|<>^&", br'"\\"hello\\"', br'"foo\ bar baz\""']
875        # Ensure that all non-NUL characters can be passed too.
876        allChars = "".join(map(chr, range(1, 255)))
877        if isinstance(allChars, str):
878            allChars.encode("utf-8")
879
880        reactor = self.buildReactor()
881
882        def processFinished(finishedArgs):
883            output, err, code = finishedArgs
884            output = output.split(b"\0")
885            # Drop the trailing \0.
886            output.pop()
887            self.assertEqual(args, output)
888
889        def shutdown(result):
890            reactor.stop()
891            return result
892
893        def spawnChild():
894            d = succeed(None)
895            d.addCallback(
896                lambda dummy: utils.getProcessOutputAndValue(
897                    pyExe, [b"-m", us] + args, env=properEnv, reactor=reactor
898                )
899            )
900            d.addCallback(processFinished)
901            d.addBoth(shutdown)
902
903        reactor.callWhenRunning(spawnChild)
904        self.runReactor(reactor)
905
906    @onlyOnPOSIX
907    def test_process_unregistered_before_protocol_ended_callback(self):
908        """
909        Process is removed from reapProcessHandler dict before running
910        ProcessProtocol.processEnded() callback.
911        """
912        results = []
913
914        class TestProcessProtocol(ProcessProtocol):
915            """
916            Process protocol captures own presence in
917            process.reapProcessHandlers at time of .processEnded() callback.
918
919            @ivar deferred: A deferred fired when the .processEnded() callback
920                has completed.
921            @type deferred: L{Deferred<defer.Deferred>}
922            """
923
924            def __init__(self):
925                self.deferred = Deferred()
926
927            def processEnded(self, status):
928                """
929                Capture whether the process has already been removed
930                from process.reapProcessHandlers.
931
932                @param status: unused
933                """
934                from twisted.internet import process
935
936                handlers = process.reapProcessHandlers
937                processes = handlers.values()
938
939                if self.transport in processes:
940                    results.append("process present but should not be")
941                else:
942                    results.append("process already removed as desired")
943
944                self.deferred.callback(None)
945
946        @inlineCallbacks
947        def launchProcessAndWait(reactor):
948            """
949            Launch and wait for a subprocess and allow the TestProcessProtocol
950            to capture the order of the .processEnded() callback vs. removal
951            from process.reapProcessHandlers.
952
953            @param reactor: Reactor used to spawn the test process and to be
954                stopped when checks are complete.
955            @type reactor: object providing
956                L{twisted.internet.interfaces.IReactorProcess} and
957                L{twisted.internet.interfaces.IReactorCore}.
958            """
959            try:
960                testProcessProtocol = TestProcessProtocol()
961                reactor.spawnProcess(
962                    testProcessProtocol,
963                    pyExe,
964                    [pyExe, "--version"],
965                )
966                yield testProcessProtocol.deferred
967            except Exception as e:
968                results.append(e)
969            finally:
970                reactor.stop()
971
972        reactor = self.buildReactor()
973        reactor.callWhenRunning(launchProcessAndWait, reactor)
974        self.runReactor(reactor)
975
976        hamcrest.assert_that(
977            results,
978            hamcrest.equal_to(["process already removed as desired"]),
979        )
980
981
# Merge whatever ProcessTestsBuilder.makeTestCaseClasses() returns into this
# module's namespace (presumably the generated per-reactor test case classes;
# see ReactorBuilder to confirm).
globals().update(ProcessTestsBuilder.makeTestCaseClasses())
983
984
class PTYProcessTestsBuilder(ProcessTestsBuilderBase):
    """
    Variant of L{ProcessTestsBuilderBase} whose L{IReactorProcess} tests run
    their child processes with a PTY (C{usePTY} is true).
    """

    usePTY = True

    # Platform-specific skip settings.  Nothing is set on platforms which are
    # neither Windows nor macOS.
    if platform.isWindows():
        skip = "PTYs are not supported on Windows."
    elif platform.isMacOSX():
        skip = "PTYs are flaky from a Darwin bug. See #8840."

        # Maps a fully qualified reactor name to the reason it is skipped.
        skippedReactors = {
            "twisted.internet.pollreactor.PollReactor": (
                "macOS's poll() does not support PTYs"
            ),
        }
1001
1002
# Merge whatever PTYProcessTestsBuilder.makeTestCaseClasses() returns into
# this module's namespace (presumably the generated per-reactor test case
# classes; see ReactorBuilder to confirm).
globals().update(PTYProcessTestsBuilder.makeTestCaseClasses())
1004
1005
class PotentialZombieWarningTests(TestCase):
    """
    Tests for the deprecation of
    L{twisted.internet.error.PotentialZombieWarning}.
    """

    def test_deprecated(self):
        """
        Looking up the I{PotentialZombieWarning} attribute of
        L{twisted.internet.error} emits a single L{DeprecationWarning} with
        the expected message.
        """
        from twisted.internet import error

        # The bare attribute lookup is the operation under test.
        error.PotentialZombieWarning

        expectedMessage = (
            "twisted.internet.error.PotentialZombieWarning was deprecated in "
            "Twisted 10.0.0: There is no longer any potential for zombie "
            "process."
        )

        emitted = self.flushWarnings([self.test_deprecated])
        first = emitted[0]
        self.assertEqual(first["category"], DeprecationWarning)
        self.assertEqual(first["message"], expectedMessage)
        self.assertEqual(len(emitted), 1)
1030
1031
class ProcessIsUnimportableOnUnsupportedPlatormsTests(TestCase):
    """
    L{twisted.internet.process} must refuse to import on platforms where it
    does not work (namely Windows).
    """

    @skipIf(not platform.isWindows(), "Only relevant on Windows.")
    def test_unimportableOnWindows(self):
        """
        Importing L{twisted.internet.process} on Windows raises
        L{ImportError}.
        """
        with self.assertRaises(ImportError):
            import twisted.internet.process as unimportable

            unimportable  # referenced only to keep pyflakes quiet
1047
1048
class ReapingNonePidsLogsProperly(TestCase):
    """
    Reaping with L{None} in place of a PID logs the same error which
    L{os.waitpid} raises when called with L{None} arguments.

    @cvar expected_message: C{str()} of the exception raised by
        C{os.waitpid(None, None)} on this interpreter.
    @cvar expected_type: The type of that exception.
    """

    # Provoke the failure once, while the class body executes, so the tests
    # compare against whatever this interpreter actually raises.
    try:
        # ignore mypy error, since we are testing passing
        # the wrong type to waitpid
        os.waitpid(None, None)  # type: ignore[arg-type]
    except Exception as waitpidError:
        expected_message = str(waitpidError)
        expected_type = type(waitpidError)

    def _assertWaitpidErrorLogged(self):
        """
        Flush the logged errors and check that there is exactly one, matching
        C{expected_type} and C{expected_message}.
        """
        (logged,) = self.flushLoggedErrors()
        self.assertEqual(
            type(logged.value),
            self.expected_type,
            "Wrong error type logged",
        )
        self.assertEqual(
            str(logged.value),
            self.expected_message,
            "Wrong error message logged",
        )

    @onlyOnPOSIX
    def test_registerReapProcessHandler(self):
        """
        C{process.registerReapProcessHandler(None, None)} logs the expected
        error.
        """
        process.registerReapProcessHandler(None, None)
        self._assertWaitpidErrorLogged()

    @onlyOnPOSIX
    def test__BaseProcess_reapProcess(self):
        """
        C{reapProcess} on a C{process._BaseProcess} constructed with L{None}
        logs the expected error.
        """
        process._BaseProcess(None).reapProcess()
        self._assertWaitpidErrorLogged()
1090