"""Utilities related to handling / interacting with wheel files."""

import contextlib
import hashlib
import io
import os
import re
import sys
from collections import namedtuple
from email.parser import FeedParser
from typing import NewType

from installer._compat import ConfigParser
from installer._compat.typing import TYPE_CHECKING, Text, cast

Scheme = NewType("Scheme", str)

if TYPE_CHECKING:
    from email.message import Message
    from typing import BinaryIO, Iterable, Iterator, Tuple

    from installer.records import RecordEntry
    from installer.scripts import LauncherKind, ScriptSection

    AllSchemes = Tuple[Scheme, ...]

__all__ = [
    "parse_metadata_file",
    "parse_wheel_filename",
    "WheelFilename",
    "SCHEME_NAMES",
]

# Borrowed from https://github.com/python/cpython/blob/v3.9.1/Lib/shutil.py#L52
_WINDOWS = os.name == "nt"
_COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024

# According to https://www.python.org/dev/peps/pep-0427/#file-name-convention
_WHEEL_FILENAME_REGEX = re.compile(
    r"""
    ^
    (?P<distribution>.+?)
    -(?P<version>.*?)
    (?:-(?P<build_tag>\d[^-]*?))?
    -(?P<tag>.+?-.+?-.+?)
    \.whl
    $
    """,
    re.VERBOSE | re.UNICODE,
)
WheelFilename = namedtuple(
    "WheelFilename", ["distribution", "version", "build_tag", "tag"]
)

# Adapted from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L90  # noqa
_ENTRYPOINT_REGEX = re.compile(
    r"""
    (?P<module>[\w.]+)\s*
    (:\s*(?P<attrs>[\w.]+))\s*
    (?P<extras>\[.*\])?\s*$
    """,
    re.VERBOSE | re.UNICODE,
)

# According to https://www.python.org/dev/peps/pep-0427/#id7
SCHEME_NAMES = cast("AllSchemes", ("purelib", "platlib", "headers", "scripts", "data"))


def parse_metadata_file(contents):
    # type: (Text) -> Message
    """Parse :pep:`376` ``PKG-INFO``-style metadata files.

    ``METADATA`` and ``WHEEL`` files (as per :pep:`427`) use the same syntax
    and can also be parsed using this function.

    :param contents: The entire contents of the file.
    :return: the message object produced by feeding ``contents`` to an
        :class:`email.parser.FeedParser`.
    """
    feed_parser = FeedParser()
    feed_parser.feed(contents)
    return feed_parser.close()


def parse_wheel_filename(filename):
    # type: (Text) -> WheelFilename
    """Parse a wheel filename into its various components.

    :param filename: The filename to parse.
    :return: a :class:`WheelFilename` holding the distribution, version,
        build tag (``None`` when the filename has none) and tag.
    :raises ValueError: if ``filename`` does not match the wheel filename
        pattern.
    """
    wheel_info = _WHEEL_FILENAME_REGEX.match(filename)
    if not wheel_info:
        raise ValueError("Not a valid wheel filename: {}".format(filename))
    return WheelFilename(*wheel_info.groups())


def copyfileobj_with_hashing(
    source,  # type: BinaryIO
    dest,  # type: BinaryIO
    hash_algorithm,  # type: str
):
    # type: (...) -> Tuple[str, int]
    """Copy a buffer while computing the content's hash and size.

    Copies the source buffer into the destination buffer while computing the
    hash of the contents. Adapted from :ref:`shutil.copyfileobj`.

    :param source: buffer holding the source data
    :param dest: destination buffer
    :param hash_algorithm: hashing algorithm, as a name accepted by
        :func:`hashlib.new`

    :return: hex digest of the contents, number of bytes copied
    """
    hasher = hashlib.new(hash_algorithm)
    size = 0
    while True:
        # Read in bounded chunks so large files are never held fully in memory.
        buf = source.read(_COPY_BUFSIZE)
        if not buf:
            break
        hasher.update(buf)
        dest.write(buf)
        size += len(buf)

    return hasher.hexdigest(), size


def get_launcher_kind():  # pragma: no cover
    # type: () -> LauncherKind
    """Get the launcher kind for the current machine.

    :return: ``"posix"`` when ``os.name`` is not ``"nt"``; otherwise one of
        the ``"win-*"`` kinds, chosen from markers found in ``sys.version``
        (falling back to ``sys.platform``).
    :raises NotImplementedError: if no known launcher kind matches.
    """
    if os.name != "nt":
        return "posix"

    # Lower-case once; the architecture markers are compared case-insensitively.
    version = sys.version.lower()
    if "amd64" in version:
        return "win-amd64"
    if "(arm64)" in version:
        return "win-arm64"
    if "(arm)" in version:
        return "win-arm"
    if sys.platform == "win32":
        return "win-ia32"

    raise NotImplementedError("Unknown launcher kind for this machine")


@contextlib.contextmanager
def fix_shebang(stream, interpreter):
    # type: (BinaryIO, str) -> Iterator[BinaryIO]
    """Replace ^#!python shebang in a stream with the correct interpreter.

    If the stream does not start with ``#!python``, the stream itself is
    yielded, rewound to its start. Otherwise an in-memory copy is yielded
    whose first line is replaced with ``#!<interpreter>``; that copy is closed
    when the context exits, including when the ``with`` body raises.

    The original stream should be closed by the caller.

    :param stream: seekable binary stream holding the script
    :param interpreter: interpreter to put on the shebang line
    """
    stream.seek(0)
    if stream.read(8) != b"#!python":
        stream.seek(0)
        yield stream
        return

    new_stream = io.BytesIO()
    try:
        # write our new shebang
        new_stream.write("#!{}\n".format(interpreter).encode())
        # copy the rest of the stream
        stream.seek(0)
        stream.readline()  # skip first line
        while True:
            buf = stream.read(_COPY_BUFSIZE)
            if not buf:
                break
            new_stream.write(buf)
        new_stream.seek(0)
        yield new_stream
    finally:
        # Release the temporary buffer even if the caller's block raised.
        new_stream.close()


def construct_record_file(records):
    # type: (Iterable[Tuple[Scheme, RecordEntry]]) -> BinaryIO
    """Construct a RECORD file given some records.

    Each record is written as ``str(record)`` encoded as UTF-8, followed by a
    newline. The scheme in each pair is not used.

    The returned stream should be closed by the caller.

    :param records: ``(scheme, record)`` pairs to write out
    :return: an in-memory binary stream, positioned at its start
    """
    stream = io.BytesIO()
    for _, record in records:
        stream.write(str(record).encode("utf-8") + b"\n")
    stream.seek(0)
    return stream


def parse_entrypoints(text):
    # type: (Text) -> Iterable[Tuple[Text, Text, Text, ScriptSection]]
    """Parse the script entry points out of ``entry_points.txt``-style text.

    Only the ``console_scripts`` and ``gui_scripts`` sections are read; every
    other section is ignored.

    :param text: the contents to parse
    :return: an iterator of ``(name, module, attrs, script_section)`` tuples,
        where ``script_section`` is the section name without its
        ``_scripts`` suffix.
    :raises ValueError: if an entry's value is not of the form
        ``module:attrs``, optionally followed by ``[extras]``.
    """
    # Borrowed from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L115  # noqa
    config = ConfigParser(delimiters="=")
    config.optionxform = Text  # type: ignore
    config.read_string(text)

    for section in config.sections():
        if section not in ["console_scripts", "gui_scripts"]:
            continue

        script_section = cast("ScriptSection", section[: -len("_scripts")])

        for name, value in config.items(section):
            assert isinstance(name, Text)
            match = _ENTRYPOINT_REGEX.match(value)
            if match is None:
                # An explicit error, rather than an assert, so that malformed
                # input is still rejected when running under ``python -O``.
                raise ValueError(
                    "Invalid entry point specification for {!r} in [{}]: {!r}".format(
                        name, section, value
                    )
                )

            module = match.group("module")
            assert isinstance(module, Text)

            # The pattern makes the ``attrs`` group mandatory, so these
            # assertions only narrow the type for the type checker.
            attrs = match.group("attrs")
            assert attrs is not None
            assert isinstance(attrs, Text)

            yield name, module, attrs, script_section