1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import datetime
8import operator
9
10from cryptography import utils, x509
11from cryptography.exceptions import UnsupportedAlgorithm
12from cryptography.hazmat.backends.openssl.decode_asn1 import (
13    _asn1_integer_to_int,
14    _asn1_string_to_bytes,
15    _decode_x509_name,
16    _obj2txt,
17    _parse_asn1_time,
18)
19from cryptography.hazmat.backends.openssl.encode_asn1 import (
20    _encode_asn1_int_gc,
21    _txt2obj_gc,
22)
23from cryptography.hazmat.primitives import hashes, serialization
24from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
25from cryptography.x509.name import _ASN1Type
26
27
28@utils.register_interface(x509.Certificate)
29class _Certificate(object):
30    def __init__(self, backend, x509_cert):
31        self._backend = backend
32        self._x509 = x509_cert
33
34        version = self._backend._lib.X509_get_version(self._x509)
35        if version == 0:
36            self._version = x509.Version.v1
37        elif version == 2:
38            self._version = x509.Version.v3
39        else:
40            raise x509.InvalidVersion(
41                "{} is not a valid X509 version".format(version), version
42            )
43
44    def __repr__(self):
45        return "<Certificate(subject={}, ...)>".format(self.subject)
46
47    def __eq__(self, other):
48        if not isinstance(other, x509.Certificate):
49            return NotImplemented
50
51        res = self._backend._lib.X509_cmp(self._x509, other._x509)
52        return res == 0
53
54    def __ne__(self, other):
55        return not self == other
56
57    def __hash__(self):
58        return hash(self.public_bytes(serialization.Encoding.DER))
59
60    def __deepcopy__(self, memo):
61        return self
62
63    def fingerprint(self, algorithm):
64        h = hashes.Hash(algorithm, self._backend)
65        h.update(self.public_bytes(serialization.Encoding.DER))
66        return h.finalize()
67
68    version = utils.read_only_property("_version")
69
70    @property
71    def serial_number(self):
72        asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
73        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
74        return _asn1_integer_to_int(self._backend, asn1_int)
75
76    def public_key(self):
77        pkey = self._backend._lib.X509_get_pubkey(self._x509)
78        if pkey == self._backend._ffi.NULL:
79            # Remove errors from the stack.
80            self._backend._consume_errors()
81            raise ValueError("Certificate public key is of an unknown type")
82
83        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
84
85        return self._backend._evp_pkey_to_public_key(pkey)
86
87    @property
88    def not_valid_before(self):
89        asn1_time = self._backend._lib.X509_getm_notBefore(self._x509)
90        return _parse_asn1_time(self._backend, asn1_time)
91
92    @property
93    def not_valid_after(self):
94        asn1_time = self._backend._lib.X509_getm_notAfter(self._x509)
95        return _parse_asn1_time(self._backend, asn1_time)
96
97    @property
98    def issuer(self):
99        issuer = self._backend._lib.X509_get_issuer_name(self._x509)
100        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
101        return _decode_x509_name(self._backend, issuer)
102
103    @property
104    def subject(self):
105        subject = self._backend._lib.X509_get_subject_name(self._x509)
106        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
107        return _decode_x509_name(self._backend, subject)
108
109    @property
110    def signature_hash_algorithm(self):
111        oid = self.signature_algorithm_oid
112        try:
113            return x509._SIG_OIDS_TO_HASH[oid]
114        except KeyError:
115            raise UnsupportedAlgorithm(
116                "Signature algorithm OID:{} not recognized".format(oid)
117            )
118
119    @property
120    def signature_algorithm_oid(self):
121        alg = self._backend._ffi.new("X509_ALGOR **")
122        self._backend._lib.X509_get0_signature(
123            self._backend._ffi.NULL, alg, self._x509
124        )
125        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
126        oid = _obj2txt(self._backend, alg[0].algorithm)
127        return x509.ObjectIdentifier(oid)
128
129    @utils.cached_property
130    def extensions(self):
131        return self._backend._certificate_extension_parser.parse(self._x509)
132
133    @property
134    def signature(self):
135        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
136        self._backend._lib.X509_get0_signature(
137            sig, self._backend._ffi.NULL, self._x509
138        )
139        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
140        return _asn1_string_to_bytes(self._backend, sig[0])
141
142    @property
143    def tbs_certificate_bytes(self):
144        pp = self._backend._ffi.new("unsigned char **")
145        res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
146        self._backend.openssl_assert(res > 0)
147        pp = self._backend._ffi.gc(
148            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
149        )
150        return self._backend._ffi.buffer(pp[0], res)[:]
151
152    def public_bytes(self, encoding):
153        bio = self._backend._create_mem_bio_gc()
154        if encoding is serialization.Encoding.PEM:
155            res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
156        elif encoding is serialization.Encoding.DER:
157            res = self._backend._lib.i2d_X509_bio(bio, self._x509)
158        else:
159            raise TypeError("encoding must be an item from the Encoding enum")
160
161        self._backend.openssl_assert(res == 1)
162        return self._backend._read_mem_bio(bio)
163
164
165@utils.register_interface(x509.RevokedCertificate)
166class _RevokedCertificate(object):
167    def __init__(self, backend, crl, x509_revoked):
168        self._backend = backend
169        # The X509_REVOKED_value is a X509_REVOKED * that has
170        # no reference counting. This means when X509_CRL_free is
171        # called then the CRL and all X509_REVOKED * are freed. Since
172        # you can retain a reference to a single revoked certificate
173        # and let the CRL fall out of scope we need to retain a
174        # private reference to the CRL inside the RevokedCertificate
175        # object to prevent the gc from being called inappropriately.
176        self._crl = crl
177        self._x509_revoked = x509_revoked
178
179    @property
180    def serial_number(self):
181        asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
182            self._x509_revoked
183        )
184        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
185        return _asn1_integer_to_int(self._backend, asn1_int)
186
187    @property
188    def revocation_date(self):
189        return _parse_asn1_time(
190            self._backend,
191            self._backend._lib.X509_REVOKED_get0_revocationDate(
192                self._x509_revoked
193            ),
194        )
195
196    @utils.cached_property
197    def extensions(self):
198        return self._backend._revoked_cert_extension_parser.parse(
199            self._x509_revoked
200        )
201
202
203@utils.register_interface(x509.CertificateRevocationList)
204class _CertificateRevocationList(object):
205    def __init__(self, backend, x509_crl):
206        self._backend = backend
207        self._x509_crl = x509_crl
208
209    def __eq__(self, other):
210        if not isinstance(other, x509.CertificateRevocationList):
211            return NotImplemented
212
213        res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
214        return res == 0
215
216    def __ne__(self, other):
217        return not self == other
218
219    def fingerprint(self, algorithm):
220        h = hashes.Hash(algorithm, self._backend)
221        bio = self._backend._create_mem_bio_gc()
222        res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
223        self._backend.openssl_assert(res == 1)
224        der = self._backend._read_mem_bio(bio)
225        h.update(der)
226        return h.finalize()
227
228    @utils.cached_property
229    def _sorted_crl(self):
230        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
231        # things we don't want to break (like iteration and the signature).
232        # Let's dupe it and sort that instead.
233        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
234        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
235        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
236        return dup
237
238    def get_revoked_certificate_by_serial_number(self, serial_number):
239        revoked = self._backend._ffi.new("X509_REVOKED **")
240        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
241        res = self._backend._lib.X509_CRL_get0_by_serial(
242            self._sorted_crl, revoked, asn1_int
243        )
244        if res == 0:
245            return None
246        else:
247            self._backend.openssl_assert(revoked[0] != self._backend._ffi.NULL)
248            return _RevokedCertificate(
249                self._backend, self._sorted_crl, revoked[0]
250            )
251
252    @property
253    def signature_hash_algorithm(self):
254        oid = self.signature_algorithm_oid
255        try:
256            return x509._SIG_OIDS_TO_HASH[oid]
257        except KeyError:
258            raise UnsupportedAlgorithm(
259                "Signature algorithm OID:{} not recognized".format(oid)
260            )
261
262    @property
263    def signature_algorithm_oid(self):
264        alg = self._backend._ffi.new("X509_ALGOR **")
265        self._backend._lib.X509_CRL_get0_signature(
266            self._x509_crl, self._backend._ffi.NULL, alg
267        )
268        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
269        oid = _obj2txt(self._backend, alg[0].algorithm)
270        return x509.ObjectIdentifier(oid)
271
272    @property
273    def issuer(self):
274        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
275        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
276        return _decode_x509_name(self._backend, issuer)
277
278    @property
279    def next_update(self):
280        nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
281        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
282        return _parse_asn1_time(self._backend, nu)
283
284    @property
285    def last_update(self):
286        lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
287        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
288        return _parse_asn1_time(self._backend, lu)
289
290    @property
291    def signature(self):
292        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
293        self._backend._lib.X509_CRL_get0_signature(
294            self._x509_crl, sig, self._backend._ffi.NULL
295        )
296        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
297        return _asn1_string_to_bytes(self._backend, sig[0])
298
299    @property
300    def tbs_certlist_bytes(self):
301        pp = self._backend._ffi.new("unsigned char **")
302        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
303        self._backend.openssl_assert(res > 0)
304        pp = self._backend._ffi.gc(
305            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
306        )
307        return self._backend._ffi.buffer(pp[0], res)[:]
308
309    def public_bytes(self, encoding):
310        bio = self._backend._create_mem_bio_gc()
311        if encoding is serialization.Encoding.PEM:
312            res = self._backend._lib.PEM_write_bio_X509_CRL(
313                bio, self._x509_crl
314            )
315        elif encoding is serialization.Encoding.DER:
316            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
317        else:
318            raise TypeError("encoding must be an item from the Encoding enum")
319
320        self._backend.openssl_assert(res == 1)
321        return self._backend._read_mem_bio(bio)
322
323    def _revoked_cert(self, idx):
324        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
325        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
326        self._backend.openssl_assert(r != self._backend._ffi.NULL)
327        return _RevokedCertificate(self._backend, self, r)
328
329    def __iter__(self):
330        for i in range(len(self)):
331            yield self._revoked_cert(i)
332
333    def __getitem__(self, idx):
334        if isinstance(idx, slice):
335            start, stop, step = idx.indices(len(self))
336            return [self._revoked_cert(i) for i in range(start, stop, step)]
337        else:
338            idx = operator.index(idx)
339            if idx < 0:
340                idx += len(self)
341            if not 0 <= idx < len(self):
342                raise IndexError
343            return self._revoked_cert(idx)
344
345    def __len__(self):
346        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
347        if revoked == self._backend._ffi.NULL:
348            return 0
349        else:
350            return self._backend._lib.sk_X509_REVOKED_num(revoked)
351
352    @utils.cached_property
353    def extensions(self):
354        return self._backend._crl_extension_parser.parse(self._x509_crl)
355
356    def is_signature_valid(self, public_key):
357        if not isinstance(
358            public_key,
359            (dsa.DSAPublicKey, rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
360        ):
361            raise TypeError(
362                "Expecting one of DSAPublicKey, RSAPublicKey,"
363                " or EllipticCurvePublicKey."
364            )
365        res = self._backend._lib.X509_CRL_verify(
366            self._x509_crl, public_key._evp_pkey
367        )
368
369        if res != 1:
370            self._backend._consume_errors()
371            return False
372
373        return True
374
375
376@utils.register_interface(x509.CertificateSigningRequest)
377class _CertificateSigningRequest(object):
378    def __init__(self, backend, x509_req):
379        self._backend = backend
380        self._x509_req = x509_req
381
382    def __eq__(self, other):
383        if not isinstance(other, _CertificateSigningRequest):
384            return NotImplemented
385
386        self_bytes = self.public_bytes(serialization.Encoding.DER)
387        other_bytes = other.public_bytes(serialization.Encoding.DER)
388        return self_bytes == other_bytes
389
390    def __ne__(self, other):
391        return not self == other
392
393    def __hash__(self):
394        return hash(self.public_bytes(serialization.Encoding.DER))
395
396    def public_key(self):
397        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
398        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
399        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
400        return self._backend._evp_pkey_to_public_key(pkey)
401
402    @property
403    def subject(self):
404        subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
405        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
406        return _decode_x509_name(self._backend, subject)
407
408    @property
409    def signature_hash_algorithm(self):
410        oid = self.signature_algorithm_oid
411        try:
412            return x509._SIG_OIDS_TO_HASH[oid]
413        except KeyError:
414            raise UnsupportedAlgorithm(
415                "Signature algorithm OID:{} not recognized".format(oid)
416            )
417
418    @property
419    def signature_algorithm_oid(self):
420        alg = self._backend._ffi.new("X509_ALGOR **")
421        self._backend._lib.X509_REQ_get0_signature(
422            self._x509_req, self._backend._ffi.NULL, alg
423        )
424        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
425        oid = _obj2txt(self._backend, alg[0].algorithm)
426        return x509.ObjectIdentifier(oid)
427
428    @utils.cached_property
429    def extensions(self):
430        x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
431        x509_exts = self._backend._ffi.gc(
432            x509_exts,
433            lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
434                x,
435                self._backend._ffi.addressof(
436                    self._backend._lib._original_lib, "X509_EXTENSION_free"
437                ),
438            ),
439        )
440        return self._backend._csr_extension_parser.parse(x509_exts)
441
442    def public_bytes(self, encoding):
443        bio = self._backend._create_mem_bio_gc()
444        if encoding is serialization.Encoding.PEM:
445            res = self._backend._lib.PEM_write_bio_X509_REQ(
446                bio, self._x509_req
447            )
448        elif encoding is serialization.Encoding.DER:
449            res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
450        else:
451            raise TypeError("encoding must be an item from the Encoding enum")
452
453        self._backend.openssl_assert(res == 1)
454        return self._backend._read_mem_bio(bio)
455
456    @property
457    def tbs_certrequest_bytes(self):
458        pp = self._backend._ffi.new("unsigned char **")
459        res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
460        self._backend.openssl_assert(res > 0)
461        pp = self._backend._ffi.gc(
462            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
463        )
464        return self._backend._ffi.buffer(pp[0], res)[:]
465
466    @property
467    def signature(self):
468        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
469        self._backend._lib.X509_REQ_get0_signature(
470            self._x509_req, sig, self._backend._ffi.NULL
471        )
472        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
473        return _asn1_string_to_bytes(self._backend, sig[0])
474
475    @property
476    def is_signature_valid(self):
477        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
478        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
479        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
480        res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)
481
482        if res != 1:
483            self._backend._consume_errors()
484            return False
485
486        return True
487
488    def get_attribute_for_oid(self, oid):
489        obj = _txt2obj_gc(self._backend, oid.dotted_string)
490        pos = self._backend._lib.X509_REQ_get_attr_by_OBJ(
491            self._x509_req, obj, -1
492        )
493        if pos == -1:
494            raise x509.AttributeNotFound(
495                "No {} attribute was found".format(oid), oid
496            )
497
498        attr = self._backend._lib.X509_REQ_get_attr(self._x509_req, pos)
499        self._backend.openssl_assert(attr != self._backend._ffi.NULL)
500        # We don't support multiple valued attributes for now.
501        self._backend.openssl_assert(
502            self._backend._lib.X509_ATTRIBUTE_count(attr) == 1
503        )
504        asn1_type = self._backend._lib.X509_ATTRIBUTE_get0_type(attr, 0)
505        self._backend.openssl_assert(asn1_type != self._backend._ffi.NULL)
506        # We need this to ensure that our C type cast is safe.
507        # Also this should always be a sane string type, but we'll see if
508        # that is true in the real world...
509        if asn1_type.type not in (
510            _ASN1Type.UTF8String.value,
511            _ASN1Type.PrintableString.value,
512            _ASN1Type.IA5String.value,
513        ):
514            raise ValueError(
515                "OID {} has a disallowed ASN.1 type: {}".format(
516                    oid, asn1_type.type
517                )
518            )
519
520        data = self._backend._lib.X509_ATTRIBUTE_get0_data(
521            attr, 0, asn1_type.type, self._backend._ffi.NULL
522        )
523        self._backend.openssl_assert(data != self._backend._ffi.NULL)
524        # This cast is safe iff we assert on the type above to ensure
525        # that it is always a type of ASN1_STRING
526        data = self._backend._ffi.cast("ASN1_STRING *", data)
527        return _asn1_string_to_bytes(self._backend, data)
528
529
530@utils.register_interface(
531    x509.certificate_transparency.SignedCertificateTimestamp
532)
533class _SignedCertificateTimestamp(object):
534    def __init__(self, backend, sct_list, sct):
535        self._backend = backend
536        # Keep the SCT_LIST that this SCT came from alive.
537        self._sct_list = sct_list
538        self._sct = sct
539
540    @property
541    def version(self):
542        version = self._backend._lib.SCT_get_version(self._sct)
543        assert version == self._backend._lib.SCT_VERSION_V1
544        return x509.certificate_transparency.Version.v1
545
546    @property
547    def log_id(self):
548        out = self._backend._ffi.new("unsigned char **")
549        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
550        assert log_id_length >= 0
551        return self._backend._ffi.buffer(out[0], log_id_length)[:]
552
553    @property
554    def timestamp(self):
555        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
556        milliseconds = timestamp % 1000
557        return datetime.datetime.utcfromtimestamp(timestamp // 1000).replace(
558            microsecond=milliseconds * 1000
559        )
560
561    @property
562    def entry_type(self):
563        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
564        # We currently only support loading SCTs from the X.509 extension, so
565        # we only have precerts.
566        assert entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
567        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE
568
569    @property
570    def _signature(self):
571        ptrptr = self._backend._ffi.new("unsigned char **")
572        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
573        self._backend.openssl_assert(res > 0)
574        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
575        return self._backend._ffi.buffer(ptrptr[0], res)[:]
576
577    def __hash__(self):
578        return hash(self._signature)
579
580    def __eq__(self, other):
581        if not isinstance(other, _SignedCertificateTimestamp):
582            return NotImplemented
583
584        return self._signature == other._signature
585
586    def __ne__(self, other):
587        return not self == other
588