1import os
2import sys
3import shutil
4import tempfile
5import contextlib
6import shlex
8from ._compat import iteritems, PY2, string_types
# If someone wants to vendor click, we want to ensure the
# correct package is discovered.  Ideally we could use a
# relative import here but unfortunately Python does not
# support that.
clickpkg = sys.modules[__name__.rsplit('.', 1)[0]]


# The byte-buffer implementation differs per interpreter: the code below
# uses ``StringIO`` only when ``PY2`` is true and ``io`` /
# ``_find_binary_reader`` only when it is false, so each set of names is
# imported in its own branch.
if PY2:
    from cStringIO import StringIO
else:
    import io
    from ._compat import _find_binary_reader
class EchoingStdin(object):
    """Proxy around an input stream that copies everything read from it
    to an output stream.

    Every chunk returned by :meth:`read`, :meth:`readline`,
    :meth:`readlines` or iteration is first written to ``output`` and then
    handed back to the caller unchanged.  Any other attribute is looked up
    on the wrapped input stream.

    :param input: the stream that data is read from.
    :param output: the stream that read data is echoed to.
    """

    def __init__(self, input, output):
        self._output = output
        self._input = input

    def __repr__(self):
        # Present ourselves exactly like the wrapped stream.
        return repr(self._input)

    def __getattr__(self, name):
        # Only reached for attributes not defined on the proxy itself;
        # delegate those to the wrapped input stream.
        return getattr(self._input, name)

    def __iter__(self):
        # Lazily echo each item as the wrapped stream produces it.
        return (self._echo(line) for line in self._input)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and return it unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        """Read from the input stream, echoing the data that was read."""
        data = self._input.read(n)
        return self._echo(data)

    def readline(self, n=-1):
        """Read one line from the input stream, echoing it."""
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self):
        """Read all remaining lines, echoing each of them in order."""
        echoed = []
        for line in self._input.readlines():
            echoed.append(self._echo(line))
        return echoed
def make_input_stream(input, charset):
    """Turn ``input`` into a byte stream suitable for use as stdin.

    :param input: an object with a ``read`` attribute, ``None``, bytes, or
                  a value with an ``encode`` method (e.g. text).
    :param charset: the encoding used when ``input`` has to be encoded.
    :return: on Python 2 a stream-like ``input`` is returned as is; on
             other versions the result of ``_find_binary_reader`` for it.
             Non-stream input is wrapped in an in-memory byte buffer.
    :raises TypeError: if no binary reader can be found for a stream-like
                       ``input``.
    """
    if not hasattr(input, 'read'):
        # Plain data: normalise to bytes, then wrap it in a buffer.
        if input is None:
            data = b''
        elif isinstance(input, bytes):
            data = input
        else:
            data = input.encode(charset)
        return StringIO(data) if PY2 else io.BytesIO(data)

    # Is already an input stream.
    if PY2:
        return input
    reader = _find_binary_reader(input)
    if reader is None:
        raise TypeError('Could not find binary reader for input stream.')
    return reader
class Result(object):
    """Holds the captured result of an invoked CLI script."""

    def __init__(self, runner, stdout_bytes, stderr_bytes, exit_code,
                 exception, exc_info=None):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or a false non-bytes value
        #: (e.g. ``False`` / ``None``) if it was not captured separately.
        self.stderr_bytes = stderr_bytes
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    def _decode(self, data):
        """Decode captured bytes with the runner's charset (undecodable
        bytes are replaced) and normalise ``\\r\\n`` line endings to
        ``\\n``.
        """
        text = data.decode(self.runner.charset, 'replace')
        return text.replace('\r\n', '\n')

    @property
    def output(self):
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output as unicode string."""
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        captured = self.stderr_bytes
        # An empty byte string means "captured, but nothing was written";
        # only a false value that is not bytes means "not captured".
        if not captured and not isinstance(captured, (bytes, bytearray)):
            raise ValueError("stderr not separately captured")
        return self._decode(captured)

    def __repr__(self):
        return '<%s %s>' % (
            type(self).__name__,
            repr(self.exception) if self.exception else 'okay',
        )
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False,
                 mix_stderr=True):
        if charset is None:
            charset = 'utf-8'
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or 'root'

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The result is a new dictionary: a copy of the runner's ``env``
        updated with ``overrides`` (if any).
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @staticmethod
    def _set_environ(key, value):
        """Set ``os.environ[key]`` to ``value``, or remove the key when
        ``value`` is ``None``.
        """
        if value is None:
            try:
                del os.environ[key]
            except Exception:
                # Deliberately best effort: the variable may simply not
                # be set, and a failed removal must not abort the caller.
                pass
        else:
            os.environ[key] = value

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        The context manager yields a tuple ``(bytes_output, bytes_error)``
        of the buffers capturing stdout and stderr; the second item is
        ``False`` when stderr is mixed into stdout.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        """
        # Prepare everything that does not touch global state first, so a
        # failure here leaves the interpreter exactly as it was.
        input = make_input_stream(input, self.charset)
        env = self.make_env(env)

        # Snapshot all global state that is replaced below.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = clickpkg.formatting.FORCED_WIDTH
        old_visible_prompt_func = clickpkg.termui.visible_prompt_func
        old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func
        old__getchar_func = clickpkg.termui._getchar
        old_should_strip_ansi = clickpkg.utils.should_strip_ansi
        old_env = {}
        bytes_error = None

        # Every mutation happens inside the ``try`` so that the ``finally``
        # block restores the snapshot even if the setup fails halfway.
        try:
            clickpkg.formatting.FORCED_WIDTH = 80

            if PY2:
                bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                sys.stdout = bytes_output
                if not self.mix_stderr:
                    bytes_error = StringIO()
                    sys.stderr = bytes_error
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = io.TextIOWrapper(
                    bytes_output, encoding=self.charset)
                if not self.mix_stderr:
                    bytes_error = io.BytesIO()
                    sys.stderr = io.TextIOWrapper(
                        bytes_error, encoding=self.charset)

            if self.mix_stderr:
                sys.stderr = sys.stdout

            sys.stdin = input

            def visible_input(prompt=None):
                # Echo the prompt and the value read, like a terminal would.
                sys.stdout.write(prompt or '')
                val = input.readline().rstrip('\r\n')
                sys.stdout.write(val + '\n')
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts show the prompt only, never the value.
                sys.stdout.write((prompt or '') + '\n')
                sys.stdout.flush()
                return input.readline().rstrip('\r\n')

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(stream=None, color=None):
                # An explicit ``color`` wins; otherwise use the default
                # passed to ``isolation``.
                if color is None:
                    return not default_color
                return not color

            clickpkg.termui.visible_prompt_func = visible_input
            clickpkg.termui.hidden_prompt_func = hidden_input
            clickpkg.termui._getchar = _getchar
            clickpkg.utils.should_strip_ansi = should_strip_ansi

            for key, value in iteritems(env):
                # Remember the previous value (``None`` if unset) so it
                # can be restored afterwards.
                old_env[key] = os.environ.get(key)
                self._set_environ(key, value)

            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            # Restore the streams and Click hooks first so they are put
            # back even if restoring the environment fails.
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            clickpkg.termui.visible_prompt_func = old_visible_prompt_func
            clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func
            clickpkg.termui._getchar = old__getchar_func
            clickpkg.utils.should_strip_ansi = old_should_strip_ansi
            clickpkg.formatting.FORCED_WIDTH = old_forced_width
            for key, value in iteritems(old_env):
                self._set_environ(key, value)

    def invoke(self, cli, args=None, input=None, env=None,
               catch_exceptions=True, color=False, mix_stderr=False, **extra):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.  A
                      ``prog_name`` entry overrides the default program
                      name.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        :param mix_stderr: accepted but not used by this method; whether
                           stderr is mixed into stdout is decided by the
                           runner's ``mix_stderr`` attribute.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                if not isinstance(exit_code, int):
                    # A non-integer exit "code" is a message: print it
                    # and report failure.
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write('\n')
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Text wrappers buffer writes; flush them so the byte
                # buffers read below contain everything that was written.
                sys.stdout.flush()
                if sys.stderr is not sys.stdout:
                    sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                stderr = outstreams[1] and outstreams[1].getvalue()

        return Result(runner=self,
                      stdout_bytes=stdout,
                      stderr_bytes=stderr,
                      exit_code=exit_code,
                      exception=exception,
                      exc_info=exc_info)

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        The path of the temporary folder is yielded.  On exit the previous
        working directory is restored and the folder is removed (removal
        errors are ignored).
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        try:
            # Inside the ``try`` so the folder is cleaned up even if
            # changing into it fails.
            os.chdir(t)
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):
                pass