1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Test running processes.
6"""
7
8import gzip
9import os
10import sys
11import signal
12import StringIO
13import errno
14import gc
15import stat
16import operator
17try:
18    import fcntl
19except ImportError:
20    fcntl = process = None
21else:
22    from twisted.internet import process
23
24
25from zope.interface.verify import verifyObject
26
27from twisted.python.log import msg
28from twisted.internet import reactor, protocol, error, interfaces, defer
29from twisted.trial import unittest
30from twisted.python import util, runtime, procutils
31
32
33
34class StubProcessProtocol(protocol.ProcessProtocol):
35    """
36    ProcessProtocol counter-implementation: all methods on this class raise an
37    exception, so instances of this may be used to verify that only certain
38    methods are called.
39    """
40    def outReceived(self, data):
41        raise NotImplementedError()
42
43    def errReceived(self, data):
44        raise NotImplementedError()
45
46    def inConnectionLost(self):
47        raise NotImplementedError()
48
49    def outConnectionLost(self):
50        raise NotImplementedError()
51
52    def errConnectionLost(self):
53        raise NotImplementedError()
54
55
56
57class ProcessProtocolTests(unittest.TestCase):
58    """
59    Tests for behavior provided by the process protocol base class,
60    L{protocol.ProcessProtocol}.
61    """
62    def test_interface(self):
63        """
64        L{ProcessProtocol} implements L{IProcessProtocol}.
65        """
66        verifyObject(interfaces.IProcessProtocol, protocol.ProcessProtocol())
67
68
69    def test_outReceived(self):
70        """
71        Verify that when stdout is delivered to
72        L{ProcessProtocol.childDataReceived}, it is forwarded to
73        L{ProcessProtocol.outReceived}.
74        """
75        received = []
76        class OutProtocol(StubProcessProtocol):
77            def outReceived(self, data):
78                received.append(data)
79
80        bytes = "bytes"
81        p = OutProtocol()
82        p.childDataReceived(1, bytes)
83        self.assertEqual(received, [bytes])
84
85
86    def test_errReceived(self):
87        """
88        Similar to L{test_outReceived}, but for stderr.
89        """
90        received = []
91        class ErrProtocol(StubProcessProtocol):
92            def errReceived(self, data):
93                received.append(data)
94
95        bytes = "bytes"
96        p = ErrProtocol()
97        p.childDataReceived(2, bytes)
98        self.assertEqual(received, [bytes])
99
100
101    def test_inConnectionLost(self):
102        """
103        Verify that when stdin close notification is delivered to
104        L{ProcessProtocol.childConnectionLost}, it is forwarded to
105        L{ProcessProtocol.inConnectionLost}.
106        """
107        lost = []
108        class InLostProtocol(StubProcessProtocol):
109            def inConnectionLost(self):
110                lost.append(None)
111
112        p = InLostProtocol()
113        p.childConnectionLost(0)
114        self.assertEqual(lost, [None])
115
116
117    def test_outConnectionLost(self):
118        """
119        Similar to L{test_inConnectionLost}, but for stdout.
120        """
121        lost = []
122        class OutLostProtocol(StubProcessProtocol):
123            def outConnectionLost(self):
124                lost.append(None)
125
126        p = OutLostProtocol()
127        p.childConnectionLost(1)
128        self.assertEqual(lost, [None])
129
130
131    def test_errConnectionLost(self):
132        """
133        Similar to L{test_inConnectionLost}, but for stderr.
134        """
135        lost = []
136        class ErrLostProtocol(StubProcessProtocol):
137            def errConnectionLost(self):
138                lost.append(None)
139
140        p = ErrLostProtocol()
141        p.childConnectionLost(2)
142        self.assertEqual(lost, [None])
143
144
145
146class TrivialProcessProtocol(protocol.ProcessProtocol):
147    """
148    Simple process protocol for tests purpose.
149
150    @ivar outData: data received from stdin
151    @ivar errData: data received from stderr
152    """
153
154    def __init__(self, d):
155        """
156        Create the deferred that will be fired at the end, and initialize
157        data structures.
158        """
159        self.deferred = d
160        self.outData = []
161        self.errData = []
162
163    def processEnded(self, reason):
164        self.reason = reason
165        self.deferred.callback(None)
166
167    def outReceived(self, data):
168        self.outData.append(data)
169
170    def errReceived(self, data):
171        self.errData.append(data)
172
173
174class TestProcessProtocol(protocol.ProcessProtocol):
175
176    def connectionMade(self):
177        self.stages = [1]
178        self.data = ''
179        self.err = ''
180        self.transport.write("abcd")
181
182    def childDataReceived(self, childFD, data):
183        """
184        Override and disable the dispatch provided by the base class to ensure
185        that it is really this method which is being called, and the transport
186        is not going directly to L{outReceived} or L{errReceived}.
187        """
188        if childFD == 1:
189            self.data += data
190        elif childFD == 2:
191            self.err += data
192
193
194    def childConnectionLost(self, childFD):
195        """
196        Similarly to L{childDataReceived}, disable the automatic dispatch
197        provided by the base implementation to verify that the transport is
198        calling this method directly.
199        """
200        if childFD == 1:
201            self.stages.append(2)
202            if self.data != "abcd":
203                raise RuntimeError(
204                    "Data was %r instead of 'abcd'" % (self.data,))
205            self.transport.write("1234")
206        elif childFD == 2:
207            self.stages.append(3)
208            if self.err != "1234":
209                raise RuntimeError(
210                    "Err was %r instead of '1234'" % (self.err,))
211            self.transport.write("abcd")
212            self.stages.append(4)
213        elif childFD == 0:
214            self.stages.append(5)
215
216    def processEnded(self, reason):
217        self.reason = reason
218        self.deferred.callback(None)
219
220
221class EchoProtocol(protocol.ProcessProtocol):
222
223    s = "1234567" * 1001
224    n = 10
225    finished = 0
226
227    failure = None
228
229    def __init__(self, onEnded):
230        self.onEnded = onEnded
231        self.count = 0
232
233    def connectionMade(self):
234        assert self.n > 2
235        for i in range(self.n - 2):
236            self.transport.write(self.s)
237        # test writeSequence
238        self.transport.writeSequence([self.s, self.s])
239        self.buffer = self.s * self.n
240
241    def outReceived(self, data):
242        if buffer(self.buffer, self.count, len(data)) != buffer(data):
243            self.failure = ("wrong bytes received", data, self.count)
244            self.transport.closeStdin()
245        else:
246            self.count += len(data)
247            if self.count == len(self.buffer):
248                self.transport.closeStdin()
249
250    def processEnded(self, reason):
251        self.finished = 1
252        if not reason.check(error.ProcessDone):
253            self.failure = "process didn't terminate normally: " + str(reason)
254        self.onEnded.callback(self)
255
256
257
258class SignalProtocol(protocol.ProcessProtocol):
259    """
260    A process protocol that sends a signal when data is first received.
261
262    @ivar deferred: deferred firing on C{processEnded}.
263    @type deferred: L{defer.Deferred}
264
265    @ivar signal: the signal to send to the process.
266    @type signal: C{str}
267
268    @ivar signaled: A flag tracking whether the signal has been sent to the
269        child or not yet.  C{False} until it is sent, then C{True}.
270    @type signaled: C{bool}
271    """
272
273    def __init__(self, deferred, sig):
274        self.deferred = deferred
275        self.signal = sig
276        self.signaled = False
277
278
279    def outReceived(self, data):
280        """
281        Handle the first output from the child process (which indicates it
282        is set up and ready to receive the signal) by sending the signal to
283        it.  Also log all output to help with debugging.
284        """
285        msg("Received %r from child stdout" % (data,))
286        if not self.signaled:
287            self.signaled = True
288            self.transport.signalProcess(self.signal)
289
290
291    def errReceived(self, data):
292        """
293        Log all data received from the child's stderr to help with
294        debugging.
295        """
296        msg("Received %r from child stderr" % (data,))
297
298
299    def processEnded(self, reason):
300        """
301        Callback C{self.deferred} with C{None} if C{reason} is a
302        L{error.ProcessTerminated} failure with C{exitCode} set to C{None},
303        C{signal} set to C{self.signal}, and C{status} holding the status code
304        of the exited process. Otherwise, errback with a C{ValueError}
305        describing the problem.
306        """
307        msg("Child exited: %r" % (reason.getTraceback(),))
308        if not reason.check(error.ProcessTerminated):
309            return self.deferred.errback(
310                ValueError("wrong termination: %s" % (reason,)))
311        v = reason.value
312        if isinstance(self.signal, str):
313            signalValue = getattr(signal, 'SIG' + self.signal)
314        else:
315            signalValue = self.signal
316        if v.exitCode is not None:
317            return self.deferred.errback(
318                ValueError("SIG%s: exitCode is %s, not None" %
319                           (self.signal, v.exitCode)))
320        if v.signal != signalValue:
321            return self.deferred.errback(
322                ValueError("SIG%s: .signal was %s, wanted %s" %
323                           (self.signal, v.signal, signalValue)))
324        if os.WTERMSIG(v.status) != signalValue:
325            return self.deferred.errback(
326                ValueError('SIG%s: %s' % (self.signal, os.WTERMSIG(v.status))))
327        self.deferred.callback(None)
328
329
330
331class TestManyProcessProtocol(TestProcessProtocol):
332    def __init__(self):
333        self.deferred = defer.Deferred()
334
335    def processEnded(self, reason):
336        self.reason = reason
337        if reason.check(error.ProcessDone):
338            self.deferred.callback(None)
339        else:
340            self.deferred.errback(reason)
341
342
343
344class UtilityProcessProtocol(protocol.ProcessProtocol):
345    """
346    Helper class for launching a Python process and getting a result from it.
347
348    @ivar program: A string giving a Python program for the child process to
349    run.
350    """
351    program = None
352
353    def run(cls, reactor, argv, env):
354        """
355        Run a Python process connected to a new instance of this protocol
356        class.  Return the protocol instance.
357
358        The Python process is given C{self.program} on the command line to
359        execute, in addition to anything specified by C{argv}.  C{env} is
360        the complete environment.
361        """
362        exe = sys.executable
363        self = cls()
364        reactor.spawnProcess(
365            self, exe, [exe, "-c", self.program] + argv, env=env)
366        return self
367    run = classmethod(run)
368
369
370    def __init__(self):
371        self.bytes = []
372        self.requests = []
373
374
375    def parseChunks(self, bytes):
376        """
377        Called with all bytes received on stdout when the process exits.
378        """
379        raise NotImplementedError()
380
381
382    def getResult(self):
383        """
384        Return a Deferred which will fire with the result of L{parseChunks}
385        when the child process exits.
386        """
387        d = defer.Deferred()
388        self.requests.append(d)
389        return d
390
391
392    def _fireResultDeferreds(self, result):
393        """
394        Callback all Deferreds returned up until now by L{getResult}
395        with the given result object.
396        """
397        requests = self.requests
398        self.requests = None
399        for d in requests:
400            d.callback(result)
401
402
403    def outReceived(self, bytes):
404        """
405        Accumulate output from the child process in a list.
406        """
407        self.bytes.append(bytes)
408
409
410    def processEnded(self, reason):
411        """
412        Handle process termination by parsing all received output and firing
413        any waiting Deferreds.
414        """
415        self._fireResultDeferreds(self.parseChunks(self.bytes))
416
417
418
419
420class GetArgumentVector(UtilityProcessProtocol):
421    """
422    Protocol which will read a serialized argv from a process and
423    expose it to interested parties.
424    """
425    program = (
426        "from sys import stdout, argv\n"
427        "stdout.write(chr(0).join(argv))\n"
428        "stdout.flush()\n")
429
430    def parseChunks(self, chunks):
431        """
432        Parse the output from the process to which this protocol was
433        connected, which is a single unterminated line of \\0-separated
434        strings giving the argv of that process.  Return this as a list of
435        str objects.
436        """
437        return ''.join(chunks).split('\0')
438
439
440
441class GetEnvironmentDictionary(UtilityProcessProtocol):
442    """
443    Protocol which will read a serialized environment dict from a process
444    and expose it to interested parties.
445    """
446    program = (
447        "from sys import stdout\n"
448        "from os import environ\n"
449        "items = environ.iteritems()\n"
450        "stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n"
451        "stdout.flush()\n")
452
453    def parseChunks(self, chunks):
454        """
455        Parse the output from the process to which this protocol was
456        connected, which is a single unterminated line of \\0-separated
457        strings giving key value pairs of the environment from that process.
458        Return this as a dictionary.
459        """
460        environString = ''.join(chunks)
461        if not environString:
462            return {}
463        environ = iter(environString.split('\0'))
464        d = {}
465        while 1:
466            try:
467                k = environ.next()
468            except StopIteration:
469                break
470            else:
471                v = environ.next()
472                d[k] = v
473        return d
474
475
476
477class ProcessTestCase(unittest.TestCase):
478    """Test running a process."""
479
480    usePTY = False
481
482    def testStdio(self):
483        """twisted.internet.stdio test."""
484        exe = sys.executable
485        scriptPath = util.sibpath(__file__, "process_twisted.py")
486        p = Accumulator()
487        d = p.endedDeferred = defer.Deferred()
488        env = {"PYTHONPATH": os.pathsep.join(sys.path)}
489        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
490                             path=None, usePTY=self.usePTY)
491        p.transport.write("hello, world")
492        p.transport.write("abc")
493        p.transport.write("123")
494        p.transport.closeStdin()
495
496        def processEnded(ign):
497            self.assertEqual(p.outF.getvalue(), "hello, worldabc123",
498                              "Output follows:\n"
499                              "%s\n"
500                              "Error message from process_twisted follows:\n"
501                              "%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
502        return d.addCallback(processEnded)
503
504
505    def test_unsetPid(self):
506        """
507        Test if pid is None/non-None before/after process termination.  This
508        reuses process_echoer.py to get a process that blocks on stdin.
509        """
510        finished = defer.Deferred()
511        p = TrivialProcessProtocol(finished)
512        exe = sys.executable
513        scriptPath = util.sibpath(__file__, "process_echoer.py")
514        procTrans = reactor.spawnProcess(p, exe,
515                                    [exe, scriptPath], env=None)
516        self.failUnless(procTrans.pid)
517
518        def afterProcessEnd(ignored):
519            self.assertEqual(procTrans.pid, None)
520
521        p.transport.closeStdin()
522        return finished.addCallback(afterProcessEnd)
523
524
525    def test_process(self):
526        """
527        Test running a process: check its output, it exitCode, some property of
528        signalProcess.
529        """
530        exe = sys.executable
531        scriptPath = util.sibpath(__file__, "process_tester.py")
532        d = defer.Deferred()
533        p = TestProcessProtocol()
534        p.deferred = d
535        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
536        def check(ignored):
537            self.assertEqual(p.stages, [1, 2, 3, 4, 5])
538            f = p.reason
539            f.trap(error.ProcessTerminated)
540            self.assertEqual(f.value.exitCode, 23)
541            # would .signal be available on non-posix?
542            # self.assertEqual(f.value.signal, None)
543            self.assertRaises(
544                error.ProcessExitedAlready, p.transport.signalProcess, 'INT')
545            try:
546                import process_tester, glob
547                for f in glob.glob(process_tester.test_file_match):
548                    os.remove(f)
549            except:
550                pass
551        d.addCallback(check)
552        return d
553
554    def testManyProcesses(self):
555
556        def _check(results, protocols):
557            for p in protocols:
558                self.assertEqual(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
559                # test status code
560                f = p.reason
561                f.trap(error.ProcessTerminated)
562                self.assertEqual(f.value.exitCode, 23)
563
564        exe = sys.executable
565        scriptPath = util.sibpath(__file__, "process_tester.py")
566        args = [exe, "-u", scriptPath]
567        protocols = []
568        deferreds = []
569
570        for i in xrange(50):
571            p = TestManyProcessProtocol()
572            protocols.append(p)
573            reactor.spawnProcess(p, exe, args, env=None)
574            deferreds.append(p.deferred)
575
576        deferredList = defer.DeferredList(deferreds, consumeErrors=True)
577        deferredList.addCallback(_check, protocols)
578        return deferredList
579
580
581    def test_echo(self):
582        """
583        A spawning a subprocess which echoes its stdin to its stdout via
584        C{reactor.spawnProcess} will result in that echoed output being
585        delivered to outReceived.
586        """
587        finished = defer.Deferred()
588        p = EchoProtocol(finished)
589
590        exe = sys.executable
591        scriptPath = util.sibpath(__file__, "process_echoer.py")
592        reactor.spawnProcess(p, exe, [exe, scriptPath], env=None)
593
594        def asserts(ignored):
595            self.failIf(p.failure, p.failure)
596            self.failUnless(hasattr(p, 'buffer'))
597            self.assertEqual(len(''.join(p.buffer)), len(p.s * p.n))
598
599        def takedownProcess(err):
600            p.transport.closeStdin()
601            return err
602
603        return finished.addCallback(asserts).addErrback(takedownProcess)
604
605
606    def testCommandLine(self):
607        args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"]
608        pyExe = sys.executable
609        scriptPath = util.sibpath(__file__, "process_cmdline.py")
610        p = Accumulator()
611        d = p.endedDeferred = defer.Deferred()
612        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
613                             path=None)
614
615        def processEnded(ign):
616            self.assertEqual(p.errF.getvalue(), "")
617            recvdArgs = p.outF.getvalue().splitlines()
618            self.assertEqual(recvdArgs, args)
619        return d.addCallback(processEnded)
620
621
622    def test_wrongArguments(self):
623        """
624        Test invalid arguments to spawnProcess: arguments and environment
625        must only contains string or unicode, and not null bytes.
626        """
627        exe = sys.executable
628        p = protocol.ProcessProtocol()
629
630        badEnvs = [
631            {"foo": 2},
632            {"foo": "egg\0a"},
633            {3: "bar"},
634            {"bar\0foo": "bar"}]
635
636        badArgs = [
637            [exe, 2],
638            "spam",
639            [exe, "foo\0bar"]]
640
641        # Sanity check - this will fail for people who have mucked with
642        # their site configuration in a stupid way, but there's nothing we
643        # can do about that.
644        badUnicode = u'\N{SNOWMAN}'
645        try:
646            badUnicode.encode(sys.getdefaultencoding())
647        except UnicodeEncodeError:
648            # Okay, that unicode doesn't encode, put it in as a bad environment
649            # key.
650            badEnvs.append({badUnicode: 'value for bad unicode key'})
651            badEnvs.append({'key for bad unicode value': badUnicode})
652            badArgs.append([exe, badUnicode])
653        else:
654            # It _did_ encode.  Most likely, Gtk2 is being used and the
655            # default system encoding is UTF-8, which can encode anything.
656            # In any case, if implicit unicode -> str conversion works for
657            # that string, we can't test that TypeError gets raised instead,
658            # so just leave it off.
659            pass
660
661        for env in badEnvs:
662            self.assertRaises(
663                TypeError,
664                reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)
665
666        for args in badArgs:
667            self.assertRaises(
668                TypeError,
669                reactor.spawnProcess, p, exe, args, env=None)
670
671
672    # Use upper-case so that the environment key test uses an upper case
673    # name: some versions of Windows only support upper case environment
674    # variable names, and I think Python (as of 2.5) doesn't use the right
675    # syscall for lowercase or mixed case names to work anyway.
676    okayUnicode = u"UNICODE"
677    encodedValue = "UNICODE"
678
679    def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}):
680        """
681        Check that a deprecation warning is emitted when passing unicode to
682        spawnProcess for an argv value or an environment key or value.
683        Check that the warning is of the right type, has the right message,
684        and refers to the correct file.  Unfortunately, don't check that the
685        line number is correct, because that is too hard for me to figure
686        out.
687
688        @param processProtocolClass: A L{UtilityProcessProtocol} subclass
689        which will be instantiated to communicate with the child process.
690
691        @param argv: The argv argument to spawnProcess.
692
693        @param env: The env argument to spawnProcess.
694
695        @return: A Deferred which fires when the test is complete.
696        """
697        # Sanity to check to make sure we can actually encode this unicode
698        # with the default system encoding.  This may be excessively
699        # paranoid. -exarkun
700        self.assertEqual(
701            self.okayUnicode.encode(sys.getdefaultencoding()),
702            self.encodedValue)
703
704        p = self.assertWarns(DeprecationWarning,
705            "Argument strings and environment keys/values passed to "
706            "reactor.spawnProcess should be str, not unicode.", __file__,
707            processProtocolClass.run, reactor, argv, env)
708        return p.getResult()
709
710
711    def test_deprecatedUnicodeArgvSupport(self):
712        """
713        Test that a unicode string passed for an argument value is allowed
714        if it can be encoded with the default system encoding, but that a
715        deprecation warning is emitted.
716        """
717        d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode])
718        def gotArgVector(argv):
719            self.assertEqual(argv, ['-c', self.encodedValue])
720        d.addCallback(gotArgVector)
721        return d
722
723
724    def test_deprecatedUnicodeEnvKeySupport(self):
725        """
726        Test that a unicode string passed for the key of the environment
727        dictionary is allowed if it can be encoded with the default system
728        encoding, but that a deprecation warning is emitted.
729        """
730        d = self._deprecatedUnicodeSupportTest(
731            GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue})
732        def gotEnvironment(environ):
733            self.assertEqual(environ[self.encodedValue], self.encodedValue)
734        d.addCallback(gotEnvironment)
735        return d
736
737
738    def test_deprecatedUnicodeEnvValueSupport(self):
739        """
740        Test that a unicode string passed for the value of the environment
741        dictionary is allowed if it can be encoded with the default system
742        encoding, but that a deprecation warning is emitted.
743        """
744        d = self._deprecatedUnicodeSupportTest(
745            GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode})
746        def gotEnvironment(environ):
747            # On Windows, the environment contains more things than we
748            # specified, so only make sure that at least the key we wanted
749            # is there, rather than testing the dictionary for exact
750            # equality.
751            self.assertEqual(environ[self.encodedValue], self.encodedValue)
752        d.addCallback(gotEnvironment)
753        return d
754
755
756
757class TwoProcessProtocol(protocol.ProcessProtocol):
758    num = -1
759    finished = 0
760    def __init__(self):
761        self.deferred = defer.Deferred()
762    def outReceived(self, data):
763        pass
764    def processEnded(self, reason):
765        self.finished = 1
766        self.deferred.callback(None)
767
768class TestTwoProcessesBase:
769    def setUp(self):
770        self.processes = [None, None]
771        self.pp = [None, None]
772        self.done = 0
773        self.verbose = 0
774
775    def createProcesses(self, usePTY=0):
776        exe = sys.executable
777        scriptPath = util.sibpath(__file__, "process_reader.py")
778        for num in (0,1):
779            self.pp[num] = TwoProcessProtocol()
780            self.pp[num].num = num
781            p = reactor.spawnProcess(self.pp[num],
782                                     exe, [exe, "-u", scriptPath], env=None,
783                                     usePTY=usePTY)
784            self.processes[num] = p
785
786    def close(self, num):
787        if self.verbose: print "closing stdin [%d]" % num
788        p = self.processes[num]
789        pp = self.pp[num]
790        self.failIf(pp.finished, "Process finished too early")
791        p.loseConnection()
792        if self.verbose: print self.pp[0].finished, self.pp[1].finished
793
794    def _onClose(self):
795        return defer.gatherResults([ p.deferred for p in self.pp ])
796
797    def testClose(self):
798        if self.verbose: print "starting processes"
799        self.createProcesses()
800        reactor.callLater(1, self.close, 0)
801        reactor.callLater(2, self.close, 1)
802        return self._onClose()
803
804class TestTwoProcessesNonPosix(TestTwoProcessesBase, unittest.TestCase):
805    pass
806
807class TestTwoProcessesPosix(TestTwoProcessesBase, unittest.TestCase):
808    def tearDown(self):
809        for pp, pr in zip(self.pp, self.processes):
810            if not pp.finished:
811                try:
812                    os.kill(pr.pid, signal.SIGTERM)
813                except OSError:
814                    # If the test failed the process may already be dead
815                    # The error here is only noise
816                    pass
817        return self._onClose()
818
819    def kill(self, num):
820        if self.verbose: print "kill [%d] with SIGTERM" % num
821        p = self.processes[num]
822        pp = self.pp[num]
823        self.failIf(pp.finished, "Process finished too early")
824        os.kill(p.pid, signal.SIGTERM)
825        if self.verbose: print self.pp[0].finished, self.pp[1].finished
826
827    def testKill(self):
828        if self.verbose: print "starting processes"
829        self.createProcesses(usePTY=0)
830        reactor.callLater(1, self.kill, 0)
831        reactor.callLater(2, self.kill, 1)
832        return self._onClose()
833
834    def testClosePty(self):
835        if self.verbose: print "starting processes"
836        self.createProcesses(usePTY=1)
837        reactor.callLater(1, self.close, 0)
838        reactor.callLater(2, self.close, 1)
839        return self._onClose()
840
841    def testKillPty(self):
842        if self.verbose: print "starting processes"
843        self.createProcesses(usePTY=1)
844        reactor.callLater(1, self.kill, 0)
845        reactor.callLater(2, self.kill, 1)
846        return self._onClose()
847
848class FDChecker(protocol.ProcessProtocol):
849    state = 0
850    data = ""
851    failed = None
852
853    def __init__(self, d):
854        self.deferred = d
855
856    def fail(self, why):
857        self.failed = why
858        self.deferred.callback(None)
859
860    def connectionMade(self):
861        self.transport.writeToChild(0, "abcd")
862        self.state = 1
863
864    def childDataReceived(self, childFD, data):
865        if self.state == 1:
866            if childFD != 1:
867                self.fail("read '%s' on fd %d (not 1) during state 1" \
868                          % (childFD, data))
869                return
870            self.data += data
871            #print "len", len(self.data)
872            if len(self.data) == 6:
873                if self.data != "righto":
874                    self.fail("got '%s' on fd1, expected 'righto'" \
875                              % self.data)
876                    return
877                self.data = ""
878                self.state = 2
879                #print "state2", self.state
880                self.transport.writeToChild(3, "efgh")
881                return
882        if self.state == 2:
883            self.fail("read '%s' on fd %s during state 2" % (childFD, data))
884            return
885        if self.state == 3:
886            if childFD != 1:
887                self.fail("read '%s' on fd %s (not 1) during state 3" \
888                          % (childFD, data))
889                return
890            self.data += data
891            if len(self.data) == 6:
892                if self.data != "closed":
893                    self.fail("got '%s' on fd1, expected 'closed'" \
894                              % self.data)
895                    return
896                self.state = 4
897            return
898        if self.state == 4:
899            self.fail("read '%s' on fd %s during state 4" % (childFD, data))
900            return
901
902    def childConnectionLost(self, childFD):
903        if self.state == 1:
904            self.fail("got connectionLost(%d) during state 1" % childFD)
905            return
906        if self.state == 2:
907            if childFD != 4:
908                self.fail("got connectionLost(%d) (not 4) during state 2" \
909                          % childFD)
910                return
911            self.state = 3
912            self.transport.closeChildFD(5)
913            return
914
915    def processEnded(self, status):
916        rc = status.value.exitCode
917        if self.state != 4:
918            self.fail("processEnded early, rc %d" % rc)
919            return
920        if status.value.signal != None:
921            self.fail("processEnded with signal %s" % status.value.signal)
922            return
923        if rc != 0:
924            self.fail("processEnded with rc %d" % rc)
925            return
926        self.deferred.callback(None)
927
928
929class FDTest(unittest.TestCase):
930
931    def testFD(self):
932        exe = sys.executable
933        scriptPath = util.sibpath(__file__, "process_fds.py")
934        d = defer.Deferred()
935        p = FDChecker(d)
936        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
937                             path=None,
938                             childFDs={0:"w", 1:"r", 2:2,
939                                       3:"w", 4:"r", 5:"w"})
940        d.addCallback(lambda x : self.failIf(p.failed, p.failed))
941        return d
942
943    def testLinger(self):
944        # See what happens when all the pipes close before the process
945        # actually stops. This test *requires* SIGCHLD catching to work,
946        # as there is no other way to find out the process is done.
947        exe = sys.executable
948        scriptPath = util.sibpath(__file__, "process_linger.py")
949        p = Accumulator()
950        d = p.endedDeferred = defer.Deferred()
951        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
952                             path=None,
953                             childFDs={1:"r", 2:2},
954                             )
955        def processEnded(ign):
956            self.assertEqual(p.outF.getvalue(),
957                                 "here is some text\ngoodbye\n")
958        return d.addCallback(processEnded)
959
960
961
962class Accumulator(protocol.ProcessProtocol):
963    """Accumulate data from a process."""
964
965    closed = 0
966    endedDeferred = None
967
968    def connectionMade(self):
969        self.outF = StringIO.StringIO()
970        self.errF = StringIO.StringIO()
971
972    def outReceived(self, d):
973        self.outF.write(d)
974
975    def errReceived(self, d):
976        self.errF.write(d)
977
978    def outConnectionLost(self):
979        pass
980
981    def errConnectionLost(self):
982        pass
983
984    def processEnded(self, reason):
985        self.closed = 1
986        if self.endedDeferred is not None:
987            d, self.endedDeferred = self.endedDeferred, None
988            d.callback(None)
989
990
991class PosixProcessBase:
992    """
993    Test running processes.
994    """
995    usePTY = False
996
997    def getCommand(self, commandName):
998        """
999        Return the path of the shell command named C{commandName}, looking at
1000        common locations.
1001        """
1002        if os.path.exists('/bin/%s' % (commandName,)):
1003            cmd = '/bin/%s' % (commandName,)
1004        elif os.path.exists('/usr/bin/%s' % (commandName,)):
1005            cmd = '/usr/bin/%s' % (commandName,)
1006        else:
1007            raise RuntimeError(
1008                "%s not found in /bin or /usr/bin" % (commandName,))
1009        return cmd
1010
1011    def testNormalTermination(self):
1012        cmd = self.getCommand('true')
1013
1014        d = defer.Deferred()
1015        p = TrivialProcessProtocol(d)
1016        reactor.spawnProcess(p, cmd, ['true'], env=None,
1017                             usePTY=self.usePTY)
1018        def check(ignored):
1019            p.reason.trap(error.ProcessDone)
1020            self.assertEqual(p.reason.value.exitCode, 0)
1021            self.assertEqual(p.reason.value.signal, None)
1022        d.addCallback(check)
1023        return d
1024
1025
1026    def test_abnormalTermination(self):
1027        """
1028        When a process terminates with a system exit code set to 1,
1029        C{processEnded} is called with a L{error.ProcessTerminated} error,
1030        the C{exitCode} attribute reflecting the system exit code.
1031        """
1032        exe = sys.executable
1033
1034        d = defer.Deferred()
1035        p = TrivialProcessProtocol(d)
1036        reactor.spawnProcess(p, exe, [exe, '-c', 'import sys; sys.exit(1)'],
1037                             env=None, usePTY=self.usePTY)
1038
1039        def check(ignored):
1040            p.reason.trap(error.ProcessTerminated)
1041            self.assertEqual(p.reason.value.exitCode, 1)
1042            self.assertEqual(p.reason.value.signal, None)
1043        d.addCallback(check)
1044        return d
1045
1046
1047    def _testSignal(self, sig):
1048        exe = sys.executable
1049        scriptPath = util.sibpath(__file__, "process_signal.py")
1050        d = defer.Deferred()
1051        p = SignalProtocol(d, sig)
1052        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
1053                             usePTY=self.usePTY)
1054        return d
1055
1056
1057    def test_signalHUP(self):
1058        """
1059        Sending the SIGHUP signal to a running process interrupts it, and
1060        C{processEnded} is called with a L{error.ProcessTerminated} instance
1061        with the C{exitCode} set to C{None} and the C{signal} attribute set to
1062        C{signal.SIGHUP}. C{os.WTERMSIG} can also be used on the C{status}
1063        attribute to extract the signal value.
1064        """
1065        return self._testSignal('HUP')
1066
1067
1068    def test_signalINT(self):
1069        """
1070        Sending the SIGINT signal to a running process interrupts it, and
1071        C{processEnded} is called with a L{error.ProcessTerminated} instance
1072        with the C{exitCode} set to C{None} and the C{signal} attribute set to
1073        C{signal.SIGINT}. C{os.WTERMSIG} can also be used on the C{status}
1074        attribute to extract the signal value.
1075        """
1076        return self._testSignal('INT')
1077
1078
1079    def test_signalKILL(self):
1080        """
1081        Sending the SIGKILL signal to a running process interrupts it, and
1082        C{processEnded} is called with a L{error.ProcessTerminated} instance
1083        with the C{exitCode} set to C{None} and the C{signal} attribute set to
1084        C{signal.SIGKILL}. C{os.WTERMSIG} can also be used on the C{status}
1085        attribute to extract the signal value.
1086        """
1087        return self._testSignal('KILL')
1088
1089
1090    def test_signalTERM(self):
1091        """
1092        Sending the SIGTERM signal to a running process interrupts it, and
1093        C{processEnded} is called with a L{error.ProcessTerminated} instance
1094        with the C{exitCode} set to C{None} and the C{signal} attribute set to
1095        C{signal.SIGTERM}. C{os.WTERMSIG} can also be used on the C{status}
1096        attribute to extract the signal value.
1097        """
1098        return self._testSignal('TERM')
1099
1100
1101    def test_childSignalHandling(self):
1102        """
1103        The disposition of signals which are ignored in the parent
1104        process is reset to the default behavior for the child
1105        process.
1106        """
1107        # Somewhat arbitrarily select SIGUSR1 here.  It satisfies our
1108        # requirements that:
1109        #    - The interpreter not fiddle around with the handler
1110        #      behind our backs at startup time (this disqualifies
1111        #      signals like SIGINT and SIGPIPE).
1112        #    - The default behavior is to exit.
1113        #
1114        # This lets us send the signal to the child and then verify
1115        # that it exits with a status code indicating that it was
1116        # indeed the signal which caused it to exit.
1117        which = signal.SIGUSR1
1118
1119        # Ignore the signal in the parent (and make sure we clean it
1120        # up).
1121        handler = signal.signal(which, signal.SIG_IGN)
1122        self.addCleanup(signal.signal, signal.SIGUSR1, handler)
1123
1124        # Now do the test.
1125        return self._testSignal(signal.SIGUSR1)
1126
1127
1128    def test_executionError(self):
1129        """
1130        Raise an error during execvpe to check error management.
1131        """
1132        cmd = self.getCommand('false')
1133
1134        d = defer.Deferred()
1135        p = TrivialProcessProtocol(d)
1136        def buggyexecvpe(command, args, environment):
1137            raise RuntimeError("Ouch")
1138        oldexecvpe = os.execvpe
1139        os.execvpe = buggyexecvpe
1140        try:
1141            reactor.spawnProcess(p, cmd, ['false'], env=None,
1142                                 usePTY=self.usePTY)
1143
1144            def check(ignored):
1145                errData = "".join(p.errData + p.outData)
1146                self.assertIn("Upon execvpe", errData)
1147                self.assertIn("Ouch", errData)
1148            d.addCallback(check)
1149        finally:
1150            os.execvpe = oldexecvpe
1151        return d
1152
1153
1154    def test_errorInProcessEnded(self):
1155        """
1156        The handler which reaps a process is removed when the process is
1157        reaped, even if the protocol's C{processEnded} method raises an
1158        exception.
1159        """
1160        connected = defer.Deferred()
1161        ended = defer.Deferred()
1162
1163        # This script runs until we disconnect its transport.
1164        pythonExecutable = sys.executable
1165        scriptPath = util.sibpath(__file__, "process_echoer.py")
1166
1167        class ErrorInProcessEnded(protocol.ProcessProtocol):
1168            """
1169            A protocol that raises an error in C{processEnded}.
1170            """
1171            def makeConnection(self, transport):
1172                connected.callback(transport)
1173
1174            def processEnded(self, reason):
1175                reactor.callLater(0, ended.callback, None)
1176                raise RuntimeError("Deliberate error")
1177
1178        # Launch the process.
1179        reactor.spawnProcess(
1180            ErrorInProcessEnded(), pythonExecutable,
1181            [pythonExecutable, scriptPath],
1182            env=None, path=None)
1183
1184        pid = []
1185        def cbConnected(transport):
1186            pid.append(transport.pid)
1187            # There's now a reap process handler registered.
1188            self.assertIn(transport.pid, process.reapProcessHandlers)
1189
1190            # Kill the process cleanly, triggering an error in the protocol.
1191            transport.loseConnection()
1192        connected.addCallback(cbConnected)
1193
1194        def checkTerminated(ignored):
1195            # The exception was logged.
1196            excs = self.flushLoggedErrors(RuntimeError)
1197            self.assertEqual(len(excs), 1)
1198            # The process is no longer scheduled for reaping.
1199            self.assertNotIn(pid[0], process.reapProcessHandlers)
1200        ended.addCallback(checkTerminated)
1201
1202        return ended
1203
1204
1205
1206class MockSignal(object):
1207    """
1208    Neuter L{signal.signal}, but pass other attributes unscathed
1209    """
1210    def signal(self, sig, action):
1211        return signal.getsignal(sig)
1212
1213    def __getattr__(self, attr):
1214        return getattr(signal, attr)
1215
1216
1217class MockOS(object):
1218    """
1219    The mock OS: overwrite L{os}, L{fcntl} and {sys} functions with fake ones.
1220
1221    @ivar exited: set to True when C{_exit} is called.
1222    @type exited: C{bool}
1223
1224    @ivar O_RDWR: dumb value faking C{os.O_RDWR}.
1225    @type O_RDWR: C{int}
1226
1227    @ivar O_NOCTTY: dumb value faking C{os.O_NOCTTY}.
1228    @type O_NOCTTY: C{int}
1229
1230    @ivar WNOHANG: dumb value faking C{os.WNOHANG}.
1231    @type WNOHANG: C{int}
1232
1233    @ivar raiseFork: if not C{None}, subsequent calls to fork will raise this
1234        object.
1235    @type raiseFork: C{NoneType} or C{Exception}
1236
1237    @ivar raiseExec: if set, subsequent calls to execvpe will raise an error.
1238    @type raiseExec: C{bool}
1239
1240    @ivar fdio: fake file object returned by calls to fdopen.
1241    @type fdio: C{StringIO.StringIO}
1242
1243    @ivar actions: hold names of some actions executed by the object, in order
1244        of execution.
1245
1246    @type actions: C{list} of C{str}
1247
1248    @ivar closed: keep track of the file descriptor closed.
1249    @type closed: C{list} of C{int}
1250
1251    @ivar child: whether fork return for the child or the parent.
1252    @type child: C{bool}
1253
1254    @ivar pipeCount: count the number of time that C{os.pipe} has been called.
1255    @type pipeCount: C{int}
1256
1257    @ivar raiseWaitPid: if set, subsequent calls to waitpid will raise
1258        the error specified.
1259    @type raiseWaitPid: C{None} or a class
1260
1261    @ivar waitChild: if set, subsequent calls to waitpid will return it.
1262    @type waitChild: C{None} or a tuple
1263
1264    @ivar euid: the uid returned by the fake C{os.geteuid}
1265    @type euid: C{int}
1266
1267    @ivar egid: the gid returned by the fake C{os.getegid}
1268    @type egid: C{int}
1269
1270    @ivar seteuidCalls: stored results of C{os.seteuid} calls.
1271    @type seteuidCalls: C{list}
1272
1273    @ivar setegidCalls: stored results of C{os.setegid} calls.
1274    @type setegidCalls: C{list}
1275
1276    @ivar path: the path returned by C{os.path.expanduser}.
1277    @type path: C{str}
1278
1279    @ivar raiseKill: if set, subsequent call to kill will raise the error
1280        specified.
1281    @type raiseKill: C{None} or an exception instance.
1282
1283    @ivar readData: data returned by C{os.read}.
1284    @type readData: C{str}
1285    """
1286    exited = False
1287    raiseExec = False
1288    fdio = None
1289    child = True
1290    raiseWaitPid = None
1291    raiseFork = None
1292    waitChild = None
1293    euid = 0
1294    egid = 0
1295    path = None
1296    raiseKill = None
1297    readData = ""
1298
1299    def __init__(self):
1300        """
1301        Initialize data structures.
1302        """
1303        self.actions = []
1304        self.closed = []
1305        self.pipeCount = 0
1306        self.O_RDWR = -1
1307        self.O_NOCTTY = -2
1308        self.WNOHANG = -4
1309        self.WEXITSTATUS = lambda x: 0
1310        self.WIFEXITED = lambda x: 1
1311        self.seteuidCalls = []
1312        self.setegidCalls = []
1313
1314
1315    def open(self, dev, flags):
1316        """
1317        Fake C{os.open}. Return a non fd number to be sure it's not used
1318        elsewhere.
1319        """
1320        return -3
1321
1322
1323    def fstat(self, fd):
1324        """
1325        Fake C{os.fstat}.  Return a C{os.stat_result} filled with garbage.
1326        """
1327        return os.stat_result((0,) * 10)
1328
1329
1330    def fdopen(self, fd, flag):
1331        """
1332        Fake C{os.fdopen}. Return a StringIO object whose content can be tested
1333        later via C{self.fdio}.
1334        """
1335        self.fdio = StringIO.StringIO()
1336        return self.fdio
1337
1338
1339    def setsid(self):
1340        """
1341        Fake C{os.setsid}. Save action.
1342        """
1343        self.actions.append('setsid')
1344
1345
1346    def fork(self):
1347        """
1348        Fake C{os.fork}. Save the action in C{self.actions}, and return 0 if
1349        C{self.child} is set, or a dumb number.
1350        """
1351        self.actions.append(('fork', gc.isenabled()))
1352        if self.raiseFork is not None:
1353            raise self.raiseFork
1354        elif self.child:
1355            # Child result is 0
1356            return 0
1357        else:
1358            return 21
1359
1360
1361    def close(self, fd):
1362        """
1363        Fake C{os.close}, saving the closed fd in C{self.closed}.
1364        """
1365        self.closed.append(fd)
1366
1367
1368    def dup2(self, fd1, fd2):
1369        """
1370        Fake C{os.dup2}. Do nothing.
1371        """
1372
1373
1374    def write(self, fd, data):
1375        """
1376        Fake C{os.write}. Save action.
1377        """
1378        self.actions.append(("write", fd, data))
1379
1380
1381    def read(self, fd, size):
1382        """
1383        Fake C{os.read}: save action, and return C{readData} content.
1384
1385        @param fd: The file descriptor to read.
1386
1387        @param size: The maximum number of bytes to read.
1388
1389        @return: A fixed C{bytes} buffer.
1390        """
1391        self.actions.append(('read', fd, size))
1392        return self.readData
1393
1394
1395    def execvpe(self, command, args, env):
1396        """
1397        Fake C{os.execvpe}. Save the action, and raise an error if
1398        C{self.raiseExec} is set.
1399        """
1400        self.actions.append('exec')
1401        if self.raiseExec:
1402            raise RuntimeError("Bar")
1403
1404
1405    def pipe(self):
1406        """
1407        Fake C{os.pipe}. Return non fd numbers to be sure it's not used
1408        elsewhere, and increment C{self.pipeCount}. This is used to uniquify
1409        the result.
1410        """
1411        self.pipeCount += 1
1412        return - 2 * self.pipeCount + 1,  - 2 * self.pipeCount
1413
1414
1415    def ttyname(self, fd):
1416        """
1417        Fake C{os.ttyname}. Return a dumb string.
1418        """
1419        return "foo"
1420
1421
1422    def _exit(self, code):
1423        """
1424        Fake C{os._exit}. Save the action, set the C{self.exited} flag, and
1425        raise C{SystemError}.
1426        """
1427        self.actions.append(('exit', code))
1428        self.exited = True
1429        # Don't forget to raise an error, or you'll end up in parent
1430        # code path.
1431        raise SystemError()
1432
1433
1434    def ioctl(self, fd, flags, arg):
1435        """
1436        Override C{fcntl.ioctl}. Do nothing.
1437        """
1438
1439
1440    def setNonBlocking(self, fd):
1441        """
1442        Override C{fdesc.setNonBlocking}. Do nothing.
1443        """
1444
1445
1446    def waitpid(self, pid, options):
1447        """
1448        Override C{os.waitpid}. Return values meaning that the child process
1449        has exited, save executed action.
1450        """
1451        self.actions.append('waitpid')
1452        if self.raiseWaitPid is not None:
1453            raise self.raiseWaitPid
1454        if self.waitChild is not None:
1455            return self.waitChild
1456        return 1, 0
1457
1458
1459    def settrace(self, arg):
1460        """
1461        Override C{sys.settrace} to keep coverage working.
1462        """
1463
1464
1465    def getgid(self):
1466        """
1467        Override C{os.getgid}. Return a dumb number.
1468        """
1469        return 1235
1470
1471
1472    def getuid(self):
1473        """
1474        Override C{os.getuid}. Return a dumb number.
1475        """
1476        return 1237
1477
1478
1479    def setuid(self, val):
1480        """
1481        Override C{os.setuid}. Do nothing.
1482        """
1483        self.actions.append(('setuid', val))
1484
1485
1486    def setgid(self, val):
1487        """
1488        Override C{os.setgid}. Do nothing.
1489        """
1490        self.actions.append(('setgid', val))
1491
1492
1493    def setregid(self, val1, val2):
1494        """
1495        Override C{os.setregid}. Do nothing.
1496        """
1497        self.actions.append(('setregid', val1, val2))
1498
1499
1500    def setreuid(self, val1, val2):
1501        """
1502        Override C{os.setreuid}.  Save the action.
1503        """
1504        self.actions.append(('setreuid', val1, val2))
1505
1506
1507    def switchUID(self, uid, gid):
1508        """
1509        Override C{util.switchuid}. Save the action.
1510        """
1511        self.actions.append(('switchuid', uid, gid))
1512
1513
1514    def openpty(self):
1515        """
1516        Override C{pty.openpty}, returning fake file descriptors.
1517        """
1518        return -12, -13
1519
1520
1521    def chdir(self, path):
1522        """
1523        Override C{os.chdir}. Save the action.
1524
1525        @param path: The path to change the current directory to.
1526        """
1527        self.actions.append(('chdir', path))
1528
1529
1530    def geteuid(self):
1531        """
1532        Mock C{os.geteuid}, returning C{self.euid} instead.
1533        """
1534        return self.euid
1535
1536
1537    def getegid(self):
1538        """
1539        Mock C{os.getegid}, returning C{self.egid} instead.
1540        """
1541        return self.egid
1542
1543
1544    def seteuid(self, egid):
1545        """
1546        Mock C{os.seteuid}, store result.
1547        """
1548        self.seteuidCalls.append(egid)
1549
1550
1551    def setegid(self, egid):
1552        """
1553        Mock C{os.setegid}, store result.
1554        """
1555        self.setegidCalls.append(egid)
1556
1557
1558    def expanduser(self, path):
1559        """
1560        Mock C{os.path.expanduser}.
1561        """
1562        return self.path
1563
1564
1565    def getpwnam(self, user):
1566        """
1567        Mock C{pwd.getpwnam}.
1568        """
1569        return 0, 0, 1, 2
1570
1571
1572    def listdir(self, path):
1573        """
1574        Override C{os.listdir}, returning fake contents of '/dev/fd'
1575        """
1576        return "-1", "-2"
1577
1578
1579    def kill(self, pid, signalID):
1580        """
1581        Override C{os.kill}: save the action and raise C{self.raiseKill} if
1582        specified.
1583        """
1584        self.actions.append(('kill', pid, signalID))
1585        if self.raiseKill is not None:
1586            raise self.raiseKill
1587
1588
1589    def unlink(self, filename):
1590        """
1591        Override C{os.unlink}. Save the action.
1592
1593        @param filename: The file name to remove.
1594        """
1595        self.actions.append(('unlink', filename))
1596
1597
1598    def umask(self, mask):
1599        """
1600        Override C{os.umask}. Save the action.
1601
1602        @param mask: The new file mode creation mask.
1603        """
1604        self.actions.append(('umask', mask))
1605
1606
1607    def getpid(self):
1608        """
1609        Return a fixed PID value.
1610
1611        @return: A fixed value.
1612        """
1613        return 6789
1614
1615
1616if process is not None:
1617    class DumbProcessWriter(process.ProcessWriter):
1618        """
1619        A fake L{process.ProcessWriter} used for tests.
1620        """
1621
1622        def startReading(self):
1623            """
1624            Here's the faking: don't do anything here.
1625            """
1626
1627
1628
1629    class DumbProcessReader(process.ProcessReader):
1630        """
1631        A fake L{process.ProcessReader} used for tests.
1632        """
1633
1634        def startReading(self):
1635            """
1636            Here's the faking: don't do anything here.
1637            """
1638
1639
1640
1641    class DumbPTYProcess(process.PTYProcess):
1642        """
1643        A fake L{process.PTYProcess} used for tests.
1644        """
1645
1646        def startReading(self):
1647            """
1648            Here's the faking: don't do anything here.
1649            """
1650
1651
1652
1653class MockProcessTestCase(unittest.TestCase):
1654    """
1655    Mock a process runner to test forked child code path.
1656    """
1657    if process is None:
1658        skip = "twisted.internet.process is never used on Windows"
1659
1660    def setUp(self):
1661        """
1662        Replace L{process} os, fcntl, sys, switchUID, fdesc and pty modules
1663        with the mock class L{MockOS}.
1664        """
1665        if gc.isenabled():
1666            self.addCleanup(gc.enable)
1667        else:
1668            self.addCleanup(gc.disable)
1669        self.mockos = MockOS()
1670        self.mockos.euid = 1236
1671        self.mockos.egid = 1234
1672        self.patch(process, "os", self.mockos)
1673        self.patch(process, "fcntl", self.mockos)
1674        self.patch(process, "sys", self.mockos)
1675        self.patch(process, "switchUID", self.mockos.switchUID)
1676        self.patch(process, "fdesc", self.mockos)
1677        self.patch(process.Process, "processReaderFactory", DumbProcessReader)
1678        self.patch(process.Process, "processWriterFactory", DumbProcessWriter)
1679        self.patch(process, "pty", self.mockos)
1680
1681        self.mocksig = MockSignal()
1682        self.patch(process, "signal", self.mocksig)
1683
1684
1685    def tearDown(self):
1686        """
1687        Reset processes registered for reap.
1688        """
1689        process.reapProcessHandlers = {}
1690
1691
1692    def test_mockFork(self):
1693        """
1694        Test a classic spawnProcess. Check the path of the client code:
1695        fork, exec, exit.
1696        """
1697        gc.enable()
1698
1699        cmd = '/mock/ouch'
1700
1701        d = defer.Deferred()
1702        p = TrivialProcessProtocol(d)
1703        try:
1704            reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1705                                 usePTY=False)
1706        except SystemError:
1707            self.assert_(self.mockos.exited)
1708            self.assertEqual(
1709                self.mockos.actions, [("fork", False), "exec", ("exit", 1)])
1710        else:
1711            self.fail("Should not be here")
1712
1713        # It should leave the garbage collector disabled.
1714        self.assertFalse(gc.isenabled())
1715
1716
1717    def _mockForkInParentTest(self):
1718        """
1719        Assert that in the main process, spawnProcess disables the garbage
1720        collector, calls fork, closes the pipe file descriptors it created for
1721        the child process, and calls waitpid.
1722        """
1723        self.mockos.child = False
1724        cmd = '/mock/ouch'
1725
1726        d = defer.Deferred()
1727        p = TrivialProcessProtocol(d)
1728        reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1729                             usePTY=False)
1730        # It should close the first read pipe, and the 2 last write pipes
1731        self.assertEqual(set(self.mockos.closed), set([-1, -4, -6]))
1732        self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])
1733
1734
1735    def test_mockForkInParentGarbageCollectorEnabled(self):
1736        """
1737        The garbage collector should be enabled when L{reactor.spawnProcess}
1738        returns if it was initially enabled.
1739
1740        @see L{_mockForkInParentTest}
1741        """
1742        gc.enable()
1743        self._mockForkInParentTest()
1744        self.assertTrue(gc.isenabled())
1745
1746
1747    def test_mockForkInParentGarbageCollectorDisabled(self):
1748        """
1749        The garbage collector should be disabled when L{reactor.spawnProcess}
1750        returns if it was initially disabled.
1751
1752        @see L{_mockForkInParentTest}
1753        """
1754        gc.disable()
1755        self._mockForkInParentTest()
1756        self.assertFalse(gc.isenabled())
1757
1758
1759    def test_mockForkTTY(self):
1760        """
1761        Test a TTY spawnProcess: check the path of the client code:
1762        fork, exec, exit.
1763        """
1764        cmd = '/mock/ouch'
1765
1766        d = defer.Deferred()
1767        p = TrivialProcessProtocol(d)
1768        self.assertRaises(SystemError, reactor.spawnProcess, p, cmd, ['ouch'],
1769                          env=None, usePTY=True)
1770        self.assertTrue(self.mockos.exited)
1771        self.assertEqual(
1772            self.mockos.actions,
1773            [("fork", False), "setsid", "exec", ("exit", 1)])
1774
1775
1776    def _mockWithForkError(self):
1777        """
1778        Assert that if the fork call fails, no other process setup calls are
1779        made and that spawnProcess raises the exception fork raised.
1780        """
1781        self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1782        protocol = TrivialProcessProtocol(None)
1783        self.assertRaises(OSError, reactor.spawnProcess, protocol, None)
1784        self.assertEqual(self.mockos.actions, [("fork", False)])
1785
1786
1787    def test_mockWithForkErrorGarbageCollectorEnabled(self):
1788        """
1789        The garbage collector should be enabled when L{reactor.spawnProcess}
1790        raises because L{os.fork} raised, if it was initially enabled.
1791        """
1792        gc.enable()
1793        self._mockWithForkError()
1794        self.assertTrue(gc.isenabled())
1795
1796
1797    def test_mockWithForkErrorGarbageCollectorDisabled(self):
1798        """
1799        The garbage collector should be disabled when
1800        L{reactor.spawnProcess} raises because L{os.fork} raised, if it was
1801        initially disabled.
1802        """
1803        gc.disable()
1804        self._mockWithForkError()
1805        self.assertFalse(gc.isenabled())
1806
1807
1808    def test_mockForkErrorCloseFDs(self):
1809        """
1810        When C{os.fork} raises an exception, the file descriptors created
1811        before are closed and don't leak.
1812        """
1813        self._mockWithForkError()
1814        self.assertEqual(set(self.mockos.closed), set([-1, -4, -6, -2, -3, -5]))
1815
1816
1817    def test_mockForkErrorGivenFDs(self):
1818        """
1819        When C{os.forks} raises an exception and that file descriptors have
1820        been specified with the C{childFDs} arguments of
1821        L{reactor.spawnProcess}, they are not closed.
1822        """
1823        self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1824        protocol = TrivialProcessProtocol(None)
1825        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1826            childFDs={0: -10, 1: -11, 2: -13})
1827        self.assertEqual(self.mockos.actions, [("fork", False)])
1828        self.assertEqual(self.mockos.closed, [])
1829
1830        # We can also put "r" or "w" to let twisted create the pipes
1831        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1832            childFDs={0: "r", 1: -11, 2: -13})
1833        self.assertEqual(set(self.mockos.closed), set([-1, -2]))
1834
1835
1836    def test_mockForkErrorClosePTY(self):
1837        """
1838        When C{os.fork} raises an exception, the file descriptors created by
1839        C{pty.openpty} are closed and don't leak, when C{usePTY} is set to
1840        C{True}.
1841        """
1842        self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1843        protocol = TrivialProcessProtocol(None)
1844        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1845                          usePTY=True)
1846        self.assertEqual(self.mockos.actions, [("fork", False)])
1847        self.assertEqual(set(self.mockos.closed), set([-12, -13]))
1848
1849
1850    def test_mockForkErrorPTYGivenFDs(self):
1851        """
1852        If a tuple is passed to C{usePTY} to specify slave and master file
1853        descriptors and that C{os.fork} raises an exception, these file
1854        descriptors aren't closed.
1855        """
1856        self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1857        protocol = TrivialProcessProtocol(None)
1858        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1859                          usePTY=(-20, -21, 'foo'))
1860        self.assertEqual(self.mockos.actions, [("fork", False)])
1861        self.assertEqual(self.mockos.closed, [])
1862
1863
1864    def test_mockWithExecError(self):
1865        """
1866        Spawn a process but simulate an error during execution in the client
1867        path: C{os.execvpe} raises an error. It should close all the standard
1868        fds, try to print the error encountered, and exit cleanly.
1869        """
1870        cmd = '/mock/ouch'
1871
1872        d = defer.Deferred()
1873        p = TrivialProcessProtocol(d)
1874        self.mockos.raiseExec = True
1875        try:
1876            reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1877                                 usePTY=False)
1878        except SystemError:
1879            self.assert_(self.mockos.exited)
1880            self.assertEqual(
1881                self.mockos.actions, [("fork", False), "exec", ("exit", 1)])
1882            # Check that fd have been closed
1883            self.assertIn(0, self.mockos.closed)
1884            self.assertIn(1, self.mockos.closed)
1885            self.assertIn(2, self.mockos.closed)
1886            # Check content of traceback
1887            self.assertIn("RuntimeError: Bar", self.mockos.fdio.getvalue())
1888        else:
1889            self.fail("Should not be here")
1890
1891
1892    def test_mockSetUid(self):
1893        """
1894        Try creating a process with setting its uid: it's almost the same path
1895        as the standard path, but with a C{switchUID} call before the exec.
1896        """
1897        cmd = '/mock/ouch'
1898
1899        d = defer.Deferred()
1900        p = TrivialProcessProtocol(d)
1901        try:
1902            reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1903                                 usePTY=False, uid=8080)
1904        except SystemError:
1905            self.assert_(self.mockos.exited)
1906            self.assertEqual(
1907                self.mockos.actions,
1908                [('fork', False), ('setuid', 0), ('setgid', 0),
1909                 ('switchuid', 8080, 1234), 'exec', ('exit', 1)])
1910        else:
1911            self.fail("Should not be here")
1912
1913
1914    def test_mockSetUidInParent(self):
1915        """
1916        When spawning a child process with a UID different from the UID of the
1917        current process, the current process does not have its UID changed.
1918        """
1919        self.mockos.child = False
1920        cmd = '/mock/ouch'
1921
1922        d = defer.Deferred()
1923        p = TrivialProcessProtocol(d)
1924        reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1925                             usePTY=False, uid=8080)
1926        self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid'])
1927
1928
1929    def test_mockPTYSetUid(self):
1930        """
1931        Try creating a PTY process with setting its uid: it's almost the same
1932        path as the standard path, but with a C{switchUID} call before the
1933        exec.
1934        """
1935        cmd = '/mock/ouch'
1936
1937        d = defer.Deferred()
1938        p = TrivialProcessProtocol(d)
1939        try:
1940            reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1941                                 usePTY=True, uid=8081)
1942        except SystemError:
1943            self.assertTrue(self.mockos.exited)
1944            self.assertEqual(
1945                self.mockos.actions,
1946                [('fork', False), 'setsid', ('setuid', 0), ('setgid', 0),
1947                 ('switchuid', 8081, 1234), 'exec', ('exit', 1)])
1948        else:
1949            self.fail("Should not be here")
1950
1951
1952    def test_mockPTYSetUidInParent(self):
1953        """
1954        When spawning a child process with PTY and a UID different from the UID
1955        of the current process, the current process does not have its UID
1956        changed.
1957        """
1958        self.mockos.child = False
1959        cmd = '/mock/ouch'
1960
1961        d = defer.Deferred()
1962        p = TrivialProcessProtocol(d)
1963        oldPTYProcess = process.PTYProcess
1964        try:
1965            process.PTYProcess = DumbPTYProcess
1966            reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1967                                 usePTY=True, uid=8080)
1968        finally:
1969            process.PTYProcess = oldPTYProcess
1970        self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid'])
1971
1972
1973    def test_mockWithWaitError(self):
1974        """
1975        Test that reapProcess logs errors raised.
1976        """
1977        self.mockos.child = False
1978        cmd = '/mock/ouch'
1979        self.mockos.waitChild = (0, 0)
1980
1981        d = defer.Deferred()
1982        p = TrivialProcessProtocol(d)
1983        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1984                             usePTY=False)
1985        self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])
1986
1987        self.mockos.raiseWaitPid = OSError()
1988        proc.reapProcess()
1989        errors = self.flushLoggedErrors()
1990        self.assertEqual(len(errors), 1)
1991        errors[0].trap(OSError)
1992
1993
1994    def test_mockErrorECHILDInReapProcess(self):
1995        """
1996        Test that reapProcess doesn't log anything when waitpid raises a
1997        C{OSError} with errno C{ECHILD}.
1998        """
1999        self.mockos.child = False
2000        cmd = '/mock/ouch'
2001        self.mockos.waitChild = (0, 0)
2002
2003        d = defer.Deferred()
2004        p = TrivialProcessProtocol(d)
2005        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
2006                                    usePTY=False)
2007        self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])
2008
2009        self.mockos.raiseWaitPid = OSError()
2010        self.mockos.raiseWaitPid.errno = errno.ECHILD
2011        # This should not produce any errors
2012        proc.reapProcess()
2013
2014
2015    def test_mockErrorInPipe(self):
2016        """
2017        If C{os.pipe} raises an exception after some pipes where created, the
2018        created pipes are closed and don't leak.
2019        """
2020        pipes = [-1, -2, -3, -4]
2021        def pipe():
2022            try:
2023                return pipes.pop(0), pipes.pop(0)
2024            except IndexError:
2025                raise OSError()
2026        self.mockos.pipe = pipe
2027        protocol = TrivialProcessProtocol(None)
2028        self.assertRaises(OSError, reactor.spawnProcess, protocol, None)
2029        self.assertEqual(self.mockos.actions, [])
2030        self.assertEqual(set(self.mockos.closed), set([-4, -3, -2, -1]))
2031
2032
2033    def test_kill(self):
2034        """
2035        L{process.Process.signalProcess} calls C{os.kill} translating the given
2036        signal string to the PID.
2037        """
2038        self.mockos.child = False
2039        self.mockos.waitChild = (0, 0)
2040        cmd = '/mock/ouch'
2041        p = TrivialProcessProtocol(None)
2042        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False)
2043        proc.signalProcess("KILL")
2044        self.assertEqual(self.mockos.actions,
2045            [('fork', False), 'waitpid', ('kill', 21, signal.SIGKILL)])
2046
2047
2048    def test_killExited(self):
2049        """
2050        L{process.Process.signalProcess} raises L{error.ProcessExitedAlready}
2051        if the process has exited.
2052        """
2053        self.mockos.child = False
2054        cmd = '/mock/ouch'
2055        p = TrivialProcessProtocol(None)
2056        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False)
2057        # We didn't specify a waitpid value, so the waitpid call in
2058        # registerReapProcessHandler has already reaped the process
2059        self.assertRaises(error.ProcessExitedAlready,
2060                          proc.signalProcess, "KILL")
2061
2062
2063    def test_killExitedButNotDetected(self):
2064        """
2065        L{process.Process.signalProcess} raises L{error.ProcessExitedAlready}
2066        if the process has exited but that twisted hasn't seen it (for example,
2067        if the process has been waited outside of twisted): C{os.kill} then
2068        raise C{OSError} with C{errno.ESRCH} as errno.
2069        """
2070        self.mockos.child = False
2071        self.mockos.waitChild = (0, 0)
2072        cmd = '/mock/ouch'
2073        p = TrivialProcessProtocol(None)
2074        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False)
2075        self.mockos.raiseKill = OSError(errno.ESRCH, "Not found")
2076        self.assertRaises(error.ProcessExitedAlready,
2077                          proc.signalProcess, "KILL")
2078
2079
2080    def test_killErrorInKill(self):
2081        """
2082        L{process.Process.signalProcess} doesn't mask C{OSError} exceptions if
2083        the errno is different from C{errno.ESRCH}.
2084        """
2085        self.mockos.child = False
2086        self.mockos.waitChild = (0, 0)
2087        cmd = '/mock/ouch'
2088        p = TrivialProcessProtocol(None)
2089        proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False)
2090        self.mockos.raiseKill = OSError(errno.EINVAL, "Invalid signal")
2091        err = self.assertRaises(OSError,
2092                                proc.signalProcess, "KILL")
2093        self.assertEquals(err.errno, errno.EINVAL)
2094
2095
2096
2097class PosixProcessTestCase(unittest.TestCase, PosixProcessBase):
2098    # add two non-pty test cases
2099
2100    def test_stderr(self):
2101        """
2102        Bytes written to stderr by the spawned process are passed to the
2103        C{errReceived} callback on the C{ProcessProtocol} passed to
2104        C{spawnProcess}.
2105        """
2106        cmd = sys.executable
2107
2108        value = "42"
2109
2110        p = Accumulator()
2111        d = p.endedDeferred = defer.Deferred()
2112        reactor.spawnProcess(p, cmd,
2113                             [cmd, "-c",
2114                              "import sys; sys.stderr.write('%s')" % (value,)],
2115                             env=None, path="/tmp",
2116                             usePTY=self.usePTY)
2117
2118        def processEnded(ign):
2119            self.assertEqual(value, p.errF.getvalue())
2120        return d.addCallback(processEnded)
2121
2122
2123    def testProcess(self):
2124        cmd = self.getCommand('gzip')
2125        s = "there's no place like home!\n" * 3
2126        p = Accumulator()
2127        d = p.endedDeferred = defer.Deferred()
2128        reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
2129                             usePTY=self.usePTY)
2130        p.transport.write(s)
2131        p.transport.closeStdin()
2132
2133        def processEnded(ign):
2134            f = p.outF
2135            f.seek(0, 0)
2136            gf = gzip.GzipFile(fileobj=f)
2137            self.assertEqual(gf.read(), s)
2138        return d.addCallback(processEnded)
2139
2140
2141
2142class PosixProcessTestCasePTY(unittest.TestCase, PosixProcessBase):
2143    """
2144    Just like PosixProcessTestCase, but use ptys instead of pipes.
2145    """
2146    usePTY = True
2147    # PTYs only offer one input and one output. What still makes sense?
2148    # testNormalTermination
2149    # test_abnormalTermination
2150    # testSignal
2151    # testProcess, but not without p.transport.closeStdin
2152    #  might be solveable: TODO: add test if so
2153
2154    def testOpeningTTY(self):
2155        exe = sys.executable
2156        scriptPath = util.sibpath(__file__, "process_tty.py")
2157        p = Accumulator()
2158        d = p.endedDeferred = defer.Deferred()
2159        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
2160                            path=None, usePTY=self.usePTY)
2161        p.transport.write("hello world!\n")
2162
2163        def processEnded(ign):
2164            self.assertRaises(
2165                error.ProcessExitedAlready, p.transport.signalProcess, 'HUP')
2166            self.assertEqual(
2167                p.outF.getvalue(),
2168                "hello world!\r\nhello world!\r\n",
2169                "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
2170        return d.addCallback(processEnded)
2171
2172
2173    def testBadArgs(self):
2174        pyExe = sys.executable
2175        pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2176        p = Accumulator()
2177        self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs,
2178            usePTY=1, childFDs={1:'r'})
2179
2180
2181
2182class Win32SignalProtocol(SignalProtocol):
2183    """
2184    A win32-specific process protocol that handles C{processEnded}
2185    differently: processes should exit with exit code 1.
2186    """
2187
2188    def processEnded(self, reason):
2189        """
2190        Callback C{self.deferred} with C{None} if C{reason} is a
2191        L{error.ProcessTerminated} failure with C{exitCode} set to 1.
2192        Otherwise, errback with a C{ValueError} describing the problem.
2193        """
2194        if not reason.check(error.ProcessTerminated):
2195            return self.deferred.errback(
2196                ValueError("wrong termination: %s" % (reason,)))
2197        v = reason.value
2198        if v.exitCode != 1:
2199            return self.deferred.errback(
2200                ValueError("Wrong exit code: %s" % (reason.exitCode,)))
2201        self.deferred.callback(None)
2202
2203
2204
2205class Win32ProcessTestCase(unittest.TestCase):
2206    """
2207    Test process programs that are packaged with twisted.
2208    """
2209
2210    def testStdinReader(self):
2211        pyExe = sys.executable
2212        scriptPath = util.sibpath(__file__, "process_stdinreader.py")
2213        p = Accumulator()
2214        d = p.endedDeferred = defer.Deferred()
2215        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None,
2216                             path=None)
2217        p.transport.write("hello, world")
2218        p.transport.closeStdin()
2219
2220        def processEnded(ign):
2221            self.assertEqual(p.errF.getvalue(), "err\nerr\n")
2222            self.assertEqual(p.outF.getvalue(), "out\nhello, world\nout\n")
2223        return d.addCallback(processEnded)
2224
2225
2226    def testBadArgs(self):
2227        pyExe = sys.executable
2228        pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2229        p = Accumulator()
2230        self.assertRaises(ValueError,
2231            reactor.spawnProcess, p, pyExe, pyArgs, uid=1)
2232        self.assertRaises(ValueError,
2233            reactor.spawnProcess, p, pyExe, pyArgs, gid=1)
2234        self.assertRaises(ValueError,
2235            reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1)
2236        self.assertRaises(ValueError,
2237            reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'})
2238
2239
2240    def _testSignal(self, sig):
2241        exe = sys.executable
2242        scriptPath = util.sibpath(__file__, "process_signal.py")
2243        d = defer.Deferred()
2244        p = Win32SignalProtocol(d, sig)
2245        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
2246        return d
2247
2248
2249    def test_signalTERM(self):
2250        """
2251        Sending the SIGTERM signal terminates a created process, and
2252        C{processEnded} is called with a L{error.ProcessTerminated} instance
2253        with the C{exitCode} attribute set to 1.
2254        """
2255        return self._testSignal('TERM')
2256
2257
2258    def test_signalINT(self):
2259        """
2260        Sending the SIGINT signal terminates a created process, and
2261        C{processEnded} is called with a L{error.ProcessTerminated} instance
2262        with the C{exitCode} attribute set to 1.
2263        """
2264        return self._testSignal('INT')
2265
2266
2267    def test_signalKILL(self):
2268        """
2269        Sending the SIGKILL signal terminates a created process, and
2270        C{processEnded} is called with a L{error.ProcessTerminated} instance
2271        with the C{exitCode} attribute set to 1.
2272        """
2273        return self._testSignal('KILL')
2274
2275
2276    def test_closeHandles(self):
2277        """
2278        The win32 handles should be properly closed when the process exits.
2279        """
2280        import win32api
2281
2282        connected = defer.Deferred()
2283        ended = defer.Deferred()
2284
2285        class SimpleProtocol(protocol.ProcessProtocol):
2286            """
2287            A protocol that fires deferreds when connected and disconnected.
2288            """
2289            def makeConnection(self, transport):
2290                connected.callback(transport)
2291
2292            def processEnded(self, reason):
2293                ended.callback(None)
2294
2295        p = SimpleProtocol()
2296
2297        pyExe = sys.executable
2298        pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2299        proc = reactor.spawnProcess(p, pyExe, pyArgs)
2300
2301        def cbConnected(transport):
2302            self.assertIdentical(transport, proc)
2303            # perform a basic validity test on the handles
2304            win32api.GetHandleInformation(proc.hProcess)
2305            win32api.GetHandleInformation(proc.hThread)
2306            # And save their values for later
2307            self.hProcess = proc.hProcess
2308            self.hThread = proc.hThread
2309        connected.addCallback(cbConnected)
2310
2311        def checkTerminated(ignored):
2312            # The attributes on the process object must be reset...
2313            self.assertIdentical(proc.pid, None)
2314            self.assertIdentical(proc.hProcess, None)
2315            self.assertIdentical(proc.hThread, None)
2316            # ...and the handles must be closed.
2317            self.assertRaises(win32api.error,
2318                              win32api.GetHandleInformation, self.hProcess)
2319            self.assertRaises(win32api.error,
2320                              win32api.GetHandleInformation, self.hThread)
2321        ended.addCallback(checkTerminated)
2322
2323        return defer.gatherResults([connected, ended])
2324
2325
2326
2327class Win32UnicodeEnvironmentTest(unittest.TestCase):
2328    """
2329    Tests for Unicode environment on Windows
2330    """
2331    goodKey = u'UNICODE'
2332    goodValue = u'UNICODE'
2333
2334    def test_encodableUnicodeEnvironment(self):
2335        """
2336        Test C{os.environ} (inherited by every subprocess on Windows) that
2337        contains an ascii-encodable Unicode string. This is different from
2338        passing Unicode environment explicitly to spawnProcess (which is not
2339        supported).
2340        """
2341        os.environ[self.goodKey] = self.goodValue
2342        self.addCleanup(operator.delitem, os.environ, self.goodKey)
2343
2344        p = GetEnvironmentDictionary.run(reactor, [], {})
2345        def gotEnvironment(environ):
2346            self.assertEqual(
2347                environ[self.goodKey.encode('ascii')],
2348                self.goodValue.encode('ascii'))
2349        return p.getResult().addCallback(gotEnvironment)
2350
2351
2352
2353class Dumbwin32procPidTest(unittest.TestCase):
2354    """
2355    Simple test for the pid attribute of Process on win32.
2356    """
2357
2358    def test_pid(self):
2359        """
2360        Launch process with mock win32process. The only mock aspect of this
2361        module is that the pid of the process created will always be 42.
2362        """
2363        from twisted.internet import _dumbwin32proc
2364        from twisted.test import mock_win32process
2365        self.patch(_dumbwin32proc, "win32process", mock_win32process)
2366        exe = sys.executable
2367        scriptPath = util.sibpath(__file__, "process_cmdline.py")
2368
2369        d = defer.Deferred()
2370        processProto = TrivialProcessProtocol(d)
2371        comspec = str(os.environ["COMSPEC"])
2372        cmd = [comspec, "/c", exe, scriptPath]
2373
2374        p = _dumbwin32proc.Process(reactor,
2375                                  processProto,
2376                                  None,
2377                                  cmd,
2378                                  {},
2379                                  None)
2380        self.assertEqual(42, p.pid)
2381        self.assertEqual("<Process pid=42>", repr(p))
2382
2383        def pidCompleteCb(result):
2384            self.assertEqual(None, p.pid)
2385        return d.addCallback(pidCompleteCb)
2386
2387
2388
2389class UtilTestCase(unittest.TestCase):
2390    """
2391    Tests for process-related helper functions (currently only
2392    L{procutils.which}.
2393    """
2394    def setUp(self):
2395        """
2396        Create several directories and files, some of which are executable
2397        and some of which are not.  Save the current PATH setting.
2398        """
2399        j = os.path.join
2400
2401        base = self.mktemp()
2402
2403        self.foo = j(base, "foo")
2404        self.baz = j(base, "baz")
2405        self.foobar = j(self.foo, "bar")
2406        self.foobaz = j(self.foo, "baz")
2407        self.bazfoo = j(self.baz, "foo")
2408        self.bazbar = j(self.baz, "bar")
2409
2410        for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar:
2411            os.makedirs(d)
2412
2413        for name, mode in [(j(self.foobaz, "executable"), 0700),
2414                           (j(self.foo, "executable"), 0700),
2415                           (j(self.bazfoo, "executable"), 0700),
2416                           (j(self.bazfoo, "executable.bin"), 0700),
2417                           (j(self.bazbar, "executable"), 0)]:
2418            f = file(name, "w")
2419            f.close()
2420            os.chmod(name, mode)
2421
2422        self.oldPath = os.environ.get('PATH', None)
2423        os.environ['PATH'] = os.pathsep.join((
2424            self.foobar, self.foobaz, self.bazfoo, self.bazbar))
2425
2426
2427    def tearDown(self):
2428        """
2429        Restore the saved PATH setting, and set all created files readable
2430        again so that they can be deleted easily.
2431        """
2432        os.chmod(os.path.join(self.bazbar, "executable"), stat.S_IWUSR)
2433        if self.oldPath is None:
2434            try:
2435                del os.environ['PATH']
2436            except KeyError:
2437                pass
2438        else:
2439            os.environ['PATH'] = self.oldPath
2440
2441
2442    def test_whichWithoutPATH(self):
2443        """
2444        Test that if C{os.environ} does not have a C{'PATH'} key,
2445        L{procutils.which} returns an empty list.
2446        """
2447        del os.environ['PATH']
2448        self.assertEqual(procutils.which("executable"), [])
2449
2450
2451    def testWhich(self):
2452        j = os.path.join
2453        paths = procutils.which("executable")
2454        expectedPaths = [j(self.foobaz, "executable"),
2455                         j(self.bazfoo, "executable")]
2456        if runtime.platform.isWindows():
2457            expectedPaths.append(j(self.bazbar, "executable"))
2458        self.assertEqual(paths, expectedPaths)
2459
2460
2461    def testWhichPathExt(self):
2462        j = os.path.join
2463        old = os.environ.get('PATHEXT', None)
2464        os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh'))
2465        try:
2466            paths = procutils.which("executable")
2467        finally:
2468            if old is None:
2469                del os.environ['PATHEXT']
2470            else:
2471                os.environ['PATHEXT'] = old
2472        expectedPaths = [j(self.foobaz, "executable"),
2473                         j(self.bazfoo, "executable"),
2474                         j(self.bazfoo, "executable.bin")]
2475        if runtime.platform.isWindows():
2476            expectedPaths.append(j(self.bazbar, "executable"))
2477        self.assertEqual(paths, expectedPaths)
2478
2479
2480
2481class ClosingPipesProcessProtocol(protocol.ProcessProtocol):
2482    output = ''
2483    errput = ''
2484
2485    def __init__(self, outOrErr):
2486        self.deferred = defer.Deferred()
2487        self.outOrErr = outOrErr
2488
2489    def processEnded(self, reason):
2490        self.deferred.callback(reason)
2491
2492    def outReceived(self, data):
2493        self.output += data
2494
2495    def errReceived(self, data):
2496        self.errput += data
2497
2498
2499
2500class ClosingPipes(unittest.TestCase):
2501
2502    def doit(self, fd):
2503        """
2504        Create a child process and close one of its output descriptors using
2505        L{IProcessTransport.closeStdout} or L{IProcessTransport.closeStderr}.
2506        Return a L{Deferred} which fires after verifying that the descriptor was
2507        really closed.
2508        """
2509        p = ClosingPipesProcessProtocol(True)
2510        self.assertFailure(p.deferred, error.ProcessTerminated)
2511        p.deferred.addCallback(self._endProcess, p)
2512        reactor.spawnProcess(
2513            p, sys.executable, [
2514                sys.executable, '-u', '-c',
2515                'raw_input()\n'
2516                'import sys, os, time\n'
2517                # Give the system a bit of time to notice the closed
2518                # descriptor.  Another option would be to poll() for HUP
2519                # instead of relying on an os.write to fail with SIGPIPE.
2520                # However, that wouldn't work on OS X (or Windows?).
2521                'for i in range(1000):\n'
2522                '    os.write(%d, "foo\\n")\n'
2523                '    time.sleep(0.01)\n'
2524                'sys.exit(42)\n' % (fd,)
2525                ],
2526            env=None)
2527
2528        if fd == 1:
2529            p.transport.closeStdout()
2530        elif fd == 2:
2531            p.transport.closeStderr()
2532        else:
2533            raise RuntimeError
2534
2535        # Give the close time to propagate
2536        p.transport.write('go\n')
2537
2538        # make the buggy case not hang
2539        p.transport.closeStdin()
2540        return p.deferred
2541
2542
2543    def _endProcess(self, reason, p):
2544        """
2545        Check that a failed write prevented the process from getting to its
2546        custom exit code.
2547        """
2548        # child must not get past that write without raising
2549        self.assertNotEquals(
2550            reason.exitCode, 42, 'process reason was %r' % reason)
2551        self.assertEqual(p.output, '')
2552        return p.errput
2553
2554
2555    def test_stdout(self):
2556        """
2557        ProcessProtocol.transport.closeStdout actually closes the pipe.
2558        """
2559        d = self.doit(1)
2560        def _check(errput):
2561            self.assertIn('OSError', errput)
2562            if runtime.platform.getType() != 'win32':
2563                self.assertIn('Broken pipe', errput)
2564        d.addCallback(_check)
2565        return d
2566
2567
2568    def test_stderr(self):
2569        """
2570        ProcessProtocol.transport.closeStderr actually closes the pipe.
2571        """
2572        d = self.doit(2)
2573        def _check(errput):
2574            # there should be no stderr open, so nothing for it to
2575            # write the error to.
2576            self.assertEqual(errput, '')
2577        d.addCallback(_check)
2578        return d
2579
2580
2581skipMessage = "wrong platform or reactor doesn't support IReactorProcess"
2582if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)):
2583    PosixProcessTestCase.skip = skipMessage
2584    PosixProcessTestCasePTY.skip = skipMessage
2585    TestTwoProcessesPosix.skip = skipMessage
2586    FDTest.skip = skipMessage
2587
2588if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)):
2589    Win32ProcessTestCase.skip = skipMessage
2590    TestTwoProcessesNonPosix.skip = skipMessage
2591    Dumbwin32procPidTest.skip = skipMessage
2592    Win32UnicodeEnvironmentTest.skip = skipMessage
2593
2594if not interfaces.IReactorProcess(reactor, None):
2595    ProcessTestCase.skip = skipMessage
2596    ClosingPipes.skip = skipMessage
2597
2598