1# -*- coding: utf-8 -*-
2
3# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
4# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
5# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
6
7from __future__ import absolute_import, division, print_function
8__metaclass__ = type
9
10
11import os
12
13from random import randrange
14
15from ansible.module_utils.common.text.converters import to_bytes
16
17from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
18    get_relative_time_option,
19    select_message_digest,
20)
21
22from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
23    cryptography_key_needs_digest_for_signing,
24    cryptography_serial_number_of_cert,
25)
26
27from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
28    CertificateError,
29    CertificateBackend,
30    CertificateProvider,
31)
32
33try:
34    from OpenSSL import crypto
35except ImportError:
36    pass
37
38try:
39    import cryptography
40    from cryptography import x509
41    from cryptography.hazmat.backends import default_backend
42    from cryptography.hazmat.primitives.serialization import Encoding
43except ImportError:
44    pass
45
46
class SelfSignedCertificateBackendCryptography(CertificateBackend):
    """Self-signed certificate backend implemented with ``cryptography``.

    The generated certificate uses the CSR's subject as both subject and
    issuer, carries the public half of the private key, and is signed with
    that same private key.
    """

    def __init__(self, module):
        """Read the ``selfsigned_*`` options and load the private key and CSR.

        If no CSR is available after loading, a CSR with an empty subject is
        created and signed with the private key.

        Raises:
            CertificateError: if the CSR file or the private key file does
                not exist, or if the key needs a digest for signing and none
                could be selected for ``selfsigned_digest``.
        """
        super(SelfSignedCertificateBackendCryptography, self).__init__(module, 'cryptography')

        options = module.params
        self.create_subject_key_identifier = options['selfsigned_create_subject_key_identifier']
        self.notBefore = get_relative_time_option(
            options['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend
        )
        self.notAfter = get_relative_time_option(
            options['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend
        )
        self.digest = select_message_digest(options['selfsigned_digest'])
        self.version = options['selfsigned_version']
        self.serial_number = x509.random_serial_number()

        # Fail early with a readable message when an input file is missing.
        csr_file_missing = self.csr_path is not None and not os.path.exists(self.csr_path)
        if csr_file_missing:
            raise CertificateError(
                'The certificate signing request file {0} does not exist'.format(self.csr_path)
            )
        key_file_missing = self.privatekey_content is None and not os.path.exists(self.privatekey_path)
        if key_file_missing:
            raise CertificateError(
                'The private key file {0} does not exist'.format(self.privatekey_path)
            )

        self._module = module

        self._ensure_private_key_loaded()

        self._ensure_csr_loaded()
        if self.csr is None:
            # No CSR was supplied: create an empty one on the fly.
            self.csr = self._sign_empty_csr(options['selfsigned_digest'])

        # Keys that do not need a digest for signing must be used with
        # algorithm=None; all other keys require a usable digest.
        if not cryptography_key_needs_digest_for_signing(self.privatekey):
            self.digest = None
        elif self.digest is None:
            raise CertificateError(
                'The digest %s is not supported with the cryptography backend' % options['selfsigned_digest']
            )

    def _sign_empty_csr(self, digest_name):
        """Return a CSR with an empty subject, signed with the private key.

        ``digest_name`` is the user-supplied digest name; it is only used in
        the failure message when no digest object could be selected.
        """
        request_builder = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([]))
        signing_digest = None
        if cryptography_key_needs_digest_for_signing(self.privatekey):
            signing_digest = self.digest
            if signing_digest is None:
                self.module.fail_json(msg='Unsupported digest "{0}"'.format(digest_name))
        try:
            return request_builder.sign(self.privatekey, signing_digest, default_backend())
        except TypeError as exc:
            # This exact TypeError together with a None digest is reported
            # as a too-old cryptography release for Ed25519/Ed448 keys.
            if signing_digest is None and str(exc) == 'Algorithm must be a registered hash algorithm.':
                self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
            raise

    def generate_certificate(self):
        """(Re-)Generate the certificate and store it in ``self.cert``.

        Raises:
            CertificateError: if building the certificate raises ValueError.
        """
        ski_mode = self.create_subject_key_identifier
        try:
            builder = (
                x509.CertificateBuilder()
                .subject_name(self.csr.subject)
                .issuer_name(self.csr.subject)
                .serial_number(self.serial_number)
                .not_valid_before(self.notBefore)
                .not_valid_after(self.notAfter)
                .public_key(self.privatekey.public_key())
            )
            ski_copied_from_csr = False
            for ext in self.csr.extensions:
                if isinstance(ext.value, x509.SubjectKeyIdentifier):
                    if ski_mode == 'always_create':
                        # Drop the CSR's identifier; a fresh one is added below.
                        continue
                    ski_copied_from_csr = True
                builder = builder.add_extension(ext.value, critical=ext.critical)
            if ski_mode != 'never_create' and not ski_copied_from_csr:
                builder = builder.add_extension(
                    x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()),
                    critical=False
                )
        except ValueError as exc:
            raise CertificateError(str(exc))

        try:
            signed = builder.sign(
                private_key=self.privatekey,
                algorithm=self.digest,
                backend=default_backend()
            )
        except TypeError as exc:
            if self.digest is None and str(exc) == 'Algorithm must be a registered hash algorithm.':
                self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
            raise
        else:
            self.cert = signed

    def get_certificate_data(self):
        """Return the PEM-encoded bytes of ``self.cert``."""
        pem_bytes = self.cert.public_bytes(Encoding.PEM)
        return pem_bytes

    def dump(self, include_certificate):
        """Return the base class result extended with validity and serial number.

        In check mode the values planned for a new certificate are reported;
        otherwise they are read from ``self.cert``, falling back to the
        existing certificate when none has been generated.
        """
        def as_timestamp(moment):
            return moment.strftime("%Y%m%d%H%M%SZ")

        summary = super(SelfSignedCertificateBackendCryptography, self).dump(include_certificate)

        if self.module.check_mode:
            details = {
                'notBefore': as_timestamp(self.notBefore),
                'notAfter': as_timestamp(self.notAfter),
                'serial_number': self.serial_number,
            }
        else:
            if self.cert is None:
                self.cert = self.existing_certificate
            details = {
                'notBefore': as_timestamp(self.cert.not_valid_before),
                'notAfter': as_timestamp(self.cert.not_valid_after),
                'serial_number': cryptography_serial_number_of_cert(self.cert),
            }

        summary.update(details)
        return summary
156
157
def generate_serial_number():
    """Generate a random serial number for a certificate.

    Returns:
        int: a value in ``[1000, 2**159)``.

    The value is drawn from the operating system's CSPRNG so serial numbers
    are not predictable. The upper bound is 2**159 rather than 2**160: a
    positive integer with bit 159 set needs 21 octets as a DER INTEGER,
    which exceeds the 20-octet limit RFC 5280 places on serial numbers.
    """
    # Function-scope import keeps this block self-contained; SystemRandom is
    # available on both Python 2 and Python 3.
    from random import SystemRandom

    return SystemRandom().randrange(1000, 1 << 159)
164
165
class SelfSignedCertificateBackendPyOpenSSL(CertificateBackend):
    """Self-signed certificate backend implemented with pyOpenSSL.

    The generated certificate uses the CSR's subject as both subject and
    issuer and is signed with the configured private key.
    """

    def __init__(self, module):
        """Read the ``selfsigned_*`` options and load the private key and CSR.

        The module is failed when ``selfsigned_create_subject_key_identifier``
        differs from ``create_if_not_provided``. If no CSR is available after
        loading, an empty one is created and signed with the private key.

        Raises:
            CertificateError: if the CSR file or the private key file does
                not exist.
        """
        super(SelfSignedCertificateBackendPyOpenSSL, self).__init__(module, 'pyopenssl')

        options = module.params
        if options['selfsigned_create_subject_key_identifier'] != 'create_if_not_provided':
            module.fail_json(msg='selfsigned_create_subject_key_identifier cannot be used with the pyOpenSSL backend!')
        self.notBefore = get_relative_time_option(
            options['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend
        )
        self.notAfter = get_relative_time_option(
            options['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend
        )
        self.digest = options['selfsigned_digest']
        self.version = options['selfsigned_version']
        self.serial_number = generate_serial_number()

        # Fail early with a readable message when an input file is missing.
        csr_file_missing = self.csr_path is not None and not os.path.exists(self.csr_path)
        if csr_file_missing:
            raise CertificateError(
                'The certificate signing request file {0} does not exist'.format(self.csr_path)
            )
        key_file_missing = self.privatekey_content is None and not os.path.exists(self.privatekey_path)
        if key_file_missing:
            raise CertificateError(
                'The private key file {0} does not exist'.format(self.privatekey_path)
            )

        self._ensure_private_key_loaded()

        self._ensure_csr_loaded()
        if self.csr is None:
            # No CSR was supplied: create an empty one on the fly.
            empty_request = crypto.X509Req()
            empty_request.set_pubkey(self.privatekey)
            empty_request.sign(self.privatekey, self.digest)
            self.csr = empty_request

    def generate_certificate(self):
        """(Re-)Generate the certificate and store it in ``self.cert``."""
        certificate = crypto.X509()
        certificate.set_serial_number(self.serial_number)
        certificate.set_notBefore(to_bytes(self.notBefore))
        certificate.set_notAfter(to_bytes(self.notAfter))
        # Self-signed: the CSR's subject is used for subject and issuer alike.
        certificate.set_subject(self.csr.get_subject())
        certificate.set_issuer(self.csr.get_subject())
        # The value handed to set_version is one less than selfsigned_version.
        certificate.set_version(self.version - 1)
        certificate.set_pubkey(self.csr.get_pubkey())
        certificate.add_extensions(self.csr.get_extensions())

        certificate.sign(self.privatekey, self.digest)
        self.cert = certificate

    def get_certificate_data(self):
        """Return the PEM-encoded bytes of ``self.cert``."""
        pem_bytes = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert)
        return pem_bytes

    def dump(self, include_certificate):
        """Return the base class result extended with validity and serial number.

        In check mode the values planned for a new certificate are reported;
        otherwise they are read from ``self.cert``, falling back to the
        existing certificate when none has been generated.
        """
        summary = super(SelfSignedCertificateBackendPyOpenSSL, self).dump(include_certificate)

        if self.module.check_mode:
            details = {
                'notBefore': self.notBefore,
                'notAfter': self.notAfter,
                'serial_number': self.serial_number,
            }
        else:
            if self.cert is None:
                self.cert = self.existing_certificate
            details = {
                'notBefore': self.cert.get_notBefore(),
                'notAfter': self.cert.get_notAfter(),
                'serial_number': self.cert.get_serial_number(),
            }

        summary.update(details)
        return summary
234
235
class SelfSignedCertificateProvider(CertificateProvider):
    """Provider that wires the ``selfsigned`` option set to its backends."""

    def validate_module_args(self, module):
        """Fail the module unless a private key path or content is given."""
        options = module.params
        if options['privatekey_path'] is not None:
            return
        if options['privatekey_content'] is not None:
            return
        module.fail_json(msg='One of privatekey_path and privatekey_content must be specified for the selfsigned provider.')

    def needs_version_two_certs(self, module):
        """Return True when ``selfsigned_version`` is 2."""
        requested_version = module.params['selfsigned_version']
        return requested_version == 2

    def create_backend(self, module, backend):
        """Instantiate the backend class matching ``backend``.

        Returns None when ``backend`` is neither ``cryptography`` nor
        ``pyopenssl``.
        """
        known_backends = (
            ('cryptography', SelfSignedCertificateBackendCryptography),
            ('pyopenssl', SelfSignedCertificateBackendPyOpenSSL),
        )
        for backend_name, backend_class in known_backends:
            if backend == backend_name:
                return backend_class(module)
        return None
249
250
def add_selfsigned_provider_to_argument_spec(argument_spec):
    """Register the ``selfsigned`` provider and its options.

    Appends ``selfsigned`` to the ``provider`` choices and adds the
    ``selfsigned_*`` option definitions to ``argument_spec.argument_spec``,
    which is modified in place.
    """
    spec = argument_spec.argument_spec
    spec['provider']['choices'].append('selfsigned')

    selfsigned_options = {
        'selfsigned_version': {'type': 'int', 'default': 3},
        'selfsigned_digest': {'type': 'str', 'default': 'sha256'},
        'selfsigned_not_before': {
            'type': 'str',
            'default': '+0s',
            'aliases': ['selfsigned_notBefore'],
        },
        'selfsigned_not_after': {
            'type': 'str',
            'default': '+3650d',
            'aliases': ['selfsigned_notAfter'],
        },
        'selfsigned_create_subject_key_identifier': {
            'type': 'str',
            'default': 'create_if_not_provided',
            'choices': ['create_if_not_provided', 'always_create', 'never_create'],
        },
    }
    spec.update(selfsigned_options)
264