1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests exposure of SSL auth context"""
15
16import logging
17import pickle
18import unittest
19
20import grpc
21from grpc import _channel
22from grpc.experimental import session_cache
23import six
24
25from tests.unit import resources
26from tests.unit import test_common
27
# Payloads for the test RPC; handle_unary_unary never inspects the request.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

# Method path matching the 'test' service / 'UnaryUnary' method that the
# tests register on their servers.
_UNARY_UNARY = '/test/UnaryUnary'

# Keys of the dict that handle_unary_unary pickles into its response.
_ID = 'id'
_ID_KEY = 'id_key'
_AUTH_CTX = 'auth_ctx'

# Peer identities the server is expected to report when the client presents
# _CERTIFICATE_CHAIN (see testSecureClientCert).
_CLIENT_IDS = (
    b'*.test.google.fr',
    b'waterzooi.test.google.be',
    b'*.test.youtube.com',
    b'192.168.1.3',
)

# Test key material, used for the server and for client authentication.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)

# Channel options overriding the SSL target name, since the tests dial
# 'localhost' rather than a name taken from the certificate.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
_PROPERTY_OPTIONS = (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE),)
52
53
def handle_unary_unary(request, servicer_context):
    """Pickles what the servicer context reports about the calling peer.

    Args:
      request: The request payload; it is not inspected.
      servicer_context: The context of the RPC being served.

    Returns:
      The pickled bytes of a dict holding the peer identities under _ID, the
      peer identity key under _ID_KEY and the auth context under _AUTH_CTX.
    """
    peer_info = {}
    peer_info[_ID] = servicer_context.peer_identities()
    peer_info[_ID_KEY] = servicer_context.peer_identity_key()
    peer_info[_AUTH_CTX] = servicer_context.auth_context()
    return pickle.dumps(peer_info)
60
61
class AuthContextTest(unittest.TestCase):
    """Checks the peer identity and auth context seen by a servicer.

    Every test starts an in-process server, issues one or more unary-unary
    RPCs to handle_unary_unary and inspects the unpickled reply.
    """

    def _start_server(self, server_cred=None):
        """Starts a server hosting handle_unary_unary and returns its port.

        The server is stopped through a registered cleanup, so it is torn
        down even when the calling test fails or raises.

        Args:
          server_cred: Server credentials for a secure port, or None to
            listen on an insecure port.

        Returns:
          The port returned by the server for the address '[::]:0'.
        """
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_cred is None:
            port = server.add_insecure_port('[::]:0')
        else:
            port = server.add_secure_port('[::]:0', server_cred)
        server.start()
        # Registered only once the server is running; cleanups run after the
        # test method regardless of its outcome.
        self.addCleanup(server.stop, None)
        return port

    def _fetch_auth_data(self, channel):
        """Invokes the test method on channel and returns the unpickled reply.

        Args:
          channel: An open channel to a server started by _start_server.

        Returns:
          The dict that handle_unary_unary pickled into the response.
        """
        response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        # pickle.loads is acceptable here only because the bytes were produced
        # by handle_unary_unary on the server this test started itself.
        return pickle.loads(response)

    def testInsecure(self):
        """An insecure connection exposes no identity and no auth context."""
        port = self._start_server()

        with grpc.insecure_channel('localhost:{}'.format(port)) as channel:
            auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])

    def testSecureNoCert(self):
        """TLS without a client certificate yields no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                'security_level': [b'TSI_PRIVACY_AND_INTEGRITY'],
                'transport_security_type': [b'ssl'],
                'ssl_session_reused': [b'false'],
            }, auth_data[_AUTH_CTX])

    def testSecureClientCert(self):
        """A client certificate is surfaced as the peer identities."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            auth_data = self._fetch_auth_data(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])

    def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
                                expect_ssl_session_reused):
        """Performs one RPC on a fresh channel and checks session reuse.

        Args:
          channel_creds: Channel credentials for the secure channel.
          channel_options: Options passed to grpc.secure_channel.
          port: Port of the server to dial on localhost.
          expect_ssl_session_reused: Expected value of the
            'ssl_session_reused' entry of the auth context.
        """
        # The context manager closes the channel even if the RPC raises.
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=channel_options) as channel:
            auth_data = self._fetch_auth_data(channel)
        self.assertEqual(expect_ssl_session_reused,
                         auth_data[_AUTH_CTX]['ssl_session_reused'])

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'true'])
189
190
if __name__ == "__main__":
    # Configure the root logger with its default handler before handing
    # control to the unittest runner in verbose mode.
    logging.basicConfig()
    unittest.main(verbosity=2)
194