"""Certbot client crypto utility functions.

.. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server
    is capable of handling the signatures.

"""
import datetime
import hashlib
import logging
import re
from typing import Callable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
import warnings

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.primitives.serialization import PrivateFormat
import josepy
from OpenSSL import crypto
from OpenSSL import SSL
import pyrfc3339
import zope.component

from acme import crypto_util as acme_crypto_util
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import os

# Cryptography ed448 and ed25519 modules do not exist on oldest tests
if TYPE_CHECKING:
    from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey

logger = logging.getLogger(__name__)


# High level functions

def generate_key(key_size: int, key_dir: str, key_type: str = "rsa",
                 elliptic_curve: str = "secp256r1", keyname: str = "key-certbot.pem",
                 strict_permissions: bool = True) -> util.Key:
    """Initializes and saves a privkey.

    Inits key and saves it in PEM format on the filesystem.

    .. note:: keyname is the attempted filename, it may be different if a file
        already exists at the path.

    :param int key_size: key size in bits if key type is rsa.
    :param str key_dir: Key save directory.
    :param str key_type: Key Type [rsa, ecdsa]
    :param str elliptic_curve: Name of the elliptic curve if key type is ecdsa.
    :param str keyname: Filename of key
    :param bool strict_permissions: If true and key_dir exists, an exception is raised if
        the directory doesn't have 0700 permissions or isn't owned by the current user.

    :returns: Key
    :rtype: :class:`certbot.util.Key`

    :raises ValueError: If unable to generate the key given key_size.

    """
    try:
        key_pem = make_key(
            bits=key_size, elliptic_curve=elliptic_curve or "secp256r1", key_type=key_type,
        )
    except ValueError as err:
        logger.debug("", exc_info=True)
        logger.error("Encountered error while making key: %s", str(err))
        # Bare raise keeps the original traceback intact.
        raise

    # Save file
    util.make_or_verify_dir(key_dir, 0o700, strict_permissions)
    key_f, key_path = util.unique_file(
        os.path.join(key_dir, keyname), 0o600, "wb")
    with key_f:
        key_f.write(key_pem)
    if key_type == 'rsa':
        logger.debug("Generating RSA key (%d bits): %s", key_size, key_path)
    else:
        logger.debug("Generating ECDSA key (%d bits): %s", key_size, key_path)

    return util.Key(key_path, key_pem)


# TODO: Remove this call once zope dependencies are removed from Certbot.
def init_save_key(key_size: int, key_dir: str, key_type: str = "rsa",
                  elliptic_curve: str = "secp256r1",
                  keyname: str = "key-certbot.pem") -> util.Key:
    """Initializes and saves a privkey.

    Inits key and saves it in PEM format on the filesystem.

    .. note:: keyname is the attempted filename, it may be different if a file
        already exists at the path.

    .. deprecated:: 1.16.0
       Use :func:`generate_key` instead.

    :param int key_size: key size in bits if key type is rsa.
    :param str key_dir: Key save directory.
    :param str key_type: Key Type [rsa, ecdsa]
    :param str elliptic_curve: Name of the elliptic curve if key type is ecdsa.
    :param str keyname: Filename of key

    :returns: Key
    :rtype: :class:`certbot.util.Key`

    :raises ValueError: If unable to generate the key given key_size.

    """
    # stacklevel=2 attributes the warning to the caller of this deprecated function.
    warnings.warn("certbot.crypto_util.init_save_key is deprecated, please use "
                  "certbot.crypto_util.generate_key instead.", DeprecationWarning,
                  stacklevel=2)

    config = zope.component.getUtility(interfaces.IConfig)

    return generate_key(key_size, key_dir, key_type=key_type, elliptic_curve=elliptic_curve,
                        keyname=keyname, strict_permissions=config.strict_permissions)


def generate_csr(privkey: util.Key, names: Union[List[str], Set[str]], path: str,
                 must_staple: bool = False, strict_permissions: bool = True) -> util.CSR:
    """Initialize a CSR with the given private key.

    :param privkey: Key to include in the CSR
    :type privkey: :class:`certbot.util.Key`
    :param set names: `str` names to include in the CSR
    :param str path: Certificate save directory.
    :param bool must_staple: If true, include the TLS Feature extension "OCSP Must Staple"
    :param bool strict_permissions: If true and path exists, an exception is raised if
        the directory doesn't have 0755 permissions or isn't owned by the current user.

    :returns: CSR
    :rtype: :class:`certbot.util.CSR`

    """
    csr_pem = acme_crypto_util.make_csr(
        privkey.pem, names, must_staple=must_staple)

    # Save CSR
    util.make_or_verify_dir(path, 0o755, strict_permissions)
    csr_f, csr_filename = util.unique_file(
        os.path.join(path, "csr-certbot.pem"), 0o644, "wb")
    with csr_f:
        csr_f.write(csr_pem)
    logger.debug("Creating CSR: %s", csr_filename)

    return util.CSR(csr_filename, csr_pem, "pem")


# TODO: Remove this call once zope dependencies are removed from Certbot.
def init_save_csr(privkey: util.Key, names: Set[str], path: str) -> util.CSR:
    """Initialize a CSR with the given private key.

    .. deprecated:: 1.16.0
       Use :func:`generate_csr` instead.

    :param privkey: Key to include in the CSR
    :type privkey: :class:`certbot.util.Key`

    :param set names: `str` names to include in the CSR

    :param str path: Certificate save directory.

    :returns: CSR
    :rtype: :class:`certbot.util.CSR`

    """
    # stacklevel=2 attributes the warning to the caller of this deprecated function.
    warnings.warn("certbot.crypto_util.init_save_csr is deprecated, please use "
                  "certbot.crypto_util.generate_csr instead.", DeprecationWarning,
                  stacklevel=2)

    config = zope.component.getUtility(interfaces.IConfig)

    return generate_csr(privkey, names, path, must_staple=config.must_staple,
                        strict_permissions=config.strict_permissions)


# WARNING: the csr and private key file are possible attack vectors for TOCTOU
# We should either...
# A. Do more checks to verify that the CSR is trusted/valid
# B. Audit the parsing code for vulnerabilities

def valid_csr(csr: bytes) -> bool:
    """Validate CSR.

    Check that `csr` parses as a PEM CSR and that its signature verifies
    against the public key embedded in the CSR itself.

    :param bytes csr: CSR in PEM.

    :returns: Validity of CSR.
    :rtype: bool

    """
    try:
        req = crypto.load_certificate_request(
            crypto.FILETYPE_PEM, csr)
        return req.verify(req.get_pubkey())
    except crypto.Error:
        logger.debug("", exc_info=True)
        return False


def csr_matches_pubkey(csr: bytes, privkey: bytes) -> bool:
    """Does private key correspond to the subject public key in the CSR?

    :param bytes csr: CSR in PEM.
    :param bytes privkey: Private key file contents (PEM)

    :returns: Correspondence of private key to CSR subject public key.
    :rtype: bool

    """
    # Note: parsing failures of either input are NOT caught here and propagate
    # to the caller; only a failed verification is reported as False.
    req = crypto.load_certificate_request(
        crypto.FILETYPE_PEM, csr)
    pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey)
    try:
        return req.verify(pkey)
    except crypto.Error:
        logger.debug("", exc_info=True)
        return False


def import_csr_file(csrfile: str, data: bytes) -> Tuple[int, util.CSR, List[str]]:
    """Import a CSR file, which can be either PEM or DER.

    :param str csrfile: CSR filename
    :param bytes data: contents of the CSR file

    :returns: (`crypto.FILETYPE_PEM`,
               util.CSR object representing the CSR,
               list of domains requested in the CSR)
    :rtype: tuple

    :raises errors.Error: If `data` parses as neither DER nor PEM.

    """
    PEM = crypto.FILETYPE_PEM
    load = crypto.load_certificate_request
    try:
        # Try to parse as DER first, then fall back to PEM.
        csr = load(crypto.FILETYPE_ASN1, data)
    except crypto.Error:
        try:
            csr = load(PEM, data)
        except crypto.Error:
            raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))

    domains = _get_names_from_loaded_cert_or_req(csr)
    # Internally we always use PEM, so re-encode as PEM before returning.
    data_pem = crypto.dump_certificate_request(PEM, csr)
    return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains


def make_key(bits: int = 1024, key_type: str = "rsa",
             elliptic_curve: Optional[str] = None) -> bytes:
    """Generate PEM encoded RSA|EC key.

    :param int bits: Number of bits if key_type=rsa. At least 1024 for RSA.
    :param str key_type: The type of key to generate, must be rsa or ecdsa
    :param str elliptic_curve: The elliptic curve to use.

    :returns: new RSA or ECDSA key in PEM form with specified number of bits
              or of type ec_curve when key_type ecdsa is used.
    :rtype: bytes

    :raises errors.Error: If the key length, key type or elliptic curve is
        unsupported.
    """
    if key_type == 'rsa':
        if bits < 1024:
            raise errors.Error("Unsupported RSA key length: {}".format(bits))

        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, bits)
    elif key_type == 'ecdsa':
        if not elliptic_curve:
            raise errors.Error("When key_type == ecdsa, elliptic_curve must be set.")
        name = elliptic_curve.upper()
        if name not in ('SECP256R1', 'SECP384R1', 'SECP521R1'):
            raise errors.Error("Unsupported elliptic curve: {}".format(elliptic_curve))
        try:
            # getattr falls back to None so that a curve class missing from the
            # installed cryptography surfaces as the TypeError handled below.
            _key = ec.generate_private_key(
                curve=getattr(ec, name, None)(),
                backend=default_backend()
            )
        except TypeError:
            raise errors.Error("Unsupported elliptic curve: {}".format(elliptic_curve))
        except UnsupportedAlgorithm as e:
            # Surface backend failures as a Certbot error, keeping the backend
            # exception as the cause.
            raise errors.Error(str(e)) from e
        # This type ignore directive is required due to an outdated version of types-cryptography.
        # It can be removed once package types-pyOpenSSL depends on cryptography instead of
        # types-cryptography and so types-cryptography is not installed anymore.
        # See https://github.com/python/typeshed/issues/5618
        _key_pem = _key.private_bytes(  # type: ignore
            encoding=Encoding.PEM,
            format=PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=NoEncryption()
        )
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, _key_pem)
    else:
        raise errors.Error("Invalid key_type specified: {}.  Use [rsa|ecdsa]".format(key_type))
    return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)


def valid_privkey(privkey: str) -> bool:
    """Is valid RSA private key?

    :param str privkey: Private key file contents in PEM

    :returns: Validity of private key.
    :rtype: bool

    """
    try:
        return crypto.load_privatekey(
            crypto.FILETYPE_PEM, privkey).check()
    except (TypeError, crypto.Error):
        return False


def verify_renewable_cert(renewable_cert: interfaces.RenewableCert) -> None:
    """For checking that your certs were not corrupted on disk.

    Several things are checked:
        1. Signature verification for the cert.
        2. That fullchain matches cert and chain when concatenated.
        3. Check that the private key matches the certificate.

    :param renewable_cert: cert to verify
    :type renewable_cert: certbot.interfaces.RenewableCert

    :raises errors.Error: If verification fails.
    """
    verify_renewable_cert_sig(renewable_cert)
    verify_fullchain(renewable_cert)
    verify_cert_matches_priv_key(renewable_cert.cert_path, renewable_cert.key_path)


def verify_renewable_cert_sig(renewable_cert: interfaces.RenewableCert) -> None:
    """Verifies the signature of a RenewableCert object.

    :param renewable_cert: cert to verify
    :type renewable_cert: certbot.interfaces.RenewableCert

    :raises errors.Error: If signature verification fails.
    """
    try:
        with open(renewable_cert.chain_path, 'rb') as chain_file:
            chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
        with open(renewable_cert.cert_path, 'rb') as cert_file:
            cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
        pk = chain.public_key()
        verify_signed_payload(pk, cert.signature, cert.tbs_certificate_bytes,
                              cert.signature_hash_algorithm)
    except (IOError, ValueError, InvalidSignature) as e:
        # Implicit concatenation (rather than a backslash continuation inside the
        # literal) keeps source indentation out of the message.
        error_str = ("verifying the signature of the certificate located at {0} has failed. "
                     "Details: {1}").format(renewable_cert.cert_path, e)
        logger.exception(error_str)
        raise errors.Error(error_str) from e


def verify_signed_payload(public_key: Union[DSAPublicKey, 'Ed25519PublicKey', 'Ed448PublicKey',
                                            EllipticCurvePublicKey, RSAPublicKey],
                          signature: bytes, payload: bytes,
                          signature_hash_algorithm: hashes.HashAlgorithm) -> None:
    """Check the signature of a payload.

    :param RSAPublicKey/EllipticCurvePublicKey public_key: the public_key to check signature
    :param bytes signature: the signature bytes
    :param bytes payload: the payload bytes
    :param hashes.HashAlgorithm signature_hash_algorithm: algorithm used to hash the payload

    :raises InvalidSignature: If signature verification fails.
    :raises errors.Error: If public key type is not supported
    """
    if isinstance(public_key, RSAPublicKey):
        public_key.verify(
            signature, payload, PKCS1v15(), signature_hash_algorithm
        )
    elif isinstance(public_key, EllipticCurvePublicKey):
        public_key.verify(
            signature, payload, ECDSA(signature_hash_algorithm)
        )
    else:
        # Only RSA and EC keys are handled; any other key type is rejected.
        raise errors.Error("Unsupported public key type.")


def verify_cert_matches_priv_key(cert_path: str, key_path: str) -> None:
    """ Verifies that the private key and cert match.

    :param str cert_path: path to a cert in PEM format
    :param str key_path: path to a private key file

    :raises errors.Error: If they don't match.
    """
    try:
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_certificate_file(cert_path)
        context.use_privatekey_file(key_path)
        context.check_privatekey()
    except (IOError, SSL.Error) as e:
        # Implicit concatenation (rather than backslash continuations inside the
        # literal) keeps source indentation out of the message.
        error_str = ("verifying the certificate located at {0} matches the "
                     "private key located at {1} has failed. "
                     "Details: {2}").format(cert_path, key_path, e)
        logger.exception(error_str)
        raise errors.Error(error_str) from e


def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None:
    """ Verifies that fullchain is indeed cert concatenated with chain.

    :param renewable_cert: cert to verify
    :type renewable_cert: certbot.interfaces.RenewableCert

    :raises errors.Error: If cert and chain do not combine to fullchain.
    """
    # Only the file reads are guarded; the comparison below raises on its own.
    try:
        with open(renewable_cert.chain_path) as chain_file:
            chain = chain_file.read()
        with open(renewable_cert.cert_path) as cert_file:
            cert = cert_file.read()
        with open(renewable_cert.fullchain_path) as fullchain_file:
            fullchain = fullchain_file.read()
    except IOError as e:
        error_str = "reading one of cert, chain, or fullchain has failed: {0}".format(e)
        logger.exception(error_str)
        raise errors.Error(error_str) from e

    if (cert + chain) != fullchain:
        error_str = "fullchain does not match cert + chain for {0}!"
        error_str = error_str.format(renewable_cert.lineagename)
        raise errors.Error(error_str)


def pyopenssl_load_certificate(data: bytes) -> Tuple[crypto.X509, int]:
    """Load PEM/DER certificate.

    PEM is tried first, then DER (ASN.1).

    :param bytes data: encoded certificate

    :returns: the loaded certificate and the file type that parsed it
    :rtype: tuple

    :raises errors.Error: If `data` cannot be loaded in either format.

    """

    openssl_errors = []

    for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
        try:
            return crypto.load_certificate(file_type, data), file_type
        except crypto.Error as error:  # TODO: other errors?
            openssl_errors.append(error)
    raise errors.Error("Unable to load: {0}".format(",".join(
        str(error) for error in openssl_errors)))


def _load_cert_or_req(cert_or_req_str: bytes,
                      load_func: Callable[[int, bytes], Union[crypto.X509, crypto.X509Req]],
                      typ: int = crypto.FILETYPE_PEM) -> Union[crypto.X509, crypto.X509Req]:
    """Load a cert or CSR with `load_func`, logging and re-raising on failure."""
    try:
        return load_func(typ, cert_or_req_str)
    except crypto.Error as err:
        logger.debug("", exc_info=True)
        logger.error("Encountered error while loading certificate or csr: %s", str(err))
        raise


def _get_sans_from_cert_or_req(cert_or_req_str: bytes,
                               load_func: Callable[[int, bytes], Union[crypto.X509,
                                                                       crypto.X509Req]],
                               typ: int = crypto.FILETYPE_PEM) -> List[str]:
    """Load a cert or CSR and return its Subject Alternative Names."""
    # pylint: disable=protected-access
    return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
        cert_or_req_str, load_func, typ))


def get_sans_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
    """Get a list of Subject Alternative Names from a certificate.

    :param str cert: Certificate (encoded).
    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`

    :returns: A list of Subject Alternative Names.
    :rtype: list

    """
    return _get_sans_from_cert_or_req(
        cert, crypto.load_certificate, typ)


def _get_names_from_cert_or_req(cert_or_req: bytes,
                                load_func: Callable[[int, bytes], Union[crypto.X509,
                                                                        crypto.X509Req]],
                                typ: int) -> List[str]:
    """Load a cert or CSR and return all names found in it."""
    loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ)
    return _get_names_from_loaded_cert_or_req(loaded_cert_or_req)


def _get_names_from_loaded_cert_or_req(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]
                                       ) -> List[str]:
    """Return all names found in an already loaded cert or CSR."""
    # pylint: disable=protected-access
    return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)


def get_names_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
    """Get a list of domains from a cert, including the CN if it is set.

    :param str cert: Certificate (encoded).
    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`

    :returns: A list of domain names.
    :rtype: list

    """
    return _get_names_from_cert_or_req(
        cert, crypto.load_certificate, typ)


def get_names_from_req(csr: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
    """Get a list of domains from a CSR, including the CN if it is set.

    :param str csr: CSR (encoded).
    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`

    :returns: A list of domain names.
    :rtype: list

    """
    return _get_names_from_cert_or_req(csr, crypto.load_certificate_request, typ)


def dump_pyopenssl_chain(chain: Union[List[crypto.X509], List[josepy.ComparableX509]],
                         filetype: int = crypto.FILETYPE_PEM) -> bytes:
    """Dump certificate chain into a bundle.

    :param list chain: List of `crypto.X509` (or wrapped in
        :class:`josepy.util.ComparableX509`).
    :param int filetype: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`

    """
    # XXX: returns empty string when no chain is available, which
    # shuts up RenewableCert, but might not be the best solution...
    return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)


def notBefore(cert_path: str) -> datetime.datetime:
    """When does the cert at cert_path start being valid?

    :param str cert_path: path to a cert in PEM format

    :returns: the notBefore value from the cert at cert_path
    :rtype: :class:`datetime.datetime`

    """
    return _notAfterBefore(cert_path, crypto.X509.get_notBefore)


def notAfter(cert_path: str) -> datetime.datetime:
    """When does the cert at cert_path stop being valid?

    :param str cert_path: path to a cert in PEM format

    :returns: the notAfter value from the cert at cert_path
    :rtype: :class:`datetime.datetime`

    """
    return _notAfterBefore(cert_path, crypto.X509.get_notAfter)


def _notAfterBefore(cert_path: str,
                    method: Callable[[crypto.X509], Optional[bytes]]) -> datetime.datetime:
    """Internal helper function for finding notbefore/notafter.

    :param str cert_path: path to a cert in PEM format
    :param function method: one of ``crypto.X509.get_notBefore``
        or ``crypto.X509.get_notAfter``

    :returns: the notBefore or notAfter value from the cert at cert_path
    :rtype: :class:`datetime.datetime`

    :raises errors.Error: If `method` returns no timestamp.

    """
    # pylint: disable=redefined-outer-name
    with open(cert_path, "rb") as f:
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    # pyopenssl always returns bytes
    timestamp = method(x509)
    if not timestamp:
        raise errors.Error("Error while invoking timestamp method, None has been returned.")
    # Insert separators by position (4-2-2, "T", 2-2, remainder) so the value
    # can be handed to the RFC 3339 parser.
    reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",
                             timestamp[6:8], b"T", timestamp[8:10], b":",
                             timestamp[10:12], b":", timestamp[12:]]
    # pyrfc3339 always uses the type `str`
    timestamp_bytes = b"".join(reformatted_timestamp)
    timestamp_str = timestamp_bytes.decode('ascii')
    return pyrfc3339.parse(timestamp_str)


def sha256sum(filename: str) -> str:
    """Compute a sha256sum of a file.

    NB: In given file, platform specific newlines characters will be converted
    into their equivalent unicode counterparts before calculating the hash.

    :param str filename: path to the file whose hash will be computed

    :returns: sha256 digest of the file in hexadecimal
    :rtype: str
    """
    sha256 = hashlib.sha256()
    # Text mode is deliberate: it performs the newline conversion described above.
    with open(filename, 'r') as file_d:
        sha256.update(file_d.read().encode('UTF-8'))
    return sha256.hexdigest()


# Finds one CERTIFICATE stricttextualmsg according to rfc7468#section-3.
# Does not validate the base64text - use crypto.load_certificate.
CERT_PEM_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----\r?\n"
    b".+?\r?\n"
    b"-----END CERTIFICATE-----\r?\n",
    re.DOTALL  # DOTALL (/s) because the base64text may include newlines
)


def cert_and_chain_from_fullchain(fullchain_pem: str) -> Tuple[str, str]:
    """Split fullchain_pem into cert_pem and chain_pem

    :param str fullchain_pem: concatenated cert + chain

    :returns: tuple of string cert_pem and chain_pem
    :rtype: tuple

    :raises errors.Error: If there are less than 2 certificates in the chain.

    """
    # First pass: find the boundary of each certificate in the chain.
    # TODO: This will silently skip over any "explanatory text" in between boundaries,
    # which is prohibited by RFC8555.
    certs = CERT_PEM_REGEX.findall(fullchain_pem.encode())
    if len(certs) < 2:
        raise errors.Error("failed to parse fullchain into cert and chain: " +
                           "less than 2 certificates in chain")

    # Second pass: for each certificate found, parse it using OpenSSL and re-encode it,
    # with the effect of normalizing any encoding variations (e.g. CRLF, whitespace).
    certs_normalized = [crypto.dump_certificate(crypto.FILETYPE_PEM,
        crypto.load_certificate(crypto.FILETYPE_PEM, cert)).decode() for cert in certs]

    # Since each normalized cert has a newline suffix, no extra newlines are required.
    return (certs_normalized[0], "".join(certs_normalized[1:]))


def get_serial_from_cert(cert_path: str) -> int:
    """Retrieve the serial number of a certificate from certificate path

    :param str cert_path: path to a cert in PEM format

    :returns: serial number of the certificate
    :rtype: int
    """
    # pylint: disable=redefined-outer-name
    with open(cert_path, "rb") as f:
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    return x509.get_serial_number()


def find_chain_with_issuer(fullchains: List[str], issuer_cn: str,
                           warn_on_no_match: bool = False) -> str:
    """Chooses the first certificate chain from fullchains whose topmost
    intermediate has an Issuer Common Name matching issuer_cn (in other words
    the first chain which chains to a root whose name matches issuer_cn).

    :param fullchains: The list of fullchains in PEM chain format.
    :type fullchains: `list` of `str`
    :param `str` issuer_cn: The exact Subject Common Name to match against any
        issuer in the certificate chain.
    :param bool warn_on_no_match: If true, log a warning when no chain matches
        and the first chain is returned instead.

    :returns: The best-matching fullchain, PEM-encoded, or the first if none match.
    :rtype: `str`
    """
    for chain in fullchains:
        certs = CERT_PEM_REGEX.findall(chain.encode())
        # The last certificate in the bundle is the topmost intermediate.
        top_cert = x509.load_pem_x509_certificate(certs[-1], default_backend())
        top_issuer_cn = top_cert.issuer.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if top_issuer_cn and top_issuer_cn[0].value == issuer_cn:
            return chain

    # Nothing matched, return whatever was first in the list.
    if warn_on_no_match:
        logger.warning("Certbot has been configured to prefer certificate chains with "
                       "issuer '%s', but no chain from the CA matched this issuer. Using "
                       "the default certificate chain instead.", issuer_cn)
    return fullchains[0]