17Simple handshake tests using gnutls-cli
20import argparse
21import collections
22import os
23import sys
24import ssl
25import socket
26import subprocess
27import itertools
28import multiprocessing
29from os import environ
30from multiprocessing.pool import ThreadPool
31from s2n_test_constants import *
33# A container to make passing the return values from an attempted handshake more convenient
34HANDSHAKE_RC = collections.namedtuple('HANDSHAKE_RC', 'handshake_success gnutls_stdout')
36LIBCRYPTO_SUPPORT_X25519 = ['openssl-1.1.1']
38# Helper to print just the SHA256 portion of SIGN-RSA-SHA256
39def sigalg_str_from_list(sigalgs):
40    # strip the first nine bytes from each name for "SIGN-RSA", 11 for "SIGN-ECDSA"
41    return ":".join(x[9:] if x.startswith("SIGN-RSA") else x[11:] for x in sigalgs)
43def try_gnutls_handshake(endpoint, port, priority_str, mfl_extension_test, ssl_version, enter_fips_mode=False):
44    # Fire up s2nd
45    s2nd_cmd = ["../../bin/s2nd", str(endpoint), str(port)]
46    s2nd_ciphers = "test_all_tls12"
48    if enter_fips_mode == True:
49        s2nd_ciphers = "test_all_fips"
50        s2nd_cmd.append("--enter-fips-mode")
51    s2nd_cmd.append("-c")
52    s2nd_cmd.append(s2nd_ciphers)
53    if "ECDSA" in priority_str:
54        s2nd_ciphers = "test_all_ecdsa"
55        s2nd_cmd.extend(["--cert", TEST_ECDSA_CERT])
56        s2nd_cmd.extend(["--key", TEST_ECDSA_KEY])
57    if mfl_extension_test:
58        s2nd_cmd.append("--enable-mfl")
60    s2nd = subprocess.Popen(s2nd_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
62    # Make sure it's running
63    s2nd.stdout.readline()
65    gnutls_cmd = ["gnutls-cli", "--priority=" + priority_str,"--insecure", "-p " + str(port), str(endpoint)]
67    if mfl_extension_test:
68        gnutls_cmd.append("--recordsize=" + str(mfl_extension_test))
70    # Fire up gnutls-cli, use insecure since s2nd is using a dummy cert
71    gnutls_cli = subprocess.Popen(gnutls_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
73    # Save the initial output of gnutls-cli to parse the negotiated handshake parameters later
74    gnutls_initial_stdout_str = ""
75    for line in range(0 , 100):
76        output = gnutls_cli.stdout.readline().decode("utf-8")
77        gnutls_initial_stdout_str += output + "\n"
78        # Once we see this string, we have read enough output to determine which signature algorithm was used
79        if "Simple Client Mode" in output:
80            break
82    # Write the priority str towards s2nd. Prepend with the 's2n' string to make sure we don't accidently match something
83    # in the gnutls-cli handshake output
84    written_str = "s2n" + priority_str
85    gnutls_cli.stdin.write((written_str + "\n").encode("utf-8"))
86    gnutls_cli.stdin.flush()
88    # Read it
89    found = 0
90    right_version = 0
91    for line in range(0, 50):
92        output = s2nd.stdout.readline().decode("utf-8")
93        if output.strip() == written_str:
94            found = 1
95            break
96        if ACTUAL_VERSION_STR.format(ssl_version or S2N_TLS12) in output:
97            right_version = 1
99    if found == 0 or right_version == 0:
100        return HANDSHAKE_RC(False, gnutls_initial_stdout_str)
102    # Write the cipher name from s2n
103    s2nd.stdin.write((written_str + "\n").encode("utf-8"))
104    s2nd.stdin.flush()
105    found = 0
106    for line in range(0, 50):
107        output = gnutls_cli.stdout.readline().decode("utf-8")
108        if output.strip() == written_str:
109            found = 1
110            break
112    if found == 0:
113        return HANDSHAKE_RC(False, gnutls_initial_stdout_str)
115    gnutls_cli.kill()
116    gnutls_cli.wait()
117    s2nd.kill()
118    s2nd.wait()
119    return HANDSHAKE_RC(True, gnutls_initial_stdout_str)
121def handshake(endpoint, port, cipher_name, ssl_version, priority_str, digests, curves, mfl_extension_test, fips_mode,
122        other_prefix=None):
123    ret = try_gnutls_handshake(endpoint, port, priority_str, mfl_extension_test, ssl_version, fips_mode)
125    prefix = other_prefix or ""
126    if mfl_extension_test:
127        prefix += "MFL: %-10s Cipher: %-10s Vers: %-10s ... " % (mfl_extension_test, cipher_name, S2N_PROTO_VERS_TO_STR[ssl_version])
128    elif digests:
129        # strip the first nine bytes from each name for "SIGN-RSA", 11 for "SIGN-ECDSA"
130        prefix += "Digests: %-40s Vers: %-10s ... " % (sigalg_str_from_list(digests), S2N_PROTO_VERS_TO_STR[ssl_version])
131    elif curves:
132         # strip the first 6 bytes of each curve name ("CURVE-")
133         curve_string = ":".join([x[6:] for x in curves])
134         prefix += "Curves: %-40s Vers: %-10s ... " % (curve_string, S2N_PROTO_VERS_TO_STR[ssl_version])
135    else:
136        prefix += "Cipher: %-30s Vers: %-10s ... " % (cipher_name, S2N_PROTO_VERS_TO_STR[ssl_version])
138    suffix = ""
139    if ret.handshake_success == True:
140        if sys.stdout.isatty():
141            suffix = "\033[32;1mPASSED\033[0m"
142        else:
143            suffix = "PASSED"
144    else:
145        if sys.stdout.isatty():
146            suffix = "\033[31;1mFAILED\033[0m"
147        else:
148            suffix = "FAILED"
149    print(prefix + suffix)
150    return ret
152def create_thread_pool():
153    threadpool_size = multiprocessing.cpu_count() * 2  #Multiply by 2 since performance improves slightly if CPU has hyperthreading
154    print("\n\tCreating ThreadPool of size: " + str(threadpool_size))
155    threadpool = ThreadPool(processes=threadpool_size)
156    return threadpool
158def main():
159    parser = argparse.ArgumentParser(description='Runs TLS server integration tests against s2nd using gnutls-cli')
160    parser.add_argument('host', help='The host for s2nd to bind to')
161    parser.add_argument('port', type=int, help='The port for s2nd to bind to')
162    parser.add_argument('--libcrypto', default='openssl-1.1.1', choices=S2N_LIBCRYPTO_CHOICES,
163            help="""The Libcrypto that s2n was built with. s2n supports different cipher suites depending on
164                    libcrypto version. Defaults to openssl-1.1.1.""")
165    args = parser.parse_args()
167    # Retrieve the test ciphers to use based on the libcrypto version s2n was built with
168    test_ciphers = S2N_LIBCRYPTO_TO_TEST_CIPHERS[args.libcrypto]
169    host = args.host
170    port = args.port
172    fips_mode = False
173    if environ.get("S2N_TEST_IN_FIPS_MODE") is not None:
174        fips_mode = True
175        print("\nRunning s2nd in FIPS mode.")
177    print("\nRunning GnuTLS handshake tests with: " + os.popen('gnutls-cli --version | grep -w gnutls-cli').read())
178    for ssl_version in [S2N_SSLv3, S2N_TLS10, S2N_TLS11, S2N_TLS12]:
180        if ssl_version == S2N_SSLv3 and fips_mode == True:
181            # FIPS does not permit the use of SSLv3
182            continue
184        print("\n\tTesting ciphers using client version: " + S2N_PROTO_VERS_TO_STR[ssl_version])
185        threadpool = create_thread_pool()
186        port_offset = 0
187        results = []
189        for cipher in test_ciphers:
190            # Use the Openssl name for printing
191            cipher_name = cipher.openssl_name
192            cipher_priority_str = cipher.gnutls_priority_str
193            cipher_vers = cipher.min_tls_vers
195            if ssl_version < cipher_vers:
196                continue
198            # gnutls-cli always adds tls extensions to client hello, add NO_EXTENSIONS flag for SSLv3 to avoid that
199            if ssl_version == S2N_SSLv3:
200                cipher_priority_str = cipher_priority_str + ":%NO_EXTENSIONS"
202            # Add the SSL version to make the cipher priority string fully qualified
203            complete_priority_str = cipher_priority_str + ":+" + S2N_PROTO_VERS_TO_GNUTLS[ssl_version] + ":+SIGN-ALL" + ":+CURVE-ALL"
205            async_result = threadpool.apply_async(handshake, (host, port + port_offset, cipher_name, ssl_version, complete_priority_str, [], [], 0, fips_mode))
206            port_offset += 1
207            results.append(async_result)
209        threadpool.close()
210        threadpool.join()
211        for async_result in results:
212            if async_result.get().handshake_success == False:
213                return -1
215    # Produce permutations of every accepted signature algorithm in every possible order
216    for size in range(1, min(MAX_ITERATION_DEPTH, len(EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS)) + 1):
217        print("\n\tTesting ciphers using RSA signature preferences of size: " + str(size))
218        threadpool = create_thread_pool()
219        port_offset = 0
220        results = []
221        for permutation in itertools.permutations(EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS, size):
222            # Try an ECDHE cipher suite and a DHE one
223            for cipher in filter(lambda x: x.openssl_name == "ECDHE-RSA-AES128-GCM-SHA256" or x.openssl_name == "DHE-RSA-AES128-GCM-SHA256", ALL_TEST_CIPHERS):
224                if fips_mode and cipher.openssl_fips_compatible == False:
225                    continue
226                complete_priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL" + ":+VERS-TLS1.2:+" + ":+".join(permutation)
227                async_result = threadpool.apply_async(handshake,(host, port + port_offset, cipher.openssl_name, S2N_TLS12, complete_priority_str, permutation, [], 0, fips_mode))
228                port_offset += 1
229                results.append(async_result)
231        threadpool.close()
232        threadpool.join()
233        for async_result in results:
234            if async_result.get().handshake_success == False:
235                return -1
237    # Try ECDSA signature algorithm permutations. When we support multiple certificates, we can combine the RSA and ECDSA tests
239        print("\n\tTesting ciphers using ECDSA signature preferences of size: " + str(size))
240        threadpool = create_thread_pool()
241        port_offset = 0
242        results = []
243        for permutation in itertools.permutations(EXPECTED_ECDSA_SIGNATURE_ALGORITHM_PREFS, size):
244            for cipher in filter(lambda x: x.openssl_name == "ECDHE-ECDSA-AES128-SHA", ALL_TEST_CIPHERS):
245                if fips_mode and cipher.openssl_fips_compatible == False:
246                    continue
247                complete_priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL" + ":+VERS-TLS1.2:+" + ":+".join(permutation)
248                async_result = threadpool.apply_async(handshake,(host, port + port_offset, cipher.openssl_name, S2N_TLS12, complete_priority_str, permutation, [], 0, fips_mode))
249                port_offset += 1
250                results.append(async_result)
252        threadpool.close()
253        threadpool.join()
254        for async_result in results:
255            if async_result.get().handshake_success == False:
256                return -1
258    # Test that s2n's server Signature Algorithm preferences are as expected.
259    # This is a brittle test that must be kept in sync with the signature algorithm preference lists in the core code,
260    # but made manageable by rarity of signature algorithm preference updates.
261    print("\n\tTesting RSA Signature Algorithm preferences")
262    print("\n\tExpected preference order: " + ",".join(EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS))
263    for i in range(0, len(EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS)):
264        # To find the Nth preferred signature algorithm, generate a priority string with ALL sigalgs then subtract any
265        # higher preference sigalgs we've already found.
266        current_preferences_found = EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS[:i]
267        # We expect to negotiate sigalg at preference i if previous i - 1 sigalgs are removed.
268        expected_sigalg = EXPECTED_RSA_SIGNATURE_ALGORITHM_PREFS[i]
269        for cipher in filter(lambda x: x.openssl_name == "ECDHE-RSA-AES128-SHA", ALL_TEST_CIPHERS):
270            if fips_mode and cipher.openssl_fips_compatible == False:
271                continue
272            sig_algs_to_remove = ":!".join(current_preferences_found)
273            sig_algs = "SIGN-ALL"
274            if len(sig_algs_to_remove) > 0:
275                sig_algs += ":!" + sig_algs_to_remove
276            priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL" + ":+VERS-TLS1.2:+" + sig_algs
277            rc = handshake(host, port, cipher.openssl_name, S2N_TLS12, priority_str, [], [], 0, fips_mode, "Preferences found: %-40s "
278                    % (sigalg_str_from_list(current_preferences_found)))
279            if rc.handshake_success == False:
280                print("Failed to negotiate " + expected_sigalg + " as expected! Priority string: "
281                        + priority_str)
282                return -1
283            negotiated_sigalg_line = [line for line in rc.gnutls_stdout.split('\n') if "Server Signature" in line]
284            if len(negotiated_sigalg_line) == 0:
285                print("Failed to find negotiated sig alg in gnutls-cli output! Priority string: " + priority_str)
286                return -1
288            # The gnutls-cli output is for sigalgs is of the format "Server Signature : $SIGALG"
289            # Confusingly, $SIGALG is in GnuTLS priority string format with the "SIGN" part of the string removed.
290            # Restore it to this string for comparison with existing list.
291            negotiated_sigalg = "SIGN-" + negotiated_sigalg_line[0].split(":")[1].strip()
292            if negotiated_sigalg != expected_sigalg:
293                print("Failed to negotiate the expected sigalg! Expected " + expected_sigalg
294                        + " Got: " + negotiated_sigalg + " at position " + str(i) + " in the preference list" +
295                        " Priority string: " + priority_str)
296                return -1
298    print("\n\tTesting ECDSA Signature Algorithm preferences")
299    print("\n\tExpected preference order: " + ",".join(EXPECTED_ECDSA_SIGNATURE_ALGORITHM_PREFS))
300    for i in range(0, len(EXPECTED_ECDSA_SIGNATURE_ALGORITHM_PREFS)):
301        # To find the Nth preferred signature algorithm, generate a priority string with ALL sigalgs then subtract any
302        # higher preference sigalgs we've already found.
303        current_preferences_found = EXPECTED_ECDSA_SIGNATURE_ALGORITHM_PREFS[:i]
304        # We expect to negotiate sigalg at preference i if previous i - 1 sigalgs are removed.
305        expected_sigalg = EXPECTED_ECDSA_SIGNATURE_ALGORITHM_PREFS[i]
306        for cipher in filter(lambda x: x.openssl_name == "ECDHE-ECDSA-AES128-SHA", ALL_TEST_CIPHERS):
307            if fips_mode and cipher.openssl_fips_compatible == False:
308                continue
309            sig_algs_to_remove = ":!".join(current_preferences_found)
310            sig_algs = "SIGN-ALL"
311            if len(sig_algs_to_remove) > 0:
312                sig_algs += ":!" + sig_algs_to_remove
313            priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL" + ":+VERS-TLS1.2:+" + sig_algs
314            rc = handshake(host, port, cipher.openssl_name, S2N_TLS12, priority_str, [], [], 0, fips_mode, "Preferences found: %-40s "
315                    % (sigalg_str_from_list(current_preferences_found)))
316            if rc.handshake_success == False:
317                print("Failed to negotiate " + expected_sigalg + " as expected! Priority string: " +
318                        priority_str)
319                return -1
320            negotiated_sigalg_line = [line for line in rc.gnutls_stdout.split('\n') if "Server Signature" in line]
321            if len(negotiated_sigalg_line) == 0:
322                print("Failed to find negotiated sig alg in gnutls-cli output! Priority string: " + priority_str)
323                return -1
325            # The gnutls-cli output is for sigalgs is of the format "Server Signature : $SIGALG"
326            # Confusingly, $SIGALG is in GnuTLS priority string format with the "SIGN" part of the string removed.
327            # Restore it to this string for comparison with existing list.
328            negotiated_sigalg = "SIGN-" + negotiated_sigalg_line[0].split(":")[1].strip()
329            if negotiated_sigalg != expected_sigalg:
330                print("Failed to negotiate the expected sigalg! Expected " + expected_sigalg
331                        + " Got: " + negotiated_sigalg + " at position " + str(i) + " in the preference list" +
332                        " Priority string: " + priority_str)
333                return -1
335    # Produce permutations of every curve s2n supports in every possible order
336    curves = ["CURVE-SECP256R1", "CURVE-SECP384R1", "CURVE-SECP521R1"]
337    for size in range(1, len(curves) + 1):
338        print("\n\tTesting named curve preferences of size: " + str(size))
339        threadpool = create_thread_pool()
340        port_offset = 0
341        results = []
342        for permutation in itertools.permutations(curves,size):
343            # Use an arbitrary ECDHE kx cipher
344            cipher = [x for x in ALL_TEST_CIPHERS if x.openssl_name == "ECDHE-RSA-AES128-GCM-SHA256"][0]
345            complete_priority_str = cipher.gnutls_priority_str + ":+SIGN-ALL" + ":+VERS-TLS1.2:+" + ":+".join(permutation)
346            async_result = threadpool.apply_async(handshake, (host, port + port_offset, cipher.openssl_name, S2N_TLS12, complete_priority_str, [], permutation, 0, fips_mode))
347            port_offset += 1
348            results.append(async_result)
349        threadpool.close()
350        threadpool.join()
351        for async_result in results:
352            if async_result.get().handshake_success == False:
353                return -1
355    print("\n\tTesting handshakes with Max Fragment Length Extension")
356    for ssl_version in [S2N_TLS10, S2N_TLS11, S2N_TLS12]:
357        print("\n\tTesting Max Fragment Length Extension using client version: " + S2N_PROTO_VERS_TO_STR[ssl_version])
358        threadpool = create_thread_pool()
359        port_offset = 0
360        results = []
361        for mfl_extension_test in [512, 1024, 2048, 4096]:
362            cipher = test_ciphers[0]
363            complete_priority_str = cipher.gnutls_priority_str + ":+CURVE-ALL" + ":+" + S2N_PROTO_VERS_TO_GNUTLS[ssl_version] + ":+SIGN-ALL"
364            async_result = threadpool.apply_async(handshake,(host, port + port_offset, cipher.openssl_name, ssl_version, complete_priority_str, [], [], mfl_extension_test, fips_mode))
365            port_offset += 1
366            results.append(async_result)
368        threadpool.close()
369        threadpool.join()
370        for async_result in results:
371            if async_result.get().handshake_success == False:
372                return -1
374if __name__ == "__main__":
375    sys.exit(main())