1# -*- coding: utf-8 -*-
2# Test the support for SSL and sockets
3
4import sys
5import unittest
6from test import test_support as support
7from test.script_helper import assert_python_ok
8import asyncore
9import socket
10import select
11import time
12import datetime
13import gc
14import os
15import errno
16import pprint
17import tempfile
18import urllib2
19import traceback
20import weakref
21import platform
22import functools
23from contextlib import closing
24
25ssl = support.import_module("ssl")
26
27PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
28HOST = support.HOST
29IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
30IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
31
32
33def data_file(*name):
34    return os.path.join(os.path.dirname(__file__), *name)
35
36# The custom key and certificate files used in test_ssl are generated
37# using Lib/test/make_ssl_certs.py.
38# Other certificates are simply fetched from the Internet servers they
39# are meant to authenticate.
40
41CERTFILE = data_file("keycert.pem")
42BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
43ONLYCERT = data_file("ssl_cert.pem")
44ONLYKEY = data_file("ssl_key.pem")
45BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
46BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
47CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
48ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
49KEY_PASSWORD = "somepass"
50CAPATH = data_file("capath")
51BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
52CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
53CAFILE_CACERT = data_file("capath", "5ed36f99.0")
54
55
56# empty CRL
57CRLFILE = data_file("revocation.crl")
58
59# Two keys and certs signed by the same CA (for SNI tests)
60SIGNED_CERTFILE = data_file("keycert3.pem")
61SIGNED_CERTFILE2 = data_file("keycert4.pem")
62SIGNING_CA = data_file("pycacert.pem")
63# cert with all kinds of subject alt names
64ALLSANFILE = data_file("allsans.pem")
65
66REMOTE_HOST = "self-signed.pythontest.net"
67REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
68
69EMPTYCERT = data_file("nullcert.pem")
70BADCERT = data_file("badcert.pem")
71NONEXISTINGCERT = data_file("XXXnonexisting.pem")
72BADKEY = data_file("badkey.pem")
73NOKIACERT = data_file("nokia.pem")
74NULLBYTECERT = data_file("nullbytecert.pem")
75
76DHFILE = data_file("dh1024.pem")
77BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
78
79
80def handle_error(prefix):
81    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
82    if support.verbose:
83        sys.stdout.write(prefix + exc_format)
84
85
86class BasicTests(unittest.TestCase):
87
88    def test_sslwrap_simple(self):
89        # A crude test for the legacy API
90        try:
91            ssl.sslwrap_simple(socket.socket(socket.AF_INET))
92        except IOError, e:
93            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94                pass
95            else:
96                raise
97        try:
98            ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
99        except IOError, e:
100            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
101                pass
102            else:
103                raise
104
105
106def can_clear_options():
107    # 0.9.8m or higher
108    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
109
110def no_sslv2_implies_sslv3_hello():
111    # 0.9.7h or higher
112    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
113
114def have_verify_flags():
115    # 0.9.8 or higher
116    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
117
118def utc_offset(): #NOTE: ignore issues like #1647654
119    # local time = utc time + utc offset
120    if time.daylight and time.localtime().tm_isdst > 0:
121        return -time.altzone  # seconds
122    return -time.timezone
123
124def asn1time(cert_time):
125    # Some versions of OpenSSL ignore seconds, see #18207
126    # 0.9.8.i
127    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
128        fmt = "%b %d %H:%M:%S %Y GMT"
129        dt = datetime.datetime.strptime(cert_time, fmt)
130        dt = dt.replace(second=0)
131        cert_time = dt.strftime(fmt)
132        # %d adds leading zero but ASN1_TIME_print() uses leading space
133        if cert_time[4] == "0":
134            cert_time = cert_time[:4] + " " + cert_time[5:]
135
136    return cert_time
137
138# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
139def skip_if_broken_ubuntu_ssl(func):
140    if hasattr(ssl, 'PROTOCOL_SSLv2'):
141        @functools.wraps(func)
142        def f(*args, **kwargs):
143            try:
144                ssl.SSLContext(ssl.PROTOCOL_SSLv2)
145            except ssl.SSLError:
146                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
147                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
148                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
149            return func(*args, **kwargs)
150        return f
151    else:
152        return func
153
154needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
155
156
157class BasicSocketTests(unittest.TestCase):
158
159    def test_constants(self):
160        ssl.CERT_NONE
161        ssl.CERT_OPTIONAL
162        ssl.CERT_REQUIRED
163        ssl.OP_CIPHER_SERVER_PREFERENCE
164        ssl.OP_SINGLE_DH_USE
165        if ssl.HAS_ECDH:
166            ssl.OP_SINGLE_ECDH_USE
167        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
168            ssl.OP_NO_COMPRESSION
169        self.assertIn(ssl.HAS_SNI, {True, False})
170        self.assertIn(ssl.HAS_ECDH, {True, False})
171
172    def test_random(self):
173        v = ssl.RAND_status()
174        if support.verbose:
175            sys.stdout.write("\n RAND_status is %d (%s)\n"
176                             % (v, (v and "sufficient randomness") or
177                                "insufficient randomness"))
178        if hasattr(ssl, 'RAND_egd'):
179            self.assertRaises(TypeError, ssl.RAND_egd, 1)
180            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
181        ssl.RAND_add("this is a random string", 75.0)
182
183    def test_parse_cert(self):
184        # note that this uses an 'unofficial' function in _ssl.c,
185        # provided solely for this test, to exercise the certificate
186        # parsing code
187        p = ssl._ssl._test_decode_cert(CERTFILE)
188        if support.verbose:
189            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
190        self.assertEqual(p['issuer'],
191                         ((('countryName', 'XY'),),
192                          (('localityName', 'Castle Anthrax'),),
193                          (('organizationName', 'Python Software Foundation'),),
194                          (('commonName', 'localhost'),))
195                        )
196        # Note the next three asserts will fail if the keys are regenerated
197        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
198        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
199        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
200        self.assertEqual(p['subject'],
201                         ((('countryName', 'XY'),),
202                          (('localityName', 'Castle Anthrax'),),
203                          (('organizationName', 'Python Software Foundation'),),
204                          (('commonName', 'localhost'),))
205                        )
206        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
207        # Issue #13034: the subjectAltName in some certificates
208        # (notably projects.developer.nokia.com:443) wasn't parsed
209        p = ssl._ssl._test_decode_cert(NOKIACERT)
210        if support.verbose:
211            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
212        self.assertEqual(p['subjectAltName'],
213                         (('DNS', 'projects.developer.nokia.com'),
214                          ('DNS', 'projects.forum.nokia.com'))
215                        )
216        # extra OCSP and AIA fields
217        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
218        self.assertEqual(p['caIssuers'],
219                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
220        self.assertEqual(p['crlDistributionPoints'],
221                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
222
223    def test_parse_cert_CVE_2013_4238(self):
224        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
225        if support.verbose:
226            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
227        subject = ((('countryName', 'US'),),
228                   (('stateOrProvinceName', 'Oregon'),),
229                   (('localityName', 'Beaverton'),),
230                   (('organizationName', 'Python Software Foundation'),),
231                   (('organizationalUnitName', 'Python Core Development'),),
232                   (('commonName', 'null.python.org\x00example.org'),),
233                   (('emailAddress', 'python-dev@python.org'),))
234        self.assertEqual(p['subject'], subject)
235        self.assertEqual(p['issuer'], subject)
236        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
237            san = (('DNS', 'altnull.python.org\x00example.com'),
238                   ('email', 'null@python.org\x00user@example.org'),
239                   ('URI', 'http://null.python.org\x00http://example.org'),
240                   ('IP Address', '192.0.2.1'),
241                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
242        else:
243            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
244            san = (('DNS', 'altnull.python.org\x00example.com'),
245                   ('email', 'null@python.org\x00user@example.org'),
246                   ('URI', 'http://null.python.org\x00http://example.org'),
247                   ('IP Address', '192.0.2.1'),
248                   ('IP Address', '<invalid>'))
249
250        self.assertEqual(p['subjectAltName'], san)
251
252    def test_parse_all_sans(self):
253        p = ssl._ssl._test_decode_cert(ALLSANFILE)
254        self.assertEqual(p['subjectAltName'],
255            (
256                ('DNS', 'allsans'),
257                ('othername', '<unsupported>'),
258                ('othername', '<unsupported>'),
259                ('email', 'user@example.org'),
260                ('DNS', 'www.example.org'),
261                ('DirName',
262                    ((('countryName', 'XY'),),
263                    (('localityName', 'Castle Anthrax'),),
264                    (('organizationName', 'Python Software Foundation'),),
265                    (('commonName', 'dirname example'),))),
266                ('URI', 'https://www.python.org/'),
267                ('IP Address', '127.0.0.1'),
268                ('IP Address', '0:0:0:0:0:0:0:1\n'),
269                ('Registered ID', '1.2.3.4.5')
270            )
271        )
272
273    def test_DER_to_PEM(self):
274        with open(CAFILE_CACERT, 'r') as f:
275            pem = f.read()
276        d1 = ssl.PEM_cert_to_DER_cert(pem)
277        p2 = ssl.DER_cert_to_PEM_cert(d1)
278        d2 = ssl.PEM_cert_to_DER_cert(p2)
279        self.assertEqual(d1, d2)
280        if not p2.startswith(ssl.PEM_HEADER + '\n'):
281            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
282        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
283            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
284
285    def test_openssl_version(self):
286        n = ssl.OPENSSL_VERSION_NUMBER
287        t = ssl.OPENSSL_VERSION_INFO
288        s = ssl.OPENSSL_VERSION
289        self.assertIsInstance(n, (int, long))
290        self.assertIsInstance(t, tuple)
291        self.assertIsInstance(s, str)
292        # Some sanity checks follow
293        # >= 0.9
294        self.assertGreaterEqual(n, 0x900000)
295        # < 3.0
296        self.assertLess(n, 0x30000000)
297        major, minor, fix, patch, status = t
298        self.assertGreaterEqual(major, 0)
299        self.assertLess(major, 3)
300        self.assertGreaterEqual(minor, 0)
301        self.assertLess(minor, 256)
302        self.assertGreaterEqual(fix, 0)
303        self.assertLess(fix, 256)
304        self.assertGreaterEqual(patch, 0)
305        self.assertLessEqual(patch, 63)
306        self.assertGreaterEqual(status, 0)
307        self.assertLessEqual(status, 15)
308        # Version string as returned by {Open,Libre}SSL, the format might change
309        if IS_LIBRESSL:
310            self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
311                            (s, t, hex(n)))
312        else:
313            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
314                            (s, t))
315
316    @support.cpython_only
317    def test_refcycle(self):
318        # Issue #7943: an SSL object doesn't create reference cycles with
319        # itself.
320        s = socket.socket(socket.AF_INET)
321        ss = ssl.wrap_socket(s)
322        wr = weakref.ref(ss)
323        del ss
324        self.assertEqual(wr(), None)
325
326    def test_wrapped_unconnected(self):
327        # Methods on an unconnected SSLSocket propagate the original
328        # socket.error raise by the underlying socket object.
329        s = socket.socket(socket.AF_INET)
330        with closing(ssl.wrap_socket(s)) as ss:
331            self.assertRaises(socket.error, ss.recv, 1)
332            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
333            self.assertRaises(socket.error, ss.recvfrom, 1)
334            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
335            self.assertRaises(socket.error, ss.send, b'x')
336            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
337
338    def test_timeout(self):
339        # Issue #8524: when creating an SSL socket, the timeout of the
340        # original socket should be retained.
341        for timeout in (None, 0.0, 5.0):
342            s = socket.socket(socket.AF_INET)
343            s.settimeout(timeout)
344            with closing(ssl.wrap_socket(s)) as ss:
345                self.assertEqual(timeout, ss.gettimeout())
346
347    def test_errors(self):
348        sock = socket.socket()
349        self.assertRaisesRegexp(ValueError,
350                        "certfile must be specified",
351                        ssl.wrap_socket, sock, keyfile=CERTFILE)
352        self.assertRaisesRegexp(ValueError,
353                        "certfile must be specified for server-side operations",
354                        ssl.wrap_socket, sock, server_side=True)
355        self.assertRaisesRegexp(ValueError,
356                        "certfile must be specified for server-side operations",
357                        ssl.wrap_socket, sock, server_side=True, certfile="")
358        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
359            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
360                                    s.connect, (HOST, 8080))
361        with self.assertRaises(IOError) as cm:
362            with closing(socket.socket()) as sock:
363                ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
364        self.assertEqual(cm.exception.errno, errno.ENOENT)
365        with self.assertRaises(IOError) as cm:
366            with closing(socket.socket()) as sock:
367                ssl.wrap_socket(sock,
368                    certfile=CERTFILE, keyfile=NONEXISTINGCERT)
369        self.assertEqual(cm.exception.errno, errno.ENOENT)
370        with self.assertRaises(IOError) as cm:
371            with closing(socket.socket()) as sock:
372                ssl.wrap_socket(sock,
373                    certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
374        self.assertEqual(cm.exception.errno, errno.ENOENT)
375
376    def bad_cert_test(self, certfile):
377        """Check that trying to use the given client certificate fails"""
378        certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
379                                   certfile)
380        sock = socket.socket()
381        self.addCleanup(sock.close)
382        with self.assertRaises(ssl.SSLError):
383            ssl.wrap_socket(sock,
384                            certfile=certfile,
385                            ssl_version=ssl.PROTOCOL_TLSv1)
386
387    def test_empty_cert(self):
388        """Wrapping with an empty cert file"""
389        self.bad_cert_test("nullcert.pem")
390
391    def test_malformed_cert(self):
392        """Wrapping with a badly formatted certificate (syntax error)"""
393        self.bad_cert_test("badcert.pem")
394
395    def test_malformed_key(self):
396        """Wrapping with a badly formatted key (syntax error)"""
397        self.bad_cert_test("badkey.pem")
398
399    def test_match_hostname(self):
400        def ok(cert, hostname):
401            ssl.match_hostname(cert, hostname)
402        def fail(cert, hostname):
403            self.assertRaises(ssl.CertificateError,
404                              ssl.match_hostname, cert, hostname)
405
406        cert = {'subject': ((('commonName', 'example.com'),),)}
407        ok(cert, 'example.com')
408        ok(cert, 'ExAmple.cOm')
409        fail(cert, 'www.example.com')
410        fail(cert, '.example.com')
411        fail(cert, 'example.org')
412        fail(cert, 'exampleXcom')
413
414        cert = {'subject': ((('commonName', '*.a.com'),),)}
415        ok(cert, 'foo.a.com')
416        fail(cert, 'bar.foo.a.com')
417        fail(cert, 'a.com')
418        fail(cert, 'Xa.com')
419        fail(cert, '.a.com')
420
421        # only match one left-most wildcard
422        cert = {'subject': ((('commonName', 'f*.com'),),)}
423        ok(cert, 'foo.com')
424        ok(cert, 'f.com')
425        fail(cert, 'bar.com')
426        fail(cert, 'foo.a.com')
427        fail(cert, 'bar.foo.com')
428
429        # NULL bytes are bad, CVE-2013-4073
430        cert = {'subject': ((('commonName',
431                              'null.python.org\x00example.org'),),)}
432        ok(cert, 'null.python.org\x00example.org') # or raise an error?
433        fail(cert, 'example.org')
434        fail(cert, 'null.python.org')
435
436        # error cases with wildcards
437        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
438        fail(cert, 'bar.foo.a.com')
439        fail(cert, 'a.com')
440        fail(cert, 'Xa.com')
441        fail(cert, '.a.com')
442
443        cert = {'subject': ((('commonName', 'a.*.com'),),)}
444        fail(cert, 'a.foo.com')
445        fail(cert, 'a..com')
446        fail(cert, 'a.com')
447
448        # wildcard doesn't match IDNA prefix 'xn--'
449        idna = u'püthon.python.org'.encode("idna").decode("ascii")
450        cert = {'subject': ((('commonName', idna),),)}
451        ok(cert, idna)
452        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
453        fail(cert, idna)
454        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
455        fail(cert, idna)
456
457        # wildcard in first fragment and  IDNA A-labels in sequent fragments
458        # are supported.
459        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
460        cert = {'subject': ((('commonName', idna),),)}
461        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
462        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
463        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
464        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
465
466        # Slightly fake real-world example
467        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
468                'subject': ((('commonName', 'linuxfrz.org'),),),
469                'subjectAltName': (('DNS', 'linuxfr.org'),
470                                   ('DNS', 'linuxfr.com'),
471                                   ('othername', '<unsupported>'))}
472        ok(cert, 'linuxfr.org')
473        ok(cert, 'linuxfr.com')
474        # Not a "DNS" entry
475        fail(cert, '<unsupported>')
476        # When there is a subjectAltName, commonName isn't used
477        fail(cert, 'linuxfrz.org')
478
479        # A pristine real-world example
480        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
481                'subject': ((('countryName', 'US'),),
482                            (('stateOrProvinceName', 'California'),),
483                            (('localityName', 'Mountain View'),),
484                            (('organizationName', 'Google Inc'),),
485                            (('commonName', 'mail.google.com'),))}
486        ok(cert, 'mail.google.com')
487        fail(cert, 'gmail.com')
488        # Only commonName is considered
489        fail(cert, 'California')
490
491        # Neither commonName nor subjectAltName
492        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
493                'subject': ((('countryName', 'US'),),
494                            (('stateOrProvinceName', 'California'),),
495                            (('localityName', 'Mountain View'),),
496                            (('organizationName', 'Google Inc'),))}
497        fail(cert, 'mail.google.com')
498
499        # No DNS entry in subjectAltName but a commonName
500        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
501                'subject': ((('countryName', 'US'),),
502                            (('stateOrProvinceName', 'California'),),
503                            (('localityName', 'Mountain View'),),
504                            (('commonName', 'mail.google.com'),)),
505                'subjectAltName': (('othername', 'blabla'), )}
506        ok(cert, 'mail.google.com')
507
508        # No DNS entry subjectAltName and no commonName
509        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
510                'subject': ((('countryName', 'US'),),
511                            (('stateOrProvinceName', 'California'),),
512                            (('localityName', 'Mountain View'),),
513                            (('organizationName', 'Google Inc'),)),
514                'subjectAltName': (('othername', 'blabla'),)}
515        fail(cert, 'google.com')
516
517        # Empty cert / no cert
518        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
519        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
520
521        # Issue #17980: avoid denials of service by refusing more than one
522        # wildcard per fragment.
523        cert = {'subject': ((('commonName', 'a*b.com'),),)}
524        ok(cert, 'axxb.com')
525        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
526        fail(cert, 'axxb.com')
527        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
528        with self.assertRaises(ssl.CertificateError) as cm:
529            ssl.match_hostname(cert, 'axxbxxc.com')
530        self.assertIn("too many wildcards", str(cm.exception))
531
532    def test_server_side(self):
533        # server_hostname doesn't work for server sockets
534        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
535        with closing(socket.socket()) as sock:
536            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
537                              server_hostname="some.hostname")
538
539    def test_unknown_channel_binding(self):
540        # should raise ValueError for unknown type
541        s = socket.socket(socket.AF_INET)
542        with closing(ssl.wrap_socket(s)) as ss:
543            with self.assertRaises(ValueError):
544                ss.get_channel_binding("unknown-type")
545
546    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
547                         "'tls-unique' channel binding not available")
548    def test_tls_unique_channel_binding(self):
549        # unconnected should return None for known type
550        s = socket.socket(socket.AF_INET)
551        with closing(ssl.wrap_socket(s)) as ss:
552            self.assertIsNone(ss.get_channel_binding("tls-unique"))
553        # the same for server-side
554        s = socket.socket(socket.AF_INET)
555        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
556            self.assertIsNone(ss.get_channel_binding("tls-unique"))
557
558    def test_get_default_verify_paths(self):
559        paths = ssl.get_default_verify_paths()
560        self.assertEqual(len(paths), 6)
561        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
562
563        with support.EnvironmentVarGuard() as env:
564            env["SSL_CERT_DIR"] = CAPATH
565            env["SSL_CERT_FILE"] = CERTFILE
566            paths = ssl.get_default_verify_paths()
567            self.assertEqual(paths.cafile, CERTFILE)
568            self.assertEqual(paths.capath, CAPATH)
569
570    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
571    def test_enum_certificates(self):
572        self.assertTrue(ssl.enum_certificates("CA"))
573        self.assertTrue(ssl.enum_certificates("ROOT"))
574
575        self.assertRaises(TypeError, ssl.enum_certificates)
576        self.assertRaises(WindowsError, ssl.enum_certificates, "")
577
578        trust_oids = set()
579        for storename in ("CA", "ROOT"):
580            store = ssl.enum_certificates(storename)
581            self.assertIsInstance(store, list)
582            for element in store:
583                self.assertIsInstance(element, tuple)
584                self.assertEqual(len(element), 3)
585                cert, enc, trust = element
586                self.assertIsInstance(cert, bytes)
587                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
588                self.assertIsInstance(trust, (set, bool))
589                if isinstance(trust, set):
590                    trust_oids.update(trust)
591
592        serverAuth = "1.3.6.1.5.5.7.3.1"
593        self.assertIn(serverAuth, trust_oids)
594
595    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
596    def test_enum_crls(self):
597        self.assertTrue(ssl.enum_crls("CA"))
598        self.assertRaises(TypeError, ssl.enum_crls)
599        self.assertRaises(WindowsError, ssl.enum_crls, "")
600
601        crls = ssl.enum_crls("CA")
602        self.assertIsInstance(crls, list)
603        for element in crls:
604            self.assertIsInstance(element, tuple)
605            self.assertEqual(len(element), 2)
606            self.assertIsInstance(element[0], bytes)
607            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
608
609
610    def test_asn1object(self):
611        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
612                    '1.3.6.1.5.5.7.3.1')
613
614        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
615        self.assertEqual(val, expected)
616        self.assertEqual(val.nid, 129)
617        self.assertEqual(val.shortname, 'serverAuth')
618        self.assertEqual(val.longname, 'TLS Web Server Authentication')
619        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
620        self.assertIsInstance(val, ssl._ASN1Object)
621        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
622
623        val = ssl._ASN1Object.fromnid(129)
624        self.assertEqual(val, expected)
625        self.assertIsInstance(val, ssl._ASN1Object)
626        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
627        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
628            ssl._ASN1Object.fromnid(100000)
629        for i in range(1000):
630            try:
631                obj = ssl._ASN1Object.fromnid(i)
632            except ValueError:
633                pass
634            else:
635                self.assertIsInstance(obj.nid, int)
636                self.assertIsInstance(obj.shortname, str)
637                self.assertIsInstance(obj.longname, str)
638                self.assertIsInstance(obj.oid, (str, type(None)))
639
640        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
641        self.assertEqual(val, expected)
642        self.assertIsInstance(val, ssl._ASN1Object)
643        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
644        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
645                         expected)
646        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
647            ssl._ASN1Object.fromname('serverauth')
648
649    def test_purpose_enum(self):
650        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
651        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
652        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
653        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
654        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
655        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
656                              '1.3.6.1.5.5.7.3.1')
657
658        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
659        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
660        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
661        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
662        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
663        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
664                              '1.3.6.1.5.5.7.3.2')
665
666    def test_unsupported_dtls(self):
667        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
668        self.addCleanup(s.close)
669        with self.assertRaises(NotImplementedError) as cx:
670            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
671        self.assertEqual(str(cx.exception), "only stream sockets are supported")
672        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
673        with self.assertRaises(NotImplementedError) as cx:
674            ctx.wrap_socket(s)
675        self.assertEqual(str(cx.exception), "only stream sockets are supported")
676
677    def cert_time_ok(self, timestring, timestamp):
678        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
679
680    def cert_time_fail(self, timestring):
681        with self.assertRaises(ValueError):
682            ssl.cert_time_to_seconds(timestring)
683
684    @unittest.skipUnless(utc_offset(),
685                         'local time needs to be different from UTC')
686    def test_cert_time_to_seconds_timezone(self):
687        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
688        #               results if local timezone is not UTC
689        self.cert_time_ok("May  9 00:00:00 2007 GMT", 1178668800.0)
690        self.cert_time_ok("Jan  5 09:34:43 2018 GMT", 1515144883.0)
691
692    def test_cert_time_to_seconds(self):
693        timestring = "Jan  5 09:34:43 2018 GMT"
694        ts = 1515144883.0
695        self.cert_time_ok(timestring, ts)
696        # accept keyword parameter, assert its name
697        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
698        # accept both %e and %d (space or zero generated by strftime)
699        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
700        # case-insensitive
701        self.cert_time_ok("JaN  5 09:34:43 2018 GmT", ts)
702        self.cert_time_fail("Jan  5 09:34 2018 GMT")     # no seconds
703        self.cert_time_fail("Jan  5 09:34:43 2018")      # no GMT
704        self.cert_time_fail("Jan  5 09:34:43 2018 UTC")  # not GMT timezone
705        self.cert_time_fail("Jan 35 09:34:43 2018 GMT")  # invalid day
706        self.cert_time_fail("Jon  5 09:34:43 2018 GMT")  # invalid month
707        self.cert_time_fail("Jan  5 24:00:00 2018 GMT")  # invalid hour
708        self.cert_time_fail("Jan  5 09:60:43 2018 GMT")  # invalid minute
709
710        newyear_ts = 1230768000.0
711        # leap seconds
712        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
713        # same timestamp
714        self.cert_time_ok("Jan  1 00:00:00 2009 GMT", newyear_ts)
715
716        self.cert_time_ok("Jan  5 09:34:59 2018 GMT", 1515144899)
717        #  allow 60th second (even if it is not a leap second)
718        self.cert_time_ok("Jan  5 09:34:60 2018 GMT", 1515144900)
719        #  allow 2nd leap second for compatibility with time.strptime()
720        self.cert_time_ok("Jan  5 09:34:61 2018 GMT", 1515144901)
721        self.cert_time_fail("Jan  5 09:34:62 2018 GMT")  # invalid seconds
722
723        # no special treatement for the special value:
724        #   99991231235959Z (rfc 5280)
725        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
726
727    @support.run_with_locale('LC_ALL', '')
728    def test_cert_time_to_seconds_locale(self):
729        # `cert_time_to_seconds()` should be locale independent
730
731        def local_february_name():
732            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
733
734        if local_february_name().lower() == 'feb':
735            self.skipTest("locale-specific month name needs to be "
736                          "different from C locale")
737
738        # locale-independent
739        self.cert_time_ok("Feb  9 00:00:00 2007 GMT", 1170979200.0)
740        self.cert_time_fail(local_february_name() + "  9 00:00:00 2007 GMT")
741
742
743class ContextTests(unittest.TestCase):
744
745    @skip_if_broken_ubuntu_ssl
746    def test_constructor(self):
747        for protocol in PROTOCOLS:
748            ssl.SSLContext(protocol)
749        self.assertRaises(TypeError, ssl.SSLContext)
750        self.assertRaises(ValueError, ssl.SSLContext, -1)
751        self.assertRaises(ValueError, ssl.SSLContext, 42)
752
753    @skip_if_broken_ubuntu_ssl
754    def test_protocol(self):
755        for proto in PROTOCOLS:
756            ctx = ssl.SSLContext(proto)
757            self.assertEqual(ctx.protocol, proto)
758
759    def test_ciphers(self):
760        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
761        ctx.set_ciphers("ALL")
762        ctx.set_ciphers("DEFAULT")
763        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
764            ctx.set_ciphers("^$:,;?*'dorothyx")
765
766    @skip_if_broken_ubuntu_ssl
767    def _test_options(self):
768        '''
769        Disable this test, it is too flaky. Different platforms define
770        different defaults
771        '''
772        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
773        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
774        default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
775        if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
776            default |= ssl.OP_NO_COMPRESSION
777            default |= ssl.OP_ENABLE_MIDDLEBOX_COMPAT
778        self.assertEqual(default, ctx.options)
779        ctx.options |= ssl.OP_NO_TLSv1
780        self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
781        if can_clear_options():
782            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
783            self.assertEqual(default, ctx.options)
784            ctx.options = 0
785            self.assertEqual(0, ctx.options)
786        else:
787            with self.assertRaises(ValueError):
788                ctx.options = 0
789
790    def test_verify_mode(self):
791        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
792        # Default value
793        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
794        ctx.verify_mode = ssl.CERT_OPTIONAL
795        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
796        ctx.verify_mode = ssl.CERT_REQUIRED
797        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
798        ctx.verify_mode = ssl.CERT_NONE
799        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
800        with self.assertRaises(TypeError):
801            ctx.verify_mode = None
802        with self.assertRaises(ValueError):
803            ctx.verify_mode = 42
804
805    @unittest.skipUnless(have_verify_flags(),
806                         "verify_flags need OpenSSL > 0.9.8")
807    def test_verify_flags(self):
808        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
809        # default value
810        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
811        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
812        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
813        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
814        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
815        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
816        ctx.verify_flags = ssl.VERIFY_DEFAULT
817        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
818        # supports any value
819        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
820        self.assertEqual(ctx.verify_flags,
821                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
822        with self.assertRaises(TypeError):
823            ctx.verify_flags = None
824
825    def test_load_cert_chain(self):
826        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
827        # Combined key and cert in a single file
828        ctx.load_cert_chain(CERTFILE, keyfile=None)
829        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
830        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
831        with self.assertRaises(IOError) as cm:
832            ctx.load_cert_chain(NONEXISTINGCERT)
833        self.assertEqual(cm.exception.errno, errno.ENOENT)
834        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
835            ctx.load_cert_chain(BADCERT)
836        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
837            ctx.load_cert_chain(EMPTYCERT)
838        # Separate key and cert
839        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
840        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
841        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
842        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
843        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
844            ctx.load_cert_chain(ONLYCERT)
845        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
846            ctx.load_cert_chain(ONLYKEY)
847        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
848            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
849        # Mismatching key and cert
850        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
851        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
852            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
853        # Password protected key and cert
854        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
855        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
856        ctx.load_cert_chain(CERTFILE_PROTECTED,
857                            password=bytearray(KEY_PASSWORD.encode()))
858        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
859        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
860        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
861                            bytearray(KEY_PASSWORD.encode()))
862        with self.assertRaisesRegexp(TypeError, "should be a string"):
863            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
864        with self.assertRaises(ssl.SSLError):
865            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
866        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
867            # openssl has a fixed limit on the password buffer.
868            # PEM_BUFSIZE is generally set to 1kb.
869            # Return a string larger than this.
870            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
871        # Password callback
872        def getpass_unicode():
873            return KEY_PASSWORD
874        def getpass_bytes():
875            return KEY_PASSWORD.encode()
876        def getpass_bytearray():
877            return bytearray(KEY_PASSWORD.encode())
878        def getpass_badpass():
879            return "badpass"
880        def getpass_huge():
881            return b'a' * (1024 * 1024)
882        def getpass_bad_type():
883            return 9
884        def getpass_exception():
885            raise Exception('getpass error')
886        class GetPassCallable:
887            def __call__(self):
888                return KEY_PASSWORD
889            def getpass(self):
890                return KEY_PASSWORD
891        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
892        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
893        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
894        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
895        ctx.load_cert_chain(CERTFILE_PROTECTED,
896                            password=GetPassCallable().getpass)
897        with self.assertRaises(ssl.SSLError):
898            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
899        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
900            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
901        with self.assertRaisesRegexp(TypeError, "must return a string"):
902            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
903        with self.assertRaisesRegexp(Exception, "getpass error"):
904            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
905        # Make sure the password function isn't called if it isn't needed
906        ctx.load_cert_chain(CERTFILE, password=getpass_exception)
907
908    def test_load_verify_locations(self):
909        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
910        ctx.load_verify_locations(CERTFILE)
911        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
912        ctx.load_verify_locations(BYTES_CERTFILE)
913        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
914        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
915        self.assertRaises(TypeError, ctx.load_verify_locations)
916        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
917        with self.assertRaises(IOError) as cm:
918            ctx.load_verify_locations(NONEXISTINGCERT)
919        self.assertEqual(cm.exception.errno, errno.ENOENT)
920        with self.assertRaises(IOError):
921            ctx.load_verify_locations(u'')
922        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
923            ctx.load_verify_locations(BADCERT)
924        ctx.load_verify_locations(CERTFILE, CAPATH)
925        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
926
927        # Issue #10989: crash if the second argument type is invalid
928        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
929
930    def test_load_verify_cadata(self):
931        # test cadata
932        with open(CAFILE_CACERT) as f:
933            cacert_pem = f.read().decode("ascii")
934        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
935        with open(CAFILE_NEURONIO) as f:
936            neuronio_pem = f.read().decode("ascii")
937        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
938
939        # test PEM
940        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
941        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
942        ctx.load_verify_locations(cadata=cacert_pem)
943        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
944        ctx.load_verify_locations(cadata=neuronio_pem)
945        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
946        # cert already in hash table
947        ctx.load_verify_locations(cadata=neuronio_pem)
948        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
949
950        # combined
951        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
952        combined = "\n".join((cacert_pem, neuronio_pem))
953        ctx.load_verify_locations(cadata=combined)
954        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
955
956        # with junk around the certs
957        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
958        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
959                    neuronio_pem, "tail"]
960        ctx.load_verify_locations(cadata="\n".join(combined))
961        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
962
963        # test DER
964        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
965        ctx.load_verify_locations(cadata=cacert_der)
966        ctx.load_verify_locations(cadata=neuronio_der)
967        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
968        # cert already in hash table
969        ctx.load_verify_locations(cadata=cacert_der)
970        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
971
972        # combined
973        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
974        combined = b"".join((cacert_der, neuronio_der))
975        ctx.load_verify_locations(cadata=combined)
976        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
977
978        # error cases
979        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
980        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
981
982        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
983            ctx.load_verify_locations(cadata=u"broken")
984        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
985            ctx.load_verify_locations(cadata=b"broken")
986
987
988    def test_load_dh_params(self):
989        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
990        ctx.load_dh_params(DHFILE)
991        if os.name != 'nt':
992            ctx.load_dh_params(BYTES_DHFILE)
993        self.assertRaises(TypeError, ctx.load_dh_params)
994        self.assertRaises(TypeError, ctx.load_dh_params, None)
995        with self.assertRaises(IOError) as cm:
996            ctx.load_dh_params(NONEXISTINGCERT)
997        self.assertEqual(cm.exception.errno, errno.ENOENT)
998        with self.assertRaises(ssl.SSLError) as cm:
999            ctx.load_dh_params(CERTFILE)
1000
1001    @skip_if_broken_ubuntu_ssl
1002    def test_session_stats(self):
1003        for proto in PROTOCOLS:
1004            ctx = ssl.SSLContext(proto)
1005            self.assertEqual(ctx.session_stats(), {
1006                'number': 0,
1007                'connect': 0,
1008                'connect_good': 0,
1009                'connect_renegotiate': 0,
1010                'accept': 0,
1011                'accept_good': 0,
1012                'accept_renegotiate': 0,
1013                'hits': 0,
1014                'misses': 0,
1015                'timeouts': 0,
1016                'cache_full': 0,
1017            })
1018
1019    def test_set_default_verify_paths(self):
1020        # There's not much we can do to test that it acts as expected,
1021        # so just check it doesn't crash or raise an exception.
1022        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1023        ctx.set_default_verify_paths()
1024
1025    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
1026    def test_set_ecdh_curve(self):
1027        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1028        ctx.set_ecdh_curve("prime256v1")
1029        ctx.set_ecdh_curve(b"prime256v1")
1030        self.assertRaises(TypeError, ctx.set_ecdh_curve)
1031        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1032        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1033        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1034
1035    @needs_sni
1036    def test_sni_callback(self):
1037        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1038
1039        # set_servername_callback expects a callable, or None
1040        self.assertRaises(TypeError, ctx.set_servername_callback)
1041        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1042        self.assertRaises(TypeError, ctx.set_servername_callback, "")
1043        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1044
1045        def dummycallback(sock, servername, ctx):
1046            pass
1047        ctx.set_servername_callback(None)
1048        ctx.set_servername_callback(dummycallback)
1049
1050    @needs_sni
1051    def test_sni_callback_refcycle(self):
1052        # Reference cycles through the servername callback are detected
1053        # and cleared.
1054        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1055        def dummycallback(sock, servername, ctx, cycle=ctx):
1056            pass
1057        ctx.set_servername_callback(dummycallback)
1058        wr = weakref.ref(ctx)
1059        del ctx, dummycallback
1060        gc.collect()
1061        self.assertIs(wr(), None)
1062
1063    def test_cert_store_stats(self):
1064        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1065        self.assertEqual(ctx.cert_store_stats(),
1066            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1067        ctx.load_cert_chain(CERTFILE)
1068        self.assertEqual(ctx.cert_store_stats(),
1069            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1070        ctx.load_verify_locations(CERTFILE)
1071        self.assertEqual(ctx.cert_store_stats(),
1072            {'x509_ca': 0, 'crl': 0, 'x509': 1})
1073        ctx.load_verify_locations(CAFILE_CACERT)
1074        self.assertEqual(ctx.cert_store_stats(),
1075            {'x509_ca': 1, 'crl': 0, 'x509': 2})
1076
1077    def test_get_ca_certs(self):
1078        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1079        self.assertEqual(ctx.get_ca_certs(), [])
1080        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1081        ctx.load_verify_locations(CERTFILE)
1082        self.assertEqual(ctx.get_ca_certs(), [])
1083        # but CAFILE_CACERT is a CA cert
1084        ctx.load_verify_locations(CAFILE_CACERT)
1085        self.assertEqual(ctx.get_ca_certs(),
1086            [{'issuer': ((('organizationName', 'Root CA'),),
1087                         (('organizationalUnitName', 'http://www.cacert.org'),),
1088                         (('commonName', 'CA Cert Signing Authority'),),
1089                         (('emailAddress', 'support@cacert.org'),)),
1090              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1091              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1092              'serialNumber': '00',
1093              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1094              'subject': ((('organizationName', 'Root CA'),),
1095                          (('organizationalUnitName', 'http://www.cacert.org'),),
1096                          (('commonName', 'CA Cert Signing Authority'),),
1097                          (('emailAddress', 'support@cacert.org'),)),
1098              'version': 3}])
1099
1100        with open(CAFILE_CACERT) as f:
1101            pem = f.read()
1102        der = ssl.PEM_cert_to_DER_cert(pem)
1103        self.assertEqual(ctx.get_ca_certs(True), [der])
1104
1105    def test_load_default_certs(self):
1106        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1107        ctx.load_default_certs()
1108
1109        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1110        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1111        ctx.load_default_certs()
1112
1113        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1114        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1115
1116        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1117        self.assertRaises(TypeError, ctx.load_default_certs, None)
1118        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1119
1120    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
1121    @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
1122    def test_load_default_certs_env(self):
1123        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1124        with support.EnvironmentVarGuard() as env:
1125            env["SSL_CERT_DIR"] = CAPATH
1126            env["SSL_CERT_FILE"] = CERTFILE
1127            ctx.load_default_certs()
1128            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1129
1130    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1131    def test_load_default_certs_env_windows(self):
1132        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1133        ctx.load_default_certs()
1134        stats = ctx.cert_store_stats()
1135
1136        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1137        with support.EnvironmentVarGuard() as env:
1138            env["SSL_CERT_DIR"] = CAPATH
1139            env["SSL_CERT_FILE"] = CERTFILE
1140            ctx.load_default_certs()
1141            stats["x509"] += 1
1142            self.assertEqual(ctx.cert_store_stats(), stats)
1143
1144    def test_create_default_context(self):
1145        ctx = ssl.create_default_context()
1146        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1147        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1148        self.assertTrue(ctx.check_hostname)
1149        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1150        self.assertEqual(
1151            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1152            getattr(ssl, "OP_NO_COMPRESSION", 0),
1153        )
1154
1155        with open(SIGNING_CA) as f:
1156            cadata = f.read().decode("ascii")
1157        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1158                                         cadata=cadata)
1159        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1160        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1161        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1162        self.assertEqual(
1163            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1164            getattr(ssl, "OP_NO_COMPRESSION", 0),
1165        )
1166
1167        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1168        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1169        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1170        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1171        self.assertEqual(
1172            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1173            getattr(ssl, "OP_NO_COMPRESSION", 0),
1174        )
1175        self.assertEqual(
1176            ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1177            getattr(ssl, "OP_SINGLE_DH_USE", 0),
1178        )
1179        self.assertEqual(
1180            ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1181            getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1182        )
1183
1184    def test__create_stdlib_context(self):
1185        ctx = ssl._create_stdlib_context()
1186        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1187        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1188        self.assertFalse(ctx.check_hostname)
1189        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1190
1191        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1192        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1193        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1194        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1195
1196        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1197                                         cert_reqs=ssl.CERT_REQUIRED,
1198                                         check_hostname=True)
1199        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1200        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1201        self.assertTrue(ctx.check_hostname)
1202        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1203
1204        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1205        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1206        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1207        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1208
1209    def test__https_verify_certificates(self):
1210        # Unit test to check the contect factory mapping
1211        # The factories themselves are tested above
1212        # This test will fail by design if run under PYTHONHTTPSVERIFY=0
1213        # (as will various test_httplib tests)
1214
1215        # Uses a fresh SSL module to avoid affecting the real one
1216        local_ssl = support.import_fresh_module("ssl")
1217        # Certificate verification is enabled by default
1218        self.assertIs(local_ssl._create_default_https_context,
1219                      local_ssl.create_default_context)
1220        # Turn default verification off
1221        local_ssl._https_verify_certificates(enable=False)
1222        self.assertIs(local_ssl._create_default_https_context,
1223                      local_ssl._create_unverified_context)
1224        # And back on
1225        local_ssl._https_verify_certificates(enable=True)
1226        self.assertIs(local_ssl._create_default_https_context,
1227                      local_ssl.create_default_context)
1228        # The default behaviour is to enable
1229        local_ssl._https_verify_certificates(enable=False)
1230        local_ssl._https_verify_certificates()
1231        self.assertIs(local_ssl._create_default_https_context,
1232                      local_ssl.create_default_context)
1233
1234    def test__https_verify_envvar(self):
1235        # Unit test to check the PYTHONHTTPSVERIFY handling
1236        # Need to use a subprocess so it can still be run under -E
1237        https_is_verified = """import ssl, sys; \
1238            status = "Error: _create_default_https_context does not verify certs" \
1239                       if ssl._create_default_https_context is \
1240                          ssl._create_unverified_context \
1241                     else None; \
1242            sys.exit(status)"""
1243        https_is_not_verified = """import ssl, sys; \
1244            status = "Error: _create_default_https_context verifies certs" \
1245                       if ssl._create_default_https_context is \
1246                          ssl.create_default_context \
1247                     else None; \
1248            sys.exit(status)"""
1249        extra_env = {}
1250        # Omitting it leaves verification on
1251        assert_python_ok("-c", https_is_verified, **extra_env)
1252        # Setting it to zero turns verification off
1253        extra_env[ssl._https_verify_envvar] = "0"
1254        assert_python_ok("-c", https_is_not_verified, **extra_env)
1255        # Any other value should also leave it on
1256        for setting in ("", "1", "enabled", "foo"):
1257            extra_env[ssl._https_verify_envvar] = setting
1258            assert_python_ok("-c", https_is_verified, **extra_env)
1259
1260    def test_check_hostname(self):
1261        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1262        self.assertFalse(ctx.check_hostname)
1263
1264        # Requires CERT_REQUIRED or CERT_OPTIONAL
1265        with self.assertRaises(ValueError):
1266            ctx.check_hostname = True
1267        ctx.verify_mode = ssl.CERT_REQUIRED
1268        self.assertFalse(ctx.check_hostname)
1269        ctx.check_hostname = True
1270        self.assertTrue(ctx.check_hostname)
1271
1272        ctx.verify_mode = ssl.CERT_OPTIONAL
1273        ctx.check_hostname = True
1274        self.assertTrue(ctx.check_hostname)
1275
1276        # Cannot set CERT_NONE with check_hostname enabled
1277        with self.assertRaises(ValueError):
1278            ctx.verify_mode = ssl.CERT_NONE
1279        ctx.check_hostname = False
1280        self.assertFalse(ctx.check_hostname)
1281
1282
1283class SSLErrorTests(unittest.TestCase):
1284
1285    def test_str(self):
1286        # The str() of a SSLError doesn't include the errno
1287        e = ssl.SSLError(1, "foo")
1288        self.assertEqual(str(e), "foo")
1289        self.assertEqual(e.errno, 1)
1290        # Same for a subclass
1291        e = ssl.SSLZeroReturnError(1, "foo")
1292        self.assertEqual(str(e), "foo")
1293        self.assertEqual(e.errno, 1)
1294
1295    def test_lib_reason(self):
1296        # Test the library and reason attributes
1297        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1298        with self.assertRaises(ssl.SSLError) as cm:
1299            ctx.load_dh_params(CERTFILE)
1300        self.assertEqual(cm.exception.library, 'PEM')
1301        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1302        s = str(cm.exception)
1303        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1304
1305    def test_subclass(self):
1306        # Check that the appropriate SSLError subclass is raised
1307        # (this only tests one of them)
1308        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1309        with closing(socket.socket()) as s:
1310            s.bind(("127.0.0.1", 0))
1311            s.listen(5)
1312            c = socket.socket()
1313            c.connect(s.getsockname())
1314            c.setblocking(False)
1315            with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1316                with self.assertRaises(ssl.SSLWantReadError) as cm:
1317                    c.do_handshake()
1318                s = str(cm.exception)
1319                self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1320                # For compatibility
1321                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1322
1323
1324class NetworkedTests(unittest.TestCase):
1325
1326    def test_connect(self):
1327        with support.transient_internet(REMOTE_HOST):
1328            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1329                                cert_reqs=ssl.CERT_NONE)
1330            try:
1331                s.connect((REMOTE_HOST, 443))
1332                self.assertEqual({}, s.getpeercert())
1333            finally:
1334                s.close()
1335
1336            # this should fail because we have no verification certs
1337            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1338                                cert_reqs=ssl.CERT_REQUIRED)
1339            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1340                                   s.connect, (REMOTE_HOST, 443))
1341            s.close()
1342
1343            # this should succeed because we specify the root cert
1344            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1345                                cert_reqs=ssl.CERT_REQUIRED,
1346                                ca_certs=REMOTE_ROOT_CERT)
1347            try:
1348                s.connect((REMOTE_HOST, 443))
1349                self.assertTrue(s.getpeercert())
1350            finally:
1351                s.close()
1352
1353    def test_connect_ex(self):
1354        # Issue #11326: check connect_ex() implementation
1355        with support.transient_internet(REMOTE_HOST):
1356            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1357                                cert_reqs=ssl.CERT_REQUIRED,
1358                                ca_certs=REMOTE_ROOT_CERT)
1359            try:
1360                self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
1361                self.assertTrue(s.getpeercert())
1362            finally:
1363                s.close()
1364
1365    def test_non_blocking_connect_ex(self):
1366        # Issue #11326: non-blocking connect_ex() should allow handshake
1367        # to proceed after the socket gets ready.
1368        with support.transient_internet(REMOTE_HOST):
1369            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1370                                cert_reqs=ssl.CERT_REQUIRED,
1371                                ca_certs=REMOTE_ROOT_CERT,
1372                                do_handshake_on_connect=False)
1373            try:
1374                s.setblocking(False)
1375                rc = s.connect_ex((REMOTE_HOST, 443))
1376                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1377                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1378                # Wait for connect to finish
1379                select.select([], [s], [], 5.0)
1380                # Non-blocking handshake
1381                while True:
1382                    try:
1383                        s.do_handshake()
1384                        break
1385                    except ssl.SSLWantReadError:
1386                        select.select([s], [], [], 5.0)
1387                    except ssl.SSLWantWriteError:
1388                        select.select([], [s], [], 5.0)
1389                # SSL established
1390                self.assertTrue(s.getpeercert())
1391            finally:
1392                s.close()
1393
1394    def test_timeout_connect_ex(self):
1395        # Issue #12065: on a timeout, connect_ex() should return the original
1396        # errno (mimicking the behaviour of non-SSL sockets).
1397        with support.transient_internet(REMOTE_HOST):
1398            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1399                                cert_reqs=ssl.CERT_REQUIRED,
1400                                ca_certs=REMOTE_ROOT_CERT,
1401                                do_handshake_on_connect=False)
1402            try:
1403                s.settimeout(0.0000001)
1404                rc = s.connect_ex((REMOTE_HOST, 443))
1405                if rc == 0:
1406                    self.skipTest("REMOTE_HOST responded too quickly")
1407                self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1408            finally:
1409                s.close()
1410
1411    def test_connect_ex_error(self):
1412        with support.transient_internet(REMOTE_HOST):
1413            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1414                                cert_reqs=ssl.CERT_REQUIRED,
1415                                ca_certs=REMOTE_ROOT_CERT)
1416            try:
1417                rc = s.connect_ex((REMOTE_HOST, 444))
1418                # Issue #19919: Windows machines or VMs hosted on Windows
1419                # machines sometimes return EWOULDBLOCK.
1420                errors = (
1421                    errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1422                    errno.EWOULDBLOCK,
1423                )
1424                self.assertIn(rc, errors)
1425            finally:
1426                s.close()
1427
1428    def test_connect_with_context(self):
1429        with support.transient_internet(REMOTE_HOST):
1430            # Same as test_connect, but with a separately created context
1431            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1432            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1433            s.connect((REMOTE_HOST, 443))
1434            try:
1435                self.assertEqual({}, s.getpeercert())
1436            finally:
1437                s.close()
1438            # Same with a server hostname
1439            s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1440                                server_hostname=REMOTE_HOST)
1441            s.connect((REMOTE_HOST, 443))
1442            s.close()
1443            # This should fail because we have no verification certs
1444            ctx.verify_mode = ssl.CERT_REQUIRED
1445            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1446            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1447                                    s.connect, (REMOTE_HOST, 443))
1448            s.close()
1449            # This should succeed because we specify the root cert
1450            ctx.load_verify_locations(REMOTE_ROOT_CERT)
1451            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1452            s.connect((REMOTE_HOST, 443))
1453            try:
1454                cert = s.getpeercert()
1455                self.assertTrue(cert)
1456            finally:
1457                s.close()
1458
1459    def test_connect_capath(self):
1460        # Verify server certificates using the `capath` argument
1461        # NOTE: the subject hashing algorithm has been changed between
1462        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1463        # contain both versions of each certificate (same content, different
1464        # filename) for this test to be portable across OpenSSL releases.
1465        with support.transient_internet(REMOTE_HOST):
1466            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1467            ctx.verify_mode = ssl.CERT_REQUIRED
1468            ctx.load_verify_locations(capath=CAPATH)
1469            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1470            s.connect((REMOTE_HOST, 443))
1471            try:
1472                cert = s.getpeercert()
1473                self.assertTrue(cert)
1474            finally:
1475                s.close()
1476            # Same with a bytes `capath` argument
1477            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1478            ctx.verify_mode = ssl.CERT_REQUIRED
1479            ctx.load_verify_locations(capath=BYTES_CAPATH)
1480            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1481            s.connect((REMOTE_HOST, 443))
1482            try:
1483                cert = s.getpeercert()
1484                self.assertTrue(cert)
1485            finally:
1486                s.close()
1487
1488    def test_connect_cadata(self):
1489        with open(REMOTE_ROOT_CERT) as f:
1490            pem = f.read().decode('ascii')
1491        der = ssl.PEM_cert_to_DER_cert(pem)
1492        with support.transient_internet(REMOTE_HOST):
1493            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1494            ctx.verify_mode = ssl.CERT_REQUIRED
1495            ctx.load_verify_locations(cadata=pem)
1496            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1497                s.connect((REMOTE_HOST, 443))
1498                cert = s.getpeercert()
1499                self.assertTrue(cert)
1500
1501            # same with DER
1502            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1503            ctx.verify_mode = ssl.CERT_REQUIRED
1504            ctx.load_verify_locations(cadata=der)
1505            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1506                s.connect((REMOTE_HOST, 443))
1507                cert = s.getpeercert()
1508                self.assertTrue(cert)
1509
1510    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1511    def test_makefile_close(self):
1512        # Issue #5238: creating a file-like object with makefile() shouldn't
1513        # delay closing the underlying "real socket" (here tested with its
1514        # file descriptor, hence skipping the test under Windows).
1515        with support.transient_internet(REMOTE_HOST):
1516            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1517            ss.connect((REMOTE_HOST, 443))
1518            fd = ss.fileno()
1519            f = ss.makefile()
1520            f.close()
1521            # The fd is still open
1522            os.read(fd, 0)
1523            # Closing the SSL socket should close the fd too
1524            ss.close()
1525            gc.collect()
1526            with self.assertRaises(OSError) as e:
1527                os.read(fd, 0)
1528            self.assertEqual(e.exception.errno, errno.EBADF)
1529
1530    def test_non_blocking_handshake(self):
1531        with support.transient_internet(REMOTE_HOST):
1532            s = socket.socket(socket.AF_INET)
1533            s.connect((REMOTE_HOST, 443))
1534            s.setblocking(False)
1535            s = ssl.wrap_socket(s,
1536                                cert_reqs=ssl.CERT_NONE,
1537                                do_handshake_on_connect=False)
1538            count = 0
1539            while True:
1540                try:
1541                    count += 1
1542                    s.do_handshake()
1543                    break
1544                except ssl.SSLWantReadError:
1545                    select.select([s], [], [])
1546                except ssl.SSLWantWriteError:
1547                    select.select([], [s], [])
1548            s.close()
1549            if support.verbose:
1550                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
1551
1552    def test_get_server_certificate(self):
1553        def _test_get_server_certificate(host, port, cert=None):
1554            with support.transient_internet(host):
1555                pem = ssl.get_server_certificate((host, port))
1556                if not pem:
1557                    self.fail("No server certificate on %s:%s!" % (host, port))
1558
1559                try:
1560                    pem = ssl.get_server_certificate((host, port),
1561                                                     ca_certs=CERTFILE)
1562                except ssl.SSLError as x:
1563                    #should fail
1564                    if support.verbose:
1565                        sys.stdout.write("%s\n" % x)
1566                else:
1567                    self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1568                pem = ssl.get_server_certificate((host, port),
1569                                                 ca_certs=cert)
1570                if not pem:
1571                    self.fail("No server certificate on %s:%s!" % (host, port))
1572                if support.verbose:
1573                    sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1574
1575        _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
1576        if support.IPV6_ENABLED:
1577            _test_get_server_certificate('ipv6.google.com', 443)
1578
1579    def test_ciphers(self):
1580        remote = (REMOTE_HOST, 443)
1581        with support.transient_internet(remote[0]):
1582            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1583                                         cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1584                s.connect(remote)
1585            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1586                                         cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1587                s.connect(remote)
1588            # Error checking can happen at instantiation or when connecting
1589            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1590                with closing(socket.socket(socket.AF_INET)) as sock:
1591                    s = ssl.wrap_socket(sock,
1592                                        cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1593                    s.connect(remote)
1594
1595    def test_algorithms(self):
1596        # Issue #8484: all algorithms should be available when verifying a
1597        # certificate.
1598        # SHA256 was added in OpenSSL 0.9.8
1599        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1600            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
1601        # sha256.tbs-internet.com needs SNI to use the correct certificate
1602        if not ssl.HAS_SNI:
1603            self.skipTest("SNI needed for this test")
1604        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1605        remote = ("sha256.tbs-internet.com", 443)
1606        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
1607        with support.transient_internet("sha256.tbs-internet.com"):
1608            ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1609            ctx.verify_mode = ssl.CERT_REQUIRED
1610            ctx.load_verify_locations(sha256_cert)
1611            s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1612                                server_hostname="sha256.tbs-internet.com")
1613            try:
1614                s.connect(remote)
1615                if support.verbose:
1616                    sys.stdout.write("\nCipher with %r is %r\n" %
1617                                     (remote, s.cipher()))
1618                    sys.stdout.write("Certificate is:\n%s\n" %
1619                                     pprint.pformat(s.getpeercert()))
1620            finally:
1621                s.close()
1622
1623    def test_get_ca_certs_capath(self):
1624        # capath certs are loaded on request
1625        with support.transient_internet(REMOTE_HOST):
1626            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1627            ctx.verify_mode = ssl.CERT_REQUIRED
1628            ctx.load_verify_locations(capath=CAPATH)
1629            self.assertEqual(ctx.get_ca_certs(), [])
1630            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1631            s.connect((REMOTE_HOST, 443))
1632            try:
1633                cert = s.getpeercert()
1634                self.assertTrue(cert)
1635            finally:
1636                s.close()
1637            self.assertEqual(len(ctx.get_ca_certs()), 1)
1638
1639    @needs_sni
1640    def test_context_setget(self):
1641        # Check that the context of a connected socket can be replaced.
1642        with support.transient_internet(REMOTE_HOST):
1643            ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1644            ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1645            s = socket.socket(socket.AF_INET)
1646            with closing(ctx1.wrap_socket(s)) as ss:
1647                ss.connect((REMOTE_HOST, 443))
1648                self.assertIs(ss.context, ctx1)
1649                self.assertIs(ss._sslobj.context, ctx1)
1650                ss.context = ctx2
1651                self.assertIs(ss.context, ctx2)
1652                self.assertIs(ss._sslobj.context, ctx2)
1653
1654try:
1655    import threading
1656except ImportError:
1657    _have_threads = False
1658else:
1659    _have_threads = True
1660
1661    from test.ssl_servers import make_https_server
1662
1663    class ThreadedEchoServer(threading.Thread):
1664
1665        class ConnectionHandler(threading.Thread):
1666
1667            """A mildly complicated class, because we want it to work both
1668            with and without the SSL wrapper around the socket connection, so
1669            that we can test the STARTTLS functionality."""
1670
1671            def __init__(self, server, connsock, addr):
1672                self.server = server
1673                self.running = False
1674                self.sock = connsock
1675                self.addr = addr
1676                self.sock.setblocking(1)
1677                self.sslconn = None
1678                threading.Thread.__init__(self)
1679                self.daemon = True
1680
1681            def wrap_conn(self):
1682                try:
1683                    self.sslconn = self.server.context.wrap_socket(
1684                        self.sock, server_side=True)
1685                    self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1686                    self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
1687                except socket.error as e:
1688                    # We treat ConnectionResetError as though it were an
1689                    # SSLError - OpenSSL on Ubuntu abruptly closes the
1690                    # connection when asked to use an unsupported protocol.
1691                    #
1692                    # XXX Various errors can have happened here, for example
1693                    # a mismatching protocol version, an invalid certificate,
1694                    # or a low-level bug. This should be made more discriminating.
1695                    if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1696                        raise
1697                    self.server.conn_errors.append(e)
1698                    if self.server.chatty:
1699                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
1700                    self.running = False
1701                    self.server.stop()
1702                    self.close()
1703                    return False
1704                else:
1705                    if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1706                        cert = self.sslconn.getpeercert()
1707                        if support.verbose and self.server.chatty:
1708                            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1709                        cert_binary = self.sslconn.getpeercert(True)
1710                        if support.verbose and self.server.chatty:
1711                            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1712                    cipher = self.sslconn.cipher()
1713                    if support.verbose and self.server.chatty:
1714                        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1715                        sys.stdout.write(" server: selected protocol is now "
1716                                + str(self.sslconn.selected_npn_protocol()) + "\n")
1717                    return True
1718
1719            def read(self):
1720                if self.sslconn:
1721                    return self.sslconn.read()
1722                else:
1723                    return self.sock.recv(1024)
1724
1725            def write(self, bytes):
1726                if self.sslconn:
1727                    return self.sslconn.write(bytes)
1728                else:
1729                    return self.sock.send(bytes)
1730
1731            def close(self):
1732                if self.sslconn:
1733                    self.sslconn.close()
1734                else:
1735                    self.sock.close()
1736
1737            def run(self):
1738                self.running = True
1739                if not self.server.starttls_server:
1740                    if not self.wrap_conn():
1741                        return
1742                while self.running:
1743                    try:
1744                        msg = self.read()
1745                        stripped = msg.strip()
1746                        if not stripped:
1747                            # eof, so quit this handler
1748                            self.running = False
1749                            self.close()
1750                        elif stripped == b'over':
1751                            if support.verbose and self.server.connectionchatty:
1752                                sys.stdout.write(" server: client closed connection\n")
1753                            self.close()
1754                            return
1755                        elif (self.server.starttls_server and
1756                              stripped == b'STARTTLS'):
1757                            if support.verbose and self.server.connectionchatty:
1758                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
1759                            self.write(b"OK\n")
1760                            if not self.wrap_conn():
1761                                return
1762                        elif (self.server.starttls_server and self.sslconn
1763                              and stripped == b'ENDTLS'):
1764                            if support.verbose and self.server.connectionchatty:
1765                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
1766                            self.write(b"OK\n")
1767                            self.sock = self.sslconn.unwrap()
1768                            self.sslconn = None
1769                            if support.verbose and self.server.connectionchatty:
1770                                sys.stdout.write(" server: connection is now unencrypted...\n")
1771                        elif stripped == b'CB tls-unique':
1772                            if support.verbose and self.server.connectionchatty:
1773                                sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1774                            data = self.sslconn.get_channel_binding("tls-unique")
1775                            self.write(repr(data).encode("us-ascii") + b"\n")
1776                        else:
1777                            if (support.verbose and
1778                                self.server.connectionchatty):
1779                                ctype = (self.sslconn and "encrypted") or "unencrypted"
1780                                sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1781                                                 % (msg, ctype, msg.lower(), ctype))
1782                            self.write(msg.lower())
1783                    except ssl.SSLError:
1784                        if self.server.chatty:
1785                            handle_error("Test server failure:\n")
1786                        self.close()
1787                        self.running = False
1788                        # normally, we'd just stop here, but for the test
1789                        # harness, we want to stop the server
1790                        self.server.stop()
1791
1792        def __init__(self, certificate=None, ssl_version=None,
1793                     certreqs=None, cacerts=None,
1794                     chatty=True, connectionchatty=False, starttls_server=False,
1795                     npn_protocols=None, alpn_protocols=None,
1796                     ciphers=None, context=None):
1797            if context:
1798                self.context = context
1799            else:
1800                self.context = ssl.SSLContext(ssl_version
1801                                              if ssl_version is not None
1802                                              else ssl.PROTOCOL_TLSv1)
1803                self.context.verify_mode = (certreqs if certreqs is not None
1804                                            else ssl.CERT_NONE)
1805                if cacerts:
1806                    self.context.load_verify_locations(cacerts)
1807                if certificate:
1808                    self.context.load_cert_chain(certificate)
1809                if npn_protocols:
1810                    self.context.set_npn_protocols(npn_protocols)
1811                if alpn_protocols:
1812                    self.context.set_alpn_protocols(alpn_protocols)
1813                if ciphers:
1814                    self.context.set_ciphers(ciphers)
1815            self.chatty = chatty
1816            self.connectionchatty = connectionchatty
1817            self.starttls_server = starttls_server
1818            self.sock = socket.socket()
1819            self.port = support.bind_port(self.sock)
1820            self.flag = None
1821            self.active = False
1822            self.selected_npn_protocols = []
1823            self.selected_alpn_protocols = []
1824            self.conn_errors = []
1825            threading.Thread.__init__(self)
1826            self.daemon = True
1827
1828        def __enter__(self):
1829            self.start(threading.Event())
1830            self.flag.wait()
1831            return self
1832
1833        def __exit__(self, *args):
1834            self.stop()
1835            self.join()
1836
1837        def start(self, flag=None):
1838            self.flag = flag
1839            threading.Thread.start(self)
1840
1841        def run(self):
1842            self.sock.settimeout(0.05)
1843            self.sock.listen(5)
1844            self.active = True
1845            if self.flag:
1846                # signal an event
1847                self.flag.set()
1848            while self.active:
1849                try:
1850                    newconn, connaddr = self.sock.accept()
1851                    if support.verbose and self.chatty:
1852                        sys.stdout.write(' server:  new connection from '
1853                                         + repr(connaddr) + '\n')
1854                    handler = self.ConnectionHandler(self, newconn, connaddr)
1855                    handler.start()
1856                    handler.join()
1857                except socket.timeout:
1858                    pass
1859                except KeyboardInterrupt:
1860                    self.stop()
1861            self.sock.close()
1862
1863        def stop(self):
1864            self.active = False
1865
1866    class AsyncoreEchoServer(threading.Thread):
1867
1868        class EchoServer(asyncore.dispatcher):
1869
1870            class ConnectionHandler(asyncore.dispatcher_with_send):
1871
1872                def __init__(self, conn, certfile):
1873                    self.socket = ssl.wrap_socket(conn, server_side=True,
1874                                                  certfile=certfile,
1875                                                  do_handshake_on_connect=False)
1876                    asyncore.dispatcher_with_send.__init__(self, self.socket)
1877                    self._ssl_accepting = True
1878                    self._do_ssl_handshake()
1879
1880                def readable(self):
1881                    if isinstance(self.socket, ssl.SSLSocket):
1882                        while self.socket.pending() > 0:
1883                            self.handle_read_event()
1884                    return True
1885
1886                def _do_ssl_handshake(self):
1887                    try:
1888                        self.socket.do_handshake()
1889                    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1890                        return
1891                    except ssl.SSLEOFError:
1892                        return self.handle_close()
1893                    except ssl.SSLError:
1894                        raise
1895                    except socket.error, err:
1896                        if err.args[0] == errno.ECONNABORTED:
1897                            return self.handle_close()
1898                    else:
1899                        self._ssl_accepting = False
1900
1901                def handle_read(self):
1902                    if self._ssl_accepting:
1903                        self._do_ssl_handshake()
1904                    else:
1905                        data = self.recv(1024)
1906                        if support.verbose:
1907                            sys.stdout.write(" server:  read %s from client\n" % repr(data))
1908                        if not data:
1909                            self.close()
1910                        else:
1911                            self.send(data.lower())
1912
1913                def handle_close(self):
1914                    self.close()
1915                    if support.verbose:
1916                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
1917
1918                def handle_error(self):
1919                    raise
1920
1921            def __init__(self, certfile):
1922                self.certfile = certfile
1923                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1924                self.port = support.bind_port(sock, '')
1925                asyncore.dispatcher.__init__(self, sock)
1926                self.listen(5)
1927
1928            def handle_accept(self):
1929                sock_obj, addr = self.accept()
1930                if support.verbose:
1931                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
1932                self.ConnectionHandler(sock_obj, self.certfile)
1933
1934            def handle_error(self):
1935                raise
1936
1937        def __init__(self, certfile):
1938            self.flag = None
1939            self.active = False
1940            self.server = self.EchoServer(certfile)
1941            self.port = self.server.port
1942            threading.Thread.__init__(self)
1943            self.daemon = True
1944
1945        def __str__(self):
1946            return "<%s %s>" % (self.__class__.__name__, self.server)
1947
1948        def __enter__(self):
1949            self.start(threading.Event())
1950            self.flag.wait()
1951            return self
1952
1953        def __exit__(self, *args):
1954            if support.verbose:
1955                sys.stdout.write(" cleanup: stopping server.\n")
1956            self.stop()
1957            if support.verbose:
1958                sys.stdout.write(" cleanup: joining server thread.\n")
1959            self.join()
1960            if support.verbose:
1961                sys.stdout.write(" cleanup: successfully joined.\n")
1962
1963        def start(self, flag=None):
1964            self.flag = flag
1965            threading.Thread.start(self)
1966
1967        def run(self):
1968            self.active = True
1969            if self.flag:
1970                self.flag.set()
1971            while self.active:
1972                try:
1973                    asyncore.loop(1)
1974                except:
1975                    pass
1976
1977        def stop(self):
1978            self.active = False
1979            self.server.close()
1980
1981    def server_params_test(client_context, server_context, indata=b"FOO\n",
1982                           chatty=True, connectionchatty=False, sni_name=None):
1983        """
1984        Launch a server, connect a client to it and try various reads
1985        and writes.
1986        """
1987        stats = {}
1988        server = ThreadedEchoServer(context=server_context,
1989                                    chatty=chatty,
1990                                    connectionchatty=False)
1991        with server:
1992            with closing(client_context.wrap_socket(socket.socket(),
1993                    server_hostname=sni_name)) as s:
1994                s.connect((HOST, server.port))
1995                for arg in [indata, bytearray(indata), memoryview(indata)]:
1996                    if connectionchatty:
1997                        if support.verbose:
1998                            sys.stdout.write(
1999                                " client:  sending %r...\n" % indata)
2000                    s.write(arg)
2001                    outdata = s.read()
2002                    if connectionchatty:
2003                        if support.verbose:
2004                            sys.stdout.write(" client:  read %r\n" % outdata)
2005                    if outdata != indata.lower():
2006                        raise AssertionError(
2007                            "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2008                            % (outdata[:20], len(outdata),
2009                               indata[:20].lower(), len(indata)))
2010                s.write(b"over\n")
2011                if connectionchatty:
2012                    if support.verbose:
2013                        sys.stdout.write(" client:  closing connection.\n")
2014                stats.update({
2015                    'compression': s.compression(),
2016                    'cipher': s.cipher(),
2017                    'peercert': s.getpeercert(),
2018                    'client_alpn_protocol': s.selected_alpn_protocol(),
2019                    'client_npn_protocol': s.selected_npn_protocol(),
2020                    'version': s.version(),
2021                })
2022                s.close()
2023            stats['server_alpn_protocols'] = server.selected_alpn_protocols
2024            stats['server_npn_protocols'] = server.selected_npn_protocols
2025        return stats
2026
2027    def try_protocol_combo(server_protocol, client_protocol, expect_success,
2028                           certsreqs=None, server_options=0, client_options=0):
2029        """
2030        Try to SSL-connect using *client_protocol* to *server_protocol*.
2031        If *expect_success* is true, assert that the connection succeeds,
2032        if it's false, assert that the connection fails.
2033        Also, if *expect_success* is a string, assert that it is the protocol
2034        version actually used by the connection.
2035        """
2036        if certsreqs is None:
2037            certsreqs = ssl.CERT_NONE
2038        certtype = {
2039            ssl.CERT_NONE: "CERT_NONE",
2040            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2041            ssl.CERT_REQUIRED: "CERT_REQUIRED",
2042        }[certsreqs]
2043        if support.verbose:
2044            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2045            sys.stdout.write(formatstr %
2046                             (ssl.get_protocol_name(client_protocol),
2047                              ssl.get_protocol_name(server_protocol),
2048                              certtype))
2049        client_context = ssl.SSLContext(client_protocol)
2050        client_context.options |= client_options
2051        server_context = ssl.SSLContext(server_protocol)
2052        server_context.options |= server_options
2053
2054        # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2055        # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2056        # starting from OpenSSL 1.0.0 (see issue #8322).
2057        if client_context.protocol == ssl.PROTOCOL_SSLv23:
2058            client_context.set_ciphers("ALL")
2059
2060        for ctx in (client_context, server_context):
2061            ctx.verify_mode = certsreqs
2062            ctx.load_cert_chain(CERTFILE)
2063            ctx.load_verify_locations(CERTFILE)
2064        try:
2065            stats = server_params_test(client_context, server_context,
2066                                       chatty=False, connectionchatty=False)
2067        # Protocol mismatch can result in either an SSLError, or a
2068        # "Connection reset by peer" error.
2069        except ssl.SSLError:
2070            if expect_success:
2071                raise
2072        except socket.error as e:
2073            if expect_success or e.errno != errno.ECONNRESET:
2074                raise
2075        else:
2076            if not expect_success:
2077                raise AssertionError(
2078                    "Client protocol %s succeeded with server protocol %s!"
2079                    % (ssl.get_protocol_name(client_protocol),
2080                       ssl.get_protocol_name(server_protocol)))
2081            elif (expect_success is not True
2082                  and expect_success != stats['version']):
2083                raise AssertionError("version mismatch: expected %r, got %r"
2084                                     % (expect_success, stats['version']))
2085
2086
2087    class ThreadedTests(unittest.TestCase):
2088
2089        @skip_if_broken_ubuntu_ssl
2090        def test_echo(self):
2091            """Basic test of an SSL client connecting to a server"""
2092            if support.verbose:
2093                sys.stdout.write("\n")
2094            for protocol in PROTOCOLS:
2095                context = ssl.SSLContext(protocol)
2096                context.load_cert_chain(CERTFILE)
2097                server_params_test(context, context,
2098                                   chatty=True, connectionchatty=True)
2099
2100        def test_getpeercert(self):
2101            if support.verbose:
2102                sys.stdout.write("\n")
2103            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2104            context.verify_mode = ssl.CERT_REQUIRED
2105            context.load_verify_locations(CERTFILE)
2106            context.load_cert_chain(CERTFILE)
2107            server = ThreadedEchoServer(context=context, chatty=False)
2108            with server:
2109                s = context.wrap_socket(socket.socket(),
2110                                        do_handshake_on_connect=False)
2111                s.connect((HOST, server.port))
2112                # getpeercert() raise ValueError while the handshake isn't
2113                # done.
2114                with self.assertRaises(ValueError):
2115                    s.getpeercert()
2116                s.do_handshake()
2117                cert = s.getpeercert()
2118                self.assertTrue(cert, "Can't get peer certificate.")
2119                cipher = s.cipher()
2120                if support.verbose:
2121                    sys.stdout.write(pprint.pformat(cert) + '\n')
2122                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2123                if 'subject' not in cert:
2124                    self.fail("No subject field in certificate: %s." %
2125                              pprint.pformat(cert))
2126                if ((('organizationName', 'Python Software Foundation'),)
2127                    not in cert['subject']):
2128                    self.fail(
2129                        "Missing or invalid 'organizationName' field in certificate subject; "
2130                        "should be 'Python Software Foundation'.")
2131                self.assertIn('notBefore', cert)
2132                self.assertIn('notAfter', cert)
2133                before = ssl.cert_time_to_seconds(cert['notBefore'])
2134                after = ssl.cert_time_to_seconds(cert['notAfter'])
2135                self.assertLess(before, after)
2136                s.close()
2137
2138        @unittest.skipUnless(have_verify_flags(),
2139                            "verify_flags need OpenSSL > 0.9.8")
2140        def test_crl_check(self):
2141            if support.verbose:
2142                sys.stdout.write("\n")
2143
2144            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2145            server_context.load_cert_chain(SIGNED_CERTFILE)
2146
2147            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2148            context.verify_mode = ssl.CERT_REQUIRED
2149            context.load_verify_locations(SIGNING_CA)
2150            tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2151            self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
2152
2153            # VERIFY_DEFAULT should pass
2154            server = ThreadedEchoServer(context=server_context, chatty=True)
2155            with server:
2156                with closing(context.wrap_socket(socket.socket())) as s:
2157                    s.connect((HOST, server.port))
2158                    cert = s.getpeercert()
2159                    self.assertTrue(cert, "Can't get peer certificate.")
2160
2161            # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2162            context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2163
2164            server = ThreadedEchoServer(context=server_context, chatty=True)
2165            with server:
2166                with closing(context.wrap_socket(socket.socket())) as s:
2167                    with self.assertRaisesRegexp(ssl.SSLError,
2168                                                "certificate verify failed"):
2169                        s.connect((HOST, server.port))
2170
2171            # now load a CRL file. The CRL file is signed by the CA.
2172            context.load_verify_locations(CRLFILE)
2173
2174            server = ThreadedEchoServer(context=server_context, chatty=True)
2175            with server:
2176                with closing(context.wrap_socket(socket.socket())) as s:
2177                    s.connect((HOST, server.port))
2178                    cert = s.getpeercert()
2179                    self.assertTrue(cert, "Can't get peer certificate.")
2180
2181        def test_check_hostname(self):
2182            if support.verbose:
2183                sys.stdout.write("\n")
2184
2185            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2186            server_context.load_cert_chain(SIGNED_CERTFILE)
2187
2188            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2189            context.verify_mode = ssl.CERT_REQUIRED
2190            context.check_hostname = True
2191            context.load_verify_locations(SIGNING_CA)
2192
2193            # correct hostname should verify
2194            server = ThreadedEchoServer(context=server_context, chatty=True)
2195            with server:
2196                with closing(context.wrap_socket(socket.socket(),
2197                                                 server_hostname="localhost")) as s:
2198                    s.connect((HOST, server.port))
2199                    cert = s.getpeercert()
2200                    self.assertTrue(cert, "Can't get peer certificate.")
2201
2202            # incorrect hostname should raise an exception
2203            server = ThreadedEchoServer(context=server_context, chatty=True)
2204            with server:
2205                with closing(context.wrap_socket(socket.socket(),
2206                                                 server_hostname="invalid")) as s:
2207                    with self.assertRaisesRegexp(ssl.CertificateError,
2208                                                "hostname 'invalid' doesn't match u?'localhost'"):
2209                        s.connect((HOST, server.port))
2210
2211            # missing server_hostname arg should cause an exception, too
2212            server = ThreadedEchoServer(context=server_context, chatty=True)
2213            with server:
2214                with closing(socket.socket()) as s:
2215                    with self.assertRaisesRegexp(ValueError,
2216                                                "check_hostname requires server_hostname"):
2217                        context.wrap_socket(s)
2218
2219        def test_wrong_cert(self):
2220            """Connecting when the server rejects the client's certificate
2221
2222            Launch a server with CERT_REQUIRED, and check that trying to
2223            connect to it with a wrong client certificate fails.
2224            """
2225            certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2226                                       "wrongcert.pem")
2227            server = ThreadedEchoServer(CERTFILE,
2228                                        certreqs=ssl.CERT_REQUIRED,
2229                                        cacerts=CERTFILE, chatty=False,
2230                                        connectionchatty=False)
2231            with server, \
2232                    closing(socket.socket()) as sock, \
2233                    closing(ssl.wrap_socket(sock,
2234                                        certfile=certfile,
2235                                        ssl_version=ssl.PROTOCOL_TLSv1)) as s:
2236                try:
2237                    # Expect either an SSL error about the server rejecting
2238                    # the connection, or a low-level connection reset (which
2239                    # sometimes happens on Windows)
2240                    s.connect((HOST, server.port))
2241                except ssl.SSLError as e:
2242                    if support.verbose:
2243                        sys.stdout.write("\nSSLError is %r\n" % e)
2244                except socket.error as e:
2245                    if e.errno != errno.ECONNRESET:
2246                        raise
2247                    if support.verbose:
2248                        sys.stdout.write("\nsocket.error is %r\n" % e)
2249                else:
2250                    self.fail("Use of invalid cert should have failed!")
2251
2252        def test_rude_shutdown(self):
2253            """A brutal shutdown of an SSL server should raise an OSError
2254            in the client when attempting handshake.
2255            """
2256            listener_ready = threading.Event()
2257            listener_gone = threading.Event()
2258
2259            s = socket.socket()
2260            port = support.bind_port(s, HOST)
2261
2262            # `listener` runs in a thread.  It sits in an accept() until
2263            # the main thread connects.  Then it rudely closes the socket,
2264            # and sets Event `listener_gone` to let the main thread know
2265            # the socket is gone.
2266            def listener():
2267                s.listen(5)
2268                listener_ready.set()
2269                newsock, addr = s.accept()
2270                newsock.close()
2271                s.close()
2272                listener_gone.set()
2273
2274            def connector():
2275                listener_ready.wait()
2276                with closing(socket.socket()) as c:
2277                    c.connect((HOST, port))
2278                    listener_gone.wait()
2279                    try:
2280                        ssl_sock = ssl.wrap_socket(c)
2281                    except socket.error:
2282                        pass
2283                    else:
2284                        self.fail('connecting to closed SSL socket should have failed')
2285
2286            t = threading.Thread(target=listener)
2287            t.start()
2288            try:
2289                connector()
2290            finally:
2291                t.join()
2292
2293        @skip_if_broken_ubuntu_ssl
2294        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2295                             "OpenSSL is compiled without SSLv2 support")
2296        def test_protocol_sslv2(self):
2297            """Connecting to an SSLv2 server with various client options"""
2298            if support.verbose:
2299                sys.stdout.write("\n")
2300            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2301            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2302            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
2303            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
2304            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2305            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2306            # SSLv23 client with specific SSL options
2307            if no_sslv2_implies_sslv3_hello():
2308                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2309                try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2310                                   client_options=ssl.OP_NO_SSLv2)
2311            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2312                               client_options=ssl.OP_NO_SSLv3)
2313            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2314                               client_options=ssl.OP_NO_TLSv1)
2315
2316        @skip_if_broken_ubuntu_ssl
2317        def test_protocol_sslv23(self):
2318            """Connecting to an SSLv23 server with various client options"""
2319            if support.verbose:
2320                sys.stdout.write("\n")
2321            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2322                try:
2323                    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2324                except socket.error as x:
2325                    # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2326                    if support.verbose:
2327                        sys.stdout.write(
2328                            " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2329                            % str(x))
2330            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2331                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
2332            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
2333            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
2334
2335            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2336                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2337            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
2338            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2339
2340            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2341                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2342            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
2343            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2344
2345            # Server with specific SSL options
2346            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2347                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2348                               server_options=ssl.OP_NO_SSLv3)
2349            # Will choose TLSv1
2350            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2351                               server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2352            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2353                               server_options=ssl.OP_NO_TLSv1)
2354
2355
2356        @skip_if_broken_ubuntu_ssl
2357        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2358                             "OpenSSL is compiled without SSLv3 support")
2359        def test_protocol_sslv3(self):
2360            """Connecting to an SSLv3 server with various client options"""
2361            if support.verbose:
2362                sys.stdout.write("\n")
2363            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2364            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2365            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2366            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2367                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
2368            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2369                               client_options=ssl.OP_NO_SSLv3)
2370            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2371            if no_sslv2_implies_sslv3_hello():
2372                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2373                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2374                                   False, client_options=ssl.OP_NO_SSLv2)
2375
2376        @skip_if_broken_ubuntu_ssl
2377        def test_protocol_tlsv1(self):
2378            """Connecting to a TLSv1 server with various client options"""
2379            if support.verbose:
2380                sys.stdout.write("\n")
2381            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2382            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2383            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2384            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2385                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2386            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2387                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
2388            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2389                               client_options=ssl.OP_NO_TLSv1)
2390
2391        @skip_if_broken_ubuntu_ssl
2392        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2393                             "TLS version 1.1 not supported.")
2394        def test_protocol_tlsv1_1(self):
2395            """Connecting to a TLSv1.1 server with various client options.
2396               Testing against older TLS versions."""
2397            if support.verbose:
2398                sys.stdout.write("\n")
2399            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2400            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2401                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2402            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2403                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2404            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2405                               client_options=ssl.OP_NO_TLSv1_1)
2406
2407            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2408            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2409            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2410
2411
2412        @skip_if_broken_ubuntu_ssl
2413        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2414                             "TLS version 1.2 not supported.")
2415        def test_protocol_tlsv1_2(self):
2416            """Connecting to a TLSv1.2 server with various client options.
2417               Testing against older TLS versions."""
2418            if support.verbose:
2419                sys.stdout.write("\n")
2420            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2421                               server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2422                               client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2423            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2424                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2425            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2426                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2427            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2428                               client_options=ssl.OP_NO_TLSv1_2)
2429
2430            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
2431            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2432            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2433            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2434            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2435
2436        def test_starttls(self):
2437            """Switching from clear text to encrypted and back again."""
2438            msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2439
2440            server = ThreadedEchoServer(CERTFILE,
2441                                        ssl_version=ssl.PROTOCOL_TLSv1,
2442                                        starttls_server=True,
2443                                        chatty=True,
2444                                        connectionchatty=True)
2445            wrapped = False
2446            with server:
2447                s = socket.socket()
2448                s.setblocking(1)
2449                s.connect((HOST, server.port))
2450                if support.verbose:
2451                    sys.stdout.write("\n")
2452                for indata in msgs:
2453                    if support.verbose:
2454                        sys.stdout.write(
2455                            " client:  sending %r...\n" % indata)
2456                    if wrapped:
2457                        conn.write(indata)
2458                        outdata = conn.read()
2459                    else:
2460                        s.send(indata)
2461                        outdata = s.recv(1024)
2462                    msg = outdata.strip().lower()
2463                    if indata == b"STARTTLS" and msg.startswith(b"ok"):
2464                        # STARTTLS ok, switch to secure mode
2465                        if support.verbose:
2466                            sys.stdout.write(
2467                                " client:  read %r from server, starting TLS...\n"
2468                                % msg)
2469                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2470                        wrapped = True
2471                    elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2472                        # ENDTLS ok, switch back to clear text
2473                        if support.verbose:
2474                            sys.stdout.write(
2475                                " client:  read %r from server, ending TLS...\n"
2476                                % msg)
2477                        s = conn.unwrap()
2478                        wrapped = False
2479                    else:
2480                        if support.verbose:
2481                            sys.stdout.write(
2482                                " client:  read %r from server\n" % msg)
2483                if support.verbose:
2484                    sys.stdout.write(" client:  closing connection.\n")
2485                if wrapped:
2486                    conn.write(b"over\n")
2487                else:
2488                    s.send(b"over\n")
2489                if wrapped:
2490                    conn.close()
2491                else:
2492                    s.close()
2493
2494        def test_socketserver(self):
2495            """Using a SocketServer to create and manage SSL connections."""
2496            server = make_https_server(self, certfile=CERTFILE)
2497            # try to connect
2498            if support.verbose:
2499                sys.stdout.write('\n')
2500            with open(CERTFILE, 'rb') as f:
2501                d1 = f.read()
2502            d2 = ''
2503            # now fetch the same data from the HTTPS server
2504            url = 'https://localhost:%d/%s' % (
2505                server.port, os.path.split(CERTFILE)[1])
2506            context = ssl.create_default_context(cafile=CERTFILE)
2507            f = urllib2.urlopen(url, context=context)
2508            try:
2509                dlen = f.info().getheader("content-length")
2510                if dlen and (int(dlen) > 0):
2511                    d2 = f.read(int(dlen))
2512                    if support.verbose:
2513                        sys.stdout.write(
2514                            " client: read %d bytes from remote server '%s'\n"
2515                            % (len(d2), server))
2516            finally:
2517                f.close()
2518            self.assertEqual(d1, d2)
2519
2520        def test_asyncore_server(self):
2521            """Check the example asyncore integration."""
2522            if support.verbose:
2523                sys.stdout.write("\n")
2524
2525            indata = b"FOO\n"
2526            server = AsyncoreEchoServer(CERTFILE)
2527            with server:
2528                s = ssl.wrap_socket(socket.socket())
2529                s.connect(('127.0.0.1', server.port))
2530                if support.verbose:
2531                    sys.stdout.write(
2532                        " client:  sending %r...\n" % indata)
2533                s.write(indata)
2534                outdata = s.read()
2535                if support.verbose:
2536                    sys.stdout.write(" client:  read %r\n" % outdata)
2537                if outdata != indata.lower():
2538                    self.fail(
2539                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2540                        % (outdata[:20], len(outdata),
2541                           indata[:20].lower(), len(indata)))
2542                s.write(b"over\n")
2543                if support.verbose:
2544                    sys.stdout.write(" client:  closing connection.\n")
2545                s.close()
2546                if support.verbose:
2547                    sys.stdout.write(" client:  connection closed.\n")
2548
2549        def test_recv_send(self):
2550            """Test recv(), send() and friends."""
2551            if support.verbose:
2552                sys.stdout.write("\n")
2553
2554            server = ThreadedEchoServer(CERTFILE,
2555                                        certreqs=ssl.CERT_NONE,
2556                                        ssl_version=ssl.PROTOCOL_TLSv1,
2557                                        cacerts=CERTFILE,
2558                                        chatty=True,
2559                                        connectionchatty=False)
2560            with server:
2561                s = ssl.wrap_socket(socket.socket(),
2562                                    server_side=False,
2563                                    certfile=CERTFILE,
2564                                    ca_certs=CERTFILE,
2565                                    cert_reqs=ssl.CERT_NONE,
2566                                    ssl_version=ssl.PROTOCOL_TLSv1)
2567                s.connect((HOST, server.port))
2568                # helper methods for standardising recv* method signatures
2569                def _recv_into():
2570                    b = bytearray(b"\0"*100)
2571                    count = s.recv_into(b)
2572                    return b[:count]
2573
2574                def _recvfrom_into():
2575                    b = bytearray(b"\0"*100)
2576                    count, addr = s.recvfrom_into(b)
2577                    return b[:count]
2578
2579                # (name, method, whether to expect success, *args)
2580                send_methods = [
2581                    ('send', s.send, True, []),
2582                    ('sendto', s.sendto, False, ["some.address"]),
2583                    ('sendall', s.sendall, True, []),
2584                ]
2585                recv_methods = [
2586                    ('recv', s.recv, True, []),
2587                    ('recvfrom', s.recvfrom, False, ["some.address"]),
2588                    ('recv_into', _recv_into, True, []),
2589                    ('recvfrom_into', _recvfrom_into, False, []),
2590                ]
2591                data_prefix = u"PREFIX_"
2592
2593                for meth_name, send_meth, expect_success, args in send_methods:
2594                    indata = (data_prefix + meth_name).encode('ascii')
2595                    try:
2596                        send_meth(indata, *args)
2597                        outdata = s.read()
2598                        if outdata != indata.lower():
2599                            self.fail(
2600                                "While sending with <<{name:s}>> bad data "
2601                                "<<{outdata:r}>> ({nout:d}) received; "
2602                                "expected <<{indata:r}>> ({nin:d})\n".format(
2603                                    name=meth_name, outdata=outdata[:20],
2604                                    nout=len(outdata),
2605                                    indata=indata[:20], nin=len(indata)
2606                                )
2607                            )
2608                    except ValueError as e:
2609                        if expect_success:
2610                            self.fail(
2611                                "Failed to send with method <<{name:s}>>; "
2612                                "expected to succeed.\n".format(name=meth_name)
2613                            )
2614                        if not str(e).startswith(meth_name):
2615                            self.fail(
2616                                "Method <<{name:s}>> failed with unexpected "
2617                                "exception message: {exp:s}\n".format(
2618                                    name=meth_name, exp=e
2619                                )
2620                            )
2621
2622                for meth_name, recv_meth, expect_success, args in recv_methods:
2623                    indata = (data_prefix + meth_name).encode('ascii')
2624                    try:
2625                        s.send(indata)
2626                        outdata = recv_meth(*args)
2627                        if outdata != indata.lower():
2628                            self.fail(
2629                                "While receiving with <<{name:s}>> bad data "
2630                                "<<{outdata:r}>> ({nout:d}) received; "
2631                                "expected <<{indata:r}>> ({nin:d})\n".format(
2632                                    name=meth_name, outdata=outdata[:20],
2633                                    nout=len(outdata),
2634                                    indata=indata[:20], nin=len(indata)
2635                                )
2636                            )
2637                    except ValueError as e:
2638                        if expect_success:
2639                            self.fail(
2640                                "Failed to receive with method <<{name:s}>>; "
2641                                "expected to succeed.\n".format(name=meth_name)
2642                            )
2643                        if not str(e).startswith(meth_name):
2644                            self.fail(
2645                                "Method <<{name:s}>> failed with unexpected "
2646                                "exception message: {exp:s}\n".format(
2647                                    name=meth_name, exp=e
2648                                )
2649                            )
2650                        # consume data
2651                        s.read()
2652
2653                # read(-1, buffer) is supported, even though read(-1) is not
2654                data = b"data"
2655                s.send(data)
2656                buffer = bytearray(len(data))
2657                self.assertEqual(s.read(-1, buffer), len(data))
2658                self.assertEqual(buffer, data)
2659
2660                s.write(b"over\n")
2661
2662                self.assertRaises(ValueError, s.recv, -1)
2663                self.assertRaises(ValueError, s.read, -1)
2664
2665                s.close()
2666
2667        def test_recv_zero(self):
2668            server = ThreadedEchoServer(CERTFILE)
2669            server.__enter__()
2670            self.addCleanup(server.__exit__, None, None)
2671            s = socket.create_connection((HOST, server.port))
2672            self.addCleanup(s.close)
2673            s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
2674            self.addCleanup(s.close)
2675
2676            # recv/read(0) should return no data
2677            s.send(b"data")
2678            self.assertEqual(s.recv(0), b"")
2679            self.assertEqual(s.read(0), b"")
2680            self.assertEqual(s.read(), b"data")
2681
2682            # Should not block if the other end sends no data
2683            s.setblocking(False)
2684            self.assertEqual(s.recv(0), b"")
2685            self.assertEqual(s.recv_into(bytearray()), 0)
2686
2687        def test_handshake_timeout(self):
2688            # Issue #5103: SSL handshake must respect the socket timeout
2689            server = socket.socket(socket.AF_INET)
2690            host = "127.0.0.1"
2691            port = support.bind_port(server)
2692            started = threading.Event()
2693            finish = False
2694
2695            def serve():
2696                server.listen(5)
2697                started.set()
2698                conns = []
2699                while not finish:
2700                    r, w, e = select.select([server], [], [], 0.1)
2701                    if server in r:
2702                        # Let the socket hang around rather than having
2703                        # it closed by garbage collection.
2704                        conns.append(server.accept()[0])
2705                for sock in conns:
2706                    sock.close()
2707
2708            t = threading.Thread(target=serve)
2709            t.start()
2710            started.wait()
2711
2712            try:
2713                try:
2714                    c = socket.socket(socket.AF_INET)
2715                    c.settimeout(0.2)
2716                    c.connect((host, port))
2717                    # Will attempt handshake and time out
2718                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2719                                            ssl.wrap_socket, c)
2720                finally:
2721                    c.close()
2722                try:
2723                    c = socket.socket(socket.AF_INET)
2724                    c = ssl.wrap_socket(c)
2725                    c.settimeout(0.2)
2726                    # Will attempt handshake and time out
2727                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2728                                            c.connect, (host, port))
2729                finally:
2730                    c.close()
2731            finally:
2732                finish = True
2733                t.join()
2734                server.close()
2735
2736        def test_server_accept(self):
2737            # Issue #16357: accept() on a SSLSocket created through
2738            # SSLContext.wrap_socket().
2739            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2740            context.verify_mode = ssl.CERT_REQUIRED
2741            context.load_verify_locations(CERTFILE)
2742            context.load_cert_chain(CERTFILE)
2743            server = socket.socket(socket.AF_INET)
2744            host = "127.0.0.1"
2745            port = support.bind_port(server)
2746            server = context.wrap_socket(server, server_side=True)
2747
2748            evt = threading.Event()
2749            remote = [None]
2750            peer = [None]
2751            def serve():
2752                server.listen(5)
2753                # Block on the accept and wait on the connection to close.
2754                evt.set()
2755                remote[0], peer[0] = server.accept()
2756                remote[0].recv(1)
2757
2758            t = threading.Thread(target=serve)
2759            t.start()
2760            # Client wait until server setup and perform a connect.
2761            evt.wait()
2762            client = context.wrap_socket(socket.socket())
2763            client.connect((host, port))
2764            client_addr = client.getsockname()
2765            client.close()
2766            t.join()
2767            remote[0].close()
2768            server.close()
2769            # Sanity checks.
2770            self.assertIsInstance(remote[0], ssl.SSLSocket)
2771            self.assertEqual(peer[0], client_addr)
2772
2773        def test_getpeercert_enotconn(self):
2774            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2775            with closing(context.wrap_socket(socket.socket())) as sock:
2776                with self.assertRaises(socket.error) as cm:
2777                    sock.getpeercert()
2778                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2779
2780        def test_do_handshake_enotconn(self):
2781            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2782            with closing(context.wrap_socket(socket.socket())) as sock:
2783                with self.assertRaises(socket.error) as cm:
2784                    sock.do_handshake()
2785                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2786
2787        def test_default_ciphers(self):
2788            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2789            try:
2790                # Force a set of weak ciphers on our client context
2791                context.set_ciphers("DES")
2792            except ssl.SSLError:
2793                self.skipTest("no DES cipher available")
2794            with ThreadedEchoServer(CERTFILE,
2795                                    ssl_version=ssl.PROTOCOL_SSLv23,
2796                                    chatty=False) as server:
2797                with closing(context.wrap_socket(socket.socket())) as s:
2798                    with self.assertRaises(ssl.SSLError):
2799                        s.connect((HOST, server.port))
2800            self.assertIn("no shared cipher", str(server.conn_errors[0]))
2801
2802        def test_version_basic(self):
2803            """
2804            Basic tests for SSLSocket.version().
2805            More tests are done in the test_protocol_*() methods.
2806            """
2807            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2808            with ThreadedEchoServer(CERTFILE,
2809                                    ssl_version=ssl.PROTOCOL_TLSv1,
2810                                    chatty=False) as server:
2811                with closing(context.wrap_socket(socket.socket())) as s:
2812                    self.assertIs(s.version(), None)
2813                    s.connect((HOST, server.port))
2814                    self.assertEqual(s.version(), 'TLSv1')
2815                self.assertIs(s.version(), None)
2816
2817        @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2818        def test_default_ecdh_curve(self):
2819            # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2820            # should be enabled by default on SSL contexts.
2821            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2822            context.load_cert_chain(CERTFILE)
2823            # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2824            # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
2825            # our default cipher list should prefer ECDH-based ciphers
2826            # automatically.
2827            if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2828                context.set_ciphers("ECCdraft:ECDH")
2829            with ThreadedEchoServer(context=context) as server:
2830                with closing(context.wrap_socket(socket.socket())) as s:
2831                    s.connect((HOST, server.port))
2832                    self.assertIn("ECDH", s.cipher()[0])
2833
2834        @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2835                             "'tls-unique' channel binding not available")
2836        def test_tls_unique_channel_binding(self):
2837            """Test tls-unique channel binding."""
2838            if support.verbose:
2839                sys.stdout.write("\n")
2840
2841            server = ThreadedEchoServer(CERTFILE,
2842                                        certreqs=ssl.CERT_NONE,
2843                                        ssl_version=ssl.PROTOCOL_TLSv1,
2844                                        cacerts=CERTFILE,
2845                                        chatty=True,
2846                                        connectionchatty=False)
2847            with server:
2848                s = ssl.wrap_socket(socket.socket(),
2849                                    server_side=False,
2850                                    certfile=CERTFILE,
2851                                    ca_certs=CERTFILE,
2852                                    cert_reqs=ssl.CERT_NONE,
2853                                    ssl_version=ssl.PROTOCOL_TLSv1)
2854                s.connect((HOST, server.port))
2855                # get the data
2856                cb_data = s.get_channel_binding("tls-unique")
2857                if support.verbose:
2858                    sys.stdout.write(" got channel binding data: {0!r}\n"
2859                                     .format(cb_data))
2860
2861                # check if it is sane
2862                self.assertIsNotNone(cb_data)
2863                self.assertEqual(len(cb_data), 12) # True for TLSv1
2864
2865                # and compare with the peers version
2866                s.write(b"CB tls-unique\n")
2867                peer_data_repr = s.read().strip()
2868                self.assertEqual(peer_data_repr,
2869                                 repr(cb_data).encode("us-ascii"))
2870                s.close()
2871
2872                # now, again
2873                s = ssl.wrap_socket(socket.socket(),
2874                                    server_side=False,
2875                                    certfile=CERTFILE,
2876                                    ca_certs=CERTFILE,
2877                                    cert_reqs=ssl.CERT_NONE,
2878                                    ssl_version=ssl.PROTOCOL_TLSv1)
2879                s.connect((HOST, server.port))
2880                new_cb_data = s.get_channel_binding("tls-unique")
2881                if support.verbose:
2882                    sys.stdout.write(" got another channel binding data: {0!r}\n"
2883                                     .format(new_cb_data))
2884                # is it really unique
2885                self.assertNotEqual(cb_data, new_cb_data)
2886                self.assertIsNotNone(cb_data)
2887                self.assertEqual(len(cb_data), 12) # True for TLSv1
2888                s.write(b"CB tls-unique\n")
2889                peer_data_repr = s.read().strip()
2890                self.assertEqual(peer_data_repr,
2891                                 repr(new_cb_data).encode("us-ascii"))
2892                s.close()
2893
2894        def test_compression(self):
2895            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2896            context.load_cert_chain(CERTFILE)
2897            stats = server_params_test(context, context,
2898                                       chatty=True, connectionchatty=True)
2899            if support.verbose:
2900                sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2901            self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2902
2903        @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2904                             "ssl.OP_NO_COMPRESSION needed for this test")
2905        def test_compression_disabled(self):
2906            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2907            context.load_cert_chain(CERTFILE)
2908            context.options |= ssl.OP_NO_COMPRESSION
2909            stats = server_params_test(context, context,
2910                                       chatty=True, connectionchatty=True)
2911            self.assertIs(stats['compression'], None)
2912
2913        def test_dh_params(self):
2914            # Check we can get a connection with ephemeral Diffie-Hellman
2915            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2916            context.load_cert_chain(CERTFILE)
2917            context.load_dh_params(DHFILE)
2918            context.set_ciphers("kEDH")
2919            stats = server_params_test(context, context,
2920                                       chatty=True, connectionchatty=True)
2921            cipher = stats["cipher"][0]
2922            parts = cipher.split("-")
2923            if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2924                self.fail("Non-DH cipher: " + cipher[0])
2925
2926        def test_selected_alpn_protocol(self):
2927            # selected_alpn_protocol() is None unless ALPN is used.
2928            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2929            context.load_cert_chain(CERTFILE)
2930            stats = server_params_test(context, context,
2931                                       chatty=True, connectionchatty=True)
2932            self.assertIs(stats['client_alpn_protocol'], None)
2933
2934        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
2935        def test_selected_alpn_protocol_if_server_uses_alpn(self):
2936            # selected_alpn_protocol() is None unless ALPN is used by the client.
2937            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2938            client_context.load_verify_locations(CERTFILE)
2939            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2940            server_context.load_cert_chain(CERTFILE)
2941            server_context.set_alpn_protocols(['foo', 'bar'])
2942            stats = server_params_test(client_context, server_context,
2943                                       chatty=True, connectionchatty=True)
2944            self.assertIs(stats['client_alpn_protocol'], None)
2945
2946        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
2947        def test_alpn_protocols(self):
2948            server_protocols = ['foo', 'bar', 'milkshake']
2949            protocol_tests = [
2950                (['foo', 'bar'], 'foo'),
2951                (['bar', 'foo'], 'foo'),
2952                (['milkshake'], 'milkshake'),
2953                (['http/3.0', 'http/4.0'], None)
2954            ]
2955            for client_protocols, expected in protocol_tests:
2956                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2957                server_context.load_cert_chain(CERTFILE)
2958                server_context.set_alpn_protocols(server_protocols)
2959                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2960                client_context.load_cert_chain(CERTFILE)
2961                client_context.set_alpn_protocols(client_protocols)
2962
2963                try:
2964                    stats = server_params_test(client_context,
2965                                               server_context,
2966                                               chatty=True,
2967                                               connectionchatty=True)
2968                except ssl.SSLError as e:
2969                    stats = e
2970
2971                if expected is None and IS_OPENSSL_1_1:
2972                    # OpenSSL 1.1.0 raises handshake error
2973                    self.assertIsInstance(stats, ssl.SSLError)
2974                else:
2975                    msg = "failed trying %s (s) and %s (c).\n" \
2976                        "was expecting %s, but got %%s from the %%s" \
2977                            % (str(server_protocols), str(client_protocols),
2978                                str(expected))
2979                    client_result = stats['client_alpn_protocol']
2980                    self.assertEqual(client_result, expected,
2981                                     msg % (client_result, "client"))
2982                    server_result = stats['server_alpn_protocols'][-1] \
2983                        if len(stats['server_alpn_protocols']) else 'nothing'
2984                    self.assertEqual(server_result, expected,
2985                                     msg % (server_result, "server"))
2986
2987        def test_selected_npn_protocol(self):
2988            # selected_npn_protocol() is None unless NPN is used
2989            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2990            context.load_cert_chain(CERTFILE)
2991            stats = server_params_test(context, context,
2992                                       chatty=True, connectionchatty=True)
2993            self.assertIs(stats['client_npn_protocol'], None)
2994
2995        @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2996        def test_npn_protocols(self):
2997            server_protocols = ['http/1.1', 'spdy/2']
2998            protocol_tests = [
2999                (['http/1.1', 'spdy/2'], 'http/1.1'),
3000                (['spdy/2', 'http/1.1'], 'http/1.1'),
3001                (['spdy/2', 'test'], 'spdy/2'),
3002                (['abc', 'def'], 'abc')
3003            ]
3004            for client_protocols, expected in protocol_tests:
3005                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3006                server_context.load_cert_chain(CERTFILE)
3007                server_context.set_npn_protocols(server_protocols)
3008                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3009                client_context.load_cert_chain(CERTFILE)
3010                client_context.set_npn_protocols(client_protocols)
3011                stats = server_params_test(client_context, server_context,
3012                                           chatty=True, connectionchatty=True)
3013
3014                msg = "failed trying %s (s) and %s (c).\n" \
3015                      "was expecting %s, but got %%s from the %%s" \
3016                          % (str(server_protocols), str(client_protocols),
3017                             str(expected))
3018                client_result = stats['client_npn_protocol']
3019                self.assertEqual(client_result, expected, msg % (client_result, "client"))
3020                server_result = stats['server_npn_protocols'][-1] \
3021                    if len(stats['server_npn_protocols']) else 'nothing'
3022                self.assertEqual(server_result, expected, msg % (server_result, "server"))
3023
3024        def sni_contexts(self):
3025            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3026            server_context.load_cert_chain(SIGNED_CERTFILE)
3027            other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3028            other_context.load_cert_chain(SIGNED_CERTFILE2)
3029            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3030            client_context.verify_mode = ssl.CERT_REQUIRED
3031            client_context.load_verify_locations(SIGNING_CA)
3032            return server_context, other_context, client_context
3033
3034        def check_common_name(self, stats, name):
3035            cert = stats['peercert']
3036            self.assertIn((('commonName', name),), cert['subject'])
3037
3038        @needs_sni
3039        def test_sni_callback(self):
3040            calls = []
3041            server_context, other_context, client_context = self.sni_contexts()
3042
3043            def servername_cb(ssl_sock, server_name, initial_context):
3044                calls.append((server_name, initial_context))
3045                if server_name is not None:
3046                    ssl_sock.context = other_context
3047            server_context.set_servername_callback(servername_cb)
3048
3049            stats = server_params_test(client_context, server_context,
3050                                       chatty=True,
3051                                       sni_name='supermessage')
3052            # The hostname was fetched properly, and the certificate was
3053            # changed for the connection.
3054            self.assertEqual(calls, [("supermessage", server_context)])
3055            # CERTFILE4 was selected
3056            self.check_common_name(stats, 'fakehostname')
3057
3058            calls = []
3059            # The callback is called with server_name=None
3060            stats = server_params_test(client_context, server_context,
3061                                       chatty=True,
3062                                       sni_name=None)
3063            self.assertEqual(calls, [(None, server_context)])
3064            self.check_common_name(stats, 'localhost')
3065
3066            # Check disabling the callback
3067            calls = []
3068            server_context.set_servername_callback(None)
3069
3070            stats = server_params_test(client_context, server_context,
3071                                       chatty=True,
3072                                       sni_name='notfunny')
3073            # Certificate didn't change
3074            self.check_common_name(stats, 'localhost')
3075            self.assertEqual(calls, [])
3076
3077        @needs_sni
3078        def test_sni_callback_alert(self):
3079            # Returning a TLS alert is reflected to the connecting client
3080            server_context, other_context, client_context = self.sni_contexts()
3081
3082            def cb_returning_alert(ssl_sock, server_name, initial_context):
3083                return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3084            server_context.set_servername_callback(cb_returning_alert)
3085
3086            with self.assertRaises(ssl.SSLError) as cm:
3087                stats = server_params_test(client_context, server_context,
3088                                           chatty=False,
3089                                           sni_name='supermessage')
3090            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3091
3092        @needs_sni
3093        def test_sni_callback_raising(self):
3094            # Raising fails the connection with a TLS handshake failure alert.
3095            server_context, other_context, client_context = self.sni_contexts()
3096
3097            def cb_raising(ssl_sock, server_name, initial_context):
3098                1.0/0.0
3099            server_context.set_servername_callback(cb_raising)
3100
3101            with self.assertRaises(ssl.SSLError) as cm, \
3102                 support.captured_stderr() as stderr:
3103                stats = server_params_test(client_context, server_context,
3104                                           chatty=False,
3105                                           sni_name='supermessage')
3106            self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3107            self.assertIn("ZeroDivisionError", stderr.getvalue())
3108
3109        @needs_sni
3110        def test_sni_callback_wrong_return_type(self):
3111            # Returning the wrong return type terminates the TLS connection
3112            # with an internal error alert.
3113            server_context, other_context, client_context = self.sni_contexts()
3114
3115            def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3116                return "foo"
3117            server_context.set_servername_callback(cb_wrong_return_type)
3118
3119            with self.assertRaises(ssl.SSLError) as cm, \
3120                 support.captured_stderr() as stderr:
3121                stats = server_params_test(client_context, server_context,
3122                                           chatty=False,
3123                                           sni_name='supermessage')
3124            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3125            self.assertIn("TypeError", stderr.getvalue())
3126
3127        def test_read_write_after_close_raises_valuerror(self):
3128            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3129            context.verify_mode = ssl.CERT_REQUIRED
3130            context.load_verify_locations(CERTFILE)
3131            context.load_cert_chain(CERTFILE)
3132            server = ThreadedEchoServer(context=context, chatty=False)
3133
3134            with server:
3135                s = context.wrap_socket(socket.socket())
3136                s.connect((HOST, server.port))
3137                s.close()
3138
3139                self.assertRaises(ValueError, s.read, 1024)
3140                self.assertRaises(ValueError, s.write, b'hello')
3141
3142
3143def test_main(verbose=False):
3144    if support.verbose:
3145        plats = {
3146            'Linux': platform.linux_distribution,
3147            'Mac': platform.mac_ver,
3148            'Windows': platform.win32_ver,
3149        }
3150        for name, func in plats.items():
3151            plat = func()
3152            if plat and plat[0]:
3153                plat = '%s %r' % (name, plat)
3154                break
3155        else:
3156            plat = repr(platform.platform())
3157        print("test_ssl: testing with %r %r" %
3158            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3159        print("          under %s" % plat)
3160        print("          HAS_SNI = %r" % ssl.HAS_SNI)
3161        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
3162        try:
3163            print("          OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3164        except AttributeError:
3165            pass
3166
3167    for filename in [
3168        CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
3169        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
3170        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
3171        BADCERT, BADKEY, EMPTYCERT]:
3172        if not os.path.exists(filename):
3173            raise support.TestFailed("Can't read certificate file %r" % filename)
3174
3175    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
3176
3177    if support.is_resource_enabled('network'):
3178        tests.append(NetworkedTests)
3179
3180    if _have_threads:
3181        thread_info = support.threading_setup()
3182        if thread_info:
3183            tests.append(ThreadedTests)
3184
3185    try:
3186        support.run_unittest(*tests)
3187    finally:
3188        if _have_threads:
3189            support.threading_cleanup(*thread_info)
3190
3191if __name__ == "__main__":
3192    test_main()
3193