1import pipes
2from functools import wraps, partial
3from io import TextIOWrapper
4from itertools import chain
5from subprocess import STDOUT, PIPE
6from tempfile import TemporaryFile
7from testfixtures.compat import basestring, PY3, zip_longest, reduce, PY2
8from testfixtures.utils import extend_docstring
9
10from .mock import Mock, call
11
12
def shell_join(command):
    """
    Return ``command`` as a single shell-style string.

    A string is returned unchanged; any other iterable has each part quoted
    with :func:`pipes.quote` and the quoted parts joined with single spaces.
    """
    if isinstance(command, basestring):
        return command
    quoted_parts = [pipes.quote(part) for part in command]
    return " ".join(quoted_parts)
17
18
class PopenBehaviour(object):
    """
    Describes how a :class:`MockPopen` should simulate one particular
    command: the content of its output streams, its return code, its pid
    and its poll count.
    """

    def __init__(self, stdout=b'', stderr=b'', returncode=0, pid=1234,
                 poll_count=3):
        # Simulated content of the two output streams.
        self.stdout, self.stderr = stdout, stderr
        # Simulated exit status and process identifier.
        self.returncode, self.pid = returncode, pid
        # Stored as given; interpreted by whatever consumes this behaviour.
        self.poll_count = poll_count
32
33
def record(func):
    """
    Decorate a method so that every call is first reported to
    ``self._record`` under the method's name, then executed as normal.
    """
    @wraps(func)
    def recorder(self, *args, **kw):
        names = (func.__name__,)
        self._record(names, *args, **kw)
        result = func(self, *args, **kw)
        return result
    return recorder
40
41
class MockPopenInstance(object):
    """
    A mock process as returned by :class:`MockPopen`.

    The simulated output, return code, pid and poll count come from the
    behaviour registered on the :class:`MockPopen` for the command being
    run. Calls to the simulated process methods are recorded in
    :attr:`calls` and in the creating mock's ``all_calls`` list.
    """

    #: A :class:`~unittest.mock.Mock` representing the pipe into this process.
    #: This is only set if ``stdin=PIPE`` is passed to the constructor.
    #: The mock records writes and closes in :attr:`MockPopen.all_calls`.
    stdin = None

    #: A file representing standard output from this process.
    stdout = None

    #: A file representing error output from this process.
    stderr = None

    def __init__(self, mock_class, root_call,
                 args, bufsize=0, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=False, shell=False, cwd=None,
                 env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=(),
                 encoding=None, errors=None, text=None):
        """
        :param mock_class: The object creating this instance; its
            ``commands``, ``default_behaviour``, ``mock`` and ``all_calls``
            attributes are read here.
        :param root_call: The call object representing the instantiation
            of this mock process.

        The remaining parameters mirror those of :class:`subprocess.Popen`.

        :raises KeyError: If no behaviour is registered for the command and
            no default behaviour has been set.
        """
        self.mock = Mock()
        self.class_instance_mock = mock_class.mock.Popen_instance
        #: A :func:`unittest.mock.call` representing the call made to instantiate
        #: this mock process.
        self.root_call = root_call
        #: The calls made on this mock process, represented using
        #: :func:`~unittest.mock.call` instances.
        self.calls = []
        self.all_calls = mock_class.all_calls

        cmd = shell_join(args)

        behaviour = mock_class.commands.get(cmd, mock_class.default_behaviour)
        if behaviour is None:
            raise KeyError('Nothing specified for command %r' % cmd)

        # A callable behaviour is a factory: it is given the command and
        # the stdin argument and must return the behaviour to use.
        if callable(behaviour):
            behaviour = behaviour(command=cmd, stdin=stdin)

        self.behaviour = behaviour

        stdout_value = behaviour.stdout
        stderr_value = behaviour.stderr

        if stderr == STDOUT:
            # Merge stderr into stdout by alternating lines from each stream;
            # zip_longest pads the shorter stream with None, filtered below.
            line_iterator = chain.from_iterable(zip_longest(
                stdout_value.splitlines(True),
                stderr_value.splitlines(True)
            ))
            stdout_value = b''.join(line for line in line_iterator if line)
            stderr_value = None

        # subprocess.Popen opens its pipes in text mode when any of these is
        # supplied, including ``errors`` on its own.
        text_mode = PY3 and (universal_newlines or text or encoding or errors)

        self.poll_count = behaviour.poll_count
        for name, option, mock_value in (
            ('stdout', stdout, stdout_value),
            ('stderr', stderr, stderr_value)
        ):
            value = None
            # PIPE is an int constant, so compare by equality as is done
            # for stdin and STDOUT elsewhere in this method.
            if option == PIPE:
                value = TemporaryFile()
                value.write(mock_value)
                value.flush()
                value.seek(0)
                if text_mode:
                    value = TextIOWrapper(value, encoding=encoding, errors=errors)
            setattr(self, name, value)

        if stdin == PIPE:
            self.stdin = Mock()
            # Route writes and closes on the stdin mock through _record so
            # they appear alongside the other recorded calls.
            for method in 'write', 'close':
                record_writes = partial(self._record, ('stdin', method))
                getattr(self.stdin, method).side_effect = record_writes

        self.pid = behaviour.pid
        #: The return code of this mock process.
        self.returncode = None
        if PY3:
            self.args = args

    def _record(self, names, *args, **kw):
        """
        Record a call to the attribute path ``names`` with the given
        arguments on both mocks and in both call lists.
        """
        for mock in self.class_instance_mock, self.mock:
            reduce(getattr, names, mock)(*args, **kw)
        for base_call, store in (
            (call, self.calls),
            (self.root_call, self.all_calls)
        ):
            store.append(reduce(getattr, names, base_call)(*args, **kw))

    if PY3:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Waiting first means the wait() call is recorded and the return
            # code is set before the output streams are closed.
            self.wait()
            for stream in self.stdout, self.stderr:
                if stream:
                    stream.close()

        @record
        def wait(self, timeout=None):
            "Simulate calls to :meth:`subprocess.Popen.wait`"
            self.returncode = self.behaviour.returncode
            return self.returncode

        @record
        def communicate(self, input=None, timeout=None):
            "Simulate calls to :meth:`subprocess.Popen.communicate`"
            self.returncode = self.behaviour.returncode
            return (self.stdout and self.stdout.read(),
                    self.stderr and self.stderr.read())
    else:
        @record
        def wait(self):
            "Simulate calls to :meth:`subprocess.Popen.wait`"
            self.returncode = self.behaviour.returncode
            return self.returncode

        @record
        def communicate(self, input=None):
            "Simulate calls to :meth:`subprocess.Popen.communicate`"
            self.returncode = self.behaviour.returncode
            return (self.stdout and self.stdout.read(),
                    self.stderr and self.stderr.read())

    @record
    def poll(self):
        "Simulate calls to :meth:`subprocess.Popen.poll`"
        # The process is reported as still running until ``poll_count``
        # polls have been used up, unless the return code has already been
        # set by wait() or communicate().
        if self.poll_count and self.returncode is None:
            self.poll_count -= 1
            return None
        # From here on the simulated process has finished: set and return
        # the configured return code.
        self.returncode = self.behaviour.returncode
        return self.returncode

    @record
    def send_signal(self, signal):
        "Simulate calls to :meth:`subprocess.Popen.send_signal`"
        pass

    @record
    def terminate(self):
        "Simulate calls to :meth:`subprocess.Popen.terminate`"
        pass

    @record
    def kill(self):
        "Simulate calls to :meth:`subprocess.Popen.kill`"
        pass
197
198
class MockPopen(object):
    """
    A specialised mock for testing use of :class:`subprocess.Popen`.
    An instance of this class can be used in place of the
    :class:`subprocess.Popen` and is often inserted where it's needed using
    :func:`unittest.mock.patch` or a :class:`~testfixtures.Replacer`.
    """

    # Behaviour used for commands with no explicit entry in ``commands``;
    # ``None`` means no default has been configured.
    default_behaviour = None

    def __init__(self):
        # Maps shell-joined command strings to their configured behaviour.
        self.commands = {}
        self.mock = Mock()
        #: All calls made using this mock and the objects it returns, represented using
        #: :func:`~unittest.mock.call` instances.
        self.all_calls = []

    def _resolve_behaviour(self, stdout, stderr, returncode,
                           pid, poll_count, behaviour):
        # An explicitly supplied behaviour wins; otherwise build one from
        # the individual values.
        if behaviour is not None:
            return behaviour
        return PopenBehaviour(stdout, stderr, returncode, pid, poll_count)

    def set_command(self, command, stdout=b'', stderr=b'', returncode=0,
                    pid=1234, poll_count=3, behaviour=None):
        """
        Set the behaviour of this mock when it is used to simulate the
        specified command.

        :param command: A string representing the command to be simulated.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        key = shell_join(command)
        self.commands[key] = resolved

    def set_default(self, stdout=b'', stderr=b'', returncode=0,
                    pid=1234, poll_count=3, behaviour=None):
        """
        Set the behaviour of this mock when it is used to simulate commands
        that have no explicit behaviour specified using
        :meth:`~MockPopen.set_command`.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        self.default_behaviour = resolved

    def __call__(self, *args, **kw):
        # Record the instantiation on the mock and in the call list before
        # the mock process is built.
        self.mock.Popen(*args, **kw)
        popen_call = call.Popen(*args, **kw)
        self.all_calls.append(popen_call)
        return MockPopenInstance(self, popen_call, *args, **kw)
253
254
# Shared parameter documentation appended to the docstrings of
# MockPopen.set_command and MockPopen.set_default below.
set_command_params = """
:param stdout:
    A string representing the simulated content written by the process
    to the stdout pipe.
:param stderr:
    A string representing the simulated content written by the process
    to the stderr pipe.
:param returncode:
    An integer representing the return code of the simulated process.
:param pid:
    An integer representing the process identifier of the simulated
    process. This is useful if you have code that prints out the pids
    of running processes.
:param poll_count:
    Specifies the number of times :meth:`MockPopenInstance.poll` can be
    called before :attr:`MockPopenInstance.returncode` is set and returned
    by :meth:`MockPopenInstance.poll`.

If supplied, ``behaviour`` must be either a :class:`PopenBehaviour`
instance or a callable that takes the ``command`` string representing
the command to be simulated and the ``stdin`` for that command and
returns a :class:`PopenBehaviour` instance.
"""


# add the param docs, so we only have one copy of them!
extend_docstring(set_command_params,
                 [MockPopen.set_command, MockPopen.set_default])
283