1import os
2import sys
3import shutil
4import tempfile
5import contextlib
6import shlex
7
8from ._compat import iteritems, PY2, string_types
9
10
# If someone wants to vendor click, we want to ensure the correct
# package is discovered.  Instead of importing the package by a fixed
# name, the parent package of this module is looked up in
# ``sys.modules`` using this module's own dotted name.
15clickpkg = sys.modules[__name__.rsplit('.', 1)[0]]
16
17
18if PY2:
19    from cStringIO import StringIO
20else:
21    import io
22    from ._compat import _find_binary_reader
23
24
class EchoingStdin(object):
    """Proxy around an input stream that copies everything read from it
    to an output stream.

    Data returned by :meth:`read`, :meth:`readline`, :meth:`readlines` and
    by iteration is written to ``output`` before it is handed back to the
    caller.  Any attribute not defined here is looked up on the wrapped
    input stream.
    """

    def __init__(self, input, output):
        self._input = input
        self._output = output

    def __getattr__(self, x):
        # Only invoked for attributes the proxy itself does not define.
        return getattr(self._input, x)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and return it unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        data = self._input.read(n)
        return self._echo(data)

    def readline(self, n=-1):
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self):
        echoed = []
        for line in self._input.readlines():
            echoed.append(self._echo(line))
        return echoed

    def __iter__(self):
        # Lazy: each line is echoed only when the consumer pulls it.
        return (self._echo(line) for line in self._input)

    def __repr__(self):
        return repr(self._input)
52
53
def make_input_stream(input, charset):
    """Turn ``input`` into a byte stream.

    :param input: an object with a ``read`` attribute, ``None``, bytes, or
                  an object with an ``encode`` method (such as a text
                  string).
    :param charset: the encoding used when ``input`` has to be encoded
                    to bytes.
    :return: on Python 2 a stream-like ``input`` is returned unchanged;
             otherwise the result of ``_find_binary_reader`` for it.
             Non-stream input is wrapped in an in-memory stream
             (``None`` becomes an empty one).
    :raises TypeError: if no binary reader can be found for a stream-like
                       ``input`` on Python 3.
    """
    if not hasattr(input, 'read'):
        # Plain data: normalize to bytes, then wrap in an in-memory stream.
        if input is None:
            data = b''
        elif isinstance(input, bytes):
            data = input
        else:
            data = input.encode(charset)
        return StringIO(data) if PY2 else io.BytesIO(data)

    # Is already an input stream.
    if PY2:
        return input
    binary_reader = _find_binary_reader(input)
    if binary_reader is None:
        raise TypeError('Could not find binary reader for input stream.')
    return binary_reader
71
72
class Result(object):
    """Holds the captured result of an invoked CLI script."""

    def __init__(self, runner, stdout_bytes, stderr_bytes, exit_code,
                 exception, exc_info=None):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or ``None``/``False`` if it was
        #: not captured separately.
        self.stderr_bytes = stderr_bytes
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    def _decode(self, data):
        """Decode captured bytes with the runner's charset (undecodable
        bytes are replaced) and normalize ``\\r\\n`` to ``\\n``.
        """
        text = data.decode(self.runner.charset, 'replace')
        return text.replace('\r\n', '\n')

    @property
    def output(self):
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output as unicode string."""
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        # Only the explicit "not captured" markers are an error.  An
        # empty byte string means stderr was captured and nothing was
        # written to it, which must decode to an empty string.
        if self.stderr_bytes is None or self.stderr_bytes is False:
            raise ValueError("stderr not separately captured")
        return self._decode(self.stderr_bytes)

    def __repr__(self):
        if self.exception is None:
            status = 'okay'
        else:
            status = repr(self.exception)
        return '<%s %s>' % (type(self).__name__, status)
116
117
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes
    the global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False,
                 mix_stderr=True):
        if charset is None:
            charset = 'utf-8'
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or 'root'

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The result is a new dictionary built from the runner's ``env``
        updated with ``overrides``; the runner's own ``env`` is not
        modified.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        The context manager yields a tuple ``(bytes_output, bytes_error)``
        of the in-memory byte streams receiving stdout and stderr.  When
        the runner mixes stderr into stdout the second item is ``False``.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        """
        input = make_input_stream(input, self.charset)
        env = self.make_env(env)

        # Snapshot every piece of global state that gets patched below.
        # Nothing is modified before the ``try`` block, so a failure
        # anywhere during setup is still rolled back by ``finally``.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = clickpkg.formatting.FORCED_WIDTH
        old_visible_prompt_func = clickpkg.termui.visible_prompt_func
        old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func
        old__getchar_func = clickpkg.termui._getchar
        old_should_strip_ansi = clickpkg.utils.should_strip_ansi
        old_env = {}
        bytes_error = None

        try:
            clickpkg.formatting.FORCED_WIDTH = 80

            if PY2:
                bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                sys.stdout = bytes_output
                if not self.mix_stderr:
                    bytes_error = StringIO()
                    sys.stderr = bytes_error
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = io.TextIOWrapper(
                    bytes_output, encoding=self.charset)
                if not self.mix_stderr:
                    bytes_error = io.BytesIO()
                    sys.stderr = io.TextIOWrapper(
                        bytes_error, encoding=self.charset)

            if self.mix_stderr:
                sys.stderr = sys.stdout

            sys.stdin = input

            def visible_input(prompt=None):
                # Echo the value that was read so the captured output
                # contains what was typed at the prompt.
                sys.stdout.write(prompt or '')
                val = input.readline().rstrip('\r\n')
                sys.stdout.write(val + '\n')
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts never echo the entered value.
                sys.stdout.write((prompt or '') + '\n')
                sys.stdout.flush()
                return input.readline().rstrip('\r\n')

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(stream=None, color=None):
                # An explicit ``color`` wins over the isolation default.
                if color is None:
                    return not default_color
                return not color

            clickpkg.termui.visible_prompt_func = visible_input
            clickpkg.termui.hidden_prompt_func = hidden_input
            clickpkg.termui._getchar = _getchar
            clickpkg.utils.should_strip_ansi = should_strip_ansi

            for key, value in iteritems(env):
                old_env[key] = os.environ.get(key)
                if value is None:
                    # ``None`` means "unset for the duration of the
                    # isolation"; removal is deliberately best-effort.
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            for key, value in iteritems(old_env):
                if value is None:
                    # The variable did not exist before; best-effort removal.
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            clickpkg.termui.visible_prompt_func = old_visible_prompt_func
            clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func
            clickpkg.termui._getchar = old__getchar_func
            clickpkg.utils.should_strip_ansi = old_should_strip_ansi
            clickpkg.formatting.FORCED_WIDTH = old_forced_width

    def invoke(self, cli, args=None, input=None, env=None,
               catch_exceptions=True, color=False, mix_stderr=False, **extra):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        :param mix_stderr: accepted for backwards compatibility only; this
                           method does not read it.  Whether stderr is
                           mixed into stdout is controlled by the
                           ``mix_stderr`` setting of the runner.
        :param extra: the keyword arguments to pass to :meth:`main`.  A
                      ``prog_name`` entry overrides the default program
                      name.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                if not isinstance(exit_code, int):
                    # A non-integer exit code is written to the output
                    # and the exit code reported as 1.
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write('\n')
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush both text streams before reading the underlying
                # byte buffers; otherwise text still buffered in the
                # stderr wrapper would be missing from the result.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                stderr = outstreams[1] and outstreams[1].getvalue()

        return Result(runner=self,
                      stdout_bytes=stdout,
                      stderr_bytes=stderr,
                      exit_code=exit_code,
                      exception=exception,
                      exc_info=exc_info)

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        The path of the temporary folder is yielded.  On exit the previous
        working directory is restored and the folder is removed; errors
        while removing it are ignored.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        try:
            # Inside the ``try`` so the folder is cleaned up even if
            # changing into it fails.
            os.chdir(t)
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):
                pass
375