1#!/usr/bin/python 2# -*- coding: utf-8 -*- 3 4# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org> 5# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at> 6# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 7 8from __future__ import absolute_import, division, print_function 9__metaclass__ = type 10 11ANSIBLE_METADATA = {'metadata_version': '1.1', 12 'status': ['preview'], 13 'supported_by': 'community'} 14 15DOCUMENTATION = r''' 16--- 17module: openssl_certificate_info 18version_added: '2.8' 19short_description: Provide information of OpenSSL X.509 certificates 20description: 21 - This module allows one to query information on OpenSSL certificates. 22 - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the 23 cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements) 24 cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with 25 C(select_crypto_backend)). Please note that the PyOpenSSL backend was deprecated in Ansible 2.9 26 and will be removed in Ansible 2.13. 27requirements: 28 - PyOpenSSL >= 0.15 or cryptography >= 1.6 29author: 30 - Felix Fontein (@felixfontein) 31 - Yanis Guenane (@Spredzy) 32 - Markus Teufelberger (@MarkusTeufelberger) 33options: 34 path: 35 description: 36 - Remote absolute path where the certificate file is loaded from. 37 type: path 38 required: true 39 valid_at: 40 description: 41 - A dict of names mapping to time specifications. Every time specified here 42 will be checked whether the certificate is valid at this point. See the 43 C(valid_at) return value for informations on the result. 44 - Time can be specified either as relative time or as absolute timestamp. 45 - Time will always be interpreted as UTC. 46 - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer 47 + C([w | d | h | m | s]) (e.g. C(+32w1d2h), and ASN.1 TIME (i.e. pattern C(YYYYMMDDHHMMSSZ)). 48 Note that all timestamps will be treated as being in UTC. 49 type: dict 50 select_crypto_backend: 51 description: 52 - Determines which crypto backend to use. 53 - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). 54 - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. 55 - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. 56 - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13. 57 From that point on, only the C(cryptography) backend will be available. 58 type: str 59 default: auto 60 choices: [ auto, cryptography, pyopenssl ] 61 62notes: 63 - All timestamp values are provided in ASN.1 TIME format, i.e. following the C(YYYYMMDDHHMMSSZ) pattern. 64 They are all in UTC. 65seealso: 66- module: openssl_certificate 67''' 68 69EXAMPLES = r''' 70- name: Generate a Self Signed OpenSSL certificate 71 openssl_certificate: 72 path: /etc/ssl/crt/ansible.com.crt 73 privatekey_path: /etc/ssl/private/ansible.com.pem 74 csr_path: /etc/ssl/csr/ansible.com.csr 75 provider: selfsigned 76 77 78# Get information on the certificate 79 80- name: Get information on generated certificate 81 openssl_certificate_info: 82 path: /etc/ssl/crt/ansible.com.crt 83 register: result 84 85- name: Dump information 86 debug: 87 var: result 88 89 90# Check whether the certificate is valid or not valid at certain times, fail 91# if this is not the case. The first task (openssl_certificate_info) collects 92# the information, and the second task (assert) validates the result and 93# makes the playbook fail in case something is not as expected. 94 95- name: Test whether that certificate is valid tomorrow and/or in three weeks 96 openssl_certificate_info: 97 path: /etc/ssl/crt/ansible.com.crt 98 valid_at: 99 point_1: "+1d" 100 point_2: "+3w" 101 register: result 102 103- name: Validate that certificate is valid tomorrow, but not in three weeks 104 assert: 105 that: 106 - result.valid_at.point_1 # valid in one day 107 - not result.valid_at.point_2 # not valid in three weeks 108''' 109 110RETURN = r''' 111expired: 112 description: Whether the certificate is expired (i.e. C(notAfter) is in the past) 113 returned: success 114 type: bool 115basic_constraints: 116 description: Entries in the C(basic_constraints) extension, or C(none) if extension is not present. 117 returned: success 118 type: list 119 elements: str 120 sample: "[CA:TRUE, pathlen:1]" 121basic_constraints_critical: 122 description: Whether the C(basic_constraints) extension is critical. 123 returned: success 124 type: bool 125extended_key_usage: 126 description: Entries in the C(extended_key_usage) extension, or C(none) if extension is not present. 127 returned: success 128 type: list 129 elements: str 130 sample: "[Biometric Info, DVCS, Time Stamping]" 131extended_key_usage_critical: 132 description: Whether the C(extended_key_usage) extension is critical. 133 returned: success 134 type: bool 135extensions_by_oid: 136 description: Returns a dictionary for every extension OID 137 returned: success 138 type: dict 139 contains: 140 critical: 141 description: Whether the extension is critical. 142 returned: success 143 type: bool 144 value: 145 description: The Base64 encoded value (in DER format) of the extension 146 returned: success 147 type: str 148 sample: "MAMCAQU=" 149 sample: '{"1.3.6.1.5.5.7.1.24": { "critical": false, "value": "MAMCAQU="}}' 150key_usage: 151 description: Entries in the C(key_usage) extension, or C(none) if extension is not present. 152 returned: success 153 type: str 154 sample: "[Key Agreement, Data Encipherment]" 155key_usage_critical: 156 description: Whether the C(key_usage) extension is critical. 157 returned: success 158 type: bool 159subject_alt_name: 160 description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present. 161 returned: success 162 type: list 163 elements: str 164 sample: "[DNS:www.ansible.com, IP:1.2.3.4]" 165subject_alt_name_critical: 166 description: Whether the C(subject_alt_name) extension is critical. 167 returned: success 168 type: bool 169ocsp_must_staple: 170 description: C(yes) if the OCSP Must Staple extension is present, C(none) otherwise. 171 returned: success 172 type: bool 173ocsp_must_staple_critical: 174 description: Whether the C(ocsp_must_staple) extension is critical. 175 returned: success 176 type: bool 177issuer: 178 description: 179 - The certificate's issuer. 180 - Note that for repeated values, only the last one will be returned. 181 returned: success 182 type: dict 183 sample: '{"organizationName": "Ansible", "commonName": "ca.example.com"}' 184issuer_ordered: 185 description: The certificate's issuer as an ordered list of tuples. 186 returned: success 187 type: list 188 elements: list 189 sample: '[["organizationName", "Ansible"], ["commonName": "ca.example.com"]]' 190 version_added: "2.9" 191subject: 192 description: 193 - The certificate's subject as a dictionary. 194 - Note that for repeated values, only the last one will be returned. 195 returned: success 196 type: dict 197 sample: '{"commonName": "www.example.com", "emailAddress": "test@example.com"}' 198subject_ordered: 199 description: The certificate's subject as an ordered list of tuples. 200 returned: success 201 type: list 202 elements: list 203 sample: '[["commonName", "www.example.com"], ["emailAddress": "test@example.com"]]' 204 version_added: "2.9" 205not_after: 206 description: C(notAfter) date as ASN.1 TIME 207 returned: success 208 type: str 209 sample: 20190413202428Z 210not_before: 211 description: C(notBefore) date as ASN.1 TIME 212 returned: success 213 type: str 214 sample: 20190331202428Z 215public_key: 216 description: Certificate's public key in PEM format 217 returned: success 218 type: str 219 sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..." 220public_key_fingerprints: 221 description: 222 - Fingerprints of certificate's public key. 223 - For every hash algorithm available, the fingerprint is computed. 224 returned: success 225 type: dict 226 sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63', 227 'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..." 228signature_algorithm: 229 description: The signature algorithm used to sign the certificate. 230 returned: success 231 type: str 232 sample: sha256WithRSAEncryption 233serial_number: 234 description: The certificate's serial number. 235 returned: success 236 type: int 237 sample: 1234 238version: 239 description: The certificate version. 240 returned: success 241 type: int 242 sample: 3 243valid_at: 244 description: For every time stamp provided in the I(valid_at) option, a 245 boolean whether the certificate is valid at that point in time 246 or not. 247 returned: success 248 type: dict 249subject_key_identifier: 250 description: 251 - The certificate's subject key identifier. 252 - The identifier is returned in hexadecimal, with C(:) used to separate bytes. 253 - Is C(none) if the C(SubjectKeyIdentifier) extension is not present. 254 returned: success and if the pyOpenSSL backend is I(not) used 255 type: str 256 sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33' 257 version_added: "2.9" 258authority_key_identifier: 259 description: 260 - The certificate's authority key identifier. 261 - The identifier is returned in hexadecimal, with C(:) used to separate bytes. 262 - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present. 263 returned: success and if the pyOpenSSL backend is I(not) used 264 type: str 265 sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33' 266 version_added: "2.9" 267authority_cert_issuer: 268 description: 269 - The certificate's authority cert issuer as a list of general names. 270 - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present. 271 returned: success and if the pyOpenSSL backend is I(not) used 272 type: list 273 elements: str 274 sample: "[DNS:www.ansible.com, IP:1.2.3.4]" 275 version_added: "2.9" 276authority_cert_serial_number: 277 description: 278 - The certificate's authority cert serial number. 279 - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present. 280 returned: success and if the pyOpenSSL backend is I(not) used 281 type: int 282 sample: '12345' 283 version_added: "2.9" 284ocsp_uri: 285 description: The OCSP responder URI, if included in the certificate. Will be 286 C(none) if no OCSP responder URI is included. 287 returned: success 288 type: str 289 version_added: "2.9" 290''' 291 292 293import abc 294import binascii 295import datetime 296import os 297import re 298import traceback 299from distutils.version import LooseVersion 300 301from ansible.module_utils import crypto as crypto_utils 302from ansible.module_utils.basic import AnsibleModule, missing_required_lib 303from ansible.module_utils.six import string_types 304from ansible.module_utils._text import to_native, to_text, to_bytes 305from ansible.module_utils.compat import ipaddress as compat_ipaddress 306 307MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' 308MINIMAL_PYOPENSSL_VERSION = '0.15' 309 310PYOPENSSL_IMP_ERR = None 311try: 312 import OpenSSL 313 from OpenSSL import crypto 314 PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) 315 if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: 316 # OpenSSL 1.1.0 or newer 317 OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" 318 OPENSSL_MUST_STAPLE_VALUE = b"status_request" 319 else: 320 # OpenSSL 1.0.x or older 321 OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" 322 OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" 323except ImportError: 324 PYOPENSSL_IMP_ERR = traceback.format_exc() 325 PYOPENSSL_FOUND = False 326else: 327 PYOPENSSL_FOUND = True 328 329CRYPTOGRAPHY_IMP_ERR = None 330try: 331 import cryptography 332 from cryptography import x509 333 from cryptography.hazmat.primitives import serialization 334 CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) 335except ImportError: 336 CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() 337 CRYPTOGRAPHY_FOUND = False 338else: 339 CRYPTOGRAPHY_FOUND = True 340 341 342TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" 343 344 345def get_relative_time_option(input_string, input_name): 346 """Return an ASN1 formatted string if a relative timespec 347 or an ASN1 formatted string is provided.""" 348 result = input_string 349 if result.startswith("+") or result.startswith("-"): 350 return crypto_utils.convert_relative_to_datetime(result) 351 if result is None: 352 raise crypto_utils.OpenSSLObjectError( 353 'The timespec "%s" for %s is not valid' % 354 input_string, input_name) 355 for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: 356 try: 357 result = datetime.datetime.strptime(input_string, date_fmt) 358 break 359 except ValueError: 360 pass 361 362 if not isinstance(result, datetime.datetime): 363 raise crypto_utils.OpenSSLObjectError( 364 'The time spec "%s" for %s is invalid' % 365 (input_string, input_name) 366 ) 367 return result 368 369 370class CertificateInfo(crypto_utils.OpenSSLObject): 371 def __init__(self, module, backend): 372 super(CertificateInfo, self).__init__( 373 module.params['path'], 374 'present', 375 False, 376 module.check_mode, 377 ) 378 self.backend = backend 379 self.module = module 380 381 self.valid_at = module.params['valid_at'] 382 if self.valid_at: 383 for k, v in self.valid_at.items(): 384 if not isinstance(v, string_types): 385 self.module.fail_json( 386 msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) 387 ) 388 self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) 389 390 def generate(self): 391 # Empty method because crypto_utils.OpenSSLObject wants this 392 pass 393 394 def dump(self): 395 # Empty method because crypto_utils.OpenSSLObject wants this 396 pass 397 398 @abc.abstractmethod 399 def _get_signature_algorithm(self): 400 pass 401 402 @abc.abstractmethod 403 def _get_subject_ordered(self): 404 pass 405 406 @abc.abstractmethod 407 def _get_issuer_ordered(self): 408 pass 409 410 @abc.abstractmethod 411 def _get_version(self): 412 pass 413 414 @abc.abstractmethod 415 def _get_key_usage(self): 416 pass 417 418 @abc.abstractmethod 419 def _get_extended_key_usage(self): 420 pass 421 422 @abc.abstractmethod 423 def _get_basic_constraints(self): 424 pass 425 426 @abc.abstractmethod 427 def _get_ocsp_must_staple(self): 428 pass 429 430 @abc.abstractmethod 431 def _get_subject_alt_name(self): 432 pass 433 434 @abc.abstractmethod 435 def _get_not_before(self): 436 pass 437 438 @abc.abstractmethod 439 def _get_not_after(self): 440 pass 441 442 @abc.abstractmethod 443 def _get_public_key(self, binary): 444 pass 445 446 @abc.abstractmethod 447 def _get_subject_key_identifier(self): 448 pass 449 450 @abc.abstractmethod 451 def _get_authority_key_identifier(self): 452 pass 453 454 @abc.abstractmethod 455 def _get_serial_number(self): 456 pass 457 458 @abc.abstractmethod 459 def _get_all_extensions(self): 460 pass 461 462 @abc.abstractmethod 463 def _get_ocsp_uri(self): 464 pass 465 466 def get_info(self): 467 result = dict() 468 self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) 469 470 result['signature_algorithm'] = self._get_signature_algorithm() 471 subject = self._get_subject_ordered() 472 issuer = self._get_issuer_ordered() 473 result['subject'] = dict() 474 for k, v in subject: 475 result['subject'][k] = v 476 result['subject_ordered'] = subject 477 result['issuer'] = dict() 478 for k, v in issuer: 479 result['issuer'][k] = v 480 result['issuer_ordered'] = issuer 481 result['version'] = self._get_version() 482 result['key_usage'], result['key_usage_critical'] = self._get_key_usage() 483 result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage() 484 result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints() 485 result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple() 486 result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name() 487 488 not_before = self._get_not_before() 489 not_after = self._get_not_after() 490 result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT) 491 result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT) 492 result['expired'] = not_after < datetime.datetime.utcnow() 493 494 result['valid_at'] = dict() 495 if self.valid_at: 496 for k, v in self.valid_at.items(): 497 result['valid_at'][k] = not_before <= v <= not_after 498 499 result['public_key'] = self._get_public_key(binary=False) 500 pk = self._get_public_key(binary=True) 501 result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict() 502 503 if self.backend != 'pyopenssl': 504 ski = self._get_subject_key_identifier() 505 if ski is not None: 506 ski = to_native(binascii.hexlify(ski)) 507 ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)]) 508 result['subject_key_identifier'] = ski 509 510 aki, aci, acsn = self._get_authority_key_identifier() 511 if aki is not None: 512 aki = to_native(binascii.hexlify(aki)) 513 aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)]) 514 result['authority_key_identifier'] = aki 515 result['authority_cert_issuer'] = aci 516 result['authority_cert_serial_number'] = acsn 517 518 result['serial_number'] = self._get_serial_number() 519 result['extensions_by_oid'] = self._get_all_extensions() 520 result['ocsp_uri'] = self._get_ocsp_uri() 521 522 return result 523 524 525class CertificateInfoCryptography(CertificateInfo): 526 """Validate the supplied cert, using the cryptography backend""" 527 def __init__(self, module): 528 super(CertificateInfoCryptography, self).__init__(module, 'cryptography') 529 530 def _get_signature_algorithm(self): 531 return crypto_utils.cryptography_oid_to_name(self.cert.signature_algorithm_oid) 532 533 def _get_subject_ordered(self): 534 result = [] 535 for attribute in self.cert.subject: 536 result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value]) 537 return result 538 539 def _get_issuer_ordered(self): 540 result = [] 541 for attribute in self.cert.issuer: 542 result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value]) 543 return result 544 545 def _get_version(self): 546 if self.cert.version == x509.Version.v1: 547 return 1 548 if self.cert.version == x509.Version.v3: 549 return 3 550 return "unknown" 551 552 def _get_key_usage(self): 553 try: 554 current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage) 555 current_key_usage = current_key_ext.value 556 key_usage = dict( 557 digital_signature=current_key_usage.digital_signature, 558 content_commitment=current_key_usage.content_commitment, 559 key_encipherment=current_key_usage.key_encipherment, 560 data_encipherment=current_key_usage.data_encipherment, 561 key_agreement=current_key_usage.key_agreement, 562 key_cert_sign=current_key_usage.key_cert_sign, 563 crl_sign=current_key_usage.crl_sign, 564 encipher_only=False, 565 decipher_only=False, 566 ) 567 if key_usage['key_agreement']: 568 key_usage.update(dict( 569 encipher_only=current_key_usage.encipher_only, 570 decipher_only=current_key_usage.decipher_only 571 )) 572 573 key_usage_names = dict( 574 digital_signature='Digital Signature', 575 content_commitment='Non Repudiation', 576 key_encipherment='Key Encipherment', 577 data_encipherment='Data Encipherment', 578 key_agreement='Key Agreement', 579 key_cert_sign='Certificate Sign', 580 crl_sign='CRL Sign', 581 encipher_only='Encipher Only', 582 decipher_only='Decipher Only', 583 ) 584 return sorted([ 585 key_usage_names[name] for name, value in key_usage.items() if value 586 ]), current_key_ext.critical 587 except cryptography.x509.ExtensionNotFound: 588 return None, False 589 590 def _get_extended_key_usage(self): 591 try: 592 ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) 593 return sorted([ 594 crypto_utils.cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value 595 ]), ext_keyusage_ext.critical 596 except cryptography.x509.ExtensionNotFound: 597 return None, False 598 599 def _get_basic_constraints(self): 600 try: 601 ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints) 602 result = [] 603 result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')) 604 if ext_keyusage_ext.value.path_length is not None: 605 result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length)) 606 return sorted(result), ext_keyusage_ext.critical 607 except cryptography.x509.ExtensionNotFound: 608 return None, False 609 610 def _get_ocsp_must_staple(self): 611 try: 612 try: 613 # This only works with cryptography >= 2.1 614 tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature) 615 value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value 616 except AttributeError as dummy: 617 # Fallback for cryptography < 2.1 618 oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") 619 tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid) 620 value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" 621 return value, tlsfeature_ext.critical 622 except cryptography.x509.ExtensionNotFound: 623 return None, False 624 625 def _get_subject_alt_name(self): 626 try: 627 san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) 628 result = [crypto_utils.cryptography_decode_name(san) for san in san_ext.value] 629 return result, san_ext.critical 630 except cryptography.x509.ExtensionNotFound: 631 return None, False 632 633 def _get_not_before(self): 634 return self.cert.not_valid_before 635 636 def _get_not_after(self): 637 return self.cert.not_valid_after 638 639 def _get_public_key(self, binary): 640 return self.cert.public_key().public_bytes( 641 serialization.Encoding.DER if binary else serialization.Encoding.PEM, 642 serialization.PublicFormat.SubjectPublicKeyInfo 643 ) 644 645 def _get_subject_key_identifier(self): 646 try: 647 ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) 648 return ext.value.digest 649 except cryptography.x509.ExtensionNotFound: 650 return None 651 652 def _get_authority_key_identifier(self): 653 try: 654 ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier) 655 issuer = None 656 if ext.value.authority_cert_issuer is not None: 657 issuer = [crypto_utils.cryptography_decode_name(san) for san in ext.value.authority_cert_issuer] 658 return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number 659 except cryptography.x509.ExtensionNotFound: 660 return None, None, None 661 662 def _get_serial_number(self): 663 return self.cert.serial_number 664 665 def _get_all_extensions(self): 666 return crypto_utils.cryptography_get_extensions_from_cert(self.cert) 667 668 def _get_ocsp_uri(self): 669 try: 670 ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess) 671 for desc in ext.value: 672 if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP: 673 if isinstance(desc.access_location, x509.UniformResourceIdentifier): 674 return desc.access_location.value 675 except x509.ExtensionNotFound as dummy: 676 pass 677 return None 678 679 680class CertificateInfoPyOpenSSL(CertificateInfo): 681 """validate the supplied certificate.""" 682 683 def __init__(self, module): 684 super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl') 685 686 def _get_signature_algorithm(self): 687 return to_text(self.cert.get_signature_algorithm()) 688 689 def __get_name(self, name): 690 result = [] 691 for sub in name.get_components(): 692 result.append([crypto_utils.pyopenssl_normalize_name(sub[0]), to_text(sub[1])]) 693 return result 694 695 def _get_subject_ordered(self): 696 return self.__get_name(self.cert.get_subject()) 697 698 def _get_issuer_ordered(self): 699 return self.__get_name(self.cert.get_issuer()) 700 701 def _get_version(self): 702 # Version numbers in certs are off by one: 703 # v1: 0, v2: 1, v3: 2 ... 704 return self.cert.get_version() + 1 705 706 def _get_extension(self, short_name): 707 for extension_idx in range(0, self.cert.get_extension_count()): 708 extension = self.cert.get_extension(extension_idx) 709 if extension.get_short_name() == short_name: 710 result = [ 711 crypto_utils.pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',') 712 ] 713 return sorted(result), bool(extension.get_critical()) 714 return None, False 715 716 def _get_key_usage(self): 717 return self._get_extension(b'keyUsage') 718 719 def _get_extended_key_usage(self): 720 return self._get_extension(b'extendedKeyUsage') 721 722 def _get_basic_constraints(self): 723 return self._get_extension(b'basicConstraints') 724 725 def _get_ocsp_must_staple(self): 726 extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())] 727 oms_ext = [ 728 ext for ext in extensions 729 if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE 730 ] 731 if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: 732 # Older versions of libssl don't know about OCSP Must Staple 733 oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) 734 if oms_ext: 735 return True, bool(oms_ext[0].get_critical()) 736 else: 737 return None, False 738 739 def _normalize_san(self, san): 740 if san.startswith('IP Address:'): 741 san = 'IP:' + san[len('IP Address:'):] 742 if san.startswith('IP:'): 743 ip = compat_ipaddress.ip_address(san[3:]) 744 san = 'IP:{0}'.format(ip.compressed) 745 return san 746 747 def _get_subject_alt_name(self): 748 for extension_idx in range(0, self.cert.get_extension_count()): 749 extension = self.cert.get_extension(extension_idx) 750 if extension.get_short_name() == b'subjectAltName': 751 result = [self._normalize_san(altname.strip()) for altname in 752 to_text(extension, errors='surrogate_or_strict').split(', ')] 753 return result, bool(extension.get_critical()) 754 return None, False 755 756 def _get_not_before(self): 757 time_string = to_native(self.cert.get_notBefore()) 758 return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 759 760 def _get_not_after(self): 761 time_string = to_native(self.cert.get_notAfter()) 762 return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 763 764 def _get_public_key(self, binary): 765 try: 766 return crypto.dump_publickey( 767 crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, 768 self.cert.get_pubkey() 769 ) 770 except AttributeError: 771 try: 772 # pyOpenSSL < 16.0: 773 bio = crypto._new_mem_buf() 774 if binary: 775 rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey) 776 else: 777 rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey) 778 if rc != 1: 779 crypto._raise_current_error() 780 return crypto._bio_to_string(bio) 781 except AttributeError: 782 self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' 783 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') 784 785 def _get_subject_key_identifier(self): 786 # Won't be implemented 787 return None 788 789 def _get_authority_key_identifier(self): 790 # Won't be implemented 791 return None, None, None 792 793 def _get_serial_number(self): 794 return self.cert.get_serial_number() 795 796 def _get_all_extensions(self): 797 return crypto_utils.pyopenssl_get_extensions_from_cert(self.cert) 798 799 def _get_ocsp_uri(self): 800 for i in range(self.cert.get_extension_count()): 801 ext = self.cert.get_extension(i) 802 if ext.get_short_name() == b'authorityInfoAccess': 803 v = str(ext) 804 m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE) 805 if m: 806 return m.group(1) 807 return None 808 809 810def main(): 811 module = AnsibleModule( 812 argument_spec=dict( 813 path=dict(type='path', required=True), 814 valid_at=dict(type='dict'), 815 select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), 816 ), 817 supports_check_mode=True, 818 ) 819 820 try: 821 base_dir = os.path.dirname(module.params['path']) or '.' 822 if not os.path.isdir(base_dir): 823 module.fail_json( 824 name=base_dir, 825 msg='The directory %s does not exist or the file is not a directory' % base_dir 826 ) 827 828 backend = module.params['select_crypto_backend'] 829 if backend == 'auto': 830 # Detect what backend we can use 831 can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) 832 can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) 833 834 # If cryptography is available we'll use it 835 if can_use_cryptography: 836 backend = 'cryptography' 837 elif can_use_pyopenssl: 838 backend = 'pyopenssl' 839 840 # Fail if no backend has been found 841 if backend == 'auto': 842 module.fail_json(msg=("Can't detect any of the required Python libraries " 843 "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( 844 MINIMAL_CRYPTOGRAPHY_VERSION, 845 MINIMAL_PYOPENSSL_VERSION)) 846 847 if backend == 'pyopenssl': 848 if not PYOPENSSL_FOUND: 849 module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), 850 exception=PYOPENSSL_IMP_ERR) 851 try: 852 getattr(crypto.X509Req, 'get_extensions') 853 except AttributeError: 854 module.fail_json(msg='You need to have PyOpenSSL>=0.15') 855 856 module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', version='2.13') 857 certificate = CertificateInfoPyOpenSSL(module) 858 elif backend == 'cryptography': 859 if not CRYPTOGRAPHY_FOUND: 860 module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), 861 exception=CRYPTOGRAPHY_IMP_ERR) 862 certificate = CertificateInfoCryptography(module) 863 864 result = certificate.get_info() 865 module.exit_json(**result) 866 except crypto_utils.OpenSSLObjectError as exc: 867 module.fail_json(msg=to_native(exc)) 868 869 870if __name__ == "__main__": 871 main() 872