1from __future__ import division
2
3import itertools
4import sys
5from signal import SIGINT, default_int_handler, signal
6
7from pip._vendor import six
8from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
9from pip._vendor.progress.spinner import Spinner
10
11from pip._internal.utils.compat import WINDOWS
12from pip._internal.utils.logging import get_indentation
13from pip._internal.utils.misc import format_size
14from pip._internal.utils.typing import MYPY_CHECK_RUNNING
15
16if MYPY_CHECK_RUNNING:
17    from typing import Any, Dict, List
18
19try:
20    from pip._vendor import colorama
21# Lots of different errors can come from this, including SystemError and
22# ImportError.
23except Exception:
24    colorama = None
25
26
27def _select_progress_class(preferred, fallback):
28    # type: (Bar, Bar) -> Bar
29    encoding = getattr(preferred.file, "encoding", None)
30
31    # If we don't know what encoding this file is in, then we'll just assume
32    # that it doesn't support unicode and use the ASCII bar.
33    if not encoding:
34        return fallback
35
36    # Collect all of the possible characters we want to use with the preferred
37    # bar.
38    characters = [
39        getattr(preferred, "empty_fill", six.text_type()),
40        getattr(preferred, "fill", six.text_type()),
41    ]
42    characters += list(getattr(preferred, "phases", []))
43
44    # Try to decode the characters we're using for the bar using the encoding
45    # of the given file, if this works then we'll assume that we can use the
46    # fancier bar and if not we'll fall back to the plaintext bar.
47    try:
48        six.text_type().join(characters).encode(encoding)
49    except UnicodeEncodeError:
50        return fallback
51    else:
52        return preferred
53
54
55_BaseBar = _select_progress_class(IncrementalBar, Bar)  # type: Any
56
57
58class InterruptibleMixin(object):
59    """
60    Helper to ensure that self.finish() gets called on keyboard interrupt.
61
62    This allows downloads to be interrupted without leaving temporary state
63    (like hidden cursors) behind.
64
65    This class is similar to the progress library's existing SigIntMixin
66    helper, but as of version 1.2, that helper has the following problems:
67
68    1. It calls sys.exit().
69    2. It discards the existing SIGINT handler completely.
70    3. It leaves its own handler in place even after an uninterrupted finish,
71       which will have unexpected delayed effects if the user triggers an
72       unrelated keyboard interrupt some time after a progress-displaying
73       download has already completed, for example.
74    """
75
76    def __init__(self, *args, **kwargs):
77        # type: (List[Any], Dict[Any, Any]) -> None
78        """
79        Save the original SIGINT handler for later.
80        """
81        # https://github.com/python/mypy/issues/5887
82        super(InterruptibleMixin, self).__init__(  # type: ignore
83            *args,
84            **kwargs
85        )
86
87        self.original_handler = signal(SIGINT, self.handle_sigint)
88
89        # If signal() returns None, the previous handler was not installed from
90        # Python, and we cannot restore it. This probably should not happen,
91        # but if it does, we must restore something sensible instead, at least.
92        # The least bad option should be Python's default SIGINT handler, which
93        # just raises KeyboardInterrupt.
94        if self.original_handler is None:
95            self.original_handler = default_int_handler
96
97    def finish(self):
98        # type: () -> None
99        """
100        Restore the original SIGINT handler after finishing.
101
102        This should happen regardless of whether the progress display finishes
103        normally, or gets interrupted.
104        """
105        super(InterruptibleMixin, self).finish()  # type: ignore
106        signal(SIGINT, self.original_handler)
107
108    def handle_sigint(self, signum, frame):  # type: ignore
109        """
110        Call self.finish() before delegating to the original SIGINT handler.
111
112        This handler should only be in place while the progress display is
113        active.
114        """
115        self.finish()
116        self.original_handler(signum, frame)
117
118
119class SilentBar(Bar):
120
121    def update(self):
122        # type: () -> None
123        pass
124
125
126class BlueEmojiBar(IncrementalBar):
127
128    suffix = "%(percent)d%%"
129    bar_prefix = " "
130    bar_suffix = " "
131    phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535")  # type: Any
132
133
134class DownloadProgressMixin(object):
135
136    def __init__(self, *args, **kwargs):
137        # type: (List[Any], Dict[Any, Any]) -> None
138        # https://github.com/python/mypy/issues/5887
139        super(DownloadProgressMixin, self).__init__(  # type: ignore
140            *args,
141            **kwargs
142        )
143        self.message = (" " * (
144            get_indentation() + 2
145        )) + self.message  # type: str
146
147    @property
148    def downloaded(self):
149        # type: () -> str
150        return format_size(self.index)  # type: ignore
151
152    @property
153    def download_speed(self):
154        # type: () -> str
155        # Avoid zero division errors...
156        if self.avg == 0.0:  # type: ignore
157            return "..."
158        return format_size(1 / self.avg) + "/s"  # type: ignore
159
160    @property
161    def pretty_eta(self):
162        # type: () -> str
163        if self.eta:  # type: ignore
164            return "eta {}".format(self.eta_td)  # type: ignore
165        return ""
166
167    def iter(self, it):  # type: ignore
168        for x in it:
169            yield x
170            # B305 is incorrectly raised here
171            # https://github.com/PyCQA/flake8-bugbear/issues/59
172            self.next(len(x))  # noqa: B305
173        self.finish()
174
175
176class WindowsMixin(object):
177
178    def __init__(self, *args, **kwargs):
179        # type: (List[Any], Dict[Any, Any]) -> None
180        # The Windows terminal does not support the hide/show cursor ANSI codes
181        # even with colorama. So we'll ensure that hide_cursor is False on
182        # Windows.
183        # This call needs to go before the super() call, so that hide_cursor
184        # is set in time. The base progress bar class writes the "hide cursor"
185        # code to the terminal in its init, so if we don't set this soon
186        # enough, we get a "hide" with no corresponding "show"...
187        if WINDOWS and self.hide_cursor:  # type: ignore
188            self.hide_cursor = False
189
190        # https://github.com/python/mypy/issues/5887
191        super(WindowsMixin, self).__init__(*args, **kwargs)  # type: ignore
192
193        # Check if we are running on Windows and we have the colorama module,
194        # if we do then wrap our file with it.
195        if WINDOWS and colorama:
196            self.file = colorama.AnsiToWin32(self.file)  # type: ignore
197            # The progress code expects to be able to call self.file.isatty()
198            # but the colorama.AnsiToWin32() object doesn't have that, so we'll
199            # add it.
200            self.file.isatty = lambda: self.file.wrapped.isatty()
201            # The progress code expects to be able to call self.file.flush()
202            # but the colorama.AnsiToWin32() object doesn't have that, so we'll
203            # add it.
204            self.file.flush = lambda: self.file.wrapped.flush()
205
206
207class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
208                              DownloadProgressMixin):
209
210    file = sys.stdout
211    message = "%(percent)d%%"
212    suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
213
214
215class DefaultDownloadProgressBar(BaseDownloadProgressBar,
216                                 _BaseBar):
217    pass
218
219
220class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
221    pass
222
223
224class DownloadBar(BaseDownloadProgressBar,
225                  Bar):
226    pass
227
228
229class DownloadFillingCirclesBar(BaseDownloadProgressBar,
230                                FillingCirclesBar):
231    pass
232
233
234class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar,
235                                   BlueEmojiBar):
236    pass
237
238
239class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
240                              DownloadProgressMixin, Spinner):
241
242    file = sys.stdout
243    suffix = "%(downloaded)s %(download_speed)s"
244
245    def next_phase(self):
246        # type: () -> str
247        if not hasattr(self, "_phaser"):
248            self._phaser = itertools.cycle(self.phases)
249        return next(self._phaser)
250
251    def update(self):
252        # type: () -> None
253        message = self.message % self
254        phase = self.next_phase()
255        suffix = self.suffix % self
256        line = ''.join([
257            message,
258            " " if message else "",
259            phase,
260            " " if suffix else "",
261            suffix,
262        ])
263
264        self.writeln(line)
265
266
267BAR_TYPES = {
268    "off": (DownloadSilentBar, DownloadSilentBar),
269    "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
270    "ascii": (DownloadBar, DownloadProgressSpinner),
271    "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
272    "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner)
273}
274
275
276def DownloadProgressProvider(progress_bar, max=None):  # type: ignore
277    if max is None or max == 0:
278        return BAR_TYPES[progress_bar][1]().iter
279    else:
280        return BAR_TYPES[progress_bar][0](max=max).iter
281