import csv
import functools
import os
import sys
import sysconfig
from importlib.util import cache_from_source
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple

from pip._vendor import pkg_resources
from pip._vendor.pkg_resources import Distribution

from pip._internal.exceptions import UninstallationError
from pip._internal.locations import get_bin_prefix, get_bin_user
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import getLogger, indent_log
from pip._internal.utils.misc import (
    ask,
    dist_in_usersite,
    dist_is_local,
    egg_link_path,
    is_local,
    normalize_path,
    renames,
    rmtree,
)
from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory

logger = getLogger(__name__)


def _script_names(dist: Distribution, script_name: str, is_gui: bool) -> List[str]:
    """Create the fully qualified name of the files created by
    {console,gui}_scripts for the given ``dist``.

    The bin directory is the user one when ``dist`` is in the user site,
    otherwise the prefix one. On Windows the launcher ``.exe``, its
    ``.exe.manifest`` and the ``-script.py`` (``-script.pyw`` when ``is_gui``)
    wrapper are listed as well.

    Returns the list of file names
    """
    if dist_in_usersite(dist):
        bin_dir = get_bin_user()
    else:
        bin_dir = get_bin_prefix()
    exe_name = os.path.join(bin_dir, script_name)
    paths_to_remove = [exe_name]
    if WINDOWS:
        paths_to_remove.append(exe_name + '.exe')
        paths_to_remove.append(exe_name + '.exe.manifest')
        if is_gui:
            paths_to_remove.append(exe_name + '-script.pyw')
        else:
            paths_to_remove.append(exe_name + '-script.py')
    return paths_to_remove


def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
    """Decorate a generator function so each distinct item is yielded once.

    Items are yielded in first-seen order; they must be hashable because a
    set is used to remember what has already been produced.
    """
    @functools.wraps(fn)
    def unique(*args: Any, **kw: Any) -> Iterator[Any]:
        seen: Set[Any] = set()
        for item in fn(*args, **kw):
            if item not in seen:
                seen.add(item)
                yield item
    return unique


@_unique
def uninstallation_paths(dist: Distribution) -> Iterator[str]:
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.py[co]

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc and .pyo in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    If RECORD is not found, raises UninstallationError,
    with possible information from the INSTALLER file.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    try:
        r = csv.reader(dist.get_metadata_lines('RECORD'))
    except FileNotFoundError as missing_record_exception:
        msg = 'Cannot uninstall {dist}, RECORD file not found.'.format(dist=dist)
        try:
            installer = next(dist.get_metadata_lines('INSTALLER'))
            # An empty or 'pip' installer gives no useful hint; fall through
            # to the generic recovery advice below.
            if not installer or installer == 'pip':
                raise ValueError()
        except (OSError, StopIteration, ValueError):
            dep = '{}=={}'.format(dist.project_name, dist.version)
            msg += (" You might be able to recover from this via: "
                    "'pip install --force-reinstall --no-deps {}'.".format(dep))
        else:
            msg += ' Hint: The package was installed by {}.'.format(installer)
        raise UninstallationError(msg) from missing_record_exception
    for row in r:
        # The first RECORD column is the path, relative to dist.location.
        path = os.path.join(dist.location, row[0])
        yield path
        if path.endswith('.py'):
            dn, fn = os.path.split(path)
            base = fn[:-3]
            path = os.path.join(dn, base + '.pyc')
            yield path
            path = os.path.join(dn, base + '.pyo')
            yield path


def compact(paths: Iterable[str]) -> Set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path."""

    sep = os.path.sep
    short_paths: Set[str] = set()
    # Shortest first, so a containing directory is always seen before
    # anything that lives inside it.
    for path in sorted(paths, key=len):
        # Skip ``path`` when an already-kept entry (ignoring a trailing "*")
        # is a prefix of it AND the prefix ends on a path-separator boundary.
        should_skip = any(
            path.startswith(shortpath.rstrip("*")) and
            path[len(shortpath.rstrip("*").rstrip(sep))] == sep
            for shortpath in short_paths
        )
        if not should_skip:
            short_paths.add(path)
    return short_paths


def compress_for_rename(paths: Iterable[str]) -> Set[str]:
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk.

    A directory is emitted (with a trailing separator) in place of its files
    when every file found under it by ``os.walk`` is in ``paths``.
    """
    # Compare on normcased paths, but hand back the caller's spelling.
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    # Candidate directories, shortest first so parents are tried before
    # their children.
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: Set[str] = set()

    def norm_join(*a: str) -> str:
        return os.path.normcase(os.path.join(*a))

    for root in unchecked:
        if any(os.path.normcase(root).startswith(w)
               for w in wildcards):
            # This directory has already been handled.
            continue

        all_files: Set[str] = set()
        for dirname, _subdirs, files in os.walk(root):
            all_files.update(norm_join(root, dirname, f)
                             for f in files)
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory.
        if not (all_files - remaining):
            remaining.difference_update(all_files)
            wildcards.add(root + os.sep)

    return set(map(case_map.__getitem__, remaining)) | wildcards


def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Returns a tuple of 2 sets of which paths to display to user

    The first set contains paths that would be deleted. Files of a package
    are not added and the top-level directory of the package has a '*' added
    at the end - to signify that all its contents are removed.

    The second set contains files that would have been skipped in the above
    folders.
    """

    will_remove = set(paths)
    will_skip: Set[str] = set()

    # Determine folders and files
    folders: Set[str] = set()
    files: Set[str] = set()
    for path in will_remove:
        if path.endswith(".pyc"):
            continue
        if path.endswith("__init__.py") or ".dist-info" in path:
            folders.add(os.path.dirname(path))
        files.add(path)

    # probably this one https://github.com/python/mypy/issues/390
    _normcased_files = set(map(os.path.normcase, files))  # type: ignore

    folders = compact(folders)

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue

                file_ = os.path.join(dirpath, fname)
                if (os.path.isfile(file_) and
                        os.path.normcase(file_) not in _normcased_files):
                    # We are skipping this file. Add it to the set.
                    will_skip.add(file_)

    will_remove = files | {
        os.path.join(folder, "*") for folder in folders
    }

    return will_remove, will_skip


class StashedUninstallPathSet:
    """A set of file rename operations to stash files while
    tentatively uninstalling them."""
    def __init__(self) -> None:
        # Mapping from source file root to [Adjacent]TempDirectory
        # for files under that directory.
        self._save_dirs: Dict[str, TempDirectory] = {}
        # (old path, new path) tuples for each move that may need
        # to be undone.
        self._moves: List[Tuple[str, str]] = []

    def _get_directory_stash(self, path: str) -> str:
        """Stashes a directory.

        Directories are stashed adjacent to their original location if
        possible, or else moved/copied into the user's temp dir.

        Returns the path of the temp directory registered for ``path``."""

        try:
            save_dir: TempDirectory = AdjacentTempDirectory(path)
        except OSError:
            save_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = save_dir

        return save_dir.path

    def _get_file_stash(self, path: str) -> str:
        """Stashes a file.

        If no root has been provided, one will be created for the directory
        in the user's temp directory.

        Returns the location inside the chosen save directory that mirrors
        ``path`` relative to the matched root."""
        path = os.path.normcase(path)
        head, old_head = os.path.dirname(path), None
        save_dir = None

        # Walk up the ancestors of ``path`` until one has a registered save
        # directory; dirname() stops changing once the top is reached.
        while head != old_head:
            try:
                save_dir = self._save_dirs[head]
                break
            except KeyError:
                pass
            head, old_head = os.path.dirname(head), head
        else:
            # Did not find any suitable root
            head = os.path.dirname(path)
            save_dir = TempDirectory(kind='uninstall')
            self._save_dirs[head] = save_dir

        relpath = os.path.relpath(path, head)
        if relpath and relpath != os.path.curdir:
            return os.path.join(save_dir.path, relpath)
        return save_dir.path

    def stash(self, path: str) -> str:
        """Stashes the directory or file and returns its new location.
        Handle symlinks as files to avoid modifying the symlink targets.
        """
        path_is_dir = os.path.isdir(path) and not os.path.islink(path)
        if path_is_dir:
            new_path = self._get_directory_stash(path)
        else:
            new_path = self._get_file_stash(path)

        # Record the move before performing it so rollback knows about it.
        self._moves.append((path, new_path))
        if (path_is_dir and os.path.isdir(new_path)):
            # If we're moving a directory, we need to
            # remove the destination first or else it will be
            # moved to inside the existing directory.
            # We just created new_path ourselves, so it will
            # be removable.
            os.rmdir(new_path)
        renames(path, new_path)
        return new_path

    def commit(self) -> None:
        """Commits the uninstall by removing stashed files."""
        for save_dir in self._save_dirs.values():
            save_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self) -> None:
        """Undoes the uninstall by moving stashed files back."""
        for p in self._moves:
            logger.info("Moving to %s\n from %s", *p)

        # Each move is (original location, stash location).
        for original, stashed in self._moves:
            try:
                logger.debug('Replacing %s from %s', original, stashed)
                # Clear whatever now occupies the original location before
                # moving the stashed copy back.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                # Best effort: report and keep restoring the other paths.
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self) -> bool:
        """True when at least one move has been recorded and not committed."""
        return bool(self._moves)


class UninstallPathSet:
    """A set of file paths to be removed in the uninstallation of a
    requirement."""
    def __init__(self, dist: Distribution) -> None:
        # Paths that will be removed.
        self.paths: Set[str] = set()
        # Paths that were requested but are not permitted to be removed.
        self._refuse: Set[str] = set()
        # .pth file path -> entries to strip from that file.
        self.pth: Dict[str, UninstallPthEntries] = {}
        self.dist = dist
        self._moved_paths = StashedUninstallPathSet()

    def _permitted(self, path: str) -> bool:
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        """
        return is_local(path)

    def add(self, path: str) -> None:
        """Register ``path`` for removal if it exists.

        Paths that are not permitted are recorded in the refused set instead.
        For a ``.py`` file the corresponding ``__pycache__`` file is added
        too.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(normalize_path(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self.paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        if os.path.splitext(path)[1] == '.py':
            self.add(cache_from_source(path))

    def add_pth(self, pth_file: str, entry: str) -> None:
        """Register ``entry`` for removal from the ``.pth`` file ``pth_file``.

        If the file is not permitted to be modified it is recorded in the
        refused set instead.
        """
        pth_file = normalize_path(pth_file)
        if self._permitted(pth_file):
            if pth_file not in self.pth:
                self.pth[pth_file] = UninstallPthEntries(pth_file)
            self.pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
        """Remove paths in ``self.paths`` with confirmation (unless
        ``auto_confirm`` is True)."""

        if not self.paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self.dist.project_name,
            )
            return

        dist_name_version = (
            self.dist.project_name + "-" + self.dist.version
        )
        logger.info('Uninstalling %s:', dist_name_version)

        with indent_log():
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                for_rename = compress_for_rename(self.paths)

                # Files are stashed rather than deleted, so the operation
                # can be undone until commit() is called.
                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.verbose('Removing file or directory %s', path)

                for pth in self.pth.values():
                    pth.remove()

                logger.info('Successfully uninstalled %s', dist_name_version)

    def _allowed_to_proceed(self, verbose: bool) -> bool:
        """Display which files would be deleted and prompt for confirmation
        """

        def _display(msg: str, paths: Iterable[str]) -> None:
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self.paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self.paths)
            will_skip = set()

        _display('Would remove:', will_remove)
        _display('Would not remove (might be manually added):', will_skip)
        _display('Would not remove (outside of prefix):', self._refuse)
        if verbose:
            _display('Will actually move:', compress_for_rename(self.paths))

        # Anything other than an explicit 'n' (including empty input)
        # counts as consent.
        return ask('Proceed (Y/n)? ', ('y', 'n', '')) != 'n'

    def rollback(self) -> None:
        """Rollback the changes previously made by remove()."""
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self.dist.project_name,
            )
            return
        logger.info('Rolling back uninstall of %s', self.dist.project_name)
        self._moved_paths.rollback()
        for pth in self.pth.values():
            pth.rollback()

    def commit(self) -> None:
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist: Distribution) -> "UninstallPathSet":
        """Build the set of paths to remove in order to uninstall ``dist``.

        An empty set is returned when ``dist`` is not local to the current
        environment or lives in the standard library. Otherwise the install
        layout (egg-info, distutils, easy_install egg, dist-info or develop
        egg-link) decides which files are collected; script files are added
        afterwards.

        Raises UninstallationError for a distutils installed project, or
        (from uninstallation_paths) when a dist-info has no RECORD file.
        """
        dist_path = normalize_path(dist.location)
        if not dist_is_local(dist):
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.key,
                dist_path,
                sys.prefix,
            )
            return cls(dist)

        if dist_path in {p for p in {sysconfig.get_path("stdlib"),
                                     sysconfig.get_path("platstdlib")}
                         if p}:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.key,
                dist_path,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path(dist)
        develop_egg_link_egg_info = '{}.egg-info'.format(
            pkg_resources.to_filename(dist.project_name))
        egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
        # Special case for distutils installed package
        distutils_egg_info = getattr(dist._provider, 'path', None)

        # Uninstall cases order do matter as in the case of 2 installs of the
        # same package, pip needs to uninstall the currently detected version
        if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
                not dist.egg_info.endswith(develop_egg_link_egg_info)):
            # if dist.egg_info.endswith(develop_egg_link_egg_info), we
            # are in fact in the develop_egg_link case
            paths_to_remove.add(dist.egg_info)
            if dist.has_metadata('installed-files.txt'):
                for installed_file in dist.get_metadata(
                        'installed-files.txt').splitlines():
                    path = os.path.normpath(
                        os.path.join(dist.egg_info, installed_file)
                    )
                    paths_to_remove.add(path)
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.has_metadata('top_level.txt'):
                # Compare whole names: testing membership in the raw file
                # text would be a substring match and could wrongly skip a
                # package whose name is contained in a namespace's name.
                if dist.has_metadata('namespace_packages.txt'):
                    namespaces = set(
                        dist.get_metadata('namespace_packages.txt').splitlines()
                    )
                else:
                    namespaces = set()
                for top_level_pkg in [
                        p for p
                        in dist.get_metadata('top_level.txt').splitlines()
                        if p and p not in namespaces]:
                    path = os.path.join(dist.location, top_level_pkg)
                    paths_to_remove.add(path)
                    paths_to_remove.add(path + '.py')
                    paths_to_remove.add(path + '.pyc')
                    paths_to_remove.add(path + '.pyo')

        elif distutils_egg_info:
            raise UninstallationError(
                "Cannot uninstall {!r}. It is a distutils installed project "
                "and thus we cannot accurately determine which files belong "
                "to it which would lead to only a partial uninstall.".format(
                    dist.project_name,
                )
            )

        elif dist.location.endswith('.egg'):
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            paths_to_remove.add(dist.location)
            easy_install_egg = os.path.split(dist.location)[1]
            easy_install_pth = os.path.join(os.path.dirname(dist.location),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)

        elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # develop egg
            with open(develop_egg_link) as fh:
                link_pointer = os.path.normcase(fh.readline().strip())
            # NOTE(review): this check disappears under ``python -O``; it is
            # kept as an assert so the exception type stays unchanged.
            assert (link_pointer == dist.location), (
                'Egg-link {} does not match installed location of {} '
                '(at {})'.format(
                    link_pointer, dist.project_name, dist.location)
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, dist.location)

        else:
            logger.debug(
                'Not sure how to uninstall: %s - Check: %s',
                dist, dist.location,
            )

        # find distutils scripts= scripts
        if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
            for script in dist.metadata_listdir('scripts'):
                if dist_in_usersite(dist):
                    bin_dir = get_bin_user()
                else:
                    bin_dir = get_bin_prefix()
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')

        # find console_scripts
        _scripts_to_remove = []
        console_scripts = dist.get_entry_map(group='console_scripts')
        for name in console_scripts.keys():
            _scripts_to_remove.extend(_script_names(dist, name, False))
        # find gui_scripts
        gui_scripts = dist.get_entry_map(group='gui_scripts')
        for name in gui_scripts.keys():
            _scripts_to_remove.extend(_script_names(dist, name, True))

        for s in _scripts_to_remove:
            paths_to_remove.add(s)

        return paths_to_remove


class UninstallPthEntries:
    """Entries to be removed from a single ``.pth`` file, with undo support."""
    def __init__(self, pth_file: str) -> None:
        self.file = pth_file
        self.entries: Set[str] = set()
        # Snapshot of the file's lines taken by remove(); None until then.
        self._saved_lines: Optional[List[bytes]] = None

    def add(self, entry: str) -> None:
        """Register ``entry`` (normcased) for removal from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes.  This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder".
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace('\\', '/')
        self.entries.add(entry)

    def remove(self) -> None:
        """Rewrite the file without the registered entries.

        The original lines are kept so rollback() can restore them. A missing
        file only produces a warning.
        """
        logger.verbose('Removing pth entries from %s:', self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning(
                "Cannot remove entries from nonexistent file %s", self.file
            )
            return
        with open(self.file, 'rb') as fh:
            # windows uses '\r\n' with py3k, but uses '\n' with py2.x
            lines = fh.readlines()
            # Keep an independent copy: ``lines`` is edited in place below,
            # and rollback() must write back the untouched original.
            self._saved_lines = list(lines)
        if any(b'\r\n' in line for line in lines):
            endline = '\r\n'
        else:
            endline = '\n'
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline.encode("utf-8")):
            lines[-1] = lines[-1] + endline.encode("utf-8")
        for entry in self.entries:
            try:
                logger.verbose('Removing entry: %s', entry)
                lines.remove((entry + endline).encode("utf-8"))
            except ValueError:
                # The entry is not in the file; nothing to remove.
                pass
        with open(self.file, 'wb') as fh:
            fh.writelines(lines)

    def rollback(self) -> bool:
        """Restore the file content saved by remove().

        Returns False (after logging an error) when remove() saved nothing,
        True once the saved lines have been written back.
        """
        if self._saved_lines is None:
            logger.error(
                'Cannot roll back changes to %s, none were made', self.file
            )
            return False
        logger.debug('Rolling %s back to previous state', self.file)
        with open(self.file, 'wb') as fh:
            fh.writelines(self._saved_lines)
        return True