1"""Handles all VCS (version control) support"""
2
3import logging
4import os
5import shutil
6import sys
7import urllib.parse
8
9from pip._vendor import pkg_resources
10
11from pip._internal.exceptions import BadCommand, InstallationError
12from pip._internal.utils.misc import (
13    ask_path_exists,
14    backup_dir,
15    display_path,
16    hide_url,
17    hide_value,
18    rmtree,
19)
20from pip._internal.utils.subprocess import call_subprocess, make_command
21from pip._internal.utils.typing import MYPY_CHECK_RUNNING
22from pip._internal.utils.urls import get_url_scheme
23
24if MYPY_CHECK_RUNNING:
25    from typing import (
26        Any,
27        Dict,
28        Iterable,
29        Iterator,
30        List,
31        Mapping,
32        Optional,
33        Tuple,
34        Type,
35        Union,
36    )
37
38    from pip._internal.cli.spinners import SpinnerInterface
39    from pip._internal.utils.misc import HiddenText
40    from pip._internal.utils.subprocess import CommandArgs
41
42    AuthInfo = Tuple[Optional[str], Optional[str]]
43
44
# Only the shared ``vcs`` registry object is exported by ``import *``.
__all__ = ['vcs']


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
49
50
def is_url(name):
    # type: (str) -> bool
    """
    Tell whether ``name`` looks like a URL.

    A name counts as a URL when it has a scheme and that scheme is either
    one of the plain transfer schemes (http, https, file, ftp) or one of
    the schemes advertised by the registered VCS backends.
    """
    url_scheme = get_url_scheme(name)
    if url_scheme is not None:
        known_schemes = ['http', 'https', 'file', 'ftp'] + vcs.all_schemes
        return url_scheme in known_schemes
    # No scheme at all: cannot be a URL.
    return False
60
61
def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
    # type: (str, str, str, Optional[str]) -> str
    """
    Build the requirement URL for a project that lives in a VCS repository.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision, placed after the "@" in the result.
      project_name: the (unescaped) project name.
      subdir: optional subdirectory; when truthy it is appended as an
        "&subdirectory=" part after the egg name.
    """
    egg_project_name = pkg_resources.to_filename(project_name)
    pieces = [f'{repo_url}@{rev}#egg={egg_project_name}']
    if subdir:
        pieces.append(f'&subdirectory={subdir}')

    return ''.join(pieces)
77
78
def find_path_to_setup_from_repo_root(location, repo_root):
    # type: (str, str) -> Optional[str]
    """
    Locate the directory holding `setup.py`, walking upwards from `location`.

    Returns the path of that directory relative to `repo_root`.
    Returns None when `setup.py` sits directly in `repo_root`, or when no
    parent directory contains a `setup.py` (a warning is logged then).
    """
    candidate = location
    while True:
        if os.path.exists(os.path.join(candidate, 'setup.py')):
            break
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path: we hit the filesystem
            # root without finding setup.py
            logger.warning(
                "Could not find setup.py for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    # setup.py directly in the repository root means "no subdirectory".
    if os.path.samefile(repo_root, candidate):
        return None

    return os.path.relpath(candidate, repo_root)
105
106
class RemoteNotFoundError(Exception):
    """Signals that a repository has no remote url configured."""
109
110
class RevOptions:

    """
    Bundles a VCS-specific revision to install together with any extra
    VCS install options.

    Treat instances of this class as immutable.
    """

    def __init__(
        self,
        vc_class,  # type: Type[VersionControl]
        rev=None,  # type: Optional[str]
        extra_args=None,  # type: Optional[CommandArgs]
    ):
        # type: (...) -> None
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options (defaults to an empty list).
        """
        self.vc_class = vc_class
        self.rev = rev
        self.extra_args = [] if extra_args is None else extra_args
        self.branch_name = None  # type: Optional[str]

    def __repr__(self):
        # type: () -> str
        return f'<RevOptions {self.vc_class.name}: rev={self.rev!r}>'

    @property
    def arg_rev(self):
        # type: () -> Optional[str]
        """
        The revision to hand to the VCS command: the explicit rev when one
        was given, otherwise the VCS class's default_arg_rev.
        """
        explicit_rev = self.rev
        if explicit_rev is not None:
            return explicit_rev

        return self.vc_class.default_arg_rev

    def to_args(self):
        # type: () -> CommandArgs
        """
        Build the VCS-specific command arguments: the base revision
        arguments (if there is a revision) followed by the extra args.
        """
        command_args = []  # type: CommandArgs
        revision = self.arg_rev
        if revision is not None:
            command_args.extend(self.vc_class.get_base_rev_args(revision))
        command_args.extend(self.extra_args)

        return command_args

    def to_display(self):
        # type: () -> str
        """
        Return a log-message suffix naming the revision, or an empty
        string when no revision is set.
        """
        return f' (to revision {self.rev})' if self.rev else ''

    def make_new(self, rev):
        # type: (str) -> RevOptions
        """
        Create a new RevOptions for the same VCS class and extra args,
        but targeting a different revision.

        Args:
          rev: the name of the revision for the new object.
        """
        extra = self.extra_args
        return self.vc_class.make_rev_options(rev, extra_args=extra)
182
183
class VcsSupport:
    """
    Registry of VCS backends, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """
    _registry = {}  # type: Dict[str, VersionControl]
    schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']

    def __init__(self):
        # type: () -> None
        # Teach urllib.parse that these version control schemes carry a
        # network location.
        urllib.parse.uses_netloc.extend(self.schemes)
        super().__init__()

    def __iter__(self):
        # type: () -> Iterator[str]
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self):
        # type: () -> List[VersionControl]
        """A new list holding the registered backend objects."""
        return [*self._registry.values()]

    @property
    def dirnames(self):
        # type: () -> List[str]
        """The `dirname` attribute of every registered backend."""
        names = []  # type: List[str]
        for backend in self.backends:
            names.append(backend.dirname)
        return names

    @property
    def all_schemes(self):
        # type: () -> List[str]
        """The schemes of all registered backends, flattened into one list."""
        return [
            scheme
            for backend in self.backends
            for scheme in backend.schemes
        ]

    def register(self, cls):
        # type: (Type[VersionControl]) -> None
        """
        Instantiate `cls` and store it under `cls.name`.

        A class without a `name` attribute is refused with a warning; a
        name that is already registered is left untouched.
        """
        if not hasattr(cls, 'name'):
            logger.warning('Cannot register VCS %s', cls.__name__)
            return
        if cls.name in self._registry:
            return
        self._registry[cls.name] = cls()
        logger.debug('Registered VCS backend: %s', cls.name)

    def unregister(self, name):
        # type: (str) -> None
        """Drop the backend registered under `name`, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location):
        # type: (str) -> Optional[VersionControl]
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory, else None.
        """
        roots_to_backends = {}
        for backend in self._registry.values():
            root = backend.get_repository_root(location)
            if root:
                logger.debug('Determine that %s uses VCS: %s',
                             location, backend.name)
                roots_to_backends[root] = backend

        if not roots_to_backends:
            return None

        # Every root found is either `location` itself or one of its
        # parents, so the longest root path is the inner-most repository;
        # that backend wins.
        deepest_root = max(roots_to_backends, key=len)
        return roots_to_backends[deepest_root]

    def get_backend_for_scheme(self, scheme):
        # type: (str) -> Optional[VersionControl]
        """
        Return the first registered backend whose schemes include
        `scheme`, or None.
        """
        matching = (
            backend
            for backend in self._registry.values()
            if scheme in backend.schemes
        )
        return next(matching, None)

    def get_backend(self, name):
        # type: (str) -> Optional[VersionControl]
        """
        Return the backend registered under `name` (looked up in lower
        case), or None.
        """
        return self._registry.get(name.lower())
273
274
# Module-level VcsSupport instance; is_url() reads its all_schemes.
vcs = VcsSupport()
276
277
class VersionControl:
    """
    Base class for a VCS backend.

    Subclasses set the class attributes below and implement the methods
    that raise NotImplementedError here.
    """
    # Name of the VCS.  run_command() uses it as the command to execute,
    # and get_src_requirement() uses it for the "<name>+" URL prefix.
    name = ''
    # Entry that is_repository_directory() looks for inside a directory.
    dirname = ''
    # Word used for a repository of this VCS in log messages.
    repo_name = ''
    # List of supported schemes for this Version Control
    schemes = ()  # type: Tuple[str, ...]
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ = ()  # type: Tuple[str, ...]
    # Revision RevOptions.arg_rev falls back to when no rev was given.
    default_arg_rev = None  # type: Optional[str]

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url):
        # type: (str) -> bool
        """
        Return whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.

        The prefix is skipped when the url already starts with "<name>:"
        (compared case-insensitively).
        """
        return not remote_url.lower().startswith(f'{cls.name}:')

    @classmethod
    def get_subdirectory(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the path to setup.py, relative to the repo root.
        Return None if setup.py is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir):
        # type: (str) -> str
        """
        Return the revision string that should be used in a requirement.

        By default this is whatever get_revision() reports.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir, project_name):
        # type: (str, str) -> str
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          repo_dir: the repository directory to describe.
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        repo_url = cls.get_remote_url(repo_dir)

        if cls.should_add_vcs_url_prefix(repo_url):
            repo_url = f'{cls.name}+{repo_url}'

        revision = cls.get_requirement_revision(repo_dir)
        subdir = cls.get_subdirectory(repo_dir)
        req = make_vcs_requirement_url(repo_url, revision, project_name,
                                       subdir=subdir)

        return req

    @staticmethod
    def get_base_rev_args(rev):
        # type: (str) -> List[str]
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url, dest):
        # type: (str, str) -> bool
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(cls, rev=None, extra_args=None):
        # type: (Optional[str], Optional[CommandArgs]) -> RevOptions
        """
        Return a RevOptions object.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo):
        # type: (str) -> bool
        """
           posix absolute paths start with os.path.sep,
           win32 ones start with drive (like c:\\folder)
        """
        # Only the drive part matters; the remainder of the path is unused.
        drive, _ = os.path.splitdrive(repo)
        return repo.startswith(os.path.sep) or bool(drive)

    def export(self, location, url):
        # type: (str, HiddenText) -> None
        """
        Export the repository at the url to the destination location
        i.e. only download the files, without vcs information

        :param location: the destination to export to.
        :param url: the repository URL starting with a vcs prefix.
        """
        raise NotImplementedError

    @classmethod
    def get_netloc_and_auth(cls, netloc, scheme):
        # type: (str, str) -> Tuple[str, AuthInfo]
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        return netloc, (None, None)

    @classmethod
    def get_url_rev_and_auth(cls, url):
        # type: (str) -> Tuple[str, Optional[str], AuthInfo]
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        The returned URL has the vcs prefix, the "@revision" suffix of the
        path and the fragment removed.

        Raises:
          ValueError: if the URL scheme has no "<vcs>+" prefix.
          InstallationError: if the path ends with "@" but no revision.

        Returns: (url, rev, (username, password)).
        """
        scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
        if '+' not in scheme:
            raise ValueError(
                "Sorry, {!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
            )
        # Remove the vcs prefix.
        scheme = scheme.split('+', 1)[1]
        netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
        rev = None
        if '@' in path:
            # Everything after the last "@" in the path is the revision.
            path, rev = path.rsplit('@', 1)
            if not rev:
                raise InstallationError(
                    "The URL {!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL.".format(url)
                )
        # The fragment (e.g. "#egg=...") is deliberately dropped.
        url = urllib.parse.urlunsplit((scheme, netloc, path, query, ''))
        return url, rev, user_pass

    @staticmethod
    def make_rev_args(username, password):
        # type: (Optional[str], Optional[HiddenText]) -> CommandArgs
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation ignores both arguments and returns an
        empty list.
        """
        return []

    def get_url_rev_options(self, url):
        # type: (HiddenText) -> Tuple[HiddenText, RevOptions]
        """
        Return the URL and RevOptions object to use in obtain() and in
        some cases export(), as a tuple (url, rev_options).
        """
        secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
        username, secret_password = user_pass
        password = None  # type: Optional[HiddenText]
        if secret_password is not None:
            # Wrap the password before it is passed on as a command argument.
            password = hide_value(secret_password)
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url):
        # type: (str) -> str
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        return urllib.parse.unquote(url).rstrip('/')

    @classmethod
    def compare_urls(cls, url1, url2):
        # type: (str, str) -> bool
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        return (cls.normalize_url(url1) == cls.normalize_url(url2))

    def fetch_new(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          url: the repository URL to fetch from.
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def switch(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          dest: the repository directory.
          url: the repository URL to switch to.
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          dest: the repository directory.
          url: the repository URL.
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest, name):
        # type: (str, Optional[str]) -> bool
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest, url):
        # type: (str, HiddenText) -> None
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        If ``dest`` does not exist, the repository is fetched fresh.  If it
        is a repository with the same URL, it is updated when its commit
        differs from the requested revision.  Otherwise the user is asked
        what to do with the existing directory.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        """
        url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options)
            return

        rev_display = rev_options.to_display()
        if self.is_repository_directory(dest):
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    '%s in %s exists, and has correct URL (%s)',
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info(
                        'Updating %s %s%s',
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options)
                else:
                    logger.info('Skipping because already up-to-date.')
                return

            # Same VCS but a different remote: switching is offered too.
            logger.warning(
                '%s %s in %s exists with URL %s',
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ',
                      ('s', 'i', 'w', 'b'))
        else:
            logger.warning(
                'Directory %s already exists, and is not a %s %s.',
                dest,
                self.name,
                self.repo_name,
            )
            # https://github.com/python/mypy/issues/1174
            prompt = ('(i)gnore, (w)ipe, (b)ackup ',  # type: ignore
                      ('i', 'w', 'b'))

        logger.warning(
            'The plan is to install the %s repository %s',
            self.name,
            url,
        )
        response = ask_path_exists('What to do?  {}'.format(
            prompt[0]), prompt[1])

        # NOTE(review): 'a' is not among the offered options; presumably
        # ask_path_exists() can return it as an abort answer -- confirm.
        if response == 'a':
            sys.exit(-1)

        if response == 'w':
            logger.warning('Deleting %s', display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options)
            return

        if response == 'b':
            dest_dir = backup_dir(dest)
            logger.warning(
                'Backing up %s to %s', display_path(dest), dest_dir,
            )
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options)
            return

        # Do nothing if the response is "i".
        if response == 's':
            logger.info(
                'Switching %s %s to %s%s',
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options)

    def unpack(self, location, url):
        # type: (str, HiddenText) -> None
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param location: the directory to (re)create.
        :param url: the repository URL starting with a vcs prefix.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url)

    @classmethod
    def get_remote_url(cls, location):
        # type: (str) -> str
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location):
        # type: (str) -> str
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd,  # type: Union[List[str], CommandArgs]
        show_stdout=True,  # type: bool
        cwd=None,  # type: Optional[str]
        on_returncode='raise',  # type: str
        extra_ok_returncodes=None,  # type: Optional[Iterable[int]]
        command_desc=None,  # type: Optional[str]
        extra_environ=None,  # type: Optional[Mapping[str, Any]]
        spinner=None,  # type: Optional[SpinnerInterface]
        log_failed_cmd=True,  # type: bool
        stdout_only=False,  # type: bool
    ):
        # type: (...) -> str
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        All arguments other than ``cmd`` are forwarded to call_subprocess()
        unchanged, together with this class's ``unset_environ``.

        Raises:
          BadCommand: if running the command raises FileNotFoundError.
        """
        cmd = make_command(cls.name, *cmd)
        try:
            return call_subprocess(cmd, show_stdout, cwd,
                                   on_returncode=on_returncode,
                                   extra_ok_returncodes=extra_ok_returncodes,
                                   command_desc=command_desc,
                                   extra_environ=extra_environ,
                                   unset_environ=cls.unset_environ,
                                   spinner=spinner,
                                   log_failed_cmd=log_failed_cmd,
                                   stdout_only=stdout_only)
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f'Cannot find command {cls.name!r} - do you have '
                f'{cls.name!r} installed and in your '
                f'PATH?')

    @classmethod
    def is_repository_directory(cls, path):
        # type: (str) -> bool
        """
        Return whether a directory path is a repository directory, i.e.
        whether it contains an entry named ``cls.dirname``.
        """
        logger.debug('Checking in %s for %s (%s)...',
                     path, cls.dirname, cls.name)
        return os.path.exists(os.path.join(path, cls.dirname))

    @classmethod
    def get_repository_root(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        if cls.is_repository_directory(location):
            return location
        return None
723