# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test of RPCs made against gRPC Python's application-layer API."""

from concurrent import futures
import itertools
import logging
import threading
import unittest

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit import test_common
from tests.unit._rpc_test_helpers import BaseRPCTest
from tests.unit._rpc_test_helpers import Callback
from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
from tests.unit._rpc_test_helpers import \
    stream_stream_non_blocking_multi_callable
from tests.unit._rpc_test_helpers import \
    unary_stream_non_blocking_multi_callable
from tests.unit._rpc_test_helpers import stream_stream_multi_callable
from tests.unit._rpc_test_helpers import stream_unary_multi_callable
from tests.unit._rpc_test_helpers import unary_stream_multi_callable
from tests.unit._rpc_test_helpers import unary_unary_multi_callable
from tests.unit.framework.common import test_constants


def _metadata_for(test_name):
    """Returns the one-entry invocation metadata tagging an RPC with a test."""
    return (('test', test_name),)


def _repeat_request(request):
    """Returns a tuple of `request` repeated STREAM_LENGTH times."""
    return (request,) * test_constants.STREAM_LENGTH


@unittest.skipIf(test_common.running_under_gevent(),
                 "Causes deadlock under gevent.")
class RPCPart2Test(BaseRPCTest, unittest.TestCase):
    """RPC tests driven through the multi-callables of a gRPC channel.

    `self._channel`, `self._handler`, `self._control`, `self._thread_pool`
    and the `_consume_*` / `_cancelled_*` / `_expired_*` helpers are not
    defined in this file; presumably BaseRPCTest provides them.

    Per-test notes are `#` comments rather than docstrings so that the
    verbose unittest report keeps identifying tests by method name only.
    """

    def testDefaultThreadPoolIsUsed(self):
        # The regular unary-stream multi-callable must leave
        # self._thread_pool untouched.
        regular_callable = unary_stream_multi_callable(self._channel)
        self._consume_one_stream_response_unary_request(regular_callable)
        pool_was_used = self._thread_pool.was_used()
        self.assertFalse(pool_was_used)

    def testExperimentalThreadPoolIsUsed(self):
        # The non-blocking unary-stream multi-callable must have used
        # self._thread_pool.
        non_blocking_callable = unary_stream_non_blocking_multi_callable(
            self._channel)
        self._consume_one_stream_response_unary_request(non_blocking_callable)
        pool_was_used = self._thread_pool.was_used()
        self.assertTrue(pool_was_used)

    def testUnrecognizedMethod(self):
        # Calling a method named 'NoSuchMethod' must raise an RpcError whose
        # status code is UNIMPLEMENTED.
        payload = b'abc'

        with self.assertRaises(grpc.RpcError) as raised:
            unknown_method = self._channel.unary_unary('NoSuchMethod')
            unknown_method(payload)

        status = raised.exception.code()
        self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, status)

    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
        # The response over the channel must equal what the handler returns
        # when called directly with the same request.
        payload = b'\x07\x08'
        expected = self._handler.handle_unary_unary(payload, None)

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('SuccessfulUnaryRequestBlockingUnaryResponse')
        actual = invoke(payload, metadata=metadata)

        self.assertEqual(expected, actual)

    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
        # Same as the plain blocking test, but also inspects the call object
        # returned by with_call.
        payload = b'\x07\x08'
        expected = self._handler.handle_unary_unary(payload, None)

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for(
            'SuccessfulUnaryRequestBlockingUnaryResponseWithCall')
        actual, call = invoke.with_call(payload, metadata=metadata)

        self.assertEqual(expected, actual)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self.assertEqual('', call.debug_error_string())

    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
        # The object returned by .future() must be both a grpc.Future and a
        # grpc.Call, and must report no exception or traceback on success.
        payload = b'\x07\x08'
        expected = self._handler.handle_unary_unary(payload, None)

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('SuccessfulUnaryRequestFutureUnaryResponse')
        rpc_future = invoke.future(payload, metadata=metadata)
        actual = rpc_future.result()

        for interface in (grpc.Future, grpc.Call):
            self.assertIsInstance(rpc_future, interface)
        self.assertEqual(expected, actual)
        self.assertIsNone(rpc_future.exception())
        self.assertIsNone(rpc_future.traceback())

    def testSuccessfulUnaryRequestStreamResponse(self):
        # Draining the response iterator must yield the same sequence the
        # handler produces when called directly.
        payload = b'\x37\x58'
        expected = tuple(self._handler.handle_unary_stream(payload, None))

        invoke = unary_stream_multi_callable(self._channel)
        metadata = _metadata_for('SuccessfulUnaryRequestStreamResponse')
        received = tuple(invoke(payload, metadata=metadata))

        self.assertSequenceEqual(expected, received)

    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
        # A fresh iterator over the same requests is handed to the handler
        # and to the channel, so neither consumes the other's input.
        payloads = _repeat_request(b'\x07\x08')
        expected = self._handler.handle_stream_unary(iter(payloads), None)

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for(
            'SuccessfulStreamRequestBlockingUnaryResponse')
        actual = invoke(iter(payloads), metadata=metadata)

        self.assertEqual(expected, actual)

    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
        # with_call variant: also checks the call finished with status OK.
        payloads = _repeat_request(b'\x07\x08')
        expected = self._handler.handle_stream_unary(iter(payloads), None)

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for(
            'SuccessfulStreamRequestBlockingUnaryResponseWithCall')
        actual, call = invoke.with_call(iter(payloads), metadata=metadata)

        self.assertEqual(expected, actual)
        self.assertIs(grpc.StatusCode.OK, call.code())

    def testSuccessfulStreamRequestFutureUnaryResponse(self):
        # Future variant: a successful future carries no exception and no
        # traceback.
        payloads = _repeat_request(b'\x07\x08')
        expected = self._handler.handle_stream_unary(iter(payloads), None)

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for('SuccessfulStreamRequestFutureUnaryResponse')
        rpc_future = invoke.future(iter(payloads), metadata=metadata)
        actual = rpc_future.result()

        self.assertEqual(expected, actual)
        self.assertIsNone(rpc_future.exception())
        self.assertIsNone(rpc_future.traceback())

    def testSuccessfulStreamRequestStreamResponse(self):
        # Streaming in both directions: the drained responses must match the
        # handler's direct output.
        payloads = _repeat_request(b'\x77\x58')
        expected = tuple(
            self._handler.handle_stream_stream(iter(payloads), None))

        invoke = stream_stream_multi_callable(self._channel)
        metadata = _metadata_for('SuccessfulStreamRequestStreamResponse')
        received = tuple(invoke(iter(payloads), metadata=metadata))

        self.assertSequenceEqual(expected, received)

    def testSequentialInvocations(self):
        # One multi-callable is reused for two back-to-back calls with
        # different requests.
        payloads = (b'\x07\x08', b'\x0809')
        first_expected, second_expected = [
            self._handler.handle_unary_unary(payload, None)
            for payload in payloads
        ]

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('SequentialInvocations')
        first_actual, second_actual = [
            invoke(payload, metadata=metadata) for payload in payloads
        ]

        self.assertEqual(first_expected, first_actual)
        self.assertEqual(second_expected, second_actual)

    def testConcurrentBlockingInvocations(self):
        # THREAD_CONCURRENCY blocking calls are submitted to a pool; every
        # one of them must return the handler's expected response.
        concurrency = test_constants.THREAD_CONCURRENCY
        pool = logging_pool.pool(concurrency)
        payloads = _repeat_request(b'\x07\x08')
        expected = self._handler.handle_stream_unary(iter(payloads), None)
        all_expected = [expected for _ in range(concurrency)]

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for('ConcurrentBlockingInvocations')
        submitted = [
            pool.submit(invoke, iter(payloads), metadata=metadata)
            for _ in range(concurrency)
        ]
        received = tuple([pending.result() for pending in submitted])

        pool.shutdown(wait=True)
        self.assertSequenceEqual(all_expected, received)

    def testConcurrentFutureInvocations(self):
        # Same as the blocking variant, but concurrency comes from the
        # multi-callable's own .future() rather than from a pool.
        concurrency = test_constants.THREAD_CONCURRENCY
        payloads = _repeat_request(b'\x07\x08')
        expected = self._handler.handle_stream_unary(iter(payloads), None)
        all_expected = [expected for _ in range(concurrency)]

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for('ConcurrentFutureInvocations')
        started = [
            invoke.future(iter(payloads), metadata=metadata)
            for _ in range(concurrency)
        ]
        received = tuple([pending.result() for pending in started])

        self.assertSequenceEqual(all_expected, received)

    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
        # Starts THREAD_CONCURRENCY RPCs but only waits for half of them.
        concurrency = test_constants.THREAD_CONCURRENCY
        pool = logging_pool.pool(concurrency)
        payload = b'\x67\x68'
        expected = self._handler.handle_unary_unary(payload, None)
        lock = threading.Lock()
        # One-element list used as a mutable flag shared with the closures.
        test_is_running_cell = [True]

        def wrap_future(future):

            def wrap():
                try:
                    outcome = future.result()
                except grpc.RpcError:
                    # An RPC error only propagates while the test body is
                    # still running; once the flag is cleared it is dropped.
                    with lock:
                        if test_is_running_cell[0]:
                            raise
                    return None
                else:
                    return outcome

            return wrap

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for(
            'WaitingForSomeButNotAllConcurrentFutureInvocations')
        outer_futures = []
        for _ in range(concurrency):
            inner_future = invoke.future(payload, metadata=metadata)
            outer_futures.append(pool.submit(wrap_future(inner_future)))

        awaited_count = concurrency // 2
        completed = futures.as_completed(outer_futures)
        for finished in itertools.islice(completed, awaited_count):
            self.assertEqual(expected, finished.result())
        with lock:
            test_is_running_cell[0] = False

    def testConsumingOneStreamResponseUnaryRequest(self):
        regular_callable = unary_stream_multi_callable(self._channel)
        self._consume_one_stream_response_unary_request(regular_callable)

    def testConsumingOneStreamResponseUnaryRequestNonBlocking(self):
        non_blocking_callable = unary_stream_non_blocking_multi_callable(
            self._channel)
        self._consume_one_stream_response_unary_request(non_blocking_callable)

    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
        regular_callable = unary_stream_multi_callable(self._channel)
        self._consume_some_but_not_all_stream_responses_unary_request(
            regular_callable)

    def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self):
        non_blocking_callable = unary_stream_non_blocking_multi_callable(
            self._channel)
        self._consume_some_but_not_all_stream_responses_unary_request(
            non_blocking_callable)

    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
        regular_callable = stream_stream_multi_callable(self._channel)
        self._consume_some_but_not_all_stream_responses_stream_request(
            regular_callable)

    def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self):
        non_blocking_callable = stream_stream_non_blocking_multi_callable(
            self._channel)
        self._consume_some_but_not_all_stream_responses_stream_request(
            non_blocking_callable)

    def testConsumingTooManyStreamResponsesStreamRequest(self):
        regular_callable = stream_stream_multi_callable(self._channel)
        self._consume_too_many_stream_responses_stream_request(
            regular_callable)

    def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self):
        non_blocking_callable = stream_stream_non_blocking_multi_callable(
            self._channel)
        self._consume_too_many_stream_responses_stream_request(
            non_blocking_callable)

    def testCancelledUnaryRequestUnaryResponse(self):
        # The RPC is started and cancelled inside self._control.pause()
        # (presumably so the handler cannot finish first -- confirm against
        # the helper module).
        payload = b'\x07\x17'

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('CancelledUnaryRequestUnaryResponse')
        with self._control.pause():
            rpc_future = invoke.future(payload, metadata=metadata)
            rpc_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, rpc_future.code())
        self.assertTrue(rpc_future.cancelled())
        # Every outcome accessor of a cancelled future must raise.
        for accessor in (rpc_future.result, rpc_future.exception,
                         rpc_future.traceback):
            with self.assertRaises(grpc.FutureCancelledError):
                accessor()

    def testCancelledUnaryRequestStreamResponse(self):
        regular_callable = unary_stream_multi_callable(self._channel)
        self._cancelled_unary_request_stream_response(regular_callable)

    def testCancelledUnaryRequestStreamResponseNonBlocking(self):
        non_blocking_callable = unary_stream_non_blocking_multi_callable(
            self._channel)
        self._cancelled_unary_request_stream_response(non_blocking_callable)

    def testCancelledStreamRequestUnaryResponse(self):
        # Cancellation is issued only after block_until_paused() returns,
        # while still inside the pause block.
        payloads = _repeat_request(b'\x07\x08')

        invoke = stream_unary_multi_callable(self._channel)
        metadata = _metadata_for('CancelledStreamRequestUnaryResponse')
        with self._control.pause():
            rpc_future = invoke.future(iter(payloads), metadata=metadata)
            self._control.block_until_paused()
            rpc_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, rpc_future.code())
        self.assertTrue(rpc_future.cancelled())
        # Every outcome accessor of a cancelled future must raise.
        for accessor in (rpc_future.result, rpc_future.exception,
                         rpc_future.traceback):
            with self.assertRaises(grpc.FutureCancelledError):
                accessor()
        # The call-side accessors must still return something.
        self.assertIsNotNone(rpc_future.initial_metadata())
        self.assertIsNotNone(rpc_future.details())
        self.assertIsNotNone(rpc_future.trailing_metadata())

    def testCancelledStreamRequestStreamResponse(self):
        regular_callable = stream_stream_multi_callable(self._channel)
        self._cancelled_stream_request_stream_response(regular_callable)

    def testCancelledStreamRequestStreamResponseNonBlocking(self):
        non_blocking_callable = stream_stream_non_blocking_multi_callable(
            self._channel)
        self._cancelled_stream_request_stream_response(non_blocking_callable)

    def testExpiredUnaryRequestBlockingUnaryResponse(self):
        # With TIMEOUT_SHORT and the control paused, the blocking call must
        # raise an RpcError that is also a grpc.Call reporting
        # DEADLINE_EXCEEDED.
        payload = b'\x07\x17'

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('ExpiredUnaryRequestBlockingUnaryResponse')
        with self._control.pause(), \
                self.assertRaises(grpc.RpcError) as raised:
            invoke.with_call(payload, timeout=TIMEOUT_SHORT, metadata=metadata)

        error = raised.exception
        self.assertIsInstance(error, grpc.Call)
        self.assertIsNotNone(error.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, error.code())
        self.assertIsNotNone(error.details())
        self.assertIsNotNone(error.trailing_metadata())

    def testExpiredUnaryRequestFutureUnaryResponse(self):
        # The done-callback value is read inside the pause block, i.e. before
        # the pause is released.
        payload = b'\x07\x17'
        callback = Callback()
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED

        invoke = unary_unary_multi_callable(self._channel)
        metadata = _metadata_for('ExpiredUnaryRequestFutureUnaryResponse')
        with self._control.pause():
            rpc_future = invoke.future(payload,
                                       timeout=TIMEOUT_SHORT,
                                       metadata=metadata)
            rpc_future.add_done_callback(callback)
            value_passed_to_callback = callback.value()

        self.assertIs(rpc_future, value_passed_to_callback)
        self.assertIsNotNone(rpc_future.initial_metadata())
        self.assertIs(deadline_exceeded, rpc_future.code())
        self.assertIsNotNone(rpc_future.details())
        self.assertIsNotNone(rpc_future.trailing_metadata())
        with self.assertRaises(grpc.RpcError) as raised:
            rpc_future.result()
        self.assertIs(deadline_exceeded, raised.exception.code())
        self.assertIsInstance(rpc_future.exception(), grpc.RpcError)
        self.assertIsNotNone(rpc_future.traceback())
        self.assertIs(deadline_exceeded, rpc_future.exception().code())

    def testExpiredUnaryRequestStreamResponse(self):
        regular_callable = unary_stream_multi_callable(self._channel)
        self._expired_unary_request_stream_response(regular_callable)

    def testExpiredUnaryRequestStreamResponseNonBlocking(self):
        non_blocking_callable = unary_stream_non_blocking_multi_callable(
            self._channel)
        self._expired_unary_request_stream_response(non_blocking_callable)


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)