"""Helper for building wheels as would be in test cases.
"""
import itertools
from base64 import urlsafe_b64encode
from collections import namedtuple
from copy import deepcopy
from email.message import Message
from enum import Enum
from functools import partial
from hashlib import sha256
from io import BytesIO, StringIO
from zipfile import ZipFile

import csv23
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.six import ensure_binary, ensure_text, iteritems

from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from tests.lib.path import Path

if MYPY_CHECK_RUNNING:
    from typing import (
        AnyStr,
        Callable,
        Dict,
        Iterable,
        List,
        Optional,
        Sequence,
        Tuple,
        TypeVar,
        Union,
    )

    # path, digest, size
    RecordLike = Tuple[str, str, str]
    RecordCallback = Callable[
        [List["Record"]], Union[str, bytes, List[RecordLike]]
    ]
    # As would be used in metadata
    HeaderValue = Union[str, List[str]]


# A file to be stored in the wheel: archive member name plus its contents.
File = namedtuple("File", ["name", "contents"])
# One row of the RECORD file.
Record = namedtuple("Record", ["path", "digest", "size"])


class Default(Enum):
    """Holds the sentinel used to tell "argument not passed" from ``None``."""

    token = 0


# Sentinel default for optional arguments where ``None`` is itself meaningful
# (e.g. ``metadata=None`` means "do not generate a METADATA file").
_default = Default.token


if MYPY_CHECK_RUNNING:
    T = TypeVar("T")

    class Defaulted(Union[Default, T]):
        """A type which may be defaulted.
        """
        pass


def message_from_dict(headers):
    # type: (Dict[str, HeaderValue]) -> Message
    """Plain key-value pairs are set in the returned message.

    List values are converted into repeated headers in the result.
    """
    message = Message()
    for name, value in iteritems(headers):
        if isinstance(value, list):
            # Message.__setitem__ appends rather than replaces, so each
            # element becomes its own header line.
            for v in value:
                message[name] = v
        else:
            message[name] = value
    return message


def dist_info_path(name, version, path):
    # type: (str, str, str) -> str
    """Return ``path`` placed inside the ``{name}-{version}.dist-info``
    directory.
    """
    return "{}-{}.dist-info/{}".format(name, version, path)


def make_metadata_file(
    name,  # type: str
    version,  # type: str
    value,  # type: Defaulted[Optional[AnyStr]]
    updates,  # type: Defaulted[Dict[str, HeaderValue]]
    body,  # type: Defaulted[AnyStr]
):
    # type: (...) -> Optional[File]
    """Build the METADATA file for the wheel.

    :param name: distribution name, used for the path and the Name header
    :param version: distribution version, used for the path and the Version
        header
    :param value: None to generate no file; a string to use verbatim as the
        file contents; ``_default`` to generate the headers
    :param updates: headers overriding/extending the generated defaults,
        only used when ``value`` is ``_default``
    :param body: payload set on the generated message, only used when
        ``value`` is ``_default``
    :returns: the METADATA file, or None if ``value`` is None
    """
    if value is None:
        return None

    path = dist_info_path(name, version, "METADATA")

    if value is not _default:
        return File(path, ensure_binary(value))

    # Case-insensitive so that e.g. an update keyed "name" replaces "Name".
    metadata = CaseInsensitiveDict({
        "Metadata-Version": "2.1",
        "Name": name,
        "Version": version,
    })
    if updates is not _default:
        metadata.update(updates)

    message = message_from_dict(metadata)
    if body is not _default:
        message.set_payload(body)

    # Serialize the message the payload was set on; rebuilding it from
    # ``metadata`` here would silently drop ``body``.
    return File(path, ensure_binary(message.as_string()))


def make_wheel_metadata_file(
    name,  # type: str
    version,  # type: str
    value,  # type: Defaulted[Optional[AnyStr]]
    tags,  # type: Sequence[Tuple[str, str, str]]
    updates,  # type: Defaulted[Dict[str, HeaderValue]]
):
    # type: (...) -> Optional[File]
    """Build the WHEEL file for the wheel.

    :param name: distribution name, used for the path
    :param version: distribution version, used for the path
    :param value: None to generate no file; a string to use verbatim as the
        file contents; ``_default`` to generate the headers
    :param tags: tuples whose parts are joined with "-" to form one Tag
        header each
    :param updates: headers overriding/extending the generated defaults,
        only used when ``value`` is ``_default``
    :returns: the WHEEL file, or None if ``value`` is None
    """
    if value is None:
        return None

    path = dist_info_path(name, version, "WHEEL")

    if value is not _default:
        return File(path, ensure_binary(value))

    metadata = CaseInsensitiveDict({
        "Wheel-Version": "1.0",
        "Generator": "pip-test-suite",
        "Root-Is-Purelib": "true",
        "Tag": ["-".join(parts) for parts in tags],
    })

    if updates is not _default:
        metadata.update(updates)

    return File(path, ensure_binary(message_from_dict(metadata).as_string()))


def make_entry_points_file(
    name,  # type: str
    version,  # type: str
    entry_points,  # type: Defaulted[Dict[str, List[str]]]
    console_scripts,  # type: Defaulted[List[str]]
):
    # type: (...) -> Optional[File]
    """Build entry_points.txt for the wheel.

    :param name: distribution name, used for the path
    :param version: distribution version, used for the path
    :param entry_points: map from section name to the lines of that section
    :param console_scripts: lines for the "console_scripts" section; replaces
        any "console_scripts" key present in ``entry_points``
    :returns: the entry_points.txt file, or None if neither argument was
        provided
    """
    if entry_points is _default and console_scripts is _default:
        return None

    if entry_points is _default:
        entry_points_data = {}
    else:
        # Copy so that adding console_scripts does not mutate the caller's
        # dict.
        entry_points_data = deepcopy(entry_points)

    if console_scripts is not _default:
        entry_points_data["console_scripts"] = console_scripts

    lines = []
    for section, values in iteritems(entry_points_data):
        lines.append("[{}]".format(section))
        lines.extend(values)

    return File(
        dist_info_path(name, version, "entry_points.txt"),
        ensure_binary("\n".join(lines)),
    )


def make_files(files):
    # type: (Dict[str, AnyStr]) -> List[File]
    """Convert a path -> contents map into File objects with binary contents.
    """
    return [
        File(name, ensure_binary(contents))
        for name, contents in iteritems(files)
    ]


def make_metadata_files(name, version, files):
    # type: (str, str, Dict[str, AnyStr]) -> List[File]
    """Convert a path -> contents map into File objects located inside the
    ``{name}-{version}.dist-info`` directory.
    """
    get_path = partial(dist_info_path, name, version)
    # The loop variable is deliberately not called ``name``: on Python 2 a
    # list comprehension variable leaks and would rebind the parameter.
    return [
        File(get_path(path), ensure_binary(contents))
        for path, contents in iteritems(files)
    ]


def make_data_files(name, version, files):
    # type: (str, str, Dict[str, AnyStr]) -> List[File]
    """Convert a path -> contents map into File objects located inside the
    ``{name}-{version}.data`` directory.
    """
    data_dir = "{}-{}.data".format(name, version)
    return [
        File("{}/{}".format(data_dir, path), ensure_binary(contents))
        for path, contents in iteritems(files)
    ]


def urlsafe_b64encode_nopad(data):
    # type: (bytes) -> str
    """Return the URL-safe base64 encoding of ``data`` with trailing "="
    padding removed, as text.
    """
    return urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def digest(contents):
    # type: (bytes) -> str
    """Return ``sha256=<unpadded urlsafe base64 of the SHA-256 digest>`` for
    ``contents``.
    """
    return "sha256={}".format(
        urlsafe_b64encode_nopad(sha256(contents).digest())
    )


def record_file_maker_wrapper(
    name,  # type: str
    version,  # type: str
    files,  # type: List[File]
    record,  # type: Defaulted[Optional[AnyStr]]
    record_callback,  # type: Defaulted[RecordCallback]
):
    # type: (...) -> Iterable[File]
    """Yield every file from ``files`` unchanged, followed by a RECORD file.

    This is a generator: nothing is computed until it is iterated, and it
    can be iterated only once.

    :param name: distribution name, used for the RECORD path
    :param version: distribution version, used for the RECORD path
    :param files: the files that will be stored in the wheel
    :param record: None to emit no RECORD file; a string to use verbatim as
        the RECORD contents; ``_default`` to generate it from ``files``
    :param record_callback: called with the generated records and returns
        the records to write; only used when ``record`` is ``_default``
    """
    records = []  # type: List[Record]
    for file in files:
        records.append(
            Record(
                file.name, digest(file.contents), str(len(file.contents))
            )
        )
        yield file

    if record is None:
        return

    record_path = dist_info_path(name, version, "RECORD")

    if record is not _default:
        yield File(record_path, ensure_binary(record))
        return

    # RECORD lists itself with an empty digest and size.
    records.append(Record(record_path, "", ""))

    if record_callback is not _default:
        # NOTE(review): RecordCallback is typed as possibly returning str or
        # bytes, but the loop below only handles an iterable of rows -
        # confirm whether raw contents are meant to be supported here.
        records = record_callback(records)

    with StringIO(newline=u"") as buf:
        writer = csv23.writer(buf)
        for row in records:
            writer.writerow(map(ensure_text, row))
        contents = buf.getvalue().encode("utf-8")

    yield File(record_path, contents)


def wheel_name(name, version, pythons, abis, platforms):
    # type: (str, str, Sequence[str], Sequence[str], Sequence[str]) -> str
    """Return the wheel file name for the given distribution and tags.

    Each of ``pythons``, ``abis`` and ``platforms`` is a sequence of tag
    values that is joined with "." to form one component of the name.
    """
    stem = "-".join([
        name,
        version,
        ".".join(pythons),
        ".".join(abis),
        ".".join(platforms),
    ])
    return "{}.whl".format(stem)


class WheelBuilder(object):
    """A wheel that can be saved or converted to several formats.
    """

    def __init__(self, name, files):
        # type: (str, Iterable[File]) -> None
        """
        :param name: the wheel file name, used by save_to_dir
        :param files: the files to store in the wheel; may be a one-shot
            iterator, which is consumed on first use
        """
        self._name = name
        self._files = files

    def save_to_dir(self, path):
        # type: (Union[Path, str]) -> str
        """Generate wheel file with correct name and save into the provided
        directory.

        :returns: the wheel file path
        """
        path = Path(path) / self._name
        path.write_bytes(self.as_bytes())
        return str(path)

    def save_to(self, path):
        # type: (Union[Path, str]) -> str
        """Generate wheel file, saving to the provided path. Any parent
        directories must already exist.

        :returns: the wheel file path
        """
        path = Path(path)
        path.write_bytes(self.as_bytes())
        return str(path)

    def as_bytes(self):
        # type: () -> bytes
        """Return the wheel as the bytes of a zip archive."""
        # make_wheel hands over a generator, which would be exhausted after
        # one pass and make every later call produce an empty archive. Cache
        # the files on first use so the builder can be rendered repeatedly.
        if not isinstance(self._files, list):
            self._files = list(self._files)

        with BytesIO() as buf:
            with ZipFile(buf, "w") as z:
                for file in self._files:
                    z.writestr(file.name, file.contents)
            # Read the buffer only after the ZipFile is closed, so that the
            # archive's central directory has been written.
            return buf.getvalue()

    def as_zipfile(self):
        # type: () -> ZipFile
        """Return the wheel as an in-memory, readable ZipFile."""
        return ZipFile(BytesIO(self.as_bytes()))


def make_wheel(
    name,  # type: str
    version,  # type: str
    wheel_metadata=_default,  # type: Defaulted[Optional[AnyStr]]
    wheel_metadata_updates=_default,  # type: Defaulted[Dict[str, HeaderValue]]
    metadata=_default,  # type: Defaulted[Optional[AnyStr]]
    metadata_body=_default,  # type: Defaulted[AnyStr]
    metadata_updates=_default,  # type: Defaulted[Dict[str, HeaderValue]]
    extra_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
    extra_metadata_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
    extra_data_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
    console_scripts=_default,  # type: Defaulted[List[str]]
    entry_points=_default,  # type: Defaulted[Dict[str, List[str]]]
    record=_default,  # type: Defaulted[Optional[AnyStr]]
    record_callback=_default,  # type: Defaulted[RecordCallback]
):
    # type: (...) -> WheelBuilder
    """
    Helper function for generating test wheels which are compliant by default.

    Examples:

    ```
    # Basic wheel, which will have valid metadata, RECORD, etc
    make_wheel(name="foo", version="0.1.0")
    # Wheel with custom metadata
    make_wheel(
        name="foo",
        version="0.1.0",
        metadata_updates={
            # Overrides default
            "Name": "hello",
            # Expands into separate Requires-Dist entries
            "Requires-Dist": ["a == 1.0", "b == 2.0; sys_platform == 'win32'"],
        },
    )
    ```

    After specifying the wheel, it can be consumed in several ways:

    ```
    # Normal case, valid wheel we want pip to pick up.
    make_wheel(...).save_to_dir(tmpdir)
    # For a test case, to check that pip validates contents against wheel name.
    make_wheel(name="simple", ...).save_to(tmpdir / "notsimple-...")
    # In-memory, for unit tests.
    z = make_wheel(...).as_zipfile()
    ```

    Below, any unicode value provided for AnyStr will be encoded as utf-8.

    :param name: name of the distribution, propagated to the .dist-info
        directory, METADATA, and wheel file name
    :param version: version of the distribution, propagated to the .dist-info
        directory, METADATA, and wheel file name
    :param wheel_metadata: if provided and None, then no WHEEL metadata file
        is generated; else if a string then sets the content of the WHEEL file
    :param wheel_metadata_updates: override the default WHEEL metadata fields,
        ignored if wheel_metadata is provided
    :param metadata: if provided and None, then no METADATA file is generated;
        else if a string then sets the content of the METADATA file
    :param metadata_body: sets the value of the body text in METADATA, ignored
        if metadata is provided
    :param metadata_updates: override the default METADATA fields,
        ignored if metadata is provided
    :param extra_files: map from path to file contents for additional files to
        be put in the wheel
    :param extra_metadata_files: map from path (relative to .dist-info) to file
        contents for additional files to be put in the wheel
    :param extra_data_files: map from path (relative to .data) to file contents
        for additional files to be put in the wheel
    :param console_scripts: list of console scripts text to be put into
        entry_points.txt - overrides any value set in entry_points
    :param entry_points: map from entry point section name to the list of
        lines to be put under that section in entry_points.txt
    :param record: if provided and None, then no RECORD file is generated;
        else if a string then sets the content of the RECORD file
    :param record_callback: callback function that receives and can edit the
        records before they are written to RECORD, ignored if record is
        provided
    """
    pythons = ["py2", "py3"]
    abis = ["none"]
    platforms = ["any"]
    tags = list(itertools.product(pythons, abis, platforms))

    possible_files = [
        make_metadata_file(
            name, version, metadata, metadata_updates, metadata_body
        ),
        make_wheel_metadata_file(
            name, version, wheel_metadata, tags, wheel_metadata_updates
        ),
        make_entry_points_file(name, version, entry_points, console_scripts),
    ]

    if extra_files is not _default:
        possible_files.extend(make_files(extra_files))

    if extra_metadata_files is not _default:
        possible_files.extend(
            make_metadata_files(name, version, extra_metadata_files)
        )

    if extra_data_files is not _default:
        possible_files.extend(make_data_files(name, version, extra_data_files))

    # The make_*_file helpers return None for files that were disabled; drop
    # those. A real list (not a lazy filter object) matches the List[File]
    # annotation of record_file_maker_wrapper.
    actual_files = [file for file in possible_files if file is not None]

    files_and_record_file = record_file_maker_wrapper(
        name, version, actual_files, record, record_callback
    )
    wheel_file_name = wheel_name(name, version, pythons, abis, platforms)

    return WheelBuilder(wheel_file_name, files_and_record_file)