1""" 2Low-level helpers for the SecureTransport bindings. 3 4These are Python functions that are not directly related to the high-level APIs 5but are necessary to get them to work. They include a whole bunch of low-level 6CoreFoundation messing about and memory management. The concerns in this module 7are almost entirely about trying to avoid memory leaks and providing 8appropriate and useful assistance to the higher-level code. 9""" 10import base64 11import ctypes 12import itertools 13import os 14import re 15import ssl 16import struct 17import tempfile 18 19from .bindings import CFConst, CoreFoundation, Security 20 21# This regular expression is used to grab PEM data out of a PEM bundle. 22_PEM_CERTS_RE = re.compile( 23 b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL 24) 25 26 27def _cf_data_from_bytes(bytestring): 28 """ 29 Given a bytestring, create a CFData object from it. This CFData object must 30 be CFReleased by the caller. 31 """ 32 return CoreFoundation.CFDataCreate( 33 CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring) 34 ) 35 36 37def _cf_dictionary_from_tuples(tuples): 38 """ 39 Given a list of Python tuples, create an associated CFDictionary. 40 """ 41 dictionary_size = len(tuples) 42 43 # We need to get the dictionary keys and values out in the same order. 44 keys = (t[0] for t in tuples) 45 values = (t[1] for t in tuples) 46 cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys) 47 cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values) 48 49 return CoreFoundation.CFDictionaryCreate( 50 CoreFoundation.kCFAllocatorDefault, 51 cf_keys, 52 cf_values, 53 dictionary_size, 54 CoreFoundation.kCFTypeDictionaryKeyCallBacks, 55 CoreFoundation.kCFTypeDictionaryValueCallBacks, 56 ) 57 58 59def _cfstr(py_bstr): 60 """ 61 Given a Python binary data, create a CFString. 62 The string must be CFReleased by the caller. 63 """ 64 c_str = ctypes.c_char_p(py_bstr) 65 cf_str = CoreFoundation.CFStringCreateWithCString( 66 CoreFoundation.kCFAllocatorDefault, 67 c_str, 68 CFConst.kCFStringEncodingUTF8, 69 ) 70 return cf_str 71 72 73def _create_cfstring_array(lst): 74 """ 75 Given a list of Python binary data, create an associated CFMutableArray. 76 The array must be CFReleased by the caller. 77 78 Raises an ssl.SSLError on failure. 79 """ 80 cf_arr = None 81 try: 82 cf_arr = CoreFoundation.CFArrayCreateMutable( 83 CoreFoundation.kCFAllocatorDefault, 84 0, 85 ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), 86 ) 87 if not cf_arr: 88 raise MemoryError("Unable to allocate memory!") 89 for item in lst: 90 cf_str = _cfstr(item) 91 if not cf_str: 92 raise MemoryError("Unable to allocate memory!") 93 try: 94 CoreFoundation.CFArrayAppendValue(cf_arr, cf_str) 95 finally: 96 CoreFoundation.CFRelease(cf_str) 97 except BaseException as e: 98 if cf_arr: 99 CoreFoundation.CFRelease(cf_arr) 100 raise ssl.SSLError("Unable to allocate array: %s" % (e,)) 101 return cf_arr 102 103 104def _cf_string_to_unicode(value): 105 """ 106 Creates a Unicode string from a CFString object. Used entirely for error 107 reporting. 108 109 Yes, it annoys me quite a lot that this function is this complex. 110 """ 111 value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p)) 112 113 string = CoreFoundation.CFStringGetCStringPtr( 114 value_as_void_p, CFConst.kCFStringEncodingUTF8 115 ) 116 if string is None: 117 buffer = ctypes.create_string_buffer(1024) 118 result = CoreFoundation.CFStringGetCString( 119 value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8 120 ) 121 if not result: 122 raise OSError("Error copying C string from CFStringRef") 123 string = buffer.value 124 if string is not None: 125 string = string.decode("utf-8") 126 return string 127 128 129def _assert_no_error(error, exception_class=None): 130 """ 131 Checks the return code and throws an exception if there is an error to 132 report 133 """ 134 if error == 0: 135 return 136 137 cf_error_string = Security.SecCopyErrorMessageString(error, None) 138 output = _cf_string_to_unicode(cf_error_string) 139 CoreFoundation.CFRelease(cf_error_string) 140 141 if output is None or output == u"": 142 output = u"OSStatus %s" % error 143 144 if exception_class is None: 145 exception_class = ssl.SSLError 146 147 raise exception_class(output) 148 149 150def _cert_array_from_pem(pem_bundle): 151 """ 152 Given a bundle of certs in PEM format, turns them into a CFArray of certs 153 that can be used to validate a cert chain. 154 """ 155 # Normalize the PEM bundle's line endings. 156 pem_bundle = pem_bundle.replace(b"\r\n", b"\n") 157 158 der_certs = [ 159 base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle) 160 ] 161 if not der_certs: 162 raise ssl.SSLError("No root certificates specified") 163 164 cert_array = CoreFoundation.CFArrayCreateMutable( 165 CoreFoundation.kCFAllocatorDefault, 166 0, 167 ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), 168 ) 169 if not cert_array: 170 raise ssl.SSLError("Unable to allocate memory!") 171 172 try: 173 for der_bytes in der_certs: 174 certdata = _cf_data_from_bytes(der_bytes) 175 if not certdata: 176 raise ssl.SSLError("Unable to allocate memory!") 177 cert = Security.SecCertificateCreateWithData( 178 CoreFoundation.kCFAllocatorDefault, certdata 179 ) 180 CoreFoundation.CFRelease(certdata) 181 if not cert: 182 raise ssl.SSLError("Unable to build cert object!") 183 184 CoreFoundation.CFArrayAppendValue(cert_array, cert) 185 CoreFoundation.CFRelease(cert) 186 except Exception: 187 # We need to free the array before the exception bubbles further. 188 # We only want to do that if an error occurs: otherwise, the caller 189 # should free. 190 CoreFoundation.CFRelease(cert_array) 191 raise 192 193 return cert_array 194 195 196def _is_cert(item): 197 """ 198 Returns True if a given CFTypeRef is a certificate. 199 """ 200 expected = Security.SecCertificateGetTypeID() 201 return CoreFoundation.CFGetTypeID(item) == expected 202 203 204def _is_identity(item): 205 """ 206 Returns True if a given CFTypeRef is an identity. 207 """ 208 expected = Security.SecIdentityGetTypeID() 209 return CoreFoundation.CFGetTypeID(item) == expected 210 211 212def _temporary_keychain(): 213 """ 214 This function creates a temporary Mac keychain that we can use to work with 215 credentials. This keychain uses a one-time password and a temporary file to 216 store the data. We expect to have one keychain per socket. The returned 217 SecKeychainRef must be freed by the caller, including calling 218 SecKeychainDelete. 219 220 Returns a tuple of the SecKeychainRef and the path to the temporary 221 directory that contains it. 222 """ 223 # Unfortunately, SecKeychainCreate requires a path to a keychain. This 224 # means we cannot use mkstemp to use a generic temporary file. Instead, 225 # we're going to create a temporary directory and a filename to use there. 226 # This filename will be 8 random bytes expanded into base64. We also need 227 # some random bytes to password-protect the keychain we're creating, so we 228 # ask for 40 random bytes. 229 random_bytes = os.urandom(40) 230 filename = base64.b16encode(random_bytes[:8]).decode("utf-8") 231 password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8 232 tempdirectory = tempfile.mkdtemp() 233 234 keychain_path = os.path.join(tempdirectory, filename).encode("utf-8") 235 236 # We now want to create the keychain itself. 237 keychain = Security.SecKeychainRef() 238 status = Security.SecKeychainCreate( 239 keychain_path, len(password), password, False, None, ctypes.byref(keychain) 240 ) 241 _assert_no_error(status) 242 243 # Having created the keychain, we want to pass it off to the caller. 244 return keychain, tempdirectory 245 246 247def _load_items_from_file(keychain, path): 248 """ 249 Given a single file, loads all the trust objects from it into arrays and 250 the keychain. 251 Returns a tuple of lists: the first list is a list of identities, the 252 second a list of certs. 253 """ 254 certificates = [] 255 identities = [] 256 result_array = None 257 258 with open(path, "rb") as f: 259 raw_filedata = f.read() 260 261 try: 262 filedata = CoreFoundation.CFDataCreate( 263 CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata) 264 ) 265 result_array = CoreFoundation.CFArrayRef() 266 result = Security.SecItemImport( 267 filedata, # cert data 268 None, # Filename, leaving it out for now 269 None, # What the type of the file is, we don't care 270 None, # what's in the file, we don't care 271 0, # import flags 272 None, # key params, can include passphrase in the future 273 keychain, # The keychain to insert into 274 ctypes.byref(result_array), # Results 275 ) 276 _assert_no_error(result) 277 278 # A CFArray is not very useful to us as an intermediary 279 # representation, so we are going to extract the objects we want 280 # and then free the array. We don't need to keep hold of keys: the 281 # keychain already has them! 282 result_count = CoreFoundation.CFArrayGetCount(result_array) 283 for index in range(result_count): 284 item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index) 285 item = ctypes.cast(item, CoreFoundation.CFTypeRef) 286 287 if _is_cert(item): 288 CoreFoundation.CFRetain(item) 289 certificates.append(item) 290 elif _is_identity(item): 291 CoreFoundation.CFRetain(item) 292 identities.append(item) 293 finally: 294 if result_array: 295 CoreFoundation.CFRelease(result_array) 296 297 CoreFoundation.CFRelease(filedata) 298 299 return (identities, certificates) 300 301 302def _load_client_cert_chain(keychain, *paths): 303 """ 304 Load certificates and maybe keys from a number of files. Has the end goal 305 of returning a CFArray containing one SecIdentityRef, and then zero or more 306 SecCertificateRef objects, suitable for use as a client certificate trust 307 chain. 308 """ 309 # Ok, the strategy. 310 # 311 # This relies on knowing that macOS will not give you a SecIdentityRef 312 # unless you have imported a key into a keychain. This is a somewhat 313 # artificial limitation of macOS (for example, it doesn't necessarily 314 # affect iOS), but there is nothing inside Security.framework that lets you 315 # get a SecIdentityRef without having a key in a keychain. 316 # 317 # So the policy here is we take all the files and iterate them in order. 318 # Each one will use SecItemImport to have one or more objects loaded from 319 # it. We will also point at a keychain that macOS can use to work with the 320 # private key. 321 # 322 # Once we have all the objects, we'll check what we actually have. If we 323 # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise, 324 # we'll take the first certificate (which we assume to be our leaf) and 325 # ask the keychain to give us a SecIdentityRef with that cert's associated 326 # key. 327 # 328 # We'll then return a CFArray containing the trust chain: one 329 # SecIdentityRef and then zero-or-more SecCertificateRef objects. The 330 # responsibility for freeing this CFArray will be with the caller. This 331 # CFArray must remain alive for the entire connection, so in practice it 332 # will be stored with a single SSLSocket, along with the reference to the 333 # keychain. 334 certificates = [] 335 identities = [] 336 337 # Filter out bad paths. 338 paths = (path for path in paths if path) 339 340 try: 341 for file_path in paths: 342 new_identities, new_certs = _load_items_from_file(keychain, file_path) 343 identities.extend(new_identities) 344 certificates.extend(new_certs) 345 346 # Ok, we have everything. The question is: do we have an identity? If 347 # not, we want to grab one from the first cert we have. 348 if not identities: 349 new_identity = Security.SecIdentityRef() 350 status = Security.SecIdentityCreateWithCertificate( 351 keychain, certificates[0], ctypes.byref(new_identity) 352 ) 353 _assert_no_error(status) 354 identities.append(new_identity) 355 356 # We now want to release the original certificate, as we no longer 357 # need it. 358 CoreFoundation.CFRelease(certificates.pop(0)) 359 360 # We now need to build a new CFArray that holds the trust chain. 361 trust_chain = CoreFoundation.CFArrayCreateMutable( 362 CoreFoundation.kCFAllocatorDefault, 363 0, 364 ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks), 365 ) 366 for item in itertools.chain(identities, certificates): 367 # ArrayAppendValue does a CFRetain on the item. That's fine, 368 # because the finally block will release our other refs to them. 369 CoreFoundation.CFArrayAppendValue(trust_chain, item) 370 371 return trust_chain 372 finally: 373 for obj in itertools.chain(identities, certificates): 374 CoreFoundation.CFRelease(obj) 375 376 377TLS_PROTOCOL_VERSIONS = { 378 "SSLv2": (0, 2), 379 "SSLv3": (3, 0), 380 "TLSv1": (3, 1), 381 "TLSv1.1": (3, 2), 382 "TLSv1.2": (3, 3), 383} 384 385 386def _build_tls_unknown_ca_alert(version): 387 """ 388 Builds a TLS alert record for an unknown CA. 389 """ 390 ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version] 391 severity_fatal = 0x02 392 description_unknown_ca = 0x30 393 msg = struct.pack(">BB", severity_fatal, description_unknown_ca) 394 msg_len = len(msg) 395 record_type_alert = 0x15 396 record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg 397 return record 398