1from __future__ import absolute_import
2
3__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
4
5import subprocess
6
7from . import events
8from . import protocols
9from . import streams
10from . import tasks
11from .coroutines import coroutine, From, Return
12from .py33_exceptions import BrokenPipeError, ConnectionResetError
13from .log import logger
14
15
# Re-export the redirection markers of the standard subprocess module so
# that callers can reach them through this module.
PIPE, STDOUT = subprocess.PIPE, subprocess.STDOUT
try:
    DEVNULL = subprocess.DEVNULL
except AttributeError:
    # The running interpreter's subprocess module does not define DEVNULL,
    # so this module does not define it either.
    pass
20
21
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once the connection is made, ``stdin`` is a StreamWriter and
    ``stdout`` / ``stderr`` are StreamReaders.  Each attribute stays None
    when the subprocess transport has no pipe transport for the matching
    file descriptor.
    """

    def __init__(self, limit, loop):
        """Store *limit* (passed to each StreamReader) and reset state."""
        super(SubprocessStreamProtocol, self).__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None

    def __repr__(self):
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append('stdin=%r' % self.stdin)
        if self.stdout is not None:
            info.append('stdout=%r' % self.stdout)
        if self.stderr is not None:
            info.append('stderr=%r' % self.stderr)
        return '<%s>' % ' '.join(info)

    def connection_made(self, transport):
        """Remember *transport* and wrap its pipe transports in streams.

        File descriptors 1 and 2 get a StreamReader, file descriptor 0
        gets a StreamWriter.  A descriptor without a pipe transport is
        skipped and its attribute keeps the value None.
        """
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def _get_reader(self, fd):
        """Return the reader attached to *fd* (1 or 2), or None."""
        if fd == 1:
            return self.stdout
        if fd == 2:
            return self.stderr
        return None

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader of *fd*; drop it if there is none."""
        reader = self._get_reader(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the loss of the pipe connected to *fd*.

        For stdin (fd 0) the writer, if any, is closed and
        connection_lost() is called with *exc*.  For stdout/stderr the
        reader, if any, receives EOF when *exc* is None and the exception
        otherwise.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            return
        reader = self._get_reader(fd)
        # Identity test: "!= None" would go through the reader's __ne__.
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Close the subprocess transport and drop the reference to it."""
        self._transport.close()
        self._transport = None
96
97
class Process(object):
    """Handle on a subprocess started through an event loop.

    Wraps the subprocess transport and its stream protocol.  ``stdin``,
    ``stdout`` and ``stderr`` are the protocol's stream objects (None for
    a stream that has none) and ``pid`` is the value reported by the
    transport.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.pid)

    @property
    def returncode(self):
        """Return code as currently reported by the transport."""
        return self._transport.get_returncode()

    @coroutine
    def wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine."""
        return_code = yield From(self._transport._wait())
        raise Return(return_code)

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Ask the transport to terminate the process."""
        self._transport.terminate()

    def kill(self):
        """Ask the transport to kill the process."""
        self._transport.kill()

    @coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, wait for it to drain, then close stdin.

        BrokenPipeError and ConnectionResetError are ignored (only logged
        in debug mode); stdin is closed in every case.
        """
        debug = self._loop.get_debug()
        try:
            # write() is inside the try block so that an error raised while
            # writing is ignored exactly like one raised while draining.
            self.stdin.write(input)
            if debug:
                logger.debug('%r communicate: feed stdin (%s bytes)',
                             self, len(input))
            yield From(self.stdin.drain())
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    @coroutine
    def _noop(self):
        """Placeholder coroutine for a stream that needs no work."""
        return None

    @coroutine
    def _read_stream(self, fd):
        """Read stdout (fd 1) or stderr (fd 2) to the end and return it.

        The matching pipe transport is closed once the read completes.
        """
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        debug = self._loop.get_debug()
        name = 'stdout' if fd == 1 else 'stderr'
        if debug:
            logger.debug('%r communicate: read %s', self, name)
        output = yield From(stream.read())
        if debug:
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        raise Return(output)

    @coroutine
    def communicate(self, input=None):
        """Send *input* to stdin, read stdout/stderr, wait for the exit.

        This method is a coroutine.  Its result is the pair
        (stdout_data, stderr_data); an element is None when the process
        has no stream object for that output.
        """
        # Empty, non-None input is still fed when a stdin pipe exists so
        # that stdin gets closed; a bare truthiness test would skip that.
        # Non-empty input keeps its previous behaviour unchanged.
        if input or (input is not None and self.stdin is not None):
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = yield From(tasks.gather(stdin, stdout, stderr,
                                                        loop=self._loop))
        yield From(self.wait())
        raise Return(stdout, stderr)
190
191
@coroutine
def create_subprocess_shell(cmd, **kwds):
    """Start *cmd* via loop.subprocess_shell() and return a Process.

    This function is a coroutine.  The keywords ``stdin``, ``stdout``,
    ``stderr`` (default None), ``loop`` (default: events.get_event_loop())
    and ``limit`` (default: streams._DEFAULT_LIMIT) are consumed here;
    every other keyword is forwarded to loop.subprocess_shell().
    """
    redirections = {}
    for stream_name in ('stdin', 'stdout', 'stderr'):
        redirections[stream_name] = kwds.pop(stream_name, None)
    loop = kwds.pop('loop', None)
    limit = kwds.pop('limit', streams._DEFAULT_LIMIT)
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    pending = loop.subprocess_shell(make_protocol, cmd,
                                    stdin=redirections['stdin'],
                                    stdout=redirections['stdout'],
                                    stderr=redirections['stderr'],
                                    **kwds)
    transport, protocol = yield From(pending)
    raise Return(Process(transport, protocol, loop))
208
@coroutine
def create_subprocess_exec(program, *args, **kwds):
    """Start *program* via loop.subprocess_exec() and return a Process.

    This function is a coroutine.  *args* are passed along after
    *program*.  The keywords ``stdin``, ``stdout``, ``stderr`` (default
    None), ``loop`` (default: events.get_event_loop()) and ``limit``
    (default: streams._DEFAULT_LIMIT) are consumed here; every other
    keyword is forwarded to loop.subprocess_exec().
    """
    child_stdin = kwds.pop('stdin', None)
    child_stdout = kwds.pop('stdout', None)
    child_stderr = kwds.pop('stderr', None)
    loop = kwds.pop('loop', None)
    limit = kwds.pop('limit', streams._DEFAULT_LIMIT)
    if loop is None:
        loop = events.get_event_loop()

    def build_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    launch = loop.subprocess_exec(build_protocol, program, *args,
                                  stdin=child_stdin, stdout=child_stdout,
                                  stderr=child_stderr, **kwds)
    transport, protocol = yield From(launch)
    raise Return(Process(transport, protocol, loop))
226