1import csv
2import functools
3import os
4import sys
5import sysconfig
6from importlib.util import cache_from_source
7from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple
8
9from pip._vendor import pkg_resources
10from pip._vendor.pkg_resources import Distribution
11
12from pip._internal.exceptions import UninstallationError
13from pip._internal.locations import get_bin_prefix, get_bin_user
14from pip._internal.utils.compat import WINDOWS
15from pip._internal.utils.logging import getLogger, indent_log
16from pip._internal.utils.misc import (
17    ask,
18    dist_in_usersite,
19    dist_is_local,
20    egg_link_path,
21    is_local,
22    normalize_path,
23    renames,
24    rmtree,
25)
26from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
27
28logger = getLogger(__name__)
29
30
31def _script_names(dist: Distribution, script_name: str, is_gui: bool) -> List[str]:
32    """Create the fully qualified name of the files created by
33    {console,gui}_scripts for the given ``dist``.
34    Returns the list of file names
35    """
36    if dist_in_usersite(dist):
37        bin_dir = get_bin_user()
38    else:
39        bin_dir = get_bin_prefix()
40    exe_name = os.path.join(bin_dir, script_name)
41    paths_to_remove = [exe_name]
42    if WINDOWS:
43        paths_to_remove.append(exe_name + '.exe')
44        paths_to_remove.append(exe_name + '.exe.manifest')
45        if is_gui:
46            paths_to_remove.append(exe_name + '-script.pyw')
47        else:
48            paths_to_remove.append(exe_name + '-script.py')
49    return paths_to_remove
50
51
52def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
53    @functools.wraps(fn)
54    def unique(*args: Any, **kw: Any) -> Iterator[Any]:
55        seen: Set[Any] = set()
56        for item in fn(*args, **kw):
57            if item not in seen:
58                seen.add(item)
59                yield item
60    return unique
61
62
63@_unique
64def uninstallation_paths(dist: Distribution) -> Iterator[str]:
65    """
66    Yield all the uninstallation paths for dist based on RECORD-without-.py[co]
67
68    Yield paths to all the files in RECORD. For each .py file in RECORD, add
69    the .pyc and .pyo in the same directory.
70
71    UninstallPathSet.add() takes care of the __pycache__ .py[co].
72
73    If RECORD is not found, raises UninstallationError,
74    with possible information from the INSTALLER file.
75
76    https://packaging.python.org/specifications/recording-installed-packages/
77    """
78    try:
79        r = csv.reader(dist.get_metadata_lines('RECORD'))
80    except FileNotFoundError as missing_record_exception:
81        msg = 'Cannot uninstall {dist}, RECORD file not found.'.format(dist=dist)
82        try:
83            installer = next(dist.get_metadata_lines('INSTALLER'))
84            if not installer or installer == 'pip':
85                raise ValueError()
86        except (OSError, StopIteration, ValueError):
87            dep = '{}=={}'.format(dist.project_name, dist.version)
88            msg += (" You might be able to recover from this via: "
89                    "'pip install --force-reinstall --no-deps {}'.".format(dep))
90        else:
91            msg += ' Hint: The package was installed by {}.'.format(installer)
92        raise UninstallationError(msg) from missing_record_exception
93    for row in r:
94        path = os.path.join(dist.location, row[0])
95        yield path
96        if path.endswith('.py'):
97            dn, fn = os.path.split(path)
98            base = fn[:-3]
99            path = os.path.join(dn, base + '.pyc')
100            yield path
101            path = os.path.join(dn, base + '.pyo')
102            yield path
103
104
105def compact(paths: Iterable[str]) -> Set[str]:
106    """Compact a path set to contain the minimal number of paths
107    necessary to contain all paths in the set. If /a/path/ and
108    /a/path/to/a/file.txt are both in the set, leave only the
109    shorter path."""
110
111    sep = os.path.sep
112    short_paths: Set[str] = set()
113    for path in sorted(paths, key=len):
114        should_skip = any(
115            path.startswith(shortpath.rstrip("*")) and
116            path[len(shortpath.rstrip("*").rstrip(sep))] == sep
117            for shortpath in short_paths
118        )
119        if not should_skip:
120            short_paths.add(path)
121    return short_paths
122
123
124def compress_for_rename(paths: Iterable[str]) -> Set[str]:
125    """Returns a set containing the paths that need to be renamed.
126
127    This set may include directories when the original sequence of paths
128    included every file on disk.
129    """
130    case_map = {os.path.normcase(p): p for p in paths}
131    remaining = set(case_map)
132    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
133    wildcards: Set[str] = set()
134
135    def norm_join(*a: str) -> str:
136        return os.path.normcase(os.path.join(*a))
137
138    for root in unchecked:
139        if any(os.path.normcase(root).startswith(w)
140               for w in wildcards):
141            # This directory has already been handled.
142            continue
143
144        all_files: Set[str] = set()
145        all_subdirs: Set[str] = set()
146        for dirname, subdirs, files in os.walk(root):
147            all_subdirs.update(norm_join(root, dirname, d)
148                               for d in subdirs)
149            all_files.update(norm_join(root, dirname, f)
150                             for f in files)
151        # If all the files we found are in our remaining set of files to
152        # remove, then remove them from the latter set and add a wildcard
153        # for the directory.
154        if not (all_files - remaining):
155            remaining.difference_update(all_files)
156            wildcards.add(root + os.sep)
157
158    return set(map(case_map.__getitem__, remaining)) | wildcards
159
160
161def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
162    """Returns a tuple of 2 sets of which paths to display to user
163
164    The first set contains paths that would be deleted. Files of a package
165    are not added and the top-level directory of the package has a '*' added
166    at the end - to signify that all it's contents are removed.
167
168    The second set contains files that would have been skipped in the above
169    folders.
170    """
171
172    will_remove = set(paths)
173    will_skip = set()
174
175    # Determine folders and files
176    folders = set()
177    files = set()
178    for path in will_remove:
179        if path.endswith(".pyc"):
180            continue
181        if path.endswith("__init__.py") or ".dist-info" in path:
182            folders.add(os.path.dirname(path))
183        files.add(path)
184
185    # probably this one https://github.com/python/mypy/issues/390
186    _normcased_files = set(map(os.path.normcase, files))  # type: ignore
187
188    folders = compact(folders)
189
190    # This walks the tree using os.walk to not miss extra folders
191    # that might get added.
192    for folder in folders:
193        for dirpath, _, dirfiles in os.walk(folder):
194            for fname in dirfiles:
195                if fname.endswith(".pyc"):
196                    continue
197
198                file_ = os.path.join(dirpath, fname)
199                if (os.path.isfile(file_) and
200                        os.path.normcase(file_) not in _normcased_files):
201                    # We are skipping this file. Add it to the set.
202                    will_skip.add(file_)
203
204    will_remove = files | {
205        os.path.join(folder, "*") for folder in folders
206    }
207
208    return will_remove, will_skip
209
210
211class StashedUninstallPathSet:
212    """A set of file rename operations to stash files while
213    tentatively uninstalling them."""
214    def __init__(self) -> None:
215        # Mapping from source file root to [Adjacent]TempDirectory
216        # for files under that directory.
217        self._save_dirs: Dict[str, TempDirectory] = {}
218        # (old path, new path) tuples for each move that may need
219        # to be undone.
220        self._moves: List[Tuple[str, str]] = []
221
222    def _get_directory_stash(self, path: str) -> str:
223        """Stashes a directory.
224
225        Directories are stashed adjacent to their original location if
226        possible, or else moved/copied into the user's temp dir."""
227
228        try:
229            save_dir: TempDirectory = AdjacentTempDirectory(path)
230        except OSError:
231            save_dir = TempDirectory(kind="uninstall")
232        self._save_dirs[os.path.normcase(path)] = save_dir
233
234        return save_dir.path
235
236    def _get_file_stash(self, path: str) -> str:
237        """Stashes a file.
238
239        If no root has been provided, one will be created for the directory
240        in the user's temp directory."""
241        path = os.path.normcase(path)
242        head, old_head = os.path.dirname(path), None
243        save_dir = None
244
245        while head != old_head:
246            try:
247                save_dir = self._save_dirs[head]
248                break
249            except KeyError:
250                pass
251            head, old_head = os.path.dirname(head), head
252        else:
253            # Did not find any suitable root
254            head = os.path.dirname(path)
255            save_dir = TempDirectory(kind='uninstall')
256            self._save_dirs[head] = save_dir
257
258        relpath = os.path.relpath(path, head)
259        if relpath and relpath != os.path.curdir:
260            return os.path.join(save_dir.path, relpath)
261        return save_dir.path
262
263    def stash(self, path: str) -> str:
264        """Stashes the directory or file and returns its new location.
265        Handle symlinks as files to avoid modifying the symlink targets.
266        """
267        path_is_dir = os.path.isdir(path) and not os.path.islink(path)
268        if path_is_dir:
269            new_path = self._get_directory_stash(path)
270        else:
271            new_path = self._get_file_stash(path)
272
273        self._moves.append((path, new_path))
274        if (path_is_dir and os.path.isdir(new_path)):
275            # If we're moving a directory, we need to
276            # remove the destination first or else it will be
277            # moved to inside the existing directory.
278            # We just created new_path ourselves, so it will
279            # be removable.
280            os.rmdir(new_path)
281        renames(path, new_path)
282        return new_path
283
284    def commit(self) -> None:
285        """Commits the uninstall by removing stashed files."""
286        for _, save_dir in self._save_dirs.items():
287            save_dir.cleanup()
288        self._moves = []
289        self._save_dirs = {}
290
291    def rollback(self) -> None:
292        """Undoes the uninstall by moving stashed files back."""
293        for p in self._moves:
294            logger.info("Moving to %s\n from %s", *p)
295
296        for new_path, path in self._moves:
297            try:
298                logger.debug('Replacing %s from %s', new_path, path)
299                if os.path.isfile(new_path) or os.path.islink(new_path):
300                    os.unlink(new_path)
301                elif os.path.isdir(new_path):
302                    rmtree(new_path)
303                renames(path, new_path)
304            except OSError as ex:
305                logger.error("Failed to restore %s", new_path)
306                logger.debug("Exception: %s", ex)
307
308        self.commit()
309
310    @property
311    def can_rollback(self) -> bool:
312        return bool(self._moves)
313
314
315class UninstallPathSet:
316    """A set of file paths to be removed in the uninstallation of a
317    requirement."""
318    def __init__(self, dist: Distribution) -> None:
319        self.paths: Set[str] = set()
320        self._refuse: Set[str] = set()
321        self.pth: Dict[str, UninstallPthEntries] = {}
322        self.dist = dist
323        self._moved_paths = StashedUninstallPathSet()
324
325    def _permitted(self, path: str) -> bool:
326        """
327        Return True if the given path is one we are permitted to
328        remove/modify, False otherwise.
329
330        """
331        return is_local(path)
332
333    def add(self, path: str) -> None:
334        head, tail = os.path.split(path)
335
336        # we normalize the head to resolve parent directory symlinks, but not
337        # the tail, since we only want to uninstall symlinks, not their targets
338        path = os.path.join(normalize_path(head), os.path.normcase(tail))
339
340        if not os.path.exists(path):
341            return
342        if self._permitted(path):
343            self.paths.add(path)
344        else:
345            self._refuse.add(path)
346
347        # __pycache__ files can show up after 'installed-files.txt' is created,
348        # due to imports
349        if os.path.splitext(path)[1] == '.py':
350            self.add(cache_from_source(path))
351
352    def add_pth(self, pth_file: str, entry: str) -> None:
353        pth_file = normalize_path(pth_file)
354        if self._permitted(pth_file):
355            if pth_file not in self.pth:
356                self.pth[pth_file] = UninstallPthEntries(pth_file)
357            self.pth[pth_file].add(entry)
358        else:
359            self._refuse.add(pth_file)
360
361    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
362        """Remove paths in ``self.paths`` with confirmation (unless
363        ``auto_confirm`` is True)."""
364
365        if not self.paths:
366            logger.info(
367                "Can't uninstall '%s'. No files were found to uninstall.",
368                self.dist.project_name,
369            )
370            return
371
372        dist_name_version = (
373            self.dist.project_name + "-" + self.dist.version
374        )
375        logger.info('Uninstalling %s:', dist_name_version)
376
377        with indent_log():
378            if auto_confirm or self._allowed_to_proceed(verbose):
379                moved = self._moved_paths
380
381                for_rename = compress_for_rename(self.paths)
382
383                for path in sorted(compact(for_rename)):
384                    moved.stash(path)
385                    logger.verbose('Removing file or directory %s', path)
386
387                for pth in self.pth.values():
388                    pth.remove()
389
390                logger.info('Successfully uninstalled %s', dist_name_version)
391
392    def _allowed_to_proceed(self, verbose: bool) -> bool:
393        """Display which files would be deleted and prompt for confirmation
394        """
395
396        def _display(msg: str, paths: Iterable[str]) -> None:
397            if not paths:
398                return
399
400            logger.info(msg)
401            with indent_log():
402                for path in sorted(compact(paths)):
403                    logger.info(path)
404
405        if not verbose:
406            will_remove, will_skip = compress_for_output_listing(self.paths)
407        else:
408            # In verbose mode, display all the files that are going to be
409            # deleted.
410            will_remove = set(self.paths)
411            will_skip = set()
412
413        _display('Would remove:', will_remove)
414        _display('Would not remove (might be manually added):', will_skip)
415        _display('Would not remove (outside of prefix):', self._refuse)
416        if verbose:
417            _display('Will actually move:', compress_for_rename(self.paths))
418
419        return ask('Proceed (Y/n)? ', ('y', 'n', '')) != 'n'
420
421    def rollback(self) -> None:
422        """Rollback the changes previously made by remove()."""
423        if not self._moved_paths.can_rollback:
424            logger.error(
425                "Can't roll back %s; was not uninstalled",
426                self.dist.project_name,
427            )
428            return
429        logger.info('Rolling back uninstall of %s', self.dist.project_name)
430        self._moved_paths.rollback()
431        for pth in self.pth.values():
432            pth.rollback()
433
434    def commit(self) -> None:
435        """Remove temporary save dir: rollback will no longer be possible."""
436        self._moved_paths.commit()
437
438    @classmethod
439    def from_dist(cls, dist: Distribution) -> "UninstallPathSet":
440        dist_path = normalize_path(dist.location)
441        if not dist_is_local(dist):
442            logger.info(
443                "Not uninstalling %s at %s, outside environment %s",
444                dist.key,
445                dist_path,
446                sys.prefix,
447            )
448            return cls(dist)
449
450        if dist_path in {p for p in {sysconfig.get_path("stdlib"),
451                                     sysconfig.get_path("platstdlib")}
452                         if p}:
453            logger.info(
454                "Not uninstalling %s at %s, as it is in the standard library.",
455                dist.key,
456                dist_path,
457            )
458            return cls(dist)
459
460        paths_to_remove = cls(dist)
461        develop_egg_link = egg_link_path(dist)
462        develop_egg_link_egg_info = '{}.egg-info'.format(
463            pkg_resources.to_filename(dist.project_name))
464        egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
465        # Special case for distutils installed package
466        distutils_egg_info = getattr(dist._provider, 'path', None)
467
468        # Uninstall cases order do matter as in the case of 2 installs of the
469        # same package, pip needs to uninstall the currently detected version
470        if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
471                not dist.egg_info.endswith(develop_egg_link_egg_info)):
472            # if dist.egg_info.endswith(develop_egg_link_egg_info), we
473            # are in fact in the develop_egg_link case
474            paths_to_remove.add(dist.egg_info)
475            if dist.has_metadata('installed-files.txt'):
476                for installed_file in dist.get_metadata(
477                        'installed-files.txt').splitlines():
478                    path = os.path.normpath(
479                        os.path.join(dist.egg_info, installed_file)
480                    )
481                    paths_to_remove.add(path)
482            # FIXME: need a test for this elif block
483            # occurs with --single-version-externally-managed/--record outside
484            # of pip
485            elif dist.has_metadata('top_level.txt'):
486                if dist.has_metadata('namespace_packages.txt'):
487                    namespaces = dist.get_metadata('namespace_packages.txt')
488                else:
489                    namespaces = []
490                for top_level_pkg in [
491                        p for p
492                        in dist.get_metadata('top_level.txt').splitlines()
493                        if p and p not in namespaces]:
494                    path = os.path.join(dist.location, top_level_pkg)
495                    paths_to_remove.add(path)
496                    paths_to_remove.add(path + '.py')
497                    paths_to_remove.add(path + '.pyc')
498                    paths_to_remove.add(path + '.pyo')
499
500        elif distutils_egg_info:
501            raise UninstallationError(
502                "Cannot uninstall {!r}. It is a distutils installed project "
503                "and thus we cannot accurately determine which files belong "
504                "to it which would lead to only a partial uninstall.".format(
505                    dist.project_name,
506                )
507            )
508
509        elif dist.location.endswith('.egg'):
510            # package installed by easy_install
511            # We cannot match on dist.egg_name because it can slightly vary
512            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
513            paths_to_remove.add(dist.location)
514            easy_install_egg = os.path.split(dist.location)[1]
515            easy_install_pth = os.path.join(os.path.dirname(dist.location),
516                                            'easy-install.pth')
517            paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)
518
519        elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
520            for path in uninstallation_paths(dist):
521                paths_to_remove.add(path)
522
523        elif develop_egg_link:
524            # develop egg
525            with open(develop_egg_link) as fh:
526                link_pointer = os.path.normcase(fh.readline().strip())
527            assert (link_pointer == dist.location), (
528                'Egg-link {} does not match installed location of {} '
529                '(at {})'.format(
530                    link_pointer, dist.project_name, dist.location)
531            )
532            paths_to_remove.add(develop_egg_link)
533            easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
534                                            'easy-install.pth')
535            paths_to_remove.add_pth(easy_install_pth, dist.location)
536
537        else:
538            logger.debug(
539                'Not sure how to uninstall: %s - Check: %s',
540                dist, dist.location,
541            )
542
543        # find distutils scripts= scripts
544        if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
545            for script in dist.metadata_listdir('scripts'):
546                if dist_in_usersite(dist):
547                    bin_dir = get_bin_user()
548                else:
549                    bin_dir = get_bin_prefix()
550                paths_to_remove.add(os.path.join(bin_dir, script))
551                if WINDOWS:
552                    paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')
553
554        # find console_scripts
555        _scripts_to_remove = []
556        console_scripts = dist.get_entry_map(group='console_scripts')
557        for name in console_scripts.keys():
558            _scripts_to_remove.extend(_script_names(dist, name, False))
559        # find gui_scripts
560        gui_scripts = dist.get_entry_map(group='gui_scripts')
561        for name in gui_scripts.keys():
562            _scripts_to_remove.extend(_script_names(dist, name, True))
563
564        for s in _scripts_to_remove:
565            paths_to_remove.add(s)
566
567        return paths_to_remove
568
569
570class UninstallPthEntries:
571    def __init__(self, pth_file: str) -> None:
572        self.file = pth_file
573        self.entries: Set[str] = set()
574        self._saved_lines: Optional[List[bytes]] = None
575
576    def add(self, entry: str) -> None:
577        entry = os.path.normcase(entry)
578        # On Windows, os.path.normcase converts the entry to use
579        # backslashes.  This is correct for entries that describe absolute
580        # paths outside of site-packages, but all the others use forward
581        # slashes.
582        # os.path.splitdrive is used instead of os.path.isabs because isabs
583        # treats non-absolute paths with drive letter markings like c:foo\bar
584        # as absolute paths. It also does not recognize UNC paths if they don't
585        # have more than "\\sever\share". Valid examples: "\\server\share\" or
586        # "\\server\share\folder".
587        if WINDOWS and not os.path.splitdrive(entry)[0]:
588            entry = entry.replace('\\', '/')
589        self.entries.add(entry)
590
591    def remove(self) -> None:
592        logger.verbose('Removing pth entries from %s:', self.file)
593
594        # If the file doesn't exist, log a warning and return
595        if not os.path.isfile(self.file):
596            logger.warning(
597                "Cannot remove entries from nonexistent file %s", self.file
598            )
599            return
600        with open(self.file, 'rb') as fh:
601            # windows uses '\r\n' with py3k, but uses '\n' with py2.x
602            lines = fh.readlines()
603            self._saved_lines = lines
604        if any(b'\r\n' in line for line in lines):
605            endline = '\r\n'
606        else:
607            endline = '\n'
608        # handle missing trailing newline
609        if lines and not lines[-1].endswith(endline.encode("utf-8")):
610            lines[-1] = lines[-1] + endline.encode("utf-8")
611        for entry in self.entries:
612            try:
613                logger.verbose('Removing entry: %s', entry)
614                lines.remove((entry + endline).encode("utf-8"))
615            except ValueError:
616                pass
617        with open(self.file, 'wb') as fh:
618            fh.writelines(lines)
619
620    def rollback(self) -> bool:
621        if self._saved_lines is None:
622            logger.error(
623                'Cannot roll back changes to %s, none were made', self.file
624            )
625            return False
626        logger.debug('Rolling %s back to previous state', self.file)
627        with open(self.file, 'wb') as fh:
628            fh.writelines(self._saved_lines)
629        return True
630