1import re 2import io 3import os 4import sys 5import codecs 6from weakref import WeakKeyDictionary 7 8 9PY2 = sys.version_info[0] == 2 10WIN = sys.platform.startswith('win') 11DEFAULT_COLUMNS = 80 12 13 14_ansi_re = re.compile('\033\[((?:\d|;)*)([a-zA-Z])') 15 16 17def get_filesystem_encoding(): 18 return sys.getfilesystemencoding() or sys.getdefaultencoding() 19 20 21def _make_text_stream(stream, encoding, errors): 22 if encoding is None: 23 encoding = get_best_encoding(stream) 24 if errors is None: 25 errors = 'replace' 26 return _NonClosingTextIOWrapper(stream, encoding, errors, 27 line_buffering=True) 28 29 30def is_ascii_encoding(encoding): 31 """Checks if a given encoding is ascii.""" 32 try: 33 return codecs.lookup(encoding).name == 'ascii' 34 except LookupError: 35 return False 36 37 38def get_best_encoding(stream): 39 """Returns the default stream encoding if not found.""" 40 rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding() 41 if is_ascii_encoding(rv): 42 return 'utf-8' 43 return rv 44 45 46class _NonClosingTextIOWrapper(io.TextIOWrapper): 47 48 def __init__(self, stream, encoding, errors, **extra): 49 self._stream = stream = _FixupStream(stream) 50 io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra) 51 52 # The io module is a place where the Python 3 text behavior 53 # was forced upon Python 2, so we need to unbreak 54 # it to look like Python 2. 55 if PY2: 56 def write(self, x): 57 if isinstance(x, str) or is_bytes(x): 58 try: 59 self.flush() 60 except Exception: 61 pass 62 return self.buffer.write(str(x)) 63 return io.TextIOWrapper.write(self, x) 64 65 def writelines(self, lines): 66 for line in lines: 67 self.write(line) 68 69 def __del__(self): 70 try: 71 self.detach() 72 except Exception: 73 pass 74 75 def isatty(self): 76 # https://bitbucket.org/pypy/pypy/issue/1803 77 return self._stream.isatty() 78 79 80class _FixupStream(object): 81 """The new io interface needs more from streams than streams 82 traditionally implement. As such, this fix-up code is necessary in 83 some circumstances. 84 """ 85 86 def __init__(self, stream): 87 self._stream = stream 88 89 def __getattr__(self, name): 90 return getattr(self._stream, name) 91 92 def read1(self, size): 93 f = getattr(self._stream, 'read1', None) 94 if f is not None: 95 return f(size) 96 # We only dispatch to readline instead of read in Python 2 as we 97 # do not want cause problems with the different implementation 98 # of line buffering. 99 if PY2: 100 return self._stream.readline(size) 101 return self._stream.read(size) 102 103 def readable(self): 104 x = getattr(self._stream, 'readable', None) 105 if x is not None: 106 return x() 107 try: 108 self._stream.read(0) 109 except Exception: 110 return False 111 return True 112 113 def writable(self): 114 x = getattr(self._stream, 'writable', None) 115 if x is not None: 116 return x() 117 try: 118 self._stream.write('') 119 except Exception: 120 try: 121 self._stream.write(b'') 122 except Exception: 123 return False 124 return True 125 126 def seekable(self): 127 x = getattr(self._stream, 'seekable', None) 128 if x is not None: 129 return x() 130 try: 131 self._stream.seek(self._stream.tell()) 132 except Exception: 133 return False 134 return True 135 136 137if PY2: 138 text_type = unicode 139 bytes = str 140 raw_input = raw_input 141 string_types = (str, unicode) 142 iteritems = lambda x: x.iteritems() 143 range_type = xrange 144 145 def is_bytes(x): 146 return isinstance(x, (buffer, bytearray)) 147 148 _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') 149 150 # For Windows, we need to force stdout/stdin/stderr to binary if it's 151 # fetched for that. This obviously is not the most correct way to do 152 # it as it changes global state. Unfortunately, there does not seem to 153 # be a clear better way to do it as just reopening the file in binary 154 # mode does not change anything. 155 # 156 # An option would be to do what Python 3 does and to open the file as 157 # binary only, patch it back to the system, and then use a wrapper 158 # stream that converts newlines. It's not quite clear what's the 159 # correct option here. 160 # 161 # This code also lives in _winconsole for the fallback to the console 162 # emulation stream. 163 # 164 # There are also Windows environments where the `msvcrt` module is not 165 # available (which is why we use try-catch instead of the WIN variable 166 # here), such as the Google App Engine development server on Windows. In 167 # those cases there is just nothing we can do. 168 try: 169 import msvcrt 170 except ImportError: 171 set_binary_mode = lambda x: x 172 else: 173 def set_binary_mode(f): 174 try: 175 fileno = f.fileno() 176 except Exception: 177 pass 178 else: 179 msvcrt.setmode(fileno, os.O_BINARY) 180 return f 181 182 def isidentifier(x): 183 return _identifier_re.search(x) is not None 184 185 def get_binary_stdin(): 186 return set_binary_mode(sys.stdin) 187 188 def get_binary_stdout(): 189 return set_binary_mode(sys.stdout) 190 191 def get_binary_stderr(): 192 return set_binary_mode(sys.stderr) 193 194 def get_text_stdin(encoding=None, errors=None): 195 rv = _get_windows_console_stream(sys.stdin, encoding, errors) 196 if rv is not None: 197 return rv 198 return _make_text_stream(sys.stdin, encoding, errors) 199 200 def get_text_stdout(encoding=None, errors=None): 201 rv = _get_windows_console_stream(sys.stdout, encoding, errors) 202 if rv is not None: 203 return rv 204 return _make_text_stream(sys.stdout, encoding, errors) 205 206 def get_text_stderr(encoding=None, errors=None): 207 rv = _get_windows_console_stream(sys.stderr, encoding, errors) 208 if rv is not None: 209 return rv 210 return _make_text_stream(sys.stderr, encoding, errors) 211 212 def filename_to_ui(value): 213 if isinstance(value, bytes): 214 value = value.decode(get_filesystem_encoding(), 'replace') 215 return value 216else: 217 import io 218 text_type = str 219 raw_input = input 220 string_types = (str,) 221 range_type = range 222 isidentifier = lambda x: x.isidentifier() 223 iteritems = lambda x: iter(x.items()) 224 225 def is_bytes(x): 226 return isinstance(x, (bytes, memoryview, bytearray)) 227 228 def _is_binary_reader(stream, default=False): 229 try: 230 return isinstance(stream.read(0), bytes) 231 except Exception: 232 return default 233 # This happens in some cases where the stream was already 234 # closed. In this case, we assume the default. 235 236 def _is_binary_writer(stream, default=False): 237 try: 238 stream.write(b'') 239 except Exception: 240 try: 241 stream.write('') 242 return False 243 except Exception: 244 pass 245 return default 246 return True 247 248 def _find_binary_reader(stream): 249 # We need to figure out if the given stream is already binary. 250 # This can happen because the official docs recommend detaching 251 # the streams to get binary streams. Some code might do this, so 252 # we need to deal with this case explicitly. 253 if _is_binary_reader(stream, False): 254 return stream 255 256 buf = getattr(stream, 'buffer', None) 257 258 # Same situation here; this time we assume that the buffer is 259 # actually binary in case it's closed. 260 if buf is not None and _is_binary_reader(buf, True): 261 return buf 262 263 def _find_binary_writer(stream): 264 # We need to figure out if the given stream is already binary. 265 # This can happen because the official docs recommend detatching 266 # the streams to get binary streams. Some code might do this, so 267 # we need to deal with this case explicitly. 268 if _is_binary_writer(stream, False): 269 return stream 270 271 buf = getattr(stream, 'buffer', None) 272 273 # Same situation here; this time we assume that the buffer is 274 # actually binary in case it's closed. 275 if buf is not None and _is_binary_writer(buf, True): 276 return buf 277 278 def _stream_is_misconfigured(stream): 279 """A stream is misconfigured if its encoding is ASCII.""" 280 # If the stream does not have an encoding set, we assume it's set 281 # to ASCII. This appears to happen in certain unittest 282 # environments. It's not quite clear what the correct behavior is 283 # but this at least will force Click to recover somehow. 284 return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii') 285 286 def _is_compatible_text_stream(stream, encoding, errors): 287 stream_encoding = getattr(stream, 'encoding', None) 288 stream_errors = getattr(stream, 'errors', None) 289 290 # Perfect match. 291 if stream_encoding == encoding and stream_errors == errors: 292 return True 293 294 # Otherwise, it's only a compatible stream if we did not ask for 295 # an encoding. 296 if encoding is None: 297 return stream_encoding is not None 298 299 return False 300 301 def _force_correct_text_reader(text_reader, encoding, errors): 302 if _is_binary_reader(text_reader, False): 303 binary_reader = text_reader 304 else: 305 # If there is no target encoding set, we need to verify that the 306 # reader is not actually misconfigured. 307 if encoding is None and not _stream_is_misconfigured(text_reader): 308 return text_reader 309 310 if _is_compatible_text_stream(text_reader, encoding, errors): 311 return text_reader 312 313 # If the reader has no encoding, we try to find the underlying 314 # binary reader for it. If that fails because the environment is 315 # misconfigured, we silently go with the same reader because this 316 # is too common to happen. In that case, mojibake is better than 317 # exceptions. 318 binary_reader = _find_binary_reader(text_reader) 319 if binary_reader is None: 320 return text_reader 321 322 # At this point, we default the errors to replace instead of strict 323 # because nobody handles those errors anyways and at this point 324 # we're so fundamentally fucked that nothing can repair it. 325 if errors is None: 326 errors = 'replace' 327 return _make_text_stream(binary_reader, encoding, errors) 328 329 def _force_correct_text_writer(text_writer, encoding, errors): 330 if _is_binary_writer(text_writer, False): 331 binary_writer = text_writer 332 else: 333 # If there is no target encoding set, we need to verify that the 334 # writer is not actually misconfigured. 335 if encoding is None and not _stream_is_misconfigured(text_writer): 336 return text_writer 337 338 if _is_compatible_text_stream(text_writer, encoding, errors): 339 return text_writer 340 341 # If the writer has no encoding, we try to find the underlying 342 # binary writer for it. If that fails because the environment is 343 # misconfigured, we silently go with the same writer because this 344 # is too common to happen. In that case, mojibake is better than 345 # exceptions. 346 binary_writer = _find_binary_writer(text_writer) 347 if binary_writer is None: 348 return text_writer 349 350 # At this point, we default the errors to replace instead of strict 351 # because nobody handles those errors anyways and at this point 352 # we're so fundamentally fucked that nothing can repair it. 353 if errors is None: 354 errors = 'replace' 355 return _make_text_stream(binary_writer, encoding, errors) 356 357 def get_binary_stdin(): 358 reader = _find_binary_reader(sys.stdin) 359 if reader is None: 360 raise RuntimeError('Was not able to determine binary ' 361 'stream for sys.stdin.') 362 return reader 363 364 def get_binary_stdout(): 365 writer = _find_binary_writer(sys.stdout) 366 if writer is None: 367 raise RuntimeError('Was not able to determine binary ' 368 'stream for sys.stdout.') 369 return writer 370 371 def get_binary_stderr(): 372 writer = _find_binary_writer(sys.stderr) 373 if writer is None: 374 raise RuntimeError('Was not able to determine binary ' 375 'stream for sys.stderr.') 376 return writer 377 378 def get_text_stdin(encoding=None, errors=None): 379 rv = _get_windows_console_stream(sys.stdin, encoding, errors) 380 if rv is not None: 381 return rv 382 return _force_correct_text_reader(sys.stdin, encoding, errors) 383 384 def get_text_stdout(encoding=None, errors=None): 385 rv = _get_windows_console_stream(sys.stdout, encoding, errors) 386 if rv is not None: 387 return rv 388 return _force_correct_text_writer(sys.stdout, encoding, errors) 389 390 def get_text_stderr(encoding=None, errors=None): 391 rv = _get_windows_console_stream(sys.stderr, encoding, errors) 392 if rv is not None: 393 return rv 394 return _force_correct_text_writer(sys.stderr, encoding, errors) 395 396 def filename_to_ui(value): 397 if isinstance(value, bytes): 398 value = value.decode(get_filesystem_encoding(), 'replace') 399 else: 400 value = value.encode('utf-8', 'surrogateescape') \ 401 .decode('utf-8', 'replace') 402 return value 403 404 405def get_streerror(e, default=None): 406 if hasattr(e, 'strerror'): 407 msg = e.strerror 408 else: 409 if default is not None: 410 msg = default 411 else: 412 msg = str(e) 413 if isinstance(msg, bytes): 414 msg = msg.decode('utf-8', 'replace') 415 return msg 416 417 418def open_stream(filename, mode='r', encoding=None, errors='strict', 419 atomic=False): 420 # Standard streams first. These are simple because they don't need 421 # special handling for the atomic flag. It's entirely ignored. 422 if filename == '-': 423 if 'w' in mode: 424 if 'b' in mode: 425 return get_binary_stdout(), False 426 return get_text_stdout(encoding=encoding, errors=errors), False 427 if 'b' in mode: 428 return get_binary_stdin(), False 429 return get_text_stdin(encoding=encoding, errors=errors), False 430 431 # Non-atomic writes directly go out through the regular open functions. 432 if not atomic: 433 if encoding is None: 434 return open(filename, mode), True 435 return io.open(filename, mode, encoding=encoding, errors=errors), True 436 437 # Some usability stuff for atomic writes 438 if 'a' in mode: 439 raise ValueError( 440 'Appending to an existing file is not supported, because that ' 441 'would involve an expensive `copy`-operation to a temporary ' 442 'file. Open the file in normal `w`-mode and copy explicitly ' 443 'if that\'s what you\'re after.' 444 ) 445 if 'x' in mode: 446 raise ValueError('Use the `overwrite`-parameter instead.') 447 if 'w' not in mode: 448 raise ValueError('Atomic writes only make sense with `w`-mode.') 449 450 # Atomic writes are more complicated. They work by opening a file 451 # as a proxy in the same folder and then using the fdopen 452 # functionality to wrap it in a Python file. Then we wrap it in an 453 # atomic file that moves the file over on close. 454 import tempfile 455 fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename), 456 prefix='.__atomic-write') 457 458 if encoding is not None: 459 f = io.open(fd, mode, encoding=encoding, errors=errors) 460 else: 461 f = os.fdopen(fd, mode) 462 463 return _AtomicFile(f, tmp_filename, filename), True 464 465 466# Used in a destructor call, needs extra protection from interpreter cleanup. 467if hasattr(os, 'replace'): 468 _replace = os.replace 469 _can_replace = True 470else: 471 _replace = os.rename 472 _can_replace = not WIN 473 474 475class _AtomicFile(object): 476 477 def __init__(self, f, tmp_filename, real_filename): 478 self._f = f 479 self._tmp_filename = tmp_filename 480 self._real_filename = real_filename 481 self.closed = False 482 483 @property 484 def name(self): 485 return self._real_filename 486 487 def close(self, delete=False): 488 if self.closed: 489 return 490 self._f.close() 491 if not _can_replace: 492 try: 493 os.remove(self._real_filename) 494 except OSError: 495 pass 496 _replace(self._tmp_filename, self._real_filename) 497 self.closed = True 498 499 def __getattr__(self, name): 500 return getattr(self._f, name) 501 502 def __enter__(self): 503 return self 504 505 def __exit__(self, exc_type, exc_value, tb): 506 self.close(delete=exc_type is not None) 507 508 def __repr__(self): 509 return repr(self._f) 510 511 512auto_wrap_for_ansi = None 513colorama = None 514get_winterm_size = None 515 516 517def strip_ansi(value): 518 return _ansi_re.sub('', value) 519 520 521def should_strip_ansi(stream=None, color=None): 522 if color is None: 523 if stream is None: 524 stream = sys.stdin 525 return not isatty(stream) 526 return not color 527 528 529# If we're on Windows, we provide transparent integration through 530# colorama. This will make ANSI colors through the echo function 531# work automatically. 532if WIN: 533 # Windows has a smaller terminal 534 DEFAULT_COLUMNS = 79 535 536 from ._winconsole import _get_windows_console_stream 537 538 def _get_argv_encoding(): 539 import locale 540 return locale.getpreferredencoding() 541 542 if PY2: 543 def raw_input(prompt=''): 544 sys.stderr.flush() 545 if prompt: 546 stdout = _default_text_stdout() 547 stdout.write(prompt) 548 stdin = _default_text_stdin() 549 return stdin.readline().rstrip('\r\n') 550 551 try: 552 import colorama 553 except ImportError: 554 pass 555 else: 556 _ansi_stream_wrappers = WeakKeyDictionary() 557 558 def auto_wrap_for_ansi(stream, color=None): 559 """This function wraps a stream so that calls through colorama 560 are issued to the win32 console API to recolor on demand. It 561 also ensures to reset the colors if a write call is interrupted 562 to not destroy the console afterwards. 563 """ 564 try: 565 cached = _ansi_stream_wrappers.get(stream) 566 except Exception: 567 cached = None 568 if cached is not None: 569 return cached 570 strip = should_strip_ansi(stream, color) 571 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip) 572 rv = ansi_wrapper.stream 573 _write = rv.write 574 575 def _safe_write(s): 576 try: 577 return _write(s) 578 except: 579 ansi_wrapper.reset_all() 580 raise 581 582 rv.write = _safe_write 583 try: 584 _ansi_stream_wrappers[stream] = rv 585 except Exception: 586 pass 587 return rv 588 589 def get_winterm_size(): 590 win = colorama.win32.GetConsoleScreenBufferInfo( 591 colorama.win32.STDOUT).srWindow 592 return win.Right - win.Left, win.Bottom - win.Top 593else: 594 def _get_argv_encoding(): 595 return getattr(sys.stdin, 'encoding', None) or get_filesystem_encoding() 596 597 _get_windows_console_stream = lambda *x: None 598 599 600def term_len(x): 601 return len(strip_ansi(x)) 602 603 604def isatty(stream): 605 try: 606 return stream.isatty() 607 except Exception: 608 return False 609 610 611def _make_cached_stream_func(src_func, wrapper_func): 612 cache = WeakKeyDictionary() 613 def func(): 614 stream = src_func() 615 try: 616 rv = cache.get(stream) 617 except Exception: 618 rv = None 619 if rv is not None: 620 return rv 621 rv = wrapper_func() 622 try: 623 cache[stream] = rv 624 except Exception: 625 pass 626 return rv 627 return func 628 629 630_default_text_stdin = _make_cached_stream_func( 631 lambda: sys.stdin, get_text_stdin) 632_default_text_stdout = _make_cached_stream_func( 633 lambda: sys.stdout, get_text_stdout) 634_default_text_stderr = _make_cached_stream_func( 635 lambda: sys.stderr, get_text_stderr) 636 637 638binary_streams = { 639 'stdin': get_binary_stdin, 640 'stdout': get_binary_stdout, 641 'stderr': get_binary_stderr, 642} 643 644text_streams = { 645 'stdin': get_text_stdin, 646 'stdout': get_text_stdout, 647 'stderr': get_text_stderr, 648} 649