1# -*- coding: utf-8 -*- 2 3# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org> 4# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at> 5# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 6 7from __future__ import absolute_import, division, print_function 8__metaclass__ = type 9 10 11import os 12 13from random import randrange 14 15from ansible.module_utils.common.text.converters import to_bytes 16 17from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( 18 get_relative_time_option, 19 select_message_digest, 20) 21 22from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( 23 cryptography_key_needs_digest_for_signing, 24 cryptography_serial_number_of_cert, 25) 26 27from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import ( 28 CertificateError, 29 CertificateBackend, 30 CertificateProvider, 31) 32 33try: 34 from OpenSSL import crypto 35except ImportError: 36 pass 37 38try: 39 import cryptography 40 from cryptography import x509 41 from cryptography.hazmat.backends import default_backend 42 from cryptography.hazmat.primitives.serialization import Encoding 43except ImportError: 44 pass 45 46 47class SelfSignedCertificateBackendCryptography(CertificateBackend): 48 def __init__(self, module): 49 super(SelfSignedCertificateBackendCryptography, self).__init__(module, 'cryptography') 50 51 self.create_subject_key_identifier = module.params['selfsigned_create_subject_key_identifier'] 52 self.notBefore = get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend) 53 self.notAfter = get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend) 54 self.digest = select_message_digest(module.params['selfsigned_digest']) 55 self.version = module.params['selfsigned_version'] 56 self.serial_number = x509.random_serial_number() 57 58 if self.csr_path is not None and not os.path.exists(self.csr_path): 59 raise CertificateError( 60 'The certificate signing request file {0} does not exist'.format(self.csr_path) 61 ) 62 if self.privatekey_content is None and not os.path.exists(self.privatekey_path): 63 raise CertificateError( 64 'The private key file {0} does not exist'.format(self.privatekey_path) 65 ) 66 67 self._module = module 68 69 self._ensure_private_key_loaded() 70 71 self._ensure_csr_loaded() 72 if self.csr is None: 73 # Create empty CSR on the fly 74 csr = cryptography.x509.CertificateSigningRequestBuilder() 75 csr = csr.subject_name(cryptography.x509.Name([])) 76 digest = None 77 if cryptography_key_needs_digest_for_signing(self.privatekey): 78 digest = self.digest 79 if digest is None: 80 self.module.fail_json(msg='Unsupported digest "{0}"'.format(module.params['selfsigned_digest'])) 81 try: 82 self.csr = csr.sign(self.privatekey, digest, default_backend()) 83 except TypeError as e: 84 if str(e) == 'Algorithm must be a registered hash algorithm.' and digest is None: 85 self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.') 86 raise 87 88 if cryptography_key_needs_digest_for_signing(self.privatekey): 89 if self.digest is None: 90 raise CertificateError( 91 'The digest %s is not supported with the cryptography backend' % module.params['selfsigned_digest'] 92 ) 93 else: 94 self.digest = None 95 96 def generate_certificate(self): 97 """(Re-)Generate certificate.""" 98 try: 99 cert_builder = x509.CertificateBuilder() 100 cert_builder = cert_builder.subject_name(self.csr.subject) 101 cert_builder = cert_builder.issuer_name(self.csr.subject) 102 cert_builder = cert_builder.serial_number(self.serial_number) 103 cert_builder = cert_builder.not_valid_before(self.notBefore) 104 cert_builder = cert_builder.not_valid_after(self.notAfter) 105 cert_builder = cert_builder.public_key(self.privatekey.public_key()) 106 has_ski = False 107 for extension in self.csr.extensions: 108 if isinstance(extension.value, x509.SubjectKeyIdentifier): 109 if self.create_subject_key_identifier == 'always_create': 110 continue 111 has_ski = True 112 cert_builder = cert_builder.add_extension(extension.value, critical=extension.critical) 113 if not has_ski and self.create_subject_key_identifier != 'never_create': 114 cert_builder = cert_builder.add_extension( 115 x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()), 116 critical=False 117 ) 118 except ValueError as e: 119 raise CertificateError(str(e)) 120 121 try: 122 certificate = cert_builder.sign( 123 private_key=self.privatekey, algorithm=self.digest, 124 backend=default_backend() 125 ) 126 except TypeError as e: 127 if str(e) == 'Algorithm must be a registered hash algorithm.' and self.digest is None: 128 self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.') 129 raise 130 131 self.cert = certificate 132 133 def get_certificate_data(self): 134 """Return bytes for self.cert.""" 135 return self.cert.public_bytes(Encoding.PEM) 136 137 def dump(self, include_certificate): 138 result = super(SelfSignedCertificateBackendCryptography, self).dump(include_certificate) 139 140 if self.module.check_mode: 141 result.update({ 142 'notBefore': self.notBefore.strftime("%Y%m%d%H%M%SZ"), 143 'notAfter': self.notAfter.strftime("%Y%m%d%H%M%SZ"), 144 'serial_number': self.serial_number, 145 }) 146 else: 147 if self.cert is None: 148 self.cert = self.existing_certificate 149 result.update({ 150 'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"), 151 'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"), 152 'serial_number': cryptography_serial_number_of_cert(self.cert), 153 }) 154 155 return result 156 157 158def generate_serial_number(): 159 """Generate a serial number for a certificate""" 160 while True: 161 result = randrange(0, 1 << 160) 162 if result >= 1000: 163 return result 164 165 166class SelfSignedCertificateBackendPyOpenSSL(CertificateBackend): 167 def __init__(self, module): 168 super(SelfSignedCertificateBackendPyOpenSSL, self).__init__(module, 'pyopenssl') 169 170 if module.params['selfsigned_create_subject_key_identifier'] != 'create_if_not_provided': 171 module.fail_json(msg='selfsigned_create_subject_key_identifier cannot be used with the pyOpenSSL backend!') 172 self.notBefore = get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend) 173 self.notAfter = get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend) 174 self.digest = module.params['selfsigned_digest'] 175 self.version = module.params['selfsigned_version'] 176 self.serial_number = generate_serial_number() 177 178 if self.csr_path is not None and not os.path.exists(self.csr_path): 179 raise CertificateError( 180 'The certificate signing request file {0} does not exist'.format(self.csr_path) 181 ) 182 if self.privatekey_content is None and not os.path.exists(self.privatekey_path): 183 raise CertificateError( 184 'The private key file {0} does not exist'.format(self.privatekey_path) 185 ) 186 187 self._ensure_private_key_loaded() 188 189 self._ensure_csr_loaded() 190 if self.csr is None: 191 # Create empty CSR on the fly 192 self.csr = crypto.X509Req() 193 self.csr.set_pubkey(self.privatekey) 194 self.csr.sign(self.privatekey, self.digest) 195 196 def generate_certificate(self): 197 """(Re-)Generate certificate.""" 198 cert = crypto.X509() 199 cert.set_serial_number(self.serial_number) 200 cert.set_notBefore(to_bytes(self.notBefore)) 201 cert.set_notAfter(to_bytes(self.notAfter)) 202 cert.set_subject(self.csr.get_subject()) 203 cert.set_issuer(self.csr.get_subject()) 204 cert.set_version(self.version - 1) 205 cert.set_pubkey(self.csr.get_pubkey()) 206 cert.add_extensions(self.csr.get_extensions()) 207 208 cert.sign(self.privatekey, self.digest) 209 self.cert = cert 210 211 def get_certificate_data(self): 212 """Return bytes for self.cert.""" 213 return crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert) 214 215 def dump(self, include_certificate): 216 result = super(SelfSignedCertificateBackendPyOpenSSL, self).dump(include_certificate) 217 218 if self.module.check_mode: 219 result.update({ 220 'notBefore': self.notBefore, 221 'notAfter': self.notAfter, 222 'serial_number': self.serial_number, 223 }) 224 else: 225 if self.cert is None: 226 self.cert = self.existing_certificate 227 result.update({ 228 'notBefore': self.cert.get_notBefore(), 229 'notAfter': self.cert.get_notAfter(), 230 'serial_number': self.cert.get_serial_number(), 231 }) 232 233 return result 234 235 236class SelfSignedCertificateProvider(CertificateProvider): 237 def validate_module_args(self, module): 238 if module.params['privatekey_path'] is None and module.params['privatekey_content'] is None: 239 module.fail_json(msg='One of privatekey_path and privatekey_content must be specified for the selfsigned provider.') 240 241 def needs_version_two_certs(self, module): 242 return module.params['selfsigned_version'] == 2 243 244 def create_backend(self, module, backend): 245 if backend == 'cryptography': 246 return SelfSignedCertificateBackendCryptography(module) 247 if backend == 'pyopenssl': 248 return SelfSignedCertificateBackendPyOpenSSL(module) 249 250 251def add_selfsigned_provider_to_argument_spec(argument_spec): 252 argument_spec.argument_spec['provider']['choices'].append('selfsigned') 253 argument_spec.argument_spec.update(dict( 254 selfsigned_version=dict(type='int', default=3), 255 selfsigned_digest=dict(type='str', default='sha256'), 256 selfsigned_not_before=dict(type='str', default='+0s', aliases=['selfsigned_notBefore']), 257 selfsigned_not_after=dict(type='str', default='+3650d', aliases=['selfsigned_notAfter']), 258 selfsigned_create_subject_key_identifier=dict( 259 type='str', 260 default='create_if_not_provided', 261 choices=['create_if_not_provided', 'always_create', 'never_create'] 262 ), 263 )) 264