# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests exposure of SSL auth context"""

import logging
import pickle
import unittest

import grpc
from grpc import _channel
from grpc.experimental import session_cache
import six

from tests.unit import resources
from tests.unit import test_common

_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

_UNARY_UNARY = '/test/UnaryUnary'

_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
_CLIENT_IDS = (
    b'*.test.google.fr',
    b'waterzooi.test.google.be',
    b'*.test.youtube.com',
    b'192.168.1.3',
)
_ID = 'id'
_ID_KEY = 'id_key'
_AUTH_CTX = 'auth_ctx'

_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
_PROPERTY_OPTIONS = ((
    'grpc.ssl_target_name_override',
    _SERVER_HOST_OVERRIDE,
),)


def handle_unary_unary(request, servicer_context):
    """Reply with a pickled snapshot of the caller's auth information.

    Args:
      request: The request payload; it is not inspected.
      servicer_context: The servicer context of the in-flight RPC.

    Returns:
      A pickled dict with the peer identities under ``_ID``, the peer
      identity key under ``_ID_KEY`` and the auth context under
      ``_AUTH_CTX``.
    """
    return pickle.dumps({
        _ID: servicer_context.peer_identities(),
        _ID_KEY: servicer_context.peer_identity_key(),
        _AUTH_CTX: servicer_context.auth_context()
    })


def _generic_handler():
    """Build the generic handler that serves ``_UNARY_UNARY``."""
    return grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })


def _call_unary_unary(channel):
    """Invoke ``_UNARY_UNARY`` on `channel` and return the decoded reply.

    The payload is unpickled without validation. That is acceptable here
    only because it is produced by `handle_unary_unary` in this very file;
    do not reuse this helper against an untrusted peer.
    """
    return pickle.loads(channel.unary_unary(_UNARY_UNARY)(_REQUEST))


class AuthContextTest(unittest.TestCase):
    """Checks the auth data a servicer observes for different transports."""

    def _start_server(self, server_credentials=None):
        """Start a test server and return the port it listens on.

        Args:
          server_credentials: Credentials for a secure port, or None to
            open an insecure port instead.

        Returns:
          The port number reported by the server.
        """
        server = test_common.test_server()
        server.add_generic_rpc_handlers((_generic_handler(),))
        if server_credentials is None:
            port = server.add_insecure_port('[::]:0')
        else:
            port = server.add_secure_port('[::]:0', server_credentials)
        server.start()
        # Registered as a cleanup so the server is stopped even when the
        # RPC or one of the assertions in the test fails.
        self.addCleanup(server.stop, None)
        return port

    def testInsecure(self):
        """An insecure connection exposes no identity and an empty context."""
        port = self._start_server()

        with grpc.insecure_channel('localhost:%d' % port) as channel:
            auth_data = _call_unary_unary(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])

    def testSecureNoCert(self):
        """TLS without a client certificate yields no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            auth_data = _call_unary_unary(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                'security_level': [b'TSI_PRIVACY_AND_INTEGRITY'],
                'transport_security_type': [b'ssl'],
                'ssl_session_reused': [b'false'],
            }, auth_data[_AUTH_CTX])

    def testSecureClientCert(self):
        """TLS with a client certificate exposes the certificate's names."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            auth_data = _call_unary_unary(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])

    def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
                                expect_ssl_session_reused):
        """Make one RPC on a fresh channel and check session reuse.

        Args:
          channel_creds: Channel credentials for the secure channel.
          channel_options: Options passed to `grpc.secure_channel`.
          port: Port of the server to connect to on localhost.
          expect_ssl_session_reused: Expected value of the
            'ssl_session_reused' entry of the server-side auth context.
        """
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=channel_options) as channel:
            auth_data = _call_unary_unary(channel)
            # Asserting while the channel is still open keeps the original
            # ordering; the `with` block closes the channel even when the
            # assertion fails.
            self.assertEqual(expect_ssl_session_reused,
                             auth_data[_AUTH_CTX]['ssl_session_reused'])

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'true'])


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)