1import contextlib
2import os
3import shlex
4import shutil
5import sys
6import tempfile
7
8from . import formatting
9from . import termui
10from . import utils
11from ._compat import iteritems
12from ._compat import PY2
13from ._compat import string_types
14
15
16if PY2:
17    from cStringIO import StringIO
18else:
19    import io
20    from ._compat import _find_binary_reader
21
22
23class EchoingStdin(object):
24    def __init__(self, input, output):
25        self._input = input
26        self._output = output
27
28    def __getattr__(self, x):
29        return getattr(self._input, x)
30
31    def _echo(self, rv):
32        self._output.write(rv)
33        return rv
34
35    def read(self, n=-1):
36        return self._echo(self._input.read(n))
37
38    def readline(self, n=-1):
39        return self._echo(self._input.readline(n))
40
41    def readlines(self):
42        return [self._echo(x) for x in self._input.readlines()]
43
44    def __iter__(self):
45        return iter(self._echo(x) for x in self._input)
46
47    def __repr__(self):
48        return repr(self._input)
49
50
51def make_input_stream(input, charset):
52    # Is already an input stream.
53    if hasattr(input, "read"):
54        if PY2:
55            return input
56        rv = _find_binary_reader(input)
57        if rv is not None:
58            return rv
59        raise TypeError("Could not find binary reader for input stream.")
60
61    if input is None:
62        input = b""
63    elif not isinstance(input, bytes):
64        input = input.encode(charset)
65    if PY2:
66        return StringIO(input)
67    return io.BytesIO(input)
68
69
70class Result(object):
71    """Holds the captured result of an invoked CLI script."""
72
73    def __init__(
74        self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
75    ):
76        #: The runner that created the result
77        self.runner = runner
78        #: The standard output as bytes.
79        self.stdout_bytes = stdout_bytes
80        #: The standard error as bytes, or None if not available
81        self.stderr_bytes = stderr_bytes
82        #: The exit code as integer.
83        self.exit_code = exit_code
84        #: The exception that happened if one did.
85        self.exception = exception
86        #: The traceback
87        self.exc_info = exc_info
88
89    @property
90    def output(self):
91        """The (standard) output as unicode string."""
92        return self.stdout
93
94    @property
95    def stdout(self):
96        """The standard output as unicode string."""
97        return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
98            "\r\n", "\n"
99        )
100
101    @property
102    def stderr(self):
103        """The standard error as unicode string."""
104        if self.stderr_bytes is None:
105            raise ValueError("stderr not separately captured")
106        return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
107            "\r\n", "\n"
108        )
109
110    def __repr__(self):
111        return "<{} {}>".format(
112            type(self).__name__, repr(self.exception) if self.exception else "okay"
113        )
114
115
116class CliRunner(object):
117    """The CLI runner provides functionality to invoke a Click command line
118    script for unittesting purposes in a isolated environment.  This only
119    works in single-threaded systems without any concurrency as it changes the
120    global interpreter state.
121
122    :param charset: the character set for the input and output data.  This is
123                    UTF-8 by default and should not be changed currently as
124                    the reporting to Click only works in Python 2 properly.
125    :param env: a dictionary with environment variables for overriding.
126    :param echo_stdin: if this is set to `True`, then reading from stdin writes
127                       to stdout.  This is useful for showing examples in
128                       some circumstances.  Note that regular prompts
129                       will automatically echo the input.
130    :param mix_stderr: if this is set to `False`, then stdout and stderr are
131                       preserved as independent streams.  This is useful for
132                       Unix-philosophy apps that have predictable stdout and
133                       noisy stderr, such that each may be measured
134                       independently
135    """
136
137    def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
138        if charset is None:
139            charset = "utf-8"
140        self.charset = charset
141        self.env = env or {}
142        self.echo_stdin = echo_stdin
143        self.mix_stderr = mix_stderr
144
145    def get_default_prog_name(self, cli):
146        """Given a command object it will return the default program name
147        for it.  The default is the `name` attribute or ``"root"`` if not
148        set.
149        """
150        return cli.name or "root"
151
152    def make_env(self, overrides=None):
153        """Returns the environment overrides for invoking a script."""
154        rv = dict(self.env)
155        if overrides:
156            rv.update(overrides)
157        return rv
158
159    @contextlib.contextmanager
160    def isolation(self, input=None, env=None, color=False):
161        """A context manager that sets up the isolation for invoking of a
162        command line tool.  This sets up stdin with the given input data
163        and `os.environ` with the overrides from the given dictionary.
164        This also rebinds some internals in Click to be mocked (like the
165        prompt functionality).
166
167        This is automatically done in the :meth:`invoke` method.
168
169        .. versionadded:: 4.0
170           The ``color`` parameter was added.
171
172        :param input: the input stream to put into sys.stdin.
173        :param env: the environment overrides as dictionary.
174        :param color: whether the output should contain color codes. The
175                      application can still override this explicitly.
176        """
177        input = make_input_stream(input, self.charset)
178
179        old_stdin = sys.stdin
180        old_stdout = sys.stdout
181        old_stderr = sys.stderr
182        old_forced_width = formatting.FORCED_WIDTH
183        formatting.FORCED_WIDTH = 80
184
185        env = self.make_env(env)
186
187        if PY2:
188            bytes_output = StringIO()
189            if self.echo_stdin:
190                input = EchoingStdin(input, bytes_output)
191            sys.stdout = bytes_output
192            if not self.mix_stderr:
193                bytes_error = StringIO()
194                sys.stderr = bytes_error
195        else:
196            bytes_output = io.BytesIO()
197            if self.echo_stdin:
198                input = EchoingStdin(input, bytes_output)
199            input = io.TextIOWrapper(input, encoding=self.charset)
200            sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
201            if not self.mix_stderr:
202                bytes_error = io.BytesIO()
203                sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)
204
205        if self.mix_stderr:
206            sys.stderr = sys.stdout
207
208        sys.stdin = input
209
210        def visible_input(prompt=None):
211            sys.stdout.write(prompt or "")
212            val = input.readline().rstrip("\r\n")
213            sys.stdout.write("{}\n".format(val))
214            sys.stdout.flush()
215            return val
216
217        def hidden_input(prompt=None):
218            sys.stdout.write("{}\n".format(prompt or ""))
219            sys.stdout.flush()
220            return input.readline().rstrip("\r\n")
221
222        def _getchar(echo):
223            char = sys.stdin.read(1)
224            if echo:
225                sys.stdout.write(char)
226                sys.stdout.flush()
227            return char
228
229        default_color = color
230
231        def should_strip_ansi(stream=None, color=None):
232            if color is None:
233                return not default_color
234            return not color
235
236        old_visible_prompt_func = termui.visible_prompt_func
237        old_hidden_prompt_func = termui.hidden_prompt_func
238        old__getchar_func = termui._getchar
239        old_should_strip_ansi = utils.should_strip_ansi
240        termui.visible_prompt_func = visible_input
241        termui.hidden_prompt_func = hidden_input
242        termui._getchar = _getchar
243        utils.should_strip_ansi = should_strip_ansi
244
245        old_env = {}
246        try:
247            for key, value in iteritems(env):
248                old_env[key] = os.environ.get(key)
249                if value is None:
250                    try:
251                        del os.environ[key]
252                    except Exception:
253                        pass
254                else:
255                    os.environ[key] = value
256            yield (bytes_output, not self.mix_stderr and bytes_error)
257        finally:
258            for key, value in iteritems(old_env):
259                if value is None:
260                    try:
261                        del os.environ[key]
262                    except Exception:
263                        pass
264                else:
265                    os.environ[key] = value
266            sys.stdout = old_stdout
267            sys.stderr = old_stderr
268            sys.stdin = old_stdin
269            termui.visible_prompt_func = old_visible_prompt_func
270            termui.hidden_prompt_func = old_hidden_prompt_func
271            termui._getchar = old__getchar_func
272            utils.should_strip_ansi = old_should_strip_ansi
273            formatting.FORCED_WIDTH = old_forced_width
274
275    def invoke(
276        self,
277        cli,
278        args=None,
279        input=None,
280        env=None,
281        catch_exceptions=True,
282        color=False,
283        **extra
284    ):
285        """Invokes a command in an isolated environment.  The arguments are
286        forwarded directly to the command line script, the `extra` keyword
287        arguments are passed to the :meth:`~clickpkg.Command.main` function of
288        the command.
289
290        This returns a :class:`Result` object.
291
292        .. versionadded:: 3.0
293           The ``catch_exceptions`` parameter was added.
294
295        .. versionchanged:: 3.0
296           The result object now has an `exc_info` attribute with the
297           traceback if available.
298
299        .. versionadded:: 4.0
300           The ``color`` parameter was added.
301
302        :param cli: the command to invoke
303        :param args: the arguments to invoke. It may be given as an iterable
304                     or a string. When given as string it will be interpreted
305                     as a Unix shell command. More details at
306                     :func:`shlex.split`.
307        :param input: the input data for `sys.stdin`.
308        :param env: the environment overrides.
309        :param catch_exceptions: Whether to catch any other exceptions than
310                                 ``SystemExit``.
311        :param extra: the keyword arguments to pass to :meth:`main`.
312        :param color: whether the output should contain color codes. The
313                      application can still override this explicitly.
314        """
315        exc_info = None
316        with self.isolation(input=input, env=env, color=color) as outstreams:
317            exception = None
318            exit_code = 0
319
320            if isinstance(args, string_types):
321                args = shlex.split(args)
322
323            try:
324                prog_name = extra.pop("prog_name")
325            except KeyError:
326                prog_name = self.get_default_prog_name(cli)
327
328            try:
329                cli.main(args=args or (), prog_name=prog_name, **extra)
330            except SystemExit as e:
331                exc_info = sys.exc_info()
332                exit_code = e.code
333                if exit_code is None:
334                    exit_code = 0
335
336                if exit_code != 0:
337                    exception = e
338
339                if not isinstance(exit_code, int):
340                    sys.stdout.write(str(exit_code))
341                    sys.stdout.write("\n")
342                    exit_code = 1
343
344            except Exception as e:
345                if not catch_exceptions:
346                    raise
347                exception = e
348                exit_code = 1
349                exc_info = sys.exc_info()
350            finally:
351                sys.stdout.flush()
352                stdout = outstreams[0].getvalue()
353                if self.mix_stderr:
354                    stderr = None
355                else:
356                    stderr = outstreams[1].getvalue()
357
358        return Result(
359            runner=self,
360            stdout_bytes=stdout,
361            stderr_bytes=stderr,
362            exit_code=exit_code,
363            exception=exception,
364            exc_info=exc_info,
365        )
366
367    @contextlib.contextmanager
368    def isolated_filesystem(self):
369        """A context manager that creates a temporary folder and changes
370        the current working directory to it for isolated filesystem tests.
371        """
372        cwd = os.getcwd()
373        t = tempfile.mkdtemp()
374        os.chdir(t)
375        try:
376            yield t
377        finally:
378            os.chdir(cwd)
379            try:
380                shutil.rmtree(t)
381            except (OSError, IOError):  # noqa: B014
382                pass
383