1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3
4# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
5# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
6# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
7
8from __future__ import absolute_import, division, print_function
9__metaclass__ = type
10
11ANSIBLE_METADATA = {'metadata_version': '1.1',
12                    'status': ['preview'],
13                    'supported_by': 'community'}
14
15DOCUMENTATION = r'''
16---
17module: openssl_certificate_info
18version_added: '2.8'
19short_description: Provide information of OpenSSL X.509 certificates
20description:
21    - This module allows one to query information on OpenSSL certificates.
22    - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the
23      cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements)
24      cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with
25      C(select_crypto_backend)). Please note that the PyOpenSSL backend was deprecated in Ansible 2.9
26      and will be removed in Ansible 2.13.
27requirements:
28    - PyOpenSSL >= 0.15 or cryptography >= 1.6
29author:
30  - Felix Fontein (@felixfontein)
31  - Yanis Guenane (@Spredzy)
32  - Markus Teufelberger (@MarkusTeufelberger)
33options:
34    path:
35        description:
36            - Remote absolute path where the certificate file is loaded from.
37        type: path
38        required: true
39    valid_at:
40        description:
41            - A dict of names mapping to time specifications. Every time specified here
42              will be checked whether the certificate is valid at this point. See the
43              C(valid_at) return value for informations on the result.
44            - Time can be specified either as relative time or as absolute timestamp.
45            - Time will always be interpreted as UTC.
46            - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
47              + C([w | d | h | m | s]) (e.g. C(+32w1d2h), and ASN.1 TIME (i.e. pattern C(YYYYMMDDHHMMSSZ)).
48              Note that all timestamps will be treated as being in UTC.
49        type: dict
50    select_crypto_backend:
51        description:
52            - Determines which crypto backend to use.
53            - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
54            - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
55            - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
56            - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in Ansible 2.13.
57              From that point on, only the C(cryptography) backend will be available.
58        type: str
59        default: auto
60        choices: [ auto, cryptography, pyopenssl ]
61
62notes:
63    - All timestamp values are provided in ASN.1 TIME format, i.e. following the C(YYYYMMDDHHMMSSZ) pattern.
64      They are all in UTC.
65seealso:
66- module: openssl_certificate
67'''
68
69EXAMPLES = r'''
70- name: Generate a Self Signed OpenSSL certificate
71  openssl_certificate:
72    path: /etc/ssl/crt/ansible.com.crt
73    privatekey_path: /etc/ssl/private/ansible.com.pem
74    csr_path: /etc/ssl/csr/ansible.com.csr
75    provider: selfsigned
76
77
78# Get information on the certificate
79
80- name: Get information on generated certificate
81  openssl_certificate_info:
82    path: /etc/ssl/crt/ansible.com.crt
83  register: result
84
85- name: Dump information
86  debug:
87    var: result
88
89
90# Check whether the certificate is valid or not valid at certain times, fail
91# if this is not the case. The first task (openssl_certificate_info) collects
92# the information, and the second task (assert) validates the result and
93# makes the playbook fail in case something is not as expected.
94
95- name: Test whether that certificate is valid tomorrow and/or in three weeks
96  openssl_certificate_info:
97    path: /etc/ssl/crt/ansible.com.crt
98    valid_at:
99      point_1: "+1d"
100      point_2: "+3w"
101  register: result
102
103- name: Validate that certificate is valid tomorrow, but not in three weeks
104  assert:
105    that:
106      - result.valid_at.point_1      # valid in one day
107      - not result.valid_at.point_2  # not valid in three weeks
108'''
109
110RETURN = r'''
111expired:
112    description: Whether the certificate is expired (i.e. C(notAfter) is in the past)
113    returned: success
114    type: bool
115basic_constraints:
116    description: Entries in the C(basic_constraints) extension, or C(none) if extension is not present.
117    returned: success
118    type: list
119    elements: str
120    sample: "[CA:TRUE, pathlen:1]"
121basic_constraints_critical:
122    description: Whether the C(basic_constraints) extension is critical.
123    returned: success
124    type: bool
125extended_key_usage:
126    description: Entries in the C(extended_key_usage) extension, or C(none) if extension is not present.
127    returned: success
128    type: list
129    elements: str
130    sample: "[Biometric Info, DVCS, Time Stamping]"
131extended_key_usage_critical:
132    description: Whether the C(extended_key_usage) extension is critical.
133    returned: success
134    type: bool
135extensions_by_oid:
136    description: Returns a dictionary for every extension OID
137    returned: success
138    type: dict
139    contains:
140        critical:
141            description: Whether the extension is critical.
142            returned: success
143            type: bool
144        value:
145            description: The Base64 encoded value (in DER format) of the extension
146            returned: success
147            type: str
148            sample: "MAMCAQU="
149    sample: '{"1.3.6.1.5.5.7.1.24": { "critical": false, "value": "MAMCAQU="}}'
150key_usage:
151    description: Entries in the C(key_usage) extension, or C(none) if extension is not present.
152    returned: success
153    type: str
154    sample: "[Key Agreement, Data Encipherment]"
155key_usage_critical:
156    description: Whether the C(key_usage) extension is critical.
157    returned: success
158    type: bool
159subject_alt_name:
160    description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
161    returned: success
162    type: list
163    elements: str
164    sample: "[DNS:www.ansible.com, IP:1.2.3.4]"
165subject_alt_name_critical:
166    description: Whether the C(subject_alt_name) extension is critical.
167    returned: success
168    type: bool
169ocsp_must_staple:
170    description: C(yes) if the OCSP Must Staple extension is present, C(none) otherwise.
171    returned: success
172    type: bool
173ocsp_must_staple_critical:
174    description: Whether the C(ocsp_must_staple) extension is critical.
175    returned: success
176    type: bool
177issuer:
178    description:
179        - The certificate's issuer.
180        - Note that for repeated values, only the last one will be returned.
181    returned: success
182    type: dict
183    sample: '{"organizationName": "Ansible", "commonName": "ca.example.com"}'
184issuer_ordered:
185    description: The certificate's issuer as an ordered list of tuples.
186    returned: success
187    type: list
188    elements: list
189    sample: '[["organizationName", "Ansible"], ["commonName": "ca.example.com"]]'
190    version_added: "2.9"
191subject:
192    description:
193        - The certificate's subject as a dictionary.
194        - Note that for repeated values, only the last one will be returned.
195    returned: success
196    type: dict
197    sample: '{"commonName": "www.example.com", "emailAddress": "test@example.com"}'
198subject_ordered:
199    description: The certificate's subject as an ordered list of tuples.
200    returned: success
201    type: list
202    elements: list
203    sample: '[["commonName", "www.example.com"], ["emailAddress": "test@example.com"]]'
204    version_added: "2.9"
205not_after:
206    description: C(notAfter) date as ASN.1 TIME
207    returned: success
208    type: str
209    sample: 20190413202428Z
210not_before:
211    description: C(notBefore) date as ASN.1 TIME
212    returned: success
213    type: str
214    sample: 20190331202428Z
215public_key:
216    description: Certificate's public key in PEM format
217    returned: success
218    type: str
219    sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..."
220public_key_fingerprints:
221    description:
222        - Fingerprints of certificate's public key.
223        - For every hash algorithm available, the fingerprint is computed.
224    returned: success
225    type: dict
226    sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63',
227              'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..."
228signature_algorithm:
229    description: The signature algorithm used to sign the certificate.
230    returned: success
231    type: str
232    sample: sha256WithRSAEncryption
233serial_number:
234    description: The certificate's serial number.
235    returned: success
236    type: int
237    sample: 1234
238version:
239    description: The certificate version.
240    returned: success
241    type: int
242    sample: 3
243valid_at:
244    description: For every time stamp provided in the I(valid_at) option, a
245                 boolean whether the certificate is valid at that point in time
246                 or not.
247    returned: success
248    type: dict
249subject_key_identifier:
250    description:
251        - The certificate's subject key identifier.
252        - The identifier is returned in hexadecimal, with C(:) used to separate bytes.
253        - Is C(none) if the C(SubjectKeyIdentifier) extension is not present.
254    returned: success and if the pyOpenSSL backend is I(not) used
255    type: str
256    sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33'
257    version_added: "2.9"
258authority_key_identifier:
259    description:
260        - The certificate's authority key identifier.
261        - The identifier is returned in hexadecimal, with C(:) used to separate bytes.
262        - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
263    returned: success and if the pyOpenSSL backend is I(not) used
264    type: str
265    sample: '00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33'
266    version_added: "2.9"
267authority_cert_issuer:
268    description:
269        - The certificate's authority cert issuer as a list of general names.
270        - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
271    returned: success and if the pyOpenSSL backend is I(not) used
272    type: list
273    elements: str
274    sample: "[DNS:www.ansible.com, IP:1.2.3.4]"
275    version_added: "2.9"
276authority_cert_serial_number:
277    description:
278        - The certificate's authority cert serial number.
279        - Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
280    returned: success and if the pyOpenSSL backend is I(not) used
281    type: int
282    sample: '12345'
283    version_added: "2.9"
284ocsp_uri:
285    description: The OCSP responder URI, if included in the certificate. Will be
286                 C(none) if no OCSP responder URI is included.
287    returned: success
288    type: str
289    version_added: "2.9"
290'''
291
292
293import abc
294import binascii
295import datetime
296import os
297import re
298import traceback
299from distutils.version import LooseVersion
300
301from ansible.module_utils import crypto as crypto_utils
302from ansible.module_utils.basic import AnsibleModule, missing_required_lib
303from ansible.module_utils.six import string_types
304from ansible.module_utils._text import to_native, to_text, to_bytes
305from ansible.module_utils.compat import ipaddress as compat_ipaddress
306
307MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
308MINIMAL_PYOPENSSL_VERSION = '0.15'
309
310PYOPENSSL_IMP_ERR = None
311try:
312    import OpenSSL
313    from OpenSSL import crypto
314    PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
315    if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
316        # OpenSSL 1.1.0 or newer
317        OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
318        OPENSSL_MUST_STAPLE_VALUE = b"status_request"
319    else:
320        # OpenSSL 1.0.x or older
321        OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
322        OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
323except ImportError:
324    PYOPENSSL_IMP_ERR = traceback.format_exc()
325    PYOPENSSL_FOUND = False
326else:
327    PYOPENSSL_FOUND = True
328
329CRYPTOGRAPHY_IMP_ERR = None
330try:
331    import cryptography
332    from cryptography import x509
333    from cryptography.hazmat.primitives import serialization
334    CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
335except ImportError:
336    CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
337    CRYPTOGRAPHY_FOUND = False
338else:
339    CRYPTOGRAPHY_FOUND = True
340
341
342TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
343
344
345def get_relative_time_option(input_string, input_name):
346    """Return an ASN1 formatted string if a relative timespec
347       or an ASN1 formatted string is provided."""
348    result = input_string
349    if result.startswith("+") or result.startswith("-"):
350        return crypto_utils.convert_relative_to_datetime(result)
351    if result is None:
352        raise crypto_utils.OpenSSLObjectError(
353            'The timespec "%s" for %s is not valid' %
354            input_string, input_name)
355    for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
356        try:
357            result = datetime.datetime.strptime(input_string, date_fmt)
358            break
359        except ValueError:
360            pass
361
362    if not isinstance(result, datetime.datetime):
363        raise crypto_utils.OpenSSLObjectError(
364            'The time spec "%s" for %s is invalid' %
365            (input_string, input_name)
366        )
367    return result
368
369
370class CertificateInfo(crypto_utils.OpenSSLObject):
371    def __init__(self, module, backend):
372        super(CertificateInfo, self).__init__(
373            module.params['path'],
374            'present',
375            False,
376            module.check_mode,
377        )
378        self.backend = backend
379        self.module = module
380
381        self.valid_at = module.params['valid_at']
382        if self.valid_at:
383            for k, v in self.valid_at.items():
384                if not isinstance(v, string_types):
385                    self.module.fail_json(
386                        msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v))
387                    )
388                self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k))
389
390    def generate(self):
391        # Empty method because crypto_utils.OpenSSLObject wants this
392        pass
393
394    def dump(self):
395        # Empty method because crypto_utils.OpenSSLObject wants this
396        pass
397
398    @abc.abstractmethod
399    def _get_signature_algorithm(self):
400        pass
401
402    @abc.abstractmethod
403    def _get_subject_ordered(self):
404        pass
405
406    @abc.abstractmethod
407    def _get_issuer_ordered(self):
408        pass
409
410    @abc.abstractmethod
411    def _get_version(self):
412        pass
413
414    @abc.abstractmethod
415    def _get_key_usage(self):
416        pass
417
418    @abc.abstractmethod
419    def _get_extended_key_usage(self):
420        pass
421
422    @abc.abstractmethod
423    def _get_basic_constraints(self):
424        pass
425
426    @abc.abstractmethod
427    def _get_ocsp_must_staple(self):
428        pass
429
430    @abc.abstractmethod
431    def _get_subject_alt_name(self):
432        pass
433
434    @abc.abstractmethod
435    def _get_not_before(self):
436        pass
437
438    @abc.abstractmethod
439    def _get_not_after(self):
440        pass
441
442    @abc.abstractmethod
443    def _get_public_key(self, binary):
444        pass
445
446    @abc.abstractmethod
447    def _get_subject_key_identifier(self):
448        pass
449
450    @abc.abstractmethod
451    def _get_authority_key_identifier(self):
452        pass
453
454    @abc.abstractmethod
455    def _get_serial_number(self):
456        pass
457
458    @abc.abstractmethod
459    def _get_all_extensions(self):
460        pass
461
462    @abc.abstractmethod
463    def _get_ocsp_uri(self):
464        pass
465
466    def get_info(self):
467        result = dict()
468        self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
469
470        result['signature_algorithm'] = self._get_signature_algorithm()
471        subject = self._get_subject_ordered()
472        issuer = self._get_issuer_ordered()
473        result['subject'] = dict()
474        for k, v in subject:
475            result['subject'][k] = v
476        result['subject_ordered'] = subject
477        result['issuer'] = dict()
478        for k, v in issuer:
479            result['issuer'][k] = v
480        result['issuer_ordered'] = issuer
481        result['version'] = self._get_version()
482        result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
483        result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
484        result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
485        result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
486        result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
487
488        not_before = self._get_not_before()
489        not_after = self._get_not_after()
490        result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
491        result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
492        result['expired'] = not_after < datetime.datetime.utcnow()
493
494        result['valid_at'] = dict()
495        if self.valid_at:
496            for k, v in self.valid_at.items():
497                result['valid_at'][k] = not_before <= v <= not_after
498
499        result['public_key'] = self._get_public_key(binary=False)
500        pk = self._get_public_key(binary=True)
501        result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict()
502
503        if self.backend != 'pyopenssl':
504            ski = self._get_subject_key_identifier()
505            if ski is not None:
506                ski = to_native(binascii.hexlify(ski))
507                ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)])
508            result['subject_key_identifier'] = ski
509
510            aki, aci, acsn = self._get_authority_key_identifier()
511            if aki is not None:
512                aki = to_native(binascii.hexlify(aki))
513                aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)])
514            result['authority_key_identifier'] = aki
515            result['authority_cert_issuer'] = aci
516            result['authority_cert_serial_number'] = acsn
517
518        result['serial_number'] = self._get_serial_number()
519        result['extensions_by_oid'] = self._get_all_extensions()
520        result['ocsp_uri'] = self._get_ocsp_uri()
521
522        return result
523
524
525class CertificateInfoCryptography(CertificateInfo):
526    """Validate the supplied cert, using the cryptography backend"""
527    def __init__(self, module):
528        super(CertificateInfoCryptography, self).__init__(module, 'cryptography')
529
530    def _get_signature_algorithm(self):
531        return crypto_utils.cryptography_oid_to_name(self.cert.signature_algorithm_oid)
532
533    def _get_subject_ordered(self):
534        result = []
535        for attribute in self.cert.subject:
536            result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value])
537        return result
538
539    def _get_issuer_ordered(self):
540        result = []
541        for attribute in self.cert.issuer:
542            result.append([crypto_utils.cryptography_oid_to_name(attribute.oid), attribute.value])
543        return result
544
545    def _get_version(self):
546        if self.cert.version == x509.Version.v1:
547            return 1
548        if self.cert.version == x509.Version.v3:
549            return 3
550        return "unknown"
551
552    def _get_key_usage(self):
553        try:
554            current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage)
555            current_key_usage = current_key_ext.value
556            key_usage = dict(
557                digital_signature=current_key_usage.digital_signature,
558                content_commitment=current_key_usage.content_commitment,
559                key_encipherment=current_key_usage.key_encipherment,
560                data_encipherment=current_key_usage.data_encipherment,
561                key_agreement=current_key_usage.key_agreement,
562                key_cert_sign=current_key_usage.key_cert_sign,
563                crl_sign=current_key_usage.crl_sign,
564                encipher_only=False,
565                decipher_only=False,
566            )
567            if key_usage['key_agreement']:
568                key_usage.update(dict(
569                    encipher_only=current_key_usage.encipher_only,
570                    decipher_only=current_key_usage.decipher_only
571                ))
572
573            key_usage_names = dict(
574                digital_signature='Digital Signature',
575                content_commitment='Non Repudiation',
576                key_encipherment='Key Encipherment',
577                data_encipherment='Data Encipherment',
578                key_agreement='Key Agreement',
579                key_cert_sign='Certificate Sign',
580                crl_sign='CRL Sign',
581                encipher_only='Encipher Only',
582                decipher_only='Decipher Only',
583            )
584            return sorted([
585                key_usage_names[name] for name, value in key_usage.items() if value
586            ]), current_key_ext.critical
587        except cryptography.x509.ExtensionNotFound:
588            return None, False
589
590    def _get_extended_key_usage(self):
591        try:
592            ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
593            return sorted([
594                crypto_utils.cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value
595            ]), ext_keyusage_ext.critical
596        except cryptography.x509.ExtensionNotFound:
597            return None, False
598
599    def _get_basic_constraints(self):
600        try:
601            ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints)
602            result = []
603            result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE'))
604            if ext_keyusage_ext.value.path_length is not None:
605                result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
606            return sorted(result), ext_keyusage_ext.critical
607        except cryptography.x509.ExtensionNotFound:
608            return None, False
609
610    def _get_ocsp_must_staple(self):
611        try:
612            try:
613                # This only works with cryptography >= 2.1
614                tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature)
615                value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
616            except AttributeError as dummy:
617                # Fallback for cryptography < 2.1
618                oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
619                tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid)
620                value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
621            return value, tlsfeature_ext.critical
622        except cryptography.x509.ExtensionNotFound:
623            return None, False
624
625    def _get_subject_alt_name(self):
626        try:
627            san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
628            result = [crypto_utils.cryptography_decode_name(san) for san in san_ext.value]
629            return result, san_ext.critical
630        except cryptography.x509.ExtensionNotFound:
631            return None, False
632
633    def _get_not_before(self):
634        return self.cert.not_valid_before
635
636    def _get_not_after(self):
637        return self.cert.not_valid_after
638
639    def _get_public_key(self, binary):
640        return self.cert.public_key().public_bytes(
641            serialization.Encoding.DER if binary else serialization.Encoding.PEM,
642            serialization.PublicFormat.SubjectPublicKeyInfo
643        )
644
645    def _get_subject_key_identifier(self):
646        try:
647            ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
648            return ext.value.digest
649        except cryptography.x509.ExtensionNotFound:
650            return None
651
652    def _get_authority_key_identifier(self):
653        try:
654            ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
655            issuer = None
656            if ext.value.authority_cert_issuer is not None:
657                issuer = [crypto_utils.cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
658            return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
659        except cryptography.x509.ExtensionNotFound:
660            return None, None, None
661
662    def _get_serial_number(self):
663        return self.cert.serial_number
664
665    def _get_all_extensions(self):
666        return crypto_utils.cryptography_get_extensions_from_cert(self.cert)
667
668    def _get_ocsp_uri(self):
669        try:
670            ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
671            for desc in ext.value:
672                if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP:
673                    if isinstance(desc.access_location, x509.UniformResourceIdentifier):
674                        return desc.access_location.value
675        except x509.ExtensionNotFound as dummy:
676            pass
677        return None
678
679
680class CertificateInfoPyOpenSSL(CertificateInfo):
681    """validate the supplied certificate."""
682
683    def __init__(self, module):
684        super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
685
686    def _get_signature_algorithm(self):
687        return to_text(self.cert.get_signature_algorithm())
688
689    def __get_name(self, name):
690        result = []
691        for sub in name.get_components():
692            result.append([crypto_utils.pyopenssl_normalize_name(sub[0]), to_text(sub[1])])
693        return result
694
695    def _get_subject_ordered(self):
696        return self.__get_name(self.cert.get_subject())
697
698    def _get_issuer_ordered(self):
699        return self.__get_name(self.cert.get_issuer())
700
701    def _get_version(self):
702        # Version numbers in certs are off by one:
703        # v1: 0, v2: 1, v3: 2 ...
704        return self.cert.get_version() + 1
705
706    def _get_extension(self, short_name):
707        for extension_idx in range(0, self.cert.get_extension_count()):
708            extension = self.cert.get_extension(extension_idx)
709            if extension.get_short_name() == short_name:
710                result = [
711                    crypto_utils.pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
712                ]
713                return sorted(result), bool(extension.get_critical())
714        return None, False
715
716    def _get_key_usage(self):
717        return self._get_extension(b'keyUsage')
718
719    def _get_extended_key_usage(self):
720        return self._get_extension(b'extendedKeyUsage')
721
722    def _get_basic_constraints(self):
723        return self._get_extension(b'basicConstraints')
724
725    def _get_ocsp_must_staple(self):
726        extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())]
727        oms_ext = [
728            ext for ext in extensions
729            if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
730        ]
731        if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
732            # Older versions of libssl don't know about OCSP Must Staple
733            oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
734        if oms_ext:
735            return True, bool(oms_ext[0].get_critical())
736        else:
737            return None, False
738
739    def _normalize_san(self, san):
740        if san.startswith('IP Address:'):
741            san = 'IP:' + san[len('IP Address:'):]
742        if san.startswith('IP:'):
743            ip = compat_ipaddress.ip_address(san[3:])
744            san = 'IP:{0}'.format(ip.compressed)
745        return san
746
747    def _get_subject_alt_name(self):
748        for extension_idx in range(0, self.cert.get_extension_count()):
749            extension = self.cert.get_extension(extension_idx)
750            if extension.get_short_name() == b'subjectAltName':
751                result = [self._normalize_san(altname.strip()) for altname in
752                          to_text(extension, errors='surrogate_or_strict').split(', ')]
753                return result, bool(extension.get_critical())
754        return None, False
755
756    def _get_not_before(self):
757        time_string = to_native(self.cert.get_notBefore())
758        return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
759
760    def _get_not_after(self):
761        time_string = to_native(self.cert.get_notAfter())
762        return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
763
764    def _get_public_key(self, binary):
765        try:
766            return crypto.dump_publickey(
767                crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
768                self.cert.get_pubkey()
769            )
770        except AttributeError:
771            try:
772                # pyOpenSSL < 16.0:
773                bio = crypto._new_mem_buf()
774                if binary:
775                    rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey)
776                else:
777                    rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey)
778                if rc != 1:
779                    crypto._raise_current_error()
780                return crypto._bio_to_string(bio)
781            except AttributeError:
782                self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
783                                 'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
784
785    def _get_subject_key_identifier(self):
786        # Won't be implemented
787        return None
788
789    def _get_authority_key_identifier(self):
790        # Won't be implemented
791        return None, None, None
792
793    def _get_serial_number(self):
794        return self.cert.get_serial_number()
795
796    def _get_all_extensions(self):
797        return crypto_utils.pyopenssl_get_extensions_from_cert(self.cert)
798
799    def _get_ocsp_uri(self):
800        for i in range(self.cert.get_extension_count()):
801            ext = self.cert.get_extension(i)
802            if ext.get_short_name() == b'authorityInfoAccess':
803                v = str(ext)
804                m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE)
805                if m:
806                    return m.group(1)
807        return None
808
809
810def main():
811    module = AnsibleModule(
812        argument_spec=dict(
813            path=dict(type='path', required=True),
814            valid_at=dict(type='dict'),
815            select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
816        ),
817        supports_check_mode=True,
818    )
819
820    try:
821        base_dir = os.path.dirname(module.params['path']) or '.'
822        if not os.path.isdir(base_dir):
823            module.fail_json(
824                name=base_dir,
825                msg='The directory %s does not exist or the file is not a directory' % base_dir
826            )
827
828        backend = module.params['select_crypto_backend']
829        if backend == 'auto':
830            # Detect what backend we can use
831            can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
832            can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
833
834            # If cryptography is available we'll use it
835            if can_use_cryptography:
836                backend = 'cryptography'
837            elif can_use_pyopenssl:
838                backend = 'pyopenssl'
839
840            # Fail if no backend has been found
841            if backend == 'auto':
842                module.fail_json(msg=("Can't detect any of the required Python libraries "
843                                      "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
844                                          MINIMAL_CRYPTOGRAPHY_VERSION,
845                                          MINIMAL_PYOPENSSL_VERSION))
846
847        if backend == 'pyopenssl':
848            if not PYOPENSSL_FOUND:
849                module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
850                                 exception=PYOPENSSL_IMP_ERR)
851            try:
852                getattr(crypto.X509Req, 'get_extensions')
853            except AttributeError:
854                module.fail_json(msg='You need to have PyOpenSSL>=0.15')
855
856            module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', version='2.13')
857            certificate = CertificateInfoPyOpenSSL(module)
858        elif backend == 'cryptography':
859            if not CRYPTOGRAPHY_FOUND:
860                module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
861                                 exception=CRYPTOGRAPHY_IMP_ERR)
862            certificate = CertificateInfoCryptography(module)
863
864        result = certificate.get_info()
865        module.exit_json(**result)
866    except crypto_utils.OpenSSLObjectError as exc:
867        module.fail_json(msg=to_native(exc))
868
869
870if __name__ == "__main__":
871    main()
872