1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import datetime
8import ipaddress
9
10import six
11
12from cryptography import x509
13from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE
14from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
15from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
16from cryptography.x509.oid import (
17    CRLEntryExtensionOID,
18    CertificatePoliciesOID,
19    ExtensionOID,
20    OCSPExtensionOID,
21)
22
23
def _obj2txt(backend, obj):
    """Return the text form of an ASN1_OBJECT as produced by OBJ_obj2txt.

    :param backend: OpenSSL backend exposing ``_ffi``, ``_lib`` and
        ``openssl_assert``.
    :param obj: The ASN1_OBJECT pointer to render.
    :returns: The decoded text.  The trailing ``1`` passed to OBJ_obj2txt
        requests the numeric (dotted) representation.
    """
    ffi = backend._ffi
    obj2txt = backend._lib.OBJ_obj2txt

    # 80 bytes follows the recommendation in
    # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
    # but longer OIDs exist in practice (Active Directory produces some
    # very long ones), so an undersized first attempt must be retried.
    size = 80
    out = ffi.new("char[]", size)

    # OBJ_obj2txt reports how many bytes it *would* have written given
    # enough room.  Anything that does not leave space for the terminating
    # null byte means the output was truncated.
    written = obj2txt(out, size, obj, 1)
    if written >= size:
        size = written + 1
        out = ffi.new("char[]", size)
        written = obj2txt(out, size, obj, 1)
    backend.openssl_assert(written > 0)
    return ffi.buffer(out, written)[:].decode()
46
47
def _decode_x509_name_entry(backend, x509_name_entry):
    """Convert an X509_NAME_ENTRY into an ``x509.NameAttribute``.

    The attribute carries the entry's OID, its value decoded to text and
    the ASN.1 string type it was encoded with.  A ``KeyError`` propagates
    when that string type has no entry in ``_ASN1_TYPE_TO_ENUM``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_obj = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_obj != null)
    asn1_str = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_str != null)

    text = _asn1_string_to_utf8(backend, asn1_str)
    dotted = _obj2txt(backend, asn1_obj)
    # Deliberately not called ``type`` so the builtin stays visible.
    asn1_type = _ASN1_TYPE_TO_ENUM[asn1_str.type]
    return x509.NameAttribute(x509.ObjectIdentifier(dotted), text, asn1_type)
58
59
def _decode_x509_name(backend, x509_name):
    """Convert an X509_NAME into an ``x509.Name``.

    Consecutive entries that report the same set index from
    ``X509_NAME_ENTRY_set`` are grouped into one
    ``x509.RelativeDistinguishedName``.
    """
    lib = backend._lib
    rdns = []
    last_set = -1
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        attribute = _decode_x509_name_entry(backend, entry)
        current_set = lib.X509_NAME_ENTRY_set(entry)
        if current_set == last_set:
            # Belongs to the same RDN as the previous entry.
            rdns[-1].add(attribute)
        else:
            rdns.append({attribute})
        last_set = current_set

    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in rdns)
76
77
def _decode_general_names(backend, gns):
    """Decode every entry of a GENERAL_NAME stack into a list.

    Each element is converted with ``_decode_general_name``; the order of
    the stack is preserved.
    """
    lib = backend._lib
    null = backend._ffi.NULL
    decoded = []
    for index in range(lib.sk_GENERAL_NAME_num(gns)):
        general_name = lib.sk_GENERAL_NAME_value(gns, index)
        backend.openssl_assert(general_name != null)
        decoded.append(_decode_general_name(backend, general_name))

    return decoded
87
88
def _decode_general_name(backend, gn):
    """Convert an OpenSSL GENERAL_NAME into the matching x509 object.

    :param backend: OpenSSL backend exposing ``_ffi`` and ``_lib``.
    :param gn: The GENERAL_NAME pointer to decode.
    :returns: A ``DNSName``, ``UniformResourceIdentifier``, ``RegisteredID``,
        ``IPAddress``, ``DirectoryName``, ``RFC822Name`` or ``OtherName``.
    :raises x509.UnsupportedGeneralNameType: For any other name type
        (x400Address or ediPartyName).
    :raises ValueError: If an IP address entry carries a netmask whose set
        bits are not one contiguous leading run, or if ``ipaddress``
        rejects the address or network.
    """
    lib = backend._lib
    if gn.type == lib.GEN_DNS:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8")
        # We don't use the constructor for DNSName so we can bypass validation
        # This allows us to create DNSName objects that have unicode chars
        # when a certificate (against the RFC) contains them.
        return x509.DNSName._init_without_validation(data)
    elif gn.type == lib.GEN_URI:
        # Same bytes-then-utf8 decoding as for DNS names above.
        data = _asn1_string_to_bytes(
            backend, gn.d.uniformResourceIdentifier
        ).decode("utf8")
        # Validation is bypassed so URIs containing unicode chars (against
        # the RFC) can still be represented.
        return x509.UniformResourceIdentifier._init_without_validation(data)
    elif gn.type == lib.GEN_RID:
        oid = _obj2txt(backend, gn.d.registeredID)
        return x509.RegisteredID(x509.ObjectIdentifier(oid))
    elif gn.type == lib.GEN_IPADD:
        data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
        if len(data) in (8, 32):
            # Twice the size of a packed IPv4/IPv6 address: this is a
            # network (address + netmask) as found in Name Constraints.
            ip = _ip_network_from_packed(data)
        else:
            ip = ipaddress.ip_address(data)

        return x509.IPAddress(ip)
    elif gn.type == lib.GEN_DIRNAME:
        return x509.DirectoryName(
            _decode_x509_name(backend, gn.d.directoryName)
        )
    elif gn.type == lib.GEN_EMAIL:
        # Same bytes-then-utf8 decoding as for DNS names above.
        data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8")
        # Validation is bypassed so addresses containing unicode chars
        # (against the RFC) can still be represented.
        return x509.RFC822Name._init_without_validation(data)
    elif gn.type == lib.GEN_OTHERNAME:
        type_id = _obj2txt(backend, gn.d.otherName.type_id)
        value = _asn1_to_der(backend, gn.d.otherName.value)
        return x509.OtherName(x509.ObjectIdentifier(type_id), value)
    else:
        # x400Address or ediPartyName
        raise x509.UnsupportedGeneralNameType(
            "{} is not a supported type".format(
                x509._GENERAL_NAMES.get(gn.type, gn.type)
            ),
            gn.type,
        )


def _ip_network_from_packed(data):
    """Build an ``ipaddress`` network from packed address + netmask bytes.

    ``data`` is the packed base address immediately followed by a packed
    netmask of the same length.  ``ipaddress`` doesn't accept packed bytes
    plus a netmask, and IPv6Network only handles CIDR prefixes, so the mask
    is turned into a prefix length here.
    """
    half = len(data) // 2
    base = ipaddress.ip_address(data[:half])
    netmask = ipaddress.ip_address(data[half:])
    # bin() drops leading zero bits, so pad back to the full address width.
    # Without the padding a mask such as 0.255.255.255 would look like a
    # run of 24 ones and be accepted as a /24 instead of being rejected.
    bits = bin(int(netmask))[2:].zfill(half * 8)
    # The first 0 bit marks the end of the prefix.
    prefix = bits.find("0")
    # If no 0 bits are found it is a /32 or /128
    if prefix == -1:
        prefix = len(bits)

    # Any 1 bit after the first 0 means the mask is not contiguous.
    if "1" in bits[prefix:]:
        raise ValueError("Invalid netmask")

    return ipaddress.ip_network(base.exploded + u"/{}".format(prefix))
165
166
def _decode_ocsp_no_check(backend, ext):
    """Return the ``x509.OCSPNoCheck`` extension value.

    Neither ``backend`` nor ``ext`` is consulted; the parameters exist so
    the function fits the ``handler(backend, ext_data)`` call made by the
    extension parser.
    """
    no_check = x509.OCSPNoCheck()
    return no_check
169
170
def _decode_crl_number(backend, ext):
    """Decode a CRL number extension payload into ``x509.CRLNumber``.

    ``ext`` is treated as an ASN1_INTEGER; ``ASN1_INTEGER_free`` is
    registered to run when the cffi wrapper is garbage collected.
    """
    ffi = backend._ffi
    number = ffi.gc(
        ffi.cast("ASN1_INTEGER *", ext), backend._lib.ASN1_INTEGER_free
    )
    return x509.CRLNumber(_asn1_integer_to_int(backend, number))
175
176
def _decode_delta_crl_indicator(backend, ext):
    """Decode a delta CRL indicator payload into ``x509.DeltaCRLIndicator``.

    ``ext`` is treated as an ASN1_INTEGER; ``ASN1_INTEGER_free`` is
    registered to run when the cffi wrapper is garbage collected.
    """
    ffi = backend._ffi
    base_number = ffi.gc(
        ffi.cast("ASN1_INTEGER *", ext), backend._lib.ASN1_INTEGER_free
    )
    return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, base_number))
181
182
class _X509ExtensionParser(object):
    """Turn the extensions of an OpenSSL object into ``x509.Extensions``.

    ``ext_count`` and ``get_ext`` are callables used to count and fetch the
    raw X509_EXTENSION entries of the object given to ``parse``;
    ``handlers`` maps extension OIDs to ``handler(backend, ext_data)``
    decoder callables.
    """

    def __init__(self, backend, ext_count, get_ext, handlers):
        self._backend = backend
        self.ext_count = ext_count
        self.get_ext = get_ext
        self.handlers = handlers

    def parse(self, x509_obj):
        """Decode every extension of ``x509_obj``.

        :returns: ``x509.Extensions`` holding one ``x509.Extension`` per
            entry, in the order reported by ``get_ext``.
        :raises x509.DuplicateExtension: If an OID appears more than once.
        :raises ValueError: If a handled extension cannot be decoded by
            ``X509V3_EXT_d2i``.
        """
        backend = self._backend
        lib = backend._lib
        null = backend._ffi.NULL

        collected = []
        seen_oids = set()
        for index in range(self.ext_count(x509_obj)):
            ext = self.get_ext(x509_obj, index)
            backend.openssl_assert(ext != null)
            critical = lib.X509_EXTENSION_get_critical(ext) == 1
            oid = x509.ObjectIdentifier(
                _obj2txt(backend, lib.X509_EXTENSION_get_object(ext))
            )
            if oid in seen_oids:
                raise x509.DuplicateExtension(
                    "Duplicate {} extension found".format(oid), oid
                )
            # Recording the OID up front is safe: any failure below raises
            # and discards this set together with the partial result.
            seen_oids.add(oid)

            # These OIDs are only supported in OpenSSL 1.1.0+ but we want
            # to support them in all versions of OpenSSL so we decode them
            # ourselves.
            if oid == ExtensionOID.TLS_FEATURE:
                value = self._decode_tls_feature(ext)
            elif oid == ExtensionOID.PRECERT_POISON:
                value = self._decode_precert_poison(ext)
            else:
                value = self._decode_with_handler(oid, ext)
            collected.append(x509.Extension(oid, critical, value))

        return x509.Extensions(collected)

    def _decode_tls_feature(self, ext):
        # The extension contents are a SEQUENCE OF INTEGERs.
        data = self._backend._lib.X509_EXTENSION_get_data(ext)
        data_bytes = _asn1_string_to_bytes(self._backend, data)
        features = DERReader(data_bytes).read_single_element(SEQUENCE)
        feature_ids = []
        while not features.is_empty():
            feature_ids.append(features.read_element(INTEGER).as_integer())
        # Map the features to their enum value.
        return x509.TLSFeature(
            [_TLS_FEATURE_TYPE_TO_ENUM[number] for number in feature_ids]
        )

    def _decode_precert_poison(self, ext):
        data = self._backend._lib.X509_EXTENSION_get_data(ext)
        # The contents of the extension must be an ASN.1 NULL.
        reader = DERReader(_asn1_string_to_bytes(self._backend, data))
        reader.read_single_element(NULL).check_empty()
        return x509.PrecertPoison()

    def _decode_with_handler(self, oid, ext):
        backend = self._backend
        try:
            handler = self.handlers[oid]
        except KeyError:
            # Dump the DER payload into an UnrecognizedExtension object
            data = backend._lib.X509_EXTENSION_get_data(ext)
            backend.openssl_assert(data != backend._ffi.NULL)
            der = backend._ffi.buffer(data.data, data.length)[:]
            return x509.UnrecognizedExtension(oid, der)

        ext_data = backend._lib.X509V3_EXT_d2i(ext)
        if ext_data == backend._ffi.NULL:
            backend._consume_errors()
            raise ValueError(
                "The {} extension is invalid and can't be "
                "parsed".format(oid)
            )
        return handler(backend, ext_data)
262
263
def _decode_certificate_policies(backend, cp):
    """Decode a certificate policies payload into ``x509.CertificatePolicies``.

    :param backend: OpenSSL backend exposing ``_ffi`` and ``_lib``.
    :param cp: Pointer treated as a stack of POLICYINFO;
        ``CERTIFICATEPOLICIES_free`` is registered to run when the cffi
        wrapper is garbage collected.
    :returns: ``x509.CertificatePolicies`` with one ``PolicyInformation``
        per stack entry.  A policy without a qualifier stack gets
        ``None`` as its qualifiers.
    :raises ValueError: If a qualifier is neither a CPS URI nor a user
        notice.
    """
    lib = backend._lib
    cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
    cp = backend._ffi.gc(cp, lib.CERTIFICATEPOLICIES_free)

    certificate_policies = []
    for i in range(lib.sk_POLICYINFO_num(cp)):
        pi = lib.sk_POLICYINFO_value(cp, i)
        oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
        qualifiers = None
        if pi.qualifiers != backend._ffi.NULL:
            qualifiers = [
                _decode_policy_qualifier(
                    backend, lib.sk_POLICYQUALINFO_value(pi.qualifiers, j)
                )
                for j in range(lib.sk_POLICYQUALINFO_num(pi.qualifiers))
            ]

        certificate_policies.append(x509.PolicyInformation(oid, qualifiers))

    return x509.CertificatePolicies(certificate_policies)


def _decode_policy_qualifier(backend, pqi):
    """Decode one POLICYQUALINFO into a CPS URI string or a UserNotice."""
    pqualid = x509.ObjectIdentifier(_obj2txt(backend, pqi.pqualid))
    if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
        return _asn1_string_to_ascii(backend, pqi.d.cpsuri)
    if pqualid == CertificatePoliciesOID.CPS_USER_NOTICE:
        return _decode_user_notice(backend, pqi.d.usernotice)
    # Raise explicitly rather than assert: an assert disappears under
    # ``python -O`` and the data would then be read as a user notice
    # regardless of what the qualifier actually holds.
    raise ValueError("Unsupported policy qualifier: {}".format(pqualid))
295
296
def _decode_user_notice(backend, un):
    """Decode a USERNOTICE structure into ``x509.UserNotice``.

    The explicit text and the notice reference are each ``None`` when the
    corresponding pointer on ``un`` is NULL.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    if un.exptext == null:
        explicit_text = None
    else:
        explicit_text = _asn1_string_to_utf8(backend, un.exptext)

    if un.noticeref == null:
        notice_reference = None
    else:
        ref = un.noticeref
        organization = _asn1_string_to_utf8(backend, ref.organization)
        notice_numbers = [
            _asn1_integer_to_int(
                backend, lib.sk_ASN1_INTEGER_value(ref.noticenos, index)
            )
            for index in range(lib.sk_ASN1_INTEGER_num(ref.noticenos))
        ]
        notice_reference = x509.NoticeReference(organization, notice_numbers)

    return x509.UserNotice(notice_reference, explicit_text)
319
320
def _decode_basic_constraints(backend, bc_st):
    """Decode a BASIC_CONSTRAINTS payload into ``x509.BasicConstraints``.

    ``BASIC_CONSTRAINTS_free`` is registered to run when the cffi wrapper
    is garbage collected.  The path length is ``None`` when the structure
    carries no ``pathlen``.
    """
    ffi = backend._ffi
    constraints = ffi.gc(
        ffi.cast("BASIC_CONSTRAINTS *", bc_st),
        backend._lib.BASIC_CONSTRAINTS_free,
    )
    # The byte representation of an ASN.1 boolean true is \xff. OpenSSL
    # simply exposes that ordinal, so true is 255 and false is 0.
    is_ca = constraints.ca == 255
    max_path = _asn1_integer_to_int_or_none(backend, constraints.pathlen)
    return x509.BasicConstraints(is_ca, max_path)
335
336
def _decode_subject_key_identifier(backend, asn1_string):
    """Decode an octet string payload into ``x509.SubjectKeyIdentifier``.

    ``ASN1_OCTET_STRING_free`` is registered to run when the cffi wrapper
    is garbage collected; the identifier is copied out as bytes first.
    """
    ffi = backend._ffi
    octets = ffi.gc(
        ffi.cast("ASN1_OCTET_STRING *", asn1_string),
        backend._lib.ASN1_OCTET_STRING_free,
    )
    digest = ffi.buffer(octets.data, octets.length)[:]
    return x509.SubjectKeyIdentifier(digest)
345
346
def _decode_authority_key_identifier(backend, akid):
    """Decode an AUTHORITY_KEYID into ``x509.AuthorityKeyIdentifier``.

    Each of the key identifier, issuer names and serial number is ``None``
    when the corresponding pointer on the structure is NULL.
    ``AUTHORITY_KEYID_free`` is registered to run when the cffi wrapper is
    garbage collected.
    """
    ffi = backend._ffi
    null = ffi.NULL
    akid = ffi.gc(
        ffi.cast("AUTHORITY_KEYID *", akid), backend._lib.AUTHORITY_KEYID_free
    )

    if akid.keyid == null:
        key_identifier = None
    else:
        key_identifier = ffi.buffer(akid.keyid.data, akid.keyid.length)[:]

    if akid.issuer == null:
        issuer_names = None
    else:
        issuer_names = _decode_general_names(backend, akid.issuer)

    serial = _asn1_integer_to_int_or_none(backend, akid.serial)

    return x509.AuthorityKeyIdentifier(key_identifier, issuer_names, serial)
368
369
def _decode_information_access(backend, ia):
    """Decode a stack of ACCESS_DESCRIPTION into a list.

    :returns: A list of ``x509.AccessDescription`` in stack order, shared
        by the authority and subject information access decoders.
    """
    ffi = backend._ffi
    lib = backend._lib
    null = ffi.NULL

    def _free_stack(stack):
        # Free the stack and, via ACCESS_DESCRIPTION_free, each element.
        lib.sk_ACCESS_DESCRIPTION_pop_free(
            stack, ffi.addressof(lib._original_lib, "ACCESS_DESCRIPTION_free")
        )

    ia = ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", ia)
    ia = ffi.gc(ia, _free_stack)

    access_descriptions = []
    for index in range(lib.sk_ACCESS_DESCRIPTION_num(ia)):
        description = lib.sk_ACCESS_DESCRIPTION_value(ia, index)
        backend.openssl_assert(description.method != null)
        method = x509.ObjectIdentifier(_obj2txt(backend, description.method))
        backend.openssl_assert(description.location != null)
        location = _decode_general_name(backend, description.location)
        access_descriptions.append(x509.AccessDescription(method, location))

    return access_descriptions
392
393
def _decode_authority_information_access(backend, aia):
    """Decode the payload into ``x509.AuthorityInformationAccess``."""
    return x509.AuthorityInformationAccess(
        _decode_information_access(backend, aia)
    )
397
398
def _decode_subject_information_access(backend, aia):
    """Decode the payload into ``x509.SubjectInformationAccess``."""
    return x509.SubjectInformationAccess(
        _decode_information_access(backend, aia)
    )
402
403
def _decode_key_usage(backend, bit_string):
    """Decode a key usage bit string into ``x509.KeyUsage``.

    ``ASN1_BIT_STRING_free`` is registered to run when the cffi wrapper is
    garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    usage_bits = ffi.gc(
        ffi.cast("ASN1_BIT_STRING *", bit_string), lib.ASN1_BIT_STRING_free
    )
    # Bits 0 through 8 line up with the positional arguments of KeyUsage:
    # digital_signature, content_commitment, key_encipherment,
    # data_encipherment, key_agreement, key_cert_sign, crl_sign,
    # encipher_only, decipher_only.
    flags = [
        lib.ASN1_BIT_STRING_get_bit(usage_bits, position) == 1
        for position in range(9)
    ]
    return x509.KeyUsage(*flags)
428
429
def _decode_general_names_extension(backend, gns):
    """Decode an extension payload that is a GENERAL_NAMES stack.

    ``GENERAL_NAMES_free`` is registered to run when the cffi wrapper is
    garbage collected.  Returns the list built by
    ``_decode_general_names``.
    """
    ffi = backend._ffi
    names_stack = ffi.gc(
        ffi.cast("GENERAL_NAMES *", gns), backend._lib.GENERAL_NAMES_free
    )
    return _decode_general_names(backend, names_stack)
435
436
def _decode_subject_alt_name(backend, ext):
    """Decode the payload into ``x509.SubjectAlternativeName``."""
    general_names = _decode_general_names_extension(backend, ext)
    return x509.SubjectAlternativeName(general_names)
441
442
def _decode_issuer_alt_name(backend, ext):
    """Decode the payload into ``x509.IssuerAlternativeName``."""
    general_names = _decode_general_names_extension(backend, ext)
    return x509.IssuerAlternativeName(general_names)
447
448
def _decode_name_constraints(backend, nc):
    """Decode a NAME_CONSTRAINTS payload into ``x509.NameConstraints``.

    The permitted and excluded subtrees are decoded separately; either is
    ``None`` when its stack pointer is NULL.  ``NAME_CONSTRAINTS_free`` is
    registered to run when the cffi wrapper is garbage collected.
    """
    ffi = backend._ffi
    constraints = ffi.gc(
        ffi.cast("NAME_CONSTRAINTS *", nc), backend._lib.NAME_CONSTRAINTS_free
    )
    permitted = _decode_general_subtrees(backend, constraints.permittedSubtrees)
    excluded = _decode_general_subtrees(backend, constraints.excludedSubtrees)
    return x509.NameConstraints(
        permitted_subtrees=permitted, excluded_subtrees=excluded
    )
457
458
def _decode_general_subtrees(backend, stack_subtrees):
    """Decode a stack of GENERAL_SUBTREE into a list of general names.

    Returns ``None`` when the stack pointer itself is NULL; otherwise the
    ``base`` name of every subtree is decoded in stack order.
    """
    null = backend._ffi.NULL
    if stack_subtrees == null:
        return None

    lib = backend._lib
    names = []
    for index in range(lib.sk_GENERAL_SUBTREE_num(stack_subtrees)):
        subtree = lib.sk_GENERAL_SUBTREE_value(stack_subtrees, index)
        backend.openssl_assert(subtree != null)
        names.append(_decode_general_name(backend, subtree.base))

    return names
473
474
def _decode_issuing_dist_point(backend, idp):
    """Decode an ISSUING_DIST_POINT into ``x509.IssuingDistributionPoint``.

    ``ISSUING_DIST_POINT_free`` is registered to run when the cffi wrapper
    is garbage collected.  The name fields and the reasons are ``None``
    when the corresponding pointer is NULL.
    """
    ffi = backend._ffi
    null = ffi.NULL
    idp = ffi.gc(
        ffi.cast("ISSUING_DIST_POINT *", idp),
        backend._lib.ISSUING_DIST_POINT_free,
    )

    if idp.distpoint == null:
        full_name, relative_name = None, None
    else:
        full_name, relative_name = _decode_distpoint(backend, idp.distpoint)

    # The boolean fields are compared against 255, the value used for true.
    only_user = idp.onlyuser == 255
    only_ca = idp.onlyCA == 255
    indirect_crl = idp.indirectCRL == 255
    only_attr = idp.onlyattr == 255

    if idp.onlysomereasons == null:
        only_some_reasons = None
    else:
        only_some_reasons = _decode_reasons(backend, idp.onlysomereasons)

    return x509.IssuingDistributionPoint(
        full_name,
        relative_name,
        only_user,
        only_ca,
        only_some_reasons,
        indirect_crl,
        only_attr,
    )
502
503
def _decode_policy_constraints(backend, pc):
    """Decode a POLICY_CONSTRAINTS payload into ``x509.PolicyConstraints``.

    Either value is ``None`` when its integer pointer is NULL.
    ``POLICY_CONSTRAINTS_free`` is registered to run when the cffi wrapper
    is garbage collected.
    """
    ffi = backend._ffi
    constraints = ffi.gc(
        ffi.cast("POLICY_CONSTRAINTS *", pc),
        backend._lib.POLICY_CONSTRAINTS_free,
    )
    require_explicit = _asn1_integer_to_int_or_none(
        backend, constraints.requireExplicitPolicy
    )
    inhibit_mapping = _asn1_integer_to_int_or_none(
        backend, constraints.inhibitPolicyMapping
    )
    return x509.PolicyConstraints(require_explicit, inhibit_mapping)
518
519
def _decode_extended_key_usage(backend, sk):
    """Decode a stack of ASN1_OBJECT into ``x509.ExtendedKeyUsage``.

    ``sk_ASN1_OBJECT_free`` is registered to run when the cffi wrapper is
    garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    # NOTE(review): the finalizer is the plain stack free, not a pop_free;
    # confirm whether the contained objects need releasing separately.
    usages = ffi.gc(
        ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk),
        lib.sk_ASN1_OBJECT_free,
    )

    oids = []
    for index in range(lib.sk_ASN1_OBJECT_num(usages)):
        asn1_obj = lib.sk_ASN1_OBJECT_value(usages, index)
        backend.openssl_assert(asn1_obj != ffi.NULL)
        oids.append(x509.ObjectIdentifier(_obj2txt(backend, asn1_obj)))

    return x509.ExtendedKeyUsage(oids)
533
534
# Values of the ``type`` field of a distribution point name.
# _decode_distpoint compares against the full-name value only and treats
# every other value as a relative name.
_DISTPOINT_TYPE_FULLNAME = 0
_DISTPOINT_TYPE_RELATIVENAME = 1
537
538
def _decode_dist_points(backend, cdps):
    """Decode a stack of DIST_POINT into a list.

    :returns: A list of ``x509.DistributionPoint`` in stack order.  Any of
        the name, reasons or CRL issuer fields is ``None`` when the
        corresponding pointer is NULL.  ``CRL_DIST_POINTS_free`` is
        registered to run when the cffi wrapper is garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    null = ffi.NULL
    cdps = ffi.gc(
        ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps),
        lib.CRL_DIST_POINTS_free,
    )

    dist_points = []
    for index in range(lib.sk_DIST_POINT_num(cdps)):
        cdp = lib.sk_DIST_POINT_value(cdps, index)

        if cdp.reasons == null:
            reasons = None
        else:
            reasons = _decode_reasons(backend, cdp.reasons)

        if cdp.CRLissuer == null:
            crl_issuer = None
        else:
            crl_issuer = _decode_general_names(backend, cdp.CRLissuer)

        # Certificates may have a crl_issuer/reasons and no distribution
        # point, so the pointer has to be checked before decoding it.
        if cdp.distpoint == null:
            full_name, relative_name = None, None
        else:
            full_name, relative_name = _decode_distpoint(
                backend, cdp.distpoint
            )

        dist_points.append(
            x509.DistributionPoint(
                full_name, relative_name, reasons, crl_issuer
            )
        )

    return dist_points
571
572
# ReasonFlags ::= BIT STRING {
#      unused                  (0),
#      keyCompromise           (1),
#      cACompromise            (2),
#      affiliationChanged      (3),
#      superseded              (4),
#      cessationOfOperation    (5),
#      certificateHold         (6),
#      privilegeWithdrawn      (7),
#      aACompromise            (8) }
#
# Maps a bit position in the ReasonFlags bit string to its enum member.
# Bit 0 ("unused") deliberately has no entry, so _decode_reasons never
# reports it.
_REASON_BIT_MAPPING = {
    1: x509.ReasonFlags.key_compromise,
    2: x509.ReasonFlags.ca_compromise,
    3: x509.ReasonFlags.affiliation_changed,
    4: x509.ReasonFlags.superseded,
    5: x509.ReasonFlags.cessation_of_operation,
    6: x509.ReasonFlags.certificate_hold,
    7: x509.ReasonFlags.privilege_withdrawn,
    8: x509.ReasonFlags.aa_compromise,
}
593
594
def _decode_reasons(backend, reasons):
    """Return the set of reason flags present in a ReasonFlags bit string.

    Every bit position listed in ``_REASON_BIT_MAPPING`` (the RFC 5280
    flags) is tested; the matching enum members are returned as a
    ``frozenset``.
    """
    get_bit = backend._lib.ASN1_BIT_STRING_get_bit
    return frozenset(
        flag
        for position, flag in six.iteritems(_REASON_BIT_MAPPING)
        if get_bit(reasons, position)
    )
603
604
def _decode_distpoint(backend, distpoint):
    """Decode a distribution point name.

    :returns: A ``(full_name, relative_name)`` pair in which exactly one
        member is ``None``: a list of general names for the full-name
        form, otherwise an ``x509.RelativeDistinguishedName``.
    """
    if distpoint.type != _DISTPOINT_TYPE_FULLNAME:
        # OpenSSL code doesn't test for a specific type for
        # relativename, everything that isn't fullname is considered
        # relativename.  Per RFC 5280:
        #
        # DistributionPointName ::= CHOICE {
        #      fullName                [0]      GeneralNames,
        #      nameRelativeToCRLIssuer [1]      RelativeDistinguishedName }
        lib = backend._lib
        entries = distpoint.name.relativename
        attributes = set()
        for index in range(lib.sk_X509_NAME_ENTRY_num(entries)):
            entry = lib.sk_X509_NAME_ENTRY_value(entries, index)
            backend.openssl_assert(entry != backend._ffi.NULL)
            attributes.add(_decode_x509_name_entry(backend, entry))

        return None, x509.RelativeDistinguishedName(attributes)

    return _decode_general_names(backend, distpoint.name.fullname), None
628
629
def _decode_crl_distribution_points(backend, cdps):
    """Decode the payload into ``x509.CRLDistributionPoints``."""
    return x509.CRLDistributionPoints(_decode_dist_points(backend, cdps))
633
634
def _decode_freshest_crl(backend, cdps):
    """Decode the payload into ``x509.FreshestCRL``."""
    return x509.FreshestCRL(_decode_dist_points(backend, cdps))
638
639
def _decode_inhibit_any_policy(backend, asn1_int):
    """Decode an integer payload into ``x509.InhibitAnyPolicy``.

    ``ASN1_INTEGER_free`` is registered to run when the cffi wrapper is
    garbage collected.
    """
    ffi = backend._ffi
    skip_count = ffi.gc(
        ffi.cast("ASN1_INTEGER *", asn1_int), backend._lib.ASN1_INTEGER_free
    )
    return x509.InhibitAnyPolicy(_asn1_integer_to_int(backend, skip_count))
645
646
def _decode_scts(backend, asn1_scts):
    """Decode a stack of SCT into a list of timestamp wrappers.

    Each wrapper is constructed with the stack as well as the individual
    SCT.  ``SCT_LIST_free`` is registered to run when the cffi wrapper of
    the stack is garbage collected.
    """
    # Imported at call time rather than at module import.
    from cryptography.hazmat.backends.openssl.x509 import (
        _SignedCertificateTimestamp,
    )

    ffi = backend._ffi
    lib = backend._lib
    asn1_scts = ffi.gc(
        ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts), lib.SCT_LIST_free
    )

    return [
        _SignedCertificateTimestamp(
            backend, asn1_scts, lib.sk_SCT_value(asn1_scts, index)
        )
        for index in range(lib.sk_SCT_num(asn1_scts))
    ]
661
662
def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
    """Decode into ``x509.PrecertificateSignedCertificateTimestamps``."""
    timestamps = _decode_scts(backend, asn1_scts)
    return x509.PrecertificateSignedCertificateTimestamps(timestamps)
667
668
def _decode_signed_certificate_timestamps(backend, asn1_scts):
    """Decode the payload into ``x509.SignedCertificateTimestamps``."""
    timestamps = _decode_scts(backend, asn1_scts)
    return x509.SignedCertificateTimestamps(timestamps)
671
672
#    CRLReason ::= ENUMERATED {
#        unspecified             (0),
#        keyCompromise           (1),
#        cACompromise            (2),
#        affiliationChanged      (3),
#        superseded              (4),
#        cessationOfOperation    (5),
#        certificateHold         (6),
#             -- value 7 is not used
#        removeFromCRL           (8),
#        privilegeWithdrawn      (9),
#        aACompromise           (10) }
#
# Maps the enumerated CRLReason code to its enum member.  There is no
# entry for 7, so _decode_crl_reason rejects that code with a ValueError.
_CRL_ENTRY_REASON_CODE_TO_ENUM = {
    0: x509.ReasonFlags.unspecified,
    1: x509.ReasonFlags.key_compromise,
    2: x509.ReasonFlags.ca_compromise,
    3: x509.ReasonFlags.affiliation_changed,
    4: x509.ReasonFlags.superseded,
    5: x509.ReasonFlags.cessation_of_operation,
    6: x509.ReasonFlags.certificate_hold,
    8: x509.ReasonFlags.remove_from_crl,
    9: x509.ReasonFlags.privilege_withdrawn,
    10: x509.ReasonFlags.aa_compromise,
}
697
698
# Inverse of _CRL_ENTRY_REASON_CODE_TO_ENUM: enum member -> CRLReason code.
# Every code maps to a distinct member, so inverting loses no entries.
_CRL_ENTRY_REASON_ENUM_TO_CODE = {
    reason: code for code, reason in _CRL_ENTRY_REASON_CODE_TO_ENUM.items()
}
711
712
def _decode_crl_reason(backend, enum):
    """Decode an enumerated payload into ``x509.CRLReason``.

    :raises ValueError: If the code has no entry in
        ``_CRL_ENTRY_REASON_CODE_TO_ENUM``.

    ``ASN1_ENUMERATED_free`` is registered to run when the cffi wrapper is
    garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    enumerated = ffi.gc(
        ffi.cast("ASN1_ENUMERATED *", enum), lib.ASN1_ENUMERATED_free
    )
    code = lib.ASN1_ENUMERATED_get(enumerated)

    try:
        reason = _CRL_ENTRY_REASON_CODE_TO_ENUM[code]
    except KeyError:
        raise ValueError("Unsupported reason code: {}".format(code))
    return x509.CRLReason(reason)
722
723
def _decode_invalidity_date(backend, inv_date):
    """Decode a generalized time payload into ``x509.InvalidityDate``.

    ``ASN1_GENERALIZEDTIME_free`` is registered to run when the cffi
    wrapper is garbage collected.
    """
    ffi = backend._ffi
    when = ffi.gc(
        ffi.cast("ASN1_GENERALIZEDTIME *", inv_date),
        backend._lib.ASN1_GENERALIZEDTIME_free,
    )
    parsed = _parse_asn1_generalized_time(backend, when)
    return x509.InvalidityDate(parsed)
732
733
def _decode_cert_issuer(backend, gns):
    """Decode a GENERAL_NAMES payload into ``x509.CertificateIssuer``.

    The cast, finalizer registration and decoding are the ones performed
    by ``_decode_general_names_extension``.
    """
    return x509.CertificateIssuer(
        _decode_general_names_extension(backend, gns)
    )
739
740
def _asn1_to_der(backend, asn1_type):
    """Serialize an ASN1_TYPE with ``i2d_ASN1_TYPE`` and return the bytes.

    The buffer allocated by OpenSSL is released with ``OPENSSL_free`` when
    the cffi holder is garbage collected; the result is copied out first.
    """
    ffi = backend._ffi
    holder = ffi.new("unsigned char **")
    length = backend._lib.i2d_ASN1_TYPE(asn1_type, holder)
    backend.openssl_assert(length >= 0)
    backend.openssl_assert(holder[0] != ffi.NULL)

    def _release(pointer):
        backend._lib.OPENSSL_free(pointer[0])

    holder = ffi.gc(holder, _release)
    return ffi.buffer(holder[0], length)[:]
750
751
def _asn1_integer_to_int(backend, asn1_int):
    """Convert an ASN1_INTEGER into a Python integer.

    The value goes through an intermediate BIGNUM, which is released with
    ``BN_free`` when its cffi wrapper is garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    bignum = lib.ASN1_INTEGER_to_BN(asn1_int, ffi.NULL)
    backend.openssl_assert(bignum != ffi.NULL)
    return backend._bn_to_int(ffi.gc(bignum, lib.BN_free))
757
758
def _asn1_integer_to_int_or_none(backend, asn1_int):
    """Like ``_asn1_integer_to_int`` but map a NULL pointer to ``None``."""
    if asn1_int != backend._ffi.NULL:
        return _asn1_integer_to_int(backend, asn1_int)
    return None
764
765
def _asn1_string_to_bytes(backend, asn1_string):
    """Copy the ``data``/``length`` contents of an ASN.1 string to bytes."""
    view = backend._ffi.buffer(asn1_string.data, asn1_string.length)
    return view[:]
768
769
def _asn1_string_to_ascii(backend, asn1_string):
    """Return the contents of an ASN.1 string decoded as ASCII text."""
    raw = _asn1_string_to_bytes(backend, asn1_string)
    return raw.decode("ascii")
772
773
def _asn1_string_to_utf8(backend, asn1_string):
    """Convert an ASN.1 string to text via ``ASN1_STRING_to_UTF8``.

    :raises ValueError: If OpenSSL reports -1 for the conversion.

    The buffer allocated by OpenSSL is released with ``OPENSSL_free`` when
    the cffi holder is garbage collected; the result is copied out first.
    """
    ffi = backend._ffi
    holder = ffi.new("unsigned char **")
    length = backend._lib.ASN1_STRING_to_UTF8(holder, asn1_string)
    if length == -1:
        raise ValueError(
            "Unsupported ASN1 string type. Type: {}".format(asn1_string.type)
        )

    backend.openssl_assert(holder[0] != ffi.NULL)

    def _release(pointer):
        backend._lib.OPENSSL_free(pointer[0])

    holder = ffi.gc(holder, _release)
    return ffi.buffer(holder[0], length)[:].decode("utf8")
787
788
def _parse_asn1_time(backend, asn1_time):
    """Convert an ASN1_TIME into a ``datetime.datetime``.

    The value is first converted to a generalized time, which is released
    with ``ASN1_GENERALIZEDTIME_free`` when its cffi wrapper is garbage
    collected.

    :raises ValueError: If OpenSSL cannot perform that conversion.
    """
    ffi = backend._ffi
    lib = backend._lib
    null = ffi.NULL

    backend.openssl_assert(asn1_time != null)
    converted = lib.ASN1_TIME_to_generalizedtime(asn1_time, null)
    if converted == null:
        raw = _asn1_string_to_bytes(backend, asn1_time)
        raise ValueError(
            "Couldn't parse ASN.1 time as generalizedtime {!r}".format(raw)
        )

    converted = ffi.gc(converted, lib.ASN1_GENERALIZEDTIME_free)
    return _parse_asn1_generalized_time(backend, converted)
805
806
def _parse_asn1_generalized_time(backend, generalized_time):
    """Parse a generalized time into a naive ``datetime.datetime``.

    The contents are read as ASCII and must match ``YYYYMMDDHHMMSSZ``;
    ``strptime`` raises ``ValueError`` for anything else.
    """
    as_string = backend._ffi.cast("ASN1_STRING *", generalized_time)
    text = _asn1_string_to_ascii(backend, as_string)
    return datetime.datetime.strptime(text, "%Y%m%d%H%M%SZ")
812
813
def _decode_nonce(backend, nonce):
    """Decode an octet string payload into ``x509.OCSPNonce``.

    ``ASN1_OCTET_STRING_free`` is registered to run when the cffi wrapper
    is garbage collected.
    """
    ffi = backend._ffi
    octets = ffi.gc(
        ffi.cast("ASN1_OCTET_STRING *", nonce),
        backend._lib.ASN1_OCTET_STRING_free,
    )
    return x509.OCSPNonce(_asn1_string_to_bytes(backend, octets))
818
819
# Handler tables: each maps an extension OID to the decoder function that
# turns the structure returned by X509V3_EXT_d2i into an x509 extension
# value.  Handlers are invoked as ``handler(backend, ext_data)`` by
# _X509ExtensionParser.parse; OIDs missing from the table in use are
# returned as UnrecognizedExtension.  TLS_FEATURE and PRECERT_POISON have
# no entries because the parser decodes them itself.
# NOTE(review): the parsers that consume these tables are constructed
# outside this module -- confirm which table goes with which object type.
_EXTENSION_HANDLERS_BASE = {
    ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
    ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
    ExtensionOID.KEY_USAGE: _decode_key_usage,
    ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name,
    ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
    ExtensionOID.SUBJECT_INFORMATION_ACCESS: (
        _decode_subject_information_access
    ),
    ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
    ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
    ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check,
    ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
    ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
}
# Kept apart from the base table; presumably merged in only when the
# linked OpenSSL provides SCT support -- TODO confirm against the caller.
_EXTENSION_HANDLERS_SCT = {
    ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: (
        _decode_precert_signed_certificate_timestamps
    )
}

# Decoders for CRL entry extensions (keys come from CRLEntryExtensionOID).
_REVOKED_EXTENSION_HANDLERS = {
    CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
    CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
    CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
}

# Decoders for CRL-level extensions, going by the table name.
_CRL_EXTENSION_HANDLERS = {
    ExtensionOID.CRL_NUMBER: _decode_crl_number,
    ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
    ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point,
    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
}

# The OCSP request and basic response tables both handle only the nonce.
_OCSP_REQ_EXTENSION_HANDLERS = {
    OCSPExtensionOID.NONCE: _decode_nonce,
}

_OCSP_BASICRESP_EXTENSION_HANDLERS = {
    OCSPExtensionOID.NONCE: _decode_nonce,
}

# Kept apart like _EXTENSION_HANDLERS_SCT; presumably applied to OCSP
# single responses only when SCT support is available -- TODO confirm.
_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT = {
    ExtensionOID.SIGNED_CERTIFICATE_TIMESTAMPS: (
        _decode_signed_certificate_timestamps
    )
}
879