1# Copyright 2005 Divmod, Inc. See LICENSE file for details 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Tests for L{twisted.internet._sslverify}. 7""" 8 9 10import datetime 11import itertools 12import sys 13from unittest import skipIf 14 15from zope.interface import implementer 16 17from incremental import Version 18 19from twisted.internet import defer, interfaces, protocol, reactor 20from twisted.internet._idna import _idnaText 21from twisted.internet.error import CertificateError, ConnectionClosed, ConnectionLost 22from twisted.python.compat import nativeString 23from twisted.python.filepath import FilePath 24from twisted.python.modules import getModule 25from twisted.python.reflect import requireModule 26from twisted.test.iosim import connectedServerAndClient 27from twisted.test.test_twisted import SetAsideModule 28from twisted.trial import util 29from twisted.trial.unittest import SkipTest, SynchronousTestCase, TestCase 30 31skipSSL = "" 32skipSNI = "" 33skipNPN = "" 34skipALPN = "" 35 36if requireModule("OpenSSL"): 37 import ipaddress 38 39 from OpenSSL import SSL 40 from OpenSSL.crypto import FILETYPE_PEM, TYPE_RSA, X509, PKey, get_elliptic_curves 41 42 from cryptography import x509 43 from cryptography.hazmat.backends import default_backend 44 from cryptography.hazmat.primitives import hashes 45 from cryptography.hazmat.primitives.asymmetric import rsa 46 from cryptography.hazmat.primitives.serialization import ( 47 Encoding, 48 NoEncryption, 49 PrivateFormat, 50 ) 51 from cryptography.x509.oid import NameOID 52 53 from twisted.internet import ssl 54 55 try: 56 ctx = SSL.Context(SSL.SSLv23_METHOD) 57 ctx.set_npn_advertise_callback(lambda c: None) 58 except (NotImplementedError, AttributeError): 59 skipNPN = ( 60 "NPN is deprecated (and OpenSSL 1.0.1 or greater required for NPN" 61 " support)" 62 ) 63 64 try: 65 ctx = SSL.Context(SSL.SSLv23_METHOD) 66 ctx.set_alpn_select_callback(lambda c: None) # type: ignore[misc,arg-type] 67 except NotImplementedError: 68 skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support" 69else: 70 skipSSL = "OpenSSL is required for SSL tests." 71 skipSNI = skipSSL 72 skipNPN = skipSSL 73 skipALPN = skipSSL 74 75if not skipSSL: 76 from twisted.internet import _sslverify as sslverify 77 from twisted.internet.ssl import VerificationError, platformTrust 78 from twisted.protocols.tls import TLSMemoryBIOFactory 79 80 81# A couple of static PEM-format certificates to be used by various tests. 82A_HOST_CERTIFICATE_PEM = """ 83-----BEGIN CERTIFICATE----- 84 MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG 85 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u 86 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo 87 dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x 88 ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw 89 OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy 90 aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4 91 IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v 92 Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ 93 KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp 94 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi 95 KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ 96 VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj 97 JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO 98 S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls 99 fXzCWdG0O/3Lk2SRM0I= 100-----END CERTIFICATE----- 101""" 102 103A_PEER_CERTIFICATE_PEM = """ 104-----BEGIN CERTIFICATE----- 105 MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG 106 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u 107 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo 108 dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv 109 bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw 110 MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h 111 dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy 112 aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa 113 c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf 114 MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4 115 CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE 116 JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ 117 e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA 118 vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg 119 i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr 120 yqDtGhklsWW3ZwBzEh5VEOUp 121-----END CERTIFICATE----- 122""" 123 124A_KEYPAIR = getModule(__name__).filePath.sibling("server.pem").getContent() 125 126 127def counter(counter=itertools.count()): 128 """ 129 Each time we're called, return the next integer in the natural numbers. 130 """ 131 return next(counter) 132 133 134def makeCertificate(**kw): 135 keypair = PKey() 136 keypair.generate_key(TYPE_RSA, 2048) 137 138 certificate = X509() 139 certificate.gmtime_adj_notBefore(0) 140 certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year 141 for xname in certificate.get_issuer(), certificate.get_subject(): 142 for (k, v) in kw.items(): 143 setattr(xname, k, nativeString(v)) 144 145 certificate.set_serial_number(counter()) 146 certificate.set_pubkey(keypair) 147 certificate.sign(keypair, "md5") 148 149 return keypair, certificate 150 151 152def certificatesForAuthorityAndServer(serviceIdentity="example.com"): 153 """ 154 Create a self-signed CA certificate and server certificate signed by the 155 CA. 156 157 @param serviceIdentity: The identity (hostname) of the server. 158 @type serviceIdentity: L{unicode} 159 160 @return: a 2-tuple of C{(certificate_authority_certificate, 161 server_certificate)} 162 @rtype: L{tuple} of (L{sslverify.Certificate}, 163 L{sslverify.PrivateCertificate}) 164 """ 165 commonNameForCA = x509.Name( 166 [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example CA")] 167 ) 168 commonNameForServer = x509.Name( 169 [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example Server")] 170 ) 171 oneDay = datetime.timedelta(1, 0, 0) 172 privateKeyForCA = rsa.generate_private_key( 173 public_exponent=65537, key_size=4096, backend=default_backend() 174 ) 175 publicKeyForCA = privateKeyForCA.public_key() 176 caCertificate = ( 177 x509.CertificateBuilder() 178 .subject_name(commonNameForCA) 179 .issuer_name(commonNameForCA) 180 .not_valid_before(datetime.datetime.today() - oneDay) 181 .not_valid_after(datetime.datetime.today() + oneDay) 182 .serial_number(x509.random_serial_number()) 183 .public_key(publicKeyForCA) 184 .add_extension( 185 x509.BasicConstraints(ca=True, path_length=9), 186 critical=True, 187 ) 188 .sign( 189 private_key=privateKeyForCA, 190 algorithm=hashes.SHA256(), 191 backend=default_backend(), 192 ) 193 ) 194 195 privateKeyForServer = rsa.generate_private_key( 196 public_exponent=65537, key_size=4096, backend=default_backend() 197 ) 198 publicKeyForServer = privateKeyForServer.public_key() 199 200 try: 201 ipAddress = ipaddress.ip_address(serviceIdentity) 202 except ValueError: 203 subjectAlternativeNames = [ 204 x509.DNSName(serviceIdentity.encode("idna").decode("ascii")) 205 ] 206 else: 207 subjectAlternativeNames = [x509.IPAddress(ipAddress)] 208 209 serverCertificate = ( 210 x509.CertificateBuilder() 211 .subject_name(commonNameForServer) 212 .issuer_name(commonNameForCA) 213 .not_valid_before(datetime.datetime.today() - oneDay) 214 .not_valid_after(datetime.datetime.today() + oneDay) 215 .serial_number(x509.random_serial_number()) 216 .public_key(publicKeyForServer) 217 .add_extension( 218 x509.BasicConstraints(ca=False, path_length=None), 219 critical=True, 220 ) 221 .add_extension( 222 x509.SubjectAlternativeName(subjectAlternativeNames), 223 critical=True, 224 ) 225 .sign( 226 private_key=privateKeyForCA, 227 algorithm=hashes.SHA256(), 228 backend=default_backend(), 229 ) 230 ) 231 caSelfCert = sslverify.Certificate.loadPEM(caCertificate.public_bytes(Encoding.PEM)) 232 serverCert = sslverify.PrivateCertificate.loadPEM( 233 b"\n".join( 234 [ 235 privateKeyForServer.private_bytes( 236 Encoding.PEM, 237 PrivateFormat.TraditionalOpenSSL, 238 NoEncryption(), 239 ), 240 serverCertificate.public_bytes(Encoding.PEM), 241 ] 242 ) 243 ) 244 245 return caSelfCert, serverCert 246 247 248def _loopbackTLSConnection(serverOpts, clientOpts): 249 """ 250 Common implementation code for both L{loopbackTLSConnection} and 251 L{loopbackTLSConnectionInMemory}. Creates a loopback TLS connection 252 using the provided server and client context factories. 253 254 @param serverOpts: An OpenSSL context factory for the server. 255 @type serverOpts: C{OpenSSLCertificateOptions}, or any class with an 256 equivalent API. 257 258 @param clientOpts: An OpenSSL context factory for the client. 259 @type clientOpts: C{OpenSSLCertificateOptions}, or any class with an 260 equivalent API. 261 262 @return: 5-tuple of server-tls-protocol, server-inner-protocol, 263 client-tls-protocol, client-inner-protocol and L{IOPump} 264 @rtype: L{tuple} 265 """ 266 267 class GreetingServer(protocol.Protocol): 268 greeting = b"greetings!" 269 270 def connectionMade(self): 271 self.transport.write(self.greeting) 272 273 class ListeningClient(protocol.Protocol): 274 data = b"" 275 lostReason = None 276 277 def dataReceived(self, data): 278 self.data += data 279 280 def connectionLost(self, reason): 281 self.lostReason = reason 282 283 clientWrappedProto = ListeningClient() 284 serverWrappedProto = GreetingServer() 285 286 plainClientFactory = protocol.Factory() 287 plainClientFactory.protocol = lambda: clientWrappedProto 288 plainServerFactory = protocol.Factory() 289 plainServerFactory.protocol = lambda: serverWrappedProto 290 291 clientFactory = TLSMemoryBIOFactory( 292 clientOpts, isClient=True, wrappedFactory=plainServerFactory 293 ) 294 serverFactory = TLSMemoryBIOFactory( 295 serverOpts, isClient=False, wrappedFactory=plainClientFactory 296 ) 297 298 sProto, cProto, pump = connectedServerAndClient( 299 lambda: serverFactory.buildProtocol(None), 300 lambda: clientFactory.buildProtocol(None), 301 ) 302 return sProto, cProto, serverWrappedProto, clientWrappedProto, pump 303 304 305def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None): 306 """ 307 Create a loopback TLS connection with the given trust and keys. 308 309 @param trustRoot: the C{trustRoot} argument for the client connection's 310 context. 311 @type trustRoot: L{sslverify.IOpenSSLTrustRoot} 312 313 @param privateKeyFile: The name of the file containing the private key. 314 @type privateKeyFile: L{str} (native string; file name) 315 316 @param chainedCertFile: The name of the chained certificate file. 317 @type chainedCertFile: L{str} (native string; file name) 318 319 @return: 3-tuple of server-protocol, client-protocol, and L{IOPump} 320 @rtype: L{tuple} 321 """ 322 323 class ContextFactory: 324 def getContext(self): 325 """ 326 Create a context for the server side of the connection. 327 328 @return: an SSL context using a certificate and key. 329 @rtype: C{OpenSSL.SSL.Context} 330 """ 331 ctx = SSL.Context(SSL.SSLv23_METHOD) 332 if chainedCertFile is not None: 333 ctx.use_certificate_chain_file(chainedCertFile) 334 ctx.use_privatekey_file(privateKeyFile) 335 # Let the test author know if they screwed something up. 336 ctx.check_privatekey() 337 return ctx 338 339 serverOpts = ContextFactory() 340 clientOpts = sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot) 341 342 return _loopbackTLSConnection(serverOpts, clientOpts) 343 344 345def loopbackTLSConnectionInMemory( 346 trustRoot, 347 privateKey, 348 serverCertificate, 349 clientProtocols=None, 350 serverProtocols=None, 351 clientOptions=None, 352): 353 """ 354 Create a loopback TLS connection with the given trust and keys. Like 355 L{loopbackTLSConnection}, but using in-memory certificates and keys rather 356 than writing them to disk. 357 358 @param trustRoot: the C{trustRoot} argument for the client connection's 359 context. 360 @type trustRoot: L{sslverify.IOpenSSLTrustRoot} 361 362 @param privateKey: The private key. 363 @type privateKey: L{str} (native string) 364 365 @param serverCertificate: The certificate used by the server. 366 @type chainedCertFile: L{str} (native string) 367 368 @param clientProtocols: The protocols the client is willing to negotiate 369 using NPN/ALPN. 370 371 @param serverProtocols: The protocols the server is willing to negotiate 372 using NPN/ALPN. 373 374 @param clientOptions: The type of C{OpenSSLCertificateOptions} class to 375 use for the client. Defaults to C{OpenSSLCertificateOptions}. 376 377 @return: 3-tuple of server-protocol, client-protocol, and L{IOPump} 378 @rtype: L{tuple} 379 """ 380 if clientOptions is None: 381 clientOptions = sslverify.OpenSSLCertificateOptions 382 383 clientCertOpts = clientOptions( 384 trustRoot=trustRoot, acceptableProtocols=clientProtocols 385 ) 386 serverCertOpts = sslverify.OpenSSLCertificateOptions( 387 privateKey=privateKey, 388 certificate=serverCertificate, 389 acceptableProtocols=serverProtocols, 390 ) 391 392 return _loopbackTLSConnection(serverCertOpts, clientCertOpts) 393 394 395def pathContainingDumpOf(testCase, *dumpables): 396 """ 397 Create a temporary file to store some serializable-as-PEM objects in, and 398 return its name. 399 400 @param testCase: a test case to use for generating a temporary directory. 401 @type testCase: L{twisted.trial.unittest.TestCase} 402 403 @param dumpables: arguments are objects from pyOpenSSL with a C{dump} 404 method, taking a pyOpenSSL file-type constant, such as 405 L{OpenSSL.crypto.FILETYPE_PEM} or L{OpenSSL.crypto.FILETYPE_ASN1}. 406 @type dumpables: L{tuple} of L{object} with C{dump} method taking L{int} 407 returning L{bytes} 408 409 @return: the path to a file where all of the dumpables were dumped in PEM 410 format. 411 @rtype: L{str} 412 """ 413 fname = testCase.mktemp() 414 with open(fname, "wb") as f: 415 for dumpable in dumpables: 416 f.write(dumpable.dump(FILETYPE_PEM)) 417 return fname 418 419 420class DataCallbackProtocol(protocol.Protocol): 421 def dataReceived(self, data): 422 d, self.factory.onData = self.factory.onData, None 423 if d is not None: 424 d.callback(data) 425 426 def connectionLost(self, reason): 427 d, self.factory.onLost = self.factory.onLost, None 428 if d is not None: 429 d.errback(reason) 430 431 432class WritingProtocol(protocol.Protocol): 433 byte = b"x" 434 435 def connectionMade(self): 436 self.transport.write(self.byte) 437 438 def connectionLost(self, reason): 439 self.factory.onLost.errback(reason) 440 441 442class FakeContext: 443 """ 444 Introspectable fake of an C{OpenSSL.SSL.Context}. 445 446 Saves call arguments for later introspection. 447 448 Necessary because C{Context} offers poor introspection. cf. this 449 U{pyOpenSSL bug<https://bugs.launchpad.net/pyopenssl/+bug/1173899>}. 450 451 @ivar _method: See C{method} parameter of L{__init__}. 452 453 @ivar _options: L{int} of C{OR}ed values from calls of L{set_options}. 454 455 @ivar _certificate: Set by L{use_certificate}. 456 457 @ivar _privateKey: Set by L{use_privatekey}. 458 459 @ivar _verify: Set by L{set_verify}. 460 461 @ivar _verifyDepth: Set by L{set_verify_depth}. 462 463 @ivar _mode: Set by L{set_mode}. 464 465 @ivar _sessionID: Set by L{set_session_id}. 466 467 @ivar _extraCertChain: Accumulated L{list} of all extra certificates added 468 by L{add_extra_chain_cert}. 469 470 @ivar _cipherList: Set by L{set_cipher_list}. 471 472 @ivar _dhFilename: Set by L{load_tmp_dh}. 473 474 @ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths} 475 476 @ivar _ecCurve: Set by L{set_tmp_ecdh} 477 """ 478 479 _options = 0 480 481 def __init__(self, method): 482 self._method = method 483 self._extraCertChain = [] 484 self._defaultVerifyPathsSet = False 485 self._ecCurve = None 486 487 # Note that this value is explicitly documented as the default by 488 # https://www.openssl.org/docs/man1.1.1/man3/ 489 # SSL_CTX_set_session_cache_mode.html 490 self._sessionCacheMode = SSL.SESS_CACHE_SERVER 491 492 def set_options(self, options): 493 self._options |= options 494 495 def use_certificate(self, certificate): 496 self._certificate = certificate 497 498 def use_privatekey(self, privateKey): 499 self._privateKey = privateKey 500 501 def check_privatekey(self): 502 return None 503 504 def set_mode(self, mode): 505 """ 506 Set the mode. See L{SSL.Context.set_mode}. 507 508 @param mode: See L{SSL.Context.set_mode}. 509 """ 510 self._mode = mode 511 512 def set_verify(self, flags, callback): 513 self._verify = flags, callback 514 515 def set_verify_depth(self, depth): 516 self._verifyDepth = depth 517 518 def set_session_id(self, sessionIDContext): 519 # This fake should change when the upstream changes: 520 # https://github.com/pyca/pyopenssl/issues/845 521 self._sessionIDContext = sessionIDContext 522 523 def set_session_cache_mode(self, cacheMode): 524 """ 525 Set the session cache mode on the context, as per 526 L{SSL.Context.set_session_cache_mode}. 527 """ 528 self._sessionCacheMode = cacheMode 529 530 def get_session_cache_mode(self): 531 """ 532 Retrieve the session cache mode from the context, as per 533 L{SSL.Context.get_session_cache_mode}. 534 """ 535 return self._sessionCacheMode 536 537 def add_extra_chain_cert(self, cert): 538 self._extraCertChain.append(cert) 539 540 def set_cipher_list(self, cipherList): 541 self._cipherList = cipherList 542 543 def load_tmp_dh(self, dhfilename): 544 self._dhFilename = dhfilename 545 546 def set_default_verify_paths(self): 547 """ 548 Set the default paths for the platform. 549 """ 550 self._defaultVerifyPathsSet = True 551 552 def set_tmp_ecdh(self, curve): 553 """ 554 Set an ECDH curve. Should only be called by OpenSSL 1.0.1 555 code. 556 557 @param curve: See L{OpenSSL.SSL.Context.set_tmp_ecdh} 558 """ 559 self._ecCurve = curve 560 561 562class ClientOptionsTests(SynchronousTestCase): 563 """ 564 Tests for L{sslverify.optionsForClientTLS}. 565 """ 566 567 if skipSSL: 568 skip = skipSSL 569 570 def test_extraKeywords(self): 571 """ 572 When passed a keyword parameter other than C{extraCertificateOptions}, 573 L{sslverify.optionsForClientTLS} raises an exception just like a 574 normal Python function would. 575 """ 576 error = self.assertRaises( 577 TypeError, 578 sslverify.optionsForClientTLS, 579 hostname="alpha", 580 someRandomThing="beta", 581 ) 582 self.assertEqual( 583 str(error), 584 "optionsForClientTLS() got an unexpected keyword argument " 585 "'someRandomThing'", 586 ) 587 588 def test_bytesFailFast(self): 589 """ 590 If you pass L{bytes} as the hostname to 591 L{sslverify.optionsForClientTLS} it immediately raises a L{TypeError}. 592 """ 593 error = self.assertRaises( 594 TypeError, sslverify.optionsForClientTLS, b"not-actually-a-hostname.com" 595 ) 596 expectedText = ( 597 "optionsForClientTLS requires text for host names, not " + bytes.__name__ 598 ) 599 self.assertEqual(str(error), expectedText) 600 601 def test_dNSNameHostname(self): 602 """ 603 If you pass a dNSName to L{sslverify.optionsForClientTLS} 604 L{_hostnameIsDnsName} will be True 605 """ 606 options = sslverify.optionsForClientTLS("example.com") 607 self.assertTrue(options._hostnameIsDnsName) 608 609 def test_IPv4AddressHostname(self): 610 """ 611 If you pass an IPv4 address to L{sslverify.optionsForClientTLS} 612 L{_hostnameIsDnsName} will be False 613 """ 614 options = sslverify.optionsForClientTLS("127.0.0.1") 615 self.assertFalse(options._hostnameIsDnsName) 616 617 def test_IPv6AddressHostname(self): 618 """ 619 If you pass an IPv6 address to L{sslverify.optionsForClientTLS} 620 L{_hostnameIsDnsName} will be False 621 """ 622 options = sslverify.optionsForClientTLS("::1") 623 self.assertFalse(options._hostnameIsDnsName) 624 625 626class FakeChooseDiffieHellmanEllipticCurve: 627 """ 628 A fake implementation of L{_ChooseDiffieHellmanEllipticCurve} 629 """ 630 631 def __init__(self, versionNumber, openSSLlib, openSSLcrypto): 632 """ 633 A no-op constructor. 634 """ 635 636 def configureECDHCurve(self, ctx): 637 """ 638 A null configuration. 639 640 @param ctx: An L{OpenSSL.SSL.Context} that would be 641 configured. 642 """ 643 644 645class OpenSSLOptionsTestsMixin: 646 """ 647 A mixin for L{OpenSSLOptions} test cases creates client and server 648 certificates, signs them with a CA, and provides a L{loopback} 649 that creates TLS a connections with them. 650 """ 651 652 if skipSSL: 653 skip = skipSSL 654 655 serverPort = clientConn = None 656 onServerLost = onClientLost = None 657 658 def setUp(self): 659 """ 660 Create class variables of client and server certificates. 661 """ 662 self.sKey, self.sCert = makeCertificate( 663 O=b"Server Test Certificate", CN=b"server" 664 ) 665 self.cKey, self.cCert = makeCertificate( 666 O=b"Client Test Certificate", CN=b"client" 667 ) 668 self.caCert1 = makeCertificate(O=b"CA Test Certificate 1", CN=b"ca1")[1] 669 self.caCert2 = makeCertificate(O=b"CA Test Certificate", CN=b"ca2")[1] 670 self.caCerts = [self.caCert1, self.caCert2] 671 self.extraCertChain = self.caCerts 672 673 def tearDown(self): 674 if self.serverPort is not None: 675 self.serverPort.stopListening() 676 if self.clientConn is not None: 677 self.clientConn.disconnect() 678 679 L = [] 680 if self.onServerLost is not None: 681 L.append(self.onServerLost) 682 if self.onClientLost is not None: 683 L.append(self.onClientLost) 684 685 return defer.DeferredList(L, consumeErrors=True) 686 687 def loopback( 688 self, 689 serverCertOpts, 690 clientCertOpts, 691 onServerLost=None, 692 onClientLost=None, 693 onData=None, 694 ): 695 if onServerLost is None: 696 self.onServerLost = onServerLost = defer.Deferred() 697 if onClientLost is None: 698 self.onClientLost = onClientLost = defer.Deferred() 699 if onData is None: 700 onData = defer.Deferred() 701 702 serverFactory = protocol.ServerFactory() 703 serverFactory.protocol = DataCallbackProtocol 704 serverFactory.onLost = onServerLost 705 serverFactory.onData = onData 706 707 clientFactory = protocol.ClientFactory() 708 clientFactory.protocol = WritingProtocol 709 clientFactory.onLost = onClientLost 710 711 self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts) 712 self.clientConn = reactor.connectSSL( 713 "127.0.0.1", self.serverPort.getHost().port, clientFactory, clientCertOpts 714 ) 715 716 717class OpenSSLOptionsTests(OpenSSLOptionsTestsMixin, TestCase): 718 """ 719 Tests for L{sslverify.OpenSSLOptions}. 720 """ 721 722 def setUp(self): 723 """ 724 Same as L{OpenSSLOptionsTestsMixin.setUp}, but it also patches 725 L{sslverify._ChooseDiffieHellmanEllipticCurve}. 726 """ 727 super().setUp() 728 self.patch( 729 sslverify, 730 "_ChooseDiffieHellmanEllipticCurve", 731 FakeChooseDiffieHellmanEllipticCurve, 732 ) 733 734 def test_constructorWithOnlyPrivateKey(self): 735 """ 736 C{privateKey} and C{certificate} make only sense if both are set. 737 """ 738 self.assertRaises( 739 ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey 740 ) 741 742 def test_constructorWithOnlyCertificate(self): 743 """ 744 C{privateKey} and C{certificate} make only sense if both are set. 745 """ 746 self.assertRaises( 747 ValueError, sslverify.OpenSSLCertificateOptions, certificate=self.sCert 748 ) 749 750 def test_constructorWithCertificateAndPrivateKey(self): 751 """ 752 Specifying C{privateKey} and C{certificate} initializes correctly. 753 """ 754 opts = sslverify.OpenSSLCertificateOptions( 755 privateKey=self.sKey, certificate=self.sCert 756 ) 757 self.assertEqual(opts.privateKey, self.sKey) 758 self.assertEqual(opts.certificate, self.sCert) 759 self.assertEqual(opts.extraCertChain, []) 760 761 def test_constructorDoesNotAllowVerifyWithoutCACerts(self): 762 """ 763 C{verify} must not be C{True} without specifying C{caCerts}. 764 """ 765 self.assertRaises( 766 ValueError, 767 sslverify.OpenSSLCertificateOptions, 768 privateKey=self.sKey, 769 certificate=self.sCert, 770 verify=True, 771 ) 772 773 def test_constructorDoesNotAllowLegacyWithTrustRoot(self): 774 """ 775 C{verify}, C{requireCertificate}, and C{caCerts} must not be specified 776 by the caller (to be I{any} value, even the default!) when specifying 777 C{trustRoot}. 778 """ 779 self.assertRaises( 780 TypeError, 781 sslverify.OpenSSLCertificateOptions, 782 privateKey=self.sKey, 783 certificate=self.sCert, 784 verify=True, 785 trustRoot=None, 786 caCerts=self.caCerts, 787 ) 788 self.assertRaises( 789 TypeError, 790 sslverify.OpenSSLCertificateOptions, 791 privateKey=self.sKey, 792 certificate=self.sCert, 793 trustRoot=None, 794 requireCertificate=True, 795 ) 796 797 def test_constructorAllowsCACertsWithoutVerify(self): 798 """ 799 It's currently a NOP, but valid. 800 """ 801 opts = sslverify.OpenSSLCertificateOptions( 802 privateKey=self.sKey, certificate=self.sCert, caCerts=self.caCerts 803 ) 804 self.assertFalse(opts.verify) 805 self.assertEqual(self.caCerts, opts.caCerts) 806 807 def test_constructorWithVerifyAndCACerts(self): 808 """ 809 Specifying C{verify} and C{caCerts} initializes correctly. 810 """ 811 opts = sslverify.OpenSSLCertificateOptions( 812 privateKey=self.sKey, 813 certificate=self.sCert, 814 verify=True, 815 caCerts=self.caCerts, 816 ) 817 self.assertTrue(opts.verify) 818 self.assertEqual(self.caCerts, opts.caCerts) 819 820 def test_constructorSetsExtraChain(self): 821 """ 822 Setting C{extraCertChain} works if C{certificate} and C{privateKey} are 823 set along with it. 824 """ 825 opts = sslverify.OpenSSLCertificateOptions( 826 privateKey=self.sKey, 827 certificate=self.sCert, 828 extraCertChain=self.extraCertChain, 829 ) 830 self.assertEqual(self.extraCertChain, opts.extraCertChain) 831 832 def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self): 833 """ 834 A C{extraCertChain} without C{privateKey} doesn't make sense and is 835 thus rejected. 836 """ 837 self.assertRaises( 838 ValueError, 839 sslverify.OpenSSLCertificateOptions, 840 certificate=self.sCert, 841 extraCertChain=self.extraCertChain, 842 ) 843 844 def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self): 845 """ 846 A C{extraCertChain} without C{certificate} doesn't make sense and is 847 thus rejected. 848 """ 849 self.assertRaises( 850 ValueError, 851 sslverify.OpenSSLCertificateOptions, 852 privateKey=self.sKey, 853 extraCertChain=self.extraCertChain, 854 ) 855 856 def test_extraChainFilesAreAddedIfSupplied(self): 857 """ 858 If C{extraCertChain} is set and all prerequisites are met, the 859 specified chain certificates are added to C{Context}s that get 860 created. 861 """ 862 opts = sslverify.OpenSSLCertificateOptions( 863 privateKey=self.sKey, 864 certificate=self.sCert, 865 extraCertChain=self.extraCertChain, 866 ) 867 opts._contextFactory = FakeContext 868 ctx = opts.getContext() 869 self.assertEqual(self.sKey, ctx._privateKey) 870 self.assertEqual(self.sCert, ctx._certificate) 871 self.assertEqual(self.extraCertChain, ctx._extraCertChain) 872 873 def test_extraChainDoesNotBreakPyOpenSSL(self): 874 """ 875 C{extraCertChain} doesn't break C{OpenSSL.SSL.Context} creation. 876 """ 877 opts = sslverify.OpenSSLCertificateOptions( 878 privateKey=self.sKey, 879 certificate=self.sCert, 880 extraCertChain=self.extraCertChain, 881 ) 882 ctx = opts.getContext() 883 self.assertIsInstance(ctx, SSL.Context) 884 885 def test_acceptableCiphersAreAlwaysSet(self): 886 """ 887 If the user doesn't supply custom acceptable ciphers, a shipped secure 888 default is used. We can't check directly for it because the effective 889 cipher string we set varies with platforms. 890 """ 891 opts = sslverify.OpenSSLCertificateOptions( 892 privateKey=self.sKey, 893 certificate=self.sCert, 894 ) 895 opts._contextFactory = FakeContext 896 ctx = opts.getContext() 897 self.assertEqual(opts._cipherString.encode("ascii"), ctx._cipherList) 898 899 def test_givesMeaningfulErrorMessageIfNoCipherMatches(self): 900 """ 901 If there is no valid cipher that matches the user's wishes, 902 a L{ValueError} is raised. 903 """ 904 self.assertRaises( 905 ValueError, 906 sslverify.OpenSSLCertificateOptions, 907 privateKey=self.sKey, 908 certificate=self.sCert, 909 acceptableCiphers=sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString( 910 "" 911 ), 912 ) 913 914 def test_honorsAcceptableCiphersArgument(self): 915 """ 916 If acceptable ciphers are passed, they are used. 917 """ 918 919 @implementer(interfaces.IAcceptableCiphers) 920 class FakeAcceptableCiphers: 921 def selectCiphers(self, _): 922 return [sslverify.OpenSSLCipher("sentinel")] 923 924 opts = sslverify.OpenSSLCertificateOptions( 925 privateKey=self.sKey, 926 certificate=self.sCert, 927 acceptableCiphers=FakeAcceptableCiphers(), 928 ) 929 opts._contextFactory = FakeContext 930 ctx = opts.getContext() 931 self.assertEqual(b"sentinel", ctx._cipherList) 932 933 def test_basicSecurityOptionsAreSet(self): 934 """ 935 Every context must have C{OP_NO_SSLv2}, C{OP_NO_COMPRESSION}, and 936 C{OP_CIPHER_SERVER_PREFERENCE} set. 937 """ 938 opts = sslverify.OpenSSLCertificateOptions( 939 privateKey=self.sKey, 940 certificate=self.sCert, 941 ) 942 opts._contextFactory = FakeContext 943 ctx = opts.getContext() 944 options = ( 945 SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE 946 ) 947 self.assertEqual(options, ctx._options & options) 948 949 def test_modeIsSet(self): 950 """ 951 Every context must be in C{MODE_RELEASE_BUFFERS} mode. 952 """ 953 opts = sslverify.OpenSSLCertificateOptions( 954 privateKey=self.sKey, 955 certificate=self.sCert, 956 ) 957 opts._contextFactory = FakeContext 958 ctx = opts.getContext() 959 self.assertEqual(SSL.MODE_RELEASE_BUFFERS, ctx._mode) 960 961 def test_singleUseKeys(self): 962 """ 963 If C{singleUseKeys} is set, every context must have 964 C{OP_SINGLE_DH_USE} and C{OP_SINGLE_ECDH_USE} set. 965 """ 966 opts = sslverify.OpenSSLCertificateOptions( 967 privateKey=self.sKey, 968 certificate=self.sCert, 969 enableSingleUseKeys=True, 970 ) 971 opts._contextFactory = FakeContext 972 ctx = opts.getContext() 973 options = SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE 974 self.assertEqual(options, ctx._options & options) 975 976 def test_methodIsDeprecated(self): 977 """ 978 Passing C{method} to L{sslverify.OpenSSLCertificateOptions} is 979 deprecated. 980 """ 981 sslverify.OpenSSLCertificateOptions( 982 privateKey=self.sKey, 983 certificate=self.sCert, 984 method=SSL.SSLv23_METHOD, 985 ) 986 987 message = ( 988 "Passing method to twisted.internet.ssl.CertificateOptions " 989 "was deprecated in Twisted 17.1.0. Please use a " 990 "combination of insecurelyLowerMinimumTo, raiseMinimumTo, " 991 "and lowerMaximumSecurityTo instead, as Twisted will " 992 "correctly configure the method." 993 ) 994 995 warnings = self.flushWarnings([self.test_methodIsDeprecated]) 996 self.assertEqual(1, len(warnings)) 997 self.assertEqual(DeprecationWarning, warnings[0]["category"]) 998 self.assertEqual(message, warnings[0]["message"]) 999 1000 def test_tlsv1ByDefault(self): 1001 """ 1002 L{sslverify.OpenSSLCertificateOptions} will make the default minimum 1003 TLS version v1.0, if no C{method}, or C{insecurelyLowerMinimumTo} is 1004 given. 1005 """ 1006 opts = sslverify.OpenSSLCertificateOptions( 1007 privateKey=self.sKey, certificate=self.sCert 1008 ) 1009 opts._contextFactory = FakeContext 1010 ctx = opts.getContext() 1011 options = ( 1012 SSL.OP_NO_SSLv2 1013 | SSL.OP_NO_COMPRESSION 1014 | SSL.OP_CIPHER_SERVER_PREFERENCE 1015 | SSL.OP_NO_SSLv3 1016 ) 1017 self.assertEqual(options, ctx._options & options) 1018 1019 def test_tlsProtocolsAtLeastWithMinimum(self): 1020 """ 1021 Passing C{insecurelyLowerMinimumTo} along with C{raiseMinimumTo} to 1022 L{sslverify.OpenSSLCertificateOptions} will cause it to raise an 1023 exception. 1024 """ 1025 with self.assertRaises(TypeError) as e: 1026 sslverify.OpenSSLCertificateOptions( 1027 privateKey=self.sKey, 1028 certificate=self.sCert, 1029 raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, 1030 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, 1031 ) 1032 1033 self.assertIn("raiseMinimumTo", e.exception.args[0]) 1034 self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0]) 1035 self.assertIn("exclusive", e.exception.args[0]) 1036 1037 def test_tlsProtocolsNoMethodWithAtLeast(self): 1038 """ 1039 Passing C{raiseMinimumTo} along with C{method} to 1040 L{sslverify.OpenSSLCertificateOptions} will cause it to raise an 1041 exception. 1042 """ 1043 with self.assertRaises(TypeError) as e: 1044 sslverify.OpenSSLCertificateOptions( 1045 privateKey=self.sKey, 1046 certificate=self.sCert, 1047 method=SSL.SSLv23_METHOD, 1048 raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, 1049 ) 1050 1051 self.assertIn("method", e.exception.args[0]) 1052 self.assertIn("raiseMinimumTo", e.exception.args[0]) 1053 self.assertIn("exclusive", e.exception.args[0]) 1054 1055 def test_tlsProtocolsNoMethodWithMinimum(self): 1056 """ 1057 Passing C{insecurelyLowerMinimumTo} along with C{method} to 1058 L{sslverify.OpenSSLCertificateOptions} will cause it to raise an 1059 exception. 1060 """ 1061 with self.assertRaises(TypeError) as e: 1062 sslverify.OpenSSLCertificateOptions( 1063 privateKey=self.sKey, 1064 certificate=self.sCert, 1065 method=SSL.SSLv23_METHOD, 1066 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, 1067 ) 1068 1069 self.assertIn("method", e.exception.args[0]) 1070 self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0]) 1071 self.assertIn("exclusive", e.exception.args[0]) 1072 1073 def test_tlsProtocolsNoMethodWithMaximum(self): 1074 """ 1075 Passing C{lowerMaximumSecurityTo} along with C{method} to 1076 L{sslverify.OpenSSLCertificateOptions} will cause it to raise an 1077 exception. 1078 """ 1079 with self.assertRaises(TypeError) as e: 1080 sslverify.OpenSSLCertificateOptions( 1081 privateKey=self.sKey, 1082 certificate=self.sCert, 1083 method=SSL.SSLv23_METHOD, 1084 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, 1085 ) 1086 1087 self.assertIn("method", e.exception.args[0]) 1088 self.assertIn("lowerMaximumSecurityTo", e.exception.args[0]) 1089 self.assertIn("exclusive", e.exception.args[0]) 1090 1091 def test_tlsVersionRangeInOrder(self): 1092 """ 1093 Passing out of order TLS versions to C{insecurelyLowerMinimumTo} and 1094 C{lowerMaximumSecurityTo} will cause it to raise an exception. 1095 """ 1096 with self.assertRaises(ValueError) as e: 1097 sslverify.OpenSSLCertificateOptions( 1098 privateKey=self.sKey, 1099 certificate=self.sCert, 1100 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, 1101 lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, 1102 ) 1103 1104 self.assertEqual( 1105 e.exception.args, 1106 ( 1107 ( 1108 "insecurelyLowerMinimumTo needs to be lower than " 1109 "lowerMaximumSecurityTo" 1110 ), 1111 ), 1112 ) 1113 1114 def test_tlsVersionRangeInOrderAtLeast(self): 1115 """ 1116 Passing out of order TLS versions to C{raiseMinimumTo} and 1117 C{lowerMaximumSecurityTo} will cause it to raise an exception. 1118 """ 1119 with self.assertRaises(ValueError) as e: 1120 sslverify.OpenSSLCertificateOptions( 1121 privateKey=self.sKey, 1122 certificate=self.sCert, 1123 raiseMinimumTo=sslverify.TLSVersion.TLSv1_0, 1124 lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, 1125 ) 1126 1127 self.assertEqual( 1128 e.exception.args, 1129 (("raiseMinimumTo needs to be lower than " "lowerMaximumSecurityTo"),), 1130 ) 1131 1132 def test_tlsProtocolsreduceToMaxWithoutMin(self): 1133 """ 1134 When calling L{sslverify.OpenSSLCertificateOptions} with 1135 C{lowerMaximumSecurityTo} but no C{raiseMinimumTo} or 1136 C{insecurelyLowerMinimumTo} set, and C{lowerMaximumSecurityTo} is 1137 below the minimum default, the minimum will be made the new maximum. 1138 """ 1139 opts = sslverify.OpenSSLCertificateOptions( 1140 privateKey=self.sKey, 1141 certificate=self.sCert, 1142 lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, 1143 ) 1144 opts._contextFactory = FakeContext 1145 ctx = opts.getContext() 1146 options = ( 1147 SSL.OP_NO_SSLv2 1148 | SSL.OP_NO_COMPRESSION 1149 | SSL.OP_CIPHER_SERVER_PREFERENCE 1150 | SSL.OP_NO_TLSv1 1151 | SSL.OP_NO_TLSv1_1 1152 | SSL.OP_NO_TLSv1_2 1153 | opts._OP_NO_TLSv1_3 1154 ) 1155 self.assertEqual(options, ctx._options & options) 1156 1157 def test_tlsProtocolsSSLv3Only(self): 1158 """ 1159 When calling L{sslverify.OpenSSLCertificateOptions} with 1160 C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to 1161 SSLv3, it will exclude all others. 1162 """ 1163 opts = sslverify.OpenSSLCertificateOptions( 1164 privateKey=self.sKey, 1165 certificate=self.sCert, 1166 insecurelyLowerMinimumTo=sslverify.TLSVersion.SSLv3, 1167 lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, 1168 ) 1169 opts._contextFactory = FakeContext 1170 ctx = opts.getContext() 1171 options = ( 1172 SSL.OP_NO_SSLv2 1173 | SSL.OP_NO_COMPRESSION 1174 | SSL.OP_CIPHER_SERVER_PREFERENCE 1175 | SSL.OP_NO_TLSv1 1176 | SSL.OP_NO_TLSv1_1 1177 | SSL.OP_NO_TLSv1_2 1178 | opts._OP_NO_TLSv1_3 1179 ) 1180 self.assertEqual(options, ctx._options & options) 1181 1182 def test_tlsProtocolsTLSv1Point0Only(self): 1183 """ 1184 When calling L{sslverify.OpenSSLCertificateOptions} with 1185 C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.0, 1186 it will exclude all others. 1187 """ 1188 opts = sslverify.OpenSSLCertificateOptions( 1189 privateKey=self.sKey, 1190 certificate=self.sCert, 1191 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, 1192 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_0, 1193 ) 1194 opts._contextFactory = FakeContext 1195 ctx = opts.getContext() 1196 options = ( 1197 SSL.OP_NO_SSLv2 1198 | SSL.OP_NO_COMPRESSION 1199 | SSL.OP_CIPHER_SERVER_PREFERENCE 1200 | SSL.OP_NO_SSLv3 1201 | SSL.OP_NO_TLSv1_1 1202 | SSL.OP_NO_TLSv1_2 1203 | opts._OP_NO_TLSv1_3 1204 ) 1205 self.assertEqual(options, ctx._options & options) 1206 1207 def test_tlsProtocolsTLSv1Point1Only(self): 1208 """ 1209 When calling L{sslverify.OpenSSLCertificateOptions} with 1210 C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.1, 1211 it will exclude all others. 1212 """ 1213 opts = sslverify.OpenSSLCertificateOptions( 1214 privateKey=self.sKey, 1215 certificate=self.sCert, 1216 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_1, 1217 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_1, 1218 ) 1219 opts._contextFactory = FakeContext 1220 ctx = opts.getContext() 1221 options = ( 1222 SSL.OP_NO_SSLv2 1223 | SSL.OP_NO_COMPRESSION 1224 | SSL.OP_CIPHER_SERVER_PREFERENCE 1225 | SSL.OP_NO_SSLv3 1226 | SSL.OP_NO_TLSv1 1227 | SSL.OP_NO_TLSv1_2 1228 | opts._OP_NO_TLSv1_3 1229 ) 1230 self.assertEqual(options, ctx._options & options) 1231 1232 def test_tlsProtocolsTLSv1Point2Only(self): 1233 """ 1234 When calling L{sslverify.OpenSSLCertificateOptions} with 1235 C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.2, 1236 it will exclude all others. 1237 """ 1238 opts = sslverify.OpenSSLCertificateOptions( 1239 privateKey=self.sKey, 1240 certificate=self.sCert, 1241 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, 1242 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, 1243 ) 1244 opts._contextFactory = FakeContext 1245 ctx = opts.getContext() 1246 options = ( 1247 SSL.OP_NO_SSLv2 1248 | SSL.OP_NO_COMPRESSION 1249 | SSL.OP_CIPHER_SERVER_PREFERENCE 1250 | SSL.OP_NO_SSLv3 1251 | SSL.OP_NO_TLSv1 1252 | SSL.OP_NO_TLSv1_1 1253 | opts._OP_NO_TLSv1_3 1254 ) 1255 self.assertEqual(options, ctx._options & options) 1256 1257 def test_tlsProtocolsAllModernTLS(self): 1258 """ 1259 When calling L{sslverify.OpenSSLCertificateOptions} with 1260 C{insecurelyLowerMinimumTo} set to TLSv1.0 and 1261 C{lowerMaximumSecurityTo} to TLSv1.2, it will exclude both SSLs and 1262 the (unreleased) TLSv1.3. 1263 """ 1264 opts = sslverify.OpenSSLCertificateOptions( 1265 privateKey=self.sKey, 1266 certificate=self.sCert, 1267 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, 1268 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, 1269 ) 1270 opts._contextFactory = FakeContext 1271 ctx = opts.getContext() 1272 options = ( 1273 SSL.OP_NO_SSLv2 1274 | SSL.OP_NO_COMPRESSION 1275 | SSL.OP_CIPHER_SERVER_PREFERENCE 1276 | SSL.OP_NO_SSLv3 1277 | opts._OP_NO_TLSv1_3 1278 ) 1279 self.assertEqual(options, ctx._options & options) 1280 1281 def test_tlsProtocolsAtLeastAllSecureTLS(self): 1282 """ 1283 When calling L{sslverify.OpenSSLCertificateOptions} with 1284 C{raiseMinimumTo} set to TLSv1.2, it will ignore all TLSs below 1285 1.2 and SSL. 1286 """ 1287 opts = sslverify.OpenSSLCertificateOptions( 1288 privateKey=self.sKey, 1289 certificate=self.sCert, 1290 raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, 1291 ) 1292 opts._contextFactory = FakeContext 1293 ctx = opts.getContext() 1294 options = ( 1295 SSL.OP_NO_SSLv2 1296 | SSL.OP_NO_COMPRESSION 1297 | SSL.OP_CIPHER_SERVER_PREFERENCE 1298 | SSL.OP_NO_SSLv3 1299 | SSL.OP_NO_TLSv1 1300 | SSL.OP_NO_TLSv1_1 1301 ) 1302 self.assertEqual(options, ctx._options & options) 1303 1304 def test_tlsProtocolsAtLeastWillAcceptHigherDefault(self): 1305 """ 1306 When calling L{sslverify.OpenSSLCertificateOptions} with 1307 C{raiseMinimumTo} set to a value lower than Twisted's default will 1308 cause it to use the more secure default. 1309 """ 1310 opts = sslverify.OpenSSLCertificateOptions( 1311 privateKey=self.sKey, 1312 certificate=self.sCert, 1313 raiseMinimumTo=sslverify.TLSVersion.SSLv3, 1314 ) 1315 opts._contextFactory = FakeContext 1316 ctx = opts.getContext() 1317 # Future maintainer warning: this will break if we change our default 1318 # up, so you should change it to add the relevant OP_NO flags when we 1319 # do make that change and this test fails. 1320 options = ( 1321 SSL.OP_NO_SSLv2 1322 | SSL.OP_NO_COMPRESSION 1323 | SSL.OP_CIPHER_SERVER_PREFERENCE 1324 | SSL.OP_NO_SSLv3 1325 ) 1326 self.assertEqual(options, ctx._options & options) 1327 self.assertEqual(opts._defaultMinimumTLSVersion, sslverify.TLSVersion.TLSv1_0) 1328 1329 def test_tlsProtocolsAllSecureTLS(self): 1330 """ 1331 When calling L{sslverify.OpenSSLCertificateOptions} with 1332 C{insecurelyLowerMinimumTo} set to TLSv1.2, it will ignore all TLSs below 1333 1.2 and SSL. 1334 """ 1335 opts = sslverify.OpenSSLCertificateOptions( 1336 privateKey=self.sKey, 1337 certificate=self.sCert, 1338 insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, 1339 ) 1340 opts._contextFactory = FakeContext 1341 ctx = opts.getContext() 1342 options = ( 1343 SSL.OP_NO_SSLv2 1344 | SSL.OP_NO_COMPRESSION 1345 | SSL.OP_CIPHER_SERVER_PREFERENCE 1346 | SSL.OP_NO_SSLv3 1347 | SSL.OP_NO_TLSv1 1348 | SSL.OP_NO_TLSv1_1 1349 ) 1350 self.assertEqual(options, ctx._options & options) 1351 1352 def test_dhParams(self): 1353 """ 1354 If C{dhParams} is set, they are loaded into each new context. 1355 """ 1356 1357 class FakeDiffieHellmanParameters: 1358 _dhFile = FilePath(b"dh.params") 1359 1360 dhParams = FakeDiffieHellmanParameters() 1361 opts = sslverify.OpenSSLCertificateOptions( 1362 privateKey=self.sKey, 1363 certificate=self.sCert, 1364 dhParameters=dhParams, 1365 ) 1366 opts._contextFactory = FakeContext 1367 ctx = opts.getContext() 1368 self.assertEqual(FakeDiffieHellmanParameters._dhFile.path, ctx._dhFilename) 1369 1370 def test_abbreviatingDistinguishedNames(self): 1371 """ 1372 Check that abbreviations used in certificates correctly map to 1373 complete names. 1374 """ 1375 self.assertEqual( 1376 sslverify.DN(CN=b"a", OU=b"hello"), 1377 sslverify.DistinguishedName( 1378 commonName=b"a", organizationalUnitName=b"hello" 1379 ), 1380 ) 1381 self.assertNotEqual( 1382 sslverify.DN(CN=b"a", OU=b"hello"), 1383 sslverify.DN(CN=b"a", OU=b"hello", emailAddress=b"xxx"), 1384 ) 1385 dn = sslverify.DN(CN=b"abcdefg") 1386 self.assertRaises(AttributeError, setattr, dn, "Cn", b"x") 1387 self.assertEqual(dn.CN, dn.commonName) 1388 dn.CN = b"bcdefga" 1389 self.assertEqual(dn.CN, dn.commonName) 1390 1391 def testInspectDistinguishedName(self): 1392 n = sslverify.DN( 1393 commonName=b"common name", 1394 organizationName=b"organization name", 1395 organizationalUnitName=b"organizational unit name", 1396 localityName=b"locality name", 1397 stateOrProvinceName=b"state or province name", 1398 countryName=b"country name", 1399 emailAddress=b"email address", 1400 ) 1401 s = n.inspect() 1402 for k in [ 1403 "common name", 1404 "organization name", 1405 "organizational unit name", 1406 "locality name", 1407 "state or province name", 1408 "country name", 1409 "email address", 1410 ]: 1411 self.assertIn(k, s, f"{k!r} was not in inspect output.") 1412 self.assertIn(k.title(), s, f"{k!r} was not in inspect output.") 1413 1414 def testInspectDistinguishedNameWithoutAllFields(self): 1415 n = sslverify.DN(localityName=b"locality name") 1416 s = n.inspect() 1417 for k in [ 1418 "common name", 1419 "organization name", 1420 "organizational unit name", 1421 "state or province name", 1422 "country name", 1423 "email address", 1424 ]: 1425 self.assertNotIn(k, s, f"{k!r} was in inspect output.") 1426 self.assertNotIn(k.title(), s, f"{k!r} was in inspect output.") 1427 self.assertIn("locality name", s) 1428 self.assertIn("Locality Name", s) 1429 1430 def test_inspectCertificate(self): 1431 """ 1432 Test that the C{inspect} method of L{sslverify.Certificate} returns 1433 a human-readable string containing some basic information about the 1434 certificate. 1435 """ 1436 c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) 1437 pk = c.getPublicKey() 1438 keyHash = pk.keyHash() 1439 # Maintenance Note: the algorithm used to compute the "public key hash" 1440 # is highly dubious and can differ between underlying versions of 1441 # OpenSSL (and across versions of Twisted), since it is not actually 1442 # the hash of the public key by itself. If we can get the appropriate 1443 # APIs to get the hash of the key itself out of OpenSSL, then we should 1444 # be able to make it statically declared inline below again rather than 1445 # computing it here. 1446 self.assertEqual( 1447 c.inspect().split("\n"), 1448 [ 1449 "Certificate For Subject:", 1450 " Common Name: example.twistedmatrix.com", 1451 " Country Name: US", 1452 " Email Address: nobody@twistedmatrix.com", 1453 " Locality Name: Boston", 1454 " Organization Name: Twisted Matrix Labs", 1455 " Organizational Unit Name: Security", 1456 " State Or Province Name: Massachusetts", 1457 "", 1458 "Issuer:", 1459 " Common Name: example.twistedmatrix.com", 1460 " Country Name: US", 1461 " Email Address: nobody@twistedmatrix.com", 1462 " Locality Name: Boston", 1463 " Organization Name: Twisted Matrix Labs", 1464 " Organizational Unit Name: Security", 1465 " State Or Province Name: Massachusetts", 1466 "", 1467 "Serial Number: 12345", 1468 "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18", 1469 "Public Key with Hash: " + keyHash, 1470 ], 1471 ) 1472 1473 def test_publicKeyMatching(self): 1474 """ 1475 L{PublicKey.matches} returns L{True} for keys from certificates with 1476 the same key, and L{False} for keys from certificates with different 1477 keys. 1478 """ 1479 hostA = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) 1480 hostB = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) 1481 peerA = sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM) 1482 1483 self.assertTrue(hostA.getPublicKey().matches(hostB.getPublicKey())) 1484 self.assertFalse(peerA.getPublicKey().matches(hostA.getPublicKey())) 1485 1486 def test_enablingAndDisablingSessions(self): 1487 """ 1488 The enableSessions argument sets the session cache mode; it defaults to 1489 False (at least until https://twistedmatrix.com/trac/ticket/9764 can be 1490 resolved). 1491 """ 1492 options = sslverify.OpenSSLCertificateOptions() 1493 self.assertEqual(options.enableSessions, False) 1494 ctx = options.getContext() 1495 self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_OFF) 1496 options = sslverify.OpenSSLCertificateOptions(enableSessions=True) 1497 self.assertEqual(options.enableSessions, True) 1498 ctx = options.getContext() 1499 self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_SERVER) 1500 1501 def test_certificateOptionsSerialization(self): 1502 """ 1503 Test that __setstate__(__getstate__()) round-trips properly. 1504 """ 1505 firstOpts = sslverify.OpenSSLCertificateOptions( 1506 privateKey=self.sKey, 1507 certificate=self.sCert, 1508 method=SSL.SSLv23_METHOD, 1509 verify=True, 1510 caCerts=[self.sCert], 1511 verifyDepth=2, 1512 requireCertificate=False, 1513 verifyOnce=False, 1514 enableSingleUseKeys=False, 1515 enableSessions=False, 1516 fixBrokenPeers=True, 1517 enableSessionTickets=True, 1518 ) 1519 context = firstOpts.getContext() 1520 self.assertIs(context, firstOpts._context) 1521 self.assertIsNotNone(context) 1522 state = firstOpts.__getstate__() 1523 self.assertNotIn("_context", state) 1524 1525 opts = sslverify.OpenSSLCertificateOptions() 1526 opts.__setstate__(state) 1527 self.assertEqual(opts.privateKey, self.sKey) 1528 self.assertEqual(opts.certificate, self.sCert) 1529 self.assertEqual(opts.method, SSL.SSLv23_METHOD) 1530 self.assertTrue(opts.verify) 1531 self.assertEqual(opts.caCerts, [self.sCert]) 1532 self.assertEqual(opts.verifyDepth, 2) 1533 self.assertFalse(opts.requireCertificate) 1534 self.assertFalse(opts.verifyOnce) 1535 self.assertFalse(opts.enableSingleUseKeys) 1536 self.assertFalse(opts.enableSessions) 1537 self.assertTrue(opts.fixBrokenPeers) 1538 self.assertTrue(opts.enableSessionTickets) 1539 1540 test_certificateOptionsSerialization.suppress = [ # type: ignore[attr-defined] 1541 util.suppress( 1542 category=DeprecationWarning, 1543 message=r"twisted\.internet\._sslverify\.*__[gs]etstate__", 1544 ) 1545 ] 1546 1547 def test_certificateOptionsSessionTickets(self): 1548 """ 1549 Enabling session tickets should not set the OP_NO_TICKET option. 1550 """ 1551 opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=True) 1552 ctx = opts.getContext() 1553 self.assertEqual(0, ctx.set_options(0) & 0x00004000) 1554 1555 def test_certificateOptionsSessionTicketsDisabled(self): 1556 """ 1557 Enabling session tickets should set the OP_NO_TICKET option. 1558 """ 1559 opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False) 1560 ctx = opts.getContext() 1561 self.assertEqual(0x00004000, ctx.set_options(0) & 0x00004000) 1562 1563 def test_allowedAnonymousClientConnection(self): 1564 """ 1565 Check that anonymous connections are allowed when certificates aren't 1566 required on the server. 1567 """ 1568 onData = defer.Deferred() 1569 self.loopback( 1570 sslverify.OpenSSLCertificateOptions( 1571 privateKey=self.sKey, certificate=self.sCert, requireCertificate=False 1572 ), 1573 sslverify.OpenSSLCertificateOptions(requireCertificate=False), 1574 onData=onData, 1575 ) 1576 1577 return onData.addCallback( 1578 lambda result: self.assertEqual(result, WritingProtocol.byte) 1579 ) 1580 1581 def test_refusedAnonymousClientConnection(self): 1582 """ 1583 Check that anonymous connections are refused when certificates are 1584 required on the server. 1585 """ 1586 onServerLost = defer.Deferred() 1587 onClientLost = defer.Deferred() 1588 self.loopback( 1589 sslverify.OpenSSLCertificateOptions( 1590 privateKey=self.sKey, 1591 certificate=self.sCert, 1592 verify=True, 1593 caCerts=[self.sCert], 1594 requireCertificate=True, 1595 ), 1596 sslverify.OpenSSLCertificateOptions(requireCertificate=False), 1597 onServerLost=onServerLost, 1598 onClientLost=onClientLost, 1599 ) 1600 1601 d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True) 1602 1603 def afterLost(result): 1604 ((cSuccess, cResult), (sSuccess, sResult)) = result 1605 self.assertFalse(cSuccess) 1606 self.assertFalse(sSuccess) 1607 # Win32 fails to report the SSL Error, and report a connection lost 1608 # instead: there is a race condition so that's not totally 1609 # surprising (see ticket #2877 in the tracker) 1610 self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost)) 1611 self.assertIsInstance(sResult.value, SSL.Error) 1612 1613 return d.addCallback(afterLost) 1614 1615 def test_failedCertificateVerification(self): 1616 """ 1617 Check that connecting with a certificate not accepted by the server CA 1618 fails. 1619 """ 1620 onServerLost = defer.Deferred() 1621 onClientLost = defer.Deferred() 1622 self.loopback( 1623 sslverify.OpenSSLCertificateOptions( 1624 privateKey=self.sKey, 1625 certificate=self.sCert, 1626 verify=False, 1627 requireCertificate=False, 1628 ), 1629 sslverify.OpenSSLCertificateOptions( 1630 verify=True, requireCertificate=False, caCerts=[self.cCert] 1631 ), 1632 onServerLost=onServerLost, 1633 onClientLost=onClientLost, 1634 ) 1635 1636 d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True) 1637 1638 def afterLost(result): 1639 ((cSuccess, cResult), (sSuccess, sResult)) = result 1640 self.assertFalse(cSuccess) 1641 self.assertFalse(sSuccess) 1642 1643 return d.addCallback(afterLost) 1644 1645 def test_successfulCertificateVerification(self): 1646 """ 1647 Test a successful connection with client certificate validation on 1648 server side. 1649 """ 1650 onData = defer.Deferred() 1651 self.loopback( 1652 sslverify.OpenSSLCertificateOptions( 1653 privateKey=self.sKey, 1654 certificate=self.sCert, 1655 verify=False, 1656 requireCertificate=False, 1657 ), 1658 sslverify.OpenSSLCertificateOptions( 1659 verify=True, requireCertificate=True, caCerts=[self.sCert] 1660 ), 1661 onData=onData, 1662 ) 1663 1664 return onData.addCallback( 1665 lambda result: self.assertEqual(result, WritingProtocol.byte) 1666 ) 1667 1668 def test_successfulSymmetricSelfSignedCertificateVerification(self): 1669 """ 1670 Test a successful connection with validation on both server and client 1671 sides. 1672 """ 1673 onData = defer.Deferred() 1674 self.loopback( 1675 sslverify.OpenSSLCertificateOptions( 1676 privateKey=self.sKey, 1677 certificate=self.sCert, 1678 verify=True, 1679 requireCertificate=True, 1680 caCerts=[self.cCert], 1681 ), 1682 sslverify.OpenSSLCertificateOptions( 1683 privateKey=self.cKey, 1684 certificate=self.cCert, 1685 verify=True, 1686 requireCertificate=True, 1687 caCerts=[self.sCert], 1688 ), 1689 onData=onData, 1690 ) 1691 1692 return onData.addCallback( 1693 lambda result: self.assertEqual(result, WritingProtocol.byte) 1694 ) 1695 1696 def test_verification(self): 1697 """ 1698 Check certificates verification building custom certificates data. 1699 """ 1700 clientDN = sslverify.DistinguishedName(commonName="client") 1701 clientKey = sslverify.KeyPair.generate() 1702 clientCertReq = clientKey.certificateRequest(clientDN) 1703 1704 serverDN = sslverify.DistinguishedName(commonName="server") 1705 serverKey = sslverify.KeyPair.generate() 1706 serverCertReq = serverKey.certificateRequest(serverDN) 1707 1708 clientSelfCertReq = clientKey.certificateRequest(clientDN) 1709 clientSelfCertData = clientKey.signCertificateRequest( 1710 clientDN, clientSelfCertReq, lambda dn: True, 132 1711 ) 1712 clientSelfCert = clientKey.newCertificate(clientSelfCertData) 1713 1714 serverSelfCertReq = serverKey.certificateRequest(serverDN) 1715 serverSelfCertData = serverKey.signCertificateRequest( 1716 serverDN, serverSelfCertReq, lambda dn: True, 516 1717 ) 1718 serverSelfCert = serverKey.newCertificate(serverSelfCertData) 1719 1720 clientCertData = serverKey.signCertificateRequest( 1721 serverDN, clientCertReq, lambda dn: True, 7 1722 ) 1723 clientCert = clientKey.newCertificate(clientCertData) 1724 1725 serverCertData = clientKey.signCertificateRequest( 1726 clientDN, serverCertReq, lambda dn: True, 42 1727 ) 1728 serverCert = serverKey.newCertificate(serverCertData) 1729 1730 onData = defer.Deferred() 1731 1732 serverOpts = serverCert.options(serverSelfCert) 1733 clientOpts = clientCert.options(clientSelfCert) 1734 1735 self.loopback(serverOpts, clientOpts, onData=onData) 1736 1737 return onData.addCallback( 1738 lambda result: self.assertEqual(result, WritingProtocol.byte) 1739 ) 1740 1741 1742class OpenSSLOptionsECDHIntegrationTests(OpenSSLOptionsTestsMixin, TestCase): 1743 """ 1744 ECDH-related integration tests for L{OpenSSLOptions}. 1745 """ 1746 1747 def test_ellipticCurveDiffieHellman(self): 1748 """ 1749 Connections use ECDH when OpenSSL supports it. 1750 """ 1751 if not get_elliptic_curves(): 1752 raise SkipTest("OpenSSL does not support ECDH.") 1753 1754 onData = defer.Deferred() 1755 # TLS 1.3 cipher suites do not specify the key exchange 1756 # mechanism: 1757 # https://wiki.openssl.org/index.php/TLS1.3#Differences_with_TLS1.2_and_below 1758 # 1759 # and OpenSSL only supports ECHDE groups with TLS 1.3: 1760 # https://wiki.openssl.org/index.php/TLS1.3#Groups 1761 # 1762 # so TLS 1.3 implies ECDHE. Force this test to use TLS 1.3 to 1763 # ensure ECDH is selected when it might not be. 1764 self.loopback( 1765 sslverify.OpenSSLCertificateOptions( 1766 privateKey=self.sKey, 1767 certificate=self.sCert, 1768 requireCertificate=False, 1769 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3, 1770 ), 1771 sslverify.OpenSSLCertificateOptions( 1772 requireCertificate=False, 1773 lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3, 1774 ), 1775 onData=onData, 1776 ) 1777 1778 @onData.addCallback 1779 def assertECDH(_): 1780 self.assertEqual(len(self.clientConn.factory.protocols), 1) 1781 [clientProtocol] = self.clientConn.factory.protocols 1782 cipher = clientProtocol.getHandle().get_cipher_name() 1783 self.assertIn("ECDH", cipher) 1784 1785 return onData 1786 1787 1788class DeprecationTests(SynchronousTestCase): 1789 """ 1790 Tests for deprecation of L{sslverify.OpenSSLCertificateOptions}'s support 1791 of the pickle protocol. 1792 """ 1793 1794 if skipSSL: 1795 skip = skipSSL 1796 1797 def test_getstateDeprecation(self): 1798 """ 1799 L{sslverify.OpenSSLCertificateOptions.__getstate__} is deprecated. 1800 """ 1801 self.callDeprecated( 1802 (Version("Twisted", 15, 0, 0), "a real persistence system"), 1803 sslverify.OpenSSLCertificateOptions().__getstate__, 1804 ) 1805 1806 def test_setstateDeprecation(self): 1807 """ 1808 L{sslverify.OpenSSLCertificateOptions.__setstate__} is deprecated. 1809 """ 1810 self.callDeprecated( 1811 (Version("Twisted", 15, 0, 0), "a real persistence system"), 1812 sslverify.OpenSSLCertificateOptions().__setstate__, 1813 {}, 1814 ) 1815 1816 1817class TrustRootTests(TestCase): 1818 """ 1819 Tests for L{sslverify.OpenSSLCertificateOptions}' C{trustRoot} argument, 1820 L{sslverify.platformTrust}, and their interactions. 1821 """ 1822 1823 if skipSSL: 1824 skip = skipSSL 1825 1826 def setUp(self): 1827 """ 1828 Patch L{sslverify._ChooseDiffieHellmanEllipticCurve}. 1829 """ 1830 self.patch( 1831 sslverify, 1832 "_ChooseDiffieHellmanEllipticCurve", 1833 FakeChooseDiffieHellmanEllipticCurve, 1834 ) 1835 1836 def test_caCertsPlatformDefaults(self): 1837 """ 1838 Specifying a C{trustRoot} of L{sslverify.OpenSSLDefaultPaths} when 1839 initializing L{sslverify.OpenSSLCertificateOptions} loads the 1840 platform-provided trusted certificates via C{set_default_verify_paths}. 1841 """ 1842 opts = sslverify.OpenSSLCertificateOptions( 1843 trustRoot=sslverify.OpenSSLDefaultPaths(), 1844 ) 1845 fc = FakeContext(SSL.TLSv1_METHOD) 1846 opts._contextFactory = lambda method: fc 1847 opts.getContext() 1848 self.assertTrue(fc._defaultVerifyPathsSet) 1849 1850 def test_trustRootPlatformRejectsUntrustedCA(self): 1851 """ 1852 Specifying a C{trustRoot} of L{platformTrust} when initializing 1853 L{sslverify.OpenSSLCertificateOptions} causes certificates issued by a 1854 newly created CA to be rejected by an SSL connection using these 1855 options. 1856 1857 Note that this test should I{always} pass, even on platforms where the 1858 CA certificates are not installed, as long as L{platformTrust} rejects 1859 completely invalid / unknown root CA certificates. This is simply a 1860 smoke test to make sure that verification is happening at all. 1861 """ 1862 caSelfCert, serverCert = certificatesForAuthorityAndServer() 1863 chainedCert = pathContainingDumpOf(self, serverCert, caSelfCert) 1864 privateKey = pathContainingDumpOf(self, serverCert.privateKey) 1865 1866 sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection( 1867 trustRoot=platformTrust(), 1868 privateKeyFile=privateKey, 1869 chainedCertFile=chainedCert, 1870 ) 1871 # No data was received. 1872 self.assertEqual(cWrapped.data, b"") 1873 1874 # It was an L{SSL.Error}. 1875 self.assertEqual(cWrapped.lostReason.type, SSL.Error) 1876 1877 # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors. 1878 err = cWrapped.lostReason.value 1879 self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca") 1880 1881 def test_trustRootSpecificCertificate(self): 1882 """ 1883 Specifying a L{Certificate} object for L{trustRoot} will result in that 1884 certificate being the only trust root for a client. 1885 """ 1886 caCert, serverCert = certificatesForAuthorityAndServer() 1887 otherCa, otherServer = certificatesForAuthorityAndServer() 1888 sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection( 1889 trustRoot=caCert, 1890 privateKeyFile=pathContainingDumpOf(self, serverCert.privateKey), 1891 chainedCertFile=pathContainingDumpOf(self, serverCert), 1892 ) 1893 pump.flush() 1894 self.assertIsNone(cWrapped.lostReason) 1895 self.assertEqual(cWrapped.data, sWrapped.greeting) 1896 1897 1898class ServiceIdentityTests(SynchronousTestCase): 1899 """ 1900 Tests for the verification of the peer's service's identity via the 1901 C{hostname} argument to L{sslverify.OpenSSLCertificateOptions}. 1902 """ 1903 1904 if skipSSL: 1905 skip = skipSSL 1906 1907 def serviceIdentitySetup( 1908 self, 1909 clientHostname, 1910 serverHostname, 1911 serverContextSetup=lambda ctx: None, 1912 validCertificate=True, 1913 clientPresentsCertificate=False, 1914 validClientCertificate=True, 1915 serverVerifies=False, 1916 buggyInfoCallback=False, 1917 fakePlatformTrust=False, 1918 useDefaultTrust=False, 1919 ): 1920 """ 1921 Connect a server and a client. 1922 1923 @param clientHostname: The I{client's idea} of the server's hostname; 1924 passed as the C{hostname} to the 1925 L{sslverify.OpenSSLCertificateOptions} instance. 1926 @type clientHostname: L{unicode} 1927 1928 @param serverHostname: The I{server's own idea} of the server's 1929 hostname; present in the certificate presented by the server. 1930 @type serverHostname: L{unicode} 1931 1932 @param serverContextSetup: a 1-argument callable invoked with the 1933 L{OpenSSL.SSL.Context} after it's produced. 1934 @type serverContextSetup: L{callable} taking L{OpenSSL.SSL.Context} 1935 returning L{None}. 1936 1937 @param validCertificate: Is the server's certificate valid? L{True} if 1938 so, L{False} otherwise. 1939 @type validCertificate: L{bool} 1940 1941 @param clientPresentsCertificate: Should the client present a 1942 certificate to the server? Defaults to 'no'. 1943 @type clientPresentsCertificate: L{bool} 1944 1945 @param validClientCertificate: If the client presents a certificate, 1946 should it actually be a valid one, i.e. signed by the same CA that 1947 the server is checking? Defaults to 'yes'. 1948 @type validClientCertificate: L{bool} 1949 1950 @param serverVerifies: Should the server verify the client's 1951 certificate? Defaults to 'no'. 1952 @type serverVerifies: L{bool} 1953 1954 @param buggyInfoCallback: Should we patch the implementation so that 1955 the C{info_callback} passed to OpenSSL to have a bug and raise an 1956 exception (L{ZeroDivisionError})? Defaults to 'no'. 1957 @type buggyInfoCallback: L{bool} 1958 1959 @param fakePlatformTrust: Should we fake the platformTrust to be the 1960 same as our fake server certificate authority, so that we can test 1961 it's being used? Defaults to 'no' and we just pass platform trust. 1962 @type fakePlatformTrust: L{bool} 1963 1964 @param useDefaultTrust: Should we avoid passing the C{trustRoot} to 1965 L{ssl.optionsForClientTLS}? Defaults to 'no'. 1966 @type useDefaultTrust: L{bool} 1967 1968 @return: the client TLS protocol, the client wrapped protocol, 1969 the server TLS protocol, the server wrapped protocol and 1970 an L{IOPump} which, when its C{pump} and C{flush} methods are 1971 called, will move data between the created client and server 1972 protocol instances 1973 @rtype: 5-L{tuple} of 4 L{IProtocol}s and L{IOPump} 1974 """ 1975 serverCA, serverCert = certificatesForAuthorityAndServer(serverHostname) 1976 other = {} 1977 passClientCert = None 1978 clientCA, clientCert = certificatesForAuthorityAndServer("client") 1979 if serverVerifies: 1980 other.update(trustRoot=clientCA) 1981 1982 if clientPresentsCertificate: 1983 if validClientCertificate: 1984 passClientCert = clientCert 1985 else: 1986 bogusCA, bogus = certificatesForAuthorityAndServer("client") 1987 passClientCert = bogus 1988 1989 serverOpts = sslverify.OpenSSLCertificateOptions( 1990 privateKey=serverCert.privateKey.original, 1991 certificate=serverCert.original, 1992 **other, 1993 ) 1994 serverContextSetup(serverOpts.getContext()) 1995 if not validCertificate: 1996 serverCA, otherServer = certificatesForAuthorityAndServer(serverHostname) 1997 if buggyInfoCallback: 1998 1999 def broken(*a, **k): 2000 """ 2001 Raise an exception. 2002 2003 @param a: Arguments for an C{info_callback} 2004 2005 @param k: Keyword arguments for an C{info_callback} 2006 """ 2007 1 / 0 2008 2009 self.patch( 2010 sslverify.ClientTLSOptions, 2011 "_identityVerifyingInfoCallback", 2012 broken, 2013 ) 2014 2015 signature = {"hostname": clientHostname} 2016 if passClientCert: 2017 signature.update(clientCertificate=passClientCert) 2018 if not useDefaultTrust: 2019 signature.update(trustRoot=serverCA) 2020 if fakePlatformTrust: 2021 self.patch(sslverify, "platformTrust", lambda: serverCA) 2022 2023 clientOpts = sslverify.optionsForClientTLS(**signature) 2024 2025 class GreetingServer(protocol.Protocol): 2026 greeting = b"greetings!" 2027 lostReason = None 2028 data = b"" 2029 2030 def connectionMade(self): 2031 self.transport.write(self.greeting) 2032 2033 def dataReceived(self, data): 2034 self.data += data 2035 2036 def connectionLost(self, reason): 2037 self.lostReason = reason 2038 2039 class GreetingClient(protocol.Protocol): 2040 greeting = b"cheerio!" 2041 data = b"" 2042 lostReason = None 2043 2044 def connectionMade(self): 2045 self.transport.write(self.greeting) 2046 2047 def dataReceived(self, data): 2048 self.data += data 2049 2050 def connectionLost(self, reason): 2051 self.lostReason = reason 2052 2053 serverWrappedProto = GreetingServer() 2054 clientWrappedProto = GreetingClient() 2055 2056 clientFactory = protocol.Factory() 2057 clientFactory.protocol = lambda: clientWrappedProto 2058 serverFactory = protocol.Factory() 2059 serverFactory.protocol = lambda: serverWrappedProto 2060 2061 self.serverOpts = serverOpts 2062 self.clientOpts = clientOpts 2063 2064 clientTLSFactory = TLSMemoryBIOFactory( 2065 clientOpts, isClient=True, wrappedFactory=clientFactory 2066 ) 2067 serverTLSFactory = TLSMemoryBIOFactory( 2068 serverOpts, isClient=False, wrappedFactory=serverFactory 2069 ) 2070 2071 cProto, sProto, pump = connectedServerAndClient( 2072 lambda: serverTLSFactory.buildProtocol(None), 2073 lambda: clientTLSFactory.buildProtocol(None), 2074 ) 2075 return cProto, sProto, clientWrappedProto, serverWrappedProto, pump 2076 2077 def test_invalidHostname(self): 2078 """ 2079 When a certificate containing an invalid hostname is received from the 2080 server, the connection is immediately dropped. 2081 """ 2082 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2083 "wrong-host.example.com", 2084 "correct-host.example.com", 2085 ) 2086 self.assertEqual(cWrapped.data, b"") 2087 self.assertEqual(sWrapped.data, b"") 2088 2089 cErr = cWrapped.lostReason.value 2090 sErr = sWrapped.lostReason.value 2091 2092 self.assertIsInstance(cErr, VerificationError) 2093 self.assertIsInstance(sErr, ConnectionClosed) 2094 2095 def test_validHostname(self): 2096 """ 2097 Whenever a valid certificate containing a valid hostname is received, 2098 connection proceeds normally. 2099 """ 2100 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2101 "valid.example.com", 2102 "valid.example.com", 2103 ) 2104 self.assertEqual(cWrapped.data, b"greetings!") 2105 2106 cErr = cWrapped.lostReason 2107 sErr = sWrapped.lostReason 2108 self.assertIsNone(cErr) 2109 self.assertIsNone(sErr) 2110 2111 def test_validHostnameInvalidCertificate(self): 2112 """ 2113 When an invalid certificate containing a perfectly valid hostname is 2114 received, the connection is aborted with an OpenSSL error. 2115 """ 2116 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2117 "valid.example.com", 2118 "valid.example.com", 2119 validCertificate=False, 2120 ) 2121 2122 self.assertEqual(cWrapped.data, b"") 2123 self.assertEqual(sWrapped.data, b"") 2124 2125 cErr = cWrapped.lostReason.value 2126 sErr = sWrapped.lostReason.value 2127 2128 self.assertIsInstance(cErr, SSL.Error) 2129 self.assertIsInstance(sErr, SSL.Error) 2130 2131 def test_realCAsBetterNotSignOurBogusTestCerts(self): 2132 """ 2133 If we use the default trust from the platform, our dinky certificate 2134 should I{really} fail. 2135 """ 2136 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2137 "valid.example.com", 2138 "valid.example.com", 2139 validCertificate=False, 2140 useDefaultTrust=True, 2141 ) 2142 2143 self.assertEqual(cWrapped.data, b"") 2144 self.assertEqual(sWrapped.data, b"") 2145 2146 cErr = cWrapped.lostReason.value 2147 sErr = sWrapped.lostReason.value 2148 2149 self.assertIsInstance(cErr, SSL.Error) 2150 self.assertIsInstance(sErr, SSL.Error) 2151 2152 def test_butIfTheyDidItWouldWork(self): 2153 """ 2154 L{ssl.optionsForClientTLS} should be using L{ssl.platformTrust} by 2155 default, so if we fake that out then it should trust ourselves again. 2156 """ 2157 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2158 "valid.example.com", 2159 "valid.example.com", 2160 useDefaultTrust=True, 2161 fakePlatformTrust=True, 2162 ) 2163 self.assertEqual(cWrapped.data, b"greetings!") 2164 2165 cErr = cWrapped.lostReason 2166 sErr = sWrapped.lostReason 2167 self.assertIsNone(cErr) 2168 self.assertIsNone(sErr) 2169 2170 def test_clientPresentsCertificate(self): 2171 """ 2172 When the server verifies and the client presents a valid certificate 2173 for that verification by passing it to 2174 L{sslverify.optionsForClientTLS}, communication proceeds. 2175 """ 2176 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2177 "valid.example.com", 2178 "valid.example.com", 2179 validCertificate=True, 2180 serverVerifies=True, 2181 clientPresentsCertificate=True, 2182 ) 2183 2184 self.assertEqual(cWrapped.data, b"greetings!") 2185 2186 cErr = cWrapped.lostReason 2187 sErr = sWrapped.lostReason 2188 self.assertIsNone(cErr) 2189 self.assertIsNone(sErr) 2190 2191 def test_clientPresentsBadCertificate(self): 2192 """ 2193 When the server verifies and the client presents an invalid certificate 2194 for that verification by passing it to 2195 L{sslverify.optionsForClientTLS}, the connection cannot be established 2196 with an SSL error. 2197 """ 2198 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2199 "valid.example.com", 2200 "valid.example.com", 2201 validCertificate=True, 2202 serverVerifies=True, 2203 validClientCertificate=False, 2204 clientPresentsCertificate=True, 2205 ) 2206 2207 self.assertEqual(cWrapped.data, b"") 2208 2209 cErr = cWrapped.lostReason.value 2210 sErr = sWrapped.lostReason.value 2211 2212 self.assertIsInstance(cErr, SSL.Error) 2213 self.assertIsInstance(sErr, SSL.Error) 2214 2215 @skipIf(skipSNI, skipSNI) 2216 def test_hostnameIsIndicated(self): 2217 """ 2218 Specifying the C{hostname} argument to L{CertificateOptions} also sets 2219 the U{Server Name Extension 2220 <https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS indication 2221 field to the correct value. 2222 """ 2223 names = [] 2224 2225 def setupServerContext(ctx): 2226 def servername_received(conn): 2227 names.append(conn.get_servername().decode("ascii")) 2228 2229 ctx.set_tlsext_servername_callback(servername_received) 2230 2231 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2232 "valid.example.com", "valid.example.com", setupServerContext 2233 ) 2234 self.assertEqual(names, ["valid.example.com"]) 2235 2236 @skipIf(skipSNI, skipSNI) 2237 def test_hostnameEncoding(self): 2238 """ 2239 Hostnames are encoded as IDNA. 2240 """ 2241 names = [] 2242 hello = "h\N{LATIN SMALL LETTER A WITH ACUTE}llo.example.com" 2243 2244 def setupServerContext(ctx): 2245 def servername_received(conn): 2246 serverIDNA = _idnaText(conn.get_servername()) 2247 names.append(serverIDNA) 2248 2249 ctx.set_tlsext_servername_callback(servername_received) 2250 2251 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2252 hello, hello, setupServerContext 2253 ) 2254 self.assertEqual(names, [hello]) 2255 self.assertEqual(cWrapped.data, b"greetings!") 2256 2257 cErr = cWrapped.lostReason 2258 sErr = sWrapped.lostReason 2259 self.assertIsNone(cErr) 2260 self.assertIsNone(sErr) 2261 2262 def test_fallback(self): 2263 """ 2264 L{sslverify.simpleVerifyHostname} checks string equality on the 2265 commonName of a connection's certificate's subject, doing nothing if it 2266 matches and raising L{VerificationError} if it doesn't. 2267 """ 2268 name = "something.example.com" 2269 2270 class Connection: 2271 def get_peer_certificate(self): 2272 """ 2273 Fake of L{OpenSSL.SSL.Connection.get_peer_certificate}. 2274 2275 @return: A certificate with a known common name. 2276 @rtype: L{OpenSSL.crypto.X509} 2277 """ 2278 cert = X509() 2279 cert.get_subject().commonName = name 2280 return cert 2281 2282 conn = Connection() 2283 self.assertIs( 2284 sslverify.simpleVerifyHostname(conn, "something.example.com"), None 2285 ) 2286 self.assertRaises( 2287 sslverify.SimpleVerificationError, 2288 sslverify.simpleVerifyHostname, 2289 conn, 2290 "nonsense", 2291 ) 2292 2293 def test_surpriseFromInfoCallback(self): 2294 """ 2295 pyOpenSSL isn't always so great about reporting errors. If one occurs 2296 in the verification info callback, it should be logged and the 2297 connection should be shut down (if possible, anyway; the app_data could 2298 be clobbered but there's no point testing for that). 2299 """ 2300 cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( 2301 "correct-host.example.com", 2302 "correct-host.example.com", 2303 buggyInfoCallback=True, 2304 ) 2305 2306 self.assertEqual(cWrapped.data, b"") 2307 self.assertEqual(sWrapped.data, b"") 2308 2309 cErr = cWrapped.lostReason.value 2310 sErr = sWrapped.lostReason.value 2311 2312 self.assertIsInstance(cErr, ZeroDivisionError) 2313 self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error)) 2314 errors = self.flushLoggedErrors(ZeroDivisionError) 2315 self.assertTrue(errors) 2316 2317 2318def negotiateProtocol(serverProtocols, clientProtocols, clientOptions=None): 2319 """ 2320 Create the TLS connection and negotiate a next protocol. 2321 2322 @param serverProtocols: The protocols the server is willing to negotiate. 2323 @param clientProtocols: The protocols the client is willing to negotiate. 2324 @param clientOptions: The type of C{OpenSSLCertificateOptions} class to 2325 use for the client. Defaults to C{OpenSSLCertificateOptions}. 2326 @return: A L{tuple} of the negotiated protocol and the reason the 2327 connection was lost. 2328 """ 2329 caCertificate, serverCertificate = certificatesForAuthorityAndServer() 2330 trustRoot = sslverify.OpenSSLCertificateAuthorities( 2331 [ 2332 caCertificate.original, 2333 ] 2334 ) 2335 2336 sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnectionInMemory( 2337 trustRoot=trustRoot, 2338 privateKey=serverCertificate.privateKey.original, 2339 serverCertificate=serverCertificate.original, 2340 clientProtocols=clientProtocols, 2341 serverProtocols=serverProtocols, 2342 clientOptions=clientOptions, 2343 ) 2344 pump.flush() 2345 2346 return (cProto.negotiatedProtocol, cWrapped.lostReason) 2347 2348 2349class NPNOrALPNTests(TestCase): 2350 """ 2351 NPN and ALPN protocol selection. 2352 2353 These tests only run on platforms that have a PyOpenSSL version >= 0.15, 2354 and OpenSSL version 1.0.1 or later. 2355 """ 2356 2357 if skipSSL: 2358 skip = skipSSL 2359 elif skipNPN: 2360 skip = skipNPN 2361 2362 def test_nextProtocolMechanismsNPNIsSupported(self): 2363 """ 2364 When at least NPN is available on the platform, NPN is in the set of 2365 supported negotiation protocols. 2366 """ 2367 supportedProtocols = sslverify.protocolNegotiationMechanisms() 2368 self.assertTrue(sslverify.ProtocolNegotiationSupport.NPN in supportedProtocols) 2369 2370 def test_NPNAndALPNSuccess(self): 2371 """ 2372 When both ALPN and NPN are used, and both the client and server have 2373 overlapping protocol choices, a protocol is successfully negotiated. 2374 Further, the negotiated protocol is the first one in the list. 2375 """ 2376 protocols = [b"h2", b"http/1.1"] 2377 negotiatedProtocol, lostReason = negotiateProtocol( 2378 clientProtocols=protocols, 2379 serverProtocols=protocols, 2380 ) 2381 self.assertEqual(negotiatedProtocol, b"h2") 2382 self.assertIsNone(lostReason) 2383 2384 def test_NPNAndALPNDifferent(self): 2385 """ 2386 Client and server have different protocol lists: only the common 2387 element is chosen. 2388 """ 2389 serverProtocols = [b"h2", b"http/1.1", b"spdy/2"] 2390 clientProtocols = [b"spdy/3", b"http/1.1"] 2391 negotiatedProtocol, lostReason = negotiateProtocol( 2392 clientProtocols=clientProtocols, 2393 serverProtocols=serverProtocols, 2394 ) 2395 self.assertEqual(negotiatedProtocol, b"http/1.1") 2396 self.assertIsNone(lostReason) 2397 2398 def test_NPNAndALPNNoAdvertise(self): 2399 """ 2400 When one peer does not advertise any protocols, the connection is set 2401 up with no next protocol. 2402 """ 2403 protocols = [b"h2", b"http/1.1"] 2404 negotiatedProtocol, lostReason = negotiateProtocol( 2405 clientProtocols=protocols, 2406 serverProtocols=[], 2407 ) 2408 self.assertIsNone(negotiatedProtocol) 2409 self.assertIsNone(lostReason) 2410 2411 def test_NPNAndALPNNoOverlap(self): 2412 """ 2413 When the client and server have no overlap of protocols, the connection 2414 fails. 2415 """ 2416 clientProtocols = [b"h2", b"http/1.1"] 2417 serverProtocols = [b"spdy/3"] 2418 negotiatedProtocol, lostReason = negotiateProtocol( 2419 serverProtocols=clientProtocols, 2420 clientProtocols=serverProtocols, 2421 ) 2422 self.assertIsNone(negotiatedProtocol) 2423 self.assertEqual(lostReason.type, SSL.Error) 2424 2425 2426class ALPNTests(TestCase): 2427 """ 2428 ALPN protocol selection. 2429 2430 These tests only run on platforms that have a PyOpenSSL version >= 0.15, 2431 and OpenSSL version 1.0.2 or later. 2432 2433 This covers only the ALPN specific logic, as any platform that has ALPN 2434 will also have NPN and so will run the NPNAndALPNTest suite as well. 2435 """ 2436 2437 if skipSSL: 2438 skip = skipSSL 2439 elif skipALPN: 2440 skip = skipALPN 2441 2442 def test_nextProtocolMechanismsALPNIsSupported(self): 2443 """ 2444 When ALPN is available on a platform, protocolNegotiationMechanisms 2445 includes ALPN in the suported protocols. 2446 """ 2447 supportedProtocols = sslverify.protocolNegotiationMechanisms() 2448 self.assertTrue(sslverify.ProtocolNegotiationSupport.ALPN in supportedProtocols) 2449 2450 2451class NPNAndALPNAbsentTests(TestCase): 2452 """ 2453 NPN/ALPN operations fail on platforms that do not support them. 2454 2455 These tests only run on platforms that have a PyOpenSSL version < 0.15, 2456 an OpenSSL version earlier than 1.0.1, or an OpenSSL/cryptography built 2457 without NPN support. 2458 """ 2459 2460 if skipSSL: 2461 skip = skipSSL 2462 elif not skipNPN or not skipALPN: 2463 skip = "NPN and/or ALPN is present on this platform" 2464 2465 def test_nextProtocolMechanismsNoNegotiationSupported(self): 2466 """ 2467 When neither NPN or ALPN are available on a platform, there are no 2468 supported negotiation protocols. 2469 """ 2470 supportedProtocols = sslverify.protocolNegotiationMechanisms() 2471 self.assertFalse(supportedProtocols) 2472 2473 def test_NPNAndALPNNotImplemented(self): 2474 """ 2475 A NotImplementedError is raised when using acceptableProtocols on a 2476 platform that does not support either NPN or ALPN. 2477 """ 2478 protocols = [b"h2", b"http/1.1"] 2479 self.assertRaises( 2480 NotImplementedError, 2481 negotiateProtocol, 2482 serverProtocols=protocols, 2483 clientProtocols=protocols, 2484 ) 2485 2486 def test_NegotiatedProtocolReturnsNone(self): 2487 """ 2488 negotiatedProtocol return L{None} even when NPN/ALPN aren't supported. 2489 This works because, as neither are supported, negotiation isn't even 2490 attempted. 2491 """ 2492 serverProtocols = None 2493 clientProtocols = None 2494 negotiatedProtocol, lostReason = negotiateProtocol( 2495 clientProtocols=clientProtocols, 2496 serverProtocols=serverProtocols, 2497 ) 2498 self.assertIsNone(negotiatedProtocol) 2499 self.assertIsNone(lostReason) 2500 2501 2502class _NotSSLTransport: 2503 def getHandle(self): 2504 return self 2505 2506 2507class _MaybeSSLTransport: 2508 def getHandle(self): 2509 return self 2510 2511 def get_peer_certificate(self): 2512 return None 2513 2514 def get_host_certificate(self): 2515 return None 2516 2517 2518class _ActualSSLTransport: 2519 def getHandle(self): 2520 return self 2521 2522 def get_host_certificate(self): 2523 return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original 2524 2525 def get_peer_certificate(self): 2526 return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original 2527 2528 2529class ConstructorsTests(TestCase): 2530 if skipSSL: 2531 skip = skipSSL 2532 2533 def test_peerFromNonSSLTransport(self): 2534 """ 2535 Verify that peerFromTransport raises an exception if the transport 2536 passed is not actually an SSL transport. 2537 """ 2538 x = self.assertRaises( 2539 CertificateError, 2540 sslverify.Certificate.peerFromTransport, 2541 _NotSSLTransport(), 2542 ) 2543 self.assertTrue(str(x).startswith("non-TLS")) 2544 2545 def test_peerFromBlankSSLTransport(self): 2546 """ 2547 Verify that peerFromTransport raises an exception if the transport 2548 passed is an SSL transport, but doesn't have a peer certificate. 2549 """ 2550 x = self.assertRaises( 2551 CertificateError, 2552 sslverify.Certificate.peerFromTransport, 2553 _MaybeSSLTransport(), 2554 ) 2555 self.assertTrue(str(x).startswith("TLS")) 2556 2557 def test_hostFromNonSSLTransport(self): 2558 """ 2559 Verify that hostFromTransport raises an exception if the transport 2560 passed is not actually an SSL transport. 2561 """ 2562 x = self.assertRaises( 2563 CertificateError, 2564 sslverify.Certificate.hostFromTransport, 2565 _NotSSLTransport(), 2566 ) 2567 self.assertTrue(str(x).startswith("non-TLS")) 2568 2569 def test_hostFromBlankSSLTransport(self): 2570 """ 2571 Verify that hostFromTransport raises an exception if the transport 2572 passed is an SSL transport, but doesn't have a host certificate. 2573 """ 2574 x = self.assertRaises( 2575 CertificateError, 2576 sslverify.Certificate.hostFromTransport, 2577 _MaybeSSLTransport(), 2578 ) 2579 self.assertTrue(str(x).startswith("TLS")) 2580 2581 def test_hostFromSSLTransport(self): 2582 """ 2583 Verify that hostFromTransport successfully creates the correct 2584 certificate if passed a valid SSL transport. 2585 """ 2586 self.assertEqual( 2587 sslverify.Certificate.hostFromTransport( 2588 _ActualSSLTransport() 2589 ).serialNumber(), 2590 12345, 2591 ) 2592 2593 def test_peerFromSSLTransport(self): 2594 """ 2595 Verify that peerFromTransport successfully creates the correct 2596 certificate if passed a valid SSL transport. 2597 """ 2598 self.assertEqual( 2599 sslverify.Certificate.peerFromTransport( 2600 _ActualSSLTransport() 2601 ).serialNumber(), 2602 12346, 2603 ) 2604 2605 2606class MultipleCertificateTrustRootTests(TestCase): 2607 """ 2608 Test the behavior of the trustRootFromCertificates() API call. 2609 """ 2610 2611 if skipSSL: 2612 skip = skipSSL 2613 2614 def test_trustRootFromCertificatesPrivatePublic(self): 2615 """ 2616 L{trustRootFromCertificates} accepts either a L{sslverify.Certificate} 2617 or a L{sslverify.PrivateCertificate} instance. 2618 """ 2619 privateCert = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR) 2620 cert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) 2621 2622 mt = sslverify.trustRootFromCertificates([privateCert, cert]) 2623 2624 # Verify that the returned object acts correctly when used as a 2625 # trustRoot= param to optionsForClientTLS. 2626 sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( 2627 trustRoot=mt, 2628 privateKey=privateCert.privateKey.original, 2629 serverCertificate=privateCert.original, 2630 ) 2631 2632 # This connection should succeed 2633 self.assertEqual(cWrap.data, b"greetings!") 2634 self.assertIsNone(cWrap.lostReason) 2635 2636 def test_trustRootSelfSignedServerCertificate(self): 2637 """ 2638 L{trustRootFromCertificates} called with a single self-signed 2639 certificate will cause L{optionsForClientTLS} to accept client 2640 connections to a server with that certificate. 2641 """ 2642 key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server") 2643 selfSigned = sslverify.PrivateCertificate.fromCertificateAndKeyPair( 2644 sslverify.Certificate(cert), 2645 sslverify.KeyPair(key), 2646 ) 2647 2648 trust = sslverify.trustRootFromCertificates([selfSigned]) 2649 2650 # Since we trust this exact certificate, connections to this server 2651 # should succeed. 2652 sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( 2653 trustRoot=trust, 2654 privateKey=selfSigned.privateKey.original, 2655 serverCertificate=selfSigned.original, 2656 ) 2657 self.assertEqual(cWrap.data, b"greetings!") 2658 self.assertIsNone(cWrap.lostReason) 2659 2660 def test_trustRootCertificateAuthorityTrustsConnection(self): 2661 """ 2662 L{trustRootFromCertificates} called with certificate A will cause 2663 L{optionsForClientTLS} to accept client connections to a server with 2664 certificate B where B is signed by A. 2665 """ 2666 caCert, serverCert = certificatesForAuthorityAndServer() 2667 2668 trust = sslverify.trustRootFromCertificates([caCert]) 2669 2670 # Since we've listed the CA's certificate as a trusted cert, a 2671 # connection to the server certificate it signed should succeed. 2672 sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( 2673 trustRoot=trust, 2674 privateKey=serverCert.privateKey.original, 2675 serverCertificate=serverCert.original, 2676 ) 2677 self.assertEqual(cWrap.data, b"greetings!") 2678 self.assertIsNone(cWrap.lostReason) 2679 2680 def test_trustRootFromCertificatesUntrusted(self): 2681 """ 2682 L{trustRootFromCertificates} called with certificate A will cause 2683 L{optionsForClientTLS} to disallow any connections to a server with 2684 certificate B where B is not signed by A. 2685 """ 2686 key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server") 2687 serverCert = sslverify.PrivateCertificate.fromCertificateAndKeyPair( 2688 sslverify.Certificate(cert), 2689 sslverify.KeyPair(key), 2690 ) 2691 untrustedCert = sslverify.Certificate( 2692 makeCertificate(O=b"CA Test Certificate", CN=b"unknown CA")[1] 2693 ) 2694 2695 trust = sslverify.trustRootFromCertificates([untrustedCert]) 2696 2697 # Since we only trust 'untrustedCert' which has not signed our 2698 # server's cert, we should reject this connection 2699 sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( 2700 trustRoot=trust, 2701 privateKey=serverCert.privateKey.original, 2702 serverCertificate=serverCert.original, 2703 ) 2704 2705 # This connection should fail, so no data was received. 2706 self.assertEqual(cWrap.data, b"") 2707 2708 # It was an L{SSL.Error}. 2709 self.assertEqual(cWrap.lostReason.type, SSL.Error) 2710 2711 # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors. 2712 err = cWrap.lostReason.value 2713 self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca") 2714 2715 def test_trustRootFromCertificatesOpenSSLObjects(self): 2716 """ 2717 L{trustRootFromCertificates} rejects any L{OpenSSL.crypto.X509} 2718 instances in the list passed to it. 2719 """ 2720 private = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR) 2721 certX509 = private.original 2722 2723 exception = self.assertRaises( 2724 TypeError, 2725 sslverify.trustRootFromCertificates, 2726 [certX509], 2727 ) 2728 self.assertEqual( 2729 "certificates items must be twisted.internet.ssl.CertBase " "instances", 2730 exception.args[0], 2731 ) 2732 2733 2734class OpenSSLCipherTests(TestCase): 2735 """ 2736 Tests for twisted.internet._sslverify.OpenSSLCipher. 2737 """ 2738 2739 if skipSSL: 2740 skip = skipSSL 2741 2742 cipherName = "CIPHER-STRING" 2743 2744 def test_constructorSetsFullName(self): 2745 """ 2746 The first argument passed to the constructor becomes the full name. 2747 """ 2748 self.assertEqual( 2749 self.cipherName, sslverify.OpenSSLCipher(self.cipherName).fullName 2750 ) 2751 2752 def test_repr(self): 2753 """ 2754 C{repr(cipher)} returns a valid constructor call. 2755 """ 2756 cipher = sslverify.OpenSSLCipher(self.cipherName) 2757 self.assertEqual( 2758 cipher, eval(repr(cipher), {"OpenSSLCipher": sslverify.OpenSSLCipher}) 2759 ) 2760 2761 def test_eqSameClass(self): 2762 """ 2763 Equal type and C{fullName} means that the objects are equal. 2764 """ 2765 cipher1 = sslverify.OpenSSLCipher(self.cipherName) 2766 cipher2 = sslverify.OpenSSLCipher(self.cipherName) 2767 self.assertEqual(cipher1, cipher2) 2768 2769 def test_eqSameNameDifferentType(self): 2770 """ 2771 If ciphers have the same name but different types, they're still 2772 different. 2773 """ 2774 2775 class DifferentCipher: 2776 fullName = self.cipherName 2777 2778 self.assertNotEqual( 2779 sslverify.OpenSSLCipher(self.cipherName), 2780 DifferentCipher(), 2781 ) 2782 2783 2784class ExpandCipherStringTests(TestCase): 2785 """ 2786 Tests for twisted.internet._sslverify._expandCipherString. 2787 """ 2788 2789 if skipSSL: 2790 skip = skipSSL 2791 2792 def test_doesNotStumbleOverEmptyList(self): 2793 """ 2794 If the expanded cipher list is empty, an empty L{list} is returned. 2795 """ 2796 self.assertEqual( 2797 tuple(), sslverify._expandCipherString("", SSL.SSLv23_METHOD, 0) 2798 ) 2799 2800 def test_doesNotSwallowOtherSSLErrors(self): 2801 """ 2802 Only no cipher matches get swallowed, every other SSL error gets 2803 propagated. 2804 """ 2805 2806 def raiser(_): 2807 # Unfortunately, there seems to be no way to trigger a real SSL 2808 # error artificially. 2809 raise SSL.Error([["", "", ""]]) 2810 2811 ctx = FakeContext(SSL.SSLv23_METHOD) 2812 ctx.set_cipher_list = raiser 2813 self.patch(sslverify.SSL, "Context", lambda _: ctx) 2814 self.assertRaises( 2815 SSL.Error, sslverify._expandCipherString, "ALL", SSL.SSLv23_METHOD, 0 2816 ) 2817 2818 def test_returnsTupleOfICiphers(self): 2819 """ 2820 L{sslverify._expandCipherString} always returns a L{tuple} of 2821 L{interfaces.ICipher}. 2822 """ 2823 ciphers = sslverify._expandCipherString("ALL", SSL.SSLv23_METHOD, 0) 2824 self.assertIsInstance(ciphers, tuple) 2825 bogus = [] 2826 for c in ciphers: 2827 if not interfaces.ICipher.providedBy(c): 2828 bogus.append(c) 2829 2830 self.assertEqual([], bogus) 2831 2832 2833class AcceptableCiphersTests(TestCase): 2834 """ 2835 Tests for twisted.internet._sslverify.OpenSSLAcceptableCiphers. 2836 """ 2837 2838 if skipSSL: 2839 skip = skipSSL 2840 2841 def test_selectOnEmptyListReturnsEmptyList(self): 2842 """ 2843 If no ciphers are available, nothing can be selected. 2844 """ 2845 ac = sslverify.OpenSSLAcceptableCiphers(tuple()) 2846 self.assertEqual(tuple(), ac.selectCiphers(tuple())) 2847 2848 def test_selectReturnsOnlyFromAvailable(self): 2849 """ 2850 Select only returns a cross section of what is available and what is 2851 desirable. 2852 """ 2853 ac = sslverify.OpenSSLAcceptableCiphers( 2854 [ 2855 sslverify.OpenSSLCipher("A"), 2856 sslverify.OpenSSLCipher("B"), 2857 ] 2858 ) 2859 self.assertEqual( 2860 (sslverify.OpenSSLCipher("B"),), 2861 ac.selectCiphers( 2862 [sslverify.OpenSSLCipher("B"), sslverify.OpenSSLCipher("C")] 2863 ), 2864 ) 2865 2866 def test_fromOpenSSLCipherStringExpandsToTupleOfCiphers(self): 2867 """ 2868 If L{sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString} is 2869 called it expands the string to a tuple of ciphers. 2870 """ 2871 ac = sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString("ALL") 2872 self.assertIsInstance(ac._ciphers, tuple) 2873 self.assertTrue(all(sslverify.ICipher.providedBy(c) for c in ac._ciphers)) 2874 2875 2876class DiffieHellmanParametersTests(TestCase): 2877 """ 2878 Tests for twisted.internet._sslverify.OpenSSLDHParameters. 2879 """ 2880 2881 if skipSSL: 2882 skip = skipSSL 2883 filePath = FilePath(b"dh.params") 2884 2885 def test_fromFile(self): 2886 """ 2887 Calling C{fromFile} with a filename returns an instance with that file 2888 name saved. 2889 """ 2890 params = sslverify.OpenSSLDiffieHellmanParameters.fromFile(self.filePath) 2891 self.assertEqual(self.filePath, params._dhFile) 2892 2893 2894class FakeLibState: 2895 """ 2896 State for L{FakeLib} 2897 2898 @param setECDHAutoRaises: An exception 2899 L{FakeLib.SSL_CTX_set_ecdh_auto} should raise; if L{None}, 2900 nothing is raised. 2901 2902 @ivar ecdhContexts: A list of SSL contexts with which 2903 L{FakeLib.SSL_CTX_set_ecdh_auto} was called 2904 @type ecdhContexts: L{list} of L{OpenSSL.SSL.Context}s 2905 2906 @ivar ecdhValues: A list of boolean values with which 2907 L{FakeLib.SSL_CTX_set_ecdh_auto} was called 2908 @type ecdhValues: L{list} of L{boolean}s 2909 """ 2910 2911 __slots__ = ("setECDHAutoRaises", "ecdhContexts", "ecdhValues") 2912 2913 def __init__(self, setECDHAutoRaises): 2914 self.setECDHAutoRaises = setECDHAutoRaises 2915 self.ecdhContexts = [] 2916 self.ecdhValues = [] 2917 2918 2919class FakeLib: 2920 """ 2921 An introspectable fake of cryptography's lib object. 2922 2923 @param state: A L{FakeLibState} instance that contains this fake's 2924 state. 2925 """ 2926 2927 def __init__(self, state): 2928 self._state = state 2929 2930 def SSL_CTX_set_ecdh_auto(self, ctx, value): 2931 """ 2932 Record the context and value under in the C{_state} instance 2933 variable. 2934 2935 @see: L{FakeLibState} 2936 2937 @param ctx: An SSL context. 2938 @type ctx: L{OpenSSL.SSL.Context} 2939 2940 @param value: A boolean value 2941 @type value: L{bool} 2942 """ 2943 self._state.ecdhContexts.append(ctx) 2944 self._state.ecdhValues.append(value) 2945 if self._state.setECDHAutoRaises is not None: 2946 raise self._state.setECDHAutoRaises 2947 2948 2949class FakeLibTests(TestCase): 2950 """ 2951 Tests for L{FakeLib}. 2952 """ 2953 2954 def test_SSL_CTX_set_ecdh_auto(self): 2955 """ 2956 L{FakeLib.SSL_CTX_set_ecdh_auto} records context and value it 2957 was called with. 2958 """ 2959 state = FakeLibState(setECDHAutoRaises=None) 2960 lib = FakeLib(state) 2961 self.assertNot(state.ecdhContexts) 2962 self.assertNot(state.ecdhValues) 2963 2964 context, value = "CONTEXT", True 2965 lib.SSL_CTX_set_ecdh_auto(context, value) 2966 self.assertEqual(state.ecdhContexts, [context]) 2967 self.assertEqual(state.ecdhValues, [True]) 2968 2969 def test_SSL_CTX_set_ecdh_autoRaises(self): 2970 """ 2971 L{FakeLib.SSL_CTX_set_ecdh_auto} raises the exception provided 2972 by its state, while still recording its arguments. 2973 """ 2974 state = FakeLibState(setECDHAutoRaises=ValueError) 2975 lib = FakeLib(state) 2976 self.assertNot(state.ecdhContexts) 2977 self.assertNot(state.ecdhValues) 2978 2979 context, value = "CONTEXT", True 2980 self.assertRaises(ValueError, lib.SSL_CTX_set_ecdh_auto, context, value) 2981 self.assertEqual(state.ecdhContexts, [context]) 2982 self.assertEqual(state.ecdhValues, [True]) 2983 2984 2985class FakeCryptoState: 2986 """ 2987 State for L{FakeCrypto} 2988 2989 @param getEllipticCurveRaises: What 2990 L{FakeCrypto.get_elliptic_curve} should raise; L{None} and it 2991 won't raise anything 2992 2993 @param getEllipticCurveReturns: What 2994 L{FakeCrypto.get_elliptic_curve} should return. 2995 2996 @ivar getEllipticCurveCalls: The arguments with which 2997 L{FakeCrypto.get_elliptic_curve} has been called. 2998 @type getEllipticCurveCalls: L{list} 2999 """ 3000 3001 __slots__ = ( 3002 "getEllipticCurveRaises", 3003 "getEllipticCurveReturns", 3004 "getEllipticCurveCalls", 3005 ) 3006 3007 def __init__( 3008 self, 3009 getEllipticCurveRaises, 3010 getEllipticCurveReturns, 3011 ): 3012 self.getEllipticCurveRaises = getEllipticCurveRaises 3013 self.getEllipticCurveReturns = getEllipticCurveReturns 3014 self.getEllipticCurveCalls = [] 3015 3016 3017class FakeCrypto: 3018 """ 3019 An introspectable fake of pyOpenSSL's L{OpenSSL.crypto} module. 3020 3021 @ivar state: A L{FakeCryptoState} instance 3022 """ 3023 3024 def __init__(self, state): 3025 self._state = state 3026 3027 def get_elliptic_curve(self, curve): 3028 """ 3029 A fake that records the curve with which it was called. 3030 3031 @param curve: see L{crypto.get_elliptic_curve} 3032 3033 @return: see L{FakeCryptoState.getEllipticCurveReturns} 3034 @raises: see L{FakeCryptoState.getEllipticCurveRaises} 3035 """ 3036 self._state.getEllipticCurveCalls.append(curve) 3037 if self._state.getEllipticCurveRaises is not None: 3038 raise self._state.getEllipticCurveRaises 3039 return self._state.getEllipticCurveReturns 3040 3041 3042class FakeCryptoTests(SynchronousTestCase): 3043 """ 3044 Tests for L{FakeCrypto}. 3045 """ 3046 3047 def test_get_elliptic_curveRecordsArgument(self): 3048 """ 3049 L{FakeCrypto.test_get_elliptic_curve} records the curve with 3050 which it was called. 3051 """ 3052 state = FakeCryptoState( 3053 getEllipticCurveRaises=None, 3054 getEllipticCurveReturns=None, 3055 ) 3056 crypto = FakeCrypto(state) 3057 crypto.get_elliptic_curve("a curve name") 3058 self.assertEqual(state.getEllipticCurveCalls, ["a curve name"]) 3059 3060 def test_get_elliptic_curveReturns(self): 3061 """ 3062 L{FakeCrypto.test_get_elliptic_curve} returns the value 3063 specified by its state object and records what it was called 3064 with. 3065 """ 3066 returnValue = "object" 3067 state = FakeCryptoState( 3068 getEllipticCurveRaises=None, 3069 getEllipticCurveReturns=returnValue, 3070 ) 3071 crypto = FakeCrypto(state) 3072 self.assertIs( 3073 crypto.get_elliptic_curve("another curve name"), 3074 returnValue, 3075 ) 3076 self.assertEqual(state.getEllipticCurveCalls, ["another curve name"]) 3077 3078 def test_get_elliptic_curveRaises(self): 3079 """ 3080 L{FakeCrypto.test_get_elliptic_curve} raises the exception 3081 specified by its state object. 3082 """ 3083 state = FakeCryptoState( 3084 getEllipticCurveRaises=ValueError, getEllipticCurveReturns=None 3085 ) 3086 crypto = FakeCrypto(state) 3087 self.assertRaises( 3088 ValueError, 3089 crypto.get_elliptic_curve, 3090 "yet another curve name", 3091 ) 3092 self.assertEqual( 3093 state.getEllipticCurveCalls, 3094 ["yet another curve name"], 3095 ) 3096 3097 3098class ChooseDiffieHellmanEllipticCurveTests(SynchronousTestCase): 3099 """ 3100 Tests for L{sslverify._ChooseDiffieHellmanEllipticCurve}. 3101 3102 @cvar OPENSSL_110: A version number for OpenSSL 1.1.0 3103 3104 @cvar OPENSSL_102: A version number for OpenSSL 1.0.2 3105 3106 @cvar OPENSSL_101: A version number for OpenSSL 1.0.1 3107 3108 @see: 3109 U{https://wiki.openssl.org/index.php/Manual:OPENSSL_VERSION_NUMBER(3)} 3110 """ 3111 3112 if skipSSL: 3113 skip = skipSSL 3114 3115 OPENSSL_110 = 0x1010007F 3116 OPENSSL_102 = 0x100020EF 3117 OPENSSL_101 = 0x1000114F 3118 3119 def setUp(self): 3120 self.libState = FakeLibState(setECDHAutoRaises=False) 3121 self.lib = FakeLib(self.libState) 3122 3123 self.cryptoState = FakeCryptoState( 3124 getEllipticCurveReturns=None, getEllipticCurveRaises=None 3125 ) 3126 self.crypto = FakeCrypto(self.cryptoState) 3127 self.context = FakeContext(SSL.SSLv23_METHOD) 3128 3129 def test_openSSL110(self): 3130 """ 3131 No configuration of contexts occurs under OpenSSL 1.1.0 and 3132 later, because they create contexts with secure ECDH curves. 3133 3134 @see: U{http://twistedmatrix.com/trac/ticket/9210} 3135 """ 3136 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3137 self.OPENSSL_110, 3138 openSSLlib=self.lib, 3139 openSSLcrypto=self.crypto, 3140 ) 3141 chooser.configureECDHCurve(self.context) 3142 3143 self.assertFalse(self.libState.ecdhContexts) 3144 self.assertFalse(self.libState.ecdhValues) 3145 self.assertFalse(self.cryptoState.getEllipticCurveCalls) 3146 self.assertIsNone(self.context._ecCurve) 3147 3148 def test_openSSL102(self): 3149 """ 3150 OpenSSL 1.0.2 does not set ECDH curves by default, but 3151 C{SSL_CTX_set_ecdh_auto} requests that a context choose a 3152 secure set curves automatically. 3153 """ 3154 context = SSL.Context(SSL.SSLv23_METHOD) 3155 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3156 self.OPENSSL_102, 3157 openSSLlib=self.lib, 3158 openSSLcrypto=self.crypto, 3159 ) 3160 chooser.configureECDHCurve(context) 3161 3162 self.assertEqual(self.libState.ecdhContexts, [context._context]) 3163 self.assertEqual(self.libState.ecdhValues, [True]) 3164 self.assertFalse(self.cryptoState.getEllipticCurveCalls) 3165 self.assertIsNone(self.context._ecCurve) 3166 3167 def test_openSSL102SetECDHAutoRaises(self): 3168 """ 3169 An exception raised by C{SSL_CTX_set_ecdh_auto} under OpenSSL 3170 1.0.2 is suppressed because ECDH is best-effort. 3171 """ 3172 self.libState.setECDHAutoRaises = BaseException 3173 context = SSL.Context(SSL.SSLv23_METHOD) 3174 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3175 self.OPENSSL_102, 3176 openSSLlib=self.lib, 3177 openSSLcrypto=self.crypto, 3178 ) 3179 chooser.configureECDHCurve(context) 3180 3181 self.assertEqual(self.libState.ecdhContexts, [context._context]) 3182 self.assertEqual(self.libState.ecdhValues, [True]) 3183 self.assertFalse(self.cryptoState.getEllipticCurveCalls) 3184 3185 def test_openSSL101(self): 3186 """ 3187 OpenSSL 1.0.1 does not set ECDH curves by default, nor does 3188 it expose L{SSL_CTX_set_ecdh_auto}. Instead, a single ECDH 3189 curve can be set with L{OpenSSL.SSL.Context.set_tmp_ecdh}. 3190 """ 3191 self.cryptoState.getEllipticCurveReturns = curve = "curve object" 3192 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3193 self.OPENSSL_101, 3194 openSSLlib=self.lib, 3195 openSSLcrypto=self.crypto, 3196 ) 3197 chooser.configureECDHCurve(self.context) 3198 3199 self.assertFalse(self.libState.ecdhContexts) 3200 self.assertFalse(self.libState.ecdhValues) 3201 self.assertEqual( 3202 self.cryptoState.getEllipticCurveCalls, 3203 [sslverify._defaultCurveName], 3204 ) 3205 self.assertIs(self.context._ecCurve, curve) 3206 3207 def test_openSSL101SetECDHRaises(self): 3208 """ 3209 An exception raised by L{OpenSSL.SSL.Context.set_tmp_ecdh} 3210 under OpenSSL 1.0.1 is suppressed because ECHDE is best-effort. 3211 """ 3212 3213 def set_tmp_ecdh(ctx): 3214 raise BaseException 3215 3216 self.context.set_tmp_ecdh = set_tmp_ecdh 3217 3218 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3219 self.OPENSSL_101, 3220 openSSLlib=self.lib, 3221 openSSLcrypto=self.crypto, 3222 ) 3223 chooser.configureECDHCurve(self.context) 3224 3225 self.assertFalse(self.libState.ecdhContexts) 3226 self.assertFalse(self.libState.ecdhValues) 3227 self.assertEqual( 3228 self.cryptoState.getEllipticCurveCalls, 3229 [sslverify._defaultCurveName], 3230 ) 3231 3232 def test_openSSL101NoECC(self): 3233 """ 3234 Contexts created under an OpenSSL 1.0.1 that doesn't support 3235 ECC have no configuration applied. 3236 """ 3237 self.cryptoState.getEllipticCurveRaises = ValueError 3238 chooser = sslverify._ChooseDiffieHellmanEllipticCurve( 3239 self.OPENSSL_101, 3240 openSSLlib=self.lib, 3241 openSSLcrypto=self.crypto, 3242 ) 3243 chooser.configureECDHCurve(self.context) 3244 3245 self.assertFalse(self.libState.ecdhContexts) 3246 self.assertFalse(self.libState.ecdhValues) 3247 self.assertIsNone(self.context._ecCurve) 3248 3249 3250class KeyPairTests(TestCase): 3251 """ 3252 Tests for L{sslverify.KeyPair}. 3253 """ 3254 3255 if skipSSL: 3256 skip = skipSSL 3257 3258 def setUp(self): 3259 """ 3260 Create test certificate. 3261 """ 3262 self.sKey = makeCertificate(O=b"Server Test Certificate", CN=b"server")[0] 3263 3264 def test_getstateDeprecation(self): 3265 """ 3266 L{sslverify.KeyPair.__getstate__} is deprecated. 3267 """ 3268 self.callDeprecated( 3269 (Version("Twisted", 15, 0, 0), "a real persistence system"), 3270 sslverify.KeyPair(self.sKey).__getstate__, 3271 ) 3272 3273 def test_setstateDeprecation(self): 3274 """ 3275 {sslverify.KeyPair.__setstate__} is deprecated. 3276 """ 3277 state = sslverify.KeyPair(self.sKey).dump() 3278 self.callDeprecated( 3279 (Version("Twisted", 15, 0, 0), "a real persistence system"), 3280 sslverify.KeyPair(self.sKey).__setstate__, 3281 state, 3282 ) 3283 3284 def test_noTrailingNewlinePemCert(self): 3285 noTrailingNewlineKeyPemPath = getModule("twisted.test").filePath.sibling( 3286 "cert.pem.no_trailing_newline" 3287 ) 3288 3289 certPEM = noTrailingNewlineKeyPemPath.getContent() 3290 ssl.Certificate.loadPEM(certPEM) 3291 3292 3293class SelectVerifyImplementationTests(SynchronousTestCase): 3294 """ 3295 Tests for L{_selectVerifyImplementation}. 3296 """ 3297 3298 if skipSSL: 3299 skip = skipSSL 3300 3301 def test_dependencyMissing(self): 3302 """ 3303 If I{service_identity} cannot be imported then 3304 L{_selectVerifyImplementation} returns L{simpleVerifyHostname} and 3305 L{SimpleVerificationError}. 3306 """ 3307 with SetAsideModule("service_identity"): 3308 sys.modules["service_identity"] = None 3309 3310 result = sslverify._selectVerifyImplementation() 3311 expected = ( 3312 sslverify.simpleVerifyHostname, 3313 sslverify.simpleVerifyIPAddress, 3314 sslverify.SimpleVerificationError, 3315 ) 3316 self.assertEqual(expected, result) 3317 3318 test_dependencyMissing.suppress = [ # type: ignore[attr-defined] 3319 util.suppress( 3320 message=( 3321 "You do not have a working installation of the " 3322 "service_identity module" 3323 ), 3324 ), 3325 ] 3326 3327 def test_dependencyMissingWarning(self): 3328 """ 3329 If I{service_identity} cannot be imported then 3330 L{_selectVerifyImplementation} emits a L{UserWarning} advising the user 3331 of the exact error. 3332 """ 3333 with SetAsideModule("service_identity"): 3334 sys.modules["service_identity"] = None 3335 3336 sslverify._selectVerifyImplementation() 3337 3338 [warning] = list( 3339 warning 3340 for warning in self.flushWarnings() 3341 if warning["category"] == UserWarning 3342 ) 3343 3344 importErrors = [ 3345 # Python 3.6.3 3346 "'import of service_identity halted; None in sys.modules'", 3347 # Python 3 3348 "'import of 'service_identity' halted; None in sys.modules'", 3349 # Python 2 3350 "'No module named service_identity'", 3351 ] 3352 3353 expectedMessages = [] 3354 for importError in importErrors: 3355 expectedMessages.append( 3356 "You do not have a working installation of the " 3357 "service_identity module: {message}. Please install it from " 3358 "<https://pypi.python.org/pypi/service_identity> " 3359 "and make sure all of its dependencies are satisfied. " 3360 "Without the service_identity module, Twisted can perform only" 3361 " rudimentary TLS client hostname verification. Many valid " 3362 "certificate/hostname mappings may be rejected.".format( 3363 message=importError 3364 ) 3365 ) 3366 3367 self.assertIn(warning["message"], expectedMessages) 3368 3369 # Make sure we're abusing the warning system to a sufficient 3370 # degree: there is no filename or line number that makes sense for 3371 # this warning to "blame" for the problem. It is a system 3372 # misconfiguration. So the location information should be blank 3373 # (or as blank as we can make it). 3374 self.assertEqual(warning["filename"], "") 3375 self.assertEqual(warning["lineno"], 0) 3376