1import os 2import sys 3import shutil 4import tempfile 5import contextlib 6import shlex 7 8from ._compat import iteritems, PY2, string_types 9 10 11# If someone wants to vendor click, we want to ensure the 12# correct package is discovered. Ideally we could use a 13# relative import here but unfortunately Python does not 14# support that. 15clickpkg = sys.modules[__name__.rsplit('.', 1)[0]] 16 17 18if PY2: 19 from cStringIO import StringIO 20else: 21 import io 22 from ._compat import _find_binary_reader 23 24 25class EchoingStdin(object): 26 27 def __init__(self, input, output): 28 self._input = input 29 self._output = output 30 31 def __getattr__(self, x): 32 return getattr(self._input, x) 33 34 def _echo(self, rv): 35 self._output.write(rv) 36 return rv 37 38 def read(self, n=-1): 39 return self._echo(self._input.read(n)) 40 41 def readline(self, n=-1): 42 return self._echo(self._input.readline(n)) 43 44 def readlines(self): 45 return [self._echo(x) for x in self._input.readlines()] 46 47 def __iter__(self): 48 return iter(self._echo(x) for x in self._input) 49 50 def __repr__(self): 51 return repr(self._input) 52 53 54def make_input_stream(input, charset): 55 # Is already an input stream. 56 if hasattr(input, 'read'): 57 if PY2: 58 return input 59 rv = _find_binary_reader(input) 60 if rv is not None: 61 return rv 62 raise TypeError('Could not find binary reader for input stream.') 63 64 if input is None: 65 input = b'' 66 elif not isinstance(input, bytes): 67 input = input.encode(charset) 68 if PY2: 69 return StringIO(input) 70 return io.BytesIO(input) 71 72 73class Result(object): 74 """Holds the captured result of an invoked CLI script.""" 75 76 def __init__(self, runner, stdout_bytes, stderr_bytes, exit_code, 77 exception, exc_info=None): 78 #: The runner that created the result 79 self.runner = runner 80 #: The standard output as bytes. 81 self.stdout_bytes = stdout_bytes 82 #: The standard error as bytes, or False(y) if not available 83 self.stderr_bytes = stderr_bytes 84 #: The exit code as integer. 85 self.exit_code = exit_code 86 #: The exception that happened if one did. 87 self.exception = exception 88 #: The traceback 89 self.exc_info = exc_info 90 91 @property 92 def output(self): 93 """The (standard) output as unicode string.""" 94 return self.stdout 95 96 @property 97 def stdout(self): 98 """The standard output as unicode string.""" 99 return self.stdout_bytes.decode(self.runner.charset, 'replace') \ 100 .replace('\r\n', '\n') 101 102 @property 103 def stderr(self): 104 """The standard error as unicode string.""" 105 if not self.stderr_bytes: 106 raise ValueError("stderr not separately captured") 107 return self.stderr_bytes.decode(self.runner.charset, 'replace') \ 108 .replace('\r\n', '\n') 109 110 111 def __repr__(self): 112 return '<%s %s>' % ( 113 type(self).__name__, 114 self.exception and repr(self.exception) or 'okay', 115 ) 116 117 118class CliRunner(object): 119 """The CLI runner provides functionality to invoke a Click command line 120 script for unittesting purposes in a isolated environment. This only 121 works in single-threaded systems without any concurrency as it changes the 122 global interpreter state. 123 124 :param charset: the character set for the input and output data. This is 125 UTF-8 by default and should not be changed currently as 126 the reporting to Click only works in Python 2 properly. 127 :param env: a dictionary with environment variables for overriding. 128 :param echo_stdin: if this is set to `True`, then reading from stdin writes 129 to stdout. This is useful for showing examples in 130 some circumstances. Note that regular prompts 131 will automatically echo the input. 132 :param mix_stderr: if this is set to `False`, then stdout and stderr are 133 preserved as independent streams. This is useful for 134 Unix-philosophy apps that have predictable stdout and 135 noisy stderr, such that each may be measured 136 independently 137 """ 138 139 def __init__(self, charset=None, env=None, echo_stdin=False, 140 mix_stderr=True): 141 if charset is None: 142 charset = 'utf-8' 143 self.charset = charset 144 self.env = env or {} 145 self.echo_stdin = echo_stdin 146 self.mix_stderr = mix_stderr 147 148 def get_default_prog_name(self, cli): 149 """Given a command object it will return the default program name 150 for it. The default is the `name` attribute or ``"root"`` if not 151 set. 152 """ 153 return cli.name or 'root' 154 155 def make_env(self, overrides=None): 156 """Returns the environment overrides for invoking a script.""" 157 rv = dict(self.env) 158 if overrides: 159 rv.update(overrides) 160 return rv 161 162 @contextlib.contextmanager 163 def isolation(self, input=None, env=None, color=False): 164 """A context manager that sets up the isolation for invoking of a 165 command line tool. This sets up stdin with the given input data 166 and `os.environ` with the overrides from the given dictionary. 167 This also rebinds some internals in Click to be mocked (like the 168 prompt functionality). 169 170 This is automatically done in the :meth:`invoke` method. 171 172 .. versionadded:: 4.0 173 The ``color`` parameter was added. 174 175 :param input: the input stream to put into sys.stdin. 176 :param env: the environment overrides as dictionary. 177 :param color: whether the output should contain color codes. The 178 application can still override this explicitly. 179 """ 180 input = make_input_stream(input, self.charset) 181 182 old_stdin = sys.stdin 183 old_stdout = sys.stdout 184 old_stderr = sys.stderr 185 old_forced_width = clickpkg.formatting.FORCED_WIDTH 186 clickpkg.formatting.FORCED_WIDTH = 80 187 188 env = self.make_env(env) 189 190 if PY2: 191 bytes_output = StringIO() 192 if self.echo_stdin: 193 input = EchoingStdin(input, bytes_output) 194 sys.stdout = bytes_output 195 if not self.mix_stderr: 196 bytes_error = StringIO() 197 sys.stderr = bytes_error 198 else: 199 bytes_output = io.BytesIO() 200 if self.echo_stdin: 201 input = EchoingStdin(input, bytes_output) 202 input = io.TextIOWrapper(input, encoding=self.charset) 203 sys.stdout = io.TextIOWrapper( 204 bytes_output, encoding=self.charset) 205 if not self.mix_stderr: 206 bytes_error = io.BytesIO() 207 sys.stderr = io.TextIOWrapper( 208 bytes_error, encoding=self.charset) 209 210 if self.mix_stderr: 211 sys.stderr = sys.stdout 212 213 sys.stdin = input 214 215 def visible_input(prompt=None): 216 sys.stdout.write(prompt or '') 217 val = input.readline().rstrip('\r\n') 218 sys.stdout.write(val + '\n') 219 sys.stdout.flush() 220 return val 221 222 def hidden_input(prompt=None): 223 sys.stdout.write((prompt or '') + '\n') 224 sys.stdout.flush() 225 return input.readline().rstrip('\r\n') 226 227 def _getchar(echo): 228 char = sys.stdin.read(1) 229 if echo: 230 sys.stdout.write(char) 231 sys.stdout.flush() 232 return char 233 234 default_color = color 235 236 def should_strip_ansi(stream=None, color=None): 237 if color is None: 238 return not default_color 239 return not color 240 241 old_visible_prompt_func = clickpkg.termui.visible_prompt_func 242 old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func 243 old__getchar_func = clickpkg.termui._getchar 244 old_should_strip_ansi = clickpkg.utils.should_strip_ansi 245 clickpkg.termui.visible_prompt_func = visible_input 246 clickpkg.termui.hidden_prompt_func = hidden_input 247 clickpkg.termui._getchar = _getchar 248 clickpkg.utils.should_strip_ansi = should_strip_ansi 249 250 old_env = {} 251 try: 252 for key, value in iteritems(env): 253 old_env[key] = os.environ.get(key) 254 if value is None: 255 try: 256 del os.environ[key] 257 except Exception: 258 pass 259 else: 260 os.environ[key] = value 261 yield (bytes_output, not self.mix_stderr and bytes_error) 262 finally: 263 for key, value in iteritems(old_env): 264 if value is None: 265 try: 266 del os.environ[key] 267 except Exception: 268 pass 269 else: 270 os.environ[key] = value 271 sys.stdout = old_stdout 272 sys.stderr = old_stderr 273 sys.stdin = old_stdin 274 clickpkg.termui.visible_prompt_func = old_visible_prompt_func 275 clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func 276 clickpkg.termui._getchar = old__getchar_func 277 clickpkg.utils.should_strip_ansi = old_should_strip_ansi 278 clickpkg.formatting.FORCED_WIDTH = old_forced_width 279 280 def invoke(self, cli, args=None, input=None, env=None, 281 catch_exceptions=True, color=False, mix_stderr=False, **extra): 282 """Invokes a command in an isolated environment. The arguments are 283 forwarded directly to the command line script, the `extra` keyword 284 arguments are passed to the :meth:`~clickpkg.Command.main` function of 285 the command. 286 287 This returns a :class:`Result` object. 288 289 .. versionadded:: 3.0 290 The ``catch_exceptions`` parameter was added. 291 292 .. versionchanged:: 3.0 293 The result object now has an `exc_info` attribute with the 294 traceback if available. 295 296 .. versionadded:: 4.0 297 The ``color`` parameter was added. 298 299 :param cli: the command to invoke 300 :param args: the arguments to invoke. It may be given as an iterable 301 or a string. When given as string it will be interpreted 302 as a Unix shell command. More details at 303 :func:`shlex.split`. 304 :param input: the input data for `sys.stdin`. 305 :param env: the environment overrides. 306 :param catch_exceptions: Whether to catch any other exceptions than 307 ``SystemExit``. 308 :param extra: the keyword arguments to pass to :meth:`main`. 309 :param color: whether the output should contain color codes. The 310 application can still override this explicitly. 311 """ 312 exc_info = None 313 with self.isolation(input=input, env=env, color=color) as outstreams: 314 exception = None 315 exit_code = 0 316 317 if isinstance(args, string_types): 318 args = shlex.split(args) 319 320 try: 321 prog_name = extra.pop("prog_name") 322 except KeyError: 323 prog_name = self.get_default_prog_name(cli) 324 325 try: 326 cli.main(args=args or (), prog_name=prog_name, **extra) 327 except SystemExit as e: 328 exc_info = sys.exc_info() 329 exit_code = e.code 330 if exit_code is None: 331 exit_code = 0 332 333 if exit_code != 0: 334 exception = e 335 336 if not isinstance(exit_code, int): 337 sys.stdout.write(str(exit_code)) 338 sys.stdout.write('\n') 339 exit_code = 1 340 341 except Exception as e: 342 if not catch_exceptions: 343 raise 344 exception = e 345 exit_code = 1 346 exc_info = sys.exc_info() 347 finally: 348 sys.stdout.flush() 349 stdout = outstreams[0].getvalue() 350 stderr = outstreams[1] and outstreams[1].getvalue() 351 352 return Result(runner=self, 353 stdout_bytes=stdout, 354 stderr_bytes=stderr, 355 exit_code=exit_code, 356 exception=exception, 357 exc_info=exc_info) 358 359 @contextlib.contextmanager 360 def isolated_filesystem(self): 361 """A context manager that creates a temporary folder and changes 362 the current working directory to it for isolated filesystem tests. 363 """ 364 cwd = os.getcwd() 365 t = tempfile.mkdtemp() 366 os.chdir(t) 367 try: 368 yield t 369 finally: 370 os.chdir(cwd) 371 try: 372 shutil.rmtree(t) 373 except (OSError, IOError): 374 pass 375