1# -*- coding: utf-8 -*- 2from __future__ import absolute_import, print_function 3 4import copy 5import itertools 6import os 7 8import attr 9import plette.lockfiles 10import six 11from vistir.compat import FileNotFoundError, JSONDecodeError, Path 12 13from ..exceptions import LockfileCorruptException, MissingParameter, PipfileNotFound 14from ..utils import is_editable, is_vcs, merge_items 15from .project import ProjectFile 16from .requirements import Requirement 17from .utils import optional_instance_of 18 19DEFAULT_NEWLINES = six.text_type("\n") 20 21 22def preferred_newlines(f): 23 if isinstance(f.newlines, six.text_type): 24 return f.newlines 25 return DEFAULT_NEWLINES 26 27 28is_lockfile = optional_instance_of(plette.lockfiles.Lockfile) 29is_projectfile = optional_instance_of(ProjectFile) 30 31 32@attr.s(slots=True) 33class Lockfile(object): 34 path = attr.ib(validator=optional_instance_of(Path), type=Path) 35 _requirements = attr.ib(default=attr.Factory(list), type=list) 36 _dev_requirements = attr.ib(default=attr.Factory(list), type=list) 37 projectfile = attr.ib(validator=is_projectfile, type=ProjectFile) 38 _lockfile = attr.ib(validator=is_lockfile, type=plette.lockfiles.Lockfile) 39 newlines = attr.ib(default=DEFAULT_NEWLINES, type=six.text_type) 40 41 @path.default 42 def _get_path(self): 43 return Path(os.curdir).joinpath("Pipfile.lock").absolute() 44 45 @projectfile.default 46 def _get_projectfile(self): 47 return self.load_projectfile(self.path) 48 49 @_lockfile.default 50 def _get_lockfile(self): 51 return self.projectfile.model 52 53 @property 54 def lockfile(self): 55 return self._lockfile 56 57 @property 58 def section_keys(self): 59 return ["default", "develop"] 60 61 @property 62 def extended_keys(self): 63 return [k for k in itertools.product(self.section_keys, ["", "vcs", "editable"])] 64 65 def get(self, k): 66 return self.__getitem__(k) 67 68 def __contains__(self, k): 69 check_lockfile = k in self.extended_keys or self.lockfile.__contains__(k) 70 if check_lockfile: 71 return True 72 return super(Lockfile, self).__contains__(k) 73 74 def __setitem__(self, k, v): 75 lockfile = self._lockfile 76 lockfile.__setitem__(k, v) 77 78 def __getitem__(self, k, *args, **kwargs): 79 retval = None 80 lockfile = self._lockfile 81 section = None 82 pkg_type = None 83 try: 84 retval = lockfile[k] 85 except KeyError: 86 if "-" in k: 87 section, _, pkg_type = k.rpartition("-") 88 vals = getattr(lockfile.get(section, {}), "_data", {}) 89 if pkg_type == "vcs": 90 retval = {k: v for k, v in vals.items() if is_vcs(v)} 91 elif pkg_type == "editable": 92 retval = {k: v for k, v in vals.items() if is_editable(v)} 93 if retval is None: 94 raise 95 else: 96 retval = getattr(retval, "_data", retval) 97 return retval 98 99 def __getattr__(self, k, *args, **kwargs): 100 retval = None 101 lockfile = super(Lockfile, self).__getattribute__("_lockfile") 102 try: 103 return super(Lockfile, self).__getattribute__(k) 104 except AttributeError: 105 retval = getattr(lockfile, k, None) 106 if retval is not None: 107 return retval 108 return super(Lockfile, self).__getattribute__(k, *args, **kwargs) 109 110 def get_deps(self, dev=False, only=True): 111 deps = {} 112 if dev: 113 deps.update(self.develop._data) 114 if only: 115 return deps 116 deps = merge_items([deps, self.default._data]) 117 return deps 118 119 @classmethod 120 def read_projectfile(cls, path): 121 """Read the specified project file and provide an interface for writing/updating. 122 123 :param str path: Path to the target file. 124 :return: A project file with the model and location for interaction 125 :rtype: :class:`~requirementslib.models.project.ProjectFile` 126 """ 127 128 pf = ProjectFile.read(path, plette.lockfiles.Lockfile, invalid_ok=True) 129 return pf 130 131 @classmethod 132 def lockfile_from_pipfile(cls, pipfile_path): 133 from .pipfile import Pipfile 134 135 if os.path.isfile(pipfile_path): 136 if not os.path.isabs(pipfile_path): 137 pipfile_path = os.path.abspath(pipfile_path) 138 pipfile = Pipfile.load(os.path.dirname(pipfile_path)) 139 return plette.lockfiles.Lockfile.with_meta_from(pipfile._pipfile) 140 raise PipfileNotFound(pipfile_path) 141 142 @classmethod 143 def load_projectfile(cls, path, create=True, data=None): 144 """Given a path, load or create the necessary lockfile. 145 146 :param str path: Path to the project root or lockfile 147 :param bool create: Whether to create the lockfile if not found, defaults to True 148 :raises OSError: Thrown if the project root directory doesn't exist 149 :raises FileNotFoundError: Thrown if the lockfile doesn't exist and ``create=False`` 150 :return: A project file instance for the supplied project 151 :rtype: :class:`~requirementslib.models.project.ProjectFile` 152 """ 153 154 if not path: 155 path = os.curdir 156 path = Path(path).absolute() 157 project_path = path if path.is_dir() else path.parent 158 lockfile_path = path if path.is_file() else project_path / "Pipfile.lock" 159 if not project_path.exists(): 160 raise OSError("Project does not exist: %s" % project_path.as_posix()) 161 elif not lockfile_path.exists() and not create: 162 raise FileNotFoundError( 163 "Lockfile does not exist: %s" % lockfile_path.as_posix() 164 ) 165 projectfile = cls.read_projectfile(lockfile_path.as_posix()) 166 if not lockfile_path.exists(): 167 if not data: 168 path_str = lockfile_path.as_posix() 169 if path_str[-5:] == ".lock": 170 pipfile = Path(path_str[:-5]) 171 else: 172 pipfile = project_path.joinpath("Pipfile") 173 lf = cls.lockfile_from_pipfile(pipfile) 174 else: 175 lf = plette.lockfiles.Lockfile(data) 176 projectfile.model = lf 177 return projectfile 178 179 @classmethod 180 def from_data(cls, path, data, meta_from_project=True): 181 """Create a new lockfile instance from a dictionary. 182 183 :param str path: Path to the project root. 184 :param dict data: Data to load into the lockfile. 185 :param bool meta_from_project: Attempt to populate the meta section from the 186 project root, default True. 187 """ 188 189 if path is None: 190 raise MissingParameter("path") 191 if data is None: 192 raise MissingParameter("data") 193 if not isinstance(data, dict): 194 raise TypeError("Expecting a dictionary for parameter 'data'") 195 path = os.path.abspath(str(path)) 196 if os.path.isdir(path): 197 project_path = path 198 elif not os.path.isdir(path) and os.path.isdir(os.path.dirname(path)): 199 project_path = os.path.dirname(path) 200 pipfile_path = os.path.join(project_path, "Pipfile") 201 lockfile_path = os.path.join(project_path, "Pipfile.lock") 202 if meta_from_project: 203 lockfile = cls.lockfile_from_pipfile(pipfile_path) 204 lockfile.update(data) 205 else: 206 lockfile = plette.lockfiles.Lockfile(data) 207 projectfile = ProjectFile( 208 line_ending=DEFAULT_NEWLINES, location=lockfile_path, model=lockfile 209 ) 210 return cls( 211 projectfile=projectfile, 212 lockfile=lockfile, 213 newlines=projectfile.line_ending, 214 path=Path(projectfile.location), 215 ) 216 217 @classmethod 218 def load(cls, path, create=True): 219 """Create a new lockfile instance. 220 221 :param project_path: Path to project root or lockfile 222 :type project_path: str or :class:`pathlib.Path` 223 :param str lockfile_name: Name of the lockfile in the project root directory 224 :param pipfile_path: Path to the project pipfile 225 :type pipfile_path: :class:`pathlib.Path` 226 :returns: A new lockfile representing the supplied project paths 227 :rtype: :class:`~requirementslib.models.lockfile.Lockfile` 228 """ 229 230 try: 231 projectfile = cls.load_projectfile(path, create=create) 232 except JSONDecodeError: 233 path = os.path.abspath(path) 234 path = Path( 235 os.path.join(path, "Pipfile.lock") if os.path.isdir(path) else path 236 ) 237 formatted_path = path.as_posix() 238 backup_path = "%s.bak" % formatted_path 239 LockfileCorruptException.show(formatted_path, backup_path=backup_path) 240 path.rename(backup_path) 241 cls.load(formatted_path, create=True) 242 lockfile_path = Path(projectfile.location) 243 creation_args = { 244 "projectfile": projectfile, 245 "lockfile": projectfile.model, 246 "newlines": projectfile.line_ending, 247 "path": lockfile_path, 248 } 249 return cls(**creation_args) 250 251 @classmethod 252 def create(cls, path, create=True): 253 return cls.load(path, create=create) 254 255 @property 256 def develop(self): 257 return self._lockfile.develop 258 259 @property 260 def default(self): 261 return self._lockfile.default 262 263 def get_requirements(self, dev=True, only=False): 264 """Produces a generator which generates requirements from the desired section. 265 266 :param bool dev: Indicates whether to use dev requirements, defaults to False 267 :return: Requirements from the relevant the relevant pipfile 268 :rtype: :class:`~requirementslib.models.requirements.Requirement` 269 """ 270 271 deps = self.get_deps(dev=dev, only=only) 272 for k, v in deps.items(): 273 yield Requirement.from_pipfile(k, v) 274 275 @property 276 def dev_requirements(self): 277 if not self._dev_requirements: 278 self._dev_requirements = list(self.get_requirements(dev=True, only=True)) 279 return self._dev_requirements 280 281 @property 282 def requirements(self): 283 if not self._requirements: 284 self._requirements = list(self.get_requirements(dev=False, only=True)) 285 return self._requirements 286 287 @property 288 def dev_requirements_list(self): 289 return [{name: entry._data} for name, entry in self._lockfile.develop.items()] 290 291 @property 292 def requirements_list(self): 293 return [{name: entry._data} for name, entry in self._lockfile.default.items()] 294 295 def write(self): 296 self.projectfile.model = copy.deepcopy(self._lockfile) 297 self.projectfile.write() 298 299 def as_requirements(self, include_hashes=False, dev=False): 300 """Returns a list of requirements in pip-style format""" 301 lines = [] 302 section = self.dev_requirements if dev else self.requirements 303 for req in section: 304 kwargs = {"include_hashes": include_hashes} 305 if req.editable: 306 kwargs["include_markers"] = False 307 r = req.as_line(**kwargs) 308 lines.append(r.strip()) 309 return lines 310