1import re
2import io
3import os
4import sys
5import codecs
6from weakref import WeakKeyDictionary
7
8
9PY2 = sys.version_info[0] == 2
10WIN = sys.platform.startswith('win')
11DEFAULT_COLUMNS = 80
12
13
14_ansi_re = re.compile('\033\[((?:\d|;)*)([a-zA-Z])')
15
16
17def get_filesystem_encoding():
18    return sys.getfilesystemencoding() or sys.getdefaultencoding()
19
20
21def _make_text_stream(stream, encoding, errors):
22    if encoding is None:
23        encoding = get_best_encoding(stream)
24    if errors is None:
25        errors = 'replace'
26    return _NonClosingTextIOWrapper(stream, encoding, errors,
27                                    line_buffering=True)
28
29
30def is_ascii_encoding(encoding):
31    """Checks if a given encoding is ascii."""
32    try:
33        return codecs.lookup(encoding).name == 'ascii'
34    except LookupError:
35        return False
36
37
38def get_best_encoding(stream):
39    """Returns the default stream encoding if not found."""
40    rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding()
41    if is_ascii_encoding(rv):
42        return 'utf-8'
43    return rv
44
45
46class _NonClosingTextIOWrapper(io.TextIOWrapper):
47
48    def __init__(self, stream, encoding, errors, **extra):
49        self._stream = stream = _FixupStream(stream)
50        io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)
51
52    # The io module is a place where the Python 3 text behavior
53    # was forced upon Python 2, so we need to unbreak
54    # it to look like Python 2.
55    if PY2:
56        def write(self, x):
57            if isinstance(x, str) or is_bytes(x):
58                try:
59                    self.flush()
60                except Exception:
61                    pass
62                return self.buffer.write(str(x))
63            return io.TextIOWrapper.write(self, x)
64
65        def writelines(self, lines):
66            for line in lines:
67                self.write(line)
68
69    def __del__(self):
70        try:
71            self.detach()
72        except Exception:
73            pass
74
75    def isatty(self):
76        # https://bitbucket.org/pypy/pypy/issue/1803
77        return self._stream.isatty()
78
79
80class _FixupStream(object):
81    """The new io interface needs more from streams than streams
82    traditionally implement.  As such, this fix-up code is necessary in
83    some circumstances.
84    """
85
86    def __init__(self, stream):
87        self._stream = stream
88
89    def __getattr__(self, name):
90        return getattr(self._stream, name)
91
92    def read1(self, size):
93        f = getattr(self._stream, 'read1', None)
94        if f is not None:
95            return f(size)
96        # We only dispatch to readline instead of read in Python 2 as we
97        # do not want cause problems with the different implementation
98        # of line buffering.
99        if PY2:
100            return self._stream.readline(size)
101        return self._stream.read(size)
102
103    def readable(self):
104        x = getattr(self._stream, 'readable', None)
105        if x is not None:
106            return x()
107        try:
108            self._stream.read(0)
109        except Exception:
110            return False
111        return True
112
113    def writable(self):
114        x = getattr(self._stream, 'writable', None)
115        if x is not None:
116            return x()
117        try:
118            self._stream.write('')
119        except Exception:
120            try:
121                self._stream.write(b'')
122            except Exception:
123                return False
124        return True
125
126    def seekable(self):
127        x = getattr(self._stream, 'seekable', None)
128        if x is not None:
129            return x()
130        try:
131            self._stream.seek(self._stream.tell())
132        except Exception:
133            return False
134        return True
135
136
137if PY2:
138    text_type = unicode
139    bytes = str
140    raw_input = raw_input
141    string_types = (str, unicode)
142    iteritems = lambda x: x.iteritems()
143    range_type = xrange
144
145    def is_bytes(x):
146        return isinstance(x, (buffer, bytearray))
147
148    _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
149
150    # For Windows, we need to force stdout/stdin/stderr to binary if it's
151    # fetched for that.  This obviously is not the most correct way to do
152    # it as it changes global state.  Unfortunately, there does not seem to
153    # be a clear better way to do it as just reopening the file in binary
154    # mode does not change anything.
155    #
156    # An option would be to do what Python 3 does and to open the file as
157    # binary only, patch it back to the system, and then use a wrapper
158    # stream that converts newlines.  It's not quite clear what's the
159    # correct option here.
160    #
161    # This code also lives in _winconsole for the fallback to the console
162    # emulation stream.
163    #
164    # There are also Windows environments where the `msvcrt` module is not
165    # available (which is why we use try-catch instead of the WIN variable
166    # here), such as the Google App Engine development server on Windows. In
167    # those cases there is just nothing we can do.
168    try:
169        import msvcrt
170    except ImportError:
171        set_binary_mode = lambda x: x
172    else:
173        def set_binary_mode(f):
174            try:
175                fileno = f.fileno()
176            except Exception:
177                pass
178            else:
179                msvcrt.setmode(fileno, os.O_BINARY)
180            return f
181
182    def isidentifier(x):
183        return _identifier_re.search(x) is not None
184
185    def get_binary_stdin():
186        return set_binary_mode(sys.stdin)
187
188    def get_binary_stdout():
189        return set_binary_mode(sys.stdout)
190
191    def get_binary_stderr():
192        return set_binary_mode(sys.stderr)
193
194    def get_text_stdin(encoding=None, errors=None):
195        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
196        if rv is not None:
197            return rv
198        return _make_text_stream(sys.stdin, encoding, errors)
199
200    def get_text_stdout(encoding=None, errors=None):
201        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
202        if rv is not None:
203            return rv
204        return _make_text_stream(sys.stdout, encoding, errors)
205
206    def get_text_stderr(encoding=None, errors=None):
207        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
208        if rv is not None:
209            return rv
210        return _make_text_stream(sys.stderr, encoding, errors)
211
212    def filename_to_ui(value):
213        if isinstance(value, bytes):
214            value = value.decode(get_filesystem_encoding(), 'replace')
215        return value
216else:
217    import io
218    text_type = str
219    raw_input = input
220    string_types = (str,)
221    range_type = range
222    isidentifier = lambda x: x.isidentifier()
223    iteritems = lambda x: iter(x.items())
224
225    def is_bytes(x):
226        return isinstance(x, (bytes, memoryview, bytearray))
227
228    def _is_binary_reader(stream, default=False):
229        try:
230            return isinstance(stream.read(0), bytes)
231        except Exception:
232            return default
233            # This happens in some cases where the stream was already
234            # closed.  In this case, we assume the default.
235
236    def _is_binary_writer(stream, default=False):
237        try:
238            stream.write(b'')
239        except Exception:
240            try:
241                stream.write('')
242                return False
243            except Exception:
244                pass
245            return default
246        return True
247
248    def _find_binary_reader(stream):
249        # We need to figure out if the given stream is already binary.
250        # This can happen because the official docs recommend detaching
251        # the streams to get binary streams.  Some code might do this, so
252        # we need to deal with this case explicitly.
253        if _is_binary_reader(stream, False):
254            return stream
255
256        buf = getattr(stream, 'buffer', None)
257
258        # Same situation here; this time we assume that the buffer is
259        # actually binary in case it's closed.
260        if buf is not None and _is_binary_reader(buf, True):
261            return buf
262
263    def _find_binary_writer(stream):
264        # We need to figure out if the given stream is already binary.
265        # This can happen because the official docs recommend detatching
266        # the streams to get binary streams.  Some code might do this, so
267        # we need to deal with this case explicitly.
268        if _is_binary_writer(stream, False):
269            return stream
270
271        buf = getattr(stream, 'buffer', None)
272
273        # Same situation here; this time we assume that the buffer is
274        # actually binary in case it's closed.
275        if buf is not None and _is_binary_writer(buf, True):
276            return buf
277
278    def _stream_is_misconfigured(stream):
279        """A stream is misconfigured if its encoding is ASCII."""
280        # If the stream does not have an encoding set, we assume it's set
281        # to ASCII.  This appears to happen in certain unittest
282        # environments.  It's not quite clear what the correct behavior is
283        # but this at least will force Click to recover somehow.
284        return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii')
285
286    def _is_compatible_text_stream(stream, encoding, errors):
287        stream_encoding = getattr(stream, 'encoding', None)
288        stream_errors = getattr(stream, 'errors', None)
289
290        # Perfect match.
291        if stream_encoding == encoding and stream_errors == errors:
292            return True
293
294        # Otherwise, it's only a compatible stream if we did not ask for
295        # an encoding.
296        if encoding is None:
297            return stream_encoding is not None
298
299        return False
300
301    def _force_correct_text_reader(text_reader, encoding, errors):
302        if _is_binary_reader(text_reader, False):
303            binary_reader = text_reader
304        else:
305            # If there is no target encoding set, we need to verify that the
306            # reader is not actually misconfigured.
307            if encoding is None and not _stream_is_misconfigured(text_reader):
308                return text_reader
309
310            if _is_compatible_text_stream(text_reader, encoding, errors):
311                return text_reader
312
313            # If the reader has no encoding, we try to find the underlying
314            # binary reader for it.  If that fails because the environment is
315            # misconfigured, we silently go with the same reader because this
316            # is too common to happen.  In that case, mojibake is better than
317            # exceptions.
318            binary_reader = _find_binary_reader(text_reader)
319            if binary_reader is None:
320                return text_reader
321
322        # At this point, we default the errors to replace instead of strict
323        # because nobody handles those errors anyways and at this point
324        # we're so fundamentally fucked that nothing can repair it.
325        if errors is None:
326            errors = 'replace'
327        return _make_text_stream(binary_reader, encoding, errors)
328
329    def _force_correct_text_writer(text_writer, encoding, errors):
330        if _is_binary_writer(text_writer, False):
331            binary_writer = text_writer
332        else:
333            # If there is no target encoding set, we need to verify that the
334            # writer is not actually misconfigured.
335            if encoding is None and not _stream_is_misconfigured(text_writer):
336                return text_writer
337
338            if _is_compatible_text_stream(text_writer, encoding, errors):
339                return text_writer
340
341            # If the writer has no encoding, we try to find the underlying
342            # binary writer for it.  If that fails because the environment is
343            # misconfigured, we silently go with the same writer because this
344            # is too common to happen.  In that case, mojibake is better than
345            # exceptions.
346            binary_writer = _find_binary_writer(text_writer)
347            if binary_writer is None:
348                return text_writer
349
350        # At this point, we default the errors to replace instead of strict
351        # because nobody handles those errors anyways and at this point
352        # we're so fundamentally fucked that nothing can repair it.
353        if errors is None:
354            errors = 'replace'
355        return _make_text_stream(binary_writer, encoding, errors)
356
357    def get_binary_stdin():
358        reader = _find_binary_reader(sys.stdin)
359        if reader is None:
360            raise RuntimeError('Was not able to determine binary '
361                               'stream for sys.stdin.')
362        return reader
363
364    def get_binary_stdout():
365        writer = _find_binary_writer(sys.stdout)
366        if writer is None:
367            raise RuntimeError('Was not able to determine binary '
368                               'stream for sys.stdout.')
369        return writer
370
371    def get_binary_stderr():
372        writer = _find_binary_writer(sys.stderr)
373        if writer is None:
374            raise RuntimeError('Was not able to determine binary '
375                               'stream for sys.stderr.')
376        return writer
377
378    def get_text_stdin(encoding=None, errors=None):
379        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
380        if rv is not None:
381            return rv
382        return _force_correct_text_reader(sys.stdin, encoding, errors)
383
384    def get_text_stdout(encoding=None, errors=None):
385        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
386        if rv is not None:
387            return rv
388        return _force_correct_text_writer(sys.stdout, encoding, errors)
389
390    def get_text_stderr(encoding=None, errors=None):
391        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
392        if rv is not None:
393            return rv
394        return _force_correct_text_writer(sys.stderr, encoding, errors)
395
396    def filename_to_ui(value):
397        if isinstance(value, bytes):
398            value = value.decode(get_filesystem_encoding(), 'replace')
399        else:
400            value = value.encode('utf-8', 'surrogateescape') \
401                .decode('utf-8', 'replace')
402        return value
403
404
405def get_streerror(e, default=None):
406    if hasattr(e, 'strerror'):
407        msg = e.strerror
408    else:
409        if default is not None:
410            msg = default
411        else:
412            msg = str(e)
413    if isinstance(msg, bytes):
414        msg = msg.decode('utf-8', 'replace')
415    return msg
416
417
418def open_stream(filename, mode='r', encoding=None, errors='strict',
419                atomic=False):
420    # Standard streams first.  These are simple because they don't need
421    # special handling for the atomic flag.  It's entirely ignored.
422    if filename == '-':
423        if 'w' in mode:
424            if 'b' in mode:
425                return get_binary_stdout(), False
426            return get_text_stdout(encoding=encoding, errors=errors), False
427        if 'b' in mode:
428            return get_binary_stdin(), False
429        return get_text_stdin(encoding=encoding, errors=errors), False
430
431    # Non-atomic writes directly go out through the regular open functions.
432    if not atomic:
433        if encoding is None:
434            return open(filename, mode), True
435        return io.open(filename, mode, encoding=encoding, errors=errors), True
436
437    # Some usability stuff for atomic writes
438    if 'a' in mode:
439        raise ValueError(
440            'Appending to an existing file is not supported, because that '
441            'would involve an expensive `copy`-operation to a temporary '
442            'file. Open the file in normal `w`-mode and copy explicitly '
443            'if that\'s what you\'re after.'
444        )
445    if 'x' in mode:
446        raise ValueError('Use the `overwrite`-parameter instead.')
447    if 'w' not in mode:
448        raise ValueError('Atomic writes only make sense with `w`-mode.')
449
450    # Atomic writes are more complicated.  They work by opening a file
451    # as a proxy in the same folder and then using the fdopen
452    # functionality to wrap it in a Python file.  Then we wrap it in an
453    # atomic file that moves the file over on close.
454    import tempfile
455    fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename),
456                                        prefix='.__atomic-write')
457
458    if encoding is not None:
459        f = io.open(fd, mode, encoding=encoding, errors=errors)
460    else:
461        f = os.fdopen(fd, mode)
462
463    return _AtomicFile(f, tmp_filename, filename), True
464
465
466# Used in a destructor call, needs extra protection from interpreter cleanup.
467if hasattr(os, 'replace'):
468    _replace = os.replace
469    _can_replace = True
470else:
471    _replace = os.rename
472    _can_replace = not WIN
473
474
475class _AtomicFile(object):
476
477    def __init__(self, f, tmp_filename, real_filename):
478        self._f = f
479        self._tmp_filename = tmp_filename
480        self._real_filename = real_filename
481        self.closed = False
482
483    @property
484    def name(self):
485        return self._real_filename
486
487    def close(self, delete=False):
488        if self.closed:
489            return
490        self._f.close()
491        if not _can_replace:
492            try:
493                os.remove(self._real_filename)
494            except OSError:
495                pass
496        _replace(self._tmp_filename, self._real_filename)
497        self.closed = True
498
499    def __getattr__(self, name):
500        return getattr(self._f, name)
501
502    def __enter__(self):
503        return self
504
505    def __exit__(self, exc_type, exc_value, tb):
506        self.close(delete=exc_type is not None)
507
508    def __repr__(self):
509        return repr(self._f)
510
511
512auto_wrap_for_ansi = None
513colorama = None
514get_winterm_size = None
515
516
517def strip_ansi(value):
518    return _ansi_re.sub('', value)
519
520
521def should_strip_ansi(stream=None, color=None):
522    if color is None:
523        if stream is None:
524            stream = sys.stdin
525        return not isatty(stream)
526    return not color
527
528
529# If we're on Windows, we provide transparent integration through
530# colorama.  This will make ANSI colors through the echo function
531# work automatically.
532if WIN:
533    # Windows has a smaller terminal
534    DEFAULT_COLUMNS = 79
535
536    from ._winconsole import _get_windows_console_stream
537
538    def _get_argv_encoding():
539        import locale
540        return locale.getpreferredencoding()
541
542    if PY2:
543        def raw_input(prompt=''):
544            sys.stderr.flush()
545            if prompt:
546                stdout = _default_text_stdout()
547                stdout.write(prompt)
548            stdin = _default_text_stdin()
549            return stdin.readline().rstrip('\r\n')
550
551    try:
552        import colorama
553    except ImportError:
554        pass
555    else:
556        _ansi_stream_wrappers = WeakKeyDictionary()
557
558        def auto_wrap_for_ansi(stream, color=None):
559            """This function wraps a stream so that calls through colorama
560            are issued to the win32 console API to recolor on demand.  It
561            also ensures to reset the colors if a write call is interrupted
562            to not destroy the console afterwards.
563            """
564            try:
565                cached = _ansi_stream_wrappers.get(stream)
566            except Exception:
567                cached = None
568            if cached is not None:
569                return cached
570            strip = should_strip_ansi(stream, color)
571            ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
572            rv = ansi_wrapper.stream
573            _write = rv.write
574
575            def _safe_write(s):
576                try:
577                    return _write(s)
578                except:
579                    ansi_wrapper.reset_all()
580                    raise
581
582            rv.write = _safe_write
583            try:
584                _ansi_stream_wrappers[stream] = rv
585            except Exception:
586                pass
587            return rv
588
589        def get_winterm_size():
590            win = colorama.win32.GetConsoleScreenBufferInfo(
591                colorama.win32.STDOUT).srWindow
592            return win.Right - win.Left, win.Bottom - win.Top
593else:
594    def _get_argv_encoding():
595        return getattr(sys.stdin, 'encoding', None) or get_filesystem_encoding()
596
597    _get_windows_console_stream = lambda *x: None
598
599
600def term_len(x):
601    return len(strip_ansi(x))
602
603
604def isatty(stream):
605    try:
606        return stream.isatty()
607    except Exception:
608        return False
609
610
611def _make_cached_stream_func(src_func, wrapper_func):
612    cache = WeakKeyDictionary()
613    def func():
614        stream = src_func()
615        try:
616            rv = cache.get(stream)
617        except Exception:
618            rv = None
619        if rv is not None:
620            return rv
621        rv = wrapper_func()
622        try:
623            cache[stream] = rv
624        except Exception:
625            pass
626        return rv
627    return func
628
629
630_default_text_stdin = _make_cached_stream_func(
631    lambda: sys.stdin, get_text_stdin)
632_default_text_stdout = _make_cached_stream_func(
633    lambda: sys.stdout, get_text_stdout)
634_default_text_stderr = _make_cached_stream_func(
635    lambda: sys.stderr, get_text_stderr)
636
637
638binary_streams = {
639    'stdin': get_binary_stdin,
640    'stdout': get_binary_stdout,
641    'stderr': get_binary_stderr,
642}
643
644text_streams = {
645    'stdin': get_text_stdin,
646    'stdout': get_text_stdout,
647    'stderr': get_text_stderr,
648}
649