1#
2# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License").
5# You may not use this file except in compliance with the License.
6# A copy of the License is located at
7#
8#  http://aws.amazon.com/apache2.0
9#
10# or in the "license" file accompanying this file. This file is distributed
11# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12# express or implied. See the License for the specific language governing
13# permissions and limitations under the License.
14#
15
16"""
17Simple handshake tests using gnutls-serv
18"""
19
20import argparse
21import collections
22import os
23import sys
24import ssl
25import socket
26import subprocess
27import itertools
28import multiprocessing
29from os import environ
30from multiprocessing.pool import ThreadPool
31from s2n_test_constants import *
32
33def try_gnutls_handshake(endpoint, port, priority_str, session_tickets, ocsp):
34    gnutls_cmd = ["gnutls-serv", "--priority=" + priority_str, "-p " + str(port),
35            "--dhparams", TEST_DH_PARAMS]
36
37    if "ECDSA" in priority_str:
38        ocsp_response = TEST_OCSP_ECDSA_RESPONSE_FILE
39
40        # When gnutls-serv is provided with incorrect staple, it'll indicate
41        # that it's going to send an OCSP response through ServerHello
42        # extension, but it won't send the ServerCertStatus message.
43        if ocsp == OCSP.MALFORMED:
44            ocsp_response = TEST_OCSP_RESPONSE_FILE
45
46        gnutls_cmd.extend(["--x509keyfile", TEST_OCSP_ECDSA_KEY, "--x509certfile", TEST_OCSP_ECDSA_CERT,
47            "--ocsp-response", ocsp_response])
48    else:
49        ocsp_response = TEST_OCSP_RESPONSE_FILE
50
51        if ocsp == OCSP.MALFORMED:
52            ocsp_response = TEST_OCSP_ECDSA_RESPONSE_FILE
53
54        gnutls_cmd.extend(["--x509keyfile", TEST_OCSP_KEY, "--x509certfile", TEST_OCSP_CERT,
55            "--ocsp-response", ocsp_response])
56
57    if not session_tickets:
58        gnutls_cmd.append("--noticket")
59
60    # Fire up gnutls-serv
61    gnutls_serv = subprocess.Popen(gnutls_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
62
63    # Make sure it's running
64    gnutls_serv.stderr.readline()
65
66    # Fire up s2nc
67    s2nc_cipher_suite = "test_all"
68    if "ECDSA" in priority_str:
69        s2nc_cipher_suite = "test_all_ecdsa"
70
71    s2nc_cmd = ["../../bin/s2nc", "-i", "-r", "-c", s2nc_cipher_suite, str(endpoint), str(port)]
72
73    if ocsp != OCSP.DISABLED:
74        s2nc_cmd.append("-s")
75
76    s2nc = subprocess.Popen(s2nc_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
77
78    # Read it
79    found = 0
80    right_version = 0
81    for line in range(0, 50):
82        output = s2nc.stdout.readline().decode("utf-8")
83        if output.strip().startswith("Connected to"):
84            found = 1
85        if ACTUAL_VERSION_STR.format(S2N_TLS12) in output:
86            right_version = 1
87
88    gnutls_serv.kill()
89    gnutls_serv.communicate()
90    gnutls_serv.wait()
91
92    s2nc.kill()
93    s2nc.communicate()
94    s2nc.wait()
95
96    return found == 1 and right_version == 1
97
98def handshake(endpoint, port, cipher, session_tickets, ocsp):
99    success = try_gnutls_handshake(endpoint, port, cipher.gnutls_priority_str + ":+CURVE-ALL:+VERS-TLS1.2:+SIGN-ALL:+SHA1", session_tickets, ocsp)
100
101    prefix = "Cipher: %-30s Session Tickets: %-5s OCSP: %-5s ... " % (cipher.openssl_name, session_tickets, ocsp)
102
103    suffix = ""
104    if success:
105        if sys.stdout.isatty():
106            suffix = "\033[32;1mPASSED\033[0m"
107        else:
108            suffix = "PASSED"
109    else:
110        if sys.stdout.isatty():
111            suffix = "\033[31;1mFAILED\033[0m"
112        else:
113            suffix = "FAILED"
114    print(prefix + suffix)
115    return success
116
117
118def create_thread_pool():
119    threadpool_size = multiprocessing.cpu_count() * 2  #Multiply by 2 since performance improves slightly if CPU has hyperthreading
120    print("\n\tCreating ThreadPool of size: " + str(threadpool_size))
121    threadpool = ThreadPool(processes=threadpool_size)
122    return threadpool
123
124
125def main():
126    parser = argparse.ArgumentParser(description='Runs TLS server integration tests against s2nd using gnutls-cli')
127    parser.add_argument('host', help='The host for gnutls-serv to bind to')
128    parser.add_argument('port', type=int, help='The port for gnutls-serv to bind to')
129    parser.add_argument('--libcrypto', default='openssl-1.1.1', choices=S2N_LIBCRYPTO_CHOICES,
130            help="""The Libcrypto that s2n was built with. s2n supports different cipher suites depending on
131                    libcrypto version. Defaults to openssl-1.1.1.""")
132    args = parser.parse_args()
133
134    # Retrieve the test ciphers to use based on the libcrypto version s2n was built with
135    test_ciphers = S2N_LIBCRYPTO_TO_TEST_CIPHERS[args.libcrypto]
136    host = args.host
137    port = args.port
138
139    print("\nRunning GnuTLS handshake tests with: " + os.popen('gnutls-serv --version | grep -w gnutls-serv').read())
140
141    # gnutls-serv requests cient cert by default, but allows empty cert to be
142    # provided, test that this functionality work with and without session
143    # tickets for all cipher suites and handshakes with and without OCSP staple
144    threadpool = create_thread_pool()
145    port_offset = 0
146    results = []
147    for cipher in test_ciphers:
148        for session_tickets in [True, False]:
149            for ocsp in S2N_LIBCRYPTO_TO_OCSP[args.libcrypto]:
150                async_result = threadpool.apply_async(handshake, (host, port + port_offset, cipher, session_tickets, ocsp))
151                port_offset += 1
152                results.append(async_result)
153    threadpool.close()
154    threadpool.join()
155    for async_result in results:
156        if not async_result.get():
157            return -1
158
159    return 0
160
161if __name__ == "__main__":
162    sys.exit(main())
163