# -*- coding: utf-8 -*-
"""Helpers for parsing git URLs and driving the ``git`` command line."""
import re
import subprocess

from collections import namedtuple
from typing import Any
from typing import Optional

from poetry.core.utils._compat import PY36
from poetry.core.utils._compat import WINDOWS
from poetry.core.utils._compat import Path
from poetry.core.utils._compat import decode


# Regex fragments shared by every entry of PATTERNS below.
pattern_formats = {
    "protocol": r"\w+",
    "user": r"[a-zA-Z0-9_.-]+",
    "resource": r"[a-zA-Z0-9_.-]+",
    "port": r"\d+",
    "path": r"[\w~.\-/\\]+",
    "name": r"[\w~.\-]+",
    "rev": r"[^@#]+",
}

# Candidate URL grammars, tried in order by ParsedUrl.parse(); the first one
# that matches wins. str.format() ignores keyword arguments a template does
# not reference, so the whole fragment table can be passed to each pattern.
PATTERNS = [
    # Explicit, well-known protocol; the resource (host) is optional.
    re.compile(
        r"^(git\+)?"
        r"(?P<protocol>https?|git|ssh|rsync|file)://"
        r"(?:(?P<user>{user})@)?"
        r"(?P<resource>{resource})?"
        r"(:(?P<port>{port}))?"
        r"(?P<pathname>[:/\\]({path}[/\\])?"
        r"((?P<name>{name}?)(\.git|[/\\])?)?)"
        r"([@#](?P<rev>{rev}))?"
        r"$".format(**pattern_formats)
    ),
    # Any "<word>://" protocol with a mandatory resource.
    re.compile(
        r"(git\+)?"
        r"((?P<protocol>{protocol})://)"
        r"(?:(?P<user>{user})@)?"
        r"(?P<resource>{resource}:?)"
        r"(:(?P<port>{port}))?"
        r"(?P<pathname>({path})"
        r"(?P<name>{name})(\.git|/)?)"
        r"([@#](?P<rev>{rev}))?"
        r"$".format(**pattern_formats)
    ),
    # No protocol: "[user@]resource[:port](:|/)path/name".
    re.compile(
        r"^(?:(?P<user>{user})@)?"
        r"(?P<resource>{resource})"
        r"(:(?P<port>{port}))?"
        r"(?P<pathname>([:/]{path}/)"
        r"(?P<name>{name})(\.git|/)?)"
        r"([@#](?P<rev>{rev}))?"
        r"$".format(**pattern_formats)
    ),
    # No protocol, one or two ":" / "/" separators after the resource.
    re.compile(
        r"((?P<user>{user})@)?"
        r"(?P<resource>{resource})"
        r"[:/]{{1,2}}"
        r"(?P<pathname>({path})"
        r"(?P<name>{name})(\.git|/)?)"
        r"([@#](?P<rev>{rev}))?"
        r"$".format(**pattern_formats)
    ),
]


class GitError(RuntimeError):
    """Raised when a value handed to git is rejected (see Git._check_parameter)."""


class ParsedUrl:
    """The components of a git URL as captured by one of PATTERNS.

    Every attribute holds the text of the corresponding named regex group,
    or None when that group did not take part in the match.
    """

    def __init__(
        self,
        protocol,  # type: Optional[str]
        resource,  # type: Optional[str]
        pathname,  # type: Optional[str]
        user,  # type: Optional[str]
        port,  # type: Optional[str]
        name,  # type: Optional[str]
        rev,  # type: Optional[str]
    ):
        self.protocol = protocol
        self.resource = resource
        self.pathname = pathname
        self.user = user
        self.port = port
        self.name = name
        self.rev = rev

    @classmethod
    def parse(cls, url):  # type: (str) -> ParsedUrl
        """Parse ``url`` with the first entry of PATTERNS that matches it.

        :raises ValueError: if no pattern matches.
        """
        for pattern in PATTERNS:
            match = pattern.match(url)
            if not match:
                continue

            groups = match.groupdict()

            # Patterns without a protocol have no "protocol" group at all,
            # hence .get() instead of indexing.
            return ParsedUrl(
                groups.get("protocol"),
                groups.get("resource"),
                groups.get("pathname"),
                groups.get("user"),
                groups.get("port"),
                groups.get("name"),
                groups.get("rev"),
            )

        raise ValueError('Invalid git url "{}"'.format(url))

    @property
    def url(self):  # type: () -> str
        """Re-assemble ``protocol://user@resource:port/pathname``.

        Leading ":" and "/" characters of the pathname are collapsed into a
        single "/". The revision is never part of the result.
        """
        # NOTE(review): when ``resource`` is None (possible with the first
        # pattern, e.g. "file:///path") this renders the literal "None".
        # Git.normalize_url compares this value against the input to detect
        # rewritten URLs, so changing it would change that method's results.
        return "{}{}{}{}{}".format(
            "{}://".format(self.protocol) if self.protocol else "",
            "{}@".format(self.user) if self.user else "",
            self.resource,
            ":{}".format(self.port) if self.port else "",
            "/" + self.pathname.lstrip(":/"),
        )

    def format(self):  # type: () -> str
        """Return the same string as the ``url`` property."""
        return self.url

    def __str__(self):  # type: () -> str
        return self.format()


# Result of Git.normalize_url(): the cleaned-up URL and the parsed revision.
GitUrl = namedtuple("GitUrl", ["url", "revision"])


# Cached result of executable(); cleared by _reset_executable().
_executable = None


def executable():
    """Return the git executable to invoke, resolving it on first use.

    On Windows with Python 3.6+ the candidates reported by ``where.exe`` are
    scanned and the first one that is NOT located under the current working
    directory is used. Everywhere else the bare name ``"git"`` is returned.
    The result is cached in the module-level ``_executable``.

    :raises RuntimeError: if no acceptable executable was found.
    """
    global _executable

    if _executable is not None:
        return _executable

    if WINDOWS and PY36:
        # Finding git via where.exe
        where = "%WINDIR%\\System32\\where.exe"
        paths = decode(
            subprocess.check_output([where, "git"], shell=True, encoding="oem")
        ).split("\n")
        for path in paths:
            # Strip before the emptiness test: a whitespace-only line would
            # otherwise become Path("."), fail relative_to() below and be
            # accepted as the executable.
            path = path.strip()
            if not path:
                continue

            path = Path(path)
            try:
                path.relative_to(Path.cwd())
            except ValueError:
                # relative_to() raising means the candidate is not inside
                # the current working directory, so it is accepted.
                _executable = str(path)

                break
    else:
        _executable = "git"

    if _executable is None:
        raise RuntimeError("Unable to find a valid git executable")

    return _executable


def _reset_executable():
    """Forget the cached executable so the next executable() call re-resolves it."""
    global _executable

    _executable = None


class GitConfig:
    """Read-only snapshot of the output of ``git config -l``."""

    def __init__(self, requires_git_presence=False):  # type: (bool) -> None
        """Load the configuration by running ``git config -l``.

        :param requires_git_presence: when true, a failure to run git
            (``subprocess.CalledProcessError`` or ``OSError``) is re-raised;
            otherwise it is swallowed and the configuration stays empty.
        """
        self._config = {}

        try:
            config_list = decode(
                subprocess.check_output(
                    [executable(), "config", "-l"], stderr=subprocess.STDOUT
                )
            )

            # One "key=value" entry per line; later duplicates overwrite
            # earlier ones.
            for key, value in re.findall("(?ms)^([^=]+)=(.*?)$", config_list):
                self._config[key] = value
        except (subprocess.CalledProcessError, OSError):
            if requires_git_presence:
                raise

    def get(self, key, default=None):  # type: (Any, Optional[Any]) -> Any
        """Return the value stored for ``key``, or ``default`` if it is missing."""
        return self._config.get(key, default)

    def __getitem__(self, item):  # type: (Any) -> Any
        return self._config[item]


class Git:
    """Thin wrapper around the git command line."""

    def __init__(self, work_dir=None):  # type: (Optional[Path]) -> None
        """Load the git configuration and remember an optional default folder.

        :param work_dir: repository folder used by checkout(), rev_parse()
            and get_ignored_files() when they are called without ``folder``.
        :raises subprocess.CalledProcessError, OSError: if git cannot be run.
        """
        self._config = GitConfig(requires_git_presence=True)
        self._work_dir = work_dir

    @classmethod
    def normalize_url(cls, url):  # type: (str) -> GitUrl
        """Split ``url`` into a clean repository URL and its revision.

        :raises ValueError: if ``url`` matches none of PATTERNS.
        """
        parsed = ParsedUrl.parse(url)

        formatted = re.sub(r"^git\+", "", url)
        if parsed.rev:
            # The revision is arbitrary text (anything but "@" and "#"), so
            # it must be escaped before being embedded in a pattern.
            formatted = re.sub(
                r"[#@]{}$".format(re.escape(parsed.rev)), "", formatted
            )

        # The URL is "altered" when re-assembling the parsed components does
        # not give back the input (minus the "git+" prefix and revision).
        altered = parsed.format() != formatted

        if altered:
            if re.match(r"^git\+https?", url) and re.match(
                r"^/?:[^0-9]", parsed.pathname
            ):
                normalized = re.sub(r"git\+(.*:[^:]+):(.*)", "\\1/\\2", url)
            elif re.match(r"^git\+file", url):
                normalized = re.sub(r"git\+", "", url)
            else:
                normalized = re.sub(r"^(?:git\+)?ssh://", "", url)
        else:
            normalized = parsed.format()

        # Drop a trailing "#fragment" that survived the rewrites above.
        return GitUrl(re.sub(r"#[^#]*$", "", normalized), parsed.rev)

    @property
    def config(self):  # type: () -> GitConfig
        """The configuration loaded when this object was created."""
        return self._config

    def clone(self, repository, dest):  # type: (str, Path) -> str
        """Clone ``repository`` (with its submodules) into ``dest``.

        :raises GitError: if ``repository`` starts with "-".
        """
        self._check_parameter(repository)

        return self.run("clone", "--recurse-submodules", "--", repository, str(dest))

    def checkout(self, rev, folder=None):  # type: (str, Optional[Path]) -> str
        """Check out ``rev`` in ``folder`` (default: the work dir, if any).

        :raises GitError: if ``rev`` starts with "-".
        """
        folder = self._resolve_folder(folder)

        self._check_parameter(rev)

        return self.run("checkout", rev, folder=folder)

    def rev_parse(self, rev, folder=None):  # type: (str, Optional[Path]) -> str
        """Return git's ``rev-parse`` output for the commit ``rev`` points to.

        :raises GitError: if ``rev`` starts with "-".
        """
        folder = self._resolve_folder(folder)

        self._check_parameter(rev)

        # We need "^0" (an alternative to "^{commit}") to ensure that the
        # commit SHA of the commit the tag points to is returned, even in
        # the case of annotated tags.
        #
        # We deliberately avoid the "^{commit}" syntax itself as on some
        # platforms (cygwin/msys to be specific), the braces are interpreted
        # as special characters and would require escaping, while on others
        # they should not be escaped.
        return self.run("rev-parse", rev + "^0", folder=folder)

    def get_ignored_files(self, folder=None):  # type: (Optional[Path]) -> list
        """List the lines printed by ``git ls-files --others -i --exclude-standard``.

        Returns an empty list when git prints nothing.
        """
        output = self.run(
            "ls-files",
            "--others",
            "-i",
            "--exclude-standard",
            folder=self._resolve_folder(folder),
        )

        # "".split("\n") is [""], so empty entries are filtered out to make
        # "nothing ignored" an empty list.
        return [name for name in output.strip().split("\n") if name]

    def remote_urls(self, folder=None):  # type: (Optional[Path]) -> dict
        """Map each ``remote.<name>.url`` configuration key to its URL."""
        output = self.run(
            "config", "--get-regexp", r"remote\..*\.url", folder=folder
        ).strip()

        urls = {}
        for line in output.splitlines():
            # Each line is "<key> <value>".
            name, url = line.split(" ", 1)
            urls[name.strip()] = url.strip()

        return urls

    def remote_url(self, folder=None):  # type: (Optional[Path]) -> str
        """Return the URL of "origin", or of the first remote listed otherwise.

        :raises IndexError: if no remote is configured.
        """
        urls = self.remote_urls(folder=folder)

        if "remote.origin.url" in urls:
            return urls["remote.origin.url"]

        return urls[list(urls.keys())[0]]

    def run(self, *args, **kwargs):  # type: (*Any, **Any) -> str
        """Run git with ``args`` and return its stripped, decoded output.

        stderr is merged into the returned output.

        :param folder: optional keyword; when given, git is pointed at that
            folder through ``--git-dir`` and ``--work-tree``.
        :raises subprocess.CalledProcessError: if git exits with a non-zero
            status.
        """
        folder = kwargs.pop("folder", None)
        if folder:
            args = (
                "--git-dir",
                (folder / ".git").as_posix(),
                "--work-tree",
                folder.as_posix(),
            ) + args

        return decode(
            subprocess.check_output(
                [executable()] + list(args), stderr=subprocess.STDOUT
            )
        ).strip()

    def _resolve_folder(self, folder):  # type: (Optional[Path]) -> Optional[Path]
        """Fall back to the work dir given at construction when ``folder`` is None."""
        if folder is None and self._work_dir:
            return self._work_dir

        return folder

    def _check_parameter(self, parameter):  # type: (str) -> None
        """
        Checks a git parameter to avoid unwanted code execution.

        A value starting with "-" (after stripping whitespace) could be read
        by git as an option, so it is rejected with a GitError.
        """
        if parameter.strip().startswith("-"):
            raise GitError("Invalid Git parameter: {}".format(parameter))