1# The following comment should be removed at some point in the future.
2# mypy: strict-optional=False
3# mypy: disallow-untyped-defs=False
4
5from __future__ import absolute_import
6
7import contextlib
8import errno
9import getpass
10import hashlib
11import io
12import logging
13import os
14import posixpath
15import shutil
16import stat
17import sys
18from collections import deque
19from itertools import tee
20
21from pip._vendor import pkg_resources
22from pip._vendor.packaging.utils import canonicalize_name
23
24# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
25#       why we ignore the type on this import.
26from pip._vendor.retrying import retry  # type: ignore
27from pip._vendor.six import PY2, text_type
28from pip._vendor.six.moves import filter, filterfalse, input, map, zip_longest
29from pip._vendor.six.moves.urllib import parse as urllib_parse
30from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote
31
32from pip import __version__
33from pip._internal.exceptions import CommandError
34from pip._internal.locations import get_major_minor_version, site_packages, user_site
35from pip._internal.utils.compat import WINDOWS, expanduser, stdlib_pkgs, str_to_display
36from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
37from pip._internal.utils.virtualenv import (
38    running_under_virtualenv,
39    virtualenv_no_global,
40)
41
42if PY2:
43    from io import BytesIO as StringIO
44else:
45    from io import StringIO
46
47if MYPY_CHECK_RUNNING:
48    from typing import (
49        Any,
50        AnyStr,
51        Callable,
52        Container,
53        Iterable,
54        Iterator,
55        List,
56        Optional,
57        Text,
58        Tuple,
59        TypeVar,
60        Union,
61    )
62
63    from pip._vendor.pkg_resources import Distribution
64
65    VersionInfo = Tuple[int, int, int]
66    T = TypeVar("T")
67
68
# Names exported when this module is star-imported
# (``from <module> import *``).
__all__ = ['rmtree', 'display_path', 'backup_dir',
           'ask', 'splitext',
           'format_size', 'is_installable_dir',
           'normalize_path',
           'renames', 'get_prog',
           'captured_stdout', 'ensure_dir',
           'get_installed_version', 'remove_auth_from_url']


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
79
80
def get_pip_version():
    # type: () -> str
    """Return a one-line description of the running pip.

    The result has the form ``pip <version> from <dir> (python <ver>)``
    where ``<dir>`` is the absolute path two levels above the directory
    containing this module and ``<ver>`` is whatever
    get_major_minor_version() returns.
    """
    module_dir = os.path.dirname(__file__)
    pip_pkg_dir = os.path.abspath(os.path.join(module_dir, "..", ".."))
    template = 'pip {} from {} (python {})'
    return template.format(
        __version__, pip_pkg_dir, get_major_minor_version(),
    )
91
92
def normalize_version_info(py_version_info):
    # type: (Tuple[int, ...]) -> Tuple[int, int, int]
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    :param py_version_info: a tuple of ints representing a Python version,
        or None to specify no version. The tuple can have any length.

    :return: a tuple of length three if `py_version_info` is non-None:
        shorter input is padded with zeros, longer input is truncated.
        Otherwise, return `py_version_info` unchanged (i.e. None).
    """
    # Honor the documented contract: None means "no version" and is passed
    # through untouched instead of failing inside len().
    if py_version_info is None:
        return None

    missing = 3 - len(py_version_info)
    if missing > 0:
        # Too short: pad with zeros, e.g. (3,) -> (3, 0, 0).
        py_version_info += missing * (0,)
    elif missing < 0:
        # Too long: keep only major, minor, micro.
        py_version_info = py_version_info[:3]

    return cast('VersionInfo', py_version_info)
111
112
def ensure_dir(path):
    # type: (AnyStr) -> None
    """Create ``path`` with os.makedirs(), tolerating "already there" errors.

    An OSError whose errno is EEXIST or ENOTEMPTY is swallowed; any other
    OSError propagates to the caller.
    """
    tolerated = (errno.EEXIST, errno.ENOTEMPTY)
    try:
        os.makedirs(path)
    except OSError as exc:
        # Windows can raise spurious ENOTEMPTY errors. See #6426.
        if exc.errno not in tolerated:
            raise
122
123
def get_prog():
    # type: () -> str
    """Return the program name to show in usage and help output.

    This is the basename of ``sys.argv[0]``, except that ``__main__.py``
    and ``-c`` are reported as ``<sys.executable> -m pip``. If
    ``sys.argv[0]`` cannot be read or processed, fall back to ``'pip'``.
    """
    try:
        script = os.path.basename(sys.argv[0])
        run_as_module = script in ('__main__.py', '-c')
        return "{} -m pip".format(sys.executable) if run_as_module else script
    except (AttributeError, TypeError, IndexError):
        return 'pip'
135
136
# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
    # type: (AnyStr, bool) -> None
    """Remove the directory tree at ``dir`` using shutil.rmtree().

    ``ignore_errors`` is forwarded unchanged to shutil.rmtree(), and
    rmtree_errorhandler() is installed as its ``onerror`` callback.
    """
    shutil.rmtree(dir, ignore_errors=ignore_errors,
                  onerror=rmtree_errorhandler)
143
144
def rmtree_errorhandler(func, path, exc_info):
    """Error callback for rmtree() that retries on read-only paths.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. This handler clears the read-only
    state and calls ``func(path)`` again.

    If ``path`` cannot be stat'ed the error is ignored. If ``path`` is
    already writable, the exception currently being handled is re-raised.
    """
    try:
        mode = os.stat(path).st_mode
    except (IOError, OSError):
        # it's equivalent to os.path.exists
        return

    if mode & stat.S_IWRITE:
        # Not a read-only problem: let the active exception propagate.
        raise

    # convert to read/write, then repeat the failed operation
    os.chmod(path, stat.S_IWRITE)
    func(path)
163
164
def path_to_display(path):
    # type: (Optional[Union[str, Text]]) -> Optional[Text]
    """
    Return a text (unicode in Python 2) version of a bytes or text path,
    suitable for display and logging.

    None and text paths are returned unchanged. Bytes paths are decoded
    with the filesystem encoding; if that fails, a printable representation
    of the raw bytes is returned instead.

    This function should never error out. Also, this function is mainly
    needed for Python 2 since in Python 3 str paths are already text.
    """
    if path is None:
        return None
    if isinstance(path, text_type):
        return path

    # Otherwise, path is a bytes object (str in Python 2).
    try:
        return path.decode(sys.getfilesystemencoding(), 'strict')
    except UnicodeDecodeError:
        # Fall through: include the full bytes to make troubleshooting
        # easier, even though it may not be very human readable.
        pass

    if PY2:
        # repr() gives a readable str for the bytes, which is then converted
        # to unicode. The "b" prefix makes the Python 2 output look like the
        # Python 3 output and signals that this is a bytes representation.
        return str_to_display('b{!r}'.format(path))

    # Silence the "F821 undefined name 'ascii'" flake8 error since
    # in Python 3 ascii() is a built-in.
    return ascii(path)  # noqa: F821
197
198
def display_path(path):
    # type: (Union[str, Text]) -> str
    """Gives the display value for a given path, making it relative to cwd
    if possible.

    The path is made absolute and case-normalized. If it lies below the
    current working directory, the cwd prefix is replaced by ``.``;
    otherwise the normalized absolute path is returned.
    """
    path = os.path.normcase(os.path.abspath(path))
    if sys.version_info[0] == 2:
        path = path.decode(sys.getfilesystemencoding(), 'replace')
        path = path.encode(sys.getdefaultencoding(), 'replace')
    # ``path`` has been through normcase(), so the cwd must be too; otherwise
    # the prefix test fails on platforms where normcase() changes the string
    # (e.g. a cwd containing upper-case letters on Windows).
    cwd = os.path.normcase(os.getcwd())
    if path.startswith(cwd + os.path.sep):
        path = '.' + path[len(cwd):]
    return path
210
211
def backup_dir(dir, ext='.bak'):
    # type: (str, str) -> str
    """Return the first unused backup name for ``dir``.

    Candidates are ``dir + ext``, then ``dir + ext + '2'``,
    ``dir + ext + '3'``, ... (e.g. .bak, .bak2, .bak3); the first one that
    does not exist on disk is returned. Nothing is created.
    """
    candidate = dir + ext
    counter = 1
    while os.path.exists(candidate):
        counter += 1
        candidate = dir + ext + str(counter)
    return candidate
222
223
def ask_path_exists(message, options):
    # type: (str, Iterable[str]) -> str
    """Choose one of ``options``, preferring $PIP_EXISTS_ACTION over asking.

    The whitespace-separated words of the PIP_EXISTS_ACTION environment
    variable are checked in order and the first one found in ``options`` is
    returned. If none matches, the question is put to the user via ask().
    """
    configured = os.environ.get('PIP_EXISTS_ACTION', '').split()
    preset = next((word for word in configured if word in options), None)
    if preset is not None:
        return preset
    return ask(message, options)
230
231
def _check_no_input(message):
    # type: (str) -> None
    """Raise an Exception if $PIP_NO_INPUT is set to a non-empty value.

    ``message`` is the question that was about to be asked; it is included
    in the error text.
    """
    if not os.environ.get('PIP_NO_INPUT'):
        return
    raise Exception(
        'No input was expected ($PIP_NO_INPUT set); question: {}'.format(
            message)
    )
240
241
def ask(message, options):
    # type: (str, Iterable[str]) -> str
    """Prompt with ``message`` until the reply is one of ``options``.

    Each reply is stripped and lower-cased before it is compared. An
    unexpected reply prints a hint listing the options and asks again.
    _check_no_input() is consulted before every prompt.
    """
    while True:
        _check_no_input(message)
        answer = input(message).strip().lower()
        if answer in options:
            return answer
        print(
            'Your response ({!r}) was not one of the expected responses: '
            '{}'.format(answer, ', '.join(options))
        )
256
257
def ask_input(message):
    # type: (str) -> str
    """Ask for input interactively.

    Runs _check_no_input(message) first, then prompts with ``message`` via
    input() and returns its result unmodified.
    """
    _check_no_input(message)
    return input(message)
263
264
def ask_password(message):
    # type: (str) -> str
    """Ask for a password interactively.

    Runs _check_no_input(message) first, then prompts with ``message`` via
    getpass.getpass() and returns its result.
    """
    _check_no_input(message)
    return getpass.getpass(message)
270
271
def format_size(bytes):
    # type: (float) -> str
    """Format a byte count as a short human-readable string.

    Decimal (power-of-1000) units are used:

    * above 1,000,000: megabytes with one decimal (``'2.5 MB'``)
    * above 10,000: whole kilobytes (``'10 kB'``)
    * above 1,000: kilobytes with one decimal (``'1.5 kB'``)
    * otherwise: whole bytes (``'999 bytes'``)
    """
    kilo = 1000
    mega = 1000 * 1000
    if bytes > mega:
        return '{:.1f} MB'.format(bytes / 1000.0 / 1000)
    if bytes > 10 * kilo:
        return '{} kB'.format(int(bytes / 1000))
    if bytes > kilo:
        return '{:.1f} kB'.format(bytes / 1000.0)
    return '{} bytes'.format(int(bytes))
282
283
def tabulate(rows):
    # type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]
    """Return a list of formatted rows and a list of column sizes.

    Every cell is converted with str(). A column is as wide as its longest
    cell; cells are left-justified, joined with one space, and trailing
    whitespace is stripped from each row. Rows may have different lengths.

    For example::

    >>> tabulate([['foobar', 2000], [0xdeadbeef]])
    (['foobar     2000', '3735928559'], [10, 4])
    """
    text_rows = [tuple(str(cell) for cell in row) for row in rows]
    # zip_longest transposes rows into columns, padding short rows with ''.
    columns = zip_longest(*text_rows, fillvalue='')
    sizes = [max(len(cell) for cell in column) for column in columns]
    table = [
        " ".join(
            cell.ljust(width) for cell, width in zip(text_row, sizes)
        ).rstrip()
        for text_row in text_rows
    ]
    return table, sizes
297
298
def is_installable_dir(path):
    # type: (str) -> bool
    """Is path a directory containing setup.py or pyproject.toml?

    Returns False when ``path`` is not a directory or contains neither file.
    """
    if not os.path.isdir(path):
        return False
    markers = ('setup.py', 'pyproject.toml')
    return any(
        os.path.isfile(os.path.join(path, marker)) for marker in markers
    )
312
313
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Yield successive ``file.read(size)`` results until one is empty.

    ``file`` is any object with a ``read(size)`` method; iteration stops at
    the first falsy chunk (EOF).
    """
    chunk = file.read(size)
    while chunk:
        yield chunk
        chunk = file.read(size)
321
322
def normalize_path(path, resolve_symlinks=True):
    # type: (str, bool) -> str
    """
    Convert a path to its canonical, case-normalized, absolute version.

    ``~`` is expanded first. With ``resolve_symlinks`` (the default) the
    path goes through os.path.realpath(); otherwise os.path.abspath() is
    used. The result is passed through os.path.normcase().
    """
    make_absolute = os.path.realpath if resolve_symlinks else os.path.abspath
    absolute = make_absolute(expanduser(path))
    return os.path.normcase(absolute)
335
336
def splitext(path):
    # type: (str) -> Tuple[str, str]
    """Like os.path.splitext, but take off .tar too.

    The split is done with posixpath.splitext(); if what remains ends in
    ``.tar`` (any case), those four characters are moved into the extension,
    so ``'pkg.tar.gz'`` gives ``('pkg', '.tar.gz')``.
    """
    stem, suffix = posixpath.splitext(path)
    if not stem.lower().endswith('.tar'):
        return stem, suffix
    return stem[:-4], stem[-4:] + suffix
345
346
def renames(old, new):
    # type: (str, str) -> None
    """Like os.renames(), but handles renaming across devices.

    Missing parent directories of ``new`` are created, ``old`` is moved with
    shutil.move(), and then the now-empty parent directories of ``old`` are
    pruned on a best-effort basis.
    """
    # Implementation borrowed from os.renames().
    new_parent, new_leaf = os.path.split(new)
    if new_parent and new_leaf and not os.path.exists(new_parent):
        os.makedirs(new_parent)

    shutil.move(old, new)

    old_parent, old_leaf = os.path.split(old)
    if not (old_parent and old_leaf):
        return
    try:
        os.removedirs(old_parent)
    except OSError:
        # removedirs() fails at the first directory it cannot remove; the
        # pruning is best-effort, so that is not an error here.
        pass
363
364
def is_local(path):
    # type: (str) -> bool
    """
    Return True if path is within sys.prefix, if we're running in a virtualenv.

    If we're not in a virtualenv, all paths are considered "local."

    The check is a plain string-prefix comparison against the normalized
    sys.prefix.

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if running_under_virtualenv():
        prefix = normalize_path(sys.prefix)
        return path.startswith(prefix)
    return True
378
379
def dist_is_local(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution object is installed locally
    (i.e. within current virtualenv).

    Always True if we're not in a virtualenv.

    The decision is made by is_local() on the value of dist_location(dist).
    """
    return is_local(dist_location(dist))
390
391
def dist_in_usersite(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is installed in user site.

    This is a string-prefix test of dist_location(dist) against the
    normalized ``user_site`` path.
    """
    # NOTE(review): egg_link_path() in this module guards against a falsy
    # user_site; this function does not -- confirm user_site is always set
    # when this is called.
    return dist_location(dist).startswith(normalize_path(user_site))
398
399
def dist_in_site_packages(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is installed in
    sysconfig.get_python_lib().

    This is a string-prefix test of dist_location(dist) against the
    normalized ``site_packages`` path imported by this module.
    """
    return dist_location(dist).startswith(normalize_path(site_packages))
407
408
def dist_is_editable(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is an editable install.

    A distribution counts as editable when a file named
    ``<project_name>.egg-link`` exists in any sys.path entry.
    """
    return any(
        os.path.isfile(
            os.path.join(path_item, dist.project_name + '.egg-link')
        )
        for path_item in sys.path
    )
419
420
def get_installed_distributions(
        local_only=True,  # type: bool
        skip=stdlib_pkgs,  # type: Container[str]
        include_editables=True,  # type: bool
        editables_only=False,  # type: bool
        user_only=False,  # type: bool
        paths=None  # type: Optional[List[str]]
):
    # type: (...) -> List[Distribution]
    """
    Return a list of installed Distribution objects.

    If ``local_only`` is True (default), only return installations
    local to the current virtualenv, if in a virtualenv.

    ``skip`` argument is an iterable of lower-case project names to
    ignore; defaults to stdlib_pkgs

    If ``include_editables`` is False, don't report editables.

    If ``editables_only`` is True , only report editables.

    If ``user_only`` is True , only report installations in the user
    site directory.

    If ``paths`` is set, only report the distributions present at the
    specified list of locations.
    """
    working_set = (
        pkg_resources.WorkingSet(paths) if paths
        else pkg_resources.working_set
    )

    def is_wanted(dist):
        # The filters run in a fixed order and stop at the first failure.
        if local_only and not dist_is_local(dist):
            return False
        if dist.key in skip:
            return False
        if not include_editables and dist_is_editable(dist):
            return False
        if editables_only and not dist_is_editable(dist):
            return False
        if user_only and not dist_in_usersite(dist):
            return False
        return True

    return [dist for dist in working_set if is_wanted(dist)]
487
488
def _search_distribution(req_name):
    # type: (str) -> Optional[Distribution]
    """Find a distribution matching the ``req_name`` in the environment.

    This searches from *all* distributions available in the environment, to
    match the behavior of ``pkg_resources.get_distribution()``.

    Names are compared in canonical form. Returns None when nothing matches.
    """
    wanted = canonicalize_name(req_name)
    # No filtering at all: every distribution in the default working set.
    candidates = get_installed_distributions(
        local_only=False,
        skip=(),
        include_editables=True,
        editables_only=False,
        user_only=False,
        paths=None,
    )
    # Index by canonical name; if two distributions share a canonical name,
    # the one seen later replaces the earlier one.
    by_name = {}
    for candidate in candidates:
        by_name[canonicalize_name(candidate.key)] = candidate
    return by_name.get(wanted)
510
511
def get_distribution(req_name):
    # type: (str) -> Optional[Distribution]
    """Given a requirement name, return the installed Distribution object.

    This searches from *all* distributions available in the environment, to
    match the behavior of ``pkg_resources.get_distribution()``.

    Returns None if the distribution cannot be found even after asking
    ``pkg_resources.working_set`` to require it.
    """

    # Search the distribution by looking through the working set
    dist = _search_distribution(req_name)
    if dist:
        # Found on the first pass: return it directly instead of scanning
        # every installed distribution a second time.
        return dist

    # If distribution could not be found, call working_set.require
    # to update the working set, and try to find the distribution
    # again.
    # This might happen for e.g. when you install a package
    # twice, once using setup.py develop and again using setup.py install.
    # Now when run pip uninstall twice, the package gets removed
    # from the working set in the first uninstall, so we have to populate
    # the working set again so that pip knows about it and the packages
    # gets picked up and is successfully uninstalled the second time too.
    try:
        pkg_resources.working_set.require(req_name)
    except pkg_resources.DistributionNotFound:
        return None
    return _search_distribution(req_name)
538
539
def egg_link_path(dist):
    # type: (Distribution) -> Optional[str]
    """
    Return the path for the .egg-link file if it exists, otherwise, None.

    There's 3 scenarios:
    1) not in a virtualenv
       try to find in site.USER_SITE, then site_packages
    2) in a no-global virtualenv
       try to find in site_packages
    3) in a yes-global virtualenv
       try to find in site_packages, then site.USER_SITE
       (don't look in global location)

    For #1 and #3, there could be odd cases, where there's an egg-link in 2
    locations.

    This method will just return the first one found.
    """
    if running_under_virtualenv():
        search_dirs = [site_packages]
        if not virtualenv_no_global() and user_site:
            search_dirs.append(user_site)
    elif user_site:
        search_dirs = [user_site, site_packages]
    else:
        search_dirs = [site_packages]

    candidates = (
        os.path.join(search_dir, dist.project_name) + '.egg-link'
        for search_dir in search_dirs
    )
    # First existing candidate wins; None if there is none.
    return next(
        (candidate for candidate in candidates if os.path.isfile(candidate)),
        None,
    )
574
575
def dist_location(dist):
    # type: (Distribution) -> str
    """
    Get the site-packages location of this distribution. Generally
    this is dist.location, except in the case of develop-installed
    packages, where dist.location is the source code location, and we
    want to know where the egg-link file is.

    The returned location is normalized (in particular, with symlinks removed).
    """
    # Prefer the .egg-link path when egg_link_path() finds one.
    location = egg_link_path(dist) or dist.location
    return normalize_path(location)
590
591
def write_output(msg, *args):
    # type: (Any, Any) -> None
    """Log ``msg`` at INFO level on this module's logger.

    ``args`` are passed through to ``logger.info`` as the message's
    formatting arguments.
    """
    logger.info(msg, *args)
595
596
class FakeFile(object):
    """Wrap a list of lines in an object with readline() to make
    ConfigParser happy.

    readline() and iteration consume the same underlying iterator.
    """

    def __init__(self, lines):
        self._gen = iter(lines)

    def readline(self):
        # An empty string signals EOF once the lines are used up.
        return next(self._gen, '')

    def __iter__(self):
        return self._gen
611
612
class StreamWrapper(StringIO):
    """In-memory stream that reports the encoding of the stream it replaces."""

    # Stream this wrapper stands in for; set per instance by from_stream().
    orig_stream = None

    @classmethod
    def from_stream(cls, orig_stream):
        """Return a new, empty wrapper bound to ``orig_stream``."""
        wrapper = cls()
        # Bind on the instance, not the class: a class attribute would be
        # overwritten by the next from_stream() call, so wrappers that are
        # alive at the same time (e.g. stdout and stderr) would all report
        # the encoding of whichever stream was wrapped last.
        wrapper.orig_stream = orig_stream
        return wrapper

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    @property
    def encoding(self):
        return self.orig_stream.encoding
624
625
@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    The replacement is a StreamWrapper built from the original stream; it is
    what the ``with`` statement yields. The original stream is restored on
    exit, even if the body raises.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    saved_stream = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(saved_stream)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, saved_stream)
640
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print('hello')
       self.assertEqual(stdout.getvalue(), 'hello\n')

    This is captured_output() applied to the ``'stdout'`` stream.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    return captured_output('stdout')
651
652
def captured_stderr():
    """
    Capture the output of sys.stderr.

    This is captured_output() applied to the ``'stderr'`` stream.
    See captured_stdout().
    """
    return captured_output('stderr')
658
659
def get_installed_version(dist_name, working_set=None):
    """Get the installed version of dist_name avoiding pkg_resources cache

    ``dist_name`` is parsed as a requirement and looked up in
    ``working_set``. Returns the version of the distribution found, or None
    if there is none.
    """
    # Create a requirement that we'll look for inside of setuptools.
    requirement = pkg_resources.Requirement.parse(dist_name)

    # We want to avoid having this cached, so unless the caller supplied a
    # working set we construct a new one each time.
    if working_set is None:
        working_set = pkg_resources.WorkingSet()

    installed = working_set.find(requirement)
    if not installed:
        return None
    return installed.version
676
677
def consume(iterator):
    """Consume an iterable at C speed.

    The items are fed into a zero-length deque, so none are kept. Returns
    None.
    """
    deque(iterator, maxlen=0)
681
682
# Simulates an enum
def enum(*sequential, **named):
    """Build a class named ``Enum`` whose attributes are the given members.

    Positional names are numbered 0, 1, 2, ... in order; keyword arguments
    supply explicit values and override a positional name on conflict. The
    class also has a ``reverse_mapping`` dict from value back to name.
    """
    members = {name: index for index, name in enumerate(sequential)}
    members.update(named)
    reverse = {}
    for name, value in members.items():
        reverse[value] = name
    members['reverse_mapping'] = reverse
    return type('Enum', (), members)
689
690
def build_netloc(host, port):
    # type: (str, Optional[int]) -> str
    """
    Build a netloc from a host-port pair.

    With no port the host is returned unchanged. Otherwise the result is
    ``host:port``, with the host wrapped in square brackets when it contains
    a colon (i.e. it is IPv6).
    """
    if port is None:
        return host
    # Only wrap host with square brackets when it is IPv6
    shown_host = '[{}]'.format(host) if ':' in host else host
    return '{}:{}'.format(shown_host, port)
702
703
def build_url_from_netloc(netloc, scheme='https'):
    # type: (str, str) -> str
    """
    Build a full URL (``scheme://netloc``) from a netloc.

    A netloc with two or more colons and neither ``@`` nor ``[`` is treated
    as a bare IPv6 address and wrapped in square brackets.
    """
    has_auth_or_brackets = '@' in netloc or '[' in netloc
    if netloc.count(':') >= 2 and not has_auth_or_brackets:
        # It must be a bare IPv6 address, so wrap it with brackets.
        netloc = '[{}]'.format(netloc)
    return '{}://{}'.format(scheme, netloc)
713
714
def parse_netloc(netloc):
    # type: (str) -> Tuple[str, Optional[int]]
    """
    Return the host-port pair from a netloc.

    The netloc is first turned into a URL with build_url_from_netloc() so
    that urlparse() can do the splitting; the parsed ``hostname`` and
    ``port`` attributes are returned.
    """
    url = build_url_from_netloc(netloc)
    parsed = urllib_parse.urlparse(url)
    return parsed.hostname, parsed.port
723
724
def split_auth_from_netloc(netloc):
    """
    Parse out and remove the auth information from a netloc.

    Returns: (netloc, (username, password)).

    Username and password are percent-decoded. The password is None when the
    auth part has no colon; both are None when the netloc has no ``@``.
    """
    if '@' not in netloc:
        return netloc, (None, None)

    # Split at the last "@" because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, _, host = netloc.rpartition('@')

    # Split at the first ":" because that's how urllib.parse.urlsplit()
    # behaves if more than one : is present (which again can be checked
    # using the password attribute of the return value)
    user, colon, password = auth.partition(':')
    if not colon:
        password = None

    decoded_password = None if password is None else urllib_unquote(password)
    return host, (urllib_unquote(user), decoded_password)
751
752
def redact_netloc(netloc):
    # type: (str) -> str
    """
    Replace the sensitive data in a netloc with "****", if it exists.

    When a password is present only the password is masked and the username
    is kept (percent-quoted); a lone username is assumed to be a token and
    is masked itself.

    For example:
        - "user:pass@example.com" returns "user:****@example.com"
        - "accesstoken@example.com" returns "****@example.com"
    """
    host, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        return host

    if password is None:
        shown_user, shown_password = '****', ''
    else:
        shown_user, shown_password = urllib_parse.quote(user), ':****'

    return '{user}{password}@{netloc}'.format(
        user=shown_user, password=shown_password, netloc=host,
    )
774
775
def _transform_url(url, transform_netloc):
    """Transform and replace netloc in a url.

    transform_netloc is a function taking the netloc and returning a
    tuple. The first element of this tuple is the new netloc. The
    entire tuple is returned.

    Returns a tuple containing the transformed url as item 0 and the
    original tuple returned by transform_netloc as item 1.
    """
    parts = urllib_parse.urlsplit(url)
    netloc_tuple = transform_netloc(parts.netloc)
    # Rebuild the URL with only the netloc component swapped out.
    rebuilt = urllib_parse.urlunsplit((
        parts.scheme,
        netloc_tuple[0],
        parts.path,
        parts.query,
        parts.fragment,
    ))
    return rebuilt, netloc_tuple
794
795
def _get_netloc(netloc):
    """Netloc transform for _transform_url() that strips auth information.

    Returns split_auth_from_netloc(netloc), i.e.
    ``(netloc_without_auth, (username, password))``.
    """
    return split_auth_from_netloc(netloc)
798
799
def _redact_netloc(netloc):
    """Netloc transform for _transform_url() that masks auth information.

    Returns a 1-tuple holding redact_netloc(netloc), because
    _transform_url() reads the new netloc from item 0 of the result.
    """
    return (redact_netloc(netloc),)
802
803
def split_auth_netloc_from_url(url):
    # type: (str) -> Tuple[str, str, Tuple[str, str]]
    """
    Parse a url into separate netloc, auth, and url with no auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    # _get_netloc yields (netloc, auth), which _transform_url hands back
    # alongside the rebuilt URL.
    url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
    return url_without_auth, netloc, auth
813
814
def remove_auth_from_url(url):
    # type: (str) -> str
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    # Item 0 of the _transform_url() result is the rebuilt URL.
    return _transform_url(url, _get_netloc)[0]
821
822
def redact_auth_from_url(url):
    # type: (str) -> str
    """Replace the password in a given url with ****.

    The masking itself is done by redact_netloc() on the URL's netloc.
    """
    # Item 0 of the _transform_url() result is the rebuilt URL.
    return _transform_url(url, _redact_netloc)[0]
827
828
class HiddenText(object):
    """Pair a sensitive string with the redacted form shown in its place.

    str() and repr() expose only ``redacted``; the original value stays
    available as ``secret``.
    """

    def __init__(
        self,
        secret,    # type: str
        redacted,  # type: str
    ):
        # type: (...) -> None
        self.redacted = redacted
        self.secret = secret

    def __str__(self):
        # type: (...) -> str
        return self.redacted

    def __repr__(self):
        # type: (...) -> str
        return '<HiddenText {!r}>'.format(str(self))

    # This is useful for testing.
    def __eq__(self, other):
        # type: (Any) -> bool
        # Equal only to objects of exactly the same type. The string being
        # used for redaction doesn't also have to match, just the raw,
        # original string.
        return type(self) == type(other) and self.secret == other.secret

    # We need to provide an explicit __ne__ implementation for Python 2.
    # TODO: remove this when we drop PY2 support.
    def __ne__(self, other):
        # type: (Any) -> bool
        return not self == other
862
863
def hide_value(value):
    # type: (str) -> HiddenText
    """Wrap ``value`` in a HiddenText that is displayed as ``'****'``."""
    return HiddenText(value, redacted='****')
867
868
def hide_url(url):
    # type: (str) -> HiddenText
    """Wrap ``url`` in a HiddenText displayed as redact_auth_from_url(url)."""
    redacted = redact_auth_from_url(url)
    return HiddenText(url, redacted=redacted)
873
874
def protect_pip_from_modification_on_windows(modifying_pip):
    # type: (bool) -> None
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError, quoting the equivalent ``python -m pip`` command,
    when pip is being modified on Windows and the program was started
    through one of the pip*.exe names. Otherwise does nothing.
    """
    # See https://github.com/pypa/pip/issues/1299 for more discussion
    if not (modifying_pip and WINDOWS):
        return

    major, minor = sys.version_info[:2]
    pip_names = (
        "pip.exe",
        "pip{}.exe".format(major),
        "pip{}.{}.exe".format(major, minor),
    )
    if os.path.basename(sys.argv[0]) not in pip_names:
        return

    new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
    raise CommandError(
        'To modify pip, please run the following command:\n{}'
        .format(" ".join(new_command))
    )
903
904
def is_console_interactive():
    # type: () -> bool
    """Is this console interactive?

    False when ``sys.stdin`` is None; otherwise the result of
    ``sys.stdin.isatty()``.
    """
    if sys.stdin is None:
        return False
    return sys.stdin.isatty()
910
911
def hash_file(path, blocksize=1 << 20):
    # type: (Text, int) -> Tuple[Any, int]
    """Return (hash, length) for path using hashlib.sha256()

    ``hash`` is the sha256 hash object itself (not a digest string) and
    ``length`` is the number of bytes read. The file is read in binary mode,
    ``blocksize`` bytes at a time.
    """
    digest = hashlib.sha256()
    total = 0
    with open(path, 'rb') as stream:
        for chunk in read_chunks(stream, size=blocksize):
            digest.update(chunk)
            total += len(chunk)
    return digest, total
924
925
def is_wheel_installed():
    """
    Return whether the wheel package is installed.

    "Installed" here means that ``import wheel`` succeeds.
    """
    try:
        import wheel  # noqa: F401
    except ImportError:
        return False
    else:
        return True
936
937
def pairwise(iterable):
    # type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]]
    """
    Return paired elements.

    For example:
        s -> (s0, s1), (s2, s3), (s4, s5), ...

    The pairs do not overlap. If the input has an odd number of items, the
    last pair is padded with None.
    """
    # Both arguments are the *same* iterator, so each pair takes two
    # consecutive items.
    source = iter(iterable)
    return zip_longest(source, source)
948
949
def partition(
    pred,  # type: Callable[[T], bool]
    iterable,  # type: Iterable[T]
):
    # type: (...) -> Tuple[Iterable[T], Iterable[T]]
    """
    Use a predicate to partition entries into false entries and true entries,
    like

        partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9

    The entries failing ``pred`` come first in the returned pair. Both
    results are lazy and share the input through itertools.tee().
    """
    for_rejected, for_accepted = tee(iterable)
    rejected = filterfalse(pred, for_rejected)
    accepted = filter(pred, for_accepted)
    return rejected, accepted
963