1# -*- coding: utf-8 -*-
2import re
3import subprocess
4
5from collections import namedtuple
6from typing import Any
7from typing import Optional
8
9from poetry.core.utils._compat import PY36
10from poetry.core.utils._compat import WINDOWS
11from poetry.core.utils._compat import Path
12from poetry.core.utils._compat import decode
13
14
15pattern_formats = {
16    "protocol": r"\w+",
17    "user": r"[a-zA-Z0-9_.-]+",
18    "resource": r"[a-zA-Z0-9_.-]+",
19    "port": r"\d+",
20    "path": r"[\w~.\-/\\]+",
21    "name": r"[\w~.\-]+",
22    "rev": r"[^@#]+",
23}
24
25PATTERNS = [
26    re.compile(
27        r"^(git\+)?"
28        r"(?P<protocol>https?|git|ssh|rsync|file)://"
29        r"(?:(?P<user>{user})@)?"
30        r"(?P<resource>{resource})?"
31        r"(:(?P<port>{port}))?"
32        r"(?P<pathname>[:/\\]({path}[/\\])?"
33        r"((?P<name>{name}?)(\.git|[/\\])?)?)"
34        r"([@#](?P<rev>{rev}))?"
35        r"$".format(
36            user=pattern_formats["user"],
37            resource=pattern_formats["resource"],
38            port=pattern_formats["port"],
39            path=pattern_formats["path"],
40            name=pattern_formats["name"],
41            rev=pattern_formats["rev"],
42        )
43    ),
44    re.compile(
45        r"(git\+)?"
46        r"((?P<protocol>{protocol})://)"
47        r"(?:(?P<user>{user})@)?"
48        r"(?P<resource>{resource}:?)"
49        r"(:(?P<port>{port}))?"
50        r"(?P<pathname>({path})"
51        r"(?P<name>{name})(\.git|/)?)"
52        r"([@#](?P<rev>{rev}))?"
53        r"$".format(
54            protocol=pattern_formats["protocol"],
55            user=pattern_formats["user"],
56            resource=pattern_formats["resource"],
57            port=pattern_formats["port"],
58            path=pattern_formats["path"],
59            name=pattern_formats["name"],
60            rev=pattern_formats["rev"],
61        )
62    ),
63    re.compile(
64        r"^(?:(?P<user>{user})@)?"
65        r"(?P<resource>{resource})"
66        r"(:(?P<port>{port}))?"
67        r"(?P<pathname>([:/]{path}/)"
68        r"(?P<name>{name})(\.git|/)?)"
69        r"([@#](?P<rev>{rev}))?"
70        r"$".format(
71            user=pattern_formats["user"],
72            resource=pattern_formats["resource"],
73            port=pattern_formats["port"],
74            path=pattern_formats["path"],
75            name=pattern_formats["name"],
76            rev=pattern_formats["rev"],
77        )
78    ),
79    re.compile(
80        r"((?P<user>{user})@)?"
81        r"(?P<resource>{resource})"
82        r"[:/]{{1,2}}"
83        r"(?P<pathname>({path})"
84        r"(?P<name>{name})(\.git|/)?)"
85        r"([@#](?P<rev>{rev}))?"
86        r"$".format(
87            user=pattern_formats["user"],
88            resource=pattern_formats["resource"],
89            path=pattern_formats["path"],
90            name=pattern_formats["name"],
91            rev=pattern_formats["rev"],
92        )
93    ),
94]
95
96
97class GitError(RuntimeError):
98
99    pass
100
101
102class ParsedUrl:
103    def __init__(
104        self,
105        protocol,  # type: Optional[str]
106        resource,  # type: Optional[str]
107        pathname,  # type: Optional[str]
108        user,  # type: Optional[str]
109        port,  # type: Optional[str]
110        name,  # type: Optional[str]
111        rev,  # type: Optional[str]
112    ):
113        self.protocol = protocol
114        self.resource = resource
115        self.pathname = pathname
116        self.user = user
117        self.port = port
118        self.name = name
119        self.rev = rev
120
121    @classmethod
122    def parse(cls, url):  # type: (str) -> ParsedUrl
123        for pattern in PATTERNS:
124            m = pattern.match(url)
125            if m:
126                groups = m.groupdict()
127                return ParsedUrl(
128                    groups.get("protocol"),
129                    groups.get("resource"),
130                    groups.get("pathname"),
131                    groups.get("user"),
132                    groups.get("port"),
133                    groups.get("name"),
134                    groups.get("rev"),
135                )
136
137        raise ValueError('Invalid git url "{}"'.format(url))
138
139    @property
140    def url(self):  # type: () -> str
141        return "{}{}{}{}{}".format(
142            "{}://".format(self.protocol) if self.protocol else "",
143            "{}@".format(self.user) if self.user else "",
144            self.resource,
145            ":{}".format(self.port) if self.port else "",
146            "/" + self.pathname.lstrip(":/"),
147        )
148
149    def format(self):  # type: () -> str
150        return self.url
151
152    def __str__(self):  # type: () -> str
153        return self.format()
154
155
156GitUrl = namedtuple("GitUrl", ["url", "revision"])
157
158
159_executable = None
160
161
162def executable():
163    global _executable
164
165    if _executable is not None:
166        return _executable
167
168    if WINDOWS and PY36:
169        # Finding git via where.exe
170        where = "%WINDIR%\\System32\\where.exe"
171        paths = decode(
172            subprocess.check_output([where, "git"], shell=True, encoding="oem")
173        ).split("\n")
174        for path in paths:
175            if not path:
176                continue
177
178            path = Path(path.strip())
179            try:
180                path.relative_to(Path.cwd())
181            except ValueError:
182                _executable = str(path)
183
184                break
185    else:
186        _executable = "git"
187
188    if _executable is None:
189        raise RuntimeError("Unable to find a valid git executable")
190
191    return _executable
192
193
194def _reset_executable():
195    global _executable
196
197    _executable = None
198
199
200class GitConfig:
201    def __init__(self, requires_git_presence=False):  # type: (bool) -> None
202        self._config = {}
203
204        try:
205            config_list = decode(
206                subprocess.check_output(
207                    [executable(), "config", "-l"], stderr=subprocess.STDOUT
208                )
209            )
210
211            m = re.findall("(?ms)^([^=]+)=(.*?)$", config_list)
212            if m:
213                for group in m:
214                    self._config[group[0]] = group[1]
215        except (subprocess.CalledProcessError, OSError):
216            if requires_git_presence:
217                raise
218
219    def get(self, key, default=None):  # type: (Any, Optional[Any]) -> Any
220        return self._config.get(key, default)
221
222    def __getitem__(self, item):  # type: (Any) -> Any
223        return self._config[item]
224
225
226class Git:
227    def __init__(self, work_dir=None):  # type: (Optional[Path]) -> None
228        self._config = GitConfig(requires_git_presence=True)
229        self._work_dir = work_dir
230
231    @classmethod
232    def normalize_url(cls, url):  # type: (str) -> GitUrl
233        parsed = ParsedUrl.parse(url)
234
235        formatted = re.sub(r"^git\+", "", url)
236        if parsed.rev:
237            formatted = re.sub(r"[#@]{}$".format(parsed.rev), "", formatted)
238
239        altered = parsed.format() != formatted
240
241        if altered:
242            if re.match(r"^git\+https?", url) and re.match(
243                r"^/?:[^0-9]", parsed.pathname
244            ):
245                normalized = re.sub(r"git\+(.*:[^:]+):(.*)", "\\1/\\2", url)
246            elif re.match(r"^git\+file", url):
247                normalized = re.sub(r"git\+", "", url)
248            else:
249                normalized = re.sub(r"^(?:git\+)?ssh://", "", url)
250        else:
251            normalized = parsed.format()
252
253        return GitUrl(re.sub(r"#[^#]*$", "", normalized), parsed.rev)
254
255    @property
256    def config(self):  # type: () -> GitConfig
257        return self._config
258
259    def clone(self, repository, dest):  # type: (str, Path) -> str
260        self._check_parameter(repository)
261
262        return self.run("clone", "--recurse-submodules", "--", repository, str(dest))
263
264    def checkout(self, rev, folder=None):  # type: (str, Optional[Path]) -> str
265        args = []
266        if folder is None and self._work_dir:
267            folder = self._work_dir
268
269        if folder:
270            args += [
271                "--git-dir",
272                (folder / ".git").as_posix(),
273                "--work-tree",
274                folder.as_posix(),
275            ]
276
277        self._check_parameter(rev)
278
279        args += ["checkout", rev]
280
281        return self.run(*args)
282
283    def rev_parse(self, rev, folder=None):  # type: (str, Optional[Path]) -> str
284        args = []
285        if folder is None and self._work_dir:
286            folder = self._work_dir
287
288        if folder:
289            args += [
290                "--git-dir",
291                (folder / ".git").as_posix(),
292                "--work-tree",
293                folder.as_posix(),
294            ]
295
296        self._check_parameter(rev)
297
298        # We need "^0" (an alternative to "^{commit}") to ensure that the
299        # commit SHA of the commit the tag points to is returned, even in
300        # the case of annotated tags.
301        #
302        # We deliberately avoid the "^{commit}" syntax itself as on some
303        # platforms (cygwin/msys to be specific), the braces are interpreted
304        # as special characters and would require escaping, while on others
305        # they should not be escaped.
306        args += ["rev-parse", rev + "^0"]
307
308        return self.run(*args)
309
310    def get_ignored_files(self, folder=None):  # type: (Optional[Path]) -> list
311        args = []
312        if folder is None and self._work_dir:
313            folder = self._work_dir
314
315        if folder:
316            args += [
317                "--git-dir",
318                (folder / ".git").as_posix(),
319                "--work-tree",
320                folder.as_posix(),
321            ]
322
323        args += ["ls-files", "--others", "-i", "--exclude-standard"]
324        output = self.run(*args)
325
326        return output.strip().split("\n")
327
328    def remote_urls(self, folder=None):  # type: (Optional[Path]) -> dict
329        output = self.run(
330            "config", "--get-regexp", r"remote\..*\.url", folder=folder
331        ).strip()
332
333        urls = {}
334        for url in output.splitlines():
335            name, url = url.split(" ", 1)
336            urls[name.strip()] = url.strip()
337
338        return urls
339
340    def remote_url(self, folder=None):  # type: (Optional[Path]) -> str
341        urls = self.remote_urls(folder=folder)
342
343        return urls.get("remote.origin.url", urls[list(urls.keys())[0]])
344
345    def run(self, *args, **kwargs):  # type: (*Any, **Any) -> str
346        folder = kwargs.pop("folder", None)
347        if folder:
348            args = (
349                "--git-dir",
350                (folder / ".git").as_posix(),
351                "--work-tree",
352                folder.as_posix(),
353            ) + args
354
355        return decode(
356            subprocess.check_output(
357                [executable()] + list(args), stderr=subprocess.STDOUT
358            )
359        ).strip()
360
361    def _check_parameter(self, parameter):  # type: (str) -> None
362        """
363        Checks a git parameter to avoid unwanted code execution.
364        """
365        if parameter.strip().startswith("-"):
366            raise GitError("Invalid Git parameter: {}".format(parameter))
367