1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent import futures
16import time
17import unittest
18
19import grpc
20from grpc.framework.foundation import logging_pool
21import grpc_testing
22
23from tests.testing import _application_common
24from tests.testing import _application_testing_common
25from tests.testing import _client_application
26from tests.testing.proto import requests_pb2
27from tests.testing.proto import services_pb2
28from tests.unit.framework.common import test_constants
29
30
31# TODO(https://github.com/protocolbuffers/protobuf/issues/3452): Drop this skip.
@unittest.skipIf(
    services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
    'Fix protobuf issue 3452!')
class ClientTest(unittest.TestCase):
    """Drives _client_application scenarios against grpc_testing channels.

    Every test submits the client application to a one-thread pool and then
    uses the test thread to act as the server for the RPCs the application
    invokes. The whole class is skipped when the services_pb2 descriptor
    does not expose 'FirstService'.
    """

    def setUp(self):
        # The application under test gets its own thread so that the test
        # thread stays free to "play server" for the RPCs it invokes.
        self._client_execution_thread_pool = logging_pool.pool(1)

        # One testing channel per time implementation: strict fake time
        # seeded with the current wall-clock reading, and strict real time.
        self._fake_time = grpc_testing.strict_fake_time(time.time())
        self._real_time = grpc_testing.strict_real_time()
        self._fake_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(),
            self._fake_time)
        self._real_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(),
            self._real_time)

    def tearDown(self):
        # Shut the application pool down, asking it to wait (wait=True).
        self._client_execution_thread_pool.shutdown(wait=True)

    def _start_application(self, scenario, channel):
        """Submits _client_application.run(scenario, channel) to the pool.

        Returns whatever the pool's submit() returns; the tests call
        result() on it to obtain the application's outcome.
        """
        return self._client_execution_thread_pool.submit(
            _client_application.run, scenario, channel)

    def _assert_outcome_kind(self, outcome, expected_kind):
        """Asserts that outcome.kind is the very object expected_kind."""
        self.assertIs(outcome.kind, expected_kind)

    def test_successful_unary_unary(self):
        # Real-time channel: answer the unary-unary RPC with the expected
        # response and an OK status.
        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.UNARY_UNARY, channel)
        unused_metadata, received_request, server_rpc = (
            channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        server_rpc.send_initial_metadata(())
        server_rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (),
                             grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST,
                         received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_unary_stream(self):
        # Fake-time channel: terminate the unary-stream RPC with OK without
        # sending any response messages.
        channel = self._fake_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.UNARY_STREAM, channel)
        unused_metadata, received_request, server_rpc = (
            channel.take_unary_stream(
                _application_testing_common.FIRST_SERVICE_UNSTRE))
        server_rpc.send_initial_metadata(())
        server_rpc.terminate((), grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        self.assertEqual(_application_common.UNARY_STREAM_REQUEST,
                         received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_unary(self):
        # Real-time channel: take three requests, close the request stream
        # and answer with the expected response and an OK status.
        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.STREAM_UNARY, channel)
        unused_metadata, server_rpc = channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        server_rpc.send_initial_metadata(())
        received_requests = [server_rpc.take_request() for _ in range(3)]
        server_rpc.requests_closed()
        server_rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                             grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        for received_request in received_requests:
            self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                             received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_stream(self):
        # Fake-time channel: two rounds of "take one request, send two
        # responses", then close requests and terminate with OK.
        channel = self._fake_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM, channel)
        unused_metadata, server_rpc = channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        received_requests = []
        for _ in range(2):
            received_requests.append(server_rpc.take_request())
            for _ in range(2):
                server_rpc.send_response(
                    _application_common.STREAM_STREAM_RESPONSE)
        server_rpc.requests_closed()
        server_rpc.terminate((), grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        for received_request in received_requests:
            self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                             received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_concurrent_stream_stream(self):
        # Real-time channel: RPC_CONCURRENCY stream-stream RPCs are served
        # in lock step, one phase at a time across all of them.
        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.CONCURRENT_STREAM_STREAM, channel)
        server_rpcs = []
        for _ in range(test_constants.RPC_CONCURRENCY):
            unused_metadata, server_rpc = channel.take_stream_stream(
                _application_testing_common.FIRST_SERVICE_STRESTRE)
            server_rpcs.append(server_rpc)

        # Phase 1: first request of every RPC, then two responses each.
        requests_by_rpc = {
            server_rpc: [server_rpc.take_request()]
            for server_rpc in server_rpcs
        }
        for server_rpc in server_rpcs:
            for _ in range(2):
                server_rpc.send_response(
                    _application_common.STREAM_STREAM_RESPONSE)
        # Phase 2: second request of every RPC, then two responses each.
        for server_rpc in server_rpcs:
            requests_by_rpc[server_rpc].append(server_rpc.take_request())
        for server_rpc in server_rpcs:
            for _ in range(2):
                server_rpc.send_response(
                    _application_common.STREAM_STREAM_RESPONSE)
        # Phase 3: close every request stream, then terminate every RPC.
        for server_rpc in server_rpcs:
            server_rpc.requests_closed()
        for server_rpc in server_rpcs:
            server_rpc.terminate((), grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        for taken_requests in requests_by_rpc.values():
            for taken_request in taken_requests:
                self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                                 taken_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_cancelled_unary_unary(self):
        # Fake-time channel: after sending initial metadata, call
        # cancelled() on the RPC taken from the channel.
        channel = self._fake_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.CANCEL_UNARY_UNARY, channel)
        unused_metadata, received_request, server_rpc = (
            channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        server_rpc.send_initial_metadata(())
        server_rpc.cancelled()
        outcome = outcome_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST,
                         received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)

    def test_status_stream_unary(self):
        # Fake-time channel: THREAD_CONCURRENCY stream-unary RPCs; all but
        # the last terminate OK and the last one RESOURCE_EXHAUSTED.
        channel = self._fake_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.CONCURRENT_STREAM_UNARY, channel)
        taken_rpcs = []
        for _ in range(test_constants.THREAD_CONCURRENCY):
            # Index 1 of what take_stream_unary returns is the RPC.
            taken_rpcs.append(
                channel.take_stream_unary(
                    _application_testing_common.FIRST_SERVICE_STREUN)[1])
        server_rpcs = tuple(taken_rpcs)

        initial_metadata = ((
            'my_metadata_key',
            'My Metadata Value!',
        ),)
        for server_rpc in server_rpcs:
            # The three requests are consumed but their values are unused.
            for _ in range(3):
                server_rpc.take_request()
            server_rpc.requests_closed()
            server_rpc.send_initial_metadata(initial_metadata)
        for server_rpc in server_rpcs[:-1]:
            server_rpc.terminate(_application_common.STREAM_UNARY_RESPONSE,
                                 (), grpc.StatusCode.OK, '')
        server_rpcs[-1].terminate(
            _application_common.STREAM_UNARY_RESPONSE, (),
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            'nope; not able to handle all those RPCs!')
        outcome = outcome_future.result()

        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_status_stream_stream(self):
        # Real-time channel: a regular stream-stream exchange that ends
        # with a non-OK status, which the outcome is expected to carry.
        expected_code = grpc.StatusCode.DEADLINE_EXCEEDED
        expected_details = 'test deadline exceeded!'

        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM, channel)
        unused_metadata, server_rpc = channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        received_requests = []
        for _ in range(2):
            received_requests.append(server_rpc.take_request())
            for _ in range(2):
                server_rpc.send_response(
                    _application_common.STREAM_STREAM_RESPONSE)
        server_rpc.requests_closed()
        server_rpc.terminate((), expected_code, expected_details)
        outcome = outcome_future.result()

        for received_request in received_requests:
            self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                             received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.RPC_ERROR)
        self.assertIs(outcome.code, expected_code)
        self.assertEqual(outcome.details, expected_details)

    def test_misbehaving_server_unary_unary(self):
        # Fake-time channel: answer with ERRONEOUS_UNARY_UNARY_RESPONSE and
        # an OK status; the outcome is expected to be UNSATISFACTORY.
        channel = self._fake_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.UNARY_UNARY, channel)
        unused_metadata, received_request, server_rpc = (
            channel.take_unary_unary(
                _application_testing_common.FIRST_SERVICE_UNUN))
        server_rpc.send_initial_metadata(())
        server_rpc.terminate(
            _application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (),
            grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST,
                         received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_misbehaving_server_stream_stream(self):
        # Real-time channel: three responses are sent after each request
        # (the successful stream-stream test sends two); the outcome is
        # expected to be UNSATISFACTORY.
        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM, channel)
        unused_metadata, server_rpc = channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        received_requests = []
        for _ in range(2):
            received_requests.append(server_rpc.take_request())
            for _ in range(3):
                server_rpc.send_response(
                    _application_common.STREAM_STREAM_RESPONSE)
        server_rpc.requests_closed()
        server_rpc.terminate((), grpc.StatusCode.OK, '')
        outcome = outcome_future.result()

        for received_request in received_requests:
            self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                             received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_infinite_request_stream_real_time(self):
        # Real-time channel: take three requests, sleep for
        # INFINITE_REQUEST_STREAM_TIMEOUT on the real-time object, then
        # terminate the RPC with DEADLINE_EXCEEDED.
        channel = self._real_time_channel
        outcome_future = self._start_application(
            _client_application.Scenario.INFINITE_REQUEST_STREAM, channel)
        unused_metadata, server_rpc = channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        server_rpc.send_initial_metadata(())
        received_requests = [server_rpc.take_request() for _ in range(3)]
        self._real_time.sleep_for(
            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
        server_rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                             grpc.StatusCode.DEADLINE_EXCEEDED, '')
        outcome = outcome_future.result()

        for received_request in received_requests:
            self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
                             received_request)
        self._assert_outcome_kind(
            outcome, _client_application.Outcome.Kind.SATISFACTORY)
305
306
if __name__ == '__main__':
    # When executed as a script, run this module's tests through unittest
    # with verbosity level 2.
    unittest.main(verbosity=2)
309