1# -*- coding: utf-8 -*- 2# Test the support for SSL and sockets 3 4import sys 5import unittest 6from test import test_support as support 7from test.script_helper import assert_python_ok 8import asyncore 9import socket 10import select 11import time 12import datetime 13import gc 14import os 15import errno 16import pprint 17import tempfile 18import urllib2 19import traceback 20import weakref 21import platform 22import functools 23from contextlib import closing 24 25ssl = support.import_module("ssl") 26 27PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) 28HOST = support.HOST 29IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') 30IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) 31 32 33def data_file(*name): 34 return os.path.join(os.path.dirname(__file__), *name) 35 36# The custom key and certificate files used in test_ssl are generated 37# using Lib/test/make_ssl_certs.py. 38# Other certificates are simply fetched from the Internet servers they 39# are meant to authenticate. 40 41CERTFILE = data_file("keycert.pem") 42BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding()) 43ONLYCERT = data_file("ssl_cert.pem") 44ONLYKEY = data_file("ssl_key.pem") 45BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding()) 46BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding()) 47CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 48ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 49KEY_PASSWORD = "somepass" 50CAPATH = data_file("capath") 51BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding()) 52CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") 53CAFILE_CACERT = data_file("capath", "5ed36f99.0") 54 55 56# empty CRL 57CRLFILE = data_file("revocation.crl") 58 59# Two keys and certs signed by the same CA (for SNI tests) 60SIGNED_CERTFILE = data_file("keycert3.pem") 61SIGNED_CERTFILE2 = data_file("keycert4.pem") 62SIGNING_CA = data_file("pycacert.pem") 63# cert with all kinds of subject alt names 64ALLSANFILE = data_file("allsans.pem") 65 66REMOTE_HOST = "self-signed.pythontest.net" 67REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem") 68 69EMPTYCERT = data_file("nullcert.pem") 70BADCERT = data_file("badcert.pem") 71NONEXISTINGCERT = data_file("XXXnonexisting.pem") 72BADKEY = data_file("badkey.pem") 73NOKIACERT = data_file("nokia.pem") 74NULLBYTECERT = data_file("nullbytecert.pem") 75 76DHFILE = data_file("dh1024.pem") 77BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding()) 78 79 80def handle_error(prefix): 81 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 82 if support.verbose: 83 sys.stdout.write(prefix + exc_format) 84 85 86class BasicTests(unittest.TestCase): 87 88 def test_sslwrap_simple(self): 89 # A crude test for the legacy API 90 try: 91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)) 92 except IOError, e: 93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 94 pass 95 else: 96 raise 97 try: 98 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) 99 except IOError, e: 100 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 101 pass 102 else: 103 raise 104 105 106def can_clear_options(): 107 # 0.9.8m or higher 108 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 109 110def no_sslv2_implies_sslv3_hello(): 111 # 0.9.7h or higher 112 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 113 114def have_verify_flags(): 115 # 0.9.8 or higher 116 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15) 117 118def utc_offset(): #NOTE: ignore issues like #1647654 119 # local time = utc time + utc offset 120 if time.daylight and time.localtime().tm_isdst > 0: 121 return -time.altzone # seconds 122 return -time.timezone 123 124def asn1time(cert_time): 125 # Some versions of OpenSSL ignore seconds, see #18207 126 # 0.9.8.i 127 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15): 128 fmt = "%b %d %H:%M:%S %Y GMT" 129 dt = datetime.datetime.strptime(cert_time, fmt) 130 dt = dt.replace(second=0) 131 cert_time = dt.strftime(fmt) 132 # %d adds leading zero but ASN1_TIME_print() uses leading space 133 if cert_time[4] == "0": 134 cert_time = cert_time[:4] + " " + cert_time[5:] 135 136 return cert_time 137 138# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 139def skip_if_broken_ubuntu_ssl(func): 140 if hasattr(ssl, 'PROTOCOL_SSLv2'): 141 @functools.wraps(func) 142 def f(*args, **kwargs): 143 try: 144 ssl.SSLContext(ssl.PROTOCOL_SSLv2) 145 except ssl.SSLError: 146 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 147 platform.linux_distribution() == ('debian', 'squeeze/sid', '')): 148 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 149 return func(*args, **kwargs) 150 return f 151 else: 152 return func 153 154needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") 155 156 157class BasicSocketTests(unittest.TestCase): 158 159 def test_constants(self): 160 ssl.CERT_NONE 161 ssl.CERT_OPTIONAL 162 ssl.CERT_REQUIRED 163 ssl.OP_CIPHER_SERVER_PREFERENCE 164 ssl.OP_SINGLE_DH_USE 165 if ssl.HAS_ECDH: 166 ssl.OP_SINGLE_ECDH_USE 167 if ssl.OPENSSL_VERSION_INFO >= (1, 0): 168 ssl.OP_NO_COMPRESSION 169 self.assertIn(ssl.HAS_SNI, {True, False}) 170 self.assertIn(ssl.HAS_ECDH, {True, False}) 171 172 def test_random(self): 173 v = ssl.RAND_status() 174 if support.verbose: 175 sys.stdout.write("\n RAND_status is %d (%s)\n" 176 % (v, (v and "sufficient randomness") or 177 "insufficient randomness")) 178 if hasattr(ssl, 'RAND_egd'): 179 self.assertRaises(TypeError, ssl.RAND_egd, 1) 180 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 181 ssl.RAND_add("this is a random string", 75.0) 182 183 def test_parse_cert(self): 184 # note that this uses an 'unofficial' function in _ssl.c, 185 # provided solely for this test, to exercise the certificate 186 # parsing code 187 p = ssl._ssl._test_decode_cert(CERTFILE) 188 if support.verbose: 189 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 190 self.assertEqual(p['issuer'], 191 ((('countryName', 'XY'),), 192 (('localityName', 'Castle Anthrax'),), 193 (('organizationName', 'Python Software Foundation'),), 194 (('commonName', 'localhost'),)) 195 ) 196 # Note the next three asserts will fail if the keys are regenerated 197 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT')) 198 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT')) 199 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') 200 self.assertEqual(p['subject'], 201 ((('countryName', 'XY'),), 202 (('localityName', 'Castle Anthrax'),), 203 (('organizationName', 'Python Software Foundation'),), 204 (('commonName', 'localhost'),)) 205 ) 206 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 207 # Issue #13034: the subjectAltName in some certificates 208 # (notably projects.developer.nokia.com:443) wasn't parsed 209 p = ssl._ssl._test_decode_cert(NOKIACERT) 210 if support.verbose: 211 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 212 self.assertEqual(p['subjectAltName'], 213 (('DNS', 'projects.developer.nokia.com'), 214 ('DNS', 'projects.forum.nokia.com')) 215 ) 216 # extra OCSP and AIA fields 217 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',)) 218 self.assertEqual(p['caIssuers'], 219 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)) 220 self.assertEqual(p['crlDistributionPoints'], 221 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)) 222 223 def test_parse_cert_CVE_2013_4238(self): 224 p = ssl._ssl._test_decode_cert(NULLBYTECERT) 225 if support.verbose: 226 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 227 subject = ((('countryName', 'US'),), 228 (('stateOrProvinceName', 'Oregon'),), 229 (('localityName', 'Beaverton'),), 230 (('organizationName', 'Python Software Foundation'),), 231 (('organizationalUnitName', 'Python Core Development'),), 232 (('commonName', 'null.python.org\x00example.org'),), 233 (('emailAddress', 'python-dev@python.org'),)) 234 self.assertEqual(p['subject'], subject) 235 self.assertEqual(p['issuer'], subject) 236 if ssl._OPENSSL_API_VERSION >= (0, 9, 8): 237 san = (('DNS', 'altnull.python.org\x00example.com'), 238 ('email', 'null@python.org\x00user@example.org'), 239 ('URI', 'http://null.python.org\x00http://example.org'), 240 ('IP Address', '192.0.2.1'), 241 ('IP Address', '2001:DB8:0:0:0:0:0:1\n')) 242 else: 243 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName 244 san = (('DNS', 'altnull.python.org\x00example.com'), 245 ('email', 'null@python.org\x00user@example.org'), 246 ('URI', 'http://null.python.org\x00http://example.org'), 247 ('IP Address', '192.0.2.1'), 248 ('IP Address', '<invalid>')) 249 250 self.assertEqual(p['subjectAltName'], san) 251 252 def test_parse_all_sans(self): 253 p = ssl._ssl._test_decode_cert(ALLSANFILE) 254 self.assertEqual(p['subjectAltName'], 255 ( 256 ('DNS', 'allsans'), 257 ('othername', '<unsupported>'), 258 ('othername', '<unsupported>'), 259 ('email', 'user@example.org'), 260 ('DNS', 'www.example.org'), 261 ('DirName', 262 ((('countryName', 'XY'),), 263 (('localityName', 'Castle Anthrax'),), 264 (('organizationName', 'Python Software Foundation'),), 265 (('commonName', 'dirname example'),))), 266 ('URI', 'https://www.python.org/'), 267 ('IP Address', '127.0.0.1'), 268 ('IP Address', '0:0:0:0:0:0:0:1\n'), 269 ('Registered ID', '1.2.3.4.5') 270 ) 271 ) 272 273 def test_DER_to_PEM(self): 274 with open(CAFILE_CACERT, 'r') as f: 275 pem = f.read() 276 d1 = ssl.PEM_cert_to_DER_cert(pem) 277 p2 = ssl.DER_cert_to_PEM_cert(d1) 278 d2 = ssl.PEM_cert_to_DER_cert(p2) 279 self.assertEqual(d1, d2) 280 if not p2.startswith(ssl.PEM_HEADER + '\n'): 281 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 282 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): 283 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) 284 285 def test_openssl_version(self): 286 n = ssl.OPENSSL_VERSION_NUMBER 287 t = ssl.OPENSSL_VERSION_INFO 288 s = ssl.OPENSSL_VERSION 289 self.assertIsInstance(n, (int, long)) 290 self.assertIsInstance(t, tuple) 291 self.assertIsInstance(s, str) 292 # Some sanity checks follow 293 # >= 0.9 294 self.assertGreaterEqual(n, 0x900000) 295 # < 3.0 296 self.assertLess(n, 0x30000000) 297 major, minor, fix, patch, status = t 298 self.assertGreaterEqual(major, 0) 299 self.assertLess(major, 3) 300 self.assertGreaterEqual(minor, 0) 301 self.assertLess(minor, 256) 302 self.assertGreaterEqual(fix, 0) 303 self.assertLess(fix, 256) 304 self.assertGreaterEqual(patch, 0) 305 self.assertLessEqual(patch, 63) 306 self.assertGreaterEqual(status, 0) 307 self.assertLessEqual(status, 15) 308 # Version string as returned by {Open,Libre}SSL, the format might change 309 if IS_LIBRESSL: 310 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)), 311 (s, t, hex(n))) 312 else: 313 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), 314 (s, t)) 315 316 @support.cpython_only 317 def test_refcycle(self): 318 # Issue #7943: an SSL object doesn't create reference cycles with 319 # itself. 320 s = socket.socket(socket.AF_INET) 321 ss = ssl.wrap_socket(s) 322 wr = weakref.ref(ss) 323 del ss 324 self.assertEqual(wr(), None) 325 326 def test_wrapped_unconnected(self): 327 # Methods on an unconnected SSLSocket propagate the original 328 # socket.error raise by the underlying socket object. 329 s = socket.socket(socket.AF_INET) 330 with closing(ssl.wrap_socket(s)) as ss: 331 self.assertRaises(socket.error, ss.recv, 1) 332 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 333 self.assertRaises(socket.error, ss.recvfrom, 1) 334 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 335 self.assertRaises(socket.error, ss.send, b'x') 336 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 337 338 def test_timeout(self): 339 # Issue #8524: when creating an SSL socket, the timeout of the 340 # original socket should be retained. 341 for timeout in (None, 0.0, 5.0): 342 s = socket.socket(socket.AF_INET) 343 s.settimeout(timeout) 344 with closing(ssl.wrap_socket(s)) as ss: 345 self.assertEqual(timeout, ss.gettimeout()) 346 347 def test_errors(self): 348 sock = socket.socket() 349 self.assertRaisesRegexp(ValueError, 350 "certfile must be specified", 351 ssl.wrap_socket, sock, keyfile=CERTFILE) 352 self.assertRaisesRegexp(ValueError, 353 "certfile must be specified for server-side operations", 354 ssl.wrap_socket, sock, server_side=True) 355 self.assertRaisesRegexp(ValueError, 356 "certfile must be specified for server-side operations", 357 ssl.wrap_socket, sock, server_side=True, certfile="") 358 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s: 359 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode", 360 s.connect, (HOST, 8080)) 361 with self.assertRaises(IOError) as cm: 362 with closing(socket.socket()) as sock: 363 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) 364 self.assertEqual(cm.exception.errno, errno.ENOENT) 365 with self.assertRaises(IOError) as cm: 366 with closing(socket.socket()) as sock: 367 ssl.wrap_socket(sock, 368 certfile=CERTFILE, keyfile=NONEXISTINGCERT) 369 self.assertEqual(cm.exception.errno, errno.ENOENT) 370 with self.assertRaises(IOError) as cm: 371 with closing(socket.socket()) as sock: 372 ssl.wrap_socket(sock, 373 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT) 374 self.assertEqual(cm.exception.errno, errno.ENOENT) 375 376 def bad_cert_test(self, certfile): 377 """Check that trying to use the given client certificate fails""" 378 certfile = os.path.join(os.path.dirname(__file__) or os.curdir, 379 certfile) 380 sock = socket.socket() 381 self.addCleanup(sock.close) 382 with self.assertRaises(ssl.SSLError): 383 ssl.wrap_socket(sock, 384 certfile=certfile, 385 ssl_version=ssl.PROTOCOL_TLSv1) 386 387 def test_empty_cert(self): 388 """Wrapping with an empty cert file""" 389 self.bad_cert_test("nullcert.pem") 390 391 def test_malformed_cert(self): 392 """Wrapping with a badly formatted certificate (syntax error)""" 393 self.bad_cert_test("badcert.pem") 394 395 def test_malformed_key(self): 396 """Wrapping with a badly formatted key (syntax error)""" 397 self.bad_cert_test("badkey.pem") 398 399 def test_match_hostname(self): 400 def ok(cert, hostname): 401 ssl.match_hostname(cert, hostname) 402 def fail(cert, hostname): 403 self.assertRaises(ssl.CertificateError, 404 ssl.match_hostname, cert, hostname) 405 406 cert = {'subject': ((('commonName', 'example.com'),),)} 407 ok(cert, 'example.com') 408 ok(cert, 'ExAmple.cOm') 409 fail(cert, 'www.example.com') 410 fail(cert, '.example.com') 411 fail(cert, 'example.org') 412 fail(cert, 'exampleXcom') 413 414 cert = {'subject': ((('commonName', '*.a.com'),),)} 415 ok(cert, 'foo.a.com') 416 fail(cert, 'bar.foo.a.com') 417 fail(cert, 'a.com') 418 fail(cert, 'Xa.com') 419 fail(cert, '.a.com') 420 421 # only match one left-most wildcard 422 cert = {'subject': ((('commonName', 'f*.com'),),)} 423 ok(cert, 'foo.com') 424 ok(cert, 'f.com') 425 fail(cert, 'bar.com') 426 fail(cert, 'foo.a.com') 427 fail(cert, 'bar.foo.com') 428 429 # NULL bytes are bad, CVE-2013-4073 430 cert = {'subject': ((('commonName', 431 'null.python.org\x00example.org'),),)} 432 ok(cert, 'null.python.org\x00example.org') # or raise an error? 433 fail(cert, 'example.org') 434 fail(cert, 'null.python.org') 435 436 # error cases with wildcards 437 cert = {'subject': ((('commonName', '*.*.a.com'),),)} 438 fail(cert, 'bar.foo.a.com') 439 fail(cert, 'a.com') 440 fail(cert, 'Xa.com') 441 fail(cert, '.a.com') 442 443 cert = {'subject': ((('commonName', 'a.*.com'),),)} 444 fail(cert, 'a.foo.com') 445 fail(cert, 'a..com') 446 fail(cert, 'a.com') 447 448 # wildcard doesn't match IDNA prefix 'xn--' 449 idna = u'püthon.python.org'.encode("idna").decode("ascii") 450 cert = {'subject': ((('commonName', idna),),)} 451 ok(cert, idna) 452 cert = {'subject': ((('commonName', 'x*.python.org'),),)} 453 fail(cert, idna) 454 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)} 455 fail(cert, idna) 456 457 # wildcard in first fragment and IDNA A-labels in sequent fragments 458 # are supported. 459 idna = u'www*.pythön.org'.encode("idna").decode("ascii") 460 cert = {'subject': ((('commonName', idna),),)} 461 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii")) 462 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii")) 463 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii")) 464 fail(cert, u'pythön.org'.encode("idna").decode("ascii")) 465 466 # Slightly fake real-world example 467 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', 468 'subject': ((('commonName', 'linuxfrz.org'),),), 469 'subjectAltName': (('DNS', 'linuxfr.org'), 470 ('DNS', 'linuxfr.com'), 471 ('othername', '<unsupported>'))} 472 ok(cert, 'linuxfr.org') 473 ok(cert, 'linuxfr.com') 474 # Not a "DNS" entry 475 fail(cert, '<unsupported>') 476 # When there is a subjectAltName, commonName isn't used 477 fail(cert, 'linuxfrz.org') 478 479 # A pristine real-world example 480 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 481 'subject': ((('countryName', 'US'),), 482 (('stateOrProvinceName', 'California'),), 483 (('localityName', 'Mountain View'),), 484 (('organizationName', 'Google Inc'),), 485 (('commonName', 'mail.google.com'),))} 486 ok(cert, 'mail.google.com') 487 fail(cert, 'gmail.com') 488 # Only commonName is considered 489 fail(cert, 'California') 490 491 # Neither commonName nor subjectAltName 492 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 493 'subject': ((('countryName', 'US'),), 494 (('stateOrProvinceName', 'California'),), 495 (('localityName', 'Mountain View'),), 496 (('organizationName', 'Google Inc'),))} 497 fail(cert, 'mail.google.com') 498 499 # No DNS entry in subjectAltName but a commonName 500 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 501 'subject': ((('countryName', 'US'),), 502 (('stateOrProvinceName', 'California'),), 503 (('localityName', 'Mountain View'),), 504 (('commonName', 'mail.google.com'),)), 505 'subjectAltName': (('othername', 'blabla'), )} 506 ok(cert, 'mail.google.com') 507 508 # No DNS entry subjectAltName and no commonName 509 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 510 'subject': ((('countryName', 'US'),), 511 (('stateOrProvinceName', 'California'),), 512 (('localityName', 'Mountain View'),), 513 (('organizationName', 'Google Inc'),)), 514 'subjectAltName': (('othername', 'blabla'),)} 515 fail(cert, 'google.com') 516 517 # Empty cert / no cert 518 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') 519 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') 520 521 # Issue #17980: avoid denials of service by refusing more than one 522 # wildcard per fragment. 523 cert = {'subject': ((('commonName', 'a*b.com'),),)} 524 ok(cert, 'axxb.com') 525 cert = {'subject': ((('commonName', 'a*b.co*'),),)} 526 fail(cert, 'axxb.com') 527 cert = {'subject': ((('commonName', 'a*b*.com'),),)} 528 with self.assertRaises(ssl.CertificateError) as cm: 529 ssl.match_hostname(cert, 'axxbxxc.com') 530 self.assertIn("too many wildcards", str(cm.exception)) 531 532 def test_server_side(self): 533 # server_hostname doesn't work for server sockets 534 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 535 with closing(socket.socket()) as sock: 536 self.assertRaises(ValueError, ctx.wrap_socket, sock, True, 537 server_hostname="some.hostname") 538 539 def test_unknown_channel_binding(self): 540 # should raise ValueError for unknown type 541 s = socket.socket(socket.AF_INET) 542 with closing(ssl.wrap_socket(s)) as ss: 543 with self.assertRaises(ValueError): 544 ss.get_channel_binding("unknown-type") 545 546 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 547 "'tls-unique' channel binding not available") 548 def test_tls_unique_channel_binding(self): 549 # unconnected should return None for known type 550 s = socket.socket(socket.AF_INET) 551 with closing(ssl.wrap_socket(s)) as ss: 552 self.assertIsNone(ss.get_channel_binding("tls-unique")) 553 # the same for server-side 554 s = socket.socket(socket.AF_INET) 555 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss: 556 self.assertIsNone(ss.get_channel_binding("tls-unique")) 557 558 def test_get_default_verify_paths(self): 559 paths = ssl.get_default_verify_paths() 560 self.assertEqual(len(paths), 6) 561 self.assertIsInstance(paths, ssl.DefaultVerifyPaths) 562 563 with support.EnvironmentVarGuard() as env: 564 env["SSL_CERT_DIR"] = CAPATH 565 env["SSL_CERT_FILE"] = CERTFILE 566 paths = ssl.get_default_verify_paths() 567 self.assertEqual(paths.cafile, CERTFILE) 568 self.assertEqual(paths.capath, CAPATH) 569 570 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 571 def test_enum_certificates(self): 572 self.assertTrue(ssl.enum_certificates("CA")) 573 self.assertTrue(ssl.enum_certificates("ROOT")) 574 575 self.assertRaises(TypeError, ssl.enum_certificates) 576 self.assertRaises(WindowsError, ssl.enum_certificates, "") 577 578 trust_oids = set() 579 for storename in ("CA", "ROOT"): 580 store = ssl.enum_certificates(storename) 581 self.assertIsInstance(store, list) 582 for element in store: 583 self.assertIsInstance(element, tuple) 584 self.assertEqual(len(element), 3) 585 cert, enc, trust = element 586 self.assertIsInstance(cert, bytes) 587 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"}) 588 self.assertIsInstance(trust, (set, bool)) 589 if isinstance(trust, set): 590 trust_oids.update(trust) 591 592 serverAuth = "1.3.6.1.5.5.7.3.1" 593 self.assertIn(serverAuth, trust_oids) 594 595 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 596 def test_enum_crls(self): 597 self.assertTrue(ssl.enum_crls("CA")) 598 self.assertRaises(TypeError, ssl.enum_crls) 599 self.assertRaises(WindowsError, ssl.enum_crls, "") 600 601 crls = ssl.enum_crls("CA") 602 self.assertIsInstance(crls, list) 603 for element in crls: 604 self.assertIsInstance(element, tuple) 605 self.assertEqual(len(element), 2) 606 self.assertIsInstance(element[0], bytes) 607 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"}) 608 609 610 def test_asn1object(self): 611 expected = (129, 'serverAuth', 'TLS Web Server Authentication', 612 '1.3.6.1.5.5.7.3.1') 613 614 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 615 self.assertEqual(val, expected) 616 self.assertEqual(val.nid, 129) 617 self.assertEqual(val.shortname, 'serverAuth') 618 self.assertEqual(val.longname, 'TLS Web Server Authentication') 619 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1') 620 self.assertIsInstance(val, ssl._ASN1Object) 621 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth') 622 623 val = ssl._ASN1Object.fromnid(129) 624 self.assertEqual(val, expected) 625 self.assertIsInstance(val, ssl._ASN1Object) 626 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1) 627 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"): 628 ssl._ASN1Object.fromnid(100000) 629 for i in range(1000): 630 try: 631 obj = ssl._ASN1Object.fromnid(i) 632 except ValueError: 633 pass 634 else: 635 self.assertIsInstance(obj.nid, int) 636 self.assertIsInstance(obj.shortname, str) 637 self.assertIsInstance(obj.longname, str) 638 self.assertIsInstance(obj.oid, (str, type(None))) 639 640 val = ssl._ASN1Object.fromname('TLS Web Server Authentication') 641 self.assertEqual(val, expected) 642 self.assertIsInstance(val, ssl._ASN1Object) 643 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected) 644 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'), 645 expected) 646 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"): 647 ssl._ASN1Object.fromname('serverauth') 648 649 def test_purpose_enum(self): 650 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 651 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object) 652 self.assertEqual(ssl.Purpose.SERVER_AUTH, val) 653 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129) 654 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth') 655 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid, 656 '1.3.6.1.5.5.7.3.1') 657 658 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2') 659 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object) 660 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val) 661 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130) 662 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth') 663 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid, 664 '1.3.6.1.5.5.7.3.2') 665 666 def test_unsupported_dtls(self): 667 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 668 self.addCleanup(s.close) 669 with self.assertRaises(NotImplementedError) as cx: 670 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE) 671 self.assertEqual(str(cx.exception), "only stream sockets are supported") 672 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 673 with self.assertRaises(NotImplementedError) as cx: 674 ctx.wrap_socket(s) 675 self.assertEqual(str(cx.exception), "only stream sockets are supported") 676 677 def cert_time_ok(self, timestring, timestamp): 678 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp) 679 680 def cert_time_fail(self, timestring): 681 with self.assertRaises(ValueError): 682 ssl.cert_time_to_seconds(timestring) 683 684 @unittest.skipUnless(utc_offset(), 685 'local time needs to be different from UTC') 686 def test_cert_time_to_seconds_timezone(self): 687 # Issue #19940: ssl.cert_time_to_seconds() returns wrong 688 # results if local timezone is not UTC 689 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0) 690 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0) 691 692 def test_cert_time_to_seconds(self): 693 timestring = "Jan 5 09:34:43 2018 GMT" 694 ts = 1515144883.0 695 self.cert_time_ok(timestring, ts) 696 # accept keyword parameter, assert its name 697 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts) 698 # accept both %e and %d (space or zero generated by strftime) 699 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts) 700 # case-insensitive 701 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts) 702 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds 703 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT 704 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone 705 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day 706 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month 707 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour 708 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute 709 710 newyear_ts = 1230768000.0 711 # leap seconds 712 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts) 713 # same timestamp 714 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts) 715 716 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899) 717 # allow 60th second (even if it is not a leap second) 718 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900) 719 # allow 2nd leap second for compatibility with time.strptime() 720 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901) 721 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds 722 723 # no special treatement for the special value: 724 # 99991231235959Z (rfc 5280) 725 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0) 726 727 @support.run_with_locale('LC_ALL', '') 728 def test_cert_time_to_seconds_locale(self): 729 # `cert_time_to_seconds()` should be locale independent 730 731 def local_february_name(): 732 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0)) 733 734 if local_february_name().lower() == 'feb': 735 self.skipTest("locale-specific month name needs to be " 736 "different from C locale") 737 738 # locale-independent 739 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) 740 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") 741 742 743class ContextTests(unittest.TestCase): 744 745 @skip_if_broken_ubuntu_ssl 746 def test_constructor(self): 747 for protocol in PROTOCOLS: 748 ssl.SSLContext(protocol) 749 self.assertRaises(TypeError, ssl.SSLContext) 750 self.assertRaises(ValueError, ssl.SSLContext, -1) 751 self.assertRaises(ValueError, ssl.SSLContext, 42) 752 753 @skip_if_broken_ubuntu_ssl 754 def test_protocol(self): 755 for proto in PROTOCOLS: 756 ctx = ssl.SSLContext(proto) 757 self.assertEqual(ctx.protocol, proto) 758 759 def test_ciphers(self): 760 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 761 ctx.set_ciphers("ALL") 762 ctx.set_ciphers("DEFAULT") 763 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 764 ctx.set_ciphers("^$:,;?*'dorothyx") 765 766 @skip_if_broken_ubuntu_ssl 767 def _test_options(self): 768 ''' 769 Disable this test, it is too flaky. Different platforms define 770 different defaults 771 ''' 772 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 773 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value 774 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 775 if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0): 776 default |= ssl.OP_NO_COMPRESSION 777 default |= ssl.OP_ENABLE_MIDDLEBOX_COMPAT 778 self.assertEqual(default, ctx.options) 779 ctx.options |= ssl.OP_NO_TLSv1 780 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) 781 if can_clear_options(): 782 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) 783 self.assertEqual(default, ctx.options) 784 ctx.options = 0 785 self.assertEqual(0, ctx.options) 786 else: 787 with self.assertRaises(ValueError): 788 ctx.options = 0 789 790 def test_verify_mode(self): 791 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 792 # Default value 793 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 794 ctx.verify_mode = ssl.CERT_OPTIONAL 795 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 796 ctx.verify_mode = ssl.CERT_REQUIRED 797 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 798 ctx.verify_mode = ssl.CERT_NONE 799 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 800 with self.assertRaises(TypeError): 801 ctx.verify_mode = None 802 with self.assertRaises(ValueError): 803 ctx.verify_mode = 42 804 805 @unittest.skipUnless(have_verify_flags(), 806 "verify_flags need OpenSSL > 0.9.8") 807 def test_verify_flags(self): 808 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 809 # default value 810 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) 811 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf) 812 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF 813 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF) 814 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN 815 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN) 816 ctx.verify_flags = ssl.VERIFY_DEFAULT 817 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT) 818 # supports any value 819 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT 820 self.assertEqual(ctx.verify_flags, 821 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT) 822 with self.assertRaises(TypeError): 823 ctx.verify_flags = None 824 825 def test_load_cert_chain(self): 826 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 827 # Combined key and cert in a single file 828 ctx.load_cert_chain(CERTFILE, keyfile=None) 829 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 830 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 831 with self.assertRaises(IOError) as cm: 832 ctx.load_cert_chain(NONEXISTINGCERT) 833 self.assertEqual(cm.exception.errno, errno.ENOENT) 834 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 835 ctx.load_cert_chain(BADCERT) 836 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 837 ctx.load_cert_chain(EMPTYCERT) 838 # Separate key and cert 839 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 840 ctx.load_cert_chain(ONLYCERT, ONLYKEY) 841 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) 842 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) 843 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 844 ctx.load_cert_chain(ONLYCERT) 845 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 846 ctx.load_cert_chain(ONLYKEY) 847 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 848 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) 849 # Mismatching key and cert 850 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 851 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"): 852 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY) 853 # Password protected key and cert 854 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD) 855 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode()) 856 ctx.load_cert_chain(CERTFILE_PROTECTED, 857 password=bytearray(KEY_PASSWORD.encode())) 858 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD) 859 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode()) 860 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, 861 bytearray(KEY_PASSWORD.encode())) 862 with self.assertRaisesRegexp(TypeError, "should be a string"): 863 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True) 864 with self.assertRaises(ssl.SSLError): 865 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass") 866 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 867 # openssl has a fixed limit on the password buffer. 868 # PEM_BUFSIZE is generally set to 1kb. 869 # Return a string larger than this. 870 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400) 871 # Password callback 872 def getpass_unicode(): 873 return KEY_PASSWORD 874 def getpass_bytes(): 875 return KEY_PASSWORD.encode() 876 def getpass_bytearray(): 877 return bytearray(KEY_PASSWORD.encode()) 878 def getpass_badpass(): 879 return "badpass" 880 def getpass_huge(): 881 return b'a' * (1024 * 1024) 882 def getpass_bad_type(): 883 return 9 884 def getpass_exception(): 885 raise Exception('getpass error') 886 class GetPassCallable: 887 def __call__(self): 888 return KEY_PASSWORD 889 def getpass(self): 890 return KEY_PASSWORD 891 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode) 892 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes) 893 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray) 894 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable()) 895 ctx.load_cert_chain(CERTFILE_PROTECTED, 896 password=GetPassCallable().getpass) 897 with self.assertRaises(ssl.SSLError): 898 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass) 899 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 900 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge) 901 with self.assertRaisesRegexp(TypeError, "must return a string"): 902 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type) 903 with self.assertRaisesRegexp(Exception, "getpass error"): 904 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception) 905 # Make sure the password function isn't called if it isn't needed 906 ctx.load_cert_chain(CERTFILE, password=getpass_exception) 907 908 def test_load_verify_locations(self): 909 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 910 ctx.load_verify_locations(CERTFILE) 911 ctx.load_verify_locations(cafile=CERTFILE, capath=None) 912 ctx.load_verify_locations(BYTES_CERTFILE) 913 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) 914 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8')) 915 self.assertRaises(TypeError, ctx.load_verify_locations) 916 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None) 917 with self.assertRaises(IOError) as cm: 918 ctx.load_verify_locations(NONEXISTINGCERT) 919 self.assertEqual(cm.exception.errno, errno.ENOENT) 920 with self.assertRaises(IOError): 921 ctx.load_verify_locations(u'') 922 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 923 ctx.load_verify_locations(BADCERT) 924 ctx.load_verify_locations(CERTFILE, CAPATH) 925 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) 926 927 # Issue #10989: crash if the second argument type is invalid 928 self.assertRaises(TypeError, ctx.load_verify_locations, None, True) 929 930 def test_load_verify_cadata(self): 931 # test cadata 932 with open(CAFILE_CACERT) as f: 933 cacert_pem = f.read().decode("ascii") 934 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) 935 with open(CAFILE_NEURONIO) as f: 936 neuronio_pem = f.read().decode("ascii") 937 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) 938 939 # test PEM 940 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 941 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) 942 ctx.load_verify_locations(cadata=cacert_pem) 943 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1) 944 ctx.load_verify_locations(cadata=neuronio_pem) 945 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 946 # cert already in hash table 947 ctx.load_verify_locations(cadata=neuronio_pem) 948 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 949 950 # combined 951 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 952 combined = "\n".join((cacert_pem, neuronio_pem)) 953 ctx.load_verify_locations(cadata=combined) 954 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 955 956 # with junk around the certs 957 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 958 combined = ["head", cacert_pem, "other", neuronio_pem, "again", 959 neuronio_pem, "tail"] 960 ctx.load_verify_locations(cadata="\n".join(combined)) 961 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 962 963 # test DER 964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 965 ctx.load_verify_locations(cadata=cacert_der) 966 ctx.load_verify_locations(cadata=neuronio_der) 967 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 968 # cert already in hash table 969 ctx.load_verify_locations(cadata=cacert_der) 970 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 971 972 # combined 973 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 974 combined = b"".join((cacert_der, neuronio_der)) 975 ctx.load_verify_locations(cadata=combined) 976 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 977 978 # error cases 979 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 980 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object) 981 982 with self.assertRaisesRegexp(ssl.SSLError, "no start line"): 983 ctx.load_verify_locations(cadata=u"broken") 984 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"): 985 ctx.load_verify_locations(cadata=b"broken") 986 987 988 def test_load_dh_params(self): 989 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 990 ctx.load_dh_params(DHFILE) 991 if os.name != 'nt': 992 ctx.load_dh_params(BYTES_DHFILE) 993 self.assertRaises(TypeError, ctx.load_dh_params) 994 self.assertRaises(TypeError, ctx.load_dh_params, None) 995 with self.assertRaises(IOError) as cm: 996 ctx.load_dh_params(NONEXISTINGCERT) 997 self.assertEqual(cm.exception.errno, errno.ENOENT) 998 with self.assertRaises(ssl.SSLError) as cm: 999 ctx.load_dh_params(CERTFILE) 1000 1001 @skip_if_broken_ubuntu_ssl 1002 def test_session_stats(self): 1003 for proto in PROTOCOLS: 1004 ctx = ssl.SSLContext(proto) 1005 self.assertEqual(ctx.session_stats(), { 1006 'number': 0, 1007 'connect': 0, 1008 'connect_good': 0, 1009 'connect_renegotiate': 0, 1010 'accept': 0, 1011 'accept_good': 0, 1012 'accept_renegotiate': 0, 1013 'hits': 0, 1014 'misses': 0, 1015 'timeouts': 0, 1016 'cache_full': 0, 1017 }) 1018 1019 def test_set_default_verify_paths(self): 1020 # There's not much we can do to test that it acts as expected, 1021 # so just check it doesn't crash or raise an exception. 1022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1023 ctx.set_default_verify_paths() 1024 1025 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build") 1026 def test_set_ecdh_curve(self): 1027 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1028 ctx.set_ecdh_curve("prime256v1") 1029 ctx.set_ecdh_curve(b"prime256v1") 1030 self.assertRaises(TypeError, ctx.set_ecdh_curve) 1031 self.assertRaises(TypeError, ctx.set_ecdh_curve, None) 1032 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") 1033 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") 1034 1035 @needs_sni 1036 def test_sni_callback(self): 1037 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1038 1039 # set_servername_callback expects a callable, or None 1040 self.assertRaises(TypeError, ctx.set_servername_callback) 1041 self.assertRaises(TypeError, ctx.set_servername_callback, 4) 1042 self.assertRaises(TypeError, ctx.set_servername_callback, "") 1043 self.assertRaises(TypeError, ctx.set_servername_callback, ctx) 1044 1045 def dummycallback(sock, servername, ctx): 1046 pass 1047 ctx.set_servername_callback(None) 1048 ctx.set_servername_callback(dummycallback) 1049 1050 @needs_sni 1051 def test_sni_callback_refcycle(self): 1052 # Reference cycles through the servername callback are detected 1053 # and cleared. 1054 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1055 def dummycallback(sock, servername, ctx, cycle=ctx): 1056 pass 1057 ctx.set_servername_callback(dummycallback) 1058 wr = weakref.ref(ctx) 1059 del ctx, dummycallback 1060 gc.collect() 1061 self.assertIs(wr(), None) 1062 1063 def test_cert_store_stats(self): 1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1065 self.assertEqual(ctx.cert_store_stats(), 1066 {'x509_ca': 0, 'crl': 0, 'x509': 0}) 1067 ctx.load_cert_chain(CERTFILE) 1068 self.assertEqual(ctx.cert_store_stats(), 1069 {'x509_ca': 0, 'crl': 0, 'x509': 0}) 1070 ctx.load_verify_locations(CERTFILE) 1071 self.assertEqual(ctx.cert_store_stats(), 1072 {'x509_ca': 0, 'crl': 0, 'x509': 1}) 1073 ctx.load_verify_locations(CAFILE_CACERT) 1074 self.assertEqual(ctx.cert_store_stats(), 1075 {'x509_ca': 1, 'crl': 0, 'x509': 2}) 1076 1077 def test_get_ca_certs(self): 1078 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1079 self.assertEqual(ctx.get_ca_certs(), []) 1080 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE 1081 ctx.load_verify_locations(CERTFILE) 1082 self.assertEqual(ctx.get_ca_certs(), []) 1083 # but CAFILE_CACERT is a CA cert 1084 ctx.load_verify_locations(CAFILE_CACERT) 1085 self.assertEqual(ctx.get_ca_certs(), 1086 [{'issuer': ((('organizationName', 'Root CA'),), 1087 (('organizationalUnitName', 'http://www.cacert.org'),), 1088 (('commonName', 'CA Cert Signing Authority'),), 1089 (('emailAddress', 'support@cacert.org'),)), 1090 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'), 1091 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'), 1092 'serialNumber': '00', 1093 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',), 1094 'subject': ((('organizationName', 'Root CA'),), 1095 (('organizationalUnitName', 'http://www.cacert.org'),), 1096 (('commonName', 'CA Cert Signing Authority'),), 1097 (('emailAddress', 'support@cacert.org'),)), 1098 'version': 3}]) 1099 1100 with open(CAFILE_CACERT) as f: 1101 pem = f.read() 1102 der = ssl.PEM_cert_to_DER_cert(pem) 1103 self.assertEqual(ctx.get_ca_certs(True), [der]) 1104 1105 def test_load_default_certs(self): 1106 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1107 ctx.load_default_certs() 1108 1109 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1110 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH) 1111 ctx.load_default_certs() 1112 1113 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1114 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH) 1115 1116 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1117 self.assertRaises(TypeError, ctx.load_default_certs, None) 1118 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH') 1119 1120 @unittest.skipIf(sys.platform == "win32", "not-Windows specific") 1121 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars") 1122 def test_load_default_certs_env(self): 1123 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1124 with support.EnvironmentVarGuard() as env: 1125 env["SSL_CERT_DIR"] = CAPATH 1126 env["SSL_CERT_FILE"] = CERTFILE 1127 ctx.load_default_certs() 1128 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0}) 1129 1130 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 1131 def test_load_default_certs_env_windows(self): 1132 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1133 ctx.load_default_certs() 1134 stats = ctx.cert_store_stats() 1135 1136 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1137 with support.EnvironmentVarGuard() as env: 1138 env["SSL_CERT_DIR"] = CAPATH 1139 env["SSL_CERT_FILE"] = CERTFILE 1140 ctx.load_default_certs() 1141 stats["x509"] += 1 1142 self.assertEqual(ctx.cert_store_stats(), stats) 1143 1144 def test_create_default_context(self): 1145 ctx = ssl.create_default_context() 1146 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1147 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1148 self.assertTrue(ctx.check_hostname) 1149 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1150 self.assertEqual( 1151 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1152 getattr(ssl, "OP_NO_COMPRESSION", 0), 1153 ) 1154 1155 with open(SIGNING_CA) as f: 1156 cadata = f.read().decode("ascii") 1157 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH, 1158 cadata=cadata) 1159 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1160 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1161 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1162 self.assertEqual( 1163 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1164 getattr(ssl, "OP_NO_COMPRESSION", 0), 1165 ) 1166 1167 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 1168 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1169 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1170 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1171 self.assertEqual( 1172 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1173 getattr(ssl, "OP_NO_COMPRESSION", 0), 1174 ) 1175 self.assertEqual( 1176 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0), 1177 getattr(ssl, "OP_SINGLE_DH_USE", 0), 1178 ) 1179 self.assertEqual( 1180 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1181 getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1182 ) 1183 1184 def test__create_stdlib_context(self): 1185 ctx = ssl._create_stdlib_context() 1186 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1187 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1188 self.assertFalse(ctx.check_hostname) 1189 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1190 1191 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) 1192 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) 1193 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1194 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1195 1196 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, 1197 cert_reqs=ssl.CERT_REQUIRED, 1198 check_hostname=True) 1199 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) 1200 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1201 self.assertTrue(ctx.check_hostname) 1202 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1203 1204 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) 1205 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1206 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1207 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1208 1209 def test__https_verify_certificates(self): 1210 # Unit test to check the contect factory mapping 1211 # The factories themselves are tested above 1212 # This test will fail by design if run under PYTHONHTTPSVERIFY=0 1213 # (as will various test_httplib tests) 1214 1215 # Uses a fresh SSL module to avoid affecting the real one 1216 local_ssl = support.import_fresh_module("ssl") 1217 # Certificate verification is enabled by default 1218 self.assertIs(local_ssl._create_default_https_context, 1219 local_ssl.create_default_context) 1220 # Turn default verification off 1221 local_ssl._https_verify_certificates(enable=False) 1222 self.assertIs(local_ssl._create_default_https_context, 1223 local_ssl._create_unverified_context) 1224 # And back on 1225 local_ssl._https_verify_certificates(enable=True) 1226 self.assertIs(local_ssl._create_default_https_context, 1227 local_ssl.create_default_context) 1228 # The default behaviour is to enable 1229 local_ssl._https_verify_certificates(enable=False) 1230 local_ssl._https_verify_certificates() 1231 self.assertIs(local_ssl._create_default_https_context, 1232 local_ssl.create_default_context) 1233 1234 def test__https_verify_envvar(self): 1235 # Unit test to check the PYTHONHTTPSVERIFY handling 1236 # Need to use a subprocess so it can still be run under -E 1237 https_is_verified = """import ssl, sys; \ 1238 status = "Error: _create_default_https_context does not verify certs" \ 1239 if ssl._create_default_https_context is \ 1240 ssl._create_unverified_context \ 1241 else None; \ 1242 sys.exit(status)""" 1243 https_is_not_verified = """import ssl, sys; \ 1244 status = "Error: _create_default_https_context verifies certs" \ 1245 if ssl._create_default_https_context is \ 1246 ssl.create_default_context \ 1247 else None; \ 1248 sys.exit(status)""" 1249 extra_env = {} 1250 # Omitting it leaves verification on 1251 assert_python_ok("-c", https_is_verified, **extra_env) 1252 # Setting it to zero turns verification off 1253 extra_env[ssl._https_verify_envvar] = "0" 1254 assert_python_ok("-c", https_is_not_verified, **extra_env) 1255 # Any other value should also leave it on 1256 for setting in ("", "1", "enabled", "foo"): 1257 extra_env[ssl._https_verify_envvar] = setting 1258 assert_python_ok("-c", https_is_verified, **extra_env) 1259 1260 def test_check_hostname(self): 1261 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1262 self.assertFalse(ctx.check_hostname) 1263 1264 # Requires CERT_REQUIRED or CERT_OPTIONAL 1265 with self.assertRaises(ValueError): 1266 ctx.check_hostname = True 1267 ctx.verify_mode = ssl.CERT_REQUIRED 1268 self.assertFalse(ctx.check_hostname) 1269 ctx.check_hostname = True 1270 self.assertTrue(ctx.check_hostname) 1271 1272 ctx.verify_mode = ssl.CERT_OPTIONAL 1273 ctx.check_hostname = True 1274 self.assertTrue(ctx.check_hostname) 1275 1276 # Cannot set CERT_NONE with check_hostname enabled 1277 with self.assertRaises(ValueError): 1278 ctx.verify_mode = ssl.CERT_NONE 1279 ctx.check_hostname = False 1280 self.assertFalse(ctx.check_hostname) 1281 1282 1283class SSLErrorTests(unittest.TestCase): 1284 1285 def test_str(self): 1286 # The str() of a SSLError doesn't include the errno 1287 e = ssl.SSLError(1, "foo") 1288 self.assertEqual(str(e), "foo") 1289 self.assertEqual(e.errno, 1) 1290 # Same for a subclass 1291 e = ssl.SSLZeroReturnError(1, "foo") 1292 self.assertEqual(str(e), "foo") 1293 self.assertEqual(e.errno, 1) 1294 1295 def test_lib_reason(self): 1296 # Test the library and reason attributes 1297 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1298 with self.assertRaises(ssl.SSLError) as cm: 1299 ctx.load_dh_params(CERTFILE) 1300 self.assertEqual(cm.exception.library, 'PEM') 1301 self.assertEqual(cm.exception.reason, 'NO_START_LINE') 1302 s = str(cm.exception) 1303 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s) 1304 1305 def test_subclass(self): 1306 # Check that the appropriate SSLError subclass is raised 1307 # (this only tests one of them) 1308 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1309 with closing(socket.socket()) as s: 1310 s.bind(("127.0.0.1", 0)) 1311 s.listen(5) 1312 c = socket.socket() 1313 c.connect(s.getsockname()) 1314 c.setblocking(False) 1315 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c: 1316 with self.assertRaises(ssl.SSLWantReadError) as cm: 1317 c.do_handshake() 1318 s = str(cm.exception) 1319 self.assertTrue(s.startswith("The operation did not complete (read)"), s) 1320 # For compatibility 1321 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ) 1322 1323 1324class NetworkedTests(unittest.TestCase): 1325 1326 def test_connect(self): 1327 with support.transient_internet(REMOTE_HOST): 1328 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1329 cert_reqs=ssl.CERT_NONE) 1330 try: 1331 s.connect((REMOTE_HOST, 443)) 1332 self.assertEqual({}, s.getpeercert()) 1333 finally: 1334 s.close() 1335 1336 # this should fail because we have no verification certs 1337 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1338 cert_reqs=ssl.CERT_REQUIRED) 1339 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed", 1340 s.connect, (REMOTE_HOST, 443)) 1341 s.close() 1342 1343 # this should succeed because we specify the root cert 1344 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1345 cert_reqs=ssl.CERT_REQUIRED, 1346 ca_certs=REMOTE_ROOT_CERT) 1347 try: 1348 s.connect((REMOTE_HOST, 443)) 1349 self.assertTrue(s.getpeercert()) 1350 finally: 1351 s.close() 1352 1353 def test_connect_ex(self): 1354 # Issue #11326: check connect_ex() implementation 1355 with support.transient_internet(REMOTE_HOST): 1356 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1357 cert_reqs=ssl.CERT_REQUIRED, 1358 ca_certs=REMOTE_ROOT_CERT) 1359 try: 1360 self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443))) 1361 self.assertTrue(s.getpeercert()) 1362 finally: 1363 s.close() 1364 1365 def test_non_blocking_connect_ex(self): 1366 # Issue #11326: non-blocking connect_ex() should allow handshake 1367 # to proceed after the socket gets ready. 1368 with support.transient_internet(REMOTE_HOST): 1369 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1370 cert_reqs=ssl.CERT_REQUIRED, 1371 ca_certs=REMOTE_ROOT_CERT, 1372 do_handshake_on_connect=False) 1373 try: 1374 s.setblocking(False) 1375 rc = s.connect_ex((REMOTE_HOST, 443)) 1376 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere 1377 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) 1378 # Wait for connect to finish 1379 select.select([], [s], [], 5.0) 1380 # Non-blocking handshake 1381 while True: 1382 try: 1383 s.do_handshake() 1384 break 1385 except ssl.SSLWantReadError: 1386 select.select([s], [], [], 5.0) 1387 except ssl.SSLWantWriteError: 1388 select.select([], [s], [], 5.0) 1389 # SSL established 1390 self.assertTrue(s.getpeercert()) 1391 finally: 1392 s.close() 1393 1394 def test_timeout_connect_ex(self): 1395 # Issue #12065: on a timeout, connect_ex() should return the original 1396 # errno (mimicking the behaviour of non-SSL sockets). 1397 with support.transient_internet(REMOTE_HOST): 1398 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1399 cert_reqs=ssl.CERT_REQUIRED, 1400 ca_certs=REMOTE_ROOT_CERT, 1401 do_handshake_on_connect=False) 1402 try: 1403 s.settimeout(0.0000001) 1404 rc = s.connect_ex((REMOTE_HOST, 443)) 1405 if rc == 0: 1406 self.skipTest("REMOTE_HOST responded too quickly") 1407 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 1408 finally: 1409 s.close() 1410 1411 def test_connect_ex_error(self): 1412 with support.transient_internet(REMOTE_HOST): 1413 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1414 cert_reqs=ssl.CERT_REQUIRED, 1415 ca_certs=REMOTE_ROOT_CERT) 1416 try: 1417 rc = s.connect_ex((REMOTE_HOST, 444)) 1418 # Issue #19919: Windows machines or VMs hosted on Windows 1419 # machines sometimes return EWOULDBLOCK. 1420 errors = ( 1421 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, 1422 errno.EWOULDBLOCK, 1423 ) 1424 self.assertIn(rc, errors) 1425 finally: 1426 s.close() 1427 1428 def test_connect_with_context(self): 1429 with support.transient_internet(REMOTE_HOST): 1430 # Same as test_connect, but with a separately created context 1431 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1432 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1433 s.connect((REMOTE_HOST, 443)) 1434 try: 1435 self.assertEqual({}, s.getpeercert()) 1436 finally: 1437 s.close() 1438 # Same with a server hostname 1439 s = ctx.wrap_socket(socket.socket(socket.AF_INET), 1440 server_hostname=REMOTE_HOST) 1441 s.connect((REMOTE_HOST, 443)) 1442 s.close() 1443 # This should fail because we have no verification certs 1444 ctx.verify_mode = ssl.CERT_REQUIRED 1445 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1446 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed", 1447 s.connect, (REMOTE_HOST, 443)) 1448 s.close() 1449 # This should succeed because we specify the root cert 1450 ctx.load_verify_locations(REMOTE_ROOT_CERT) 1451 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1452 s.connect((REMOTE_HOST, 443)) 1453 try: 1454 cert = s.getpeercert() 1455 self.assertTrue(cert) 1456 finally: 1457 s.close() 1458 1459 def test_connect_capath(self): 1460 # Verify server certificates using the `capath` argument 1461 # NOTE: the subject hashing algorithm has been changed between 1462 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must 1463 # contain both versions of each certificate (same content, different 1464 # filename) for this test to be portable across OpenSSL releases. 1465 with support.transient_internet(REMOTE_HOST): 1466 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1467 ctx.verify_mode = ssl.CERT_REQUIRED 1468 ctx.load_verify_locations(capath=CAPATH) 1469 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1470 s.connect((REMOTE_HOST, 443)) 1471 try: 1472 cert = s.getpeercert() 1473 self.assertTrue(cert) 1474 finally: 1475 s.close() 1476 # Same with a bytes `capath` argument 1477 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1478 ctx.verify_mode = ssl.CERT_REQUIRED 1479 ctx.load_verify_locations(capath=BYTES_CAPATH) 1480 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1481 s.connect((REMOTE_HOST, 443)) 1482 try: 1483 cert = s.getpeercert() 1484 self.assertTrue(cert) 1485 finally: 1486 s.close() 1487 1488 def test_connect_cadata(self): 1489 with open(REMOTE_ROOT_CERT) as f: 1490 pem = f.read().decode('ascii') 1491 der = ssl.PEM_cert_to_DER_cert(pem) 1492 with support.transient_internet(REMOTE_HOST): 1493 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1494 ctx.verify_mode = ssl.CERT_REQUIRED 1495 ctx.load_verify_locations(cadata=pem) 1496 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s: 1497 s.connect((REMOTE_HOST, 443)) 1498 cert = s.getpeercert() 1499 self.assertTrue(cert) 1500 1501 # same with DER 1502 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1503 ctx.verify_mode = ssl.CERT_REQUIRED 1504 ctx.load_verify_locations(cadata=der) 1505 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s: 1506 s.connect((REMOTE_HOST, 443)) 1507 cert = s.getpeercert() 1508 self.assertTrue(cert) 1509 1510 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") 1511 def test_makefile_close(self): 1512 # Issue #5238: creating a file-like object with makefile() shouldn't 1513 # delay closing the underlying "real socket" (here tested with its 1514 # file descriptor, hence skipping the test under Windows). 1515 with support.transient_internet(REMOTE_HOST): 1516 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 1517 ss.connect((REMOTE_HOST, 443)) 1518 fd = ss.fileno() 1519 f = ss.makefile() 1520 f.close() 1521 # The fd is still open 1522 os.read(fd, 0) 1523 # Closing the SSL socket should close the fd too 1524 ss.close() 1525 gc.collect() 1526 with self.assertRaises(OSError) as e: 1527 os.read(fd, 0) 1528 self.assertEqual(e.exception.errno, errno.EBADF) 1529 1530 def test_non_blocking_handshake(self): 1531 with support.transient_internet(REMOTE_HOST): 1532 s = socket.socket(socket.AF_INET) 1533 s.connect((REMOTE_HOST, 443)) 1534 s.setblocking(False) 1535 s = ssl.wrap_socket(s, 1536 cert_reqs=ssl.CERT_NONE, 1537 do_handshake_on_connect=False) 1538 count = 0 1539 while True: 1540 try: 1541 count += 1 1542 s.do_handshake() 1543 break 1544 except ssl.SSLWantReadError: 1545 select.select([s], [], []) 1546 except ssl.SSLWantWriteError: 1547 select.select([], [s], []) 1548 s.close() 1549 if support.verbose: 1550 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) 1551 1552 def test_get_server_certificate(self): 1553 def _test_get_server_certificate(host, port, cert=None): 1554 with support.transient_internet(host): 1555 pem = ssl.get_server_certificate((host, port)) 1556 if not pem: 1557 self.fail("No server certificate on %s:%s!" % (host, port)) 1558 1559 try: 1560 pem = ssl.get_server_certificate((host, port), 1561 ca_certs=CERTFILE) 1562 except ssl.SSLError as x: 1563 #should fail 1564 if support.verbose: 1565 sys.stdout.write("%s\n" % x) 1566 else: 1567 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) 1568 pem = ssl.get_server_certificate((host, port), 1569 ca_certs=cert) 1570 if not pem: 1571 self.fail("No server certificate on %s:%s!" % (host, port)) 1572 if support.verbose: 1573 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) 1574 1575 _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT) 1576 if support.IPV6_ENABLED: 1577 _test_get_server_certificate('ipv6.google.com', 443) 1578 1579 def test_ciphers(self): 1580 remote = (REMOTE_HOST, 443) 1581 with support.transient_internet(remote[0]): 1582 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET), 1583 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s: 1584 s.connect(remote) 1585 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET), 1586 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s: 1587 s.connect(remote) 1588 # Error checking can happen at instantiation or when connecting 1589 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 1590 with closing(socket.socket(socket.AF_INET)) as sock: 1591 s = ssl.wrap_socket(sock, 1592 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") 1593 s.connect(remote) 1594 1595 def test_algorithms(self): 1596 # Issue #8484: all algorithms should be available when verifying a 1597 # certificate. 1598 # SHA256 was added in OpenSSL 0.9.8 1599 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): 1600 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) 1601 # sha256.tbs-internet.com needs SNI to use the correct certificate 1602 if not ssl.HAS_SNI: 1603 self.skipTest("SNI needed for this test") 1604 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) 1605 remote = ("sha256.tbs-internet.com", 443) 1606 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") 1607 with support.transient_internet("sha256.tbs-internet.com"): 1608 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1609 ctx.verify_mode = ssl.CERT_REQUIRED 1610 ctx.load_verify_locations(sha256_cert) 1611 s = ctx.wrap_socket(socket.socket(socket.AF_INET), 1612 server_hostname="sha256.tbs-internet.com") 1613 try: 1614 s.connect(remote) 1615 if support.verbose: 1616 sys.stdout.write("\nCipher with %r is %r\n" % 1617 (remote, s.cipher())) 1618 sys.stdout.write("Certificate is:\n%s\n" % 1619 pprint.pformat(s.getpeercert())) 1620 finally: 1621 s.close() 1622 1623 def test_get_ca_certs_capath(self): 1624 # capath certs are loaded on request 1625 with support.transient_internet(REMOTE_HOST): 1626 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1627 ctx.verify_mode = ssl.CERT_REQUIRED 1628 ctx.load_verify_locations(capath=CAPATH) 1629 self.assertEqual(ctx.get_ca_certs(), []) 1630 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1631 s.connect((REMOTE_HOST, 443)) 1632 try: 1633 cert = s.getpeercert() 1634 self.assertTrue(cert) 1635 finally: 1636 s.close() 1637 self.assertEqual(len(ctx.get_ca_certs()), 1) 1638 1639 @needs_sni 1640 def test_context_setget(self): 1641 # Check that the context of a connected socket can be replaced. 1642 with support.transient_internet(REMOTE_HOST): 1643 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1644 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1645 s = socket.socket(socket.AF_INET) 1646 with closing(ctx1.wrap_socket(s)) as ss: 1647 ss.connect((REMOTE_HOST, 443)) 1648 self.assertIs(ss.context, ctx1) 1649 self.assertIs(ss._sslobj.context, ctx1) 1650 ss.context = ctx2 1651 self.assertIs(ss.context, ctx2) 1652 self.assertIs(ss._sslobj.context, ctx2) 1653 1654try: 1655 import threading 1656except ImportError: 1657 _have_threads = False 1658else: 1659 _have_threads = True 1660 1661 from test.ssl_servers import make_https_server 1662 1663 class ThreadedEchoServer(threading.Thread): 1664 1665 class ConnectionHandler(threading.Thread): 1666 1667 """A mildly complicated class, because we want it to work both 1668 with and without the SSL wrapper around the socket connection, so 1669 that we can test the STARTTLS functionality.""" 1670 1671 def __init__(self, server, connsock, addr): 1672 self.server = server 1673 self.running = False 1674 self.sock = connsock 1675 self.addr = addr 1676 self.sock.setblocking(1) 1677 self.sslconn = None 1678 threading.Thread.__init__(self) 1679 self.daemon = True 1680 1681 def wrap_conn(self): 1682 try: 1683 self.sslconn = self.server.context.wrap_socket( 1684 self.sock, server_side=True) 1685 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol()) 1686 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol()) 1687 except socket.error as e: 1688 # We treat ConnectionResetError as though it were an 1689 # SSLError - OpenSSL on Ubuntu abruptly closes the 1690 # connection when asked to use an unsupported protocol. 1691 # 1692 # XXX Various errors can have happened here, for example 1693 # a mismatching protocol version, an invalid certificate, 1694 # or a low-level bug. This should be made more discriminating. 1695 if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET: 1696 raise 1697 self.server.conn_errors.append(e) 1698 if self.server.chatty: 1699 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") 1700 self.running = False 1701 self.server.stop() 1702 self.close() 1703 return False 1704 else: 1705 if self.server.context.verify_mode == ssl.CERT_REQUIRED: 1706 cert = self.sslconn.getpeercert() 1707 if support.verbose and self.server.chatty: 1708 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") 1709 cert_binary = self.sslconn.getpeercert(True) 1710 if support.verbose and self.server.chatty: 1711 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") 1712 cipher = self.sslconn.cipher() 1713 if support.verbose and self.server.chatty: 1714 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") 1715 sys.stdout.write(" server: selected protocol is now " 1716 + str(self.sslconn.selected_npn_protocol()) + "\n") 1717 return True 1718 1719 def read(self): 1720 if self.sslconn: 1721 return self.sslconn.read() 1722 else: 1723 return self.sock.recv(1024) 1724 1725 def write(self, bytes): 1726 if self.sslconn: 1727 return self.sslconn.write(bytes) 1728 else: 1729 return self.sock.send(bytes) 1730 1731 def close(self): 1732 if self.sslconn: 1733 self.sslconn.close() 1734 else: 1735 self.sock.close() 1736 1737 def run(self): 1738 self.running = True 1739 if not self.server.starttls_server: 1740 if not self.wrap_conn(): 1741 return 1742 while self.running: 1743 try: 1744 msg = self.read() 1745 stripped = msg.strip() 1746 if not stripped: 1747 # eof, so quit this handler 1748 self.running = False 1749 self.close() 1750 elif stripped == b'over': 1751 if support.verbose and self.server.connectionchatty: 1752 sys.stdout.write(" server: client closed connection\n") 1753 self.close() 1754 return 1755 elif (self.server.starttls_server and 1756 stripped == b'STARTTLS'): 1757 if support.verbose and self.server.connectionchatty: 1758 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") 1759 self.write(b"OK\n") 1760 if not self.wrap_conn(): 1761 return 1762 elif (self.server.starttls_server and self.sslconn 1763 and stripped == b'ENDTLS'): 1764 if support.verbose and self.server.connectionchatty: 1765 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") 1766 self.write(b"OK\n") 1767 self.sock = self.sslconn.unwrap() 1768 self.sslconn = None 1769 if support.verbose and self.server.connectionchatty: 1770 sys.stdout.write(" server: connection is now unencrypted...\n") 1771 elif stripped == b'CB tls-unique': 1772 if support.verbose and self.server.connectionchatty: 1773 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") 1774 data = self.sslconn.get_channel_binding("tls-unique") 1775 self.write(repr(data).encode("us-ascii") + b"\n") 1776 else: 1777 if (support.verbose and 1778 self.server.connectionchatty): 1779 ctype = (self.sslconn and "encrypted") or "unencrypted" 1780 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" 1781 % (msg, ctype, msg.lower(), ctype)) 1782 self.write(msg.lower()) 1783 except ssl.SSLError: 1784 if self.server.chatty: 1785 handle_error("Test server failure:\n") 1786 self.close() 1787 self.running = False 1788 # normally, we'd just stop here, but for the test 1789 # harness, we want to stop the server 1790 self.server.stop() 1791 1792 def __init__(self, certificate=None, ssl_version=None, 1793 certreqs=None, cacerts=None, 1794 chatty=True, connectionchatty=False, starttls_server=False, 1795 npn_protocols=None, alpn_protocols=None, 1796 ciphers=None, context=None): 1797 if context: 1798 self.context = context 1799 else: 1800 self.context = ssl.SSLContext(ssl_version 1801 if ssl_version is not None 1802 else ssl.PROTOCOL_TLSv1) 1803 self.context.verify_mode = (certreqs if certreqs is not None 1804 else ssl.CERT_NONE) 1805 if cacerts: 1806 self.context.load_verify_locations(cacerts) 1807 if certificate: 1808 self.context.load_cert_chain(certificate) 1809 if npn_protocols: 1810 self.context.set_npn_protocols(npn_protocols) 1811 if alpn_protocols: 1812 self.context.set_alpn_protocols(alpn_protocols) 1813 if ciphers: 1814 self.context.set_ciphers(ciphers) 1815 self.chatty = chatty 1816 self.connectionchatty = connectionchatty 1817 self.starttls_server = starttls_server 1818 self.sock = socket.socket() 1819 self.port = support.bind_port(self.sock) 1820 self.flag = None 1821 self.active = False 1822 self.selected_npn_protocols = [] 1823 self.selected_alpn_protocols = [] 1824 self.conn_errors = [] 1825 threading.Thread.__init__(self) 1826 self.daemon = True 1827 1828 def __enter__(self): 1829 self.start(threading.Event()) 1830 self.flag.wait() 1831 return self 1832 1833 def __exit__(self, *args): 1834 self.stop() 1835 self.join() 1836 1837 def start(self, flag=None): 1838 self.flag = flag 1839 threading.Thread.start(self) 1840 1841 def run(self): 1842 self.sock.settimeout(0.05) 1843 self.sock.listen(5) 1844 self.active = True 1845 if self.flag: 1846 # signal an event 1847 self.flag.set() 1848 while self.active: 1849 try: 1850 newconn, connaddr = self.sock.accept() 1851 if support.verbose and self.chatty: 1852 sys.stdout.write(' server: new connection from ' 1853 + repr(connaddr) + '\n') 1854 handler = self.ConnectionHandler(self, newconn, connaddr) 1855 handler.start() 1856 handler.join() 1857 except socket.timeout: 1858 pass 1859 except KeyboardInterrupt: 1860 self.stop() 1861 self.sock.close() 1862 1863 def stop(self): 1864 self.active = False 1865 1866 class AsyncoreEchoServer(threading.Thread): 1867 1868 class EchoServer(asyncore.dispatcher): 1869 1870 class ConnectionHandler(asyncore.dispatcher_with_send): 1871 1872 def __init__(self, conn, certfile): 1873 self.socket = ssl.wrap_socket(conn, server_side=True, 1874 certfile=certfile, 1875 do_handshake_on_connect=False) 1876 asyncore.dispatcher_with_send.__init__(self, self.socket) 1877 self._ssl_accepting = True 1878 self._do_ssl_handshake() 1879 1880 def readable(self): 1881 if isinstance(self.socket, ssl.SSLSocket): 1882 while self.socket.pending() > 0: 1883 self.handle_read_event() 1884 return True 1885 1886 def _do_ssl_handshake(self): 1887 try: 1888 self.socket.do_handshake() 1889 except (ssl.SSLWantReadError, ssl.SSLWantWriteError): 1890 return 1891 except ssl.SSLEOFError: 1892 return self.handle_close() 1893 except ssl.SSLError: 1894 raise 1895 except socket.error, err: 1896 if err.args[0] == errno.ECONNABORTED: 1897 return self.handle_close() 1898 else: 1899 self._ssl_accepting = False 1900 1901 def handle_read(self): 1902 if self._ssl_accepting: 1903 self._do_ssl_handshake() 1904 else: 1905 data = self.recv(1024) 1906 if support.verbose: 1907 sys.stdout.write(" server: read %s from client\n" % repr(data)) 1908 if not data: 1909 self.close() 1910 else: 1911 self.send(data.lower()) 1912 1913 def handle_close(self): 1914 self.close() 1915 if support.verbose: 1916 sys.stdout.write(" server: closed connection %s\n" % self.socket) 1917 1918 def handle_error(self): 1919 raise 1920 1921 def __init__(self, certfile): 1922 self.certfile = certfile 1923 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1924 self.port = support.bind_port(sock, '') 1925 asyncore.dispatcher.__init__(self, sock) 1926 self.listen(5) 1927 1928 def handle_accept(self): 1929 sock_obj, addr = self.accept() 1930 if support.verbose: 1931 sys.stdout.write(" server: new connection from %s:%s\n" %addr) 1932 self.ConnectionHandler(sock_obj, self.certfile) 1933 1934 def handle_error(self): 1935 raise 1936 1937 def __init__(self, certfile): 1938 self.flag = None 1939 self.active = False 1940 self.server = self.EchoServer(certfile) 1941 self.port = self.server.port 1942 threading.Thread.__init__(self) 1943 self.daemon = True 1944 1945 def __str__(self): 1946 return "<%s %s>" % (self.__class__.__name__, self.server) 1947 1948 def __enter__(self): 1949 self.start(threading.Event()) 1950 self.flag.wait() 1951 return self 1952 1953 def __exit__(self, *args): 1954 if support.verbose: 1955 sys.stdout.write(" cleanup: stopping server.\n") 1956 self.stop() 1957 if support.verbose: 1958 sys.stdout.write(" cleanup: joining server thread.\n") 1959 self.join() 1960 if support.verbose: 1961 sys.stdout.write(" cleanup: successfully joined.\n") 1962 1963 def start(self, flag=None): 1964 self.flag = flag 1965 threading.Thread.start(self) 1966 1967 def run(self): 1968 self.active = True 1969 if self.flag: 1970 self.flag.set() 1971 while self.active: 1972 try: 1973 asyncore.loop(1) 1974 except: 1975 pass 1976 1977 def stop(self): 1978 self.active = False 1979 self.server.close() 1980 1981 def server_params_test(client_context, server_context, indata=b"FOO\n", 1982 chatty=True, connectionchatty=False, sni_name=None): 1983 """ 1984 Launch a server, connect a client to it and try various reads 1985 and writes. 1986 """ 1987 stats = {} 1988 server = ThreadedEchoServer(context=server_context, 1989 chatty=chatty, 1990 connectionchatty=False) 1991 with server: 1992 with closing(client_context.wrap_socket(socket.socket(), 1993 server_hostname=sni_name)) as s: 1994 s.connect((HOST, server.port)) 1995 for arg in [indata, bytearray(indata), memoryview(indata)]: 1996 if connectionchatty: 1997 if support.verbose: 1998 sys.stdout.write( 1999 " client: sending %r...\n" % indata) 2000 s.write(arg) 2001 outdata = s.read() 2002 if connectionchatty: 2003 if support.verbose: 2004 sys.stdout.write(" client: read %r\n" % outdata) 2005 if outdata != indata.lower(): 2006 raise AssertionError( 2007 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" 2008 % (outdata[:20], len(outdata), 2009 indata[:20].lower(), len(indata))) 2010 s.write(b"over\n") 2011 if connectionchatty: 2012 if support.verbose: 2013 sys.stdout.write(" client: closing connection.\n") 2014 stats.update({ 2015 'compression': s.compression(), 2016 'cipher': s.cipher(), 2017 'peercert': s.getpeercert(), 2018 'client_alpn_protocol': s.selected_alpn_protocol(), 2019 'client_npn_protocol': s.selected_npn_protocol(), 2020 'version': s.version(), 2021 }) 2022 s.close() 2023 stats['server_alpn_protocols'] = server.selected_alpn_protocols 2024 stats['server_npn_protocols'] = server.selected_npn_protocols 2025 return stats 2026 2027 def try_protocol_combo(server_protocol, client_protocol, expect_success, 2028 certsreqs=None, server_options=0, client_options=0): 2029 """ 2030 Try to SSL-connect using *client_protocol* to *server_protocol*. 2031 If *expect_success* is true, assert that the connection succeeds, 2032 if it's false, assert that the connection fails. 2033 Also, if *expect_success* is a string, assert that it is the protocol 2034 version actually used by the connection. 2035 """ 2036 if certsreqs is None: 2037 certsreqs = ssl.CERT_NONE 2038 certtype = { 2039 ssl.CERT_NONE: "CERT_NONE", 2040 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 2041 ssl.CERT_REQUIRED: "CERT_REQUIRED", 2042 }[certsreqs] 2043 if support.verbose: 2044 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 2045 sys.stdout.write(formatstr % 2046 (ssl.get_protocol_name(client_protocol), 2047 ssl.get_protocol_name(server_protocol), 2048 certtype)) 2049 client_context = ssl.SSLContext(client_protocol) 2050 client_context.options |= client_options 2051 server_context = ssl.SSLContext(server_protocol) 2052 server_context.options |= server_options 2053 2054 # NOTE: we must enable "ALL" ciphers on the client, otherwise an 2055 # SSLv23 client will send an SSLv3 hello (rather than SSLv2) 2056 # starting from OpenSSL 1.0.0 (see issue #8322). 2057 if client_context.protocol == ssl.PROTOCOL_SSLv23: 2058 client_context.set_ciphers("ALL") 2059 2060 for ctx in (client_context, server_context): 2061 ctx.verify_mode = certsreqs 2062 ctx.load_cert_chain(CERTFILE) 2063 ctx.load_verify_locations(CERTFILE) 2064 try: 2065 stats = server_params_test(client_context, server_context, 2066 chatty=False, connectionchatty=False) 2067 # Protocol mismatch can result in either an SSLError, or a 2068 # "Connection reset by peer" error. 2069 except ssl.SSLError: 2070 if expect_success: 2071 raise 2072 except socket.error as e: 2073 if expect_success or e.errno != errno.ECONNRESET: 2074 raise 2075 else: 2076 if not expect_success: 2077 raise AssertionError( 2078 "Client protocol %s succeeded with server protocol %s!" 2079 % (ssl.get_protocol_name(client_protocol), 2080 ssl.get_protocol_name(server_protocol))) 2081 elif (expect_success is not True 2082 and expect_success != stats['version']): 2083 raise AssertionError("version mismatch: expected %r, got %r" 2084 % (expect_success, stats['version'])) 2085 2086 2087 class ThreadedTests(unittest.TestCase): 2088 2089 @skip_if_broken_ubuntu_ssl 2090 def test_echo(self): 2091 """Basic test of an SSL client connecting to a server""" 2092 if support.verbose: 2093 sys.stdout.write("\n") 2094 for protocol in PROTOCOLS: 2095 context = ssl.SSLContext(protocol) 2096 context.load_cert_chain(CERTFILE) 2097 server_params_test(context, context, 2098 chatty=True, connectionchatty=True) 2099 2100 def test_getpeercert(self): 2101 if support.verbose: 2102 sys.stdout.write("\n") 2103 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2104 context.verify_mode = ssl.CERT_REQUIRED 2105 context.load_verify_locations(CERTFILE) 2106 context.load_cert_chain(CERTFILE) 2107 server = ThreadedEchoServer(context=context, chatty=False) 2108 with server: 2109 s = context.wrap_socket(socket.socket(), 2110 do_handshake_on_connect=False) 2111 s.connect((HOST, server.port)) 2112 # getpeercert() raise ValueError while the handshake isn't 2113 # done. 2114 with self.assertRaises(ValueError): 2115 s.getpeercert() 2116 s.do_handshake() 2117 cert = s.getpeercert() 2118 self.assertTrue(cert, "Can't get peer certificate.") 2119 cipher = s.cipher() 2120 if support.verbose: 2121 sys.stdout.write(pprint.pformat(cert) + '\n') 2122 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 2123 if 'subject' not in cert: 2124 self.fail("No subject field in certificate: %s." % 2125 pprint.pformat(cert)) 2126 if ((('organizationName', 'Python Software Foundation'),) 2127 not in cert['subject']): 2128 self.fail( 2129 "Missing or invalid 'organizationName' field in certificate subject; " 2130 "should be 'Python Software Foundation'.") 2131 self.assertIn('notBefore', cert) 2132 self.assertIn('notAfter', cert) 2133 before = ssl.cert_time_to_seconds(cert['notBefore']) 2134 after = ssl.cert_time_to_seconds(cert['notAfter']) 2135 self.assertLess(before, after) 2136 s.close() 2137 2138 @unittest.skipUnless(have_verify_flags(), 2139 "verify_flags need OpenSSL > 0.9.8") 2140 def test_crl_check(self): 2141 if support.verbose: 2142 sys.stdout.write("\n") 2143 2144 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2145 server_context.load_cert_chain(SIGNED_CERTFILE) 2146 2147 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2148 context.verify_mode = ssl.CERT_REQUIRED 2149 context.load_verify_locations(SIGNING_CA) 2150 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) 2151 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf) 2152 2153 # VERIFY_DEFAULT should pass 2154 server = ThreadedEchoServer(context=server_context, chatty=True) 2155 with server: 2156 with closing(context.wrap_socket(socket.socket())) as s: 2157 s.connect((HOST, server.port)) 2158 cert = s.getpeercert() 2159 self.assertTrue(cert, "Can't get peer certificate.") 2160 2161 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails 2162 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF 2163 2164 server = ThreadedEchoServer(context=server_context, chatty=True) 2165 with server: 2166 with closing(context.wrap_socket(socket.socket())) as s: 2167 with self.assertRaisesRegexp(ssl.SSLError, 2168 "certificate verify failed"): 2169 s.connect((HOST, server.port)) 2170 2171 # now load a CRL file. The CRL file is signed by the CA. 2172 context.load_verify_locations(CRLFILE) 2173 2174 server = ThreadedEchoServer(context=server_context, chatty=True) 2175 with server: 2176 with closing(context.wrap_socket(socket.socket())) as s: 2177 s.connect((HOST, server.port)) 2178 cert = s.getpeercert() 2179 self.assertTrue(cert, "Can't get peer certificate.") 2180 2181 def test_check_hostname(self): 2182 if support.verbose: 2183 sys.stdout.write("\n") 2184 2185 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2186 server_context.load_cert_chain(SIGNED_CERTFILE) 2187 2188 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2189 context.verify_mode = ssl.CERT_REQUIRED 2190 context.check_hostname = True 2191 context.load_verify_locations(SIGNING_CA) 2192 2193 # correct hostname should verify 2194 server = ThreadedEchoServer(context=server_context, chatty=True) 2195 with server: 2196 with closing(context.wrap_socket(socket.socket(), 2197 server_hostname="localhost")) as s: 2198 s.connect((HOST, server.port)) 2199 cert = s.getpeercert() 2200 self.assertTrue(cert, "Can't get peer certificate.") 2201 2202 # incorrect hostname should raise an exception 2203 server = ThreadedEchoServer(context=server_context, chatty=True) 2204 with server: 2205 with closing(context.wrap_socket(socket.socket(), 2206 server_hostname="invalid")) as s: 2207 with self.assertRaisesRegexp(ssl.CertificateError, 2208 "hostname 'invalid' doesn't match u?'localhost'"): 2209 s.connect((HOST, server.port)) 2210 2211 # missing server_hostname arg should cause an exception, too 2212 server = ThreadedEchoServer(context=server_context, chatty=True) 2213 with server: 2214 with closing(socket.socket()) as s: 2215 with self.assertRaisesRegexp(ValueError, 2216 "check_hostname requires server_hostname"): 2217 context.wrap_socket(s) 2218 2219 def test_wrong_cert(self): 2220 """Connecting when the server rejects the client's certificate 2221 2222 Launch a server with CERT_REQUIRED, and check that trying to 2223 connect to it with a wrong client certificate fails. 2224 """ 2225 certfile = os.path.join(os.path.dirname(__file__) or os.curdir, 2226 "wrongcert.pem") 2227 server = ThreadedEchoServer(CERTFILE, 2228 certreqs=ssl.CERT_REQUIRED, 2229 cacerts=CERTFILE, chatty=False, 2230 connectionchatty=False) 2231 with server, \ 2232 closing(socket.socket()) as sock, \ 2233 closing(ssl.wrap_socket(sock, 2234 certfile=certfile, 2235 ssl_version=ssl.PROTOCOL_TLSv1)) as s: 2236 try: 2237 # Expect either an SSL error about the server rejecting 2238 # the connection, or a low-level connection reset (which 2239 # sometimes happens on Windows) 2240 s.connect((HOST, server.port)) 2241 except ssl.SSLError as e: 2242 if support.verbose: 2243 sys.stdout.write("\nSSLError is %r\n" % e) 2244 except socket.error as e: 2245 if e.errno != errno.ECONNRESET: 2246 raise 2247 if support.verbose: 2248 sys.stdout.write("\nsocket.error is %r\n" % e) 2249 else: 2250 self.fail("Use of invalid cert should have failed!") 2251 2252 def test_rude_shutdown(self): 2253 """A brutal shutdown of an SSL server should raise an OSError 2254 in the client when attempting handshake. 2255 """ 2256 listener_ready = threading.Event() 2257 listener_gone = threading.Event() 2258 2259 s = socket.socket() 2260 port = support.bind_port(s, HOST) 2261 2262 # `listener` runs in a thread. It sits in an accept() until 2263 # the main thread connects. Then it rudely closes the socket, 2264 # and sets Event `listener_gone` to let the main thread know 2265 # the socket is gone. 2266 def listener(): 2267 s.listen(5) 2268 listener_ready.set() 2269 newsock, addr = s.accept() 2270 newsock.close() 2271 s.close() 2272 listener_gone.set() 2273 2274 def connector(): 2275 listener_ready.wait() 2276 with closing(socket.socket()) as c: 2277 c.connect((HOST, port)) 2278 listener_gone.wait() 2279 try: 2280 ssl_sock = ssl.wrap_socket(c) 2281 except socket.error: 2282 pass 2283 else: 2284 self.fail('connecting to closed SSL socket should have failed') 2285 2286 t = threading.Thread(target=listener) 2287 t.start() 2288 try: 2289 connector() 2290 finally: 2291 t.join() 2292 2293 @skip_if_broken_ubuntu_ssl 2294 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), 2295 "OpenSSL is compiled without SSLv2 support") 2296 def test_protocol_sslv2(self): 2297 """Connecting to an SSLv2 server with various client options""" 2298 if support.verbose: 2299 sys.stdout.write("\n") 2300 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 2301 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 2302 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 2303 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False) 2304 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 2305 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 2306 # SSLv23 client with specific SSL options 2307 if no_sslv2_implies_sslv3_hello(): 2308 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2309 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2310 client_options=ssl.OP_NO_SSLv2) 2311 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2312 client_options=ssl.OP_NO_SSLv3) 2313 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2314 client_options=ssl.OP_NO_TLSv1) 2315 2316 @skip_if_broken_ubuntu_ssl 2317 def test_protocol_sslv23(self): 2318 """Connecting to an SSLv23 server with various client options""" 2319 if support.verbose: 2320 sys.stdout.write("\n") 2321 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2322 try: 2323 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2324 except socket.error as x: 2325 # this fails on some older versions of OpenSSL (0.9.7l, for instance) 2326 if support.verbose: 2327 sys.stdout.write( 2328 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" 2329 % str(x)) 2330 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2331 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False) 2332 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 2333 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1') 2334 2335 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2336 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL) 2337 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 2338 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL) 2339 2340 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2341 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED) 2342 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 2343 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED) 2344 2345 # Server with specific SSL options 2346 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2347 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, 2348 server_options=ssl.OP_NO_SSLv3) 2349 # Will choose TLSv1 2350 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, 2351 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 2352 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False, 2353 server_options=ssl.OP_NO_TLSv1) 2354 2355 2356 @skip_if_broken_ubuntu_ssl 2357 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'), 2358 "OpenSSL is compiled without SSLv3 support") 2359 def test_protocol_sslv3(self): 2360 """Connecting to an SSLv3 server with various client options""" 2361 if support.verbose: 2362 sys.stdout.write("\n") 2363 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3') 2364 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL) 2365 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED) 2366 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2367 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) 2368 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False, 2369 client_options=ssl.OP_NO_SSLv3) 2370 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) 2371 if no_sslv2_implies_sslv3_hello(): 2372 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2373 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 2374 False, client_options=ssl.OP_NO_SSLv2) 2375 2376 @skip_if_broken_ubuntu_ssl 2377 def test_protocol_tlsv1(self): 2378 """Connecting to a TLSv1 server with various client options""" 2379 if support.verbose: 2380 sys.stdout.write("\n") 2381 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1') 2382 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL) 2383 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED) 2384 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2385 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) 2386 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2387 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 2388 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, 2389 client_options=ssl.OP_NO_TLSv1) 2390 2391 @skip_if_broken_ubuntu_ssl 2392 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"), 2393 "TLS version 1.1 not supported.") 2394 def test_protocol_tlsv1_1(self): 2395 """Connecting to a TLSv1.1 server with various client options. 2396 Testing against older TLS versions.""" 2397 if support.verbose: 2398 sys.stdout.write("\n") 2399 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2400 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2401 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False) 2402 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2403 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False) 2404 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False, 2405 client_options=ssl.OP_NO_TLSv1_1) 2406 2407 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2408 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False) 2409 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False) 2410 2411 2412 @skip_if_broken_ubuntu_ssl 2413 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), 2414 "TLS version 1.2 not supported.") 2415 def test_protocol_tlsv1_2(self): 2416 """Connecting to a TLSv1.2 server with various client options. 2417 Testing against older TLS versions.""" 2418 if support.verbose: 2419 sys.stdout.write("\n") 2420 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2', 2421 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2, 2422 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,) 2423 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2424 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False) 2425 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2426 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False) 2427 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False, 2428 client_options=ssl.OP_NO_TLSv1_2) 2429 2430 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2') 2431 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False) 2432 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False) 2433 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False) 2434 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False) 2435 2436 def test_starttls(self): 2437 """Switching from clear text to encrypted and back again.""" 2438 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6") 2439 2440 server = ThreadedEchoServer(CERTFILE, 2441 ssl_version=ssl.PROTOCOL_TLSv1, 2442 starttls_server=True, 2443 chatty=True, 2444 connectionchatty=True) 2445 wrapped = False 2446 with server: 2447 s = socket.socket() 2448 s.setblocking(1) 2449 s.connect((HOST, server.port)) 2450 if support.verbose: 2451 sys.stdout.write("\n") 2452 for indata in msgs: 2453 if support.verbose: 2454 sys.stdout.write( 2455 " client: sending %r...\n" % indata) 2456 if wrapped: 2457 conn.write(indata) 2458 outdata = conn.read() 2459 else: 2460 s.send(indata) 2461 outdata = s.recv(1024) 2462 msg = outdata.strip().lower() 2463 if indata == b"STARTTLS" and msg.startswith(b"ok"): 2464 # STARTTLS ok, switch to secure mode 2465 if support.verbose: 2466 sys.stdout.write( 2467 " client: read %r from server, starting TLS...\n" 2468 % msg) 2469 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) 2470 wrapped = True 2471 elif indata == b"ENDTLS" and msg.startswith(b"ok"): 2472 # ENDTLS ok, switch back to clear text 2473 if support.verbose: 2474 sys.stdout.write( 2475 " client: read %r from server, ending TLS...\n" 2476 % msg) 2477 s = conn.unwrap() 2478 wrapped = False 2479 else: 2480 if support.verbose: 2481 sys.stdout.write( 2482 " client: read %r from server\n" % msg) 2483 if support.verbose: 2484 sys.stdout.write(" client: closing connection.\n") 2485 if wrapped: 2486 conn.write(b"over\n") 2487 else: 2488 s.send(b"over\n") 2489 if wrapped: 2490 conn.close() 2491 else: 2492 s.close() 2493 2494 def test_socketserver(self): 2495 """Using a SocketServer to create and manage SSL connections.""" 2496 server = make_https_server(self, certfile=CERTFILE) 2497 # try to connect 2498 if support.verbose: 2499 sys.stdout.write('\n') 2500 with open(CERTFILE, 'rb') as f: 2501 d1 = f.read() 2502 d2 = '' 2503 # now fetch the same data from the HTTPS server 2504 url = 'https://localhost:%d/%s' % ( 2505 server.port, os.path.split(CERTFILE)[1]) 2506 context = ssl.create_default_context(cafile=CERTFILE) 2507 f = urllib2.urlopen(url, context=context) 2508 try: 2509 dlen = f.info().getheader("content-length") 2510 if dlen and (int(dlen) > 0): 2511 d2 = f.read(int(dlen)) 2512 if support.verbose: 2513 sys.stdout.write( 2514 " client: read %d bytes from remote server '%s'\n" 2515 % (len(d2), server)) 2516 finally: 2517 f.close() 2518 self.assertEqual(d1, d2) 2519 2520 def test_asyncore_server(self): 2521 """Check the example asyncore integration.""" 2522 if support.verbose: 2523 sys.stdout.write("\n") 2524 2525 indata = b"FOO\n" 2526 server = AsyncoreEchoServer(CERTFILE) 2527 with server: 2528 s = ssl.wrap_socket(socket.socket()) 2529 s.connect(('127.0.0.1', server.port)) 2530 if support.verbose: 2531 sys.stdout.write( 2532 " client: sending %r...\n" % indata) 2533 s.write(indata) 2534 outdata = s.read() 2535 if support.verbose: 2536 sys.stdout.write(" client: read %r\n" % outdata) 2537 if outdata != indata.lower(): 2538 self.fail( 2539 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" 2540 % (outdata[:20], len(outdata), 2541 indata[:20].lower(), len(indata))) 2542 s.write(b"over\n") 2543 if support.verbose: 2544 sys.stdout.write(" client: closing connection.\n") 2545 s.close() 2546 if support.verbose: 2547 sys.stdout.write(" client: connection closed.\n") 2548 2549 def test_recv_send(self): 2550 """Test recv(), send() and friends.""" 2551 if support.verbose: 2552 sys.stdout.write("\n") 2553 2554 server = ThreadedEchoServer(CERTFILE, 2555 certreqs=ssl.CERT_NONE, 2556 ssl_version=ssl.PROTOCOL_TLSv1, 2557 cacerts=CERTFILE, 2558 chatty=True, 2559 connectionchatty=False) 2560 with server: 2561 s = ssl.wrap_socket(socket.socket(), 2562 server_side=False, 2563 certfile=CERTFILE, 2564 ca_certs=CERTFILE, 2565 cert_reqs=ssl.CERT_NONE, 2566 ssl_version=ssl.PROTOCOL_TLSv1) 2567 s.connect((HOST, server.port)) 2568 # helper methods for standardising recv* method signatures 2569 def _recv_into(): 2570 b = bytearray(b"\0"*100) 2571 count = s.recv_into(b) 2572 return b[:count] 2573 2574 def _recvfrom_into(): 2575 b = bytearray(b"\0"*100) 2576 count, addr = s.recvfrom_into(b) 2577 return b[:count] 2578 2579 # (name, method, whether to expect success, *args) 2580 send_methods = [ 2581 ('send', s.send, True, []), 2582 ('sendto', s.sendto, False, ["some.address"]), 2583 ('sendall', s.sendall, True, []), 2584 ] 2585 recv_methods = [ 2586 ('recv', s.recv, True, []), 2587 ('recvfrom', s.recvfrom, False, ["some.address"]), 2588 ('recv_into', _recv_into, True, []), 2589 ('recvfrom_into', _recvfrom_into, False, []), 2590 ] 2591 data_prefix = u"PREFIX_" 2592 2593 for meth_name, send_meth, expect_success, args in send_methods: 2594 indata = (data_prefix + meth_name).encode('ascii') 2595 try: 2596 send_meth(indata, *args) 2597 outdata = s.read() 2598 if outdata != indata.lower(): 2599 self.fail( 2600 "While sending with <<{name:s}>> bad data " 2601 "<<{outdata:r}>> ({nout:d}) received; " 2602 "expected <<{indata:r}>> ({nin:d})\n".format( 2603 name=meth_name, outdata=outdata[:20], 2604 nout=len(outdata), 2605 indata=indata[:20], nin=len(indata) 2606 ) 2607 ) 2608 except ValueError as e: 2609 if expect_success: 2610 self.fail( 2611 "Failed to send with method <<{name:s}>>; " 2612 "expected to succeed.\n".format(name=meth_name) 2613 ) 2614 if not str(e).startswith(meth_name): 2615 self.fail( 2616 "Method <<{name:s}>> failed with unexpected " 2617 "exception message: {exp:s}\n".format( 2618 name=meth_name, exp=e 2619 ) 2620 ) 2621 2622 for meth_name, recv_meth, expect_success, args in recv_methods: 2623 indata = (data_prefix + meth_name).encode('ascii') 2624 try: 2625 s.send(indata) 2626 outdata = recv_meth(*args) 2627 if outdata != indata.lower(): 2628 self.fail( 2629 "While receiving with <<{name:s}>> bad data " 2630 "<<{outdata:r}>> ({nout:d}) received; " 2631 "expected <<{indata:r}>> ({nin:d})\n".format( 2632 name=meth_name, outdata=outdata[:20], 2633 nout=len(outdata), 2634 indata=indata[:20], nin=len(indata) 2635 ) 2636 ) 2637 except ValueError as e: 2638 if expect_success: 2639 self.fail( 2640 "Failed to receive with method <<{name:s}>>; " 2641 "expected to succeed.\n".format(name=meth_name) 2642 ) 2643 if not str(e).startswith(meth_name): 2644 self.fail( 2645 "Method <<{name:s}>> failed with unexpected " 2646 "exception message: {exp:s}\n".format( 2647 name=meth_name, exp=e 2648 ) 2649 ) 2650 # consume data 2651 s.read() 2652 2653 # read(-1, buffer) is supported, even though read(-1) is not 2654 data = b"data" 2655 s.send(data) 2656 buffer = bytearray(len(data)) 2657 self.assertEqual(s.read(-1, buffer), len(data)) 2658 self.assertEqual(buffer, data) 2659 2660 s.write(b"over\n") 2661 2662 self.assertRaises(ValueError, s.recv, -1) 2663 self.assertRaises(ValueError, s.read, -1) 2664 2665 s.close() 2666 2667 def test_recv_zero(self): 2668 server = ThreadedEchoServer(CERTFILE) 2669 server.__enter__() 2670 self.addCleanup(server.__exit__, None, None) 2671 s = socket.create_connection((HOST, server.port)) 2672 self.addCleanup(s.close) 2673 s = ssl.wrap_socket(s, suppress_ragged_eofs=False) 2674 self.addCleanup(s.close) 2675 2676 # recv/read(0) should return no data 2677 s.send(b"data") 2678 self.assertEqual(s.recv(0), b"") 2679 self.assertEqual(s.read(0), b"") 2680 self.assertEqual(s.read(), b"data") 2681 2682 # Should not block if the other end sends no data 2683 s.setblocking(False) 2684 self.assertEqual(s.recv(0), b"") 2685 self.assertEqual(s.recv_into(bytearray()), 0) 2686 2687 def test_handshake_timeout(self): 2688 # Issue #5103: SSL handshake must respect the socket timeout 2689 server = socket.socket(socket.AF_INET) 2690 host = "127.0.0.1" 2691 port = support.bind_port(server) 2692 started = threading.Event() 2693 finish = False 2694 2695 def serve(): 2696 server.listen(5) 2697 started.set() 2698 conns = [] 2699 while not finish: 2700 r, w, e = select.select([server], [], [], 0.1) 2701 if server in r: 2702 # Let the socket hang around rather than having 2703 # it closed by garbage collection. 2704 conns.append(server.accept()[0]) 2705 for sock in conns: 2706 sock.close() 2707 2708 t = threading.Thread(target=serve) 2709 t.start() 2710 started.wait() 2711 2712 try: 2713 try: 2714 c = socket.socket(socket.AF_INET) 2715 c.settimeout(0.2) 2716 c.connect((host, port)) 2717 # Will attempt handshake and time out 2718 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2719 ssl.wrap_socket, c) 2720 finally: 2721 c.close() 2722 try: 2723 c = socket.socket(socket.AF_INET) 2724 c = ssl.wrap_socket(c) 2725 c.settimeout(0.2) 2726 # Will attempt handshake and time out 2727 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2728 c.connect, (host, port)) 2729 finally: 2730 c.close() 2731 finally: 2732 finish = True 2733 t.join() 2734 server.close() 2735 2736 def test_server_accept(self): 2737 # Issue #16357: accept() on a SSLSocket created through 2738 # SSLContext.wrap_socket(). 2739 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2740 context.verify_mode = ssl.CERT_REQUIRED 2741 context.load_verify_locations(CERTFILE) 2742 context.load_cert_chain(CERTFILE) 2743 server = socket.socket(socket.AF_INET) 2744 host = "127.0.0.1" 2745 port = support.bind_port(server) 2746 server = context.wrap_socket(server, server_side=True) 2747 2748 evt = threading.Event() 2749 remote = [None] 2750 peer = [None] 2751 def serve(): 2752 server.listen(5) 2753 # Block on the accept and wait on the connection to close. 2754 evt.set() 2755 remote[0], peer[0] = server.accept() 2756 remote[0].recv(1) 2757 2758 t = threading.Thread(target=serve) 2759 t.start() 2760 # Client wait until server setup and perform a connect. 2761 evt.wait() 2762 client = context.wrap_socket(socket.socket()) 2763 client.connect((host, port)) 2764 client_addr = client.getsockname() 2765 client.close() 2766 t.join() 2767 remote[0].close() 2768 server.close() 2769 # Sanity checks. 2770 self.assertIsInstance(remote[0], ssl.SSLSocket) 2771 self.assertEqual(peer[0], client_addr) 2772 2773 def test_getpeercert_enotconn(self): 2774 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2775 with closing(context.wrap_socket(socket.socket())) as sock: 2776 with self.assertRaises(socket.error) as cm: 2777 sock.getpeercert() 2778 self.assertEqual(cm.exception.errno, errno.ENOTCONN) 2779 2780 def test_do_handshake_enotconn(self): 2781 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2782 with closing(context.wrap_socket(socket.socket())) as sock: 2783 with self.assertRaises(socket.error) as cm: 2784 sock.do_handshake() 2785 self.assertEqual(cm.exception.errno, errno.ENOTCONN) 2786 2787 def test_default_ciphers(self): 2788 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2789 try: 2790 # Force a set of weak ciphers on our client context 2791 context.set_ciphers("DES") 2792 except ssl.SSLError: 2793 self.skipTest("no DES cipher available") 2794 with ThreadedEchoServer(CERTFILE, 2795 ssl_version=ssl.PROTOCOL_SSLv23, 2796 chatty=False) as server: 2797 with closing(context.wrap_socket(socket.socket())) as s: 2798 with self.assertRaises(ssl.SSLError): 2799 s.connect((HOST, server.port)) 2800 self.assertIn("no shared cipher", str(server.conn_errors[0])) 2801 2802 def test_version_basic(self): 2803 """ 2804 Basic tests for SSLSocket.version(). 2805 More tests are done in the test_protocol_*() methods. 2806 """ 2807 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2808 with ThreadedEchoServer(CERTFILE, 2809 ssl_version=ssl.PROTOCOL_TLSv1, 2810 chatty=False) as server: 2811 with closing(context.wrap_socket(socket.socket())) as s: 2812 self.assertIs(s.version(), None) 2813 s.connect((HOST, server.port)) 2814 self.assertEqual(s.version(), 'TLSv1') 2815 self.assertIs(s.version(), None) 2816 2817 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL") 2818 def test_default_ecdh_curve(self): 2819 # Issue #21015: elliptic curve-based Diffie Hellman key exchange 2820 # should be enabled by default on SSL contexts. 2821 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2822 context.load_cert_chain(CERTFILE) 2823 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled 2824 # explicitly using the 'ECCdraft' cipher alias. Otherwise, 2825 # our default cipher list should prefer ECDH-based ciphers 2826 # automatically. 2827 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0): 2828 context.set_ciphers("ECCdraft:ECDH") 2829 with ThreadedEchoServer(context=context) as server: 2830 with closing(context.wrap_socket(socket.socket())) as s: 2831 s.connect((HOST, server.port)) 2832 self.assertIn("ECDH", s.cipher()[0]) 2833 2834 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 2835 "'tls-unique' channel binding not available") 2836 def test_tls_unique_channel_binding(self): 2837 """Test tls-unique channel binding.""" 2838 if support.verbose: 2839 sys.stdout.write("\n") 2840 2841 server = ThreadedEchoServer(CERTFILE, 2842 certreqs=ssl.CERT_NONE, 2843 ssl_version=ssl.PROTOCOL_TLSv1, 2844 cacerts=CERTFILE, 2845 chatty=True, 2846 connectionchatty=False) 2847 with server: 2848 s = ssl.wrap_socket(socket.socket(), 2849 server_side=False, 2850 certfile=CERTFILE, 2851 ca_certs=CERTFILE, 2852 cert_reqs=ssl.CERT_NONE, 2853 ssl_version=ssl.PROTOCOL_TLSv1) 2854 s.connect((HOST, server.port)) 2855 # get the data 2856 cb_data = s.get_channel_binding("tls-unique") 2857 if support.verbose: 2858 sys.stdout.write(" got channel binding data: {0!r}\n" 2859 .format(cb_data)) 2860 2861 # check if it is sane 2862 self.assertIsNotNone(cb_data) 2863 self.assertEqual(len(cb_data), 12) # True for TLSv1 2864 2865 # and compare with the peers version 2866 s.write(b"CB tls-unique\n") 2867 peer_data_repr = s.read().strip() 2868 self.assertEqual(peer_data_repr, 2869 repr(cb_data).encode("us-ascii")) 2870 s.close() 2871 2872 # now, again 2873 s = ssl.wrap_socket(socket.socket(), 2874 server_side=False, 2875 certfile=CERTFILE, 2876 ca_certs=CERTFILE, 2877 cert_reqs=ssl.CERT_NONE, 2878 ssl_version=ssl.PROTOCOL_TLSv1) 2879 s.connect((HOST, server.port)) 2880 new_cb_data = s.get_channel_binding("tls-unique") 2881 if support.verbose: 2882 sys.stdout.write(" got another channel binding data: {0!r}\n" 2883 .format(new_cb_data)) 2884 # is it really unique 2885 self.assertNotEqual(cb_data, new_cb_data) 2886 self.assertIsNotNone(cb_data) 2887 self.assertEqual(len(cb_data), 12) # True for TLSv1 2888 s.write(b"CB tls-unique\n") 2889 peer_data_repr = s.read().strip() 2890 self.assertEqual(peer_data_repr, 2891 repr(new_cb_data).encode("us-ascii")) 2892 s.close() 2893 2894 def test_compression(self): 2895 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2896 context.load_cert_chain(CERTFILE) 2897 stats = server_params_test(context, context, 2898 chatty=True, connectionchatty=True) 2899 if support.verbose: 2900 sys.stdout.write(" got compression: {!r}\n".format(stats['compression'])) 2901 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' }) 2902 2903 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'), 2904 "ssl.OP_NO_COMPRESSION needed for this test") 2905 def test_compression_disabled(self): 2906 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2907 context.load_cert_chain(CERTFILE) 2908 context.options |= ssl.OP_NO_COMPRESSION 2909 stats = server_params_test(context, context, 2910 chatty=True, connectionchatty=True) 2911 self.assertIs(stats['compression'], None) 2912 2913 def test_dh_params(self): 2914 # Check we can get a connection with ephemeral Diffie-Hellman 2915 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2916 context.load_cert_chain(CERTFILE) 2917 context.load_dh_params(DHFILE) 2918 context.set_ciphers("kEDH") 2919 stats = server_params_test(context, context, 2920 chatty=True, connectionchatty=True) 2921 cipher = stats["cipher"][0] 2922 parts = cipher.split("-") 2923 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts: 2924 self.fail("Non-DH cipher: " + cipher[0]) 2925 2926 def test_selected_alpn_protocol(self): 2927 # selected_alpn_protocol() is None unless ALPN is used. 2928 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2929 context.load_cert_chain(CERTFILE) 2930 stats = server_params_test(context, context, 2931 chatty=True, connectionchatty=True) 2932 self.assertIs(stats['client_alpn_protocol'], None) 2933 2934 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required") 2935 def test_selected_alpn_protocol_if_server_uses_alpn(self): 2936 # selected_alpn_protocol() is None unless ALPN is used by the client. 2937 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2938 client_context.load_verify_locations(CERTFILE) 2939 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2940 server_context.load_cert_chain(CERTFILE) 2941 server_context.set_alpn_protocols(['foo', 'bar']) 2942 stats = server_params_test(client_context, server_context, 2943 chatty=True, connectionchatty=True) 2944 self.assertIs(stats['client_alpn_protocol'], None) 2945 2946 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test") 2947 def test_alpn_protocols(self): 2948 server_protocols = ['foo', 'bar', 'milkshake'] 2949 protocol_tests = [ 2950 (['foo', 'bar'], 'foo'), 2951 (['bar', 'foo'], 'foo'), 2952 (['milkshake'], 'milkshake'), 2953 (['http/3.0', 'http/4.0'], None) 2954 ] 2955 for client_protocols, expected in protocol_tests: 2956 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) 2957 server_context.load_cert_chain(CERTFILE) 2958 server_context.set_alpn_protocols(server_protocols) 2959 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) 2960 client_context.load_cert_chain(CERTFILE) 2961 client_context.set_alpn_protocols(client_protocols) 2962 2963 try: 2964 stats = server_params_test(client_context, 2965 server_context, 2966 chatty=True, 2967 connectionchatty=True) 2968 except ssl.SSLError as e: 2969 stats = e 2970 2971 if expected is None and IS_OPENSSL_1_1: 2972 # OpenSSL 1.1.0 raises handshake error 2973 self.assertIsInstance(stats, ssl.SSLError) 2974 else: 2975 msg = "failed trying %s (s) and %s (c).\n" \ 2976 "was expecting %s, but got %%s from the %%s" \ 2977 % (str(server_protocols), str(client_protocols), 2978 str(expected)) 2979 client_result = stats['client_alpn_protocol'] 2980 self.assertEqual(client_result, expected, 2981 msg % (client_result, "client")) 2982 server_result = stats['server_alpn_protocols'][-1] \ 2983 if len(stats['server_alpn_protocols']) else 'nothing' 2984 self.assertEqual(server_result, expected, 2985 msg % (server_result, "server")) 2986 2987 def test_selected_npn_protocol(self): 2988 # selected_npn_protocol() is None unless NPN is used 2989 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2990 context.load_cert_chain(CERTFILE) 2991 stats = server_params_test(context, context, 2992 chatty=True, connectionchatty=True) 2993 self.assertIs(stats['client_npn_protocol'], None) 2994 2995 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test") 2996 def test_npn_protocols(self): 2997 server_protocols = ['http/1.1', 'spdy/2'] 2998 protocol_tests = [ 2999 (['http/1.1', 'spdy/2'], 'http/1.1'), 3000 (['spdy/2', 'http/1.1'], 'http/1.1'), 3001 (['spdy/2', 'test'], 'spdy/2'), 3002 (['abc', 'def'], 'abc') 3003 ] 3004 for client_protocols, expected in protocol_tests: 3005 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3006 server_context.load_cert_chain(CERTFILE) 3007 server_context.set_npn_protocols(server_protocols) 3008 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3009 client_context.load_cert_chain(CERTFILE) 3010 client_context.set_npn_protocols(client_protocols) 3011 stats = server_params_test(client_context, server_context, 3012 chatty=True, connectionchatty=True) 3013 3014 msg = "failed trying %s (s) and %s (c).\n" \ 3015 "was expecting %s, but got %%s from the %%s" \ 3016 % (str(server_protocols), str(client_protocols), 3017 str(expected)) 3018 client_result = stats['client_npn_protocol'] 3019 self.assertEqual(client_result, expected, msg % (client_result, "client")) 3020 server_result = stats['server_npn_protocols'][-1] \ 3021 if len(stats['server_npn_protocols']) else 'nothing' 3022 self.assertEqual(server_result, expected, msg % (server_result, "server")) 3023 3024 def sni_contexts(self): 3025 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3026 server_context.load_cert_chain(SIGNED_CERTFILE) 3027 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3028 other_context.load_cert_chain(SIGNED_CERTFILE2) 3029 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3030 client_context.verify_mode = ssl.CERT_REQUIRED 3031 client_context.load_verify_locations(SIGNING_CA) 3032 return server_context, other_context, client_context 3033 3034 def check_common_name(self, stats, name): 3035 cert = stats['peercert'] 3036 self.assertIn((('commonName', name),), cert['subject']) 3037 3038 @needs_sni 3039 def test_sni_callback(self): 3040 calls = [] 3041 server_context, other_context, client_context = self.sni_contexts() 3042 3043 def servername_cb(ssl_sock, server_name, initial_context): 3044 calls.append((server_name, initial_context)) 3045 if server_name is not None: 3046 ssl_sock.context = other_context 3047 server_context.set_servername_callback(servername_cb) 3048 3049 stats = server_params_test(client_context, server_context, 3050 chatty=True, 3051 sni_name='supermessage') 3052 # The hostname was fetched properly, and the certificate was 3053 # changed for the connection. 3054 self.assertEqual(calls, [("supermessage", server_context)]) 3055 # CERTFILE4 was selected 3056 self.check_common_name(stats, 'fakehostname') 3057 3058 calls = [] 3059 # The callback is called with server_name=None 3060 stats = server_params_test(client_context, server_context, 3061 chatty=True, 3062 sni_name=None) 3063 self.assertEqual(calls, [(None, server_context)]) 3064 self.check_common_name(stats, 'localhost') 3065 3066 # Check disabling the callback 3067 calls = [] 3068 server_context.set_servername_callback(None) 3069 3070 stats = server_params_test(client_context, server_context, 3071 chatty=True, 3072 sni_name='notfunny') 3073 # Certificate didn't change 3074 self.check_common_name(stats, 'localhost') 3075 self.assertEqual(calls, []) 3076 3077 @needs_sni 3078 def test_sni_callback_alert(self): 3079 # Returning a TLS alert is reflected to the connecting client 3080 server_context, other_context, client_context = self.sni_contexts() 3081 3082 def cb_returning_alert(ssl_sock, server_name, initial_context): 3083 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED 3084 server_context.set_servername_callback(cb_returning_alert) 3085 3086 with self.assertRaises(ssl.SSLError) as cm: 3087 stats = server_params_test(client_context, server_context, 3088 chatty=False, 3089 sni_name='supermessage') 3090 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED') 3091 3092 @needs_sni 3093 def test_sni_callback_raising(self): 3094 # Raising fails the connection with a TLS handshake failure alert. 3095 server_context, other_context, client_context = self.sni_contexts() 3096 3097 def cb_raising(ssl_sock, server_name, initial_context): 3098 1.0/0.0 3099 server_context.set_servername_callback(cb_raising) 3100 3101 with self.assertRaises(ssl.SSLError) as cm, \ 3102 support.captured_stderr() as stderr: 3103 stats = server_params_test(client_context, server_context, 3104 chatty=False, 3105 sni_name='supermessage') 3106 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE') 3107 self.assertIn("ZeroDivisionError", stderr.getvalue()) 3108 3109 @needs_sni 3110 def test_sni_callback_wrong_return_type(self): 3111 # Returning the wrong return type terminates the TLS connection 3112 # with an internal error alert. 3113 server_context, other_context, client_context = self.sni_contexts() 3114 3115 def cb_wrong_return_type(ssl_sock, server_name, initial_context): 3116 return "foo" 3117 server_context.set_servername_callback(cb_wrong_return_type) 3118 3119 with self.assertRaises(ssl.SSLError) as cm, \ 3120 support.captured_stderr() as stderr: 3121 stats = server_params_test(client_context, server_context, 3122 chatty=False, 3123 sni_name='supermessage') 3124 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') 3125 self.assertIn("TypeError", stderr.getvalue()) 3126 3127 def test_read_write_after_close_raises_valuerror(self): 3128 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 3129 context.verify_mode = ssl.CERT_REQUIRED 3130 context.load_verify_locations(CERTFILE) 3131 context.load_cert_chain(CERTFILE) 3132 server = ThreadedEchoServer(context=context, chatty=False) 3133 3134 with server: 3135 s = context.wrap_socket(socket.socket()) 3136 s.connect((HOST, server.port)) 3137 s.close() 3138 3139 self.assertRaises(ValueError, s.read, 1024) 3140 self.assertRaises(ValueError, s.write, b'hello') 3141 3142 3143def test_main(verbose=False): 3144 if support.verbose: 3145 plats = { 3146 'Linux': platform.linux_distribution, 3147 'Mac': platform.mac_ver, 3148 'Windows': platform.win32_ver, 3149 } 3150 for name, func in plats.items(): 3151 plat = func() 3152 if plat and plat[0]: 3153 plat = '%s %r' % (name, plat) 3154 break 3155 else: 3156 plat = repr(platform.platform()) 3157 print("test_ssl: testing with %r %r" % 3158 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) 3159 print(" under %s" % plat) 3160 print(" HAS_SNI = %r" % ssl.HAS_SNI) 3161 print(" OP_ALL = 0x%8x" % ssl.OP_ALL) 3162 try: 3163 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1) 3164 except AttributeError: 3165 pass 3166 3167 for filename in [ 3168 CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE, 3169 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, 3170 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, 3171 BADCERT, BADKEY, EMPTYCERT]: 3172 if not os.path.exists(filename): 3173 raise support.TestFailed("Can't read certificate file %r" % filename) 3174 3175 tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests] 3176 3177 if support.is_resource_enabled('network'): 3178 tests.append(NetworkedTests) 3179 3180 if _have_threads: 3181 thread_info = support.threading_setup() 3182 if thread_info: 3183 tests.append(ThreadedTests) 3184 3185 try: 3186 support.run_unittest(*tests) 3187 finally: 3188 if _have_threads: 3189 support.threading_cleanup(*thread_info) 3190 3191if __name__ == "__main__": 3192 test_main() 3193