#
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#  http://aws.amazon.com/apache2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
#

"""
Simple handshake tests using gnutls-serv
"""

import argparse
import collections
import os
import sys
import ssl
import socket
import subprocess
import itertools
import multiprocessing
from os import environ
from multiprocessing.pool import ThreadPool
from s2n_test_constants import *

# Upper bound on the number of s2nc output lines scanned for handshake evidence.
_S2NC_OUTPUT_LINE_LIMIT = 50


def _stop_process(process):
    """Kill *process*, drain its pipes and reap it."""
    process.kill()
    process.communicate()
    process.wait()


def try_gnutls_handshake(endpoint, port, priority_str, session_tickets, ocsp):
    """Run one s2nc handshake against a freshly started gnutls-serv.

    gnutls-serv is started on ``port`` with ``priority_str``. The certificate,
    key and OCSP response handed to it are the ECDSA test files when "ECDSA"
    appears in ``priority_str`` and the default test files otherwise. When
    ``ocsp`` is ``OCSP.MALFORMED`` the OCSP response of the *other* certificate
    type is stapled on purpose.

    s2nc (``../../bin/s2nc``) is then pointed at ``endpoint``/``port`` and up
    to 50 lines of its stdout are scanned.

    Args:
        endpoint: Host s2nc connects to.
        port: Port gnutls-serv listens on and s2nc connects to.
        priority_str: GnuTLS priority string passed via ``--priority``.
        session_tickets: When falsy, gnutls-serv is started with ``--noticket``.
        ocsp: One of the ``OCSP`` values; anything other than
            ``OCSP.DISABLED`` adds ``-s`` to the s2nc command line.

    Returns:
        True when s2nc printed both a line starting with "Connected to" and a
        line containing the TLS 1.2 actual-version string, False otherwise.
    """
    uses_ecdsa = "ECDSA" in priority_str

    # Flag and value are separate argv entries; no shell is involved to split them.
    gnutls_cmd = ["gnutls-serv", "--priority=" + priority_str, "-p", str(port),
                  "--dhparams", TEST_DH_PARAMS]

    if uses_ecdsa:
        key_file, cert_file = TEST_OCSP_ECDSA_KEY, TEST_OCSP_ECDSA_CERT
        matching_response, mismatched_response = TEST_OCSP_ECDSA_RESPONSE_FILE, TEST_OCSP_RESPONSE_FILE
    else:
        key_file, cert_file = TEST_OCSP_KEY, TEST_OCSP_CERT
        matching_response, mismatched_response = TEST_OCSP_RESPONSE_FILE, TEST_OCSP_ECDSA_RESPONSE_FILE

    # When gnutls-serv is provided with incorrect staple, it'll indicate
    # that it's going to send an OCSP response through ServerHello
    # extension, but it won't send the ServerCertStatus message.
    ocsp_response = mismatched_response if ocsp == OCSP.MALFORMED else matching_response

    gnutls_cmd.extend(["--x509keyfile", key_file, "--x509certfile", cert_file,
                       "--ocsp-response", ocsp_response])

    if not session_tickets:
        gnutls_cmd.append("--noticket")

    s2nc_cipher_suite = "test_all_ecdsa" if uses_ecdsa else "test_all"
    s2nc_cmd = ["../../bin/s2nc", "-i", "-r", "-c", s2nc_cipher_suite, str(endpoint), str(port)]
    if ocsp != OCSP.DISABLED:
        s2nc_cmd.append("-s")

    # Loop-invariant, so build it once up front.
    expected_version = ACTUAL_VERSION_STR.format(S2N_TLS12)

    connected = False
    right_version = False

    # Fire up gnutls-serv
    gnutls_serv = subprocess.Popen(gnutls_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    try:
        # Make sure it's running: block until it writes its first stderr line
        gnutls_serv.stderr.readline()

        # Fire up s2nc
        s2nc = subprocess.Popen(s2nc_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        try:
            # Keep scanning after a match so the amount of s2nc output consumed
            # stays the same; only stop early once s2nc has closed its stdout.
            for _ in range(_S2NC_OUTPUT_LINE_LIMIT):
                raw_line = s2nc.stdout.readline()
                if not raw_line:
                    break
                output = raw_line.decode("utf-8")
                if output.strip().startswith("Connected to"):
                    connected = True
                if expected_version in output:
                    right_version = True
        finally:
            _stop_process(s2nc)
    finally:
        # Always tear the server down, even on errors, so the port is released
        # and no stray gnutls-serv outlives the test.
        _stop_process(gnutls_serv)

    return connected and right_version


def handshake(endpoint, port, cipher, session_tickets, ocsp):
    """Run a single handshake test case and print a PASSED/FAILED line.

    Args:
        endpoint: Host s2nc connects to.
        port: Port used by gnutls-serv for this test case.
        cipher: Test cipher object; its ``gnutls_priority_str`` selects the
            GnuTLS priority and its ``openssl_name`` is used in the report.
        session_tickets: Whether gnutls-serv should offer session tickets.
        ocsp: OCSP mode for this test case.

    Returns:
        The boolean result of ``try_gnutls_handshake``.
    """
    priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL:+VERS-TLS1.2:+SIGN-ALL:+SHA1"
    success = try_gnutls_handshake(endpoint, port, priority_str, session_tickets, ocsp)

    prefix = "Cipher: %-30s Session Tickets: %-5s OCSP: %-5s ... " % (cipher.openssl_name, session_tickets, ocsp)

    verdict, colour_code = ("PASSED", "32") if success else ("FAILED", "31")
    if sys.stdout.isatty():
        # Bold green / bold red only when writing to a terminal.
        suffix = "\033[%s;1m%s\033[0m" % (colour_code, verdict)
    else:
        suffix = verdict

    print(prefix + suffix)
    return success


def create_thread_pool():
    """Create a ThreadPool with twice as many workers as reported CPUs."""
    # Multiply by 2 since performance improves slightly if CPU has hyperthreading
    threadpool_size = multiprocessing.cpu_count() * 2
    print("\n\tCreating ThreadPool of size: " + str(threadpool_size))
    return ThreadPool(processes=threadpool_size)


def main():
    """Run every cipher / session-ticket / OCSP combination in parallel.

    Each combination gets its own port, counted up from the ``port`` argument
    in submission order.

    Returns:
        0 when every handshake succeeded, -1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description='Runs TLS client integration tests for s2nc against gnutls-serv')
    parser.add_argument('host', help='The host for gnutls-serv to bind to')
    parser.add_argument('port', type=int, help='The port for gnutls-serv to bind to')
    parser.add_argument('--libcrypto', default='openssl-1.1.1', choices=S2N_LIBCRYPTO_CHOICES,
                        help="The Libcrypto that s2n was built with. s2n supports different "
                             "cipher suites depending on libcrypto version. "
                             "Defaults to openssl-1.1.1.")
    args = parser.parse_args()

    # Retrieve the test ciphers to use based on the libcrypto version s2n was built with
    test_ciphers = S2N_LIBCRYPTO_TO_TEST_CIPHERS[args.libcrypto]
    host = args.host
    port = args.port

    with os.popen('gnutls-serv --version | grep -w gnutls-serv') as version_pipe:
        print("\nRunning GnuTLS handshake tests with: " + version_pipe.read())

    # gnutls-serv requests client cert by default, but allows empty cert to be
    # provided, test that this functionality works with and without session
    # tickets for all cipher suites and handshakes with and without OCSP staple
    threadpool = create_thread_pool()
    test_cases = itertools.product(test_ciphers, [True, False], S2N_LIBCRYPTO_TO_OCSP[args.libcrypto])
    results = [
        threadpool.apply_async(handshake, (host, port + port_offset, cipher, session_tickets, ocsp))
        for port_offset, (cipher, session_tickets, ocsp) in enumerate(test_cases)
    ]
    threadpool.close()
    threadpool.join()

    if not all(async_result.get() for async_result in results):
        return -1

    return 0


if __name__ == "__main__":
    sys.exit(main())