1# Copyright 2005 Divmod, Inc.  See LICENSE file for details
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Tests for L{twisted.internet._sslverify}.
7"""
8
9
10import datetime
11import itertools
12import sys
13from unittest import skipIf
14
15from zope.interface import implementer
16
17from incremental import Version
18
19from twisted.internet import defer, interfaces, protocol, reactor
20from twisted.internet._idna import _idnaText
21from twisted.internet.error import CertificateError, ConnectionClosed, ConnectionLost
22from twisted.python.compat import nativeString
23from twisted.python.filepath import FilePath
24from twisted.python.modules import getModule
25from twisted.python.reflect import requireModule
26from twisted.test.iosim import connectedServerAndClient
27from twisted.test.test_twisted import SetAsideModule
28from twisted.trial import util
29from twisted.trial.unittest import SkipTest, SynchronousTestCase, TestCase
30
# Skip reasons for the optional TLS features exercised by this module.  Each
# one starts out as an empty (false) string; the code below replaces it with
# a human-readable message when the corresponding feature is unavailable.
# Test classes use them as ``if skipSSL: skip = skipSSL``.
skipSSL = ""
skipSNI = ""
skipNPN = ""
skipALPN = ""

# Everything in this branch needs pyOpenSSL; if it cannot be imported, the
# else branch marks every feature as skipped with the same message.
if requireModule("OpenSSL"):
    import ipaddress

    from OpenSSL import SSL
    from OpenSSL.crypto import FILETYPE_PEM, TYPE_RSA, X509, PKey, get_elliptic_curves

    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives.serialization import (
        Encoding,
        NoEncryption,
        PrivateFormat,
    )
    from cryptography.x509.oid import NameOID

    from twisted.internet import ssl

    # Probe for NPN support by registering a callback on a throwaway context;
    # a NotImplementedError or AttributeError means NPN cannot be used.
    try:
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_npn_advertise_callback(lambda c: None)
    except (NotImplementedError, AttributeError):
        skipNPN = (
            "NPN is deprecated (and OpenSSL 1.0.1 or greater required for NPN"
            " support)"
        )

    # Probe for ALPN support the same way; only NotImplementedError is
    # treated as "unsupported" here.
    try:
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_alpn_select_callback(lambda c: None)  # type: ignore[misc,arg-type]
    except NotImplementedError:
        skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support"
else:
    skipSSL = "OpenSSL is required for SSL tests."
    skipSNI = skipSSL
    skipNPN = skipSSL
    skipALPN = skipSSL

# These imports are only attempted when skipSSL is still empty, i.e. when the
# OpenSSL module could be imported above.
if not skipSSL:
    from twisted.internet import _sslverify as sslverify
    from twisted.internet.ssl import VerificationError, platformTrust
    from twisted.protocols.tls import TLSMemoryBIOFactory
79
80
# A couple of static PEM-format certificates to be used by various tests.
# The leading/trailing newlines and the indentation of the base64 lines are
# part of the string literal, so the text must not be reformatted.
A_HOST_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
        MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
        A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
        MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
        dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
        ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
        OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
        aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
        IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
        Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
        KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
        8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
        KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
        VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
        JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
        S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
        fXzCWdG0O/3Lk2SRM0I=
-----END CERTIFICATE-----
"""
102
# A second static PEM-format certificate, distinct from
# A_HOST_CERTIFICATE_PEM.  As with that constant, the whitespace inside the
# literal is part of the value and must be kept as-is.
A_PEER_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
        MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
        A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
        MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
        dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
        bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
        MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
        dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
        aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
        c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
        MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
        CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
        JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
        e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
        vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
        i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
        yqDtGhklsWW3ZwBzEh5VEOUp
-----END CERTIFICATE-----
"""
123
# Contents of the "server.pem" file that is a sibling of this module's own
# file; read once, when the module is imported.
A_KEYPAIR = getModule(__name__).filePath.sibling("server.pem").getContent()
125
126
def counter(counter=itertools.count()):
    """
    Return the next value of an ever-increasing sequence of integers.

    @param counter: The iterator to advance.  The default is a single
        L{itertools.count} created when this function is defined, so calls
        that rely on the default share it and never see the same value twice.

    @return: The next item produced by C{counter}.
    """
    nextValue = next(counter)
    return nextValue
132
133
def makeCertificate(**kw):
    """
    Create a self-signed certificate backed by a freshly generated 2048-bit
    RSA key.

    The certificate is valid from now for one year and gets its serial number
    from L{counter}.

    @param kw: X509 name attributes (for example C{O} or C{CN}) to set on
        both the issuer and the subject of the certificate.  Every value is
        passed through L{nativeString} before it is assigned.

    @return: a 2-tuple of C{(keypair, certificate)}
    @rtype: L{tuple} of (L{PKey}, L{X509})
    """
    keypair = PKey()
    keypair.generate_key(TYPE_RSA, 2048)

    certificate = X509()
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365)  # One year
    # Issuer and subject receive identical attributes because the
    # certificate is signed with its own key below.
    for xname in (certificate.get_issuer(), certificate.get_subject()):
        for k, v in kw.items():
            setattr(xname, k, nativeString(v))

    certificate.set_serial_number(counter())
    certificate.set_pubkey(keypair)
    # Sign with SHA-256 rather than MD5: MD5 is cryptographically broken and
    # may be unavailable or rejected on hardened / FIPS-enabled OpenSSL
    # builds.  This also matches the digest used by
    # certificatesForAuthorityAndServer.
    certificate.sign(keypair, "sha256")

    return keypair, certificate
150
151
def certificatesForAuthorityAndServer(serviceIdentity="example.com"):
    """
    Generate a fresh self-signed certificate authority together with a server
    certificate issued by that authority.

    Each certificate uses a newly generated 4096-bit RSA key and is valid
    from one day before to one day after the moment it is built.

    @param serviceIdentity: The identity of the server.  If it parses as an
        IP address it is stored as an IP address subject alternative name;
        otherwise it is IDNA-encoded and stored as a DNS name.
    @type serviceIdentity: L{str}

    @return: a 2-tuple of C{(certificate_authority_certificate,
        server_certificate)}
    @rtype: L{tuple} of (L{sslverify.Certificate},
        L{sslverify.PrivateCertificate})
    """
    authorityName = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example CA")]
    )
    serverName = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example Server")]
    )
    margin = datetime.timedelta(days=1)

    def newPrivateKey():
        # Every certificate gets its own RSA key.
        return rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend()
        )

    def unsignedCertificate(subject, publicKey):
        # The part of the builder chain shared by both certificates: both
        # are issued by the test CA and share the same validity window.
        builder = x509.CertificateBuilder()
        builder = builder.subject_name(subject)
        builder = builder.issuer_name(authorityName)
        builder = builder.not_valid_before(datetime.datetime.today() - margin)
        builder = builder.not_valid_after(datetime.datetime.today() + margin)
        builder = builder.serial_number(x509.random_serial_number())
        return builder.public_key(publicKey)

    def signedByAuthority(builder):
        # Both certificates are signed with the CA's private key.
        return builder.sign(
            private_key=authorityKey,
            algorithm=hashes.SHA256(),
            backend=default_backend(),
        )

    authorityKey = newPrivateKey()
    authorityPublicKey = authorityKey.public_key()
    authorityBuilder = unsignedCertificate(authorityName, authorityPublicKey)
    authorityBuilder = authorityBuilder.add_extension(
        x509.BasicConstraints(ca=True, path_length=9),
        critical=True,
    )
    authorityCertificate = signedByAuthority(authorityBuilder)

    serverKey = newPrivateKey()
    serverPublicKey = serverKey.public_key()

    # An identity that parses as an IP address becomes an iPAddress SAN;
    # anything else is treated as a (possibly internationalized) host name.
    try:
        address = ipaddress.ip_address(serviceIdentity)
    except ValueError:
        alternativeName = x509.DNSName(
            serviceIdentity.encode("idna").decode("ascii")
        )
    else:
        alternativeName = x509.IPAddress(address)

    serverBuilder = unsignedCertificate(serverName, serverPublicKey)
    serverBuilder = serverBuilder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None),
        critical=True,
    )
    serverBuilder = serverBuilder.add_extension(
        x509.SubjectAlternativeName([alternativeName]),
        critical=True,
    )
    serverCertificate = signedByAuthority(serverBuilder)

    caSelfCert = sslverify.Certificate.loadPEM(
        authorityCertificate.public_bytes(Encoding.PEM)
    )

    # The private certificate is loaded from the server's key followed by
    # its certificate, both PEM encoded and separated by a newline.
    serverKeyPEM = serverKey.private_bytes(
        Encoding.PEM,
        PrivateFormat.TraditionalOpenSSL,
        NoEncryption(),
    )
    serverCertificatePEM = serverCertificate.public_bytes(Encoding.PEM)
    serverCert = sslverify.PrivateCertificate.loadPEM(
        b"\n".join([serverKeyPEM, serverCertificatePEM])
    )

    return caSelfCert, serverCert
246
247
def _loopbackTLSConnection(serverOpts, clientOpts):
    """
    Shared implementation of L{loopbackTLSConnection} and
    L{loopbackTLSConnectionInMemory}: connect a TLS server and a TLS client
    to each other using the provided context factories.

    Note the wiring: the TLS factory created with C{isClient=True} wraps the
    factory producing the greeting protocol, and the one created with
    C{isClient=False} wraps the factory producing the listening protocol.

    @param serverOpts: An OpenSSL context factory for the server.
    @type serverOpts: C{OpenSSLCertificateOptions}, or any class with an
        equivalent API.

    @param clientOpts: An OpenSSL context factory for the client.
    @type clientOpts: C{OpenSSLCertificateOptions}, or any class with an
        equivalent API.

    @return: 5-tuple whose first two items and last item are, in order, the
        three values returned by L{connectedServerAndClient}; the third item
        is the greeting protocol instance and the fourth is the listening
        protocol instance.
    @rtype: L{tuple}
    """

    class GreetingServer(protocol.Protocol):
        # Writes a fixed greeting as soon as its connection is established.
        greeting = b"greetings!"

        def connectionMade(self):
            transport = self.transport
            transport.write(self.greeting)

    class ListeningClient(protocol.Protocol):
        # Accumulates everything it receives and remembers why the
        # connection was lost.
        data = b""
        lostReason = None

        def dataReceived(self, data):
            self.data = self.data + data

        def connectionLost(self, reason):
            self.lostReason = reason

    def factoryAlwaysBuilding(proto):
        # A plain factory whose "protocol" callable always hands back the
        # same, pre-built protocol instance.
        factory = protocol.Factory()
        factory.protocol = lambda: proto
        return factory

    listeningProto = ListeningClient()
    greetingProto = GreetingServer()

    listeningFactory = factoryAlwaysBuilding(listeningProto)
    greetingFactory = factoryAlwaysBuilding(greetingProto)

    tlsClientFactory = TLSMemoryBIOFactory(
        clientOpts, isClient=True, wrappedFactory=greetingFactory
    )
    tlsServerFactory = TLSMemoryBIOFactory(
        serverOpts, isClient=False, wrappedFactory=listeningFactory
    )

    def buildServerSide():
        return tlsServerFactory.buildProtocol(None)

    def buildClientSide():
        return tlsClientFactory.buildProtocol(None)

    sProto, cProto, pump = connectedServerAndClient(buildServerSide, buildClientSide)
    return sProto, cProto, greetingProto, listeningProto, pump
303
304
def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None):
    """
    Set up a loopback TLS connection whose server side reads its key (and
    optionally its certificate chain) from files.

    @param trustRoot: the C{trustRoot} argument for the client connection's
        context.
    @type trustRoot: L{sslverify.IOpenSSLTrustRoot}

    @param privateKeyFile: The name of the file containing the private key.
    @type privateKeyFile: L{str} (native string; file name)

    @param chainedCertFile: The name of the chained certificate file, or
        L{None} to not load a certificate chain.
    @type chainedCertFile: L{str} (native string; file name)

    @return: the 5-tuple produced by L{_loopbackTLSConnection} for these
        server and client options.
    @rtype: L{tuple}
    """

    class ContextFactory:
        """
        Minimal server-side context factory built on the enclosing
        function's file name arguments.
        """

        def getContext(self):
            """
            Create a context for the server side of the connection.

            @return: an SSL context using a certificate and key.
            @rtype: C{OpenSSL.SSL.Context}
            """
            context = SSL.Context(SSL.SSLv23_METHOD)
            if chainedCertFile is not None:
                context.use_certificate_chain_file(chainedCertFile)
            context.use_privatekey_file(privateKeyFile)
            # Fail loudly here if the test author supplied files that do
            # not belong together.
            context.check_privatekey()
            return context

    return _loopbackTLSConnection(
        ContextFactory(),
        sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot),
    )
343
344
def loopbackTLSConnectionInMemory(
    trustRoot,
    privateKey,
    serverCertificate,
    clientProtocols=None,
    serverProtocols=None,
    clientOptions=None,
):
    """
    Set up a loopback TLS connection like L{loopbackTLSConnection}, but
    configure the server from an in-memory key and certificate instead of
    files on disk.

    @param trustRoot: the C{trustRoot} argument for the client connection's
        context.
    @type trustRoot: L{sslverify.IOpenSSLTrustRoot}

    @param privateKey: The private key, passed as C{privateKey} to the
        server's L{sslverify.OpenSSLCertificateOptions}.

    @param serverCertificate: The certificate used by the server, passed as
        C{certificate} to the server's
        L{sslverify.OpenSSLCertificateOptions}.

    @param clientProtocols: The protocols the client is willing to negotiate
        using NPN/ALPN.

    @param serverProtocols: The protocols the server is willing to negotiate
        using NPN/ALPN.

    @param clientOptions: The type of C{OpenSSLCertificateOptions} class to
        use for the client. Defaults to C{OpenSSLCertificateOptions}.

    @return: the 5-tuple produced by L{_loopbackTLSConnection} for these
        server and client options.
    @rtype: L{tuple}
    """
    clientOptionsType = clientOptions
    if clientOptionsType is None:
        clientOptionsType = sslverify.OpenSSLCertificateOptions

    forClient = clientOptionsType(
        trustRoot=trustRoot,
        acceptableProtocols=clientProtocols,
    )
    forServer = sslverify.OpenSSLCertificateOptions(
        privateKey=privateKey,
        certificate=serverCertificate,
        acceptableProtocols=serverProtocols,
    )

    return _loopbackTLSConnection(forServer, forClient)
393
394
def pathContainingDumpOf(testCase, *dumpables):
    """
    Write the PEM dumps of some objects, one after another, into a new
    temporary file and return that file's name.

    @param testCase: a test case whose C{mktemp} method supplies the name of
        the temporary file.
    @type testCase: L{twisted.trial.unittest.TestCase}

    @param dumpables: objects from pyOpenSSL with a C{dump} method taking a
        pyOpenSSL file-type constant, such as L{OpenSSL.crypto.FILETYPE_PEM}
        or L{OpenSSL.crypto.FILETYPE_ASN1}.
    @type dumpables: L{tuple} of L{object} with C{dump} method taking L{int}
        returning L{bytes}

    @return: the path to a file where all of the dumpables were dumped in PEM
        format, in argument order.
    @rtype: L{str}
    """
    path = testCase.mktemp()
    with open(path, "wb") as pemFile:
        # The generator is consumed lazily, so each object is dumped right
        # before its bytes are written.
        pemFile.writelines(dumpable.dump(FILETYPE_PEM) for dumpable in dumpables)
    return path
418
419
class DataCallbackProtocol(protocol.Protocol):
    """
    A protocol that reports events through Deferreds stored on its factory:
    C{factory.onData} is called back with the first data received and
    C{factory.onLost} is errbacked with the reason the connection was lost.
    Each attribute is reset to L{None} when used, so it fires at most once.
    """

    def dataReceived(self, data):
        pending = self.factory.onData
        self.factory.onData = None
        if pending is None:
            return
        pending.callback(data)

    def connectionLost(self, reason):
        pending = self.factory.onLost
        self.factory.onLost = None
        if pending is None:
            return
        pending.errback(reason)
430
431
class WritingProtocol(protocol.Protocol):
    """
    A protocol that writes a single byte string as soon as it is connected
    and errbacks C{factory.onLost} when its connection is lost.

    @cvar byte: The data written on connection.
    """

    byte = b"x"

    def connectionMade(self):
        transport = self.transport
        transport.write(self.byte)

    def connectionLost(self, reason):
        onLost = self.factory.onLost
        onLost.errback(reason)
440
441
class FakeContext:
    """
    Introspectable stand-in for an C{OpenSSL.SSL.Context}.

    Every configuration call simply records its arguments on the instance so
    that tests can inspect them afterwards.

    Necessary because C{Context} offers poor introspection.  cf. this
    U{pyOpenSSL bug<https://bugs.launchpad.net/pyopenssl/+bug/1173899>}.

    @ivar _method: See C{method} parameter of L{__init__}.

    @ivar _options: L{int} of C{OR}ed values from calls of L{set_options}.

    @ivar _certificate: Set by L{use_certificate}.

    @ivar _privateKey: Set by L{use_privatekey}.

    @ivar _verify: 2-tuple of C{(flags, callback)} set by L{set_verify}.

    @ivar _verifyDepth: Set by L{set_verify_depth}.

    @ivar _mode: Set by L{set_mode}.

    @ivar _sessionIDContext: Set by L{set_session_id}.

    @ivar _sessionCacheMode: Set by L{set_session_cache_mode}; initially
        C{SSL.SESS_CACHE_SERVER}.

    @ivar _extraCertChain: Accumulated L{list} of all extra certificates added
        by L{add_extra_chain_cert}.

    @ivar _cipherList: Set by L{set_cipher_list}.

    @ivar _dhFilename: Set by L{load_tmp_dh}.

    @ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths}

    @ivar _ecCurve: Set by L{set_tmp_ecdh}
    """

    # Class-level starting point; the first set_options call shadows it with
    # an instance attribute.
    _options = 0

    def __init__(self, method):
        """
        @param method: The method the context was created with; stored
            untouched as C{_method}.
        """
        self._method = method
        self._extraCertChain = []
        self._defaultVerifyPathsSet = False
        self._ecCurve = None

        # Note that this value is explicitly documented as the default by
        # https://www.openssl.org/docs/man1.1.1/man3/
        # SSL_CTX_set_session_cache_mode.html
        self._sessionCacheMode = SSL.SESS_CACHE_SERVER

    def set_options(self, options):
        """
        Add C{options} to the accumulated option bits.
        """
        self._options = self._options | options

    def use_certificate(self, certificate):
        """
        Remember the certificate.
        """
        self._certificate = certificate

    def use_privatekey(self, privateKey):
        """
        Remember the private key.
        """
        self._privateKey = privateKey

    def check_privatekey(self):
        """
        Perform no check at all and return L{None}.
        """
        return None

    def set_mode(self, mode):
        """
        Set the mode. See L{SSL.Context.set_mode}.

        @param mode: See L{SSL.Context.set_mode}.
        """
        self._mode = mode

    def set_verify(self, flags, callback):
        """
        Remember the verification flags and callback as a 2-tuple.
        """
        self._verify = (flags, callback)

    def set_verify_depth(self, depth):
        """
        Remember the verification depth.
        """
        self._verifyDepth = depth

    def set_session_id(self, sessionIDContext):
        """
        Remember the session ID context.
        """
        # This fake should change when the upstream changes:
        # https://github.com/pyca/pyopenssl/issues/845
        self._sessionIDContext = sessionIDContext

    def set_session_cache_mode(self, cacheMode):
        """
        Set the session cache mode on the context, as per
        L{SSL.Context.set_session_cache_mode}.
        """
        self._sessionCacheMode = cacheMode

    def get_session_cache_mode(self):
        """
        Retrieve the session cache mode from the context, as per
        L{SSL.Context.get_session_cache_mode}.
        """
        return self._sessionCacheMode

    def add_extra_chain_cert(self, cert):
        """
        Append C{cert} to the list of extra chain certificates.
        """
        self._extraCertChain.append(cert)

    def set_cipher_list(self, cipherList):
        """
        Remember the cipher list.
        """
        self._cipherList = cipherList

    def load_tmp_dh(self, dhfilename):
        """
        Remember the name of the Diffie-Hellman parameter file.
        """
        self._dhFilename = dhfilename

    def set_default_verify_paths(self):
        """
        Set the default paths for the platform.
        """
        self._defaultVerifyPathsSet = True

    def set_tmp_ecdh(self, curve):
        """
        Set an ECDH curve.  Should only be called by OpenSSL 1.0.1
        code.

        @param curve: See L{OpenSSL.SSL.Context.set_tmp_ecdh}
        """
        self._ecCurve = curve
560
561
class ClientOptionsTests(SynchronousTestCase):
    """
    Tests for L{sslverify.optionsForClientTLS}.
    """

    if skipSSL:
        skip = skipSSL

    def test_extraKeywords(self):
        """
        L{sslverify.optionsForClientTLS} rejects a keyword parameter other
        than C{extraCertificateOptions} with the same L{TypeError} an
        ordinary Python function would raise.
        """
        arguments = {"hostname": "alpha", "someRandomThing": "beta"}
        error = self.assertRaises(
            TypeError, sslverify.optionsForClientTLS, **arguments
        )
        expectedText = (
            "optionsForClientTLS() got an unexpected keyword argument "
            "'someRandomThing'"
        )
        self.assertEqual(str(error), expectedText)

    def test_bytesFailFast(self):
        """
        Passing L{bytes} as the hostname to L{sslverify.optionsForClientTLS}
        raises a L{TypeError} right away.
        """
        hostname = b"not-actually-a-hostname.com"
        error = self.assertRaises(
            TypeError, sslverify.optionsForClientTLS, hostname
        )
        prefix = "optionsForClientTLS requires text for host names, not "
        self.assertEqual(str(error), prefix + bytes.__name__)

    def test_dNSNameHostname(self):
        """
        For a dNSName hostname the options returned by
        L{sslverify.optionsForClientTLS} have a true C{_hostnameIsDnsName}.
        """
        clientOptions = sslverify.optionsForClientTLS("example.com")
        self.assertTrue(clientOptions._hostnameIsDnsName)

    def test_IPv4AddressHostname(self):
        """
        For an IPv4 address the options returned by
        L{sslverify.optionsForClientTLS} have a false C{_hostnameIsDnsName}.
        """
        clientOptions = sslverify.optionsForClientTLS("127.0.0.1")
        self.assertFalse(clientOptions._hostnameIsDnsName)

    def test_IPv6AddressHostname(self):
        """
        For an IPv6 address the options returned by
        L{sslverify.optionsForClientTLS} have a false C{_hostnameIsDnsName}.
        """
        clientOptions = sslverify.optionsForClientTLS("::1")
        self.assertFalse(clientOptions._hostnameIsDnsName)
624
625
class FakeChooseDiffieHellmanEllipticCurve:
    """
    A do-nothing substitute for L{_ChooseDiffieHellmanEllipticCurve}.
    """

    def __init__(self, versionNumber, openSSLlib, openSSLcrypto):
        """
        Accept the constructor arguments and ignore all of them.
        """
        pass

    def configureECDHCurve(self, ctx):
        """
        Leave the context untouched.

        @param ctx: An L{OpenSSL.SSL.Context} that would be
            configured.
        """
        pass
643
644
class OpenSSLOptionsTestsMixin:
    """
    A mixin for L{OpenSSLOptions} test cases.  It creates server, client and
    CA certificates and provides a L{loopback} method that creates a TLS
    connection over the reactor with caller-supplied options.
    """

    if skipSSL:
        skip = skipSSL

    # Populated by loopback(); consulted by tearDown() for cleanup.
    serverPort = None
    clientConn = None
    onServerLost = None
    onClientLost = None

    def setUp(self):
        """
        Generate the server, client and CA keys and certificates used by the
        tests and store them on the test case.
        """
        serverPair = makeCertificate(O=b"Server Test Certificate", CN=b"server")
        self.sKey, self.sCert = serverPair
        clientPair = makeCertificate(O=b"Client Test Certificate", CN=b"client")
        self.cKey, self.cCert = clientPair
        # Only the certificate half of each CA pair is kept.
        _, self.caCert1 = makeCertificate(O=b"CA Test Certificate 1", CN=b"ca1")
        _, self.caCert2 = makeCertificate(O=b"CA Test Certificate", CN=b"ca2")
        self.caCerts = [self.caCert1, self.caCert2]
        self.extraCertChain = self.caCerts

    def tearDown(self):
        """
        Stop the listening port and disconnect the client connection, if
        L{loopback} created them.

        @return: A L{defer.DeferredList} over whichever of the connection
            lost Deferreds were created by L{loopback}.
        """
        if self.serverPort is not None:
            self.serverPort.stopListening()
        if self.clientConn is not None:
            self.clientConn.disconnect()

        pending = [
            lost
            for lost in (self.onServerLost, self.onClientLost)
            if lost is not None
        ]
        return defer.DeferredList(pending, consumeErrors=True)

    def loopback(
        self,
        serverCertOpts,
        clientCertOpts,
        onServerLost=None,
        onClientLost=None,
        onData=None,
    ):
        """
        Listen with C{serverCertOpts} on an arbitrary port and connect to it
        on 127.0.0.1 with C{clientCertOpts}.

        The server uses L{DataCallbackProtocol} and the client uses
        L{WritingProtocol}.

        @param serverCertOpts: The context factory passed to C{listenSSL}.
        @param clientCertOpts: The context factory passed to C{connectSSL}.
        @param onServerLost: Deferred for the server factory's C{onLost}.  If
            L{None}, a new one is created and also stored on the test case.
        @param onClientLost: Deferred for the client factory's C{onLost}.  If
            L{None}, a new one is created and also stored on the test case.
        @param onData: Deferred for the server factory's C{onData}.  If
            L{None}, a new one is created.
        """
        if onServerLost is None:
            onServerLost = defer.Deferred()
            self.onServerLost = onServerLost
        if onClientLost is None:
            onClientLost = defer.Deferred()
            self.onClientLost = onClientLost
        if onData is None:
            onData = defer.Deferred()

        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = DataCallbackProtocol
        serverFactory.onLost = onServerLost
        serverFactory.onData = onData

        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = WritingProtocol
        clientFactory.onLost = onClientLost

        listeningPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
        self.serverPort = listeningPort
        portNumber = listeningPort.getHost().port
        self.clientConn = reactor.connectSSL(
            "127.0.0.1", portNumber, clientFactory, clientCertOpts
        )
715
716
717class OpenSSLOptionsTests(OpenSSLOptionsTestsMixin, TestCase):
718    """
719    Tests for L{sslverify.OpenSSLOptions}.
720    """
721
722    def setUp(self):
723        """
724        Same as L{OpenSSLOptionsTestsMixin.setUp}, but it also patches
725        L{sslverify._ChooseDiffieHellmanEllipticCurve}.
726        """
727        super().setUp()
728        self.patch(
729            sslverify,
730            "_ChooseDiffieHellmanEllipticCurve",
731            FakeChooseDiffieHellmanEllipticCurve,
732        )
733
734    def test_constructorWithOnlyPrivateKey(self):
735        """
736        C{privateKey} and C{certificate} make only sense if both are set.
737        """
738        self.assertRaises(
739            ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey
740        )
741
742    def test_constructorWithOnlyCertificate(self):
743        """
744        C{privateKey} and C{certificate} make only sense if both are set.
745        """
746        self.assertRaises(
747            ValueError, sslverify.OpenSSLCertificateOptions, certificate=self.sCert
748        )
749
750    def test_constructorWithCertificateAndPrivateKey(self):
751        """
752        Specifying C{privateKey} and C{certificate} initializes correctly.
753        """
754        opts = sslverify.OpenSSLCertificateOptions(
755            privateKey=self.sKey, certificate=self.sCert
756        )
757        self.assertEqual(opts.privateKey, self.sKey)
758        self.assertEqual(opts.certificate, self.sCert)
759        self.assertEqual(opts.extraCertChain, [])
760
761    def test_constructorDoesNotAllowVerifyWithoutCACerts(self):
762        """
763        C{verify} must not be C{True} without specifying C{caCerts}.
764        """
765        self.assertRaises(
766            ValueError,
767            sslverify.OpenSSLCertificateOptions,
768            privateKey=self.sKey,
769            certificate=self.sCert,
770            verify=True,
771        )
772
773    def test_constructorDoesNotAllowLegacyWithTrustRoot(self):
774        """
775        C{verify}, C{requireCertificate}, and C{caCerts} must not be specified
776        by the caller (to be I{any} value, even the default!) when specifying
777        C{trustRoot}.
778        """
779        self.assertRaises(
780            TypeError,
781            sslverify.OpenSSLCertificateOptions,
782            privateKey=self.sKey,
783            certificate=self.sCert,
784            verify=True,
785            trustRoot=None,
786            caCerts=self.caCerts,
787        )
788        self.assertRaises(
789            TypeError,
790            sslverify.OpenSSLCertificateOptions,
791            privateKey=self.sKey,
792            certificate=self.sCert,
793            trustRoot=None,
794            requireCertificate=True,
795        )
796
797    def test_constructorAllowsCACertsWithoutVerify(self):
798        """
799        It's currently a NOP, but valid.
800        """
801        opts = sslverify.OpenSSLCertificateOptions(
802            privateKey=self.sKey, certificate=self.sCert, caCerts=self.caCerts
803        )
804        self.assertFalse(opts.verify)
805        self.assertEqual(self.caCerts, opts.caCerts)
806
807    def test_constructorWithVerifyAndCACerts(self):
808        """
809        Specifying C{verify} and C{caCerts} initializes correctly.
810        """
811        opts = sslverify.OpenSSLCertificateOptions(
812            privateKey=self.sKey,
813            certificate=self.sCert,
814            verify=True,
815            caCerts=self.caCerts,
816        )
817        self.assertTrue(opts.verify)
818        self.assertEqual(self.caCerts, opts.caCerts)
819
820    def test_constructorSetsExtraChain(self):
821        """
822        Setting C{extraCertChain} works if C{certificate} and C{privateKey} are
823        set along with it.
824        """
825        opts = sslverify.OpenSSLCertificateOptions(
826            privateKey=self.sKey,
827            certificate=self.sCert,
828            extraCertChain=self.extraCertChain,
829        )
830        self.assertEqual(self.extraCertChain, opts.extraCertChain)
831
832    def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self):
833        """
834        A C{extraCertChain} without C{privateKey} doesn't make sense and is
835        thus rejected.
836        """
837        self.assertRaises(
838            ValueError,
839            sslverify.OpenSSLCertificateOptions,
840            certificate=self.sCert,
841            extraCertChain=self.extraCertChain,
842        )
843
844    def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self):
845        """
846        A C{extraCertChain} without C{certificate} doesn't make sense and is
847        thus rejected.
848        """
849        self.assertRaises(
850            ValueError,
851            sslverify.OpenSSLCertificateOptions,
852            privateKey=self.sKey,
853            extraCertChain=self.extraCertChain,
854        )
855
856    def test_extraChainFilesAreAddedIfSupplied(self):
857        """
858        If C{extraCertChain} is set and all prerequisites are met, the
859        specified chain certificates are added to C{Context}s that get
860        created.
861        """
862        opts = sslverify.OpenSSLCertificateOptions(
863            privateKey=self.sKey,
864            certificate=self.sCert,
865            extraCertChain=self.extraCertChain,
866        )
867        opts._contextFactory = FakeContext
868        ctx = opts.getContext()
869        self.assertEqual(self.sKey, ctx._privateKey)
870        self.assertEqual(self.sCert, ctx._certificate)
871        self.assertEqual(self.extraCertChain, ctx._extraCertChain)
872
873    def test_extraChainDoesNotBreakPyOpenSSL(self):
874        """
875        C{extraCertChain} doesn't break C{OpenSSL.SSL.Context} creation.
876        """
877        opts = sslverify.OpenSSLCertificateOptions(
878            privateKey=self.sKey,
879            certificate=self.sCert,
880            extraCertChain=self.extraCertChain,
881        )
882        ctx = opts.getContext()
883        self.assertIsInstance(ctx, SSL.Context)
884
885    def test_acceptableCiphersAreAlwaysSet(self):
886        """
887        If the user doesn't supply custom acceptable ciphers, a shipped secure
888        default is used.  We can't check directly for it because the effective
889        cipher string we set varies with platforms.
890        """
891        opts = sslverify.OpenSSLCertificateOptions(
892            privateKey=self.sKey,
893            certificate=self.sCert,
894        )
895        opts._contextFactory = FakeContext
896        ctx = opts.getContext()
897        self.assertEqual(opts._cipherString.encode("ascii"), ctx._cipherList)
898
899    def test_givesMeaningfulErrorMessageIfNoCipherMatches(self):
900        """
901        If there is no valid cipher that matches the user's wishes,
902        a L{ValueError} is raised.
903        """
904        self.assertRaises(
905            ValueError,
906            sslverify.OpenSSLCertificateOptions,
907            privateKey=self.sKey,
908            certificate=self.sCert,
909            acceptableCiphers=sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString(
910                ""
911            ),
912        )
913
914    def test_honorsAcceptableCiphersArgument(self):
915        """
916        If acceptable ciphers are passed, they are used.
917        """
918
919        @implementer(interfaces.IAcceptableCiphers)
920        class FakeAcceptableCiphers:
921            def selectCiphers(self, _):
922                return [sslverify.OpenSSLCipher("sentinel")]
923
924        opts = sslverify.OpenSSLCertificateOptions(
925            privateKey=self.sKey,
926            certificate=self.sCert,
927            acceptableCiphers=FakeAcceptableCiphers(),
928        )
929        opts._contextFactory = FakeContext
930        ctx = opts.getContext()
931        self.assertEqual(b"sentinel", ctx._cipherList)
932
933    def test_basicSecurityOptionsAreSet(self):
934        """
935        Every context must have C{OP_NO_SSLv2}, C{OP_NO_COMPRESSION}, and
936        C{OP_CIPHER_SERVER_PREFERENCE} set.
937        """
938        opts = sslverify.OpenSSLCertificateOptions(
939            privateKey=self.sKey,
940            certificate=self.sCert,
941        )
942        opts._contextFactory = FakeContext
943        ctx = opts.getContext()
944        options = (
945            SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE
946        )
947        self.assertEqual(options, ctx._options & options)
948
949    def test_modeIsSet(self):
950        """
951        Every context must be in C{MODE_RELEASE_BUFFERS} mode.
952        """
953        opts = sslverify.OpenSSLCertificateOptions(
954            privateKey=self.sKey,
955            certificate=self.sCert,
956        )
957        opts._contextFactory = FakeContext
958        ctx = opts.getContext()
959        self.assertEqual(SSL.MODE_RELEASE_BUFFERS, ctx._mode)
960
961    def test_singleUseKeys(self):
962        """
963        If C{singleUseKeys} is set, every context must have
964        C{OP_SINGLE_DH_USE} and C{OP_SINGLE_ECDH_USE} set.
965        """
966        opts = sslverify.OpenSSLCertificateOptions(
967            privateKey=self.sKey,
968            certificate=self.sCert,
969            enableSingleUseKeys=True,
970        )
971        opts._contextFactory = FakeContext
972        ctx = opts.getContext()
973        options = SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE
974        self.assertEqual(options, ctx._options & options)
975
976    def test_methodIsDeprecated(self):
977        """
978        Passing C{method} to L{sslverify.OpenSSLCertificateOptions} is
979        deprecated.
980        """
981        sslverify.OpenSSLCertificateOptions(
982            privateKey=self.sKey,
983            certificate=self.sCert,
984            method=SSL.SSLv23_METHOD,
985        )
986
987        message = (
988            "Passing method to twisted.internet.ssl.CertificateOptions "
989            "was deprecated in Twisted 17.1.0. Please use a "
990            "combination of insecurelyLowerMinimumTo, raiseMinimumTo, "
991            "and lowerMaximumSecurityTo instead, as Twisted will "
992            "correctly configure the method."
993        )
994
995        warnings = self.flushWarnings([self.test_methodIsDeprecated])
996        self.assertEqual(1, len(warnings))
997        self.assertEqual(DeprecationWarning, warnings[0]["category"])
998        self.assertEqual(message, warnings[0]["message"])
999
1000    def test_tlsv1ByDefault(self):
1001        """
1002        L{sslverify.OpenSSLCertificateOptions} will make the default minimum
1003        TLS version v1.0, if no C{method}, or C{insecurelyLowerMinimumTo} is
1004        given.
1005        """
1006        opts = sslverify.OpenSSLCertificateOptions(
1007            privateKey=self.sKey, certificate=self.sCert
1008        )
1009        opts._contextFactory = FakeContext
1010        ctx = opts.getContext()
1011        options = (
1012            SSL.OP_NO_SSLv2
1013            | SSL.OP_NO_COMPRESSION
1014            | SSL.OP_CIPHER_SERVER_PREFERENCE
1015            | SSL.OP_NO_SSLv3
1016        )
1017        self.assertEqual(options, ctx._options & options)
1018
1019    def test_tlsProtocolsAtLeastWithMinimum(self):
1020        """
1021        Passing C{insecurelyLowerMinimumTo} along with C{raiseMinimumTo} to
1022        L{sslverify.OpenSSLCertificateOptions} will cause it to raise an
1023        exception.
1024        """
1025        with self.assertRaises(TypeError) as e:
1026            sslverify.OpenSSLCertificateOptions(
1027                privateKey=self.sKey,
1028                certificate=self.sCert,
1029                raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
1030                insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
1031            )
1032
1033        self.assertIn("raiseMinimumTo", e.exception.args[0])
1034        self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0])
1035        self.assertIn("exclusive", e.exception.args[0])
1036
1037    def test_tlsProtocolsNoMethodWithAtLeast(self):
1038        """
1039        Passing C{raiseMinimumTo} along with C{method} to
1040        L{sslverify.OpenSSLCertificateOptions} will cause it to raise an
1041        exception.
1042        """
1043        with self.assertRaises(TypeError) as e:
1044            sslverify.OpenSSLCertificateOptions(
1045                privateKey=self.sKey,
1046                certificate=self.sCert,
1047                method=SSL.SSLv23_METHOD,
1048                raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
1049            )
1050
1051        self.assertIn("method", e.exception.args[0])
1052        self.assertIn("raiseMinimumTo", e.exception.args[0])
1053        self.assertIn("exclusive", e.exception.args[0])
1054
1055    def test_tlsProtocolsNoMethodWithMinimum(self):
1056        """
1057        Passing C{insecurelyLowerMinimumTo} along with C{method} to
1058        L{sslverify.OpenSSLCertificateOptions} will cause it to raise an
1059        exception.
1060        """
1061        with self.assertRaises(TypeError) as e:
1062            sslverify.OpenSSLCertificateOptions(
1063                privateKey=self.sKey,
1064                certificate=self.sCert,
1065                method=SSL.SSLv23_METHOD,
1066                insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
1067            )
1068
1069        self.assertIn("method", e.exception.args[0])
1070        self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0])
1071        self.assertIn("exclusive", e.exception.args[0])
1072
1073    def test_tlsProtocolsNoMethodWithMaximum(self):
1074        """
1075        Passing C{lowerMaximumSecurityTo} along with C{method} to
1076        L{sslverify.OpenSSLCertificateOptions} will cause it to raise an
1077        exception.
1078        """
1079        with self.assertRaises(TypeError) as e:
1080            sslverify.OpenSSLCertificateOptions(
1081                privateKey=self.sKey,
1082                certificate=self.sCert,
1083                method=SSL.SSLv23_METHOD,
1084                lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
1085            )
1086
1087        self.assertIn("method", e.exception.args[0])
1088        self.assertIn("lowerMaximumSecurityTo", e.exception.args[0])
1089        self.assertIn("exclusive", e.exception.args[0])
1090
1091    def test_tlsVersionRangeInOrder(self):
1092        """
1093        Passing out of order TLS versions to C{insecurelyLowerMinimumTo} and
1094        C{lowerMaximumSecurityTo} will cause it to raise an exception.
1095        """
1096        with self.assertRaises(ValueError) as e:
1097            sslverify.OpenSSLCertificateOptions(
1098                privateKey=self.sKey,
1099                certificate=self.sCert,
1100                insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
1101                lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
1102            )
1103
1104        self.assertEqual(
1105            e.exception.args,
1106            (
1107                (
1108                    "insecurelyLowerMinimumTo needs to be lower than "
1109                    "lowerMaximumSecurityTo"
1110                ),
1111            ),
1112        )
1113
1114    def test_tlsVersionRangeInOrderAtLeast(self):
1115        """
1116        Passing out of order TLS versions to C{raiseMinimumTo} and
1117        C{lowerMaximumSecurityTo} will cause it to raise an exception.
1118        """
1119        with self.assertRaises(ValueError) as e:
1120            sslverify.OpenSSLCertificateOptions(
1121                privateKey=self.sKey,
1122                certificate=self.sCert,
1123                raiseMinimumTo=sslverify.TLSVersion.TLSv1_0,
1124                lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
1125            )
1126
1127        self.assertEqual(
1128            e.exception.args,
1129            (("raiseMinimumTo needs to be lower than " "lowerMaximumSecurityTo"),),
1130        )
1131
1132    def test_tlsProtocolsreduceToMaxWithoutMin(self):
1133        """
1134        When calling L{sslverify.OpenSSLCertificateOptions} with
1135        C{lowerMaximumSecurityTo} but no C{raiseMinimumTo} or
1136        C{insecurelyLowerMinimumTo} set, and C{lowerMaximumSecurityTo} is
1137        below the minimum default, the minimum will be made the new maximum.
1138        """
1139        opts = sslverify.OpenSSLCertificateOptions(
1140            privateKey=self.sKey,
1141            certificate=self.sCert,
1142            lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
1143        )
1144        opts._contextFactory = FakeContext
1145        ctx = opts.getContext()
1146        options = (
1147            SSL.OP_NO_SSLv2
1148            | SSL.OP_NO_COMPRESSION
1149            | SSL.OP_CIPHER_SERVER_PREFERENCE
1150            | SSL.OP_NO_TLSv1
1151            | SSL.OP_NO_TLSv1_1
1152            | SSL.OP_NO_TLSv1_2
1153            | opts._OP_NO_TLSv1_3
1154        )
1155        self.assertEqual(options, ctx._options & options)
1156
1157    def test_tlsProtocolsSSLv3Only(self):
1158        """
1159        When calling L{sslverify.OpenSSLCertificateOptions} with
1160        C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to
1161        SSLv3, it will exclude all others.
1162        """
1163        opts = sslverify.OpenSSLCertificateOptions(
1164            privateKey=self.sKey,
1165            certificate=self.sCert,
1166            insecurelyLowerMinimumTo=sslverify.TLSVersion.SSLv3,
1167            lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
1168        )
1169        opts._contextFactory = FakeContext
1170        ctx = opts.getContext()
1171        options = (
1172            SSL.OP_NO_SSLv2
1173            | SSL.OP_NO_COMPRESSION
1174            | SSL.OP_CIPHER_SERVER_PREFERENCE
1175            | SSL.OP_NO_TLSv1
1176            | SSL.OP_NO_TLSv1_1
1177            | SSL.OP_NO_TLSv1_2
1178            | opts._OP_NO_TLSv1_3
1179        )
1180        self.assertEqual(options, ctx._options & options)
1181
1182    def test_tlsProtocolsTLSv1Point0Only(self):
1183        """
1184        When calling L{sslverify.OpenSSLCertificateOptions} with
1185        C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.0,
1186        it will exclude all others.
1187        """
1188        opts = sslverify.OpenSSLCertificateOptions(
1189            privateKey=self.sKey,
1190            certificate=self.sCert,
1191            insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
1192            lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_0,
1193        )
1194        opts._contextFactory = FakeContext
1195        ctx = opts.getContext()
1196        options = (
1197            SSL.OP_NO_SSLv2
1198            | SSL.OP_NO_COMPRESSION
1199            | SSL.OP_CIPHER_SERVER_PREFERENCE
1200            | SSL.OP_NO_SSLv3
1201            | SSL.OP_NO_TLSv1_1
1202            | SSL.OP_NO_TLSv1_2
1203            | opts._OP_NO_TLSv1_3
1204        )
1205        self.assertEqual(options, ctx._options & options)
1206
1207    def test_tlsProtocolsTLSv1Point1Only(self):
1208        """
1209        When calling L{sslverify.OpenSSLCertificateOptions} with
1210        C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.1,
1211        it will exclude all others.
1212        """
1213        opts = sslverify.OpenSSLCertificateOptions(
1214            privateKey=self.sKey,
1215            certificate=self.sCert,
1216            insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_1,
1217            lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_1,
1218        )
1219        opts._contextFactory = FakeContext
1220        ctx = opts.getContext()
1221        options = (
1222            SSL.OP_NO_SSLv2
1223            | SSL.OP_NO_COMPRESSION
1224            | SSL.OP_CIPHER_SERVER_PREFERENCE
1225            | SSL.OP_NO_SSLv3
1226            | SSL.OP_NO_TLSv1
1227            | SSL.OP_NO_TLSv1_2
1228            | opts._OP_NO_TLSv1_3
1229        )
1230        self.assertEqual(options, ctx._options & options)
1231
1232    def test_tlsProtocolsTLSv1Point2Only(self):
1233        """
1234        When calling L{sslverify.OpenSSLCertificateOptions} with
1235        C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.2,
1236        it will exclude all others.
1237        """
1238        opts = sslverify.OpenSSLCertificateOptions(
1239            privateKey=self.sKey,
1240            certificate=self.sCert,
1241            insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
1242            lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
1243        )
1244        opts._contextFactory = FakeContext
1245        ctx = opts.getContext()
1246        options = (
1247            SSL.OP_NO_SSLv2
1248            | SSL.OP_NO_COMPRESSION
1249            | SSL.OP_CIPHER_SERVER_PREFERENCE
1250            | SSL.OP_NO_SSLv3
1251            | SSL.OP_NO_TLSv1
1252            | SSL.OP_NO_TLSv1_1
1253            | opts._OP_NO_TLSv1_3
1254        )
1255        self.assertEqual(options, ctx._options & options)
1256
1257    def test_tlsProtocolsAllModernTLS(self):
1258        """
1259        When calling L{sslverify.OpenSSLCertificateOptions} with
1260        C{insecurelyLowerMinimumTo} set to TLSv1.0 and
1261        C{lowerMaximumSecurityTo} to TLSv1.2, it will exclude both SSLs and
1262        the (unreleased) TLSv1.3.
1263        """
1264        opts = sslverify.OpenSSLCertificateOptions(
1265            privateKey=self.sKey,
1266            certificate=self.sCert,
1267            insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
1268            lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
1269        )
1270        opts._contextFactory = FakeContext
1271        ctx = opts.getContext()
1272        options = (
1273            SSL.OP_NO_SSLv2
1274            | SSL.OP_NO_COMPRESSION
1275            | SSL.OP_CIPHER_SERVER_PREFERENCE
1276            | SSL.OP_NO_SSLv3
1277            | opts._OP_NO_TLSv1_3
1278        )
1279        self.assertEqual(options, ctx._options & options)
1280
1281    def test_tlsProtocolsAtLeastAllSecureTLS(self):
1282        """
1283        When calling L{sslverify.OpenSSLCertificateOptions} with
1284        C{raiseMinimumTo} set to TLSv1.2, it will ignore all TLSs below
1285        1.2 and SSL.
1286        """
1287        opts = sslverify.OpenSSLCertificateOptions(
1288            privateKey=self.sKey,
1289            certificate=self.sCert,
1290            raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
1291        )
1292        opts._contextFactory = FakeContext
1293        ctx = opts.getContext()
1294        options = (
1295            SSL.OP_NO_SSLv2
1296            | SSL.OP_NO_COMPRESSION
1297            | SSL.OP_CIPHER_SERVER_PREFERENCE
1298            | SSL.OP_NO_SSLv3
1299            | SSL.OP_NO_TLSv1
1300            | SSL.OP_NO_TLSv1_1
1301        )
1302        self.assertEqual(options, ctx._options & options)
1303
1304    def test_tlsProtocolsAtLeastWillAcceptHigherDefault(self):
1305        """
1306        When calling L{sslverify.OpenSSLCertificateOptions} with
1307        C{raiseMinimumTo} set to a value lower than Twisted's default will
1308        cause it to use the more secure default.
1309        """
1310        opts = sslverify.OpenSSLCertificateOptions(
1311            privateKey=self.sKey,
1312            certificate=self.sCert,
1313            raiseMinimumTo=sslverify.TLSVersion.SSLv3,
1314        )
1315        opts._contextFactory = FakeContext
1316        ctx = opts.getContext()
1317        # Future maintainer warning: this will break if we change our default
1318        # up, so you should change it to add the relevant OP_NO flags when we
1319        # do make that change and this test fails.
1320        options = (
1321            SSL.OP_NO_SSLv2
1322            | SSL.OP_NO_COMPRESSION
1323            | SSL.OP_CIPHER_SERVER_PREFERENCE
1324            | SSL.OP_NO_SSLv3
1325        )
1326        self.assertEqual(options, ctx._options & options)
1327        self.assertEqual(opts._defaultMinimumTLSVersion, sslverify.TLSVersion.TLSv1_0)
1328
1329    def test_tlsProtocolsAllSecureTLS(self):
1330        """
1331        When calling L{sslverify.OpenSSLCertificateOptions} with
1332        C{insecurelyLowerMinimumTo} set to TLSv1.2, it will ignore all TLSs below
1333        1.2 and SSL.
1334        """
1335        opts = sslverify.OpenSSLCertificateOptions(
1336            privateKey=self.sKey,
1337            certificate=self.sCert,
1338            insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
1339        )
1340        opts._contextFactory = FakeContext
1341        ctx = opts.getContext()
1342        options = (
1343            SSL.OP_NO_SSLv2
1344            | SSL.OP_NO_COMPRESSION
1345            | SSL.OP_CIPHER_SERVER_PREFERENCE
1346            | SSL.OP_NO_SSLv3
1347            | SSL.OP_NO_TLSv1
1348            | SSL.OP_NO_TLSv1_1
1349        )
1350        self.assertEqual(options, ctx._options & options)
1351
1352    def test_dhParams(self):
1353        """
1354        If C{dhParams} is set, they are loaded into each new context.
1355        """
1356
1357        class FakeDiffieHellmanParameters:
1358            _dhFile = FilePath(b"dh.params")
1359
1360        dhParams = FakeDiffieHellmanParameters()
1361        opts = sslverify.OpenSSLCertificateOptions(
1362            privateKey=self.sKey,
1363            certificate=self.sCert,
1364            dhParameters=dhParams,
1365        )
1366        opts._contextFactory = FakeContext
1367        ctx = opts.getContext()
1368        self.assertEqual(FakeDiffieHellmanParameters._dhFile.path, ctx._dhFilename)
1369
1370    def test_abbreviatingDistinguishedNames(self):
1371        """
1372        Check that abbreviations used in certificates correctly map to
1373        complete names.
1374        """
1375        self.assertEqual(
1376            sslverify.DN(CN=b"a", OU=b"hello"),
1377            sslverify.DistinguishedName(
1378                commonName=b"a", organizationalUnitName=b"hello"
1379            ),
1380        )
1381        self.assertNotEqual(
1382            sslverify.DN(CN=b"a", OU=b"hello"),
1383            sslverify.DN(CN=b"a", OU=b"hello", emailAddress=b"xxx"),
1384        )
1385        dn = sslverify.DN(CN=b"abcdefg")
1386        self.assertRaises(AttributeError, setattr, dn, "Cn", b"x")
1387        self.assertEqual(dn.CN, dn.commonName)
1388        dn.CN = b"bcdefga"
1389        self.assertEqual(dn.CN, dn.commonName)
1390
1391    def testInspectDistinguishedName(self):
1392        n = sslverify.DN(
1393            commonName=b"common name",
1394            organizationName=b"organization name",
1395            organizationalUnitName=b"organizational unit name",
1396            localityName=b"locality name",
1397            stateOrProvinceName=b"state or province name",
1398            countryName=b"country name",
1399            emailAddress=b"email address",
1400        )
1401        s = n.inspect()
1402        for k in [
1403            "common name",
1404            "organization name",
1405            "organizational unit name",
1406            "locality name",
1407            "state or province name",
1408            "country name",
1409            "email address",
1410        ]:
1411            self.assertIn(k, s, f"{k!r} was not in inspect output.")
1412            self.assertIn(k.title(), s, f"{k!r} was not in inspect output.")
1413
1414    def testInspectDistinguishedNameWithoutAllFields(self):
1415        n = sslverify.DN(localityName=b"locality name")
1416        s = n.inspect()
1417        for k in [
1418            "common name",
1419            "organization name",
1420            "organizational unit name",
1421            "state or province name",
1422            "country name",
1423            "email address",
1424        ]:
1425            self.assertNotIn(k, s, f"{k!r} was in inspect output.")
1426            self.assertNotIn(k.title(), s, f"{k!r} was in inspect output.")
1427        self.assertIn("locality name", s)
1428        self.assertIn("Locality Name", s)
1429
1430    def test_inspectCertificate(self):
1431        """
1432        Test that the C{inspect} method of L{sslverify.Certificate} returns
1433        a human-readable string containing some basic information about the
1434        certificate.
1435        """
1436        c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
1437        pk = c.getPublicKey()
1438        keyHash = pk.keyHash()
1439        # Maintenance Note: the algorithm used to compute the "public key hash"
1440        # is highly dubious and can differ between underlying versions of
1441        # OpenSSL (and across versions of Twisted), since it is not actually
1442        # the hash of the public key by itself.  If we can get the appropriate
1443        # APIs to get the hash of the key itself out of OpenSSL, then we should
1444        # be able to make it statically declared inline below again rather than
1445        # computing it here.
1446        self.assertEqual(
1447            c.inspect().split("\n"),
1448            [
1449                "Certificate For Subject:",
1450                "               Common Name: example.twistedmatrix.com",
1451                "              Country Name: US",
1452                "             Email Address: nobody@twistedmatrix.com",
1453                "             Locality Name: Boston",
1454                "         Organization Name: Twisted Matrix Labs",
1455                "  Organizational Unit Name: Security",
1456                "    State Or Province Name: Massachusetts",
1457                "",
1458                "Issuer:",
1459                "               Common Name: example.twistedmatrix.com",
1460                "              Country Name: US",
1461                "             Email Address: nobody@twistedmatrix.com",
1462                "             Locality Name: Boston",
1463                "         Organization Name: Twisted Matrix Labs",
1464                "  Organizational Unit Name: Security",
1465                "    State Or Province Name: Massachusetts",
1466                "",
1467                "Serial Number: 12345",
1468                "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
1469                "Public Key with Hash: " + keyHash,
1470            ],
1471        )
1472
1473    def test_publicKeyMatching(self):
1474        """
1475        L{PublicKey.matches} returns L{True} for keys from certificates with
1476        the same key, and L{False} for keys from certificates with different
1477        keys.
1478        """
1479        hostA = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
1480        hostB = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
1481        peerA = sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM)
1482
1483        self.assertTrue(hostA.getPublicKey().matches(hostB.getPublicKey()))
1484        self.assertFalse(peerA.getPublicKey().matches(hostA.getPublicKey()))
1485
1486    def test_enablingAndDisablingSessions(self):
1487        """
1488        The enableSessions argument sets the session cache mode; it defaults to
1489        False (at least until https://twistedmatrix.com/trac/ticket/9764 can be
1490        resolved).
1491        """
1492        options = sslverify.OpenSSLCertificateOptions()
1493        self.assertEqual(options.enableSessions, False)
1494        ctx = options.getContext()
1495        self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_OFF)
1496        options = sslverify.OpenSSLCertificateOptions(enableSessions=True)
1497        self.assertEqual(options.enableSessions, True)
1498        ctx = options.getContext()
1499        self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_SERVER)
1500
1501    def test_certificateOptionsSerialization(self):
1502        """
1503        Test that __setstate__(__getstate__()) round-trips properly.
1504        """
1505        firstOpts = sslverify.OpenSSLCertificateOptions(
1506            privateKey=self.sKey,
1507            certificate=self.sCert,
1508            method=SSL.SSLv23_METHOD,
1509            verify=True,
1510            caCerts=[self.sCert],
1511            verifyDepth=2,
1512            requireCertificate=False,
1513            verifyOnce=False,
1514            enableSingleUseKeys=False,
1515            enableSessions=False,
1516            fixBrokenPeers=True,
1517            enableSessionTickets=True,
1518        )
1519        context = firstOpts.getContext()
1520        self.assertIs(context, firstOpts._context)
1521        self.assertIsNotNone(context)
1522        state = firstOpts.__getstate__()
1523        self.assertNotIn("_context", state)
1524
1525        opts = sslverify.OpenSSLCertificateOptions()
1526        opts.__setstate__(state)
1527        self.assertEqual(opts.privateKey, self.sKey)
1528        self.assertEqual(opts.certificate, self.sCert)
1529        self.assertEqual(opts.method, SSL.SSLv23_METHOD)
1530        self.assertTrue(opts.verify)
1531        self.assertEqual(opts.caCerts, [self.sCert])
1532        self.assertEqual(opts.verifyDepth, 2)
1533        self.assertFalse(opts.requireCertificate)
1534        self.assertFalse(opts.verifyOnce)
1535        self.assertFalse(opts.enableSingleUseKeys)
1536        self.assertFalse(opts.enableSessions)
1537        self.assertTrue(opts.fixBrokenPeers)
1538        self.assertTrue(opts.enableSessionTickets)
1539
1540    test_certificateOptionsSerialization.suppress = [  # type: ignore[attr-defined]
1541        util.suppress(
1542            category=DeprecationWarning,
1543            message=r"twisted\.internet\._sslverify\.*__[gs]etstate__",
1544        )
1545    ]
1546
1547    def test_certificateOptionsSessionTickets(self):
1548        """
1549        Enabling session tickets should not set the OP_NO_TICKET option.
1550        """
1551        opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=True)
1552        ctx = opts.getContext()
1553        self.assertEqual(0, ctx.set_options(0) & 0x00004000)
1554
1555    def test_certificateOptionsSessionTicketsDisabled(self):
1556        """
1557        Enabling session tickets should set the OP_NO_TICKET option.
1558        """
1559        opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False)
1560        ctx = opts.getContext()
1561        self.assertEqual(0x00004000, ctx.set_options(0) & 0x00004000)
1562
1563    def test_allowedAnonymousClientConnection(self):
1564        """
1565        Check that anonymous connections are allowed when certificates aren't
1566        required on the server.
1567        """
1568        onData = defer.Deferred()
1569        self.loopback(
1570            sslverify.OpenSSLCertificateOptions(
1571                privateKey=self.sKey, certificate=self.sCert, requireCertificate=False
1572            ),
1573            sslverify.OpenSSLCertificateOptions(requireCertificate=False),
1574            onData=onData,
1575        )
1576
1577        return onData.addCallback(
1578            lambda result: self.assertEqual(result, WritingProtocol.byte)
1579        )
1580
1581    def test_refusedAnonymousClientConnection(self):
1582        """
1583        Check that anonymous connections are refused when certificates are
1584        required on the server.
1585        """
1586        onServerLost = defer.Deferred()
1587        onClientLost = defer.Deferred()
1588        self.loopback(
1589            sslverify.OpenSSLCertificateOptions(
1590                privateKey=self.sKey,
1591                certificate=self.sCert,
1592                verify=True,
1593                caCerts=[self.sCert],
1594                requireCertificate=True,
1595            ),
1596            sslverify.OpenSSLCertificateOptions(requireCertificate=False),
1597            onServerLost=onServerLost,
1598            onClientLost=onClientLost,
1599        )
1600
1601        d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
1602
1603        def afterLost(result):
1604            ((cSuccess, cResult), (sSuccess, sResult)) = result
1605            self.assertFalse(cSuccess)
1606            self.assertFalse(sSuccess)
1607            # Win32 fails to report the SSL Error, and report a connection lost
1608            # instead: there is a race condition so that's not totally
1609            # surprising (see ticket #2877 in the tracker)
1610            self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost))
1611            self.assertIsInstance(sResult.value, SSL.Error)
1612
1613        return d.addCallback(afterLost)
1614
1615    def test_failedCertificateVerification(self):
1616        """
1617        Check that connecting with a certificate not accepted by the server CA
1618        fails.
1619        """
1620        onServerLost = defer.Deferred()
1621        onClientLost = defer.Deferred()
1622        self.loopback(
1623            sslverify.OpenSSLCertificateOptions(
1624                privateKey=self.sKey,
1625                certificate=self.sCert,
1626                verify=False,
1627                requireCertificate=False,
1628            ),
1629            sslverify.OpenSSLCertificateOptions(
1630                verify=True, requireCertificate=False, caCerts=[self.cCert]
1631            ),
1632            onServerLost=onServerLost,
1633            onClientLost=onClientLost,
1634        )
1635
1636        d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
1637
1638        def afterLost(result):
1639            ((cSuccess, cResult), (sSuccess, sResult)) = result
1640            self.assertFalse(cSuccess)
1641            self.assertFalse(sSuccess)
1642
1643        return d.addCallback(afterLost)
1644
1645    def test_successfulCertificateVerification(self):
1646        """
1647        Test a successful connection with client certificate validation on
1648        server side.
1649        """
1650        onData = defer.Deferred()
1651        self.loopback(
1652            sslverify.OpenSSLCertificateOptions(
1653                privateKey=self.sKey,
1654                certificate=self.sCert,
1655                verify=False,
1656                requireCertificate=False,
1657            ),
1658            sslverify.OpenSSLCertificateOptions(
1659                verify=True, requireCertificate=True, caCerts=[self.sCert]
1660            ),
1661            onData=onData,
1662        )
1663
1664        return onData.addCallback(
1665            lambda result: self.assertEqual(result, WritingProtocol.byte)
1666        )
1667
1668    def test_successfulSymmetricSelfSignedCertificateVerification(self):
1669        """
1670        Test a successful connection with validation on both server and client
1671        sides.
1672        """
1673        onData = defer.Deferred()
1674        self.loopback(
1675            sslverify.OpenSSLCertificateOptions(
1676                privateKey=self.sKey,
1677                certificate=self.sCert,
1678                verify=True,
1679                requireCertificate=True,
1680                caCerts=[self.cCert],
1681            ),
1682            sslverify.OpenSSLCertificateOptions(
1683                privateKey=self.cKey,
1684                certificate=self.cCert,
1685                verify=True,
1686                requireCertificate=True,
1687                caCerts=[self.sCert],
1688            ),
1689            onData=onData,
1690        )
1691
1692        return onData.addCallback(
1693            lambda result: self.assertEqual(result, WritingProtocol.byte)
1694        )
1695
1696    def test_verification(self):
1697        """
1698        Check certificates verification building custom certificates data.
1699        """
1700        clientDN = sslverify.DistinguishedName(commonName="client")
1701        clientKey = sslverify.KeyPair.generate()
1702        clientCertReq = clientKey.certificateRequest(clientDN)
1703
1704        serverDN = sslverify.DistinguishedName(commonName="server")
1705        serverKey = sslverify.KeyPair.generate()
1706        serverCertReq = serverKey.certificateRequest(serverDN)
1707
1708        clientSelfCertReq = clientKey.certificateRequest(clientDN)
1709        clientSelfCertData = clientKey.signCertificateRequest(
1710            clientDN, clientSelfCertReq, lambda dn: True, 132
1711        )
1712        clientSelfCert = clientKey.newCertificate(clientSelfCertData)
1713
1714        serverSelfCertReq = serverKey.certificateRequest(serverDN)
1715        serverSelfCertData = serverKey.signCertificateRequest(
1716            serverDN, serverSelfCertReq, lambda dn: True, 516
1717        )
1718        serverSelfCert = serverKey.newCertificate(serverSelfCertData)
1719
1720        clientCertData = serverKey.signCertificateRequest(
1721            serverDN, clientCertReq, lambda dn: True, 7
1722        )
1723        clientCert = clientKey.newCertificate(clientCertData)
1724
1725        serverCertData = clientKey.signCertificateRequest(
1726            clientDN, serverCertReq, lambda dn: True, 42
1727        )
1728        serverCert = serverKey.newCertificate(serverCertData)
1729
1730        onData = defer.Deferred()
1731
1732        serverOpts = serverCert.options(serverSelfCert)
1733        clientOpts = clientCert.options(clientSelfCert)
1734
1735        self.loopback(serverOpts, clientOpts, onData=onData)
1736
1737        return onData.addCallback(
1738            lambda result: self.assertEqual(result, WritingProtocol.byte)
1739        )
1740
1741
class OpenSSLOptionsECDHIntegrationTests(OpenSSLOptionsTestsMixin, TestCase):
    """
    ECDH-related integration tests for L{OpenSSLOptions}.
    """

    def test_ellipticCurveDiffieHellman(self):
        """
        Connections use ECDH when OpenSSL supports it.

        The test is skipped when C{get_elliptic_curves()} reports no curves.

        @return: the L{defer.Deferred} passed to C{self.loopback} as
            C{onData}, with a callback added that checks the negotiated
            cipher name.
        """
        if not get_elliptic_curves():
            raise SkipTest("OpenSSL does not support ECDH.")

        onData = defer.Deferred()
        # TLS 1.3 cipher suites do not specify the key exchange
        # mechanism:
        # https://wiki.openssl.org/index.php/TLS1.3#Differences_with_TLS1.2_and_below
        #
        # and OpenSSL only supports ECDHE groups with TLS 1.3:
        # https://wiki.openssl.org/index.php/TLS1.3#Groups
        #
        # so TLS 1.3 implies ECDHE, but its cipher names never contain
        # "ECDH".  Cap both sides at TLS 1.2, where the cipher name does
        # state the key exchange, so the assertion below can actually
        # observe that ECDH was selected.
        self.loopback(
            sslverify.OpenSSLCertificateOptions(
                privateKey=self.sKey,
                certificate=self.sCert,
                requireCertificate=False,
                lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
            ),
            sslverify.OpenSSLCertificateOptions(
                requireCertificate=False,
                lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
            ),
            onData=onData,
        )

        @onData.addCallback
        def assertECDH(_):
            # Exactly one client protocol is expected; inspect the cipher
            # negotiated on its underlying OpenSSL connection.
            self.assertEqual(len(self.clientConn.factory.protocols), 1)
            [clientProtocol] = self.clientConn.factory.protocols
            cipher = clientProtocol.getHandle().get_cipher_name()
            self.assertIn("ECDH", cipher)

        return onData
1786
1787
class DeprecationTests(SynchronousTestCase):
    """
    Tests that the pickle-protocol hooks of
    L{sslverify.OpenSSLCertificateOptions} are reported as deprecated.
    """

    if skipSSL:
        skip = skipSSL

    def test_getstateDeprecation(self):
        """
        Calling L{sslverify.OpenSSLCertificateOptions.__getstate__} is
        deprecated since Twisted 15.0.0.
        """
        deprecation = (Version("Twisted", 15, 0, 0), "a real persistence system")
        options = sslverify.OpenSSLCertificateOptions()
        self.callDeprecated(deprecation, options.__getstate__)

    def test_setstateDeprecation(self):
        """
        Calling L{sslverify.OpenSSLCertificateOptions.__setstate__} is
        deprecated since Twisted 15.0.0.
        """
        deprecation = (Version("Twisted", 15, 0, 0), "a real persistence system")
        options = sslverify.OpenSSLCertificateOptions()
        # An empty state dictionary is enough to trigger the warning.
        self.callDeprecated(deprecation, options.__setstate__, {})
1815
1816
class TrustRootTests(TestCase):
    """
    Tests for the C{trustRoot} argument of
    L{sslverify.OpenSSLCertificateOptions}, for L{sslverify.platformTrust},
    and for how the two interact.
    """

    if skipSSL:
        skip = skipSSL

    def setUp(self):
        """
        Replace L{sslverify._ChooseDiffieHellmanEllipticCurve} with
        L{FakeChooseDiffieHellmanEllipticCurve} for the duration of each test.
        """
        self.patch(
            sslverify,
            "_ChooseDiffieHellmanEllipticCurve",
            FakeChooseDiffieHellmanEllipticCurve,
        )

    def test_caCertsPlatformDefaults(self):
        """
        An L{sslverify.OpenSSLCertificateOptions} created with a C{trustRoot}
        of L{sslverify.OpenSSLDefaultPaths} marks the default verify paths as
        set on the context it builds.
        """
        options = sslverify.OpenSSLCertificateOptions(
            trustRoot=sslverify.OpenSSLDefaultPaths(),
        )
        fakeContext = FakeContext(SSL.TLSv1_METHOD)
        # Hand out the fake context whatever method is requested, so that the
        # calls made on it can be inspected afterwards.
        options._contextFactory = lambda method: fakeContext
        options.getContext()
        self.assertTrue(fakeContext._defaultVerifyPathsSet)

    def test_trustRootPlatformRejectsUntrustedCA(self):
        """
        With a C{trustRoot} of L{platformTrust}, a server certificate issued
        by a newly created CA is rejected by the client.

        Note that this test should I{always} pass, even on platforms where the
        CA certificates are not installed, as long as L{platformTrust} rejects
        completely invalid / unknown root CA certificates.  This is simply a
        smoke test to make sure that verification is happening at all.
        """
        authority, serverCert = certificatesForAuthorityAndServer()
        chainFile = pathContainingDumpOf(self, serverCert, authority)
        keyFile = pathContainingDumpOf(self, serverCert.privateKey)

        _, _, _, clientApp, _ = loopbackTLSConnection(
            trustRoot=platformTrust(),
            privateKeyFile=keyFile,
            chainedCertFile=chainFile,
        )

        # The client application never saw any data ...
        self.assertEqual(clientApp.data, b"")

        # ... because the connection ended with an L{SSL.Error} ...
        failure = clientApp.lostReason
        self.assertEqual(failure.type, SSL.Error)

        # ... whose reason text names the unknown CA.  Some combination of
        # OpenSSL and PyOpenSSL is bad at reporting errors, hence the digging.
        self.assertEqual(failure.value.args[0][0][2], "tlsv1 alert unknown ca")

    def test_trustRootSpecificCertificate(self):
        """
        Passing a L{Certificate} as C{trustRoot} lets a client connect to a
        server whose certificate was issued by that certificate's authority.
        """
        authority, serverCert = certificatesForAuthorityAndServer()
        # NOTE(review): this second authority/server pair is created but
        # never used by the assertions below.
        unusedAuthority, unusedServer = certificatesForAuthorityAndServer()

        keyFile = pathContainingDumpOf(self, serverCert.privateKey)
        certFile = pathContainingDumpOf(self, serverCert)
        _, _, serverApp, clientApp, pump = loopbackTLSConnection(
            trustRoot=authority,
            privateKeyFile=keyFile,
            chainedCertFile=certFile,
        )
        pump.flush()

        self.assertIsNone(clientApp.lostReason)
        self.assertEqual(clientApp.data, serverApp.greeting)
1896
1897
1898class ServiceIdentityTests(SynchronousTestCase):
1899    """
1900    Tests for the verification of the peer's service's identity via the
1901    C{hostname} argument to L{sslverify.OpenSSLCertificateOptions}.
1902    """
1903
1904    if skipSSL:
1905        skip = skipSSL
1906
1907    def serviceIdentitySetup(
1908        self,
1909        clientHostname,
1910        serverHostname,
1911        serverContextSetup=lambda ctx: None,
1912        validCertificate=True,
1913        clientPresentsCertificate=False,
1914        validClientCertificate=True,
1915        serverVerifies=False,
1916        buggyInfoCallback=False,
1917        fakePlatformTrust=False,
1918        useDefaultTrust=False,
1919    ):
1920        """
1921        Connect a server and a client.
1922
1923        @param clientHostname: The I{client's idea} of the server's hostname;
1924            passed as the C{hostname} to the
1925            L{sslverify.OpenSSLCertificateOptions} instance.
1926        @type clientHostname: L{unicode}
1927
1928        @param serverHostname: The I{server's own idea} of the server's
1929            hostname; present in the certificate presented by the server.
1930        @type serverHostname: L{unicode}
1931
1932        @param serverContextSetup: a 1-argument callable invoked with the
1933            L{OpenSSL.SSL.Context} after it's produced.
1934        @type serverContextSetup: L{callable} taking L{OpenSSL.SSL.Context}
1935            returning L{None}.
1936
1937        @param validCertificate: Is the server's certificate valid?  L{True} if
1938            so, L{False} otherwise.
1939        @type validCertificate: L{bool}
1940
1941        @param clientPresentsCertificate: Should the client present a
1942            certificate to the server?  Defaults to 'no'.
1943        @type clientPresentsCertificate: L{bool}
1944
1945        @param validClientCertificate: If the client presents a certificate,
1946            should it actually be a valid one, i.e. signed by the same CA that
1947            the server is checking?  Defaults to 'yes'.
1948        @type validClientCertificate: L{bool}
1949
1950        @param serverVerifies: Should the server verify the client's
1951            certificate?  Defaults to 'no'.
1952        @type serverVerifies: L{bool}
1953
1954        @param buggyInfoCallback: Should we patch the implementation so that
1955            the C{info_callback} passed to OpenSSL to have a bug and raise an
1956            exception (L{ZeroDivisionError})?  Defaults to 'no'.
1957        @type buggyInfoCallback: L{bool}
1958
1959        @param fakePlatformTrust: Should we fake the platformTrust to be the
1960            same as our fake server certificate authority, so that we can test
1961            it's being used?  Defaults to 'no' and we just pass platform trust.
1962        @type fakePlatformTrust: L{bool}
1963
1964        @param useDefaultTrust: Should we avoid passing the C{trustRoot} to
1965            L{ssl.optionsForClientTLS}?  Defaults to 'no'.
1966        @type useDefaultTrust: L{bool}
1967
1968        @return: the client TLS protocol, the client wrapped protocol,
1969            the server TLS protocol, the server wrapped protocol and
1970            an L{IOPump} which, when its C{pump} and C{flush} methods are
1971            called, will move data between the created client and server
1972            protocol instances
1973        @rtype: 5-L{tuple} of 4 L{IProtocol}s and L{IOPump}
1974        """
1975        serverCA, serverCert = certificatesForAuthorityAndServer(serverHostname)
1976        other = {}
1977        passClientCert = None
1978        clientCA, clientCert = certificatesForAuthorityAndServer("client")
1979        if serverVerifies:
1980            other.update(trustRoot=clientCA)
1981
1982        if clientPresentsCertificate:
1983            if validClientCertificate:
1984                passClientCert = clientCert
1985            else:
1986                bogusCA, bogus = certificatesForAuthorityAndServer("client")
1987                passClientCert = bogus
1988
1989        serverOpts = sslverify.OpenSSLCertificateOptions(
1990            privateKey=serverCert.privateKey.original,
1991            certificate=serverCert.original,
1992            **other,
1993        )
1994        serverContextSetup(serverOpts.getContext())
1995        if not validCertificate:
1996            serverCA, otherServer = certificatesForAuthorityAndServer(serverHostname)
1997        if buggyInfoCallback:
1998
1999            def broken(*a, **k):
2000                """
2001                Raise an exception.
2002
2003                @param a: Arguments for an C{info_callback}
2004
2005                @param k: Keyword arguments for an C{info_callback}
2006                """
2007                1 / 0
2008
2009            self.patch(
2010                sslverify.ClientTLSOptions,
2011                "_identityVerifyingInfoCallback",
2012                broken,
2013            )
2014
2015        signature = {"hostname": clientHostname}
2016        if passClientCert:
2017            signature.update(clientCertificate=passClientCert)
2018        if not useDefaultTrust:
2019            signature.update(trustRoot=serverCA)
2020        if fakePlatformTrust:
2021            self.patch(sslverify, "platformTrust", lambda: serverCA)
2022
2023        clientOpts = sslverify.optionsForClientTLS(**signature)
2024
2025        class GreetingServer(protocol.Protocol):
2026            greeting = b"greetings!"
2027            lostReason = None
2028            data = b""
2029
2030            def connectionMade(self):
2031                self.transport.write(self.greeting)
2032
2033            def dataReceived(self, data):
2034                self.data += data
2035
2036            def connectionLost(self, reason):
2037                self.lostReason = reason
2038
2039        class GreetingClient(protocol.Protocol):
2040            greeting = b"cheerio!"
2041            data = b""
2042            lostReason = None
2043
2044            def connectionMade(self):
2045                self.transport.write(self.greeting)
2046
2047            def dataReceived(self, data):
2048                self.data += data
2049
2050            def connectionLost(self, reason):
2051                self.lostReason = reason
2052
2053        serverWrappedProto = GreetingServer()
2054        clientWrappedProto = GreetingClient()
2055
2056        clientFactory = protocol.Factory()
2057        clientFactory.protocol = lambda: clientWrappedProto
2058        serverFactory = protocol.Factory()
2059        serverFactory.protocol = lambda: serverWrappedProto
2060
2061        self.serverOpts = serverOpts
2062        self.clientOpts = clientOpts
2063
2064        clientTLSFactory = TLSMemoryBIOFactory(
2065            clientOpts, isClient=True, wrappedFactory=clientFactory
2066        )
2067        serverTLSFactory = TLSMemoryBIOFactory(
2068            serverOpts, isClient=False, wrappedFactory=serverFactory
2069        )
2070
2071        cProto, sProto, pump = connectedServerAndClient(
2072            lambda: serverTLSFactory.buildProtocol(None),
2073            lambda: clientTLSFactory.buildProtocol(None),
2074        )
2075        return cProto, sProto, clientWrappedProto, serverWrappedProto, pump
2076
2077    def test_invalidHostname(self):
2078        """
2079        When a certificate containing an invalid hostname is received from the
2080        server, the connection is immediately dropped.
2081        """
2082        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2083            "wrong-host.example.com",
2084            "correct-host.example.com",
2085        )
2086        self.assertEqual(cWrapped.data, b"")
2087        self.assertEqual(sWrapped.data, b"")
2088
2089        cErr = cWrapped.lostReason.value
2090        sErr = sWrapped.lostReason.value
2091
2092        self.assertIsInstance(cErr, VerificationError)
2093        self.assertIsInstance(sErr, ConnectionClosed)
2094
2095    def test_validHostname(self):
2096        """
2097        Whenever a valid certificate containing a valid hostname is received,
2098        connection proceeds normally.
2099        """
2100        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2101            "valid.example.com",
2102            "valid.example.com",
2103        )
2104        self.assertEqual(cWrapped.data, b"greetings!")
2105
2106        cErr = cWrapped.lostReason
2107        sErr = sWrapped.lostReason
2108        self.assertIsNone(cErr)
2109        self.assertIsNone(sErr)
2110
2111    def test_validHostnameInvalidCertificate(self):
2112        """
2113        When an invalid certificate containing a perfectly valid hostname is
2114        received, the connection is aborted with an OpenSSL error.
2115        """
2116        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2117            "valid.example.com",
2118            "valid.example.com",
2119            validCertificate=False,
2120        )
2121
2122        self.assertEqual(cWrapped.data, b"")
2123        self.assertEqual(sWrapped.data, b"")
2124
2125        cErr = cWrapped.lostReason.value
2126        sErr = sWrapped.lostReason.value
2127
2128        self.assertIsInstance(cErr, SSL.Error)
2129        self.assertIsInstance(sErr, SSL.Error)
2130
2131    def test_realCAsBetterNotSignOurBogusTestCerts(self):
2132        """
2133        If we use the default trust from the platform, our dinky certificate
2134        should I{really} fail.
2135        """
2136        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2137            "valid.example.com",
2138            "valid.example.com",
2139            validCertificate=False,
2140            useDefaultTrust=True,
2141        )
2142
2143        self.assertEqual(cWrapped.data, b"")
2144        self.assertEqual(sWrapped.data, b"")
2145
2146        cErr = cWrapped.lostReason.value
2147        sErr = sWrapped.lostReason.value
2148
2149        self.assertIsInstance(cErr, SSL.Error)
2150        self.assertIsInstance(sErr, SSL.Error)
2151
2152    def test_butIfTheyDidItWouldWork(self):
2153        """
2154        L{ssl.optionsForClientTLS} should be using L{ssl.platformTrust} by
2155        default, so if we fake that out then it should trust ourselves again.
2156        """
2157        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2158            "valid.example.com",
2159            "valid.example.com",
2160            useDefaultTrust=True,
2161            fakePlatformTrust=True,
2162        )
2163        self.assertEqual(cWrapped.data, b"greetings!")
2164
2165        cErr = cWrapped.lostReason
2166        sErr = sWrapped.lostReason
2167        self.assertIsNone(cErr)
2168        self.assertIsNone(sErr)
2169
2170    def test_clientPresentsCertificate(self):
2171        """
2172        When the server verifies and the client presents a valid certificate
2173        for that verification by passing it to
2174        L{sslverify.optionsForClientTLS}, communication proceeds.
2175        """
2176        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2177            "valid.example.com",
2178            "valid.example.com",
2179            validCertificate=True,
2180            serverVerifies=True,
2181            clientPresentsCertificate=True,
2182        )
2183
2184        self.assertEqual(cWrapped.data, b"greetings!")
2185
2186        cErr = cWrapped.lostReason
2187        sErr = sWrapped.lostReason
2188        self.assertIsNone(cErr)
2189        self.assertIsNone(sErr)
2190
2191    def test_clientPresentsBadCertificate(self):
2192        """
2193        When the server verifies and the client presents an invalid certificate
2194        for that verification by passing it to
2195        L{sslverify.optionsForClientTLS}, the connection cannot be established
2196        with an SSL error.
2197        """
2198        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2199            "valid.example.com",
2200            "valid.example.com",
2201            validCertificate=True,
2202            serverVerifies=True,
2203            validClientCertificate=False,
2204            clientPresentsCertificate=True,
2205        )
2206
2207        self.assertEqual(cWrapped.data, b"")
2208
2209        cErr = cWrapped.lostReason.value
2210        sErr = sWrapped.lostReason.value
2211
2212        self.assertIsInstance(cErr, SSL.Error)
2213        self.assertIsInstance(sErr, SSL.Error)
2214
2215    @skipIf(skipSNI, skipSNI)
2216    def test_hostnameIsIndicated(self):
2217        """
2218        Specifying the C{hostname} argument to L{CertificateOptions} also sets
2219        the U{Server Name Extension
2220        <https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS indication
2221        field to the correct value.
2222        """
2223        names = []
2224
2225        def setupServerContext(ctx):
2226            def servername_received(conn):
2227                names.append(conn.get_servername().decode("ascii"))
2228
2229            ctx.set_tlsext_servername_callback(servername_received)
2230
2231        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2232            "valid.example.com", "valid.example.com", setupServerContext
2233        )
2234        self.assertEqual(names, ["valid.example.com"])
2235
2236    @skipIf(skipSNI, skipSNI)
2237    def test_hostnameEncoding(self):
2238        """
2239        Hostnames are encoded as IDNA.
2240        """
2241        names = []
2242        hello = "h\N{LATIN SMALL LETTER A WITH ACUTE}llo.example.com"
2243
2244        def setupServerContext(ctx):
2245            def servername_received(conn):
2246                serverIDNA = _idnaText(conn.get_servername())
2247                names.append(serverIDNA)
2248
2249            ctx.set_tlsext_servername_callback(servername_received)
2250
2251        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2252            hello, hello, setupServerContext
2253        )
2254        self.assertEqual(names, [hello])
2255        self.assertEqual(cWrapped.data, b"greetings!")
2256
2257        cErr = cWrapped.lostReason
2258        sErr = sWrapped.lostReason
2259        self.assertIsNone(cErr)
2260        self.assertIsNone(sErr)
2261
2262    def test_fallback(self):
2263        """
2264        L{sslverify.simpleVerifyHostname} checks string equality on the
2265        commonName of a connection's certificate's subject, doing nothing if it
2266        matches and raising L{VerificationError} if it doesn't.
2267        """
2268        name = "something.example.com"
2269
2270        class Connection:
2271            def get_peer_certificate(self):
2272                """
2273                Fake of L{OpenSSL.SSL.Connection.get_peer_certificate}.
2274
2275                @return: A certificate with a known common name.
2276                @rtype: L{OpenSSL.crypto.X509}
2277                """
2278                cert = X509()
2279                cert.get_subject().commonName = name
2280                return cert
2281
2282        conn = Connection()
2283        self.assertIs(
2284            sslverify.simpleVerifyHostname(conn, "something.example.com"), None
2285        )
2286        self.assertRaises(
2287            sslverify.SimpleVerificationError,
2288            sslverify.simpleVerifyHostname,
2289            conn,
2290            "nonsense",
2291        )
2292
2293    def test_surpriseFromInfoCallback(self):
2294        """
2295        pyOpenSSL isn't always so great about reporting errors.  If one occurs
2296        in the verification info callback, it should be logged and the
2297        connection should be shut down (if possible, anyway; the app_data could
2298        be clobbered but there's no point testing for that).
2299        """
2300        cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
2301            "correct-host.example.com",
2302            "correct-host.example.com",
2303            buggyInfoCallback=True,
2304        )
2305
2306        self.assertEqual(cWrapped.data, b"")
2307        self.assertEqual(sWrapped.data, b"")
2308
2309        cErr = cWrapped.lostReason.value
2310        sErr = sWrapped.lostReason.value
2311
2312        self.assertIsInstance(cErr, ZeroDivisionError)
2313        self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error))
2314        errors = self.flushLoggedErrors(ZeroDivisionError)
2315        self.assertTrue(errors)
2316
2317
def negotiateProtocol(serverProtocols, clientProtocols, clientOptions=None):
    """
    Set up an in-memory TLS connection and report which next protocol, if
    any, was negotiated on it.

    @param serverProtocols: The protocols the server is willing to negotiate.
    @param clientProtocols: The protocols the client is willing to negotiate.
    @param clientOptions: The type of C{OpenSSLCertificateOptions} class to
        use for the client. Defaults to C{OpenSSLCertificateOptions}.
    @return: A L{tuple} of the negotiated protocol and the reason the
        connection was lost.
    """
    authority, serverCertificate = certificatesForAuthorityAndServer()

    # The client trusts exactly the authority created above.
    trustedAuthorities = sslverify.OpenSSLCertificateAuthorities([authority.original])

    _, clientTLS, _, clientApp, pump = loopbackTLSConnectionInMemory(
        trustRoot=trustedAuthorities,
        privateKey=serverCertificate.privateKey.original,
        serverCertificate=serverCertificate.original,
        clientProtocols=clientProtocols,
        serverProtocols=serverProtocols,
        clientOptions=clientOptions,
    )
    pump.flush()

    return clientTLS.negotiatedProtocol, clientApp.lostReason
2347
2348
class NPNOrALPNTests(TestCase):
    """
    NPN and ALPN protocol selection.

    These tests only run on platforms that have a PyOpenSSL version >= 0.15,
    and OpenSSL version 1.0.1 or later.
    """

    if skipSSL:
        skip = skipSSL
    elif skipNPN:
        skip = skipNPN

    def test_nextProtocolMechanismsNPNIsSupported(self):
        """
        When at least NPN is available on the platform, NPN is in the set of
        supported negotiation protocols.
        """
        mechanisms = sslverify.protocolNegotiationMechanisms()
        self.assertIn(sslverify.ProtocolNegotiationSupport.NPN, mechanisms)

    def test_NPNAndALPNSuccess(self):
        """
        When both ALPN and NPN are used, and both the client and server have
        overlapping protocol choices, a protocol is successfully negotiated.
        Further, the negotiated protocol is the first one in the list.
        """
        offered = [b"h2", b"http/1.1"]
        chosen, lostReason = negotiateProtocol(
            clientProtocols=offered,
            serverProtocols=offered,
        )
        self.assertEqual(chosen, b"h2")
        self.assertIsNone(lostReason)

    def test_NPNAndALPNDifferent(self):
        """
        Client and server have different protocol lists: only the common
        element is chosen.
        """
        chosen, lostReason = negotiateProtocol(
            clientProtocols=[b"spdy/3", b"http/1.1"],
            serverProtocols=[b"h2", b"http/1.1", b"spdy/2"],
        )
        self.assertEqual(chosen, b"http/1.1")
        self.assertIsNone(lostReason)

    def test_NPNAndALPNNoAdvertise(self):
        """
        When one peer does not advertise any protocols, the connection is set
        up with no next protocol.
        """
        chosen, lostReason = negotiateProtocol(
            clientProtocols=[b"h2", b"http/1.1"],
            serverProtocols=[],
        )
        self.assertIsNone(chosen)
        self.assertIsNone(lostReason)

    def test_NPNAndALPNNoOverlap(self):
        """
        When the client and server have no overlap of protocols, the connection
        fails.
        """
        # The server offers the two HTTP protocols, the client only spdy/3.
        serverProtocols = [b"h2", b"http/1.1"]
        clientProtocols = [b"spdy/3"]
        chosen, lostReason = negotiateProtocol(
            serverProtocols=serverProtocols,
            clientProtocols=clientProtocols,
        )
        self.assertIsNone(chosen)
        self.assertEqual(lostReason.type, SSL.Error)
2424
2425
class ALPNTests(TestCase):
    """
    ALPN protocol selection.

    These tests only run on platforms that have a PyOpenSSL version >= 0.15,
    and OpenSSL version 1.0.2 or later.

    This covers only the ALPN specific logic, as any platform that has ALPN
    will also have NPN and so will run the NPNAndALPNTest suite as well.
    """

    if skipSSL:
        skip = skipSSL
    elif skipALPN:
        skip = skipALPN

    def test_nextProtocolMechanismsALPNIsSupported(self):
        """
        When ALPN is available on a platform, protocolNegotiationMechanisms
        includes ALPN in the supported protocols.
        """
        mechanisms = sslverify.protocolNegotiationMechanisms()
        self.assertIn(sslverify.ProtocolNegotiationSupport.ALPN, mechanisms)
2449
2450
class NPNAndALPNAbsentTests(TestCase):
    """
    NPN/ALPN operations fail on platforms that do not support them.

    These tests only run on platforms that have a PyOpenSSL version < 0.15,
    an OpenSSL version earlier than 1.0.1, or an OpenSSL/cryptography built
    without NPN support.
    """

    if skipSSL:
        skip = skipSSL
    elif not (skipNPN and skipALPN):
        # At least one of the two mechanisms is usable here, so there is no
        # "absent" behaviour to test.
        skip = "NPN and/or ALPN is present on this platform"

    def test_nextProtocolMechanismsNoNegotiationSupported(self):
        """
        When neither NPN or ALPN are available on a platform, there are no
        supported negotiation protocols.
        """
        mechanisms = sslverify.protocolNegotiationMechanisms()
        self.assertFalse(mechanisms)

    def test_NPNAndALPNNotImplemented(self):
        """
        A NotImplementedError is raised when using acceptableProtocols on a
        platform that does not support either NPN or ALPN.
        """
        offered = [b"h2", b"http/1.1"]
        with self.assertRaises(NotImplementedError):
            negotiateProtocol(serverProtocols=offered, clientProtocols=offered)

    def test_NegotiatedProtocolReturnsNone(self):
        """
        negotiatedProtocol return L{None} even when NPN/ALPN aren't supported.
        This works because, as neither are supported, negotiation isn't even
        attempted.
        """
        chosen, lostReason = negotiateProtocol(
            clientProtocols=None,
            serverProtocols=None,
        )
        self.assertIsNone(chosen)
        self.assertIsNone(lostReason)
2500
2501
2502class _NotSSLTransport:
2503    def getHandle(self):
2504        return self
2505
2506
2507class _MaybeSSLTransport:
2508    def getHandle(self):
2509        return self
2510
2511    def get_peer_certificate(self):
2512        return None
2513
2514    def get_host_certificate(self):
2515        return None
2516
2517
2518class _ActualSSLTransport:
2519    def getHandle(self):
2520        return self
2521
2522    def get_host_certificate(self):
2523        return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
2524
2525    def get_peer_certificate(self):
2526        return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
2527
2528
class ConstructorsTests(TestCase):
    """
    Tests for the L{sslverify.Certificate.peerFromTransport} and
    L{sslverify.Certificate.hostFromTransport} constructors.
    """

    if skipSSL:
        skip = skipSSL

    def test_peerFromNonSSLTransport(self):
        """
        C{peerFromTransport} raises L{CertificateError}, with a message
        starting with C{"non-TLS"}, when the transport passed is not actually
        an SSL transport.
        """
        transport = _NotSSLTransport()
        error = self.assertRaises(
            CertificateError, sslverify.Certificate.peerFromTransport, transport
        )
        message = str(error)
        self.assertTrue(message.startswith("non-TLS"))

    def test_peerFromBlankSSLTransport(self):
        """
        C{peerFromTransport} raises L{CertificateError}, with a message
        starting with C{"TLS"}, when the transport is an SSL transport that
        has no peer certificate.
        """
        transport = _MaybeSSLTransport()
        error = self.assertRaises(
            CertificateError, sslverify.Certificate.peerFromTransport, transport
        )
        message = str(error)
        self.assertTrue(message.startswith("TLS"))

    def test_hostFromNonSSLTransport(self):
        """
        C{hostFromTransport} raises L{CertificateError}, with a message
        starting with C{"non-TLS"}, when the transport passed is not actually
        an SSL transport.
        """
        transport = _NotSSLTransport()
        error = self.assertRaises(
            CertificateError, sslverify.Certificate.hostFromTransport, transport
        )
        message = str(error)
        self.assertTrue(message.startswith("non-TLS"))

    def test_hostFromBlankSSLTransport(self):
        """
        C{hostFromTransport} raises L{CertificateError}, with a message
        starting with C{"TLS"}, when the transport is an SSL transport that
        has no host certificate.
        """
        transport = _MaybeSSLTransport()
        error = self.assertRaises(
            CertificateError, sslverify.Certificate.hostFromTransport, transport
        )
        message = str(error)
        self.assertTrue(message.startswith("TLS"))

    def test_hostFromSSLTransport(self):
        """
        C{hostFromTransport} builds the expected certificate (serial number
        12345) from a valid SSL transport.
        """
        certificate = sslverify.Certificate.hostFromTransport(_ActualSSLTransport())
        self.assertEqual(certificate.serialNumber(), 12345)

    def test_peerFromSSLTransport(self):
        """
        C{peerFromTransport} builds the expected certificate (serial number
        12346) from a valid SSL transport.
        """
        certificate = sslverify.Certificate.peerFromTransport(_ActualSSLTransport())
        self.assertEqual(certificate.serialNumber(), 12346)
2604
2605
class MultipleCertificateTrustRootTests(TestCase):
    """
    Tests for the behaviour of L{sslverify.trustRootFromCertificates}.
    """

    if skipSSL:
        skip = skipSSL

    def _clientWrapperFor(self, trustRoot, serverIdentity):
        """
        Run L{loopbackTLSConnectionInMemory} with C{trustRoot} as the trust
        root and C{serverIdentity} supplying the server's key and
        certificate.

        @param trustRoot: the object passed as C{trustRoot}.

        @param serverIdentity: an object with C{privateKey.original} and
            C{original} attributes, e.g. a L{sslverify.PrivateCertificate}.

        @return: the fourth element of the five-tuple returned by
            L{loopbackTLSConnectionInMemory}; the tests read its C{data} and
            C{lostReason} attributes.
        """
        _, _, _, clientWrapper, _ = loopbackTLSConnectionInMemory(
            trustRoot=trustRoot,
            privateKey=serverIdentity.privateKey.original,
            serverCertificate=serverIdentity.original,
        )
        return clientWrapper

    def test_trustRootFromCertificatesPrivatePublic(self):
        """
        L{trustRootFromCertificates} accepts both L{sslverify.Certificate}
        and L{sslverify.PrivateCertificate} instances.
        """
        keyedCert = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR)
        plainCert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)

        trust = sslverify.trustRootFromCertificates([keyedCert, plainCert])

        # The returned object has to behave correctly when used as the
        # trustRoot= param to optionsForClientTLS.
        clientWrapper = self._clientWrapperFor(trust, keyedCert)

        # The connection is expected to succeed.
        self.assertEqual(clientWrapper.data, b"greetings!")
        self.assertIsNone(clientWrapper.lostReason)

    def test_trustRootSelfSignedServerCertificate(self):
        """
        L{trustRootFromCertificates} called with a single self-signed
        certificate makes L{optionsForClientTLS} accept client connections
        to a server presenting that certificate.
        """
        key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server")
        selfSigned = sslverify.PrivateCertificate.fromCertificateAndKeyPair(
            sslverify.Certificate(cert), sslverify.KeyPair(key)
        )

        trust = sslverify.trustRootFromCertificates([selfSigned])

        # This exact certificate is trusted, so connecting to the server
        # that presents it should work.
        clientWrapper = self._clientWrapperFor(trust, selfSigned)
        self.assertEqual(clientWrapper.data, b"greetings!")
        self.assertIsNone(clientWrapper.lostReason)

    def test_trustRootCertificateAuthorityTrustsConnection(self):
        """
        L{trustRootFromCertificates} called with certificate A makes
        L{optionsForClientTLS} accept client connections to a server with
        certificate B where B is signed by A.
        """
        authority, server = certificatesForAuthorityAndServer()

        trust = sslverify.trustRootFromCertificates([authority])

        # The CA certificate is listed as trusted, so a connection to the
        # server certificate it signed should work.
        clientWrapper = self._clientWrapperFor(trust, server)
        self.assertEqual(clientWrapper.data, b"greetings!")
        self.assertIsNone(clientWrapper.lostReason)

    def test_trustRootFromCertificatesUntrusted(self):
        """
        L{trustRootFromCertificates} called with certificate A makes
        L{optionsForClientTLS} refuse connections to a server with
        certificate B where B is not signed by A.
        """
        key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server")
        server = sslverify.PrivateCertificate.fromCertificateAndKeyPair(
            sslverify.Certificate(cert), sslverify.KeyPair(key)
        )
        _, unrelatedX509 = makeCertificate(O=b"CA Test Certificate", CN=b"unknown CA")
        unrelated = sslverify.Certificate(unrelatedX509)

        trust = sslverify.trustRootFromCertificates([unrelated])

        # Only 'unrelated' is trusted and it has not signed the server's
        # certificate, so the connection should be rejected.
        clientWrapper = self._clientWrapperFor(trust, server)

        # A failed connection means no data was received.
        self.assertEqual(clientWrapper.data, b"")

        # The failure is an L{SSL.Error}.
        reason = clientWrapper.lostReason
        self.assertEqual(reason.type, SSL.Error)

        # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors.
        self.assertEqual(reason.value.args[0][0][2], "tlsv1 alert unknown ca")

    def test_trustRootFromCertificatesOpenSSLObjects(self):
        """
        L{trustRootFromCertificates} raises L{TypeError} when the list passed
        to it contains an L{OpenSSL.crypto.X509} instance.
        """
        rawX509 = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR).original

        error = self.assertRaises(
            TypeError, sslverify.trustRootFromCertificates, [rawX509]
        )
        self.assertEqual(
            "certificates items must be twisted.internet.ssl.CertBase instances",
            error.args[0],
        )
2732
2733
class OpenSSLCipherTests(TestCase):
    """
    Tests for L{twisted.internet._sslverify.OpenSSLCipher}.
    """

    if skipSSL:
        skip = skipSSL

    # Name handed to every OpenSSLCipher built by these tests.
    cipherName = "CIPHER-STRING"

    def test_constructorSetsFullName(self):
        """
        The constructor's first argument is exposed as C{fullName}.
        """
        cipher = sslverify.OpenSSLCipher(self.cipherName)
        self.assertEqual(self.cipherName, cipher.fullName)

    def test_repr(self):
        """
        Evaluating C{repr(cipher)} rebuilds an equal cipher, so the repr is a
        valid constructor call.
        """
        cipher = sslverify.OpenSSLCipher(self.cipherName)
        # Only the class name is made available to the evaluated expression.
        namespace = {"OpenSSLCipher": sslverify.OpenSSLCipher}
        rebuilt = eval(repr(cipher), namespace)
        self.assertEqual(cipher, rebuilt)

    def test_eqSameClass(self):
        """
        Two ciphers of the same type and with the same C{fullName} compare
        equal.
        """
        first, second = (
            sslverify.OpenSSLCipher(self.cipherName),
            sslverify.OpenSSLCipher(self.cipherName),
        )
        self.assertEqual(first, second)

    def test_eqSameNameDifferentType(self):
        """
        Sharing a name is not enough: objects of different types compare
        unequal.
        """

        class DifferentCipher:
            fullName = self.cipherName

        cipher = sslverify.OpenSSLCipher(self.cipherName)
        self.assertNotEqual(cipher, DifferentCipher())
2782
2783
class ExpandCipherStringTests(TestCase):
    """
    Tests for L{twisted.internet._sslverify._expandCipherString}.
    """

    if skipSSL:
        skip = skipSSL

    def test_doesNotStumbleOverEmptyList(self):
        """
        An empty cipher string expands to an empty L{tuple}.
        """
        expanded = sslverify._expandCipherString("", SSL.SSLv23_METHOD, 0)
        self.assertEqual(tuple(), expanded)

    def test_doesNotSwallowOtherSSLErrors(self):
        """
        Only no cipher matches get swallowed; any other SSL error is
        propagated to the caller.
        """

        def failWithSSLError(_):
            # Unfortunately, there seems to be no way to trigger a real SSL
            # error artificially.
            raise SSL.Error([["", "", ""]])

        fakeContext = FakeContext(SSL.SSLv23_METHOD)
        fakeContext.set_cipher_list = failWithSSLError
        # Make sslverify build the rigged context instead of a real one.
        self.patch(sslverify.SSL, "Context", lambda _: fakeContext)
        arguments = ("ALL", SSL.SSLv23_METHOD, 0)
        self.assertRaises(SSL.Error, sslverify._expandCipherString, *arguments)

    def test_returnsTupleOfICiphers(self):
        """
        L{sslverify._expandCipherString} returns a L{tuple} whose items all
        provide L{interfaces.ICipher}.
        """
        expanded = sslverify._expandCipherString("ALL", SSL.SSLv23_METHOD, 0)
        self.assertIsInstance(expanded, tuple)
        nonCiphers = [
            item for item in expanded if not interfaces.ICipher.providedBy(item)
        ]

        self.assertEqual([], nonCiphers)
2831
2832
class AcceptableCiphersTests(TestCase):
    """
    Tests for L{twisted.internet._sslverify.OpenSSLAcceptableCiphers}.
    """

    if skipSSL:
        skip = skipSSL

    def test_selectOnEmptyListReturnsEmptyList(self):
        """
        With no acceptable ciphers and no available ciphers, the selection is
        empty.
        """
        nothing = tuple()
        acceptable = sslverify.OpenSSLAcceptableCiphers(nothing)
        self.assertEqual(tuple(), acceptable.selectCiphers(tuple()))

    def test_selectReturnsOnlyFromAvailable(self):
        """
        The selection is the cross section of the acceptable ciphers and the
        available ciphers.
        """
        cipherA = sslverify.OpenSSLCipher("A")
        cipherB = sslverify.OpenSSLCipher("B")
        acceptable = sslverify.OpenSSLAcceptableCiphers([cipherA, cipherB])

        available = [sslverify.OpenSSLCipher("B"), sslverify.OpenSSLCipher("C")]
        self.assertEqual(
            (sslverify.OpenSSLCipher("B"),),
            acceptable.selectCiphers(available),
        )

    def test_fromOpenSSLCipherStringExpandsToTupleOfCiphers(self):
        """
        L{sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString} stores
        the expanded cipher string as a tuple of ciphers.
        """
        acceptable = sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString("ALL")
        stored = acceptable._ciphers
        self.assertIsInstance(stored, tuple)
        everyItemIsCipher = all(
            sslverify.ICipher.providedBy(cipher) for cipher in stored
        )
        self.assertTrue(everyItemIsCipher)
2874
2875
class DiffieHellmanParametersTests(TestCase):
    """
    Tests for L{twisted.internet._sslverify.OpenSSLDiffieHellmanParameters}.
    """

    if skipSSL:
        skip = skipSSL

    # Path handed to fromFile by the tests.
    filePath = FilePath(b"dh.params")

    def test_fromFile(self):
        """
        C{fromFile} returns an instance that remembers the path it was given
        in its C{_dhFile} attribute.
        """
        factory = sslverify.OpenSSLDiffieHellmanParameters.fromFile
        parameters = factory(self.filePath)
        self.assertEqual(self.filePath, parameters._dhFile)
2892
2893
class FakeLibState:
    """
    The mutable state behind a L{FakeLib}.

    @param setECDHAutoRaises: The exception that
        L{FakeLib.SSL_CTX_set_ecdh_auto} should raise, or L{None} if it
        should not raise at all.

    @ivar ecdhContexts: The SSL contexts that
        L{FakeLib.SSL_CTX_set_ecdh_auto} has been called with, in call order.
    @type ecdhContexts: L{list} of L{OpenSSL.SSL.Context}s

    @ivar ecdhValues: The boolean values that
        L{FakeLib.SSL_CTX_set_ecdh_auto} has been called with, in call order.
    @type ecdhValues: L{list} of L{bool}s
    """

    __slots__ = ("setECDHAutoRaises", "ecdhContexts", "ecdhValues")

    def __init__(self, setECDHAutoRaises):
        # Both call logs start out empty and are distinct lists.
        self.ecdhContexts, self.ecdhValues = [], []
        self.setECDHAutoRaises = setECDHAutoRaises
2917
2918
class FakeLib:
    """
    An introspectable stand-in for cryptography's lib object.

    @param state: The L{FakeLibState} instance in which this fake keeps its
        state.
    """

    def __init__(self, state):
        self._state = state

    def SSL_CTX_set_ecdh_auto(self, ctx, value):
        """
        Log the context and the value in the C{_state} instance variable,
        then raise the configured exception if there is one.

        @see: L{FakeLibState}

        @param ctx: An SSL context.
        @type ctx: L{OpenSSL.SSL.Context}

        @param value: A boolean value
        @type value: L{bool}
        """
        state = self._state
        state.ecdhContexts.append(ctx)
        state.ecdhValues.append(value)
        # The arguments are logged before raising, so a failing call is
        # still visible in the state.
        toRaise = state.setECDHAutoRaises
        if toRaise is None:
            return
        raise toRaise
2947
2948
class FakeLibTests(TestCase):
    """
    Tests for the L{FakeLib} test double.
    """

    def test_SSL_CTX_set_ecdh_auto(self):
        """
        L{FakeLib.SSL_CTX_set_ecdh_auto} logs the context and the value it
        is called with.
        """
        libState = FakeLibState(setECDHAutoRaises=None)
        fake = FakeLib(libState)
        # Nothing has been logged before the first call.
        self.assertFalse(libState.ecdhContexts)
        self.assertFalse(libState.ecdhValues)

        context = "CONTEXT"
        value = True
        fake.SSL_CTX_set_ecdh_auto(context, value)
        self.assertEqual(libState.ecdhContexts, [context])
        self.assertEqual(libState.ecdhValues, [True])

    def test_SSL_CTX_set_ecdh_autoRaises(self):
        """
        L{FakeLib.SSL_CTX_set_ecdh_auto} raises the exception held by its
        state and still logs the arguments of the failing call.
        """
        libState = FakeLibState(setECDHAutoRaises=ValueError)
        fake = FakeLib(libState)
        # Nothing has been logged before the first call.
        self.assertFalse(libState.ecdhContexts)
        self.assertFalse(libState.ecdhValues)

        context = "CONTEXT"
        value = True
        self.assertRaises(ValueError, fake.SSL_CTX_set_ecdh_auto, context, value)
        self.assertEqual(libState.ecdhContexts, [context])
        self.assertEqual(libState.ecdhValues, [True])
2983
2984
class FakeCryptoState:
    """
    The mutable state behind a L{FakeCrypto}.

    @param getEllipticCurveRaises: The exception that
        L{FakeCrypto.get_elliptic_curve} should raise, or L{None} if it
        should not raise at all.

    @param getEllipticCurveReturns: The value that
        L{FakeCrypto.get_elliptic_curve} should return.

    @ivar getEllipticCurveCalls: The arguments that
        L{FakeCrypto.get_elliptic_curve} has been called with, in call order.
    @type getEllipticCurveCalls: L{list}
    """

    __slots__ = (
        "getEllipticCurveRaises",
        "getEllipticCurveReturns",
        "getEllipticCurveCalls",
    )

    def __init__(self, getEllipticCurveRaises, getEllipticCurveReturns):
        # The call log starts out empty.
        self.getEllipticCurveCalls = []
        self.getEllipticCurveReturns = getEllipticCurveReturns
        self.getEllipticCurveRaises = getEllipticCurveRaises
3015
3016
class FakeCrypto:
    """
    An introspectable stand-in for pyOpenSSL's L{OpenSSL.crypto} module.

    @ivar _state: A L{FakeCryptoState} instance
    """

    def __init__(self, state):
        self._state = state

    def get_elliptic_curve(self, curve):
        """
        Log the requested curve, then either raise or return as configured
        by the state object.

        @param curve: see L{crypto.get_elliptic_curve}

        @return: see L{FakeCryptoState.getEllipticCurveReturns}
        @raises: see L{FakeCryptoState.getEllipticCurveRaises}
        """
        state = self._state
        # The call is logged first, so a raising call is still recorded.
        state.getEllipticCurveCalls.append(curve)
        toRaise = state.getEllipticCurveRaises
        if toRaise is None:
            return state.getEllipticCurveReturns
        raise toRaise
3040
3041
class FakeCryptoTests(SynchronousTestCase):
    """
    Tests for the L{FakeCrypto} test double.
    """

    def test_get_elliptic_curveRecordsArgument(self):
        """
        L{FakeCrypto.get_elliptic_curve} logs the curve it is called with.
        """
        cryptoState = FakeCryptoState(
            getEllipticCurveRaises=None, getEllipticCurveReturns=None
        )
        fake = FakeCrypto(cryptoState)
        fake.get_elliptic_curve("a curve name")
        self.assertEqual(cryptoState.getEllipticCurveCalls, ["a curve name"])

    def test_get_elliptic_curveReturns(self):
        """
        L{FakeCrypto.get_elliptic_curve} hands back the value held by its
        state object and logs the curve it is called with.
        """
        returnValue = "object"
        cryptoState = FakeCryptoState(
            getEllipticCurveRaises=None, getEllipticCurveReturns=returnValue
        )
        fake = FakeCrypto(cryptoState)
        result = fake.get_elliptic_curve("another curve name")
        self.assertIs(result, returnValue)
        self.assertEqual(cryptoState.getEllipticCurveCalls, ["another curve name"])

    def test_get_elliptic_curveRaises(self):
        """
        L{FakeCrypto.get_elliptic_curve} raises the exception held by its
        state object and still logs the curve it is called with.
        """
        cryptoState = FakeCryptoState(
            getEllipticCurveRaises=ValueError, getEllipticCurveReturns=None
        )
        fake = FakeCrypto(cryptoState)
        curveName = "yet another curve name"
        self.assertRaises(ValueError, fake.get_elliptic_curve, curveName)
        self.assertEqual(
            cryptoState.getEllipticCurveCalls, ["yet another curve name"]
        )
3096
3097
class ChooseDiffieHellmanEllipticCurveTests(SynchronousTestCase):
    """
    Tests for L{sslverify._ChooseDiffieHellmanEllipticCurve}.

    @cvar OPENSSL_110: A version number for OpenSSL 1.1.0

    @cvar OPENSSL_102: A version number for OpenSSL 1.0.2

    @cvar OPENSSL_101: A version number for OpenSSL 1.0.1

    @see:
        U{https://wiki.openssl.org/index.php/Manual:OPENSSL_VERSION_NUMBER(3)}
    """

    if skipSSL:
        skip = skipSSL

    OPENSSL_110 = 0x1010007F
    OPENSSL_102 = 0x100020EF
    OPENSSL_101 = 0x1000114F

    def setUp(self):
        """
        Create the fake lib, fake crypto module and fake context shared by
        the tests.
        """
        # L{None}, not C{False}, is what tells L{FakeLib} not to raise: it
        # checks C{is not None}, so C{False} would make every call execute
        # C{raise False}, which is itself a L{TypeError}.
        self.libState = FakeLibState(setECDHAutoRaises=None)
        self.lib = FakeLib(self.libState)

        self.cryptoState = FakeCryptoState(
            getEllipticCurveReturns=None, getEllipticCurveRaises=None
        )
        self.crypto = FakeCrypto(self.cryptoState)
        self.context = FakeContext(SSL.SSLv23_METHOD)

    def _makeChooser(self, versionNumber):
        """
        Build a chooser wired to this test's fake lib and fake crypto module.

        @param versionNumber: The OpenSSL version number the chooser should
            act upon; one of the C{OPENSSL_*} class variables.

        @return: A L{sslverify._ChooseDiffieHellmanEllipticCurve}.
        """
        return sslverify._ChooseDiffieHellmanEllipticCurve(
            versionNumber,
            openSSLlib=self.lib,
            openSSLcrypto=self.crypto,
        )

    def test_openSSL110(self):
        """
        No configuration of contexts occurs under OpenSSL 1.1.0 and
        later, because they create contexts with secure ECDH curves.

        @see: U{http://twistedmatrix.com/trac/ticket/9210}
        """
        chooser = self._makeChooser(self.OPENSSL_110)
        chooser.configureECDHCurve(self.context)

        self.assertFalse(self.libState.ecdhContexts)
        self.assertFalse(self.libState.ecdhValues)
        self.assertFalse(self.cryptoState.getEllipticCurveCalls)
        self.assertIsNone(self.context._ecCurve)

    def test_openSSL102(self):
        """
        OpenSSL 1.0.2 does not set ECDH curves by default, but
        C{SSL_CTX_set_ecdh_auto} requests that a context choose a
        secure set of curves automatically.
        """
        context = SSL.Context(SSL.SSLv23_METHOD)
        chooser = self._makeChooser(self.OPENSSL_102)
        chooser.configureECDHCurve(context)

        self.assertEqual(self.libState.ecdhContexts, [context._context])
        self.assertEqual(self.libState.ecdhValues, [True])
        self.assertFalse(self.cryptoState.getEllipticCurveCalls)
        self.assertIsNone(self.context._ecCurve)

    def test_openSSL102SetECDHAutoRaises(self):
        """
        An exception raised by C{SSL_CTX_set_ecdh_auto} under OpenSSL
        1.0.2 is suppressed because ECDH is best-effort.
        """
        self.libState.setECDHAutoRaises = BaseException
        context = SSL.Context(SSL.SSLv23_METHOD)
        chooser = self._makeChooser(self.OPENSSL_102)
        chooser.configureECDHCurve(context)

        self.assertEqual(self.libState.ecdhContexts, [context._context])
        self.assertEqual(self.libState.ecdhValues, [True])
        self.assertFalse(self.cryptoState.getEllipticCurveCalls)

    def test_openSSL101(self):
        """
        OpenSSL 1.0.1 does not set ECDH curves by default, nor does
        it expose L{SSL_CTX_set_ecdh_auto}.  Instead, a single ECDH
        curve can be set with L{OpenSSL.SSL.Context.set_tmp_ecdh}.
        """
        self.cryptoState.getEllipticCurveReturns = curve = "curve object"
        chooser = self._makeChooser(self.OPENSSL_101)
        chooser.configureECDHCurve(self.context)

        self.assertFalse(self.libState.ecdhContexts)
        self.assertFalse(self.libState.ecdhValues)
        self.assertEqual(
            self.cryptoState.getEllipticCurveCalls,
            [sslverify._defaultCurveName],
        )
        self.assertIs(self.context._ecCurve, curve)

    def test_openSSL101SetECDHRaises(self):
        """
        An exception raised by L{OpenSSL.SSL.Context.set_tmp_ecdh}
        under OpenSSL 1.0.1 is suppressed because ECDHE is best-effort.
        """

        def set_tmp_ecdh(ctx):
            raise BaseException

        # Shadow the fake context's method with one that always fails.
        self.context.set_tmp_ecdh = set_tmp_ecdh

        chooser = self._makeChooser(self.OPENSSL_101)
        chooser.configureECDHCurve(self.context)

        self.assertFalse(self.libState.ecdhContexts)
        self.assertFalse(self.libState.ecdhValues)
        self.assertEqual(
            self.cryptoState.getEllipticCurveCalls,
            [sslverify._defaultCurveName],
        )

    def test_openSSL101NoECC(self):
        """
        Contexts created under an OpenSSL 1.0.1 that doesn't support
        ECC have no configuration applied.
        """
        self.cryptoState.getEllipticCurveRaises = ValueError
        chooser = self._makeChooser(self.OPENSSL_101)
        chooser.configureECDHCurve(self.context)

        self.assertFalse(self.libState.ecdhContexts)
        self.assertFalse(self.libState.ecdhValues)
        self.assertIsNone(self.context._ecCurve)
3248
3249
class KeyPairTests(TestCase):
    """
    Tests for L{sslverify.KeyPair}.
    """

    if skipSSL:
        skip = skipSSL

    def setUp(self):
        """
        Generate a certificate and keep only its key as C{self.sKey}.
        """
        key, _ = makeCertificate(O=b"Server Test Certificate", CN=b"server")
        self.sKey = key

    def test_getstateDeprecation(self):
        """
        L{sslverify.KeyPair.__getstate__} is deprecated.
        """
        deprecation = (Version("Twisted", 15, 0, 0), "a real persistence system")
        keyPair = sslverify.KeyPair(self.sKey)
        self.callDeprecated(deprecation, keyPair.__getstate__)

    def test_setstateDeprecation(self):
        """
        L{sslverify.KeyPair.__setstate__} is deprecated.
        """
        # The state comes from one key pair and is applied to a second one
        # built from the same key.
        state = sslverify.KeyPair(self.sKey).dump()
        deprecation = (Version("Twisted", 15, 0, 0), "a real persistence system")
        target = sslverify.KeyPair(self.sKey)
        self.callDeprecated(deprecation, target.__setstate__, state)

    def test_noTrailingNewlinePemCert(self):
        """
        L{ssl.Certificate.loadPEM} does not raise when given the content of
        the C{cert.pem.no_trailing_newline} fixture file.
        """
        testModulePath = getModule("twisted.test").filePath
        fixture = testModulePath.sibling("cert.pem.no_trailing_newline")

        ssl.Certificate.loadPEM(fixture.getContent())
3291
3292
class SelectVerifyImplementationTests(SynchronousTestCase):
    """
    Tests for L{sslverify._selectVerifyImplementation}.
    """

    if skipSSL:
        skip = skipSSL

    def test_dependencyMissing(self):
        """
        When I{service_identity} cannot be imported,
        L{_selectVerifyImplementation} returns L{simpleVerifyHostname},
        L{simpleVerifyIPAddress} and L{SimpleVerificationError}.
        """
        fallback = (
            sslverify.simpleVerifyHostname,
            sslverify.simpleVerifyIPAddress,
            sslverify.SimpleVerificationError,
        )
        with SetAsideModule("service_identity"):
            # A None entry in sys.modules makes the import fail.
            sys.modules["service_identity"] = None

            selected = sslverify._selectVerifyImplementation()
            self.assertEqual(fallback, selected)

    test_dependencyMissing.suppress = [  # type: ignore[attr-defined]
        util.suppress(
            message=(
                "You do not have a working installation of the "
                "service_identity module"
            ),
        ),
    ]

    def test_dependencyMissingWarning(self):
        """
        When I{service_identity} cannot be imported,
        L{_selectVerifyImplementation} emits a L{UserWarning} telling the user
        the exact error.
        """
        with SetAsideModule("service_identity"):
            # A None entry in sys.modules makes the import fail.
            sys.modules["service_identity"] = None

            sslverify._selectVerifyImplementation()

        # Exactly one UserWarning is expected; the unpacking fails otherwise.
        [warning] = [
            emitted
            for emitted in self.flushWarnings()
            if emitted["category"] == UserWarning
        ]

        importErrors = [
            # Python 3.6.3
            "'import of service_identity halted; None in sys.modules'",
            # Python 3
            "'import of 'service_identity' halted; None in sys.modules'",
            # Python 2
            "'No module named service_identity'",
        ]

        template = (
            "You do not have a working installation of the "
            "service_identity module: {message}.  Please install it from "
            "<https://pypi.python.org/pypi/service_identity> "
            "and make sure all of its dependencies are satisfied.  "
            "Without the service_identity module, Twisted can perform only"
            " rudimentary TLS client hostname verification.  Many valid "
            "certificate/hostname mappings may be rejected."
        )
        expectedMessages = [
            template.format(message=importError) for importError in importErrors
        ]

        self.assertIn(warning["message"], expectedMessages)

        # Make sure we're abusing the warning system to a sufficient
        # degree: there is no filename or line number that makes sense for
        # this warning to "blame" for the problem.  It is a system
        # misconfiguration.  So the location information should be blank
        # (or as blank as we can make it).
        self.assertEqual(warning["filename"], "")
        self.assertEqual(warning["lineno"], 0)
3376