1import contextlib 2import os 3import shlex 4import shutil 5import sys 6import tempfile 7 8from . import formatting 9from . import termui 10from . import utils 11from ._compat import iteritems 12from ._compat import PY2 13from ._compat import string_types 14 15 16if PY2: 17 from cStringIO import StringIO 18else: 19 import io 20 from ._compat import _find_binary_reader 21 22 23class EchoingStdin(object): 24 def __init__(self, input, output): 25 self._input = input 26 self._output = output 27 28 def __getattr__(self, x): 29 return getattr(self._input, x) 30 31 def _echo(self, rv): 32 self._output.write(rv) 33 return rv 34 35 def read(self, n=-1): 36 return self._echo(self._input.read(n)) 37 38 def readline(self, n=-1): 39 return self._echo(self._input.readline(n)) 40 41 def readlines(self): 42 return [self._echo(x) for x in self._input.readlines()] 43 44 def __iter__(self): 45 return iter(self._echo(x) for x in self._input) 46 47 def __repr__(self): 48 return repr(self._input) 49 50 51def make_input_stream(input, charset): 52 # Is already an input stream. 53 if hasattr(input, "read"): 54 if PY2: 55 return input 56 rv = _find_binary_reader(input) 57 if rv is not None: 58 return rv 59 raise TypeError("Could not find binary reader for input stream.") 60 61 if input is None: 62 input = b"" 63 elif not isinstance(input, bytes): 64 input = input.encode(charset) 65 if PY2: 66 return StringIO(input) 67 return io.BytesIO(input) 68 69 70class Result(object): 71 """Holds the captured result of an invoked CLI script.""" 72 73 def __init__( 74 self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None 75 ): 76 #: The runner that created the result 77 self.runner = runner 78 #: The standard output as bytes. 79 self.stdout_bytes = stdout_bytes 80 #: The standard error as bytes, or None if not available 81 self.stderr_bytes = stderr_bytes 82 #: The exit code as integer. 83 self.exit_code = exit_code 84 #: The exception that happened if one did. 85 self.exception = exception 86 #: The traceback 87 self.exc_info = exc_info 88 89 @property 90 def output(self): 91 """The (standard) output as unicode string.""" 92 return self.stdout 93 94 @property 95 def stdout(self): 96 """The standard output as unicode string.""" 97 return self.stdout_bytes.decode(self.runner.charset, "replace").replace( 98 "\r\n", "\n" 99 ) 100 101 @property 102 def stderr(self): 103 """The standard error as unicode string.""" 104 if self.stderr_bytes is None: 105 raise ValueError("stderr not separately captured") 106 return self.stderr_bytes.decode(self.runner.charset, "replace").replace( 107 "\r\n", "\n" 108 ) 109 110 def __repr__(self): 111 return "<{} {}>".format( 112 type(self).__name__, repr(self.exception) if self.exception else "okay" 113 ) 114 115 116class CliRunner(object): 117 """The CLI runner provides functionality to invoke a Click command line 118 script for unittesting purposes in a isolated environment. This only 119 works in single-threaded systems without any concurrency as it changes the 120 global interpreter state. 121 122 :param charset: the character set for the input and output data. This is 123 UTF-8 by default and should not be changed currently as 124 the reporting to Click only works in Python 2 properly. 125 :param env: a dictionary with environment variables for overriding. 126 :param echo_stdin: if this is set to `True`, then reading from stdin writes 127 to stdout. This is useful for showing examples in 128 some circumstances. Note that regular prompts 129 will automatically echo the input. 130 :param mix_stderr: if this is set to `False`, then stdout and stderr are 131 preserved as independent streams. This is useful for 132 Unix-philosophy apps that have predictable stdout and 133 noisy stderr, such that each may be measured 134 independently 135 """ 136 137 def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True): 138 if charset is None: 139 charset = "utf-8" 140 self.charset = charset 141 self.env = env or {} 142 self.echo_stdin = echo_stdin 143 self.mix_stderr = mix_stderr 144 145 def get_default_prog_name(self, cli): 146 """Given a command object it will return the default program name 147 for it. The default is the `name` attribute or ``"root"`` if not 148 set. 149 """ 150 return cli.name or "root" 151 152 def make_env(self, overrides=None): 153 """Returns the environment overrides for invoking a script.""" 154 rv = dict(self.env) 155 if overrides: 156 rv.update(overrides) 157 return rv 158 159 @contextlib.contextmanager 160 def isolation(self, input=None, env=None, color=False): 161 """A context manager that sets up the isolation for invoking of a 162 command line tool. This sets up stdin with the given input data 163 and `os.environ` with the overrides from the given dictionary. 164 This also rebinds some internals in Click to be mocked (like the 165 prompt functionality). 166 167 This is automatically done in the :meth:`invoke` method. 168 169 .. versionadded:: 4.0 170 The ``color`` parameter was added. 171 172 :param input: the input stream to put into sys.stdin. 173 :param env: the environment overrides as dictionary. 174 :param color: whether the output should contain color codes. The 175 application can still override this explicitly. 176 """ 177 input = make_input_stream(input, self.charset) 178 179 old_stdin = sys.stdin 180 old_stdout = sys.stdout 181 old_stderr = sys.stderr 182 old_forced_width = formatting.FORCED_WIDTH 183 formatting.FORCED_WIDTH = 80 184 185 env = self.make_env(env) 186 187 if PY2: 188 bytes_output = StringIO() 189 if self.echo_stdin: 190 input = EchoingStdin(input, bytes_output) 191 sys.stdout = bytes_output 192 if not self.mix_stderr: 193 bytes_error = StringIO() 194 sys.stderr = bytes_error 195 else: 196 bytes_output = io.BytesIO() 197 if self.echo_stdin: 198 input = EchoingStdin(input, bytes_output) 199 input = io.TextIOWrapper(input, encoding=self.charset) 200 sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset) 201 if not self.mix_stderr: 202 bytes_error = io.BytesIO() 203 sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset) 204 205 if self.mix_stderr: 206 sys.stderr = sys.stdout 207 208 sys.stdin = input 209 210 def visible_input(prompt=None): 211 sys.stdout.write(prompt or "") 212 val = input.readline().rstrip("\r\n") 213 sys.stdout.write("{}\n".format(val)) 214 sys.stdout.flush() 215 return val 216 217 def hidden_input(prompt=None): 218 sys.stdout.write("{}\n".format(prompt or "")) 219 sys.stdout.flush() 220 return input.readline().rstrip("\r\n") 221 222 def _getchar(echo): 223 char = sys.stdin.read(1) 224 if echo: 225 sys.stdout.write(char) 226 sys.stdout.flush() 227 return char 228 229 default_color = color 230 231 def should_strip_ansi(stream=None, color=None): 232 if color is None: 233 return not default_color 234 return not color 235 236 old_visible_prompt_func = termui.visible_prompt_func 237 old_hidden_prompt_func = termui.hidden_prompt_func 238 old__getchar_func = termui._getchar 239 old_should_strip_ansi = utils.should_strip_ansi 240 termui.visible_prompt_func = visible_input 241 termui.hidden_prompt_func = hidden_input 242 termui._getchar = _getchar 243 utils.should_strip_ansi = should_strip_ansi 244 245 old_env = {} 246 try: 247 for key, value in iteritems(env): 248 old_env[key] = os.environ.get(key) 249 if value is None: 250 try: 251 del os.environ[key] 252 except Exception: 253 pass 254 else: 255 os.environ[key] = value 256 yield (bytes_output, not self.mix_stderr and bytes_error) 257 finally: 258 for key, value in iteritems(old_env): 259 if value is None: 260 try: 261 del os.environ[key] 262 except Exception: 263 pass 264 else: 265 os.environ[key] = value 266 sys.stdout = old_stdout 267 sys.stderr = old_stderr 268 sys.stdin = old_stdin 269 termui.visible_prompt_func = old_visible_prompt_func 270 termui.hidden_prompt_func = old_hidden_prompt_func 271 termui._getchar = old__getchar_func 272 utils.should_strip_ansi = old_should_strip_ansi 273 formatting.FORCED_WIDTH = old_forced_width 274 275 def invoke( 276 self, 277 cli, 278 args=None, 279 input=None, 280 env=None, 281 catch_exceptions=True, 282 color=False, 283 **extra 284 ): 285 """Invokes a command in an isolated environment. The arguments are 286 forwarded directly to the command line script, the `extra` keyword 287 arguments are passed to the :meth:`~clickpkg.Command.main` function of 288 the command. 289 290 This returns a :class:`Result` object. 291 292 .. versionadded:: 3.0 293 The ``catch_exceptions`` parameter was added. 294 295 .. versionchanged:: 3.0 296 The result object now has an `exc_info` attribute with the 297 traceback if available. 298 299 .. versionadded:: 4.0 300 The ``color`` parameter was added. 301 302 :param cli: the command to invoke 303 :param args: the arguments to invoke. It may be given as an iterable 304 or a string. When given as string it will be interpreted 305 as a Unix shell command. More details at 306 :func:`shlex.split`. 307 :param input: the input data for `sys.stdin`. 308 :param env: the environment overrides. 309 :param catch_exceptions: Whether to catch any other exceptions than 310 ``SystemExit``. 311 :param extra: the keyword arguments to pass to :meth:`main`. 312 :param color: whether the output should contain color codes. The 313 application can still override this explicitly. 314 """ 315 exc_info = None 316 with self.isolation(input=input, env=env, color=color) as outstreams: 317 exception = None 318 exit_code = 0 319 320 if isinstance(args, string_types): 321 args = shlex.split(args) 322 323 try: 324 prog_name = extra.pop("prog_name") 325 except KeyError: 326 prog_name = self.get_default_prog_name(cli) 327 328 try: 329 cli.main(args=args or (), prog_name=prog_name, **extra) 330 except SystemExit as e: 331 exc_info = sys.exc_info() 332 exit_code = e.code 333 if exit_code is None: 334 exit_code = 0 335 336 if exit_code != 0: 337 exception = e 338 339 if not isinstance(exit_code, int): 340 sys.stdout.write(str(exit_code)) 341 sys.stdout.write("\n") 342 exit_code = 1 343 344 except Exception as e: 345 if not catch_exceptions: 346 raise 347 exception = e 348 exit_code = 1 349 exc_info = sys.exc_info() 350 finally: 351 sys.stdout.flush() 352 stdout = outstreams[0].getvalue() 353 if self.mix_stderr: 354 stderr = None 355 else: 356 stderr = outstreams[1].getvalue() 357 358 return Result( 359 runner=self, 360 stdout_bytes=stdout, 361 stderr_bytes=stderr, 362 exit_code=exit_code, 363 exception=exception, 364 exc_info=exc_info, 365 ) 366 367 @contextlib.contextmanager 368 def isolated_filesystem(self): 369 """A context manager that creates a temporary folder and changes 370 the current working directory to it for isolated filesystem tests. 371 """ 372 cwd = os.getcwd() 373 t = tempfile.mkdtemp() 374 os.chdir(t) 375 try: 376 yield t 377 finally: 378 os.chdir(cwd) 379 try: 380 shutil.rmtree(t) 381 except (OSError, IOError): # noqa: B014 382 pass 383