# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import datetime
import operator

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.decode_asn1 import (
    _asn1_integer_to_int,
    _asn1_string_to_bytes,
    _decode_x509_name,
    _obj2txt,
    _parse_asn1_time,
)
from cryptography.hazmat.backends.openssl.encode_asn1 import (
    _encode_asn1_int_gc,
    _txt2obj_gc,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.x509.name import _ASN1Type


@utils.register_interface(x509.Certificate)
class _Certificate(object):
    """OpenSSL-backed implementation of ``x509.Certificate``.

    Wraps an ``X509 *`` handle and reads every field through the backend's
    ``_lib`` / ``_ffi`` bindings.
    """

    def __init__(self, backend, x509_cert):
        """Wrap ``x509_cert`` and validate its version field.

        :param backend: Backend object exposing ``_lib``, ``_ffi`` and the
            helper methods used throughout this class.
        :param x509_cert: The ``X509 *`` handle to wrap.
        :raises x509.InvalidVersion: If the raw version is neither 0 nor 2.
        """
        self._backend = backend
        self._x509 = x509_cert

        # The raw value is zero-based: 0 maps to v1 and 2 maps to v3. Any
        # other value (including 1) is rejected.
        version = self._backend._lib.X509_get_version(self._x509)
        if version == 0:
            self._version = x509.Version.v1
        elif version == 2:
            self._version = x509.Version.v3
        else:
            raise x509.InvalidVersion(
                "{} is not a valid X509 version".format(version), version
            )

    def __repr__(self):
        return "<Certificate(subject={}, ...)>".format(self.subject)

    def __eq__(self, other):
        """Compare two certificates with ``X509_cmp``."""
        if not isinstance(other, x509.Certificate):
            return NotImplemented

        res = self._backend._lib.X509_cmp(self._x509, other._x509)
        return res == 0

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        """Hash the DER encoding of the certificate."""
        return hash(self.public_bytes(serialization.Encoding.DER))

    def __deepcopy__(self, memo):
        """Return this same object; no copy is made."""
        return self

    def fingerprint(self, algorithm):
        """Return the digest of the DER encoding using ``algorithm``."""
        h = hashes.Hash(algorithm, self._backend)
        h.update(self.public_bytes(serialization.Encoding.DER))
        return h.finalize()

    # Read-only view of the version computed in __init__.
    version = utils.read_only_property("_version")

    @property
    def serial_number(self):
        """The certificate serial number as a Python integer."""
        asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    def public_key(self):
        """Return the certificate's public key object.

        :raises ValueError: If OpenSSL cannot produce an ``EVP_PKEY`` for
            the certificate (``X509_get_pubkey`` returned NULL).
        """
        pkey = self._backend._lib.X509_get_pubkey(self._x509)
        if pkey == self._backend._ffi.NULL:
            # Remove errors from the stack.
            self._backend._consume_errors()
            raise ValueError("Certificate public key is of an unknown type")

        # Tie the EVP_PKEY's lifetime to the Python wrapper.
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)

        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def not_valid_before(self):
        """Start of the validity period, parsed from the notBefore field."""
        asn1_time = self._backend._lib.X509_getm_notBefore(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def not_valid_after(self):
        """End of the validity period, parsed from the notAfter field."""
        asn1_time = self._backend._lib.X509_getm_notAfter(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def issuer(self):
        """The decoded issuer name."""
        issuer = self._backend._lib.X509_get_issuer_name(self._x509)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def subject(self):
        """The decoded subject name."""
        subject = self._backend._lib.X509_get_subject_name(self._x509)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(self):
        """The hash algorithm registered for the signature algorithm OID.

        :raises UnsupportedAlgorithm: If the OID has no entry in
            ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """The signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        # Only the algorithm is wanted here, so NULL is passed for the
        # signature output parameter.
        self._backend._lib.X509_get0_signature(
            self._backend._ffi.NULL, alg, self._x509
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self):
        """The parsed certificate extensions (cached after first access)."""
        return self._backend._certificate_extension_parser.parse(self._x509)

    @property
    def signature(self):
        """The raw signature bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        # Only the signature is wanted here, so NULL is passed for the
        # algorithm output parameter.
        self._backend._lib.X509_get0_signature(
            sig, self._backend._ffi.NULL, self._x509
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certificate_bytes(self):
        """The encoded to-be-signed portion of the certificate."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
        self._backend.openssl_assert(res > 0)
        # The encoder stored a buffer pointer in pp[0]; release it with
        # OPENSSL_free when the cffi wrapper is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slicing copies the data out of the C buffer into Python bytes.
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding):
        """Serialize the certificate.

        :param encoding: ``serialization.Encoding.PEM`` or
            ``serialization.Encoding.DER``.
        :raises TypeError: For any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_bio(bio, self._x509)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)


@utils.register_interface(x509.RevokedCertificate)
class _RevokedCertificate(object):
    """OpenSSL-backed implementation of ``x509.RevokedCertificate``."""

    def __init__(self, backend, crl, x509_revoked):
        """Wrap a single ``X509_REVOKED *`` entry.

        :param backend: Backend object exposing ``_lib`` and ``_ffi``.
        :param crl: The object owning the CRL that ``x509_revoked`` belongs
            to; stored only to keep that CRL alive.
        :param x509_revoked: The ``X509_REVOKED *`` handle to wrap.
        """
        self._backend = backend
        # The X509_REVOKED_value is a X509_REVOKED * that has
        # no reference counting. This means when X509_CRL_free is
        # called then the CRL and all X509_REVOKED * are freed. Since
        # you can retain a reference to a single revoked certificate
        # and let the CRL fall out of scope we need to retain a
        # private reference to the CRL inside the RevokedCertificate
        # object to prevent the gc from being called inappropriately.
        self._crl = crl
        self._x509_revoked = x509_revoked

    @property
    def serial_number(self):
        """The revoked certificate's serial number as a Python integer."""
        asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
            self._x509_revoked
        )
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    @property
    def revocation_date(self):
        """The parsed revocation date of this entry."""
        return _parse_asn1_time(
            self._backend,
            self._backend._lib.X509_REVOKED_get0_revocationDate(
                self._x509_revoked
            ),
        )

    @utils.cached_property
    def extensions(self):
        """The parsed entry extensions (cached after first access)."""
        return self._backend._revoked_cert_extension_parser.parse(
            self._x509_revoked
        )


@utils.register_interface(x509.CertificateRevocationList)
class _CertificateRevocationList(object):
    """OpenSSL-backed implementation of ``x509.CertificateRevocationList``.

    Supports ``len()``, iteration and indexing/slicing over its revoked
    certificate entries.
    """

    def __init__(self, backend, x509_crl):
        """Wrap the ``X509_CRL *`` handle ``x509_crl``."""
        self._backend = backend
        self._x509_crl = x509_crl

    def __eq__(self, other):
        """Compare two CRLs with ``X509_CRL_cmp``."""
        if not isinstance(other, x509.CertificateRevocationList):
            return NotImplemented

        res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
        return res == 0

    def __ne__(self, other):
        return not self == other

    def fingerprint(self, algorithm):
        """Return the digest of the DER encoding using ``algorithm``."""
        h = hashes.Hash(algorithm, self._backend)
        bio = self._backend._create_mem_bio_gc()
        res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        self._backend.openssl_assert(res == 1)
        der = self._backend._read_mem_bio(bio)
        h.update(der)
        return h.finalize()

    @utils.cached_property
    def _sorted_crl(self):
        """A private duplicate of the CRL used for serial-number lookups."""
        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
        # things we don't want to break (like iteration and the signature).
        # Let's dupe it and sort that instead.
        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
        return dup

    def get_revoked_certificate_by_serial_number(self, serial_number):
        """Look up a revoked entry by certificate serial number.

        :param serial_number: The serial number to search for.
        :returns: A ``_RevokedCertificate``, or ``None`` when the lookup
            reports no match.
        """
        revoked = self._backend._ffi.new("X509_REVOKED **")
        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
        res = self._backend._lib.X509_CRL_get0_by_serial(
            self._sorted_crl, revoked, asn1_int
        )
        if res == 0:
            return None
        else:
            self._backend.openssl_assert(revoked[0] != self._backend._ffi.NULL)
            # The entry belongs to the duplicated CRL, so that duplicate is
            # what the returned object must keep alive.
            return _RevokedCertificate(
                self._backend, self._sorted_crl, revoked[0]
            )

    @property
    def signature_hash_algorithm(self):
        """The hash algorithm registered for the signature algorithm OID.

        :raises UnsupportedAlgorithm: If the OID has no entry in
            ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """The signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        # Only the algorithm is wanted here, so NULL is passed for the
        # signature output parameter.
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @property
    def issuer(self):
        """The decoded issuer name."""
        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def next_update(self):
        """The parsed nextUpdate time."""
        nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, nu)

    @property
    def last_update(self):
        """The parsed lastUpdate time."""
        lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, lu)

    @property
    def signature(self):
        """The raw signature bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        # Only the signature is wanted here, so NULL is passed for the
        # algorithm output parameter.
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certlist_bytes(self):
        """The encoded to-be-signed portion of the CRL."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
        self._backend.openssl_assert(res > 0)
        # The encoder stored a buffer pointer in pp[0]; release it with
        # OPENSSL_free when the cffi wrapper is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slicing copies the data out of the C buffer into Python bytes.
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding):
        """Serialize the CRL.

        :param encoding: ``serialization.Encoding.PEM`` or
            ``serialization.Encoding.DER``.
        :raises TypeError: For any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_CRL(
                bio, self._x509_crl
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    def _revoked_cert(self, idx):
        """Wrap the entry at stack position ``idx``.

        Callers in this class only pass indexes already checked against
        ``len(self)``.
        """
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
        self._backend.openssl_assert(r != self._backend._ffi.NULL)
        # Pass ``self`` so the entry keeps this CRL object alive.
        return _RevokedCertificate(self._backend, self, r)

    def __iter__(self):
        """Yield each revoked entry in stack order."""
        for i in range(len(self)):
            yield self._revoked_cert(i)

    def __getitem__(self, idx):
        """Return one entry, or a list of entries for a slice.

        :raises IndexError: If an integer index is out of range after
            negative-index adjustment.
        """
        if isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            return [self._revoked_cert(i) for i in range(start, stop, step)]
        else:
            # Accept any object implementing __index__.
            idx = operator.index(idx)
            if idx < 0:
                idx += len(self)
            if not 0 <= idx < len(self):
                raise IndexError
            return self._revoked_cert(idx)

    def __len__(self):
        """Number of revoked entries; 0 when the entry stack is NULL."""
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        if revoked == self._backend._ffi.NULL:
            return 0
        else:
            return self._backend._lib.sk_X509_REVOKED_num(revoked)

    @utils.cached_property
    def extensions(self):
        """The parsed CRL extensions (cached after first access)."""
        return self._backend._crl_extension_parser.parse(self._x509_crl)

    def is_signature_valid(self, public_key):
        """Verify the CRL signature against ``public_key``.

        :param public_key: A DSA, RSA or elliptic curve public key.
        :returns: ``True`` if ``X509_CRL_verify`` returns 1, else ``False``.
        :raises TypeError: If ``public_key`` is not one of the accepted
            key types.
        """
        if not isinstance(
            public_key,
            (dsa.DSAPublicKey, rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " or EllipticCurvePublicKey."
            )
        res = self._backend._lib.X509_CRL_verify(
            self._x509_crl, public_key._evp_pkey
        )

        if res != 1:
            # Clear the errors left behind by the failed verification.
            self._backend._consume_errors()
            return False

        return True


@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
    """OpenSSL-backed implementation of ``x509.CertificateSigningRequest``."""

    def __init__(self, backend, x509_req):
        """Wrap the ``X509_REQ *`` handle ``x509_req``."""
        self._backend = backend
        self._x509_req = x509_req

    def __eq__(self, other):
        """Compare two requests by their DER encodings."""
        if not isinstance(other, _CertificateSigningRequest):
            return NotImplemented

        self_bytes = self.public_bytes(serialization.Encoding.DER)
        other_bytes = other.public_bytes(serialization.Encoding.DER)
        return self_bytes == other_bytes

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        """Hash the DER encoding of the request."""
        return hash(self.public_bytes(serialization.Encoding.DER))

    def public_key(self):
        """Return the request's public key object."""
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        # Tie the EVP_PKEY's lifetime to the Python wrapper.
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def subject(self):
        """The decoded subject name."""
        subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(self):
        """The hash algorithm registered for the signature algorithm OID.

        :raises UnsupportedAlgorithm: If the OID has no entry in
            ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """The signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        # Only the algorithm is wanted here, so NULL is passed for the
        # signature output parameter.
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self):
        """The parsed request extensions (cached after first access)."""
        x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
        # Register a finalizer that pops and frees every X509_EXTENSION on
        # the returned stack when the cffi wrapper is collected.
        x509_exts = self._backend._ffi.gc(
            x509_exts,
            lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
                x,
                self._backend._ffi.addressof(
                    self._backend._lib._original_lib, "X509_EXTENSION_free"
                ),
            ),
        )
        return self._backend._csr_extension_parser.parse(x509_exts)

    def public_bytes(self, encoding):
        """Serialize the request.

        :param encoding: ``serialization.Encoding.PEM`` or
            ``serialization.Encoding.DER``.
        :raises TypeError: For any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_REQ(
                bio, self._x509_req
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    @property
    def tbs_certrequest_bytes(self):
        """The encoded to-be-signed portion of the request."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
        self._backend.openssl_assert(res > 0)
        # The encoder stored a buffer pointer in pp[0]; release it with
        # OPENSSL_free when the cffi wrapper is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slicing copies the data out of the C buffer into Python bytes.
        return self._backend._ffi.buffer(pp[0], res)[:]

    @property
    def signature(self):
        """The raw signature bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        # Only the signature is wanted here, so NULL is passed for the
        # algorithm output parameter.
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def is_signature_valid(self):
        """Whether the request verifies against its own public key.

        ``True`` if ``X509_REQ_verify`` returns 1, otherwise ``False``.
        """
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)

        if res != 1:
            # Clear the errors left behind by the failed verification.
            self._backend._consume_errors()
            return False

        return True

    def get_attribute_for_oid(self, oid):
        """Return the bytes of the single-valued attribute named by ``oid``.

        :param oid: Object identifier of the attribute; its
            ``dotted_string`` is used for the lookup.
        :raises x509.AttributeNotFound: If the request has no such
            attribute.
        :raises ValueError: If the attribute value is not a UTF8String,
            PrintableString or IA5String.
        """
        obj = _txt2obj_gc(self._backend, oid.dotted_string)
        pos = self._backend._lib.X509_REQ_get_attr_by_OBJ(
            self._x509_req, obj, -1
        )
        if pos == -1:
            raise x509.AttributeNotFound(
                "No {} attribute was found".format(oid), oid
            )

        attr = self._backend._lib.X509_REQ_get_attr(self._x509_req, pos)
        self._backend.openssl_assert(attr != self._backend._ffi.NULL)
        # We don't support multiple valued attributes for now.
        self._backend.openssl_assert(
            self._backend._lib.X509_ATTRIBUTE_count(attr) == 1
        )
        asn1_type = self._backend._lib.X509_ATTRIBUTE_get0_type(attr, 0)
        self._backend.openssl_assert(asn1_type != self._backend._ffi.NULL)
        # We need this to ensure that our C type cast is safe.
        # Also this should always be a sane string type, but we'll see if
        # that is true in the real world...
        if asn1_type.type not in (
            _ASN1Type.UTF8String.value,
            _ASN1Type.PrintableString.value,
            _ASN1Type.IA5String.value,
        ):
            raise ValueError(
                "OID {} has a disallowed ASN.1 type: {}".format(
                    oid, asn1_type.type
                )
            )

        data = self._backend._lib.X509_ATTRIBUTE_get0_data(
            attr, 0, asn1_type.type, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(data != self._backend._ffi.NULL)
        # This cast is safe iff we assert on the type above to ensure
        # that it is always a type of ASN1_STRING
        data = self._backend._ffi.cast("ASN1_STRING *", data)
        return _asn1_string_to_bytes(self._backend, data)


@utils.register_interface(
    x509.certificate_transparency.SignedCertificateTimestamp
)
class _SignedCertificateTimestamp(object):
    """OpenSSL-backed signed certificate timestamp (SCT)."""

    def __init__(self, backend, sct_list, sct):
        """Wrap a single SCT handle.

        :param backend: Backend object exposing ``_lib`` and ``_ffi``.
        :param sct_list: The list the SCT came from; stored only to keep it
            alive.
        :param sct: The SCT handle to wrap.
        """
        self._backend = backend
        # Keep the SCT_LIST that this SCT came from alive.
        self._sct_list = sct_list
        self._sct = sct

    @property
    def version(self):
        """The SCT version; only v1 is accepted."""
        version = self._backend._lib.SCT_get_version(self._sct)
        # Checked with openssl_assert rather than a bare ``assert`` so the
        # check is not stripped under ``python -O``.
        self._backend.openssl_assert(
            version == self._backend._lib.SCT_VERSION_V1
        )
        return x509.certificate_transparency.Version.v1

    @property
    def log_id(self):
        """The log identifier, copied out as bytes."""
        out = self._backend._ffi.new("unsigned char **")
        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
        # Checked with openssl_assert rather than a bare ``assert`` so the
        # check is not stripped under ``python -O``.
        self._backend.openssl_assert(log_id_length >= 0)
        return self._backend._ffi.buffer(out[0], log_id_length)[:]

    @property
    def timestamp(self):
        """The SCT timestamp as a naive ``datetime`` in UTC.

        The raw value is handled as a count of milliseconds: whole seconds
        are converted with ``utcfromtimestamp`` and the remainder is applied
        as microseconds.
        """
        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
        milliseconds = timestamp % 1000
        return datetime.datetime.utcfromtimestamp(timestamp // 1000).replace(
            microsecond=milliseconds * 1000
        )

    @property
    def entry_type(self):
        """The log entry type; only pre-certificate entries are accepted."""
        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
        # We currently only support loading SCTs from the X.509 extension, so
        # we only have precerts. Checked with openssl_assert rather than a
        # bare ``assert`` so the check is not stripped under ``python -O``.
        self._backend.openssl_assert(
            entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
        )
        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE

    @property
    def _signature(self):
        """The raw SCT signature bytes; used for equality and hashing."""
        ptrptr = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
        self._backend.openssl_assert(res > 0)
        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
        return self._backend._ffi.buffer(ptrptr[0], res)[:]

    def __hash__(self):
        return hash(self._signature)

    def __eq__(self, other):
        """Two SCTs are equal when their signature bytes are equal."""
        if not isinstance(other, _SignedCertificateTimestamp):
            return NotImplemented

        return self._signature == other._signature

    def __ne__(self, other):
        return not self == other