# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
# mypy: disallow-untyped-defs=False

from __future__ import absolute_import

import contextlib
import errno
import getpass
import hashlib
import io
import logging
import os
import posixpath
import shutil
import stat
import sys
from collections import deque
from itertools import tee

from pip._vendor import pkg_resources
from pip._vendor.packaging.utils import canonicalize_name

# NOTE: retrying is not annotated in typeshed as of 2017-07-17, which is
#       why we ignore the type on this import.
from pip._vendor.retrying import retry  # type: ignore
from pip._vendor.six import PY2, text_type
from pip._vendor.six.moves import filter, filterfalse, input, map, zip_longest
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote

from pip import __version__
from pip._internal.exceptions import CommandError
from pip._internal.locations import get_major_minor_version, site_packages, user_site
from pip._internal.utils.compat import WINDOWS, expanduser, stdlib_pkgs, str_to_display
from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
from pip._internal.utils.virtualenv import (
    running_under_virtualenv,
    virtualenv_no_global,
)

if PY2:
    from io import BytesIO as StringIO
else:
    from io import StringIO

if MYPY_CHECK_RUNNING:
    from typing import (
        Any,
        AnyStr,
        Callable,
        Container,
        Iterable,
        Iterator,
        List,
        Optional,
        Text,
        Tuple,
        TypeVar,
        Union,
    )

    from pip._vendor.pkg_resources import Distribution

    VersionInfo = Tuple[int, int, int]
    T = TypeVar("T")


__all__ = ['rmtree', 'display_path', 'backup_dir',
           'ask', 'splitext',
           'format_size', 'is_installable_dir',
           'normalize_path',
           'renames', 'get_prog',
           'captured_stdout', 'ensure_dir',
           'get_installed_version', 'remove_auth_from_url']


logger = logging.getLogger(__name__)


def get_pip_version():
    # type: () -> str
    """Return a one-line description of the running pip.

    The string contains pip's version, the absolute path two directory
    levels above this module, and the Python major.minor version.
    """
    pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
    pip_pkg_dir = os.path.abspath(pip_pkg_dir)

    return (
        'pip {} from {} (python {})'.format(
            __version__, pip_pkg_dir, get_major_minor_version(),
        )
    )


def normalize_version_info(py_version_info):
    # type: (Tuple[int, ...]) -> Tuple[int, int, int]
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    :param py_version_info: a tuple of ints representing a Python version.
        The tuple can have any length; ``None`` is not accepted.

    :return: a tuple of length three. Shorter inputs are right-padded with
        zeros and longer inputs are truncated to their first three items.
    """
    if len(py_version_info) < 3:
        py_version_info += (3 - len(py_version_info)) * (0,)
    elif len(py_version_info) > 3:
        py_version_info = py_version_info[:3]

    return cast('VersionInfo', py_version_info)


def ensure_dir(path):
    # type: (AnyStr) -> None
    """os.makedirs without EEXIST.

    EEXIST and ENOTEMPTY errors are swallowed; any other OSError is
    re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # Windows can raise spurious ENOTEMPTY errors. See #6426.
        if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
            raise


def get_prog():
    # type: () -> str
    """Return the program name to show in usage and help messages.

    When sys.argv[0] is '__main__.py' or '-c', the result is
    "<sys.executable> -m pip". If sys.argv cannot be inspected, 'pip' is
    returned.
    """
    try:
        prog = os.path.basename(sys.argv[0])
        if prog in ('__main__.py', '-c'):
            return "{} -m pip".format(sys.executable)
        else:
            return prog
    except (AttributeError, TypeError, IndexError):
        pass
    return 'pip'


# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
    # type: (AnyStr, bool) -> None
    """Remove a directory tree, handling errors via rmtree_errorhandler."""
    shutil.rmtree(dir, ignore_errors=ignore_errors,
                  onerror=rmtree_errorhandler)


def rmtree_errorhandler(func, path, exc_info):
    """On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  We catch that here, remove the
    read-only attribute, and hopefully continue without problems."""
    try:
        has_attr_readonly = not (os.stat(path).st_mode & stat.S_IWRITE)
    except (IOError, OSError):
        # The path cannot be stat'ed (this check is equivalent to
        # os.path.exists), so there is nothing left to fix up.
        return

    if has_attr_readonly:
        # convert to read/write
        os.chmod(path, stat.S_IWRITE)
        # use the original function to repeat the operation
        func(path)
        return
    else:
        # Not a read-only problem: re-raise the exception that is currently
        # being handled by the caller.
        raise


def path_to_display(path):
    # type: (Optional[Union[str, Text]]) -> Optional[Text]
    """
    Convert a bytes (or text) path to text (unicode in Python 2) for display
    and logging purposes.

    This function should never error out. Also, this function is mainly needed
    for Python 2 since in Python 3 str paths are already text.
    """
    if path is None:
        return None
    if isinstance(path, text_type):
        return path
    # Otherwise, path is a bytes object (str in Python 2).
    try:
        display_path = path.decode(sys.getfilesystemencoding(), 'strict')
    except UnicodeDecodeError:
        # Include the full bytes to make troubleshooting easier, even though
        # it may not be very human readable.
        if PY2:
            # Convert the bytes to a readable str representation using
            # repr(), and then convert the str to unicode.
            #   Also, we add the prefix "b" to the repr() return value both
            # to make the Python 2 output look like the Python 3 output, and
            # to signal to the user that this is a bytes representation.
            display_path = str_to_display('b{!r}'.format(path))
        else:
            # Silence the "F821 undefined name 'ascii'" flake8 error since
            # in Python 3 ascii() is a built-in.
            display_path = ascii(path)  # noqa: F821

    return display_path


def display_path(path):
    # type: (Union[str, Text]) -> str
    """Gives the display value for a given path, making it relative to cwd
    if possible."""
    path = os.path.normcase(os.path.abspath(path))
    if sys.version_info[0] == 2:
        path = path.decode(sys.getfilesystemencoding(), 'replace')
        path = path.encode(sys.getdefaultencoding(), 'replace')
    # Look up the working directory once so that the prefix test and the
    # slice below are guaranteed to use the same value.
    cwd = os.getcwd()
    if path.startswith(cwd + os.path.sep):
        path = '.' + path[len(cwd):]
    return path


def backup_dir(dir, ext='.bak'):
    # type: (str, str) -> str
    """Figure out the name of a directory to back up the given dir to
    (adding .bak, .bak2, etc)"""
    n = 1
    extension = ext
    while os.path.exists(dir + extension):
        n += 1
        extension = ext + str(n)
    return dir + extension


def ask_path_exists(message, options):
    # type: (str, Iterable[str]) -> str
    """Return the first PIP_EXISTS_ACTION entry found in options.

    If the environment variable is unset or none of its whitespace-separated
    entries is in ``options``, fall back to asking interactively.
    """
    for action in os.environ.get('PIP_EXISTS_ACTION', '').split():
        if action in options:
            return action
    return ask(message, options)


def _check_no_input(message):
    # type: (str) -> None
    """Raise an error if no input is allowed."""
    if os.environ.get('PIP_NO_INPUT'):
        raise Exception(
            'No input was expected ($PIP_NO_INPUT set); question: {}'.format(
                message)
        )


def ask(message, options):
    # type: (str, Iterable[str]) -> str
    """Ask the message interactively, with the given possible responses"""
    while True:
        _check_no_input(message)
        response = input(message)
        response = response.strip().lower()
        if response not in options:
            print(
                'Your response ({!r}) was not one of the expected responses: '
                '{}'.format(response, ', '.join(options))
            )
        else:
            return response


def ask_input(message):
    # type: (str) -> str
    """Ask for input interactively."""
    _check_no_input(message)
    return input(message)


def ask_password(message):
    # type: (str) -> str
    """Ask for a password interactively."""
    _check_no_input(message)
    return getpass.getpass(message)


def format_size(bytes):
    # type: (float) -> str
    """Format a byte count as 'N bytes', 'N kB' or 'N.N MB'.

    Units are decimal (1 kB == 1000 bytes).
    """
    if bytes > 1000 * 1000:
        return '{:.1f} MB'.format(bytes / 1000.0 / 1000)
    elif bytes > 10 * 1000:
        return '{} kB'.format(int(bytes / 1000))
    elif bytes > 1000:
        return '{:.1f} kB'.format(bytes / 1000.0)
    else:
        return '{} bytes'.format(int(bytes))


def tabulate(rows):
    # type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]
    """Return a list of formatted rows and a list of column sizes.

    Every cell is converted with str() and left-justified to the width of
    the widest cell in its column; columns are separated by one space and
    trailing whitespace is stripped from each row.

    For example::

    >>> tabulate([['foobar', 2000], [0xdeadbeef]])
    (['foobar     2000', '3735928559'], [10, 4])
    """
    rows = [tuple(map(str, row)) for row in rows]
    sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue='')]
    table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
    return table, sizes


def is_installable_dir(path):
    # type: (str) -> bool
    """Is path a directory containing setup.py or pyproject.toml?
    """
    if not os.path.isdir(path):
        return False
    setup_py = os.path.join(path, 'setup.py')
    if os.path.isfile(setup_py):
        return True
    pyproject_toml = os.path.join(path, 'pyproject.toml')
    if os.path.isfile(pyproject_toml):
        return True
    return False


def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Yield pieces of data from a file-like object until EOF."""
    while True:
        chunk = file.read(size)
        if not chunk:
            break
        yield chunk


def normalize_path(path, resolve_symlinks=True):
    # type: (str, bool) -> str
    """
    Convert a path to its canonical, case-normalized, absolute version.

    :param resolve_symlinks: when True (the default) use os.path.realpath;
        otherwise use os.path.abspath.
    """
    path = expanduser(path)
    if resolve_symlinks:
        path = os.path.realpath(path)
    else:
        path = os.path.abspath(path)
    return os.path.normcase(path)


def splitext(path):
    # type: (str) -> Tuple[str, str]
    """Like os.path.splitext, but take off .tar too"""
    base, ext = posixpath.splitext(path)
    if base.lower().endswith('.tar'):
        ext = base[-4:] + ext
        base = base[:-4]
    return base, ext


def renames(old, new):
    # type: (str, str) -> None
    """Like os.renames(), but handles renaming across devices."""
    # Implementation borrowed from os.renames().
    head, tail = os.path.split(new)
    if head and tail and not os.path.exists(head):
        os.makedirs(head)

    shutil.move(old, new)

    head, tail = os.path.split(old)
    if head and tail:
        try:
            # Best-effort pruning of the now-empty source directories.
            os.removedirs(head)
        except OSError:
            pass


def is_local(path):
    # type: (str) -> bool
    """
    Return True if path is within sys.prefix, if we're running in a virtualenv.

    If we're not in a virtualenv, all paths are considered "local."

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if not running_under_virtualenv():
        return True
    return path.startswith(normalize_path(sys.prefix))


def dist_is_local(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution object is installed locally
    (i.e. within current virtualenv).

    Always True if we're not in a virtualenv.

    """
    return is_local(dist_location(dist))


def dist_in_usersite(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is installed in user site.
    """
    return dist_location(dist).startswith(normalize_path(user_site))


def dist_in_site_packages(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is installed in
    sysconfig.get_python_lib().
    """
    return dist_location(dist).startswith(normalize_path(site_packages))


def dist_is_editable(dist):
    # type: (Distribution) -> bool
    """
    Return True if given Distribution is an editable install.

    A distribution counts as editable when a ``<project_name>.egg-link``
    file exists in any sys.path entry.
    """
    for path_item in sys.path:
        egg_link = os.path.join(path_item, dist.project_name + '.egg-link')
        if os.path.isfile(egg_link):
            return True
    return False


def get_installed_distributions(
        local_only=True,  # type: bool
        skip=stdlib_pkgs,  # type: Container[str]
        include_editables=True,  # type: bool
        editables_only=False,  # type: bool
        user_only=False,  # type: bool
        paths=None  # type: Optional[List[str]]
):
    # type: (...) -> List[Distribution]
    """
    Return a list of installed Distribution objects.

    If ``local_only`` is True (default), only return installations
    local to the current virtualenv, if in a virtualenv.

    ``skip`` argument is an iterable of lower-case project names to
    ignore; defaults to stdlib_pkgs

    If ``include_editables`` is False, don't report editables.

    If ``editables_only`` is True , only report editables.

    If ``user_only`` is True , only report installations in the user
    site directory.

    If ``paths`` is set, only report the distributions present at the
    specified list of locations.
    """
    if paths:
        working_set = pkg_resources.WorkingSet(paths)
    else:
        working_set = pkg_resources.working_set

    if local_only:
        local_test = dist_is_local
    else:
        def local_test(d):
            return True

    if include_editables:
        def editable_test(d):
            return True
    else:
        def editable_test(d):
            return not dist_is_editable(d)

    if editables_only:
        def editables_only_test(d):
            return dist_is_editable(d)
    else:
        def editables_only_test(d):
            return True

    if user_only:
        user_test = dist_in_usersite
    else:
        def user_test(d):
            return True

    return [d for d in working_set
            if local_test(d) and
            d.key not in skip and
            editable_test(d) and
            editables_only_test(d) and
            user_test(d)
            ]


def _search_distribution(req_name):
    # type: (str) -> Optional[Distribution]
    """Find a distribution matching the ``req_name`` in the environment.

    This searches from *all* distributions available in the environment, to
    match the behavior of ``pkg_resources.get_distribution()``.
    """
    # Canonicalize the name before searching in the list of
    # installed distributions and also while creating the package
    # dictionary to get the Distribution object
    req_name = canonicalize_name(req_name)
    packages = get_installed_distributions(
        local_only=False,
        skip=(),
        include_editables=True,
        editables_only=False,
        user_only=False,
        paths=None,
    )
    pkg_dict = {canonicalize_name(p.key): p for p in packages}
    return pkg_dict.get(req_name)


def get_distribution(req_name):
    # type: (str) -> Optional[Distribution]
    """Given a requirement name, return the installed Distribution object.

    This searches from *all* distributions available in the environment, to
    match the behavior of ``pkg_resources.get_distribution()``.

    Returns None if the distribution cannot be found.
    """

    # Search the distribution by looking through the working set
    dist = _search_distribution(req_name)

    # If distribution could not be found, call working_set.require
    # to update the working set, and try to find the distribution
    # again.
    # This might happen for e.g. when you install a package
    # twice, once using setup.py develop and again using setup.py install.
    # Now when run pip uninstall twice, the package gets removed
    # from the working set in the first uninstall, so we have to populate
    # the working set again so that pip knows about it and the packages
    # gets picked up and is successfully uninstalled the second time too.
    if not dist:
        try:
            pkg_resources.working_set.require(req_name)
        except pkg_resources.DistributionNotFound:
            return None
    return _search_distribution(req_name)


def egg_link_path(dist):
    # type: (Distribution) -> Optional[str]
    """
    Return the path for the .egg-link file if it exists, otherwise, None.

    There's 3 scenarios:
    1) not in a virtualenv
       try to find in site.USER_SITE, then site_packages
    2) in a no-global virtualenv
       try to find in site_packages
    3) in a yes-global virtualenv
       try to find in site_packages, then site.USER_SITE
       (don't look in global location)

    For #1 and #3, there could be odd cases, where there's an egg-link in 2
    locations.

    This method will just return the first one found.
    """
    sites = []
    if running_under_virtualenv():
        sites.append(site_packages)
        if not virtualenv_no_global() and user_site:
            sites.append(user_site)
    else:
        if user_site:
            sites.append(user_site)
        sites.append(site_packages)

    for site in sites:
        egglink = os.path.join(site, dist.project_name) + '.egg-link'
        if os.path.isfile(egglink):
            return egglink
    return None


def dist_location(dist):
    # type: (Distribution) -> str
    """
    Get the site-packages location of this distribution. Generally
    this is dist.location, except in the case of develop-installed
    packages, where dist.location is the source code location, and we
    want to know where the egg-link file is.

    The returned location is normalized (in particular, with symlinks removed).
    """
    egg_link = egg_link_path(dist)
    if egg_link:
        return normalize_path(egg_link)
    return normalize_path(dist.location)


def write_output(msg, *args):
    # type: (Any, Any) -> None
    """Log ``msg`` (with lazy %-style ``args``) at INFO level."""
    logger.info(msg, *args)


class FakeFile(object):
    """Wrap a list of lines in an object with readline() to make
    ConfigParser happy."""
    def __init__(self, lines):
        self._gen = iter(lines)

    def readline(self):
        # An empty string signals EOF, like a real file's readline().
        try:
            return next(self._gen)
        except StopIteration:
            return ''

    def __iter__(self):
        return self._gen


class StreamWrapper(StringIO):
    """In-memory stream that reports the encoding of the stream it replaces."""

    @classmethod
    def from_stream(cls, orig_stream):
        """Return a new wrapper that remembers ``orig_stream``."""
        wrapper = cls()
        # Keep the reference on the instance. Storing it on the class would
        # make every wrapper share the most recently wrapped stream, so two
        # simultaneously captured streams could report the wrong encoding.
        wrapper.orig_stream = orig_stream
        return wrapper

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    @property
    def encoding(self):
        return self.orig_stream.encoding


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always restore the original stream, even if the body raised.
        setattr(sys, stream_name, orig_stdout)


def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print('hello')
       self.assertEqual(stdout.getvalue(), 'hello\n')

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    return captured_output('stdout')


def captured_stderr():
    """
    See captured_stdout().
    """
    return captured_output('stderr')


def get_installed_version(dist_name, working_set=None):
    """Get the installed version of dist_name avoiding pkg_resources cache

    Returns None when no matching distribution is found in the working set.
    """
    # Create a requirement that we'll look for inside of setuptools.
    req = pkg_resources.Requirement.parse(dist_name)

    if working_set is None:
        # We want to avoid having this cached, so we need to construct a new
        # working set each time.
        working_set = pkg_resources.WorkingSet()

    # Get the installed distribution from our working set
    dist = working_set.find(req)

    # Check to see if we got an installed distribution or not, if we did
    # we want to return its version.
    return dist.version if dist else None


def consume(iterator):
    """Consume an iterable at C speed."""
    deque(iterator, maxlen=0)


# Simulates an enum
def enum(*sequential, **named):
    """Build an 'Enum' class from positional and keyword names.

    Positional names are numbered from 0 in order; keyword names keep the
    given values. The class also gets a ``reverse_mapping`` dict from value
    back to name.
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    reverse = {value: key for key, value in enums.items()}
    enums['reverse_mapping'] = reverse
    return type('Enum', (), enums)


def build_netloc(host, port):
    # type: (str, Optional[int]) -> str
    """
    Build a netloc from a host-port pair
    """
    if port is None:
        return host
    if ':' in host:
        # Only wrap host with square brackets when it is IPv6
        host = '[{}]'.format(host)
    return '{}:{}'.format(host, port)


def build_url_from_netloc(netloc, scheme='https'):
    # type: (str, str) -> str
    """
    Build a full URL from a netloc.
    """
    if netloc.count(':') >= 2 and '@' not in netloc and '[' not in netloc:
        # It must be a bare IPv6 address, so wrap it with brackets.
        netloc = '[{}]'.format(netloc)
    return '{}://{}'.format(scheme, netloc)


def parse_netloc(netloc):
    # type: (str) -> Tuple[str, Optional[int]]
    """
    Return the host-port pair from a netloc.
    """
    url = build_url_from_netloc(netloc)
    parsed = urllib_parse.urlparse(url)
    return parsed.hostname, parsed.port


def split_auth_from_netloc(netloc):
    """
    Parse out and remove the auth information from a netloc.

    Returns: (netloc, (username, password)).
    """
    if '@' not in netloc:
        return netloc, (None, None)

    # Split from the right because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, netloc = netloc.rsplit('@', 1)
    if ':' in auth:
        # Split from the left because that's how urllib.parse.urlsplit()
        # behaves if more than one : is present (which again can be checked
        # using the password attribute of the return value)
        user_pass = auth.split(':', 1)
    else:
        user_pass = auth, None

    user_pass = tuple(
        None if x is None else urllib_unquote(x) for x in user_pass
    )

    return netloc, user_pass


def redact_netloc(netloc):
    # type: (str) -> str
    """
    Replace the sensitive data in a netloc with "****", if it exists.

    For example:
        - "user:pass@example.com" returns "user:****@example.com"
        - "accesstoken@example.com" returns "****@example.com"
    """
    netloc, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        return netloc
    if password is None:
        # A lone user part is redacted entirely.
        user = '****'
        password = ''
    else:
        user = urllib_parse.quote(user)
        password = ':****'
    return '{user}{password}@{netloc}'.format(user=user,
                                              password=password,
                                              netloc=netloc)


def _transform_url(url, transform_netloc):
    """Transform and replace netloc in a url.

    transform_netloc is a function taking the netloc and returning a
    tuple. The first element of this tuple is the new netloc. The
    entire tuple is returned.

    Returns a tuple containing the transformed url as item 0 and the
    original tuple returned by transform_netloc as item 1.
    """
    purl = urllib_parse.urlsplit(url)
    netloc_tuple = transform_netloc(purl.netloc)
    # stripped url
    url_pieces = (
        purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment
    )
    surl = urllib_parse.urlunsplit(url_pieces)
    return surl, netloc_tuple


def _get_netloc(netloc):
    """Netloc transform for _transform_url() that strips the auth part."""
    return split_auth_from_netloc(netloc)


def _redact_netloc(netloc):
    """Netloc transform for _transform_url() that redacts the auth part."""
    return (redact_netloc(netloc),)


def split_auth_netloc_from_url(url):
    # type: (str) -> Tuple[str, str, Tuple[str, str]]
    """
    Parse a url into separate netloc, auth, and url with no auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
    return url_without_auth, netloc, auth


def remove_auth_from_url(url):
    # type: (str) -> str
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    return _transform_url(url, _get_netloc)[0]


def redact_auth_from_url(url):
    # type: (str) -> str
    """Replace the password in a given url with ****."""
    return _transform_url(url, _redact_netloc)[0]


class HiddenText(object):
    """Pair a secret string with the redacted form used for display.

    str() and repr() only ever expose the redacted form.
    """

    def __init__(
        self,
        secret,  # type: str
        redacted,  # type: str
    ):
        # type: (...) -> None
        self.secret = secret
        self.redacted = redacted

    def __repr__(self):
        # type: (...) -> str
        return '<HiddenText {!r}>'.format(str(self))

    def __str__(self):
        # type: (...) -> str
        return self.redacted

    # This is useful for testing.
    def __eq__(self, other):
        # type: (Any) -> bool
        if type(self) != type(other):
            return False

        # The string being used for redaction doesn't also have to match,
        # just the raw, original string.
        return (self.secret == other.secret)

    # We need to provide an explicit __ne__ implementation for Python 2.
    # TODO: remove this when we drop PY2 support.
    def __ne__(self, other):
        # type: (Any) -> bool
        return not self == other


def hide_value(value):
    # type: (str) -> HiddenText
    """Wrap ``value`` so that it is displayed as '****'."""
    return HiddenText(value, redacted='****')


def hide_url(url):
    # type: (str) -> HiddenText
    """Wrap ``url`` so that it is displayed with its password redacted."""
    redacted = redact_auth_from_url(url)
    return HiddenText(url, redacted=redacted)


def protect_pip_from_modification_on_windows(modifying_pip):
    # type: (bool) -> None
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError when pip is being modified on Windows and was
    started through one of the pip*.exe launchers.
    """
    pip_names = [
        "pip.exe",
        "pip{}.exe".format(sys.version_info[0]),
        "pip{}.{}.exe".format(*sys.version_info[:2])
    ]

    # See https://github.com/pypa/pip/issues/1299 for more discussion
    should_show_use_python_msg = (
        modifying_pip and
        WINDOWS and
        os.path.basename(sys.argv[0]) in pip_names
    )

    if should_show_use_python_msg:
        new_command = [
            sys.executable, "-m", "pip"
        ] + sys.argv[1:]
        raise CommandError(
            'To modify pip, please run the following command:\n{}'
            .format(" ".join(new_command))
        )


def is_console_interactive():
    # type: () -> bool
    """Is this console interactive?
    """
    return sys.stdin is not None and sys.stdin.isatty()


def hash_file(path, blocksize=1 << 20):
    # type: (Text, int) -> Tuple[Any, int]
    """Return (hash, length) for path using hashlib.sha256()

    ``hash`` is the hashlib object itself (not a digest) and ``length`` is
    the number of bytes read. The file is read ``blocksize`` bytes at a time.
    """

    h = hashlib.sha256()
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    return h, length


def is_wheel_installed():
    """
    Return whether the wheel package is installed.
    """
    try:
        import wheel  # noqa: F401
    except ImportError:
        return False

    return True


def pairwise(iterable):
    # type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]]
    """
    Return paired elements.

    For example:
        s -> (s0, s1), (s2, s3), (s4, s5), ...

    If the input has an odd number of items, the last pair is padded
    with None.
    """
    # Passing the same iterator twice makes zip_longest pull two
    # consecutive items for each pair.
    iterable = iter(iterable)
    return zip_longest(iterable, iterable)


def partition(
    pred,  # type: Callable[[T], bool]
    iterable,  # type: Iterable[T]
):
    # type: (...) -> Tuple[Iterable[T], Iterable[T]]
    """
    Use a predicate to partition entries into false entries and true entries,
    like

        partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
    """
    t1, t2 = tee(iterable)
    return filterfalse(pred, t1), filter(pred, t2)