1"""Helper for building wheels as would be in test cases.
2"""
3import itertools
4from base64 import urlsafe_b64encode
5from collections import namedtuple
6from copy import deepcopy
7from email.message import Message
8from enum import Enum
9from functools import partial
10from hashlib import sha256
11from io import BytesIO, StringIO
12from zipfile import ZipFile
13
14import csv23
15from pip._vendor.requests.structures import CaseInsensitiveDict
16from pip._vendor.six import ensure_binary, ensure_text, iteritems
17
18from pip._internal.utils.typing import MYPY_CHECK_RUNNING
19from tests.lib.path import Path
20
21if MYPY_CHECK_RUNNING:
22    from typing import (
23        AnyStr,
24        Callable,
25        Dict,
26        Iterable,
27        List,
28        Optional,
29        Sequence,
30        Tuple,
31        TypeVar,
32        Union,
33    )
34
35    # path, digest, size
36    RecordLike = Tuple[str, str, str]
37    RecordCallback = Callable[
38        [List["Record"]], Union[str, bytes, List[RecordLike]]
39    ]
40    # As would be used in metadata
41    HeaderValue = Union[str, List[str]]
42
43
44File = namedtuple("File", ["name", "contents"])
45Record = namedtuple("Record", ["path", "digest", "size"])
46
47
48class Default(Enum):
49    token = 0
50
51
52_default = Default.token
53
54
55if MYPY_CHECK_RUNNING:
56    T = TypeVar("T")
57
58    class Defaulted(Union[Default, T]):
59        """A type which may be defaulted.
60        """
61        pass
62
63
64def message_from_dict(headers):
65    # type: (Dict[str, HeaderValue]) -> Message
66    """Plain key-value pairs are set in the returned message.
67
68    List values are converted into repeated headers in the result.
69    """
70    message = Message()
71    for name, value in iteritems(headers):
72        if isinstance(value, list):
73            for v in value:
74                message[name] = v
75        else:
76            message[name] = value
77    return message
78
79
80def dist_info_path(name, version, path):
81    # type: (str, str, str) -> str
82    return "{}-{}.dist-info/{}".format(name, version, path)
83
84
85def make_metadata_file(
86    name,  # type: str
87    version,  # type: str
88    value,  # type: Defaulted[Optional[AnyStr]]
89    updates,  # type: Defaulted[Dict[str, HeaderValue]]
90    body,  # type: Defaulted[AnyStr]
91):
92    # type: () -> File
93    if value is None:
94        return None
95
96    path = dist_info_path(name, version, "METADATA")
97
98    if value is not _default:
99        return File(path, ensure_binary(value))
100
101    metadata = CaseInsensitiveDict({
102        "Metadata-Version": "2.1",
103        "Name": name,
104        "Version": version,
105    })
106    if updates is not _default:
107        metadata.update(updates)
108
109    message = message_from_dict(metadata)
110    if body is not _default:
111        message.set_payload(body)
112
113    return File(path, ensure_binary(message_from_dict(metadata).as_string()))
114
115
116def make_wheel_metadata_file(
117    name,  # type: str
118    version,  # type: str
119    value,  # type: Defaulted[Optional[AnyStr]]
120    tags,  # type: Sequence[Tuple[str, str, str]]
121    updates,  # type: Defaulted[Dict[str, HeaderValue]]
122):
123    # type: (...) -> Optional[File]
124    if value is None:
125        return None
126
127    path = dist_info_path(name, version, "WHEEL")
128
129    if value is not _default:
130        return File(path, ensure_binary(value))
131
132    metadata = CaseInsensitiveDict({
133        "Wheel-Version": "1.0",
134        "Generator": "pip-test-suite",
135        "Root-Is-Purelib": "true",
136        "Tag": ["-".join(parts) for parts in tags],
137    })
138
139    if updates is not _default:
140        metadata.update(updates)
141
142    return File(path, ensure_binary(message_from_dict(metadata).as_string()))
143
144
145def make_entry_points_file(
146    name,  # type: str
147    version,  # type: str
148    entry_points,  # type: Defaulted[Dict[str, List[str]]]
149    console_scripts,  # type: Defaulted[List[str]]
150):
151    # type: (...) -> Optional[File]
152    if entry_points is _default and console_scripts is _default:
153        return None
154
155    if entry_points is _default:
156        entry_points_data = {}
157    else:
158        entry_points_data = deepcopy(entry_points)
159
160    if console_scripts is not _default:
161        entry_points_data["console_scripts"] = console_scripts
162
163    lines = []
164    for section, values in iteritems(entry_points_data):
165        lines.append("[{}]".format(section))
166        lines.extend(values)
167
168    return File(
169        dist_info_path(name, version, "entry_points.txt"),
170        ensure_binary("\n".join(lines)),
171    )
172
173
174def make_files(files):
175    # type: (Dict[str, AnyStr]) -> List[File]
176    return [
177        File(name, ensure_binary(contents))
178        for name, contents in iteritems(files)
179    ]
180
181
182def make_metadata_files(name, version, files):
183    # type: (str, str, Dict[str, AnyStr]) -> List[File]
184    get_path = partial(dist_info_path, name, version)
185    return [
186        File(get_path(name), ensure_binary(contents))
187        for name, contents in iteritems(files)
188    ]
189
190
191def make_data_files(name, version, files):
192    # type: (str, str, Dict[str, AnyStr]) -> List[File]
193    data_dir = "{}-{}.data".format(name, version)
194    return [
195        File("{}/{}".format(data_dir, name), ensure_binary(contents))
196        for name, contents in iteritems(files)
197    ]
198
199
200def urlsafe_b64encode_nopad(data):
201    # type: (bytes) -> str
202    return urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
203
204
205def digest(contents):
206    # type: (bytes) -> str
207    return "sha256={}".format(
208        urlsafe_b64encode_nopad(sha256(contents).digest())
209    )
210
211
212def record_file_maker_wrapper(
213    name,  # type: str
214    version,  # type: str
215    files,  # type: List[File]
216    record,  # type: Defaulted[Optional[AnyStr]]
217    record_callback,  # type: Defaulted[RecordCallback]
218):
219    # type: (...) -> Iterable[File]
220    records = []  # type: List[Record]
221    for file in files:
222        records.append(
223            Record(
224                file.name, digest(file.contents), str(len(file.contents))
225            )
226        )
227        yield file
228
229    if record is None:
230        return
231
232    record_path = dist_info_path(name, version, "RECORD")
233
234    if record is not _default:
235        yield File(record_path, ensure_binary(record))
236        return
237
238    records.append(Record(record_path, "", ""))
239
240    if record_callback is not _default:
241        records = record_callback(records)
242
243    with StringIO(newline=u"") as buf:
244        writer = csv23.writer(buf)
245        for record in records:
246            writer.writerow(map(ensure_text, record))
247        contents = buf.getvalue().encode("utf-8")
248
249    yield File(record_path, contents)
250
251
252def wheel_name(name, version, pythons, abis, platforms):
253    # type: (str, str, str, str, str) -> str
254    stem = "-".join([
255        name,
256        version,
257        ".".join(pythons),
258        ".".join(abis),
259        ".".join(platforms),
260    ])
261    return "{}.whl".format(stem)
262
263
264class WheelBuilder(object):
265    """A wheel that can be saved or converted to several formats.
266    """
267
268    def __init__(self, name, files):
269        # type: (str, List[File]) -> None
270        self._name = name
271        self._files = files
272
273    def save_to_dir(self, path):
274        # type: (Union[Path, str]) -> str
275        """Generate wheel file with correct name and save into the provided
276        directory.
277
278        :returns the wheel file path
279        """
280        path = Path(path) / self._name
281        path.write_bytes(self.as_bytes())
282        return str(path)
283
284    def save_to(self, path):
285        # type: (Union[Path, str]) -> str
286        """Generate wheel file, saving to the provided path. Any parent
287        directories must already exist.
288
289        :returns the wheel file path
290        """
291        path = Path(path)
292        path.write_bytes(self.as_bytes())
293        return str(path)
294
295    def as_bytes(self):
296        # type: () -> bytes
297        with BytesIO() as buf:
298            with ZipFile(buf, "w") as z:
299                for file in self._files:
300                    z.writestr(file.name, file.contents)
301            return buf.getvalue()
302
303    def as_zipfile(self):
304        # type: () -> ZipFile
305        return ZipFile(BytesIO(self.as_bytes()))
306
307
308def make_wheel(
309    name,  # type: str
310    version,  # type: str
311    wheel_metadata=_default,  # type: Defaulted[Optional[AnyStr]]
312    wheel_metadata_updates=_default,  # type: Defaulted[Dict[str, HeaderValue]]
313    metadata=_default,  # type: Defaulted[Optional[AnyStr]]
314    metadata_body=_default,  # type: Defaulted[AnyStr]
315    metadata_updates=_default,  # type: Defaulted[Dict[str, HeaderValue]]
316    extra_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
317    extra_metadata_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
318    extra_data_files=_default,  # type: Defaulted[Dict[str, AnyStr]]
319    console_scripts=_default,  # type: Defaulted[List[str]]
320    entry_points=_default,  # type: Defaulted[Dict[str, List[str]]]
321    record=_default,  # type: Defaulted[Optional[AnyStr]]
322    record_callback=_default,  # type: Defaulted[RecordCallback]
323):
324    # type: (...) -> WheelBuilder
325    """
326    Helper function for generating test wheels which are compliant by default.
327
328    Examples:
329
330    ```
331    # Basic wheel, which will have valid metadata, RECORD, etc
332    make_wheel(name="foo", version="0.1.0")
333    # Wheel with custom metadata
334    make_wheel(
335        name="foo",
336        version="0.1.0",
337        metadata_updates={
338            # Overrides default
339            "Name": "hello",
340            # Expands into separate Requires-Dist entries
341            "Requires-Dist": ["a == 1.0", "b == 2.0; sys_platform == 'win32'"],
342        },
343    )
344    ```
345
346    After specifying the wheel, it can be consumed in several ways:
347
348    ```
349    # Normal case, valid wheel we want pip to pick up.
350    make_wheel(...).save_to_dir(tmpdir)
351    # For a test case, to check that pip validates contents against wheel name.
352    make_wheel(name="simple", ...).save_to(tmpdir / "notsimple-...")
353    # In-memory, for unit tests.
354    z = make_wheel(...).as_zipfile()
355    ```
356
357    Below, any unicode value provided for AnyStr will be encoded as utf-8.
358
359    :param name: name of the distribution, propagated to the .dist-info
360        directory, METADATA, and wheel file name
361    :param version: version of the distribution, propagated to the .dist-info
362        directory, METADATA, and wheel file name
363    :param wheel_metadata: if provided and None, then no WHEEL metadata file
364        is generated; else if a string then sets the content of the WHEEL file
365    :param wheel_metadata_updates: override the default WHEEL metadata fields,
366        ignored if wheel_metadata is provided
367    :param metadata: if provided and None, then no METADATA file is generated;
368        else if a string then sets the content of the METADATA file
369    :param metadata_body: sets the value of the body text in METADATA, ignored
370        if metadata is provided
371    :param metadata_updates: override the default METADATA fields,
372        ignored if metadata is provided
373    :param extra_files: map from path to file contents for additional files to
374        be put in the wheel
375    :param extra_metadata_files: map from path (relative to .dist-info) to file
376        contents for additional files to be put in the wheel
377    :param extra_data_files: map from path (relative to .data) to file contents
378        for additional files to be put in the wheel
379    :param console_scripts: list of console scripts text to be put into
380        entry_points.txt - overrides any value set in entry_points
381    :param entry_points:
382    :param record: if provided and None, then no RECORD file is generated;
383        else if a string then sets the content of the RECORD file
384    :param record_callback: callback function that receives and can edit the
385        records before they are written to RECORD, ignored if record is
386        provided
387    """
388    pythons = ["py2", "py3"]
389    abis = ["none"]
390    platforms = ["any"]
391    tags = list(itertools.product(pythons, abis, platforms))
392
393    possible_files = [
394        make_metadata_file(
395            name, version, metadata, metadata_updates, metadata_body
396        ),
397        make_wheel_metadata_file(
398            name, version, wheel_metadata, tags, wheel_metadata_updates
399        ),
400        make_entry_points_file(name, version, entry_points, console_scripts),
401    ]
402
403    if extra_files is not _default:
404        possible_files.extend(make_files(extra_files))
405
406    if extra_metadata_files is not _default:
407        possible_files.extend(
408            make_metadata_files(name, version, extra_metadata_files)
409        )
410
411    if extra_data_files is not _default:
412        possible_files.extend(make_data_files(name, version, extra_data_files))
413
414    actual_files = filter(None, possible_files)
415
416    files_and_record_file = record_file_maker_wrapper(
417        name, version, actual_files, record, record_callback
418    )
419    wheel_file_name = wheel_name(name, version, pythons, abis, platforms)
420
421    return WheelBuilder(wheel_file_name, files_and_record_file)
422