1import logging
2from collections import OrderedDict
3
4from pip._vendor.packaging.utils import canonicalize_name
5
6from pip._internal.exceptions import InstallationError
7from pip._internal.models.wheel import Wheel
8from pip._internal.utils import compatibility_tags
9from pip._internal.utils.typing import MYPY_CHECK_RUNNING
10
11if MYPY_CHECK_RUNNING:
12    from typing import Dict, Iterable, List, Optional, Tuple
13
14    from pip._internal.req.req_install import InstallRequirement
15
16
17logger = logging.getLogger(__name__)
18
19
class RequirementSet:
    """A collection of requirements gathered for installation.

    Named requirements are stored in ``requirements``, keyed by their
    canonicalized project name and kept in insertion order. Requirements
    that do not have a name yet are kept in ``unnamed_requirements``.
    """

    def __init__(self, check_supported_wheels=True):
        # type: (bool) -> None
        """Create an empty RequirementSet.

        :param check_supported_wheels: When true, ``add_requirement`` raises
            an InstallationError for a wheel link whose tags are not
            supported.
        """

        # Named requirements, keyed by canonicalized project name.
        self.requirements = OrderedDict()  # type: Dict[str, InstallRequirement]
        self.check_supported_wheels = check_supported_wheels

        # Requirements whose name is not known (yet).
        self.unnamed_requirements = []  # type: List[InstallRequirement]

    def __str__(self):
        # type: () -> str
        """Return the top-level requirements as a space-separated string.

        Only named requirements without a ``comes_from`` are included,
        sorted by canonicalized name.
        """
        requirements = sorted(
            (req for req in self.requirements.values() if not req.comes_from),
            key=lambda req: canonicalize_name(req.name),
        )
        return ' '.join(str(req.req) for req in requirements)

    def __repr__(self):
        # type: () -> str
        """Return a debug representation listing all named requirements."""
        requirements = sorted(
            self.requirements.values(),
            key=lambda req: canonicalize_name(req.name),
        )

        format_string = '<{classname} object; {count} requirement(s): {reqs}>'
        return format_string.format(
            classname=self.__class__.__name__,
            count=len(requirements),
            reqs=', '.join(str(req.req) for req in requirements),
        )

    def add_unnamed_requirement(self, install_req):
        # type: (InstallRequirement) -> None
        """Record a requirement that has no name."""
        assert not install_req.name
        self.unnamed_requirements.append(install_req)

    def add_named_requirement(self, install_req):
        # type: (InstallRequirement) -> None
        """Store a named requirement under its canonicalized project name.

        Any requirement already stored under the same canonical name is
        replaced.
        """
        assert install_req.name

        project_name = canonicalize_name(install_req.name)
        self.requirements[project_name] = install_req

    def add_requirement(
        self,
        install_req,  # type: InstallRequirement
        parent_req_name=None,  # type: Optional[str]
        extras_requested=None  # type: Optional[Iterable[str]]
    ):
        # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]
        """Add install_req as a requirement to install.

        :param parent_req_name: The name of the requirement that needed this
            added. The name is used because when multiple unnamed requirements
            resolve to the same name, we could otherwise end up with dependency
            links that point outside the Requirements set. parent_req must
            already be added. Note that None implies that this is a user
            supplied requirement, vs an inferred one.
        :param extras_requested: an iterable of extras used to evaluate the
            environment markers.
        :return: A 2-tuple ``(to_scan, requirement)``. ``to_scan`` is the
            list of additional requirements to scan and ``requirement`` is
            the requirement held by this set for the project, if any:

            * ``([], None)`` if the markers do not match;
            * ``([install_req], None)`` if install_req is unnamed;
            * ``([install_req], install_req)`` if it was newly added;
            * ``([], existing_req)`` if the project is already present and
              nothing needs rescanning;
            * ``([existing_req], existing_req)`` if an existing constraint
              was turned into a requirement to install.
        :raises InstallationError: if the wheel is not supported (and
            ``check_supported_wheels`` is set), if a conflicting top-level
            requirement for the same project was already given, or if a
            link-based requirement does not match an existing constraint.
        """
        # If the markers do not match, ignore this requirement.
        if not install_req.match_markers(extras_requested):
            logger.info(
                "Ignoring %s: markers '%s' don't match your environment",
                install_req.name, install_req.markers,
            )
            return [], None

        # If the wheel is not supported, raise an error.
        # Should check this after filtering out based on environment markers to
        # allow specifying different wheels based on the environment/OS, in a
        # single requirements file.
        if install_req.link and install_req.link.is_wheel:
            # The Wheel object is still built unconditionally (as before);
            # only the supported-tags lookup is skipped when the check is
            # disabled, since its result would be unused.
            wheel = Wheel(install_req.link.filename)
            if (
                self.check_supported_wheels and
                not wheel.supported(compatibility_tags.get_supported())
            ):
                raise InstallationError(
                    "{} is not a supported wheel on this platform.".format(
                        wheel.filename)
                )

        # This next bit is really a sanity check.
        assert not install_req.user_supplied or parent_req_name is None, (
            "a user supplied req shouldn't have a parent"
        )

        # Unnamed requirements are scanned again and the requirement won't be
        # added as a dependency until after scanning.
        if not install_req.name:
            self.add_unnamed_requirement(install_req)
            return [install_req], None

        # None when nothing is stored for this project yet.
        existing_req = self.requirements.get(
            canonicalize_name(install_req.name)
        )  # type: Optional[InstallRequirement]

        # Two top-level (parent-less) requirements for the same project with
        # the same extras but different specifiers cannot both be honoured.
        has_conflicting_requirement = (
            parent_req_name is None and
            existing_req and
            not existing_req.constraint and
            existing_req.extras == install_req.extras and
            existing_req.req.specifier != install_req.req.specifier
        )
        if has_conflicting_requirement:
            raise InstallationError(
                "Double requirement given: {} (already in {}, name={!r})"
                .format(install_req, existing_req, install_req.name)
            )

        # When no existing requirement exists, add the requirement as a
        # dependency and it will be scanned again after.
        if not existing_req:
            self.add_named_requirement(install_req)
            # We'd want to rescan this requirement later
            return [install_req], install_req

        # Assume there's no need to scan, and that we've already
        # encountered this for scanning.
        if install_req.constraint or not existing_req.constraint:
            return [], existing_req

        # From here on: existing_req is a constraint and install_req is a
        # real requirement for the same project.
        does_not_satisfy_constraint = (
            install_req.link and
            not (
                existing_req.link and
                install_req.link.path == existing_req.link.path
            )
        )
        if does_not_satisfy_constraint:
            raise InstallationError(
                "Could not satisfy constraints for '{}': "
                "installation from path or url cannot be "
                "constrained to a version".format(install_req.name)
            )
        # If we're now installing a constraint, mark the existing
        # object for real installation.
        existing_req.constraint = False
        # If we're now installing a user supplied requirement,
        # mark the existing object as such.
        if install_req.user_supplied:
            existing_req.user_supplied = True
        # Merge the extras of both requirements into a sorted tuple.
        existing_req.extras = tuple(sorted(
            set(existing_req.extras) | set(install_req.extras)
        ))
        logger.debug(
            "Setting %s extras to: %s",
            existing_req, existing_req.extras,
        )
        # Return the existing requirement for addition to the parent and
        # scanning again.
        return [existing_req], existing_req

    def has_requirement(self, name):
        # type: (str) -> bool
        """Return whether a non-constraint requirement is stored for name.

        The lookup is done on the canonicalized form of ``name``.
        """
        project_name = canonicalize_name(name)

        return (
            project_name in self.requirements and
            not self.requirements[project_name].constraint
        )

    def get_requirement(self, name):
        # type: (str) -> InstallRequirement
        """Return the requirement stored for name (constraints included).

        :raises KeyError: if nothing is stored under the canonicalized name.
        """
        project_name = canonicalize_name(name)

        if project_name in self.requirements:
            return self.requirements[project_name]

        raise KeyError("No project with the name {name!r}".format(name=name))

    @property
    def all_requirements(self):
        # type: () -> List[InstallRequirement]
        """Return a new list: unnamed requirements, then the named ones."""
        return self.unnamed_requirements + list(self.requirements.values())
203