1# -*- coding: utf-8 -*-
2"""
3click._termui_impl
4~~~~~~~~~~~~~~~~~~
5
6This module contains implementations for the termui module. To keep the
7import time of Click down, some infrequently used functionality is
8placed in this module and only imported as needed.
9
10:copyright: © 2014 by the Pallets team.
11:license: BSD, see LICENSE.rst for more details.
12"""
13
14import os
15import sys
16import time
17import math
18import contextlib
19from ._compat import _default_text_stdout, range_type, PY2, isatty, \
20     open_stream, strip_ansi, term_len, get_best_encoding, WIN, int_types, \
21     CYGWIN
22from .utils import echo
23from .exceptions import ClickException
24
25
# Control sequences written before and after the progress bar line.  Apart
# from the carriage return / newline, non-Windows platforms also get the
# '\033[?25l' / '\033[?25h' escape sequences (hide / show the cursor).
BEFORE_BAR = '\r' if os.name == 'nt' else '\r\033[?25l'
AFTER_BAR = '\n' if os.name == 'nt' else '\033[?25h\n'
32
33
34def _length_hint(obj):
35    """Returns the length hint of an object."""
36    try:
37        return len(obj)
38    except (AttributeError, TypeError):
39        try:
40            get_hint = type(obj).__length_hint__
41        except AttributeError:
42            return None
43        try:
44            hint = get_hint(obj)
45        except TypeError:
46            return None
47        if hint is NotImplemented or \
48           not isinstance(hint, int_types) or \
49           hint < 0:
50            return None
51        return hint
52
53
class ProgressBar(object):
    """Text progress bar that wraps an iterable.

    The bar has to be used as a context manager.  Iterating over it yields
    the items of the wrapped iterable and redraws the bar after each item.
    Nothing is drawn when ``file`` is not a terminal.

    :param iterable: the iterable to wrap.  May be ``None`` if ``length``
                     is given, in which case a range of that length is
                     iterated instead.
    :param length: total number of steps.  If omitted it is taken from
                   the length (hint) of ``iterable`` when available.
    :param fill_char: character used for the completed part of the bar.
    :param empty_char: character used for the remaining part of the bar.
    :param bar_template: ``%``-style template that receives the keys
                         ``label``, ``bar`` and ``info``.
    :param info_sep: separator placed between the info fragments.
    :param show_eta: include the estimated remaining time once known.
    :param show_percent: include the percentage.  ``None`` means: show it
                         when the length is known and ``show_pos`` is off.
    :param show_pos: include the absolute position.
    :param item_show_func: optional callable that receives the current
                           item and returns text for the info section
                           (or ``None`` to add nothing).
    :param label: text made available to the template as ``label``.
    :param file: stream to draw to; defaults to the stream returned by
                 ``_default_text_stdout()``.
    :param color: forwarded to ``echo`` when the line is written.
    :param width: width of the bar in characters; ``0`` sizes the bar
                  from the terminal width on every render.
    :raises TypeError: if neither ``iterable`` nor ``length`` is given.
    """

    def __init__(self, iterable, length=None, fill_char='#', empty_char=' ',
                 bar_template='%(bar)s', info_sep='  ', show_eta=True,
                 show_percent=None, show_pos=False, item_show_func=None,
                 label=None, file=None, color=None, width=30):
        self.fill_char = fill_char
        self.empty_char = empty_char
        self.bar_template = bar_template
        self.info_sep = info_sep
        self.show_eta = show_eta
        self.show_percent = show_percent
        self.show_pos = show_pos
        self.item_show_func = item_show_func
        self.label = label or ''
        if file is None:
            file = _default_text_stdout()
        self.file = file
        self.color = color
        self.width = width
        # A width of 0 requests automatic sizing from the terminal width.
        self.autowidth = width == 0

        if length is None:
            length = _length_hint(iterable)
        if iterable is None:
            if length is None:
                raise TypeError('iterable or length is required')
            iterable = range_type(length)
        self.iter = iter(iterable)
        self.length = length
        self.length_known = length is not None
        self.pos = 0
        # Rolling list of recent "seconds per step" samples (see make_step).
        self.avg = []
        self.start = self.last_eta = time.time()
        self.eta_known = False
        self.finished = False
        # Longest line rendered so far; used to blank out leftovers.
        self.max_width = None
        self.entered = False
        self.current_item = None
        self.is_hidden = not isatty(self.file)
        self._last_line = None
        # Nothing is rendered while the bar is younger than this (seconds).
        self.short_limit = 0.5

    def __enter__(self):
        self.entered = True
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.render_finish()

    def __iter__(self):
        if not self.entered:
            raise RuntimeError('You need to use progress bars in a with block.')
        self.render_progress()
        return self.generator()

    def is_fast(self):
        """Return ``True`` while the bar has run for at most ``short_limit``
        seconds."""
        return time.time() - self.start <= self.short_limit

    def render_finish(self):
        """Write the closing sequence unless the bar is hidden or was never
        rendered because it finished quickly."""
        if self.is_hidden or self.is_fast():
            return
        self.file.write(AFTER_BAR)
        self.file.flush()

    @property
    def pct(self):
        """Completed fraction in the range ``0.0`` to ``1.0``."""
        if self.finished:
            return 1.0
        # ``length`` is None when the total is unknown and may be 0 for an
        # empty iterable; fall back to 1 in both cases so neither the float
        # conversion nor the division can fail.
        return min(self.pos / float(self.length or 1), 1.0)

    @property
    def time_per_iteration(self):
        """Mean of the recorded per-step durations (``0.0`` if none)."""
        if not self.avg:
            return 0.0
        return sum(self.avg) / float(len(self.avg))

    @property
    def eta(self):
        """Estimated remaining seconds, or ``0.0`` if it cannot be known."""
        if self.length_known and not self.finished:
            return self.time_per_iteration * (self.length - self.pos)
        return 0.0

    def format_eta(self):
        """Format the ETA as ``[Dd ]HH:MM:SS``; empty string if unknown."""
        if self.eta_known:
            t = int(self.eta)
            seconds = t % 60
            t //= 60
            minutes = t % 60
            t //= 60
            hours = t % 24
            t //= 24
            if t > 0:
                days = t
                return '%dd %02d:%02d:%02d' % (days, hours, minutes, seconds)
            else:
                return '%02d:%02d:%02d' % (hours, minutes, seconds)
        return ''

    def format_pos(self):
        """Return ``pos`` or ``pos/length`` when the length is known."""
        pos = str(self.pos)
        if self.length_known:
            pos += '/%s' % self.length
        return pos

    def format_pct(self):
        """Return the percentage right-aligned in three columns plus ``%``."""
        return ('% 4d%%' % int(self.pct * 100))[1:]

    def format_bar(self):
        """Return the bar itself (without label or info)."""
        if self.length_known:
            bar_length = int(self.pct * self.width)
            bar = self.fill_char * bar_length
            bar += self.empty_char * (self.width - bar_length)
        elif self.finished:
            bar = self.fill_char * self.width
        else:
            # Unknown length: a single marker moves back and forth.
            chars = list(self.empty_char * (self.width or 1))
            if self.time_per_iteration != 0:
                idx = int((math.cos(self.pos * self.time_per_iteration)
                           / 2.0 + 0.5) * self.width)
                # When cos() evaluates to exactly 1.0 the computed index
                # equals ``width``, one past the last cell; clamp it.
                chars[min(idx, len(chars) - 1)] = self.fill_char
            bar = ''.join(chars)
        return bar

    def format_progress_line(self):
        """Fill ``bar_template`` with label, bar and the info fragments."""
        show_percent = self.show_percent

        info_bits = []
        if self.length_known and show_percent is None:
            show_percent = not self.show_pos

        if self.show_pos:
            info_bits.append(self.format_pos())
        if show_percent:
            info_bits.append(self.format_pct())
        if self.show_eta and self.eta_known and not self.finished:
            info_bits.append(self.format_eta())
        if self.item_show_func is not None:
            item_info = self.item_show_func(self.current_item)
            if item_info is not None:
                info_bits.append(item_info)

        return (self.bar_template % {
            'label': self.label,
            'bar': self.format_bar(),
            'info': self.info_sep.join(info_bits)
        }).rstrip()

    def render_progress(self):
        """Redraw the bar if it is visible, old enough and has changed."""
        from .termui import get_terminal_size

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            # Measure everything except the bar itself by rendering with a
            # zero-width bar.
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                # The terminal shrank: blank out the previous, longer line.
                buf.append(BEFORE_BAR)
                buf.append(' ' * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        # Pad with spaces so that leftovers of a longer line are erased.
        buf.append(' ' * (clear_width - line_len))
        line = ''.join(buf)
        # Render the line only if it changed.

        if line != self._last_line and not self.is_fast():
            self._last_line = line
            echo(line, file=self.file, color=self.color, nl=False)
            self.file.flush()

    def make_step(self, n_steps):
        """Advance the position by ``n_steps`` and refresh the ETA data.

        The ETA samples are updated at most once per second.
        """
        self.pos += n_steps
        if self.length_known and self.pos >= self.length:
            self.finished = True

        if (time.time() - self.last_eta) < 1.0:
            return

        self.last_eta = time.time()

        # self.avg is a rolling list of length <= 7 of steps where steps are
        # defined as time elapsed divided by the total progress through
        # self.length.
        if self.pos:
            step = (time.time() - self.start) / self.pos
        else:
            step = time.time() - self.start

        self.avg = self.avg[-6:] + [step]

        self.eta_known = self.length_known

    def update(self, n_steps):
        """Advance the bar by ``n_steps`` and redraw it."""
        self.make_step(n_steps)
        self.render_progress()

    def finish(self):
        """Mark the bar as finished and clear ETA and current item."""
        self.eta_known = False
        self.current_item = None
        self.finished = True

    def generator(self):
        """
        Returns a generator which yields the items added to the bar during
        construction, and updates the progress bar *after* the yielded block
        returns.
        """
        if not self.entered:
            raise RuntimeError('You need to use progress bars in a with block.')

        if self.is_hidden:
            for rv in self.iter:
                yield rv
        else:
            for rv in self.iter:
                self.current_item = rv
                yield rv
                self.update(1)
            self.finish()
            self.render_progress()
291
292
def pager(generator, color=None):
    """Pick a paging strategy for the text produced by ``generator``.

    The choice depends on whether stdin/stdout are terminals, on the
    ``PAGER`` and ``TERM`` environment variables, on the platform and on
    which of ``less`` / ``more`` can be run through the shell.  Plain
    printing is the last resort.
    """
    out = _default_text_stdout()
    if not (isatty(sys.stdin) and isatty(out)):
        return _nullpager(out, generator, color)

    # An explicitly configured pager always wins.
    user_pager = (os.environ.get('PAGER') or '').strip()
    if user_pager:
        run = _tempfilepager if WIN else _pipepager
        return run(generator, user_pager, color)

    if os.environ.get('TERM') in ('dumb', 'emacs'):
        return _nullpager(out, generator, color)
    if WIN or sys.platform.startswith('os2'):
        return _tempfilepager(generator, 'more <', color)

    can_shell = hasattr(os, 'system')
    if can_shell and os.system('(less) 2>/dev/null') == 0:
        return _pipepager(generator, 'less', color)

    # Probe for a working ``more`` by running it on an empty scratch file.
    import tempfile
    handle, path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "%s"' % path) == 0:
            return _pipepager(generator, 'more', color)
        return _nullpager(out, generator, color)
    finally:
        os.unlink(path)
319
320
def _pipepager(generator, cmd, color):
    """Page through text by feeding it to another program.  Invoking a
    pager through this might support colors.

    :param generator: iterable of text chunks to page.
    :param cmd: shell command line of the pager; its standard input
                receives the encoded text.
    :param color: whether ANSI codes are kept.  ``None`` enables them only
                  if the pager is ``less`` and its flags allow it (see
                  below); otherwise they are stripped.
    """
    import subprocess
    env = dict(os.environ)

    # If we're piping to less we might support colors under the condition
    # that either no flags are configured at all (then we set LESS=-R
    # ourselves) or the configured flags already contain ``r`` / ``R``.
    cmd_detail = cmd.rsplit('/', 1)[-1].split()
    # ``cmd_detail`` is empty when the last path component of ``cmd`` is
    # blank (e.g. a command ending in '/'); guard the index access.
    if color is None and cmd_detail and cmd_detail[0] == 'less':
        less_flags = os.environ.get('LESS', '') + ' '.join(cmd_detail[1:])
        if not less_flags:
            env['LESS'] = '-R'
            color = True
        elif 'r' in less_flags or 'R' in less_flags:
            color = True

    c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                         env=env)
    encoding = get_best_encoding(c.stdin)
    try:
        for text in generator:
            if not color:
                text = strip_ansi(text)

            c.stdin.write(text.encode(encoding, 'replace'))
    except (IOError, KeyboardInterrupt):
        # The pager went away or the user interrupted; stop feeding it.
        pass
    else:
        c.stdin.close()

    # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
    # search or other commands inside less).
    #
    # That means when the user hits ^C, the parent process (click) terminates,
    # but less is still alive, paging the output and messing up the terminal.
    #
    # If the user wants to make the pager exit on ^C, they should set
    # `LESS='-K'`. It's not our decision to make.
    while True:
        try:
            c.wait()
        except KeyboardInterrupt:
            pass
        else:
            break
368
369
def _tempfilepager(generator, cmd, color):
    """Page through text by invoking a program on a temporary file.

    :param generator: iterable of text chunks; it is consumed completely
                      before the pager is started.
    :param cmd: shell command prefix; the quoted file name is appended.
    :param color: if false, ANSI codes are stripped from the text.
    """
    import tempfile
    # TODO: This never terminates if the passed generator never terminates.
    text = "".join(generator)
    if not color:
        text = strip_ansi(text)
    encoding = get_best_encoding(sys.stdout)

    # mkstemp() creates the file atomically; mktemp() only returns a name
    # and leaves a window in which someone else can create that file.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    try:
        with open_stream(filename, 'wb')[0] as f:
            f.write(text.encode(encoding))
        os.system(cmd + ' "' + filename + '"')
    finally:
        # Also reached when writing fails, so the file never lingers.
        os.unlink(filename)
385
386
387def _nullpager(stream, generator, color):
388    """Simply print unformatted text.  This is the ultimate fallback."""
389    for text in generator:
390        if not color:
391            text = strip_ansi(text)
392        stream.write(text)
393
394
class Editor(object):
    """Runs an external editor on a file or on a piece of text.

    :param editor: editor command to use; if ``None`` one is detected by
                   :meth:`get_editor`.
    :param env: optional mapping of extra environment variables that is
                merged over ``os.environ`` for the editor process.
    :param require_save: if true, :meth:`edit` returns ``None`` when the
                         file was not modified by the editor.
    :param extension: suffix of the temporary file used by :meth:`edit`.
    """

    def __init__(self, editor=None, env=None, require_save=True,
                 extension='.txt'):
        self.editor = editor
        self.env = env
        self.require_save = require_save
        self.extension = extension

    def get_editor(self):
        """Return the editor command.

        Order of preference: the explicitly configured editor, the
        ``VISUAL`` and ``EDITOR`` environment variables, ``notepad`` on
        Windows, the first of ``vim`` / ``nano`` found by ``which``, and
        finally ``vi``.
        """
        if self.editor is not None:
            return self.editor
        for key in 'VISUAL', 'EDITOR':
            rv = os.environ.get(key)
            if rv:
                return rv
        if WIN:
            return 'notepad'
        for editor in 'vim', 'nano':
            if os.system('which %s >/dev/null 2>&1' % editor) == 0:
                return editor
        return 'vi'

    def edit_file(self, filename):
        """Run the editor on ``filename`` and wait for it to exit.

        :raises ClickException: if the editor cannot be started or exits
                                with a non-zero status.
        """
        import subprocess
        editor = self.get_editor()
        if self.env:
            environ = os.environ.copy()
            environ.update(self.env)
        else:
            environ = None
        try:
            c = subprocess.Popen('%s "%s"' % (editor, filename),
                                 env=environ, shell=True)
            exit_code = c.wait()
            if exit_code != 0:
                raise ClickException('%s: Editing failed!' % editor)
        except OSError as e:
            raise ClickException('%s: Editing failed: %s' % (editor, e))

    def edit(self, text):
        """Let the user edit ``text`` in a temporary file.

        :return: the edited text with ``\\r\\n`` normalized to ``\\n``, or
                 ``None`` if ``require_save`` is set and the file was not
                 modified.
        """
        import tempfile

        text = text or ''
        if text and not text.endswith('\n'):
            text += '\n'

        if WIN:
            encoding = 'utf-8-sig'
            text = text.replace('\n', '\r\n')
        else:
            encoding = 'utf-8'
        # Encode before creating the temporary file so that an encoding
        # error cannot leave an open descriptor behind.
        data = text.encode(encoding)

        fd, name = tempfile.mkstemp(prefix='editor-', suffix=self.extension)
        try:
            with os.fdopen(fd, 'wb') as f:
                f.write(data)
            # On filesystems with a coarse timestamp resolution an editor
            # that saves and exits quickly would leave the mtime unchanged
            # and the save would go unnoticed.  Move the mtime two seconds
            # into the past, then read back the value actually recorded.
            os.utime(name, (os.path.getatime(name),
                            os.path.getmtime(name) - 2))
            timestamp = os.path.getmtime(name)

            self.edit_file(name)

            if self.require_save \
               and os.path.getmtime(name) == timestamp:
                return None

            with open(name, 'rb') as f:
                rv = f.read()
            return rv.decode('utf-8-sig').replace('\r\n', '\n')
        finally:
            os.unlink(name)
470
471
def open_url(url, wait=False, locate=False):
    """Open ``url`` (or a file) with the platform's default application.

    :param url: URL or file name; ``file://`` URLs are unquoted to plain
                paths before they are handed to the launcher.
    :param wait: wait for the launched program to finish where the
                 platform launcher supports it.
    :param locate: instead of opening the target, show it (or its
                   containing directory) in the file manager.
    :return: the exit status of the launcher; ``0`` if it was started
             without waiting, ``1`` if no launcher could be used.
    """
    import subprocess

    def _unquote_file(url):
        # ``unquote`` lives in ``urllib.parse`` on Python 3 and directly in
        # ``urllib`` on Python 2.
        try:
            from urllib.parse import unquote
        except ImportError:
            from urllib import unquote
        if url.startswith('file://'):
            url = unquote(url[7:])
        return url

    if sys.platform == 'darwin':
        args = ['open']
        if wait:
            args.append('-W')
        if locate:
            args.append('-R')
        args.append(_unquote_file(url))
        null = open('/dev/null', 'w')
        try:
            return subprocess.Popen(args, stderr=null).wait()
        finally:
            null.close()
    elif WIN:
        if locate:
            # Unquote once, then drop double quotes so the value cannot
            # break out of the quoted argument below.
            url = _unquote_file(url).replace('"', '')
            args = 'explorer /select,"%s"' % url
        else:
            args = 'start %s "" "%s"' % (
                '/WAIT' if wait else '', url.replace('"', ''))
        return os.system(args)
    elif CYGWIN:
        if locate:
            url = _unquote_file(url)
            args = 'cygstart "%s"' % (os.path.dirname(url).replace('"', ''))
        else:
            args = 'cygstart %s "%s"' % (
                '-w' if wait else '', url.replace('"', ''))
        return os.system(args)

    try:
        if locate:
            url = os.path.dirname(_unquote_file(url)) or '.'
        else:
            url = _unquote_file(url)
        c = subprocess.Popen(['xdg-open', url])
        if wait:
            return c.wait()
        return 0
    except OSError:
        # xdg-open is unavailable; plain web URLs can still be handed to
        # the webbrowser module.
        if url.startswith(('http://', 'https://')) and not locate and not wait:
            import webbrowser
            webbrowser.open(url)
            return 0
        return 1
529
530
531def _translate_ch_to_exc(ch):
532    if ch == u'\x03':
533        raise KeyboardInterrupt()
534    if ch == u'\x04' and not WIN:  # Unix-like, Ctrl+D
535        raise EOFError()
536    if ch == u'\x1a' and WIN:      # Windows, Ctrl+Z
537        raise EOFError()
538
539
if WIN:
    import msvcrt

    @contextlib.contextmanager
    def raw_terminal():
        """No-op counterpart of the POSIX version; yields ``None``."""
        yield

    def getchar(echo):
        """Read one key press from the console and return it as text.

        :param echo: if true, the character is echoed by the console.
        :raises KeyboardInterrupt, EOFError: for the control characters
            handled by ``_translate_ch_to_exc``.
        """
        # The function `getch` will return a bytes object corresponding to
        # the pressed character. Since Windows 10 build 1803, it will also
        # return \x00 when called a second time after pressing a regular key.
        #
        # `getwch` does not share this probably-bugged behavior. Moreover, it
        # returns a Unicode object by default, which is what we want.
        #
        # Either of these functions will return \x00 or \xe0 to indicate
        # a special key, and you need to call the same function again to get
        # the "rest" of the code. The fun part is that \u00e0 is
        # "latin small letter a with grave", so if you type that on a French
        # keyboard, you _also_ get a \xe0.
        # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
        # resulting Unicode string reads as "a with grave" + "capital H".
        # This is indistinguishable from when the user actually types
        # "a with grave" and then "capital H".
        #
        # When \xe0 is returned, we assume it's part of a special-key sequence
        # and call `getwch` again, but that means that when the user types
        # the \u00e0 character, `getchar` doesn't return until a second
        # character is typed.
        # The alternative is returning immediately, but that would mess up
        # cross-platform handling of arrow keys and others that start with
        # \xe0. Another option is using `getch`, but then we can't reliably
        # read non-ASCII characters, because return values of `getch` are
        # limited to the current 8-bit codepage.
        #
        # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
        # is doing the right thing in more situations than with `getch`.
        if echo:
            func = msvcrt.getwche
        else:
            func = msvcrt.getwch

        rv = func()
        if rv in (u'\x00', u'\xe0'):
            # \x00 and \xe0 are control characters that indicate special key,
            # see above.
            rv += func()
        _translate_ch_to_exc(rv)
        return rv
else:
    import tty
    import termios

    @contextlib.contextmanager
    def raw_terminal():
        """Switch the terminal to raw mode and yield its file descriptor.

        Standard input is used when it is a terminal, otherwise
        ``/dev/tty`` is opened.  The previous terminal settings are
        restored on exit.  ``termios.error`` is swallowed.
        """
        if not isatty(sys.stdin):
            f = open('/dev/tty')
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None
        try:
            try:
                old_settings = termios.tcgetattr(fd)
                try:
                    tty.setraw(fd)
                    yield fd
                finally:
                    termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                    sys.stdout.flush()
            except termios.error:
                pass
        finally:
            # Close the extra handle on every path, including the ones
            # where reading or restoring the terminal settings failed.
            if f is not None:
                f.close()

    def getchar(echo):
        """Read one key press (up to 32 bytes) from the terminal.

        :param echo: if true and stdout is a terminal, the decoded input
                     is written to stdout.
        :raises KeyboardInterrupt, EOFError: for the control characters
            handled by ``_translate_ch_to_exc``.
        """
        with raw_terminal() as fd:
            ch = os.read(fd, 32)
            ch = ch.decode(get_best_encoding(sys.stdin), 'replace')
            if echo and isatty(sys.stdout):
                sys.stdout.write(ch)
            _translate_ch_to_exc(ch)
            return ch
622