1# -*- coding: utf-8 -*-
2#
3# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
4# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
5# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
6# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
7
8from __future__ import absolute_import, division, print_function
9__metaclass__ = type
10
11
12import abc
13import binascii
14import datetime
15import re
16import traceback
17
18from distutils.version import LooseVersion
19
20from ansible.module_utils import six
21from ansible.module_utils.basic import missing_required_lib
22from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
23
24from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
25    load_certificate,
26    get_fingerprint_of_bytes,
27)
28
29from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
30    cryptography_decode_name,
31    cryptography_get_extensions_from_cert,
32    cryptography_oid_to_name,
33    cryptography_serial_number_of_cert,
34)
35
36from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
37    pyopenssl_get_extensions_from_cert,
38    pyopenssl_normalize_name,
39    pyopenssl_normalize_name_attribute,
40)
41
42from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import (
43    get_publickey_info,
44)
45
46MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
47MINIMAL_PYOPENSSL_VERSION = '0.15'
48
49PYOPENSSL_IMP_ERR = None
50try:
51    import OpenSSL
52    from OpenSSL import crypto
53    PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
54    if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
55        # OpenSSL 1.1.0 or newer
56        OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
57        OPENSSL_MUST_STAPLE_VALUE = b"status_request"
58    else:
59        # OpenSSL 1.0.x or older
60        OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
61        OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
62except ImportError:
63    PYOPENSSL_IMP_ERR = traceback.format_exc()
64    PYOPENSSL_FOUND = False
65else:
66    PYOPENSSL_FOUND = True
67
68CRYPTOGRAPHY_IMP_ERR = None
69try:
70    import cryptography
71    from cryptography import x509
72    from cryptography.hazmat.primitives import serialization
73    CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
74except ImportError:
75    CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
76    CRYPTOGRAPHY_FOUND = False
77else:
78    CRYPTOGRAPHY_FOUND = True
79
80
81TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
82
83
84@six.add_metaclass(abc.ABCMeta)
85class CertificateInfoRetrieval(object):
86    def __init__(self, module, backend, content):
87        # content must be a bytes string
88        self.module = module
89        self.backend = backend
90        self.content = content
91
92    @abc.abstractmethod
93    def _get_der_bytes(self):
94        pass
95
96    @abc.abstractmethod
97    def _get_signature_algorithm(self):
98        pass
99
100    @abc.abstractmethod
101    def _get_subject_ordered(self):
102        pass
103
104    @abc.abstractmethod
105    def _get_issuer_ordered(self):
106        pass
107
108    @abc.abstractmethod
109    def _get_version(self):
110        pass
111
112    @abc.abstractmethod
113    def _get_key_usage(self):
114        pass
115
116    @abc.abstractmethod
117    def _get_extended_key_usage(self):
118        pass
119
120    @abc.abstractmethod
121    def _get_basic_constraints(self):
122        pass
123
124    @abc.abstractmethod
125    def _get_ocsp_must_staple(self):
126        pass
127
128    @abc.abstractmethod
129    def _get_subject_alt_name(self):
130        pass
131
132    @abc.abstractmethod
133    def get_not_before(self):
134        pass
135
136    @abc.abstractmethod
137    def get_not_after(self):
138        pass
139
140    @abc.abstractmethod
141    def _get_public_key_pem(self):
142        pass
143
144    @abc.abstractmethod
145    def _get_public_key_object(self):
146        pass
147
148    @abc.abstractmethod
149    def _get_subject_key_identifier(self):
150        pass
151
152    @abc.abstractmethod
153    def _get_authority_key_identifier(self):
154        pass
155
156    @abc.abstractmethod
157    def _get_serial_number(self):
158        pass
159
160    @abc.abstractmethod
161    def _get_all_extensions(self):
162        pass
163
164    @abc.abstractmethod
165    def _get_ocsp_uri(self):
166        pass
167
168    def get_info(self, prefer_one_fingerprint=False):
169        result = dict()
170        self.cert = load_certificate(None, content=self.content, backend=self.backend)
171
172        result['signature_algorithm'] = self._get_signature_algorithm()
173        subject = self._get_subject_ordered()
174        issuer = self._get_issuer_ordered()
175        result['subject'] = dict()
176        for k, v in subject:
177            result['subject'][k] = v
178        result['subject_ordered'] = subject
179        result['issuer'] = dict()
180        for k, v in issuer:
181            result['issuer'][k] = v
182        result['issuer_ordered'] = issuer
183        result['version'] = self._get_version()
184        result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
185        result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
186        result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
187        result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
188        result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
189
190        not_before = self.get_not_before()
191        not_after = self.get_not_after()
192        result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
193        result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
194        result['expired'] = not_after < datetime.datetime.utcnow()
195
196        result['public_key'] = self._get_public_key_pem()
197
198        public_key_info = get_publickey_info(
199            self.module,
200            self.backend,
201            key=self._get_public_key_object(),
202            prefer_one_fingerprint=prefer_one_fingerprint)
203        result.update({
204            'public_key_type': public_key_info['type'],
205            'public_key_data': public_key_info['public_data'],
206            'public_key_fingerprints': public_key_info['fingerprints'],
207        })
208
209        result['fingerprints'] = get_fingerprint_of_bytes(
210            self._get_der_bytes(), prefer_one=prefer_one_fingerprint)
211
212        if self.backend != 'pyopenssl':
213            ski = self._get_subject_key_identifier()
214            if ski is not None:
215                ski = to_native(binascii.hexlify(ski))
216                ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)])
217            result['subject_key_identifier'] = ski
218
219            aki, aci, acsn = self._get_authority_key_identifier()
220            if aki is not None:
221                aki = to_native(binascii.hexlify(aki))
222                aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)])
223            result['authority_key_identifier'] = aki
224            result['authority_cert_issuer'] = aci
225            result['authority_cert_serial_number'] = acsn
226
227        result['serial_number'] = self._get_serial_number()
228        result['extensions_by_oid'] = self._get_all_extensions()
229        result['ocsp_uri'] = self._get_ocsp_uri()
230
231        return result
232
233
234class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
235    """Validate the supplied cert, using the cryptography backend"""
236    def __init__(self, module, content):
237        super(CertificateInfoRetrievalCryptography, self).__init__(module, 'cryptography', content)
238
239    def _get_der_bytes(self):
240        return self.cert.public_bytes(serialization.Encoding.DER)
241
242    def _get_signature_algorithm(self):
243        return cryptography_oid_to_name(self.cert.signature_algorithm_oid)
244
245    def _get_subject_ordered(self):
246        result = []
247        for attribute in self.cert.subject:
248            result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
249        return result
250
251    def _get_issuer_ordered(self):
252        result = []
253        for attribute in self.cert.issuer:
254            result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
255        return result
256
257    def _get_version(self):
258        if self.cert.version == x509.Version.v1:
259            return 1
260        if self.cert.version == x509.Version.v3:
261            return 3
262        return "unknown"
263
264    def _get_key_usage(self):
265        try:
266            current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage)
267            current_key_usage = current_key_ext.value
268            key_usage = dict(
269                digital_signature=current_key_usage.digital_signature,
270                content_commitment=current_key_usage.content_commitment,
271                key_encipherment=current_key_usage.key_encipherment,
272                data_encipherment=current_key_usage.data_encipherment,
273                key_agreement=current_key_usage.key_agreement,
274                key_cert_sign=current_key_usage.key_cert_sign,
275                crl_sign=current_key_usage.crl_sign,
276                encipher_only=False,
277                decipher_only=False,
278            )
279            if key_usage['key_agreement']:
280                key_usage.update(dict(
281                    encipher_only=current_key_usage.encipher_only,
282                    decipher_only=current_key_usage.decipher_only
283                ))
284
285            key_usage_names = dict(
286                digital_signature='Digital Signature',
287                content_commitment='Non Repudiation',
288                key_encipherment='Key Encipherment',
289                data_encipherment='Data Encipherment',
290                key_agreement='Key Agreement',
291                key_cert_sign='Certificate Sign',
292                crl_sign='CRL Sign',
293                encipher_only='Encipher Only',
294                decipher_only='Decipher Only',
295            )
296            return sorted([
297                key_usage_names[name] for name, value in key_usage.items() if value
298            ]), current_key_ext.critical
299        except cryptography.x509.ExtensionNotFound:
300            return None, False
301
302    def _get_extended_key_usage(self):
303        try:
304            ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
305            return sorted([
306                cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value
307            ]), ext_keyusage_ext.critical
308        except cryptography.x509.ExtensionNotFound:
309            return None, False
310
311    def _get_basic_constraints(self):
312        try:
313            ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints)
314            result = []
315            result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE'))
316            if ext_keyusage_ext.value.path_length is not None:
317                result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
318            return sorted(result), ext_keyusage_ext.critical
319        except cryptography.x509.ExtensionNotFound:
320            return None, False
321
322    def _get_ocsp_must_staple(self):
323        try:
324            try:
325                # This only works with cryptography >= 2.1
326                tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature)
327                value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
328            except AttributeError:
329                # Fallback for cryptography < 2.1
330                oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
331                tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid)
332                value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
333            return value, tlsfeature_ext.critical
334        except cryptography.x509.ExtensionNotFound:
335            return None, False
336
337    def _get_subject_alt_name(self):
338        try:
339            san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
340            result = [cryptography_decode_name(san) for san in san_ext.value]
341            return result, san_ext.critical
342        except cryptography.x509.ExtensionNotFound:
343            return None, False
344
345    def get_not_before(self):
346        return self.cert.not_valid_before
347
348    def get_not_after(self):
349        return self.cert.not_valid_after
350
351    def _get_public_key_pem(self):
352        return self.cert.public_key().public_bytes(
353            serialization.Encoding.PEM,
354            serialization.PublicFormat.SubjectPublicKeyInfo,
355        )
356
357    def _get_public_key_object(self):
358        return self.cert.public_key()
359
360    def _get_subject_key_identifier(self):
361        try:
362            ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
363            return ext.value.digest
364        except cryptography.x509.ExtensionNotFound:
365            return None
366
367    def _get_authority_key_identifier(self):
368        try:
369            ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
370            issuer = None
371            if ext.value.authority_cert_issuer is not None:
372                issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
373            return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
374        except cryptography.x509.ExtensionNotFound:
375            return None, None, None
376
377    def _get_serial_number(self):
378        return cryptography_serial_number_of_cert(self.cert)
379
380    def _get_all_extensions(self):
381        return cryptography_get_extensions_from_cert(self.cert)
382
383    def _get_ocsp_uri(self):
384        try:
385            ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
386            for desc in ext.value:
387                if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP:
388                    if isinstance(desc.access_location, x509.UniformResourceIdentifier):
389                        return desc.access_location.value
390        except x509.ExtensionNotFound as dummy:
391            pass
392        return None
393
394
395class CertificateInfoRetrievalPyOpenSSL(CertificateInfoRetrieval):
396    """validate the supplied certificate."""
397
398    def __init__(self, module, content):
399        super(CertificateInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content)
400
401    def _get_der_bytes(self):
402        return crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert)
403
404    def _get_signature_algorithm(self):
405        return to_text(self.cert.get_signature_algorithm())
406
407    def __get_name(self, name):
408        result = []
409        for sub in name.get_components():
410            result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])])
411        return result
412
413    def _get_subject_ordered(self):
414        return self.__get_name(self.cert.get_subject())
415
416    def _get_issuer_ordered(self):
417        return self.__get_name(self.cert.get_issuer())
418
419    def _get_version(self):
420        # Version numbers in certs are off by one:
421        # v1: 0, v2: 1, v3: 2 ...
422        return self.cert.get_version() + 1
423
424    def _get_extension(self, short_name):
425        for extension_idx in range(0, self.cert.get_extension_count()):
426            extension = self.cert.get_extension(extension_idx)
427            if extension.get_short_name() == short_name:
428                result = [
429                    pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
430                ]
431                return sorted(result), bool(extension.get_critical())
432        return None, False
433
434    def _get_key_usage(self):
435        return self._get_extension(b'keyUsage')
436
437    def _get_extended_key_usage(self):
438        return self._get_extension(b'extendedKeyUsage')
439
440    def _get_basic_constraints(self):
441        return self._get_extension(b'basicConstraints')
442
443    def _get_ocsp_must_staple(self):
444        extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())]
445        oms_ext = [
446            ext for ext in extensions
447            if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
448        ]
449        if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
450            # Older versions of libssl don't know about OCSP Must Staple
451            oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
452        if oms_ext:
453            return True, bool(oms_ext[0].get_critical())
454        else:
455            return None, False
456
457    def _get_subject_alt_name(self):
458        for extension_idx in range(0, self.cert.get_extension_count()):
459            extension = self.cert.get_extension(extension_idx)
460            if extension.get_short_name() == b'subjectAltName':
461                result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
462                          to_text(extension, errors='surrogate_or_strict').split(', ')]
463                return result, bool(extension.get_critical())
464        return None, False
465
466    def get_not_before(self):
467        time_string = to_native(self.cert.get_notBefore())
468        return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
469
470    def get_not_after(self):
471        time_string = to_native(self.cert.get_notAfter())
472        return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
473
474    def _get_public_key_pem(self):
475        try:
476            return crypto.dump_publickey(
477                crypto.FILETYPE_PEM,
478                self.cert.get_pubkey(),
479            )
480        except AttributeError:
481            try:
482                # pyOpenSSL < 16.0:
483                bio = crypto._new_mem_buf()
484                rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey)
485                if rc != 1:
486                    crypto._raise_current_error()
487                return crypto._bio_to_string(bio)
488            except AttributeError:
489                self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
490                                 'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
491
492    def _get_public_key_object(self):
493        return self.cert.get_pubkey()
494
495    def _get_subject_key_identifier(self):
496        # Won't be implemented
497        return None
498
499    def _get_authority_key_identifier(self):
500        # Won't be implemented
501        return None, None, None
502
503    def _get_serial_number(self):
504        return self.cert.get_serial_number()
505
506    def _get_all_extensions(self):
507        return pyopenssl_get_extensions_from_cert(self.cert)
508
509    def _get_ocsp_uri(self):
510        for i in range(self.cert.get_extension_count()):
511            ext = self.cert.get_extension(i)
512            if ext.get_short_name() == b'authorityInfoAccess':
513                v = str(ext)
514                m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE)
515                if m:
516                    return m.group(1)
517        return None
518
519
520def get_certificate_info(module, backend, content, prefer_one_fingerprint=False):
521    if backend == 'cryptography':
522        info = CertificateInfoRetrievalCryptography(module, content)
523    elif backend == 'pyopenssl':
524        info = CertificateInfoRetrievalPyOpenSSL(module, content)
525    return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
526
527
528def select_backend(module, backend, content):
529    if backend == 'auto':
530        # Detection what is possible
531        can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
532        can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
533
534        # First try cryptography, then pyOpenSSL
535        if can_use_cryptography:
536            backend = 'cryptography'
537        elif can_use_pyopenssl:
538            backend = 'pyopenssl'
539
540        # Success?
541        if backend == 'auto':
542            module.fail_json(msg=("Can't detect any of the required Python libraries "
543                                  "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
544                                      MINIMAL_CRYPTOGRAPHY_VERSION,
545                                      MINIMAL_PYOPENSSL_VERSION))
546
547    if backend == 'pyopenssl':
548        if not PYOPENSSL_FOUND:
549            module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
550                             exception=PYOPENSSL_IMP_ERR)
551        try:
552            getattr(crypto.X509Req, 'get_extensions')
553        except AttributeError:
554            module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
555
556        module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
557                         version='2.0.0', collection_name='community.crypto')
558        return backend, CertificateInfoRetrievalPyOpenSSL(module, content)
559    elif backend == 'cryptography':
560        if not CRYPTOGRAPHY_FOUND:
561            module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
562                             exception=CRYPTOGRAPHY_IMP_ERR)
563        return backend, CertificateInfoRetrievalCryptography(module, content)
564    else:
565        raise ValueError('Unsupported value for backend: {0}'.format(backend))
566