1"""Utilities related to handling / interacting with wheel files."""
2
import contextlib
import hashlib
import io
import os
import re
import shutil
import sys
from collections import namedtuple
from email.parser import FeedParser
from typing import NewType

from installer._compat import ConfigParser
from installer._compat.typing import TYPE_CHECKING, Text, cast
15
# Distinct static type for the name of an installation scheme, declared on
# top of ``str`` via ``NewType``.
Scheme = NewType("Scheme", str)

if TYPE_CHECKING:
    # In the code visible in this module these names appear only inside type
    # comments and string ``cast`` targets, so they are imported under the
    # ``TYPE_CHECKING`` guard (presumably skipped at runtime -- the flag
    # comes from ``installer._compat.typing``).
    from email.message import Message
    from typing import BinaryIO, Iterable, Iterator, Tuple

    from installer.records import RecordEntry
    from installer.scripts import LauncherKind, ScriptSection

    # A tuple of scheme names, of any length.
    AllSchemes = Tuple[Scheme, ...]

# Names exported by a star-import of this module.
# NOTE(review): the other public-looking helpers defined in this module are
# not listed here -- confirm whether that is intentional.
__all__ = [
    "parse_metadata_file",
    "parse_wheel_filename",
    "WheelFilename",
    "SCHEME_NAMES",
]
33
# Borrowed from https://github.com/python/cpython/blob/v3.9.1/Lib/shutil.py#L52
# Chunk size, in bytes, passed to ``read()`` by the copy loops in this module:
# 1 MiB when ``os.name`` is "nt", 64 KiB otherwise.
_WINDOWS = os.name == "nt"
_COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
37
# According to https://www.python.org/dev/peps/pep-0427/#file-name-convention
# Matches ``{distribution}-{version}[-{build_tag}]-{tag}.whl`` against the
# whole string (``^`` ... ``$``):
#   * ``distribution`` must be non-empty;
#   * ``version`` is matched with ``.*?`` and may therefore be empty
#     (NOTE(review): confirm an empty version is meant to be accepted);
#   * ``build_tag`` is optional, must start with a digit and contains no dash;
#     the group is ``None`` when absent;
#   * ``tag`` is three non-empty parts joined by dashes.
_WHEEL_FILENAME_REGEX = re.compile(
    r"""
    ^
    (?P<distribution>.+?)
    -(?P<version>.*?)
    (?:-(?P<build_tag>\d[^-]*?))?
    -(?P<tag>.+?-.+?-.+?)
    \.whl
    $
    """,
    re.VERBOSE | re.UNICODE,
)
# Result type for a parsed wheel filename. The field names mirror the named
# groups of ``_WHEEL_FILENAME_REGEX``, in the same order.
WheelFilename = namedtuple(
    "WheelFilename", ["distribution", "version", "build_tag", "tag"]
)
54
# Adapted from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L90  # noqa
# Matches an entry point value of the form ``module:attrs`` optionally
# followed by a bracketed ``[extras]`` part:
#   * ``module`` and ``attrs`` consist of word characters and dots;
#   * the ``:attrs`` part is NOT optional in this pattern, so a bare module
#     name does not match;
#   * whitespace is tolerated after each part, and the pattern is anchored at
#     the end of the string with ``$`` (there is no ``^``).
_ENTRYPOINT_REGEX = re.compile(
    r"""
    (?P<module>[\w.]+)\s*
    (:\s*(?P<attrs>[\w.]+))\s*
    (?P<extras>\[.*\])?\s*$
    """,
    re.VERBOSE | re.UNICODE,
)
64
# According to https://www.python.org/dev/peps/pep-0427/#id7
# The scheme names known to this module, as a tuple of strings. The ``cast``
# labels the tuple as ``AllSchemes`` for type checkers (presumably a runtime
# no-op, as with ``typing.cast`` -- it comes from ``installer._compat.typing``).
SCHEME_NAMES = cast("AllSchemes", ("purelib", "platlib", "headers", "scripts", "data"))
67
68
def parse_metadata_file(contents):
    # type: (Text) -> Message
    """Parse a :pep:`376` ``PKG-INFO``-style metadata document.

    The ``METADATA`` and ``WHEEL`` files described in :pep:`427` share this
    syntax, so they can be parsed with this function as well.

    :param contents: The complete text of the file.
    :return: The message object produced by the email feed parser.
    """
    parser = FeedParser()
    # The whole document is handed over in one go; ``close()`` finishes the
    # parse and hands back the root message.
    parser.feed(contents)
    message = parser.close()
    return message
81
82
def parse_wheel_filename(filename):
    # type: (Text) -> WheelFilename
    """Split a wheel filename into its components.

    :param filename: The filename to parse.
    :return: The distribution, version, build tag and tag found in the
        filename. The build tag is ``None`` when the filename has none.
    :raises ValueError: If ``filename`` does not match the wheel filename
        pattern.
    """
    match = _WHEEL_FILENAME_REGEX.match(filename)
    if match is None:
        raise ValueError("Not a valid wheel filename: {}".format(filename))
    # The pattern's named groups carry the same names as the tuple's fields.
    return WheelFilename(**match.groupdict())
93
94
def copyfileobj_with_hashing(
    source,  # type: BinaryIO
    dest,  # type: BinaryIO
    hash_algorithm,  # type: str
):
    # type: (...) -> Tuple[str, int]
    """Copy ``source`` into ``dest``, hashing and measuring the data on the way.

    The data is moved in chunks of ``_COPY_BUFSIZE`` bytes, in the manner of
    :func:`shutil.copyfileobj`. Every chunk is fed to the hasher and then
    written to the destination.

    :param source: buffer holding the source data
    :param dest: destination buffer
    :param hash_algorithm: name of the hashing algorithm, passed to
        :func:`hashlib.new`

    :return: hexadecimal hash digest of the contents, and their size in bytes
        (in that order)
    """
    digest = hashlib.new(hash_algorithm)
    total = 0
    chunk = source.read(_COPY_BUFSIZE)
    # An empty read marks the end of the source.
    while chunk:
        digest.update(chunk)
        dest.write(chunk)
        total += len(chunk)
        chunk = source.read(_COPY_BUFSIZE)
    return digest.hexdigest(), total
123
124
def get_launcher_kind():  # pragma: no cover
    # type: () -> LauncherKind
    """Get the launcher kind for the current machine.

    Anything whose ``os.name`` is not ``"nt"`` is ``"posix"``. Otherwise the
    architecture is looked up in the lower-cased ``sys.version`` string, with
    ``sys.platform == "win32"`` as the fallback for ``"win-ia32"``.

    :raises NotImplementedError: If none of the checks identifies the machine.
    """
    if os.name != "nt":
        return "posix"

    version = sys.version.lower()
    # Checked in this order; the first marker found in the version string wins.
    for marker, kind in (
        ("amd64", "win-amd64"),
        ("(arm64)", "win-arm64"),
        ("(arm)", "win-arm"),
    ):
        if marker in version:
            return kind

    if sys.platform == "win32":
        return "win-ia32"

    raise NotImplementedError("Unknown launcher kind for this machine")
141
142
@contextlib.contextmanager
def fix_shebang(stream, interpreter):
    # type: (BinaryIO, str) -> Iterator[BinaryIO]
    """Replace ^#!python shebang in a stream with the correct interpreter.

    When the first eight bytes of ``stream`` are ``#!python``, the context
    manager yields a new in-memory stream in which the whole first line is
    replaced by ``#!<interpreter>`` and the remaining content is copied
    unchanged. That replacement stream is closed when the ``with`` block
    exits, including when the block raises. Otherwise ``stream`` itself is
    yielded, rewound to its start.

    The original stream should be closed by the caller.

    :param stream: seekable binary stream holding the script
    :param interpreter: interpreter to put on the new shebang line
    """
    stream.seek(0)
    # Only the 8-byte prefix is compared, so any first line that merely
    # starts with ``#!python`` is treated as a placeholder shebang.
    if stream.read(8) != b"#!python":
        stream.seek(0)
        yield stream
        return

    # The ``with`` guarantees the temporary buffer is released even if the
    # caller's block raises while it is suspended at the ``yield``.
    with io.BytesIO() as new_stream:
        # write our new shebang
        new_stream.write("#!{}\n".format(interpreter).encode())
        # copy the rest of the stream
        stream.seek(0)
        stream.readline()  # skip first line
        shutil.copyfileobj(stream, new_stream)
        new_stream.seek(0)
        yield new_stream
169
170
def construct_record_file(records):
    # type: (Iterable[Tuple[Scheme, RecordEntry]]) -> BinaryIO
    """Build an in-memory RECORD file from ``(scheme, record)`` pairs.

    Each record is rendered with ``str()``, encoded as UTF-8 and terminated
    with a newline. The scheme of each pair is not written out.

    The returned stream is positioned at its start and should be closed by
    the caller.
    """
    lines = [str(record).encode("utf-8") + b"\n" for _scheme, record in records]
    return io.BytesIO(b"".join(lines))
182
183
def parse_entrypoints(text):
    # type: (Text) -> Iterable[Tuple[Text, Text, Text, ScriptSection]]
    """Yield the script entry points declared in an entry points document.

    Only the ``console_scripts`` and ``gui_scripts`` sections are read; every
    other section is skipped.

    :param text: The entry points document, in INI syntax with ``=`` as the
        only key/value delimiter.
    :return: An iterable of ``(name, module, attrs, script_section)`` tuples,
        where ``script_section`` is the section name with its ``_scripts``
        suffix removed.
    :raises ValueError: If an entry point value is not of the form
        ``module:attrs``. This is a generator, so the error surfaces while
        iterating, not when the function is called.
    """
    # Borrowed from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L115  # noqa
    config = ConfigParser(delimiters="=")
    # Use the text type as the option-name transform so that names are kept
    # as written instead of going through the parser's default transform.
    config.optionxform = Text  # type: ignore
    config.read_string(text)

    for section in config.sections():
        if section not in ("console_scripts", "gui_scripts"):
            continue

        script_section = cast("ScriptSection", section[: -len("_scripts")])

        for name, value in config.items(section):
            match = _ENTRYPOINT_REGEX.match(value)
            # Raise a real exception instead of asserting: assertions are
            # stripped under ``python -O`` and the input comes from the wheel.
            if match is None:
                raise ValueError(
                    "Invalid entry point {!r} in [{}]: {!r}".format(
                        name, section, value
                    )
                )

            module = match.group("module")
            attrs = match.group("attrs")

            # These only narrow the types for the type checker; both groups
            # are mandatory in the pattern, so a match always provides them.
            assert isinstance(name, Text)
            assert isinstance(module, Text)
            assert isinstance(attrs, Text)

            yield name, module, attrs, script_section
211