1# The following comment should be removed at some point in the future. 2# mypy: strict-optional=False 3# mypy: disallow-untyped-defs=False 4 5from __future__ import absolute_import 6 7import collections 8import logging 9import os 10import re 11 12from pipenv.patched.notpip._vendor import six 13from pipenv.patched.notpip._vendor.packaging.utils import canonicalize_name 14from pipenv.patched.notpip._vendor.pkg_resources import RequirementParseError 15 16from pipenv.patched.notpip._internal.exceptions import BadCommand, InstallationError 17from pipenv.patched.notpip._internal.req.constructors import ( 18 install_req_from_editable, 19 install_req_from_line, 20) 21from pipenv.patched.notpip._internal.req.req_file import COMMENT_RE 22from pipenv.patched.notpip._internal.utils.misc import ( 23 dist_is_editable, 24 get_installed_distributions, 25) 26from pipenv.patched.notpip._internal.utils.typing import MYPY_CHECK_RUNNING 27 28if MYPY_CHECK_RUNNING: 29 from typing import ( 30 Iterator, Optional, List, Container, Set, Dict, Tuple, Iterable, Union 31 ) 32 from pipenv.patched.notpip._internal.cache import WheelCache 33 from pipenv.patched.notpip._vendor.pkg_resources import ( 34 Distribution, Requirement 35 ) 36 37 RequirementInfo = Tuple[Optional[Union[str, Requirement]], bool, List[str]] 38 39 40logger = logging.getLogger(__name__) 41 42 43def freeze( 44 requirement=None, # type: Optional[List[str]] 45 find_links=None, # type: Optional[List[str]] 46 local_only=None, # type: Optional[bool] 47 user_only=None, # type: Optional[bool] 48 paths=None, # type: Optional[List[str]] 49 skip_regex=None, # type: Optional[str] 50 isolated=False, # type: bool 51 wheel_cache=None, # type: Optional[WheelCache] 52 exclude_editable=False, # type: bool 53 skip=() # type: Container[str] 54): 55 # type: (...) -> Iterator[str] 56 find_links = find_links or [] 57 skip_match = None 58 59 if skip_regex: 60 skip_match = re.compile(skip_regex).search 61 62 for link in find_links: 63 yield '-f %s' % link 64 installations = {} # type: Dict[str, FrozenRequirement] 65 for dist in get_installed_distributions(local_only=local_only, 66 skip=(), 67 user_only=user_only, 68 paths=paths): 69 try: 70 req = FrozenRequirement.from_dist(dist) 71 except RequirementParseError as exc: 72 # We include dist rather than dist.project_name because the 73 # dist string includes more information, like the version and 74 # location. We also include the exception message to aid 75 # troubleshooting. 76 logger.warning( 77 'Could not generate requirement for distribution %r: %s', 78 dist, exc 79 ) 80 continue 81 if exclude_editable and req.editable: 82 continue 83 installations[req.canonical_name] = req 84 85 if requirement: 86 # the options that don't get turned into an InstallRequirement 87 # should only be emitted once, even if the same option is in multiple 88 # requirements files, so we need to keep track of what has been emitted 89 # so that we don't emit it again if it's seen again 90 emitted_options = set() # type: Set[str] 91 # keep track of which files a requirement is in so that we can 92 # give an accurate warning if a requirement appears multiple times. 93 req_files = collections.defaultdict(list) # type: Dict[str, List[str]] 94 for req_file_path in requirement: 95 with open(req_file_path) as req_file: 96 for line in req_file: 97 if (not line.strip() or 98 line.strip().startswith('#') or 99 (skip_match and skip_match(line)) or 100 line.startswith(( 101 '-r', '--requirement', 102 '-Z', '--always-unzip', 103 '-f', '--find-links', 104 '-i', '--index-url', 105 '--pre', 106 '--trusted-host', 107 '--process-dependency-links', 108 '--extra-index-url'))): 109 line = line.rstrip() 110 if line not in emitted_options: 111 emitted_options.add(line) 112 yield line 113 continue 114 115 if line.startswith('-e') or line.startswith('--editable'): 116 if line.startswith('-e'): 117 line = line[2:].strip() 118 else: 119 line = line[len('--editable'):].strip().lstrip('=') 120 line_req = install_req_from_editable( 121 line, 122 isolated=isolated, 123 wheel_cache=wheel_cache, 124 ) 125 else: 126 line_req = install_req_from_line( 127 COMMENT_RE.sub('', line).strip(), 128 isolated=isolated, 129 wheel_cache=wheel_cache, 130 ) 131 132 if not line_req.name: 133 logger.info( 134 "Skipping line in requirement file [%s] because " 135 "it's not clear what it would install: %s", 136 req_file_path, line.strip(), 137 ) 138 logger.info( 139 " (add #egg=PackageName to the URL to avoid" 140 " this warning)" 141 ) 142 else: 143 line_req_canonical_name = canonicalize_name( 144 line_req.name) 145 if line_req_canonical_name not in installations: 146 # either it's not installed, or it is installed 147 # but has been processed already 148 if not req_files[line_req.name]: 149 logger.warning( 150 "Requirement file [%s] contains %s, but " 151 "package %r is not installed", 152 req_file_path, 153 COMMENT_RE.sub('', line).strip(), 154 line_req.name 155 ) 156 else: 157 req_files[line_req.name].append(req_file_path) 158 else: 159 yield str(installations[ 160 line_req_canonical_name]).rstrip() 161 del installations[line_req_canonical_name] 162 req_files[line_req.name].append(req_file_path) 163 164 # Warn about requirements that were included multiple times (in a 165 # single requirements file or in different requirements files). 166 for name, files in six.iteritems(req_files): 167 if len(files) > 1: 168 logger.warning("Requirement %s included multiple times [%s]", 169 name, ', '.join(sorted(set(files)))) 170 171 yield( 172 '## The following requirements were added by ' 173 'pip freeze:' 174 ) 175 for installation in sorted( 176 installations.values(), key=lambda x: x.name.lower()): 177 if installation.canonical_name not in skip: 178 yield str(installation).rstrip() 179 180 181def get_requirement_info(dist): 182 # type: (Distribution) -> RequirementInfo 183 """ 184 Compute and return values (req, editable, comments) for use in 185 FrozenRequirement.from_dist(). 186 """ 187 if not dist_is_editable(dist): 188 return (None, False, []) 189 190 location = os.path.normcase(os.path.abspath(dist.location)) 191 192 from pipenv.patched.notpip._internal.vcs import vcs, RemoteNotFoundError 193 vcs_backend = vcs.get_backend_for_dir(location) 194 195 if vcs_backend is None: 196 req = dist.as_requirement() 197 logger.debug( 198 'No VCS found for editable requirement "%s" in: %r', req, 199 location, 200 ) 201 comments = [ 202 '# Editable install with no version control ({})'.format(req) 203 ] 204 return (location, True, comments) 205 206 try: 207 req = vcs_backend.get_src_requirement(location, dist.project_name) 208 except RemoteNotFoundError: 209 req = dist.as_requirement() 210 comments = [ 211 '# Editable {} install with no remote ({})'.format( 212 type(vcs_backend).__name__, req, 213 ) 214 ] 215 return (location, True, comments) 216 217 except BadCommand: 218 logger.warning( 219 'cannot determine version of editable source in %s ' 220 '(%s command not found in path)', 221 location, 222 vcs_backend.name, 223 ) 224 return (None, True, []) 225 226 except InstallationError as exc: 227 logger.warning( 228 "Error when trying to get requirement for VCS system %s, " 229 "falling back to uneditable format", exc 230 ) 231 else: 232 if req is not None: 233 return (req, True, []) 234 235 logger.warning( 236 'Could not determine repository location of %s', location 237 ) 238 comments = ['## !! Could not determine repository location'] 239 240 return (None, False, comments) 241 242 243class FrozenRequirement(object): 244 def __init__(self, name, req, editable, comments=()): 245 # type: (str, Union[str, Requirement], bool, Iterable[str]) -> None 246 self.name = name 247 self.canonical_name = canonicalize_name(name) 248 self.req = req 249 self.editable = editable 250 self.comments = comments 251 252 @classmethod 253 def from_dist(cls, dist): 254 # type: (Distribution) -> FrozenRequirement 255 req, editable, comments = get_requirement_info(dist) 256 if req is None: 257 req = dist.as_requirement() 258 259 return cls(dist.project_name, req, editable, comments=comments) 260 261 def __str__(self): 262 req = self.req 263 if self.editable: 264 req = '-e %s' % req 265 return '\n'.join(list(self.comments) + [str(req)]) + '\n' 266