1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test of RPCs made against gRPC Python's application-layer API."""
15
16from concurrent import futures
17import itertools
18import logging
19import threading
20import unittest
21
22import grpc
23from grpc.framework.foundation import logging_pool
24
25from tests.unit import test_common
26from tests.unit._rpc_test_helpers import BaseRPCTest
27from tests.unit._rpc_test_helpers import Callback
28from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
29from tests.unit._rpc_test_helpers import \
30    stream_stream_non_blocking_multi_callable
31from tests.unit._rpc_test_helpers import \
32    unary_stream_non_blocking_multi_callable
33from tests.unit._rpc_test_helpers import stream_stream_multi_callable
34from tests.unit._rpc_test_helpers import stream_unary_multi_callable
35from tests.unit._rpc_test_helpers import unary_stream_multi_callable
36from tests.unit._rpc_test_helpers import unary_unary_multi_callable
37from tests.unit.framework.common import test_constants
38
39
40@unittest.skipIf(test_common.running_under_gevent(),
41                 "Causes deadlock under gevent.")
42class RPCPart2Test(BaseRPCTest, unittest.TestCase):
43
44    def testDefaultThreadPoolIsUsed(self):
45        self._consume_one_stream_response_unary_request(
46            unary_stream_multi_callable(self._channel))
47        self.assertFalse(self._thread_pool.was_used())
48
49    def testExperimentalThreadPoolIsUsed(self):
50        self._consume_one_stream_response_unary_request(
51            unary_stream_non_blocking_multi_callable(self._channel))
52        self.assertTrue(self._thread_pool.was_used())
53
54    def testUnrecognizedMethod(self):
55        request = b'abc'
56
57        with self.assertRaises(grpc.RpcError) as exception_context:
58            self._channel.unary_unary('NoSuchMethod')(request)
59
60        self.assertEqual(grpc.StatusCode.UNIMPLEMENTED,
61                         exception_context.exception.code())
62
63    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
64        request = b'\x07\x08'
65        expected_response = self._handler.handle_unary_unary(request, None)
66
67        multi_callable = unary_unary_multi_callable(self._channel)
68        response = multi_callable(
69            request,
70            metadata=(('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),))
71
72        self.assertEqual(expected_response, response)
73
74    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
75        request = b'\x07\x08'
76        expected_response = self._handler.handle_unary_unary(request, None)
77
78        multi_callable = unary_unary_multi_callable(self._channel)
79        response, call = multi_callable.with_call(
80            request,
81            metadata=(('test',
82                       'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
83
84        self.assertEqual(expected_response, response)
85        self.assertIs(grpc.StatusCode.OK, call.code())
86        self.assertEqual('', call.debug_error_string())
87
88    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
89        request = b'\x07\x08'
90        expected_response = self._handler.handle_unary_unary(request, None)
91
92        multi_callable = unary_unary_multi_callable(self._channel)
93        response_future = multi_callable.future(
94            request,
95            metadata=(('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),))
96        response = response_future.result()
97
98        self.assertIsInstance(response_future, grpc.Future)
99        self.assertIsInstance(response_future, grpc.Call)
100        self.assertEqual(expected_response, response)
101        self.assertIsNone(response_future.exception())
102        self.assertIsNone(response_future.traceback())
103
104    def testSuccessfulUnaryRequestStreamResponse(self):
105        request = b'\x37\x58'
106        expected_responses = tuple(
107            self._handler.handle_unary_stream(request, None))
108
109        multi_callable = unary_stream_multi_callable(self._channel)
110        response_iterator = multi_callable(
111            request,
112            metadata=(('test', 'SuccessfulUnaryRequestStreamResponse'),))
113        responses = tuple(response_iterator)
114
115        self.assertSequenceEqual(expected_responses, responses)
116
117    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
118        requests = tuple(
119            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
120        expected_response = self._handler.handle_stream_unary(
121            iter(requests), None)
122        request_iterator = iter(requests)
123
124        multi_callable = stream_unary_multi_callable(self._channel)
125        response = multi_callable(
126            request_iterator,
127            metadata=(('test',
128                       'SuccessfulStreamRequestBlockingUnaryResponse'),))
129
130        self.assertEqual(expected_response, response)
131
132    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
133        requests = tuple(
134            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
135        expected_response = self._handler.handle_stream_unary(
136            iter(requests), None)
137        request_iterator = iter(requests)
138
139        multi_callable = stream_unary_multi_callable(self._channel)
140        response, call = multi_callable.with_call(
141            request_iterator,
142            metadata=(
143                ('test',
144                 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),))
145
146        self.assertEqual(expected_response, response)
147        self.assertIs(grpc.StatusCode.OK, call.code())
148
149    def testSuccessfulStreamRequestFutureUnaryResponse(self):
150        requests = tuple(
151            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
152        expected_response = self._handler.handle_stream_unary(
153            iter(requests), None)
154        request_iterator = iter(requests)
155
156        multi_callable = stream_unary_multi_callable(self._channel)
157        response_future = multi_callable.future(
158            request_iterator,
159            metadata=(('test', 'SuccessfulStreamRequestFutureUnaryResponse'),))
160        response = response_future.result()
161
162        self.assertEqual(expected_response, response)
163        self.assertIsNone(response_future.exception())
164        self.assertIsNone(response_future.traceback())
165
166    def testSuccessfulStreamRequestStreamResponse(self):
167        requests = tuple(
168            b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH))
169
170        expected_responses = tuple(
171            self._handler.handle_stream_stream(iter(requests), None))
172        request_iterator = iter(requests)
173
174        multi_callable = stream_stream_multi_callable(self._channel)
175        response_iterator = multi_callable(
176            request_iterator,
177            metadata=(('test', 'SuccessfulStreamRequestStreamResponse'),))
178        responses = tuple(response_iterator)
179
180        self.assertSequenceEqual(expected_responses, responses)
181
182    def testSequentialInvocations(self):
183        first_request = b'\x07\x08'
184        second_request = b'\x0809'
185        expected_first_response = self._handler.handle_unary_unary(
186            first_request, None)
187        expected_second_response = self._handler.handle_unary_unary(
188            second_request, None)
189
190        multi_callable = unary_unary_multi_callable(self._channel)
191        first_response = multi_callable(first_request,
192                                        metadata=(('test',
193                                                   'SequentialInvocations'),))
194        second_response = multi_callable(second_request,
195                                         metadata=(('test',
196                                                    'SequentialInvocations'),))
197
198        self.assertEqual(expected_first_response, first_response)
199        self.assertEqual(expected_second_response, second_response)
200
201    def testConcurrentBlockingInvocations(self):
202        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
203        requests = tuple(
204            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
205        expected_response = self._handler.handle_stream_unary(
206            iter(requests), None)
207        expected_responses = [expected_response
208                             ] * test_constants.THREAD_CONCURRENCY
209        response_futures = [None] * test_constants.THREAD_CONCURRENCY
210
211        multi_callable = stream_unary_multi_callable(self._channel)
212        for index in range(test_constants.THREAD_CONCURRENCY):
213            request_iterator = iter(requests)
214            response_future = pool.submit(
215                multi_callable,
216                request_iterator,
217                metadata=(('test', 'ConcurrentBlockingInvocations'),))
218            response_futures[index] = response_future
219        responses = tuple(
220            response_future.result() for response_future in response_futures)
221
222        pool.shutdown(wait=True)
223        self.assertSequenceEqual(expected_responses, responses)
224
225    def testConcurrentFutureInvocations(self):
226        requests = tuple(
227            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
228        expected_response = self._handler.handle_stream_unary(
229            iter(requests), None)
230        expected_responses = [expected_response
231                             ] * test_constants.THREAD_CONCURRENCY
232        response_futures = [None] * test_constants.THREAD_CONCURRENCY
233
234        multi_callable = stream_unary_multi_callable(self._channel)
235        for index in range(test_constants.THREAD_CONCURRENCY):
236            request_iterator = iter(requests)
237            response_future = multi_callable.future(
238                request_iterator,
239                metadata=(('test', 'ConcurrentFutureInvocations'),))
240            response_futures[index] = response_future
241        responses = tuple(
242            response_future.result() for response_future in response_futures)
243
244        self.assertSequenceEqual(expected_responses, responses)
245
246    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
247        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
248        request = b'\x67\x68'
249        expected_response = self._handler.handle_unary_unary(request, None)
250        response_futures = [None] * test_constants.THREAD_CONCURRENCY
251        lock = threading.Lock()
252        test_is_running_cell = [True]
253
254        def wrap_future(future):
255
256            def wrap():
257                try:
258                    return future.result()
259                except grpc.RpcError:
260                    with lock:
261                        if test_is_running_cell[0]:
262                            raise
263                    return None
264
265            return wrap
266
267        multi_callable = unary_unary_multi_callable(self._channel)
268        for index in range(test_constants.THREAD_CONCURRENCY):
269            inner_response_future = multi_callable.future(
270                request,
271                metadata=(
272                    ('test',
273                     'WaitingForSomeButNotAllConcurrentFutureInvocations'),))
274            outer_response_future = pool.submit(
275                wrap_future(inner_response_future))
276            response_futures[index] = outer_response_future
277
278        some_completed_response_futures_iterator = itertools.islice(
279            futures.as_completed(response_futures),
280            test_constants.THREAD_CONCURRENCY // 2)
281        for response_future in some_completed_response_futures_iterator:
282            self.assertEqual(expected_response, response_future.result())
283        with lock:
284            test_is_running_cell[0] = False
285
286    def testConsumingOneStreamResponseUnaryRequest(self):
287        self._consume_one_stream_response_unary_request(
288            unary_stream_multi_callable(self._channel))
289
290    def testConsumingOneStreamResponseUnaryRequestNonBlocking(self):
291        self._consume_one_stream_response_unary_request(
292            unary_stream_non_blocking_multi_callable(self._channel))
293
294    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
295        self._consume_some_but_not_all_stream_responses_unary_request(
296            unary_stream_multi_callable(self._channel))
297
298    def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self):
299        self._consume_some_but_not_all_stream_responses_unary_request(
300            unary_stream_non_blocking_multi_callable(self._channel))
301
302    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
303        self._consume_some_but_not_all_stream_responses_stream_request(
304            stream_stream_multi_callable(self._channel))
305
306    def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self):
307        self._consume_some_but_not_all_stream_responses_stream_request(
308            stream_stream_non_blocking_multi_callable(self._channel))
309
310    def testConsumingTooManyStreamResponsesStreamRequest(self):
311        self._consume_too_many_stream_responses_stream_request(
312            stream_stream_multi_callable(self._channel))
313
314    def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self):
315        self._consume_too_many_stream_responses_stream_request(
316            stream_stream_non_blocking_multi_callable(self._channel))
317
318    def testCancelledUnaryRequestUnaryResponse(self):
319        request = b'\x07\x17'
320
321        multi_callable = unary_unary_multi_callable(self._channel)
322        with self._control.pause():
323            response_future = multi_callable.future(
324                request,
325                metadata=(('test', 'CancelledUnaryRequestUnaryResponse'),))
326            response_future.cancel()
327
328        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
329        self.assertTrue(response_future.cancelled())
330        with self.assertRaises(grpc.FutureCancelledError):
331            response_future.result()
332        with self.assertRaises(grpc.FutureCancelledError):
333            response_future.exception()
334        with self.assertRaises(grpc.FutureCancelledError):
335            response_future.traceback()
336
337    def testCancelledUnaryRequestStreamResponse(self):
338        self._cancelled_unary_request_stream_response(
339            unary_stream_multi_callable(self._channel))
340
341    def testCancelledUnaryRequestStreamResponseNonBlocking(self):
342        self._cancelled_unary_request_stream_response(
343            unary_stream_non_blocking_multi_callable(self._channel))
344
345    def testCancelledStreamRequestUnaryResponse(self):
346        requests = tuple(
347            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
348        request_iterator = iter(requests)
349
350        multi_callable = stream_unary_multi_callable(self._channel)
351        with self._control.pause():
352            response_future = multi_callable.future(
353                request_iterator,
354                metadata=(('test', 'CancelledStreamRequestUnaryResponse'),))
355            self._control.block_until_paused()
356            response_future.cancel()
357
358        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
359        self.assertTrue(response_future.cancelled())
360        with self.assertRaises(grpc.FutureCancelledError):
361            response_future.result()
362        with self.assertRaises(grpc.FutureCancelledError):
363            response_future.exception()
364        with self.assertRaises(grpc.FutureCancelledError):
365            response_future.traceback()
366        self.assertIsNotNone(response_future.initial_metadata())
367        self.assertIsNotNone(response_future.details())
368        self.assertIsNotNone(response_future.trailing_metadata())
369
370    def testCancelledStreamRequestStreamResponse(self):
371        self._cancelled_stream_request_stream_response(
372            stream_stream_multi_callable(self._channel))
373
374    def testCancelledStreamRequestStreamResponseNonBlocking(self):
375        self._cancelled_stream_request_stream_response(
376            stream_stream_non_blocking_multi_callable(self._channel))
377
378    def testExpiredUnaryRequestBlockingUnaryResponse(self):
379        request = b'\x07\x17'
380
381        multi_callable = unary_unary_multi_callable(self._channel)
382        with self._control.pause():
383            with self.assertRaises(grpc.RpcError) as exception_context:
384                multi_callable.with_call(
385                    request,
386                    timeout=TIMEOUT_SHORT,
387                    metadata=(('test',
388                               'ExpiredUnaryRequestBlockingUnaryResponse'),))
389
390        self.assertIsInstance(exception_context.exception, grpc.Call)
391        self.assertIsNotNone(exception_context.exception.initial_metadata())
392        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
393                      exception_context.exception.code())
394        self.assertIsNotNone(exception_context.exception.details())
395        self.assertIsNotNone(exception_context.exception.trailing_metadata())
396
397    def testExpiredUnaryRequestFutureUnaryResponse(self):
398        request = b'\x07\x17'
399        callback = Callback()
400
401        multi_callable = unary_unary_multi_callable(self._channel)
402        with self._control.pause():
403            response_future = multi_callable.future(
404                request,
405                timeout=TIMEOUT_SHORT,
406                metadata=(('test', 'ExpiredUnaryRequestFutureUnaryResponse'),))
407            response_future.add_done_callback(callback)
408            value_passed_to_callback = callback.value()
409
410        self.assertIs(response_future, value_passed_to_callback)
411        self.assertIsNotNone(response_future.initial_metadata())
412        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
413        self.assertIsNotNone(response_future.details())
414        self.assertIsNotNone(response_future.trailing_metadata())
415        with self.assertRaises(grpc.RpcError) as exception_context:
416            response_future.result()
417        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
418                      exception_context.exception.code())
419        self.assertIsInstance(response_future.exception(), grpc.RpcError)
420        self.assertIsNotNone(response_future.traceback())
421        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
422                      response_future.exception().code())
423
424    def testExpiredUnaryRequestStreamResponse(self):
425        self._expired_unary_request_stream_response(
426            unary_stream_multi_callable(self._channel))
427
428    def testExpiredUnaryRequestStreamResponseNonBlocking(self):
429        self._expired_unary_request_stream_response(
430            unary_stream_non_blocking_multi_callable(self._channel))
431
432
if __name__ == '__main__':
    # When run as a script: set up logging with its default configuration,
    # then run this module's tests with verbose (per-test) output.
    logging.basicConfig()
    unittest.main(verbosity=2)
436