1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests experimental TLS Session Resumption API""" 15 16import pickle 17import unittest 18import logging 19 20import grpc 21from grpc import _channel 22from grpc.experimental import session_cache 23 24from tests.unit import test_common 25from tests.unit import resources 26 27_REQUEST = b'\x00\x00\x00' 28_RESPONSE = b'\x00\x00\x00' 29 30_UNARY_UNARY = '/test/UnaryUnary' 31 32_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' 33_ID = 'id' 34_ID_KEY = 'id_key' 35_AUTH_CTX = 'auth_ctx' 36 37_PRIVATE_KEY = resources.private_key() 38_CERTIFICATE_CHAIN = resources.certificate_chain() 39_TEST_ROOT_CERTIFICATES = resources.test_root_certificates() 40_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),) 41_PROPERTY_OPTIONS = (( 42 'grpc.ssl_target_name_override', 43 _SERVER_HOST_OVERRIDE, 44),) 45 46 47def handle_unary_unary(request, servicer_context): 48 return pickle.dumps({ 49 _ID: servicer_context.peer_identities(), 50 _ID_KEY: servicer_context.peer_identity_key(), 51 _AUTH_CTX: servicer_context.auth_context() 52 }) 53 54 55def start_secure_server(): 56 handler = grpc.method_handlers_generic_handler( 57 'test', 58 {'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary)}) 59 server = test_common.test_server() 60 server.add_generic_rpc_handlers((handler,)) 61 server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) 62 port = server.add_secure_port('[::]:0', server_cred) 63 server.start() 64 65 return server, port 66 67 68class SSLSessionCacheTest(unittest.TestCase): 69 70 def _do_one_shot_client_rpc(self, channel_creds, channel_options, port, 71 expect_ssl_session_reused): 72 channel = grpc.secure_channel('localhost:{}'.format(port), 73 channel_creds, 74 options=channel_options) 75 response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) 76 auth_data = pickle.loads(response) 77 self.assertEqual(expect_ssl_session_reused, 78 auth_data[_AUTH_CTX]['ssl_session_reused']) 79 channel.close() 80 81 def testSSLSessionCacheLRU(self): 82 server_1, port_1 = start_secure_server() 83 84 cache = session_cache.ssl_session_cache_lru(1) 85 channel_creds = grpc.ssl_channel_credentials( 86 root_certificates=_TEST_ROOT_CERTIFICATES) 87 channel_options = _PROPERTY_OPTIONS + ( 88 ('grpc.ssl_session_cache', cache),) 89 90 # Initial connection has no session to resume 91 self._do_one_shot_client_rpc(channel_creds, 92 channel_options, 93 port_1, 94 expect_ssl_session_reused=[b'false']) 95 96 # Connection to server_1 resumes from initial session 97 self._do_one_shot_client_rpc(channel_creds, 98 channel_options, 99 port_1, 100 expect_ssl_session_reused=[b'true']) 101 102 # Connection to a different server with the same name overwrites the cache entry 103 server_2, port_2 = start_secure_server() 104 self._do_one_shot_client_rpc(channel_creds, 105 channel_options, 106 port_2, 107 expect_ssl_session_reused=[b'false']) 108 self._do_one_shot_client_rpc(channel_creds, 109 channel_options, 110 port_2, 111 expect_ssl_session_reused=[b'true']) 112 server_2.stop(None) 113 114 # Connection to server_1 now falls back to full TLS handshake 115 self._do_one_shot_client_rpc(channel_creds, 116 channel_options, 117 port_1, 118 expect_ssl_session_reused=[b'false']) 119 120 # Re-creating server_1 causes old sessions to become invalid 121 server_1.stop(None) 122 server_1, port_1 = start_secure_server() 123 124 # Old sessions should no longer be valid 125 self._do_one_shot_client_rpc(channel_creds, 126 channel_options, 127 port_1, 128 expect_ssl_session_reused=[b'false']) 129 130 # Resumption should work for subsequent connections 131 self._do_one_shot_client_rpc(channel_creds, 132 channel_options, 133 port_1, 134 expect_ssl_session_reused=[b'true']) 135 server_1.stop(None) 136 137 138if __name__ == '__main__': 139 logging.basicConfig() 140 unittest.main(verbosity=2) 141