import re
import io
import os
import sys
import codecs
from weakref import WeakKeyDictionary


PY2 = sys.version_info[0] == 2
CYGWIN = sys.platform.startswith('cygwin')
# Determine local App Engine environment, per Google's own suggestion.
# SERVER_SOFTWARE is looked up with a default so that an environment that
# only sets APPENGINE_RUNTIME does not raise KeyError at import time.
APP_ENGINE = ('APPENGINE_RUNTIME' in os.environ and
              'Development/' in os.environ.get('SERVER_SOFTWARE', ''))
WIN = sys.platform.startswith('win') and not APP_ENGINE
DEFAULT_COLUMNS = 80


_ansi_re = re.compile(r'\033\[((?:\d|;)*)([a-zA-Z])')


def get_filesystem_encoding():
    """Return the filesystem encoding, falling back to the default
    encoding when the interpreter does not report one.
    """
    return sys.getfilesystemencoding() or sys.getdefaultencoding()


def _make_text_stream(stream, encoding, errors,
                      force_readable=False, force_writable=False):
    """Wrap ``stream`` in a line-buffered `_NonClosingTextIOWrapper`.

    When ``encoding`` is ``None`` it is derived from the stream via
    `get_best_encoding`; when ``errors`` is ``None`` it becomes
    ``'replace'``.
    """
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True,
                                    force_readable=force_readable,
                                    force_writable=force_writable)


def is_ascii_encoding(encoding):
    """Checks if a given encoding is ascii.

    Unknown encoding names are reported as not being ascii.
    """
    try:
        return codecs.lookup(encoding).name == 'ascii'
    except LookupError:
        return False


def get_best_encoding(stream):
    """Returns the stream's encoding, or the default encoding if the
    stream does not declare one.  An ascii result is upgraded to
    ``'utf-8'``.
    """
    rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding()
    if is_ascii_encoding(rv):
        return 'utf-8'
    return rv


class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A text wrapper around a `_FixupStream` that detaches from the
    underlying stream on garbage collection instead of closing it.
    """

    def __init__(self, stream, encoding, errors,
                 force_readable=False, force_writable=False, **extra):
        self._stream = stream = _FixupStream(stream, force_readable,
                                             force_writable)
        io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)

    # The io module is a place where the Python 3 text behavior
    # was forced upon Python 2, so we need to unbreak
    # it to look like Python 2.
    if PY2:
        def write(self, x):
            # Byte-ish data bypasses the text layer and goes straight to
            # the underlying buffer after a best-effort flush.
            if isinstance(x, str) or is_bytes(x):
                try:
                    self.flush()
                except Exception:
                    pass
                return self.buffer.write(str(x))
            return io.TextIOWrapper.write(self, x)

        def writelines(self, lines):
            for line in lines:
                self.write(line)

    def __del__(self):
        # Detach rather than close so the wrapped stream stays usable.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self):
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()


class _FixupStream(object):
    """The new io interface needs more from streams than streams
    traditionally implement.  As such, this fix-up code is necessary in
    some circumstances.

    The forcing of readable and writable flags are there because some tools
    put badly patched objects on sys (one such offender are certain version
    of jupyter notebook).
    """

    def __init__(self, stream, force_readable=False, force_writable=False):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name):
        # Anything not defined here is delegated to the wrapped stream.
        return getattr(self._stream, name)

    def read1(self, size):
        f = getattr(self._stream, 'read1', None)
        if f is not None:
            return f(size)
        # We only dispatch to readline instead of read in Python 2 as we
        # do not want cause problems with the different implementation
        # of line buffering.
        if PY2:
            return self._stream.readline(size)
        return self._stream.read(size)

    def readable(self):
        if self._force_readable:
            return True
        x = getattr(self._stream, 'readable', None)
        if x is not None:
            return x()
        # No ``readable`` method: probe with a zero-length read.
        try:
            self._stream.read(0)
        except Exception:
            return False
        return True

    def writable(self):
        if self._force_writable:
            return True
        x = getattr(self._stream, 'writable', None)
        if x is not None:
            return x()
        # No ``writable`` method: probe with an empty text write and,
        # failing that, an empty bytes write.
        try:
            self._stream.write('')
        except Exception:
            try:
                self._stream.write(b'')
            except Exception:
                return False
        return True

    def seekable(self):
        x = getattr(self._stream, 'seekable', None)
        if x is not None:
            return x()
        # No ``seekable`` method: probe by seeking to the current position.
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        return True


if PY2:
    text_type = unicode
    bytes = str
    raw_input = raw_input
    string_types = (str, unicode)
    int_types = (int, long)
    iteritems = lambda x: x.iteritems()
    range_type = xrange

    def is_bytes(x):
        """Return True if ``x`` is a ``buffer`` or ``bytearray``."""
        return isinstance(x, (buffer, bytearray))

    _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')

    # For Windows, we need to force stdout/stdin/stderr to binary if it's
    # fetched for that.  This obviously is not the most correct way to do
    # it as it changes global state.  Unfortunately, there does not seem to
    # be a clear better way to do it as just reopening the file in binary
    # mode does not change anything.
    #
    # An option would be to do what Python 3 does and to open the file as
    # binary only, patch it back to the system, and then use a wrapper
    # stream that converts newlines.  It's not quite clear what's the
    # correct option here.
    #
    # This code also lives in _winconsole for the fallback to the console
    # emulation stream.
    #
    # There are also Windows environments where the `msvcrt` module is not
    # available (which is why we use try-catch instead of the WIN variable
    # here), such as the Google App Engine development server on Windows. In
    # those cases there is just nothing we can do.
    def set_binary_mode(f):
        return f

    try:
        import msvcrt
    except ImportError:
        pass
    else:
        def set_binary_mode(f):
            try:
                fileno = f.fileno()
            except Exception:
                pass
            else:
                msvcrt.setmode(fileno, os.O_BINARY)
            return f

    try:
        import fcntl
    except ImportError:
        pass
    else:
        def set_binary_mode(f):
            try:
                fileno = f.fileno()
            except Exception:
                pass
            else:
                # Clear O_NONBLOCK on the descriptor.
                flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
                fcntl.fcntl(fileno, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
            return f

    def isidentifier(x):
        """Return True if ``x`` matches the identifier pattern."""
        return _identifier_re.search(x) is not None

    def get_binary_stdin():
        return set_binary_mode(sys.stdin)

    def get_binary_stdout():
        _wrap_std_stream('stdout')
        return set_binary_mode(sys.stdout)

    def get_binary_stderr():
        _wrap_std_stream('stderr')
        return set_binary_mode(sys.stderr)

    def get_text_stdin(encoding=None, errors=None):
        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
        if rv is not None:
            return rv
        return _make_text_stream(sys.stdin, encoding, errors,
                                 force_readable=True)

    def get_text_stdout(encoding=None, errors=None):
        _wrap_std_stream('stdout')
        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
        if rv is not None:
            return rv
        return _make_text_stream(sys.stdout, encoding, errors,
                                 force_writable=True)

    def get_text_stderr(encoding=None, errors=None):
        _wrap_std_stream('stderr')
        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
        if rv is not None:
            return rv
        return _make_text_stream(sys.stderr, encoding, errors,
                                 force_writable=True)

    def filename_to_ui(value):
        """Decode a bytes filename with the filesystem encoding, replacing
        undecodable bytes; other values are returned unchanged.
        """
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
else:
    import io
    text_type = str
    raw_input = input
    string_types = (str,)
    int_types = (int,)
    range_type = range
    isidentifier = lambda x: x.isidentifier()
    iteritems = lambda x: iter(x.items())

    def is_bytes(x):
        """Return True if ``x`` is ``bytes``, ``memoryview`` or
        ``bytearray``.
        """
        return isinstance(x, (bytes, memoryview, bytearray))

    def _is_binary_reader(stream, default=False):
        """Return True if a zero-length read from ``stream`` yields bytes.

        If the probe raises, ``default`` is returned.
        """
        try:
            return isinstance(stream.read(0), bytes)
        except Exception:
            # This happens in some cases where the stream was already
            # closed.  In this case, we assume the default.
            return default

    def _is_binary_writer(stream, default=False):
        """Return True if ``stream`` accepts an empty bytes write.

        If it rejects bytes but accepts an empty text write the result is
        False; if both probes raise, ``default`` is returned.
        """
        try:
            stream.write(b'')
        except Exception:
            try:
                stream.write('')
                return False
            except Exception:
                pass
            return default
        return True

    def _find_binary_reader(stream):
        """Return ``stream`` or its ``buffer`` attribute, whichever is a
        binary reader, or ``None`` if neither is.
        """
        # We need to figure out if the given stream is already binary.
        # This can happen because the official docs recommend detaching
        # the streams to get binary streams.  Some code might do this, so
        # we need to deal with this case explicitly.
        if _is_binary_reader(stream, False):
            return stream

        buf = getattr(stream, 'buffer', None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if buf is not None and _is_binary_reader(buf, True):
            return buf
        return None

    def _find_binary_writer(stream):
        """Return ``stream`` or its ``buffer`` attribute, whichever is a
        binary writer, or ``None`` if neither is.
        """
        # We need to figure out if the given stream is already binary.
        # This can happen because the official docs recommend detaching
        # the streams to get binary streams.  Some code might do this, so
        # we need to deal with this case explicitly.
        if _is_binary_writer(stream, False):
            return stream

        buf = getattr(stream, 'buffer', None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if buf is not None and _is_binary_writer(buf, True):
            return buf
        return None

    def _stream_is_misconfigured(stream):
        """A stream is misconfigured if its encoding is ASCII."""
        # If the stream does not have an encoding set, we assume it's set
        # to ASCII.  This appears to happen in certain unittest
        # environments.  It's not quite clear what the correct behavior is
        # but this at least will force Click to recover somehow.
        return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii')

    def _is_compatible_text_stream(stream, encoding, errors):
        """Return True if ``stream`` can be used as-is for the requested
        ``encoding`` and ``errors``.
        """
        stream_encoding = getattr(stream, 'encoding', None)
        stream_errors = getattr(stream, 'errors', None)

        # Perfect match.
        if stream_encoding == encoding and stream_errors == errors:
            return True

        # Otherwise, it's only a compatible stream if we did not ask for
        # an encoding.
        if encoding is None:
            return stream_encoding is not None

        return False

    def _force_correct_text_reader(text_reader, encoding, errors,
                                   force_readable=False):
        """Return a text reader for ``text_reader`` honoring ``encoding``
        and ``errors``, re-wrapping the underlying binary reader when the
        given one is unsuitable.
        """
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # the environment is so fundamentally broken that nothing can
        # repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors,
                                 force_readable=force_readable)

    def _force_correct_text_writer(text_writer, encoding, errors,
                                   force_writable=False):
        """Return a text writer for ``text_writer`` honoring ``encoding``
        and ``errors``, re-wrapping the underlying binary writer when the
        given one is unsuitable.
        """
        if _is_binary_writer(text_writer, False):
            binary_writer = text_writer
        else:
            # If there is no target encoding set, we need to verify that the
            # writer is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_writer):
                return text_writer

            if _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # If the writer has no encoding, we try to find the underlying
            # binary writer for it.  If that fails because the environment is
            # misconfigured, we silently go with the same writer because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # the environment is so fundamentally broken that nothing can
        # repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_writer, encoding, errors,
                                 force_writable=force_writable)

    def get_binary_stdin():
        """Return a binary reader for ``sys.stdin``.

        Raises ``RuntimeError`` if none can be determined.
        """
        reader = _find_binary_reader(sys.stdin)
        if reader is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stdin.')
        return reader

    def get_binary_stdout():
        """Return a binary writer for ``sys.stdout``.

        Raises ``RuntimeError`` if none can be determined.
        """
        writer = _find_binary_writer(sys.stdout)
        if writer is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stdout.')
        return writer

    def get_binary_stderr():
        """Return a binary writer for ``sys.stderr``.

        Raises ``RuntimeError`` if none can be determined.
        """
        writer = _find_binary_writer(sys.stderr)
        if writer is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stderr.')
        return writer

    def get_text_stdin(encoding=None, errors=None):
        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
        if rv is not None:
            return rv
        return _force_correct_text_reader(sys.stdin, encoding, errors,
                                          force_readable=True)

    def get_text_stdout(encoding=None, errors=None):
        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
        if rv is not None:
            return rv
        return _force_correct_text_writer(sys.stdout, encoding, errors,
                                          force_writable=True)

    def get_text_stderr(encoding=None, errors=None):
        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
        if rv is not None:
            return rv
        return _force_correct_text_writer(sys.stderr, encoding, errors,
                                          force_writable=True)

    def filename_to_ui(value):
        """Return ``value`` as text suitable for display.

        Bytes are decoded with the filesystem encoding; text is
        round-tripped through UTF-8 so surrogate escapes become
        replacement characters.
        """
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value


def get_streerror(e, default=None):
    """Return a text error message for exception ``e``.

    ``e.strerror`` is used when it is set.  If the attribute is missing
    or ``None`` (for example ``OSError()`` built without arguments), the
    message falls back to ``default`` and finally to ``str(e)``, so the
    result is never ``None``.  Byte messages are decoded as UTF-8 with
    replacement.
    """
    msg = getattr(e, 'strerror', None)
    if msg is None:
        if default is not None:
            msg = default
        else:
            msg = str(e)
    if isinstance(msg, bytes):
        msg = msg.decode('utf-8', 'replace')
    return msg


def open_stream(filename, mode='r', encoding=None, errors='strict',
                atomic=False):
    """Open ``filename`` and return a ``(stream, flag)`` tuple.

    ``'-'`` selects the standard streams (stdout for ``w``/``a``/``x``
    modes, stdin otherwise) and the flag is ``False``; for files opened
    here the flag is ``True``.  With ``atomic=True`` the data is written
    to a temporary file in the same directory, wrapped in an
    `_AtomicFile`.

    Raises ``ValueError`` when ``atomic`` is combined with a mode
    containing ``a`` or ``x``, or lacking ``w``.
    """
    # Standard streams first.  These are simple because they don't need
    # special handling for the atomic flag.  It's entirely ignored.
    if filename == '-':
        if any(m in mode for m in ['w', 'a', 'x']):
            if 'b' in mode:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if 'b' in mode:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        if encoding is None:
            return open(filename, mode), True
        return io.open(filename, mode, encoding=encoding, errors=errors), True

    # Some usability stuff for atomic writes
    if 'a' in mode:
        raise ValueError(
            'Appending to an existing file is not supported, because that '
            'would involve an expensive `copy`-operation to a temporary '
            'file. Open the file in normal `w`-mode and copy explicitly '
            'if that\'s what you\'re after.'
        )
    if 'x' in mode:
        raise ValueError('Use the `overwrite`-parameter instead.')
    if 'w' not in mode:
        raise ValueError('Atomic writes only make sense with `w`-mode.')

    # Atomic writes are more complicated.  They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file.  Then we wrap it in an
    # atomic file that moves the file over on close.
    import tempfile
    fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename),
                                        prefix='.__atomic-write')

    if encoding is not None:
        f = io.open(fd, mode, encoding=encoding, errors=errors)
    else:
        f = os.fdopen(fd, mode)

    return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True


# Used in a destructor call, needs extra protection from interpreter cleanup.
if hasattr(os, 'replace'):
    _replace = os.replace
    _can_replace = True
else:
    _replace = os.rename
    _can_replace = not WIN


class _AtomicFile(object):
    """File proxy that writes to a temporary file and moves it over the
    real filename when closed.

    Attribute access not defined here is delegated to the wrapped file.
    """

    def __init__(self, f, tmp_filename, real_filename):
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self):
        return self._real_filename

    def close(self, delete=False):
        """Close the wrapped file and finish the write.

        With ``delete=False`` the temporary file replaces the real file.
        With ``delete=True`` (what ``__exit__`` passes when the ``with``
        body raised) the temporary file is discarded and the real file is
        left untouched, so a failed write never clobbers the target.
        Calling ``close`` again is a no-op.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Best effort: a failure to clean up must not mask the
            # exception that caused the abort.
            try:
                os.remove(self._tmp_filename)
            except OSError:
                pass
        else:
            if not _can_replace:
                # Without an overwriting rename the destination has to be
                # removed first.
                try:
                    os.remove(self._real_filename)
                except OSError:
                    pass
            _replace(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name):
        return getattr(self._f, name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close(delete=exc_type is not None)

    def __repr__(self):
        return repr(self._f)


auto_wrap_for_ansi = None
colorama = None
get_winterm_size = None


def strip_ansi(value):
    """Return ``value`` with every match of the ANSI escape pattern
    removed.
    """
    return _ansi_re.sub('', value)


def should_strip_ansi(stream=None, color=None):
    """Decide whether ANSI codes should be stripped.

    An explicit ``color`` wins (strip when it is falsy).  Otherwise codes
    are stripped when ``stream`` (``sys.stdin`` if not given) is not a
    tty.
    """
    if color is None:
        if stream is None:
            stream = sys.stdin
        return not isatty(stream)
    return not color


# If we're on Windows, we provide transparent integration through
# colorama.  This will make ANSI colors through the echo function
# work automatically.
if WIN:
    # Windows has a smaller terminal
    DEFAULT_COLUMNS = 79

    from ._winconsole import _get_windows_console_stream, _wrap_std_stream

    def _get_argv_encoding():
        import locale
        return locale.getpreferredencoding()

    if PY2:
        def raw_input(prompt=''):
            sys.stderr.flush()
            if prompt:
                stdout = _default_text_stdout()
                stdout.write(prompt)
            stdin = _default_text_stdin()
            return stdin.readline().rstrip('\r\n')

    try:
        import colorama
    except ImportError:
        pass
    else:
        _ansi_stream_wrappers = WeakKeyDictionary()

        def auto_wrap_for_ansi(stream, color=None):
            """This function wraps a stream so that calls through colorama
            are issued to the win32 console API to recolor on demand.  It
            also ensures to reset the colors if a write call is interrupted
            to not destroy the console afterwards.
            """
            try:
                cached = _ansi_stream_wrappers.get(stream)
            except Exception:
                cached = None
            if cached is not None:
                return cached
            strip = should_strip_ansi(stream, color)
            ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
            rv = ansi_wrapper.stream
            _write = rv.write

            def _safe_write(s):
                try:
                    return _write(s)
                except:
                    # Deliberately bare: colors are reset on any
                    # interruption and the exception is re-raised.
                    ansi_wrapper.reset_all()
                    raise

            rv.write = _safe_write
            try:
                _ansi_stream_wrappers[stream] = rv
            except Exception:
                pass
            return rv

        def get_winterm_size():
            win = colorama.win32.GetConsoleScreenBufferInfo(
                colorama.win32.STDOUT).srWindow
            return win.Right - win.Left, win.Bottom - win.Top
else:
    def _get_argv_encoding():
        return getattr(sys.stdin, 'encoding', None) or get_filesystem_encoding()

    _get_windows_console_stream = lambda *x: None
    _wrap_std_stream = lambda *x: None


def term_len(x):
    """Return the length of ``x`` after stripping ANSI escape codes."""
    return len(strip_ansi(x))


def isatty(stream):
    """Return ``stream.isatty()``, or False if the call raises."""
    try:
        return stream.isatty()
    except Exception:
        return False


def _make_cached_stream_func(src_func, wrapper_func):
    """Return a function that yields ``wrapper_func()``, cached per
    stream object returned by ``src_func()``.

    The cache holds the source streams weakly; streams that cannot be
    used as cache keys are simply not cached.
    """
    cache = WeakKeyDictionary()

    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            stream = src_func()  # In case wrapper_func() modified the stream
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func


_default_text_stdin = _make_cached_stream_func(
    lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(
    lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(
    lambda: sys.stderr, get_text_stderr)


binary_streams = {
    'stdin': get_binary_stdin,
    'stdout': get_binary_stdout,
    'stderr': get_binary_stderr,
}

text_streams = {
    'stdin': get_text_stdin,
    'stdout': get_text_stdout,
    'stderr': get_text_stderr,
}