1# -*- coding: utf-8 -*- 2# 3# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org> 4# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at> 5# Copyright: (c) 2020, Felix Fontein <felix@fontein.de> 6# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 7 8from __future__ import absolute_import, division, print_function 9__metaclass__ = type 10 11 12import abc 13import binascii 14import datetime 15import re 16import traceback 17 18from distutils.version import LooseVersion 19 20from ansible.module_utils import six 21from ansible.module_utils.basic import missing_required_lib 22from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes 23 24from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( 25 load_certificate, 26 get_fingerprint_of_bytes, 27) 28 29from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( 30 cryptography_decode_name, 31 cryptography_get_extensions_from_cert, 32 cryptography_oid_to_name, 33 cryptography_serial_number_of_cert, 34) 35 36from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import ( 37 pyopenssl_get_extensions_from_cert, 38 pyopenssl_normalize_name, 39 pyopenssl_normalize_name_attribute, 40) 41 42from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( 43 get_publickey_info, 44) 45 46MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' 47MINIMAL_PYOPENSSL_VERSION = '0.15' 48 49PYOPENSSL_IMP_ERR = None 50try: 51 import OpenSSL 52 from OpenSSL import crypto 53 PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) 54 if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: 55 # OpenSSL 1.1.0 or newer 56 OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" 57 OPENSSL_MUST_STAPLE_VALUE = b"status_request" 58 else: 59 # OpenSSL 1.0.x or older 60 OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" 61 OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" 62except ImportError: 63 PYOPENSSL_IMP_ERR = traceback.format_exc() 64 PYOPENSSL_FOUND = False 65else: 66 PYOPENSSL_FOUND = True 67 68CRYPTOGRAPHY_IMP_ERR = None 69try: 70 import cryptography 71 from cryptography import x509 72 from cryptography.hazmat.primitives import serialization 73 CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) 74except ImportError: 75 CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() 76 CRYPTOGRAPHY_FOUND = False 77else: 78 CRYPTOGRAPHY_FOUND = True 79 80 81TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" 82 83 84@six.add_metaclass(abc.ABCMeta) 85class CertificateInfoRetrieval(object): 86 def __init__(self, module, backend, content): 87 # content must be a bytes string 88 self.module = module 89 self.backend = backend 90 self.content = content 91 92 @abc.abstractmethod 93 def _get_der_bytes(self): 94 pass 95 96 @abc.abstractmethod 97 def _get_signature_algorithm(self): 98 pass 99 100 @abc.abstractmethod 101 def _get_subject_ordered(self): 102 pass 103 104 @abc.abstractmethod 105 def _get_issuer_ordered(self): 106 pass 107 108 @abc.abstractmethod 109 def _get_version(self): 110 pass 111 112 @abc.abstractmethod 113 def _get_key_usage(self): 114 pass 115 116 @abc.abstractmethod 117 def _get_extended_key_usage(self): 118 pass 119 120 @abc.abstractmethod 121 def _get_basic_constraints(self): 122 pass 123 124 @abc.abstractmethod 125 def _get_ocsp_must_staple(self): 126 pass 127 128 @abc.abstractmethod 129 def _get_subject_alt_name(self): 130 pass 131 132 @abc.abstractmethod 133 def get_not_before(self): 134 pass 135 136 @abc.abstractmethod 137 def get_not_after(self): 138 pass 139 140 @abc.abstractmethod 141 def _get_public_key_pem(self): 142 pass 143 144 @abc.abstractmethod 145 def _get_public_key_object(self): 146 pass 147 148 @abc.abstractmethod 149 def _get_subject_key_identifier(self): 150 pass 151 152 @abc.abstractmethod 153 def _get_authority_key_identifier(self): 154 pass 155 156 @abc.abstractmethod 157 def _get_serial_number(self): 158 pass 159 160 @abc.abstractmethod 161 def _get_all_extensions(self): 162 pass 163 164 @abc.abstractmethod 165 def _get_ocsp_uri(self): 166 pass 167 168 def get_info(self, prefer_one_fingerprint=False): 169 result = dict() 170 self.cert = load_certificate(None, content=self.content, backend=self.backend) 171 172 result['signature_algorithm'] = self._get_signature_algorithm() 173 subject = self._get_subject_ordered() 174 issuer = self._get_issuer_ordered() 175 result['subject'] = dict() 176 for k, v in subject: 177 result['subject'][k] = v 178 result['subject_ordered'] = subject 179 result['issuer'] = dict() 180 for k, v in issuer: 181 result['issuer'][k] = v 182 result['issuer_ordered'] = issuer 183 result['version'] = self._get_version() 184 result['key_usage'], result['key_usage_critical'] = self._get_key_usage() 185 result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage() 186 result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints() 187 result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple() 188 result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name() 189 190 not_before = self.get_not_before() 191 not_after = self.get_not_after() 192 result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT) 193 result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT) 194 result['expired'] = not_after < datetime.datetime.utcnow() 195 196 result['public_key'] = self._get_public_key_pem() 197 198 public_key_info = get_publickey_info( 199 self.module, 200 self.backend, 201 key=self._get_public_key_object(), 202 prefer_one_fingerprint=prefer_one_fingerprint) 203 result.update({ 204 'public_key_type': public_key_info['type'], 205 'public_key_data': public_key_info['public_data'], 206 'public_key_fingerprints': public_key_info['fingerprints'], 207 }) 208 209 result['fingerprints'] = get_fingerprint_of_bytes( 210 self._get_der_bytes(), prefer_one=prefer_one_fingerprint) 211 212 if self.backend != 'pyopenssl': 213 ski = self._get_subject_key_identifier() 214 if ski is not None: 215 ski = to_native(binascii.hexlify(ski)) 216 ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)]) 217 result['subject_key_identifier'] = ski 218 219 aki, aci, acsn = self._get_authority_key_identifier() 220 if aki is not None: 221 aki = to_native(binascii.hexlify(aki)) 222 aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)]) 223 result['authority_key_identifier'] = aki 224 result['authority_cert_issuer'] = aci 225 result['authority_cert_serial_number'] = acsn 226 227 result['serial_number'] = self._get_serial_number() 228 result['extensions_by_oid'] = self._get_all_extensions() 229 result['ocsp_uri'] = self._get_ocsp_uri() 230 231 return result 232 233 234class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval): 235 """Validate the supplied cert, using the cryptography backend""" 236 def __init__(self, module, content): 237 super(CertificateInfoRetrievalCryptography, self).__init__(module, 'cryptography', content) 238 239 def _get_der_bytes(self): 240 return self.cert.public_bytes(serialization.Encoding.DER) 241 242 def _get_signature_algorithm(self): 243 return cryptography_oid_to_name(self.cert.signature_algorithm_oid) 244 245 def _get_subject_ordered(self): 246 result = [] 247 for attribute in self.cert.subject: 248 result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) 249 return result 250 251 def _get_issuer_ordered(self): 252 result = [] 253 for attribute in self.cert.issuer: 254 result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) 255 return result 256 257 def _get_version(self): 258 if self.cert.version == x509.Version.v1: 259 return 1 260 if self.cert.version == x509.Version.v3: 261 return 3 262 return "unknown" 263 264 def _get_key_usage(self): 265 try: 266 current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage) 267 current_key_usage = current_key_ext.value 268 key_usage = dict( 269 digital_signature=current_key_usage.digital_signature, 270 content_commitment=current_key_usage.content_commitment, 271 key_encipherment=current_key_usage.key_encipherment, 272 data_encipherment=current_key_usage.data_encipherment, 273 key_agreement=current_key_usage.key_agreement, 274 key_cert_sign=current_key_usage.key_cert_sign, 275 crl_sign=current_key_usage.crl_sign, 276 encipher_only=False, 277 decipher_only=False, 278 ) 279 if key_usage['key_agreement']: 280 key_usage.update(dict( 281 encipher_only=current_key_usage.encipher_only, 282 decipher_only=current_key_usage.decipher_only 283 )) 284 285 key_usage_names = dict( 286 digital_signature='Digital Signature', 287 content_commitment='Non Repudiation', 288 key_encipherment='Key Encipherment', 289 data_encipherment='Data Encipherment', 290 key_agreement='Key Agreement', 291 key_cert_sign='Certificate Sign', 292 crl_sign='CRL Sign', 293 encipher_only='Encipher Only', 294 decipher_only='Decipher Only', 295 ) 296 return sorted([ 297 key_usage_names[name] for name, value in key_usage.items() if value 298 ]), current_key_ext.critical 299 except cryptography.x509.ExtensionNotFound: 300 return None, False 301 302 def _get_extended_key_usage(self): 303 try: 304 ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) 305 return sorted([ 306 cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value 307 ]), ext_keyusage_ext.critical 308 except cryptography.x509.ExtensionNotFound: 309 return None, False 310 311 def _get_basic_constraints(self): 312 try: 313 ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints) 314 result = [] 315 result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')) 316 if ext_keyusage_ext.value.path_length is not None: 317 result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length)) 318 return sorted(result), ext_keyusage_ext.critical 319 except cryptography.x509.ExtensionNotFound: 320 return None, False 321 322 def _get_ocsp_must_staple(self): 323 try: 324 try: 325 # This only works with cryptography >= 2.1 326 tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature) 327 value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value 328 except AttributeError: 329 # Fallback for cryptography < 2.1 330 oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") 331 tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid) 332 value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" 333 return value, tlsfeature_ext.critical 334 except cryptography.x509.ExtensionNotFound: 335 return None, False 336 337 def _get_subject_alt_name(self): 338 try: 339 san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) 340 result = [cryptography_decode_name(san) for san in san_ext.value] 341 return result, san_ext.critical 342 except cryptography.x509.ExtensionNotFound: 343 return None, False 344 345 def get_not_before(self): 346 return self.cert.not_valid_before 347 348 def get_not_after(self): 349 return self.cert.not_valid_after 350 351 def _get_public_key_pem(self): 352 return self.cert.public_key().public_bytes( 353 serialization.Encoding.PEM, 354 serialization.PublicFormat.SubjectPublicKeyInfo, 355 ) 356 357 def _get_public_key_object(self): 358 return self.cert.public_key() 359 360 def _get_subject_key_identifier(self): 361 try: 362 ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) 363 return ext.value.digest 364 except cryptography.x509.ExtensionNotFound: 365 return None 366 367 def _get_authority_key_identifier(self): 368 try: 369 ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier) 370 issuer = None 371 if ext.value.authority_cert_issuer is not None: 372 issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer] 373 return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number 374 except cryptography.x509.ExtensionNotFound: 375 return None, None, None 376 377 def _get_serial_number(self): 378 return cryptography_serial_number_of_cert(self.cert) 379 380 def _get_all_extensions(self): 381 return cryptography_get_extensions_from_cert(self.cert) 382 383 def _get_ocsp_uri(self): 384 try: 385 ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess) 386 for desc in ext.value: 387 if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP: 388 if isinstance(desc.access_location, x509.UniformResourceIdentifier): 389 return desc.access_location.value 390 except x509.ExtensionNotFound as dummy: 391 pass 392 return None 393 394 395class CertificateInfoRetrievalPyOpenSSL(CertificateInfoRetrieval): 396 """validate the supplied certificate.""" 397 398 def __init__(self, module, content): 399 super(CertificateInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content) 400 401 def _get_der_bytes(self): 402 return crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert) 403 404 def _get_signature_algorithm(self): 405 return to_text(self.cert.get_signature_algorithm()) 406 407 def __get_name(self, name): 408 result = [] 409 for sub in name.get_components(): 410 result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])]) 411 return result 412 413 def _get_subject_ordered(self): 414 return self.__get_name(self.cert.get_subject()) 415 416 def _get_issuer_ordered(self): 417 return self.__get_name(self.cert.get_issuer()) 418 419 def _get_version(self): 420 # Version numbers in certs are off by one: 421 # v1: 0, v2: 1, v3: 2 ... 422 return self.cert.get_version() + 1 423 424 def _get_extension(self, short_name): 425 for extension_idx in range(0, self.cert.get_extension_count()): 426 extension = self.cert.get_extension(extension_idx) 427 if extension.get_short_name() == short_name: 428 result = [ 429 pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',') 430 ] 431 return sorted(result), bool(extension.get_critical()) 432 return None, False 433 434 def _get_key_usage(self): 435 return self._get_extension(b'keyUsage') 436 437 def _get_extended_key_usage(self): 438 return self._get_extension(b'extendedKeyUsage') 439 440 def _get_basic_constraints(self): 441 return self._get_extension(b'basicConstraints') 442 443 def _get_ocsp_must_staple(self): 444 extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())] 445 oms_ext = [ 446 ext for ext in extensions 447 if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE 448 ] 449 if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: 450 # Older versions of libssl don't know about OCSP Must Staple 451 oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) 452 if oms_ext: 453 return True, bool(oms_ext[0].get_critical()) 454 else: 455 return None, False 456 457 def _get_subject_alt_name(self): 458 for extension_idx in range(0, self.cert.get_extension_count()): 459 extension = self.cert.get_extension(extension_idx) 460 if extension.get_short_name() == b'subjectAltName': 461 result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in 462 to_text(extension, errors='surrogate_or_strict').split(', ')] 463 return result, bool(extension.get_critical()) 464 return None, False 465 466 def get_not_before(self): 467 time_string = to_native(self.cert.get_notBefore()) 468 return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 469 470 def get_not_after(self): 471 time_string = to_native(self.cert.get_notAfter()) 472 return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 473 474 def _get_public_key_pem(self): 475 try: 476 return crypto.dump_publickey( 477 crypto.FILETYPE_PEM, 478 self.cert.get_pubkey(), 479 ) 480 except AttributeError: 481 try: 482 # pyOpenSSL < 16.0: 483 bio = crypto._new_mem_buf() 484 rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey) 485 if rc != 1: 486 crypto._raise_current_error() 487 return crypto._bio_to_string(bio) 488 except AttributeError: 489 self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' 490 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') 491 492 def _get_public_key_object(self): 493 return self.cert.get_pubkey() 494 495 def _get_subject_key_identifier(self): 496 # Won't be implemented 497 return None 498 499 def _get_authority_key_identifier(self): 500 # Won't be implemented 501 return None, None, None 502 503 def _get_serial_number(self): 504 return self.cert.get_serial_number() 505 506 def _get_all_extensions(self): 507 return pyopenssl_get_extensions_from_cert(self.cert) 508 509 def _get_ocsp_uri(self): 510 for i in range(self.cert.get_extension_count()): 511 ext = self.cert.get_extension(i) 512 if ext.get_short_name() == b'authorityInfoAccess': 513 v = str(ext) 514 m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE) 515 if m: 516 return m.group(1) 517 return None 518 519 520def get_certificate_info(module, backend, content, prefer_one_fingerprint=False): 521 if backend == 'cryptography': 522 info = CertificateInfoRetrievalCryptography(module, content) 523 elif backend == 'pyopenssl': 524 info = CertificateInfoRetrievalPyOpenSSL(module, content) 525 return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) 526 527 528def select_backend(module, backend, content): 529 if backend == 'auto': 530 # Detection what is possible 531 can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) 532 can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) 533 534 # First try cryptography, then pyOpenSSL 535 if can_use_cryptography: 536 backend = 'cryptography' 537 elif can_use_pyopenssl: 538 backend = 'pyopenssl' 539 540 # Success? 541 if backend == 'auto': 542 module.fail_json(msg=("Can't detect any of the required Python libraries " 543 "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( 544 MINIMAL_CRYPTOGRAPHY_VERSION, 545 MINIMAL_PYOPENSSL_VERSION)) 546 547 if backend == 'pyopenssl': 548 if not PYOPENSSL_FOUND: 549 module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), 550 exception=PYOPENSSL_IMP_ERR) 551 try: 552 getattr(crypto.X509Req, 'get_extensions') 553 except AttributeError: 554 module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') 555 556 module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', 557 version='2.0.0', collection_name='community.crypto') 558 return backend, CertificateInfoRetrievalPyOpenSSL(module, content) 559 elif backend == 'cryptography': 560 if not CRYPTOGRAPHY_FOUND: 561 module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), 562 exception=CRYPTOGRAPHY_IMP_ERR) 563 return backend, CertificateInfoRetrievalCryptography(module, content) 564 else: 565 raise ValueError('Unsupported value for backend: {0}'.format(backend)) 566