1"""Certbot client crypto utility functions.
2
3.. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server
4    is capable of handling the signatures.
5
6"""
7import datetime
8import hashlib
9import logging
10import re
11from typing import Callable
12from typing import List
13from typing import Optional
14from typing import Set
15from typing import Tuple
16from typing import TYPE_CHECKING
17from typing import Union
18import warnings
19
20from cryptography import x509
21from cryptography.exceptions import InvalidSignature
22from cryptography.exceptions import UnsupportedAlgorithm
23from cryptography.hazmat.backends import default_backend
24from cryptography.hazmat.primitives import hashes
25from cryptography.hazmat.primitives.asymmetric import ec
26from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
27from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
28from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
29from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
30from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
31from cryptography.hazmat.primitives.serialization import Encoding
32from cryptography.hazmat.primitives.serialization import NoEncryption
33from cryptography.hazmat.primitives.serialization import PrivateFormat
34import josepy
35from OpenSSL import crypto
36from OpenSSL import SSL
37import pyrfc3339
38import zope.component
39
40from acme import crypto_util as acme_crypto_util
41from certbot import errors
42from certbot import interfaces
43from certbot import util
44from certbot.compat import os
45
46# Cryptography ed448 and ed25519 modules do not exist on oldest tests
47if TYPE_CHECKING:
48    from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
49    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
50
51logger = logging.getLogger(__name__)
52
53
54# High level functions
55
56def generate_key(key_size: int, key_dir: str, key_type: str = "rsa",
57                 elliptic_curve: str = "secp256r1", keyname: str = "key-certbot.pem",
58                 strict_permissions: bool = True) -> util.Key:
59    """Initializes and saves a privkey.
60
61    Inits key and saves it in PEM format on the filesystem.
62
63    .. note:: keyname is the attempted filename, it may be different if a file
64        already exists at the path.
65
66    :param int key_size: key size in bits if key size is rsa.
67    :param str key_dir: Key save directory.
68    :param str key_type: Key Type [rsa, ecdsa]
69    :param str elliptic_curve: Name of the elliptic curve if key type is ecdsa.
70    :param str keyname: Filename of key
71    :param bool strict_permissions: If true and key_dir exists, an exception is raised if
72        the directory doesn't have 0700 permissions or isn't owned by the current user.
73
74    :returns: Key
75    :rtype: :class:`certbot.util.Key`
76
77    :raises ValueError: If unable to generate the key given key_size.
78
79    """
80    try:
81        key_pem = make_key(
82            bits=key_size, elliptic_curve=elliptic_curve or "secp256r1", key_type=key_type,
83        )
84    except ValueError as err:
85        logger.debug("", exc_info=True)
86        logger.error("Encountered error while making key: %s", str(err))
87        raise err
88
89    # Save file
90    util.make_or_verify_dir(key_dir, 0o700, strict_permissions)
91    key_f, key_path = util.unique_file(
92        os.path.join(key_dir, keyname), 0o600, "wb")
93    with key_f:
94        key_f.write(key_pem)
95    if key_type == 'rsa':
96        logger.debug("Generating RSA key (%d bits): %s", key_size, key_path)
97    else:
98        logger.debug("Generating ECDSA key (%d bits): %s", key_size, key_path)
99
100    return util.Key(key_path, key_pem)
101
102
103# TODO: Remove this call once zope dependencies are removed from Certbot.
104def init_save_key(key_size: int, key_dir: str, key_type: str = "rsa",
105                  elliptic_curve: str = "secp256r1",
106                  keyname: str = "key-certbot.pem") -> util.Key:
107    """Initializes and saves a privkey.
108
109    Inits key and saves it in PEM format on the filesystem.
110
111    .. note:: keyname is the attempted filename, it may be different if a file
112        already exists at the path.
113
114    .. deprecated:: 1.16.0
115       Use :func:`generate_key` instead.
116
117    :param int key_size: key size in bits if key size is rsa.
118    :param str key_dir: Key save directory.
119    :param str key_type: Key Type [rsa, ecdsa]
120    :param str elliptic_curve: Name of the elliptic curve if key type is ecdsa.
121    :param str keyname: Filename of key
122
123    :returns: Key
124    :rtype: :class:`certbot.util.Key`
125
126    :raises ValueError: If unable to generate the key given key_size.
127
128    """
129    warnings.warn("certbot.crypto_util.init_save_key is deprecated, please use "
130                  "certbot.crypto_util.generate_key instead.", DeprecationWarning)
131
132    config = zope.component.getUtility(interfaces.IConfig)
133
134    return generate_key(key_size, key_dir, key_type=key_type, elliptic_curve=elliptic_curve,
135                        keyname=keyname, strict_permissions=config.strict_permissions)
136
137
138def generate_csr(privkey: util.Key, names: Union[List[str], Set[str]], path: str,
139                 must_staple: bool = False, strict_permissions: bool = True) -> util.CSR:
140    """Initialize a CSR with the given private key.
141
142    :param privkey: Key to include in the CSR
143    :type privkey: :class:`certbot.util.Key`
144    :param set names: `str` names to include in the CSR
145    :param str path: Certificate save directory.
146    :param bool must_staple: If true, include the TLS Feature extension "OCSP Must Staple"
147    :param bool strict_permissions: If true and path exists, an exception is raised if
148        the directory doesn't have 0755 permissions or isn't owned by the current user.
149
150    :returns: CSR
151    :rtype: :class:`certbot.util.CSR`
152
153    """
154    csr_pem = acme_crypto_util.make_csr(
155        privkey.pem, names, must_staple=must_staple)
156
157    # Save CSR
158    util.make_or_verify_dir(path, 0o755, strict_permissions)
159    csr_f, csr_filename = util.unique_file(
160        os.path.join(path, "csr-certbot.pem"), 0o644, "wb")
161    with csr_f:
162        csr_f.write(csr_pem)
163    logger.debug("Creating CSR: %s", csr_filename)
164
165    return util.CSR(csr_filename, csr_pem, "pem")
166
167
168# TODO: Remove this call once zope dependencies are removed from Certbot.
169def init_save_csr(privkey: util.Key, names: Set[str], path: str) -> util.CSR:
170    """Initialize a CSR with the given private key.
171
172    .. deprecated:: 1.16.0
173       Use :func:`generate_csr` instead.
174
175    :param privkey: Key to include in the CSR
176    :type privkey: :class:`certbot.util.Key`
177
178    :param set names: `str` names to include in the CSR
179
180    :param str path: Certificate save directory.
181
182    :returns: CSR
183    :rtype: :class:`certbot.util.CSR`
184
185    """
186    warnings.warn("certbot.crypto_util.init_save_csr is deprecated, please use "
187                  "certbot.crypto_util.generate_csr instead.", DeprecationWarning)
188
189    config = zope.component.getUtility(interfaces.IConfig)
190
191    return generate_csr(privkey, names, path, must_staple=config.must_staple,
192                        strict_permissions=config.strict_permissions)
193
194
195# WARNING: the csr and private key file are possible attack vectors for TOCTOU
196# We should either...
197# A. Do more checks to verify that the CSR is trusted/valid
198# B. Audit the parsing code for vulnerabilities
199
200def valid_csr(csr: bytes) -> bool:
201    """Validate CSR.
202
203    Check if `csr` is a valid CSR for the given domains.
204
205    :param bytes csr: CSR in PEM.
206
207    :returns: Validity of CSR.
208    :rtype: bool
209
210    """
211    try:
212        req = crypto.load_certificate_request(
213            crypto.FILETYPE_PEM, csr)
214        return req.verify(req.get_pubkey())
215    except crypto.Error:
216        logger.debug("", exc_info=True)
217        return False
218
219
220def csr_matches_pubkey(csr: bytes, privkey: bytes) -> bool:
221    """Does private key correspond to the subject public key in the CSR?
222
223    :param bytes csr: CSR in PEM.
224    :param bytes privkey: Private key file contents (PEM)
225
226    :returns: Correspondence of private key to CSR subject public key.
227    :rtype: bool
228
229    """
230    req = crypto.load_certificate_request(
231        crypto.FILETYPE_PEM, csr)
232    pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey)
233    try:
234        return req.verify(pkey)
235    except crypto.Error:
236        logger.debug("", exc_info=True)
237        return False
238
239
240def import_csr_file(csrfile: str, data: bytes) -> Tuple[int, util.CSR, List[str]]:
241    """Import a CSR file, which can be either PEM or DER.
242
243    :param str csrfile: CSR filename
244    :param bytes data: contents of the CSR file
245
246    :returns: (`crypto.FILETYPE_PEM`,
247               util.CSR object representing the CSR,
248               list of domains requested in the CSR)
249    :rtype: tuple
250
251    """
252    PEM = crypto.FILETYPE_PEM
253    load = crypto.load_certificate_request
254    try:
255        # Try to parse as DER first, then fall back to PEM.
256        csr = load(crypto.FILETYPE_ASN1, data)
257    except crypto.Error:
258        try:
259            csr = load(PEM, data)
260        except crypto.Error:
261            raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
262
263    domains = _get_names_from_loaded_cert_or_req(csr)
264    # Internally we always use PEM, so re-encode as PEM before returning.
265    data_pem = crypto.dump_certificate_request(PEM, csr)
266    return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
267
268
269def make_key(bits: int = 1024, key_type: str = "rsa",
270             elliptic_curve: Optional[str] = None) -> bytes:
271    """Generate PEM encoded RSA|EC key.
272
273    :param int bits: Number of bits if key_type=rsa. At least 1024 for RSA.
274    :param str key_type: The type of key to generate, but be rsa or ecdsa
275    :param str elliptic_curve: The elliptic curve to use.
276
277    :returns: new RSA or ECDSA key in PEM form with specified number of bits
278              or of type ec_curve when key_type ecdsa is used.
279    :rtype: str
280    """
281    if key_type == 'rsa':
282        if bits < 1024:
283            raise errors.Error("Unsupported RSA key length: {}".format(bits))
284
285        key = crypto.PKey()
286        key.generate_key(crypto.TYPE_RSA, bits)
287    elif key_type == 'ecdsa':
288        if not elliptic_curve:
289            raise errors.Error("When key_type == ecdsa, elliptic_curve must be set.")
290        try:
291            name = elliptic_curve.upper()
292            if name in ('SECP256R1', 'SECP384R1', 'SECP521R1'):
293                _key = ec.generate_private_key(
294                    curve=getattr(ec, elliptic_curve.upper(), None)(),
295                    backend=default_backend()
296                )
297            else:
298                raise errors.Error("Unsupported elliptic curve: {}".format(elliptic_curve))
299        except TypeError:
300            raise errors.Error("Unsupported elliptic curve: {}".format(elliptic_curve))
301        except UnsupportedAlgorithm as e:
302            raise e from errors.Error(str(e))
303        # This type ignore directive is required due to an outdated version of types-cryptography.
304        # It can be removed once package types-pyOpenSSL depends on cryptography instead of
305        # types-cryptography and so types-cryptography is not installed anymore.
306        # See https://github.com/python/typeshed/issues/5618
307        _key_pem = _key.private_bytes(  # type: ignore
308            encoding=Encoding.PEM,
309            format=PrivateFormat.TraditionalOpenSSL,
310            encryption_algorithm=NoEncryption()
311        )
312        key = crypto.load_privatekey(crypto.FILETYPE_PEM, _key_pem)
313    else:
314        raise errors.Error("Invalid key_type specified: {}.  Use [rsa|ecdsa]".format(key_type))
315    return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
316
317
318def valid_privkey(privkey: str) -> bool:
319    """Is valid RSA private key?
320
321    :param str privkey: Private key file contents in PEM
322
323    :returns: Validity of private key.
324    :rtype: bool
325
326    """
327    try:
328        return crypto.load_privatekey(
329            crypto.FILETYPE_PEM, privkey).check()
330    except (TypeError, crypto.Error):
331        return False
332
333
334def verify_renewable_cert(renewable_cert: interfaces.RenewableCert) -> None:
335    """For checking that your certs were not corrupted on disk.
336
337    Several things are checked:
338        1. Signature verification for the cert.
339        2. That fullchain matches cert and chain when concatenated.
340        3. Check that the private key matches the certificate.
341
342    :param renewable_cert: cert to verify
343    :type renewable_cert: certbot.interfaces.RenewableCert
344
345    :raises errors.Error: If verification fails.
346    """
347    verify_renewable_cert_sig(renewable_cert)
348    verify_fullchain(renewable_cert)
349    verify_cert_matches_priv_key(renewable_cert.cert_path, renewable_cert.key_path)
350
351
352def verify_renewable_cert_sig(renewable_cert: interfaces.RenewableCert) -> None:
353    """Verifies the signature of a RenewableCert object.
354
355    :param renewable_cert: cert to verify
356    :type renewable_cert: certbot.interfaces.RenewableCert
357
358    :raises errors.Error: If signature verification fails.
359    """
360    try:
361        with open(renewable_cert.chain_path, 'rb') as chain_file:
362            chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
363        with open(renewable_cert.cert_path, 'rb') as cert_file:
364            cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
365        pk = chain.public_key()
366        verify_signed_payload(pk, cert.signature, cert.tbs_certificate_bytes,
367                                cert.signature_hash_algorithm)
368    except (IOError, ValueError, InvalidSignature) as e:
369        error_str = "verifying the signature of the certificate located at {0} has failed. \
370                Details: {1}".format(renewable_cert.cert_path, e)
371        logger.exception(error_str)
372        raise errors.Error(error_str)
373
374
375def verify_signed_payload(public_key: Union[DSAPublicKey, 'Ed25519PublicKey', 'Ed448PublicKey',
376                                            EllipticCurvePublicKey, RSAPublicKey],
377                          signature: bytes, payload: bytes,
378                          signature_hash_algorithm: hashes.HashAlgorithm) -> None:
379    """Check the signature of a payload.
380
381    :param RSAPublicKey/EllipticCurvePublicKey public_key: the public_key to check signature
382    :param bytes signature: the signature bytes
383    :param bytes payload: the payload bytes
384    :param hashes.HashAlgorithm signature_hash_algorithm: algorithm used to hash the payload
385
386    :raises InvalidSignature: If signature verification fails.
387    :raises errors.Error: If public key type is not supported
388    """
389    if isinstance(public_key, RSAPublicKey):
390        public_key.verify(
391            signature, payload, PKCS1v15(), signature_hash_algorithm
392        )
393    elif isinstance(public_key, EllipticCurvePublicKey):
394        public_key.verify(
395            signature, payload, ECDSA(signature_hash_algorithm)
396        )
397    else:
398        raise errors.Error("Unsupported public key type.")
399
400
401def verify_cert_matches_priv_key(cert_path: str, key_path: str) -> None:
402    """ Verifies that the private key and cert match.
403
404    :param str cert_path: path to a cert in PEM format
405    :param str key_path: path to a private key file
406
407    :raises errors.Error: If they don't match.
408    """
409    try:
410        context = SSL.Context(SSL.SSLv23_METHOD)
411        context.use_certificate_file(cert_path)
412        context.use_privatekey_file(key_path)
413        context.check_privatekey()
414    except (IOError, SSL.Error) as e:
415        error_str = "verifying the certificate located at {0} matches the \
416                private key located at {1} has failed. \
417                Details: {2}".format(cert_path,
418                        key_path, e)
419        logger.exception(error_str)
420        raise errors.Error(error_str)
421
422
423def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None:
424    """ Verifies that fullchain is indeed cert concatenated with chain.
425
426    :param renewable_cert: cert to verify
427    :type renewable_cert: certbot.interfaces.RenewableCert
428
429    :raises errors.Error: If cert and chain do not combine to fullchain.
430    """
431    try:
432        with open(renewable_cert.chain_path) as chain_file:
433            chain = chain_file.read()
434        with open(renewable_cert.cert_path) as cert_file:
435            cert = cert_file.read()
436        with open(renewable_cert.fullchain_path) as fullchain_file:
437            fullchain = fullchain_file.read()
438        if (cert + chain) != fullchain:
439            error_str = "fullchain does not match cert + chain for {0}!"
440            error_str = error_str.format(renewable_cert.lineagename)
441            raise errors.Error(error_str)
442    except IOError as e:
443        error_str = "reading one of cert, chain, or fullchain has failed: {0}".format(e)
444        logger.exception(error_str)
445        raise errors.Error(error_str)
446    except errors.Error as e:
447        raise e
448
449
450def pyopenssl_load_certificate(data: bytes) -> Tuple[crypto.X509, int]:
451    """Load PEM/DER certificate.
452
453    :raises errors.Error:
454
455    """
456
457    openssl_errors = []
458
459    for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
460        try:
461            return crypto.load_certificate(file_type, data), file_type
462        except crypto.Error as error:  # TODO: other errors?
463            openssl_errors.append(error)
464    raise errors.Error("Unable to load: {0}".format(",".join(
465        str(error) for error in openssl_errors)))
466
467
468def _load_cert_or_req(cert_or_req_str: bytes,
469                      load_func: Callable[[int, bytes], Union[crypto.X509, crypto.X509Req]],
470                      typ: int = crypto.FILETYPE_PEM) -> Union[crypto.X509, crypto.X509Req]:
471    try:
472        return load_func(typ, cert_or_req_str)
473    except crypto.Error as err:
474        logger.debug("", exc_info=True)
475        logger.error("Encountered error while loading certificate or csr: %s", str(err))
476        raise
477
478
479def _get_sans_from_cert_or_req(cert_or_req_str: bytes,
480                               load_func: Callable[[int, bytes], Union[crypto.X509,
481                                                                       crypto.X509Req]],
482                               typ: int = crypto.FILETYPE_PEM) -> List[str]:
483    # pylint: disable=protected-access
484    return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
485        cert_or_req_str, load_func, typ))
486
487
488def get_sans_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
489    """Get a list of Subject Alternative Names from a certificate.
490
491    :param str cert: Certificate (encoded).
492    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
493
494    :returns: A list of Subject Alternative Names.
495    :rtype: list
496
497    """
498    return _get_sans_from_cert_or_req(
499        cert, crypto.load_certificate, typ)
500
501
502def _get_names_from_cert_or_req(cert_or_req: bytes,
503                                load_func: Callable[[int, bytes], Union[crypto.X509,
504                                                                        crypto.X509Req]],
505                                typ: int) -> List[str]:
506    loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ)
507    return _get_names_from_loaded_cert_or_req(loaded_cert_or_req)
508
509
510def _get_names_from_loaded_cert_or_req(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]
511                                       ) -> List[str]:
512    # pylint: disable=protected-access
513    return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
514
515
516def get_names_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
517    """Get a list of domains from a cert, including the CN if it is set.
518
519    :param str cert: Certificate (encoded).
520    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
521
522    :returns: A list of domain names.
523    :rtype: list
524
525    """
526    return _get_names_from_cert_or_req(
527        cert, crypto.load_certificate, typ)
528
529
530def get_names_from_req(csr: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
531    """Get a list of domains from a CSR, including the CN if it is set.
532
533    :param str csr: CSR (encoded).
534    :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
535    :returns: A list of domain names.
536    :rtype: list
537
538    """
539    return _get_names_from_cert_or_req(csr, crypto.load_certificate_request, typ)
540
541
542def dump_pyopenssl_chain(chain: Union[List[crypto.X509], List[josepy.ComparableX509]],
543                         filetype: int = crypto.FILETYPE_PEM) -> bytes:
544    """Dump certificate chain into a bundle.
545
546    :param list chain: List of `crypto.X509` (or wrapped in
547        :class:`josepy.util.ComparableX509`).
548
549    """
550    # XXX: returns empty string when no chain is available, which
551    # shuts up RenewableCert, but might not be the best solution...
552    return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
553
554
555def notBefore(cert_path: str) -> datetime.datetime:
556    """When does the cert at cert_path start being valid?
557
558    :param str cert_path: path to a cert in PEM format
559
560    :returns: the notBefore value from the cert at cert_path
561    :rtype: :class:`datetime.datetime`
562
563    """
564    return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
565
566
567def notAfter(cert_path: str) -> datetime.datetime:
568    """When does the cert at cert_path stop being valid?
569
570    :param str cert_path: path to a cert in PEM format
571
572    :returns: the notAfter value from the cert at cert_path
573    :rtype: :class:`datetime.datetime`
574
575    """
576    return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
577
578
579def _notAfterBefore(cert_path: str,
580                    method: Callable[[crypto.X509], Optional[bytes]]) -> datetime.datetime:
581    """Internal helper function for finding notbefore/notafter.
582
583    :param str cert_path: path to a cert in PEM format
584    :param function method: one of ``crypto.X509.get_notBefore``
585        or ``crypto.X509.get_notAfter``
586
587    :returns: the notBefore or notAfter value from the cert at cert_path
588    :rtype: :class:`datetime.datetime`
589
590    """
591    # pylint: disable=redefined-outer-name
592    with open(cert_path, "rb") as f:
593        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
594    # pyopenssl always returns bytes
595    timestamp = method(x509)
596    if not timestamp:
597        raise errors.Error("Error while invoking timestamp method, None has been returned.")
598    reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",
599                             timestamp[6:8], b"T", timestamp[8:10], b":",
600                             timestamp[10:12], b":", timestamp[12:]]
601    # pyrfc3339 always uses the type `str`
602    timestamp_bytes = b"".join(reformatted_timestamp)
603    timestamp_str = timestamp_bytes.decode('ascii')
604    return pyrfc3339.parse(timestamp_str)
605
606
607def sha256sum(filename: str) -> str:
608    """Compute a sha256sum of a file.
609
610    NB: In given file, platform specific newlines characters will be converted
611    into their equivalent unicode counterparts before calculating the hash.
612
613    :param str filename: path to the file whose hash will be computed
614
615    :returns: sha256 digest of the file in hexadecimal
616    :rtype: str
617    """
618    sha256 = hashlib.sha256()
619    with open(filename, 'r') as file_d:
620        sha256.update(file_d.read().encode('UTF-8'))
621    return sha256.hexdigest()
622
623# Finds one CERTIFICATE stricttextualmsg according to rfc7468#section-3.
624# Does not validate the base64text - use crypto.load_certificate.
625CERT_PEM_REGEX = re.compile(
626    b"""-----BEGIN CERTIFICATE-----\r?
627.+?\r?
628-----END CERTIFICATE-----\r?
629""",
630    re.DOTALL # DOTALL (/s) because the base64text may include newlines
631)
632
633
634def cert_and_chain_from_fullchain(fullchain_pem: str) -> Tuple[str, str]:
635    """Split fullchain_pem into cert_pem and chain_pem
636
637    :param str fullchain_pem: concatenated cert + chain
638
639    :returns: tuple of string cert_pem and chain_pem
640    :rtype: tuple
641
642    :raises errors.Error: If there are less than 2 certificates in the chain.
643
644    """
645    # First pass: find the boundary of each certificate in the chain.
646    # TODO: This will silently skip over any "explanatory text" in between boundaries,
647    # which is prohibited by RFC8555.
648    certs = CERT_PEM_REGEX.findall(fullchain_pem.encode())
649    if len(certs) < 2:
650        raise errors.Error("failed to parse fullchain into cert and chain: " +
651                           "less than 2 certificates in chain")
652
653    # Second pass: for each certificate found, parse it using OpenSSL and re-encode it,
654    # with the effect of normalizing any encoding variations (e.g. CRLF, whitespace).
655    certs_normalized = [crypto.dump_certificate(crypto.FILETYPE_PEM,
656        crypto.load_certificate(crypto.FILETYPE_PEM, cert)).decode() for cert in certs]
657
658    # Since each normalized cert has a newline suffix, no extra newlines are required.
659    return (certs_normalized[0], "".join(certs_normalized[1:]))
660
661
662def get_serial_from_cert(cert_path: str) -> int:
663    """Retrieve the serial number of a certificate from certificate path
664
665    :param str cert_path: path to a cert in PEM format
666
667    :returns: serial number of the certificate
668    :rtype: int
669    """
670    # pylint: disable=redefined-outer-name
671    with open(cert_path, "rb") as f:
672        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
673    return x509.get_serial_number()
674
675
676def find_chain_with_issuer(fullchains: List[str], issuer_cn: str,
677                           warn_on_no_match: bool = False) -> str:
678    """Chooses the first certificate chain from fullchains whose topmost
679    intermediate has an Issuer Common Name matching issuer_cn (in other words
680    the first chain which chains to a root whose name matches issuer_cn).
681
682    :param fullchains: The list of fullchains in PEM chain format.
683    :type fullchains: `list` of `str`
684    :param `str` issuer_cn: The exact Subject Common Name to match against any
685        issuer in the certificate chain.
686
687    :returns: The best-matching fullchain, PEM-encoded, or the first if none match.
688    :rtype: `str`
689    """
690    for chain in fullchains:
691        certs = CERT_PEM_REGEX.findall(chain.encode())
692        top_cert = x509.load_pem_x509_certificate(certs[-1], default_backend())
693        top_issuer_cn = top_cert.issuer.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
694        if top_issuer_cn and top_issuer_cn[0].value == issuer_cn:
695            return chain
696
697    # Nothing matched, return whatever was first in the list.
698    if warn_on_no_match:
699        logger.warning("Certbot has been configured to prefer certificate chains with "
700                    "issuer '%s', but no chain from the CA matched this issuer. Using "
701                    "the default certificate chain instead.", issuer_cn)
702    return fullchains[0]
703