1"""
2Low-level helpers for the SecureTransport bindings.
3
4These are Python functions that are not directly related to the high-level APIs
5but are necessary to get them to work. They include a whole bunch of low-level
6CoreFoundation messing about and memory management. The concerns in this module
7are almost entirely about trying to avoid memory leaks and providing
8appropriate and useful assistance to the higher-level code.
9"""
10import base64
11import ctypes
12import itertools
13import os
14import re
15import ssl
16import struct
17import tempfile
18
19from .bindings import CFConst, CoreFoundation, Security
20
# Matches every certificate in a PEM bundle. Group 1 captures the text between
# the BEGIN/END marker lines; DOTALL lets that capture span several lines.
_PEM_CERTS_RE = re.compile(
    b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----",
    flags=re.DOTALL,
)
25
26
def _cf_data_from_bytes(bytestring):
    """
    Build a CFData object holding the contents of a Python bytestring.

    The value returned by ``CFDataCreate`` is handed back unchanged; the
    caller owns it and must CFRelease it.
    """
    allocator = CoreFoundation.kCFAllocatorDefault
    byte_count = len(bytestring)
    return CoreFoundation.CFDataCreate(allocator, bytestring, byte_count)
35
36
def _cf_dictionary_from_tuples(tuples):
    """
    Build a CFDictionary from a list of Python tuples.

    Element 0 of each tuple is used as the key and element 1 as the value.
    NOTE(review): presumably the result must be CFReleased by the caller, like
    the other Create-style helpers in this module -- confirm.
    """
    entry_count = len(tuples)

    # Keys and values are laid out in two parallel C arrays, so both must be
    # pulled from the tuples in the same order.
    ref_array_type = CoreFoundation.CFTypeRef * entry_count
    key_refs = ref_array_type(*[pair[0] for pair in tuples])
    value_refs = ref_array_type(*[pair[1] for pair in tuples])

    return CoreFoundation.CFDictionaryCreate(
        CoreFoundation.kCFAllocatorDefault,
        key_refs,
        value_refs,
        entry_count,
        CoreFoundation.kCFTypeDictionaryKeyCallBacks,
        CoreFoundation.kCFTypeDictionaryValueCallBacks,
    )
57
58
def _cfstr(py_bstr):
    """
    Create a CFString from Python binary data, interpreted as UTF-8.

    The returned string must be CFReleased by the caller.
    """
    raw_c_string = ctypes.c_char_p(py_bstr)
    return CoreFoundation.CFStringCreateWithCString(
        CoreFoundation.kCFAllocatorDefault,
        raw_c_string,
        CFConst.kCFStringEncodingUTF8,
    )
71
72
def _create_cfstring_array(lst):
    """
    Build a CFMutableArray of CFStrings from a list of Python binary data.

    The returned array must be CFReleased by the caller.

    Any failure while building the array (including a failed allocation) is
    reported as an ssl.SSLError, and the partially built array is released
    first.
    """
    array_ref = None
    try:
        array_ref = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not array_ref:
            raise MemoryError("Unable to allocate memory!")

        for entry in lst:
            string_ref = _cfstr(entry)
            if not string_ref:
                raise MemoryError("Unable to allocate memory!")
            # Our own reference to the string is dropped whether or not the
            # append succeeds.
            try:
                CoreFoundation.CFArrayAppendValue(array_ref, string_ref)
            finally:
                CoreFoundation.CFRelease(string_ref)
    except BaseException as exc:
        if array_ref:
            CoreFoundation.CFRelease(array_ref)
        raise ssl.SSLError("Unable to allocate array: %s" % (exc,))
    else:
        return array_ref
102
103
def _cf_string_to_unicode(value):
    """
    Convert a CFString object into a Python Unicode string.

    This exists purely to support error reporting.

    Raises OSError if the string contents cannot be copied out.
    """
    string_ref = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
    utf8 = CFConst.kCFStringEncodingUTF8

    # First ask for a direct pointer to the string's bytes.
    raw = CoreFoundation.CFStringGetCStringPtr(string_ref, utf8)
    if raw is None:
        # No direct pointer was available, so copy the contents into a
        # 1024-byte scratch buffer instead.
        scratch = ctypes.create_string_buffer(1024)
        copied = CoreFoundation.CFStringGetCString(string_ref, scratch, 1024, utf8)
        if not copied:
            raise OSError("Error copying C string from CFStringRef")
        raw = scratch.value

    if raw is None:
        return raw
    return raw.decode("utf-8")
127
128
def _assert_no_error(error, exception_class=None):
    """
    Check an OSStatus return code and raise if it reports an error.

    :param error: The status code returned by a Security framework call. A
        value of 0 means success and the function returns without raising.
    :param exception_class: The exception type to raise on failure. Defaults
        to ``ssl.SSLError`` when ``None``.
    :raises exception_class: If ``error`` is non-zero. The message is the
        system-provided description of the code, or ``"OSStatus <code>"`` when
        no description is available.
    """
    if error == 0:
        return

    output = None
    cf_error_string = Security.SecCopyErrorMessageString(error, None)
    # SecCopyErrorMessageString returns NULL when it has no message for the
    # code. A NULL reference must not be read or released (CFRelease(NULL)
    # crashes), so only touch it when it is non-NULL.
    if cf_error_string:
        try:
            output = _cf_string_to_unicode(cf_error_string)
        finally:
            # Release our reference even if the conversion raised.
            CoreFoundation.CFRelease(cf_error_string)

    if output is None or output == u"":
        output = u"OSStatus %s" % error

    if exception_class is None:
        exception_class = ssl.SSLError

    raise exception_class(output)
148
149
def _cert_array_from_pem(pem_bundle):
    """
    Turn a bundle of PEM-encoded certificates into a CFArray of certificate
    objects suitable for validating a cert chain.

    On success the caller is responsible for freeing the returned array.

    Raises ssl.SSLError if the bundle contains no certificates, if memory
    cannot be allocated, or if a certificate object cannot be built.
    """
    # Work on a copy with CRLF line endings folded to LF so the regular
    # expression sees consistent input.
    normalized_bundle = pem_bundle.replace(b"\r\n", b"\n")

    der_certs = []
    for pem_match in _PEM_CERTS_RE.finditer(normalized_bundle):
        der_certs.append(base64.b64decode(pem_match.group(1)))
    if not der_certs:
        raise ssl.SSLError("No root certificates specified")

    cert_array = CoreFoundation.CFArrayCreateMutable(
        CoreFoundation.kCFAllocatorDefault,
        0,
        ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
    )
    if not cert_array:
        raise ssl.SSLError("Unable to allocate memory!")

    try:
        for der in der_certs:
            data_ref = _cf_data_from_bytes(der)
            if not data_ref:
                raise ssl.SSLError("Unable to allocate memory!")

            cert_ref = Security.SecCertificateCreateWithData(
                CoreFoundation.kCFAllocatorDefault, data_ref
            )
            # The intermediate CFData is no longer needed once the
            # certificate has been (or has failed to be) created.
            CoreFoundation.CFRelease(data_ref)
            if not cert_ref:
                raise ssl.SSLError("Unable to build cert object!")

            CoreFoundation.CFArrayAppendValue(cert_array, cert_ref)
            CoreFoundation.CFRelease(cert_ref)
    except Exception:
        # Only on failure do we free the array ourselves; on success that
        # job belongs to the caller.
        CoreFoundation.CFRelease(cert_array)
        raise

    return cert_array
194
195
def _is_cert(item):
    """
    Report whether the given CFTypeRef is a certificate, by comparing its
    type ID against the certificate type ID.
    """
    certificate_type_id = Security.SecCertificateGetTypeID()
    item_type_id = CoreFoundation.CFGetTypeID(item)
    return item_type_id == certificate_type_id
202
203
def _is_identity(item):
    """
    Report whether the given CFTypeRef is an identity, by comparing its type
    ID against the identity type ID.
    """
    identity_type_id = Security.SecIdentityGetTypeID()
    item_type_id = CoreFoundation.CFGetTypeID(item)
    return item_type_id == identity_type_id
210
211
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.

    If the keychain cannot be created, the temporary directory is removed
    again before the error propagates.
    """
    # Imported locally so this function brings its own cleanup dependency.
    import shutil

    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into base16 (hex). We also
    # need some random bytes to password-protect the keychain we're creating,
    # so we ask for 40 random bytes in total: 8 for the name, 32 for the
    # password.
    random_bytes = os.urandom(40)
    filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
    password = base64.b16encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    try:
        keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")

        # We now want to create the keychain itself.
        keychain = Security.SecKeychainRef()
        status = Security.SecKeychainCreate(
            keychain_path, len(password), password, False, None, ctypes.byref(keychain)
        )
        _assert_no_error(status)
    except Exception:
        # The caller never receives the directory path on failure, so nobody
        # else can clean it up: remove it here rather than leaking it.
        shutil.rmtree(tempdirectory, ignore_errors=True)
        raise

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
245
246
def _load_items_from_file(keychain, path):
    """
    Given a single file, loads all the trust objects from it into arrays and
    the keychain.

    :param keychain: The keychain passed to ``SecItemImport`` to insert into.
    :param path: Path of the file to read (opened in binary mode).
    :returns: A tuple of lists: the first list is a list of identities, the
        second a list of certs. Every returned item has been CFRetained here.
    :raises ssl.SSLError: If the file contents cannot be wrapped in a CFData
        object, or (via ``_assert_no_error``) if the import fails.
    """
    certificates = []
    identities = []
    result_array = None
    # Initialised before the ``try`` so the ``finally`` clause can always
    # refer to it, even if creating the CFData fails.
    filedata = None

    with open(path, "rb") as f:
        raw_filedata = f.read()

    try:
        filedata = _cf_data_from_bytes(raw_filedata)
        if not filedata:
            raise ssl.SSLError("Unable to allocate memory!")

        result_array = CoreFoundation.CFArrayRef()
        result = Security.SecItemImport(
            filedata,  # cert data
            None,  # Filename, leaving it out for now
            None,  # What the type of the file is, we don't care
            None,  # what's in the file, we don't care
            0,  # import flags
            None,  # key params, can include passphrase in the future
            keychain,  # The keychain to insert into
            ctypes.byref(result_array),  # Results
        )
        _assert_no_error(result)

        # A CFArray is not very useful to us as an intermediary
        # representation, so we are going to extract the objects we want
        # and then free the array. We don't need to keep hold of keys: the
        # keychain already has them!
        result_count = CoreFoundation.CFArrayGetCount(result_array)
        for index in range(result_count):
            item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
            item = ctypes.cast(item, CoreFoundation.CFTypeRef)

            # Take our own reference to anything we keep, since the result
            # array is released below.
            if _is_cert(item):
                CoreFoundation.CFRetain(item)
                certificates.append(item)
            elif _is_identity(item):
                CoreFoundation.CFRetain(item)
                identities.append(item)
    finally:
        if result_array:
            CoreFoundation.CFRelease(result_array)

        # Never hand a NULL/unset reference to CFRelease.
        if filedata:
            CoreFoundation.CFRelease(filedata)

    return (identities, certificates)
300
301
def _load_client_cert_chain(keychain, *paths):
    """
    Load certificates and maybe keys from a number of files. Has the end goal
    of returning a CFArray containing one SecIdentityRef, and then zero or more
    SecCertificateRef objects, suitable for use as a client certificate trust
    chain.

    :param keychain: The keychain used for importing items and for looking up
        the identity that matches the first certificate.
    :param paths: File paths to load, in order. Falsy entries are skipped.
    :returns: A CFArray holding the trust chain. The caller is responsible
        for freeing it.
    :raises ssl.SSLError: If the files yield neither an identity nor a
        certificate, if the trust chain array cannot be allocated, or (via
        ``_assert_no_error``) if a Security framework call fails.
    """
    # Ok, the strategy.
    #
    # This relies on knowing that macOS will not give you a SecIdentityRef
    # unless you have imported a key into a keychain. This is a somewhat
    # artificial limitation of macOS (for example, it doesn't necessarily
    # affect iOS), but there is nothing inside Security.framework that lets you
    # get a SecIdentityRef without having a key in a keychain.
    #
    # So the policy here is we take all the files and iterate them in order.
    # Each one will use SecItemImport to have one or more objects loaded from
    # it. We will also point at a keychain that macOS can use to work with the
    # private key.
    #
    # Once we have all the objects, we'll check what we actually have. If we
    # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
    # we'll take the first certificate (which we assume to be our leaf) and
    # ask the keychain to give us a SecIdentityRef with that cert's associated
    # key.
    #
    # We'll then return a CFArray containing the trust chain: one
    # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
    # responsibility for freeing this CFArray will be with the caller. This
    # CFArray must remain alive for the entire connection, so in practice it
    # will be stored with a single SSLSocket, along with the reference to the
    # keychain.
    certificates = []
    identities = []

    # Filter out bad paths.
    paths = (path for path in paths if path)

    try:
        for file_path in paths:
            new_identities, new_certs = _load_items_from_file(keychain, file_path)
            identities.extend(new_identities)
            certificates.extend(new_certs)

        # Ok, we have everything. The question is: do we have an identity? If
        # not, we want to grab one from the first cert we have.
        if not identities:
            # With nothing loaded at all there is no leaf certificate to
            # build an identity from; report that clearly instead of failing
            # with an IndexError on certificates[0].
            if not certificates:
                raise ssl.SSLError("No client certificates or identities found")

            new_identity = Security.SecIdentityRef()
            status = Security.SecIdentityCreateWithCertificate(
                keychain, certificates[0], ctypes.byref(new_identity)
            )
            _assert_no_error(status)
            identities.append(new_identity)

            # We now want to release the original certificate, as we no longer
            # need it.
            CoreFoundation.CFRelease(certificates.pop(0))

        # We now need to build a new CFArray that holds the trust chain.
        trust_chain = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not trust_chain:
            raise ssl.SSLError("Unable to allocate memory!")

        for item in itertools.chain(identities, certificates):
            # ArrayAppendValue does a CFRetain on the item. That's fine,
            # because the finally block will release our other refs to them.
            CoreFoundation.CFArrayAppendValue(trust_chain, item)

        return trust_chain
    finally:
        # Drop the references we hold ourselves, on success and failure alike.
        for obj in itertools.chain(identities, certificates):
            CoreFoundation.CFRelease(obj)
375
376
# Maps a protocol name to the (major, minor) version pair that
# _build_tls_unknown_ca_alert packs into the record header.
TLS_PROTOCOL_VERSIONS = {
    "SSLv2": (0, 2),
    "SSLv3": (3, 0),
    "TLSv1": (3, 1),
    "TLSv1.1": (3, 2),
    "TLSv1.2": (3, 3),
}


def _build_tls_unknown_ca_alert(version):
    """
    Build the bytes of a TLS alert record reporting an unknown CA.

    ``version`` must be a key of TLS_PROTOCOL_VERSIONS; any other value
    raises KeyError.
    """
    major, minor = TLS_PROTOCOL_VERSIONS[version]

    # Alert payload: one severity byte followed by one description byte.
    alert_level_fatal = 0x02
    alert_unknown_ca = 0x30
    payload = struct.pack(">BB", alert_level_fatal, alert_unknown_ca)

    # Record header: record type, version major/minor, then the payload
    # length as a big-endian 16-bit integer.
    content_type_alert = 0x15
    header = struct.pack(">BBBH", content_type_alert, major, minor, len(payload))

    return header + payload
398