1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test of gRPC Python interceptors.""" 15 16import collections 17from concurrent import futures 18import itertools 19import logging 20import os 21import threading 22import unittest 23 24import grpc 25from grpc.framework.foundation import logging_pool 26 27from tests.unit import test_common 28from tests.unit.framework.common import test_constants 29from tests.unit.framework.common import test_control 30 31_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 32_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] 33_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 34_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] 35 36_EXCEPTION_REQUEST = b'\x09\x0a' 37 38_UNARY_UNARY = '/test/UnaryUnary' 39_UNARY_STREAM = '/test/UnaryStream' 40_STREAM_UNARY = '/test/StreamUnary' 41_STREAM_STREAM = '/test/StreamStream' 42 43 44class _ApplicationErrorStandin(Exception): 45 pass 46 47 48class _Callback(object): 49 50 def __init__(self): 51 self._condition = threading.Condition() 52 self._value = None 53 self._called = False 54 55 def __call__(self, value): 56 with self._condition: 57 self._value = value 58 self._called = True 59 self._condition.notify_all() 60 61 def value(self): 62 with self._condition: 63 while not self._called: 64 self._condition.wait() 65 return self._value 66 67 68class _Handler(object): 69 70 def __init__(self, control): 71 self._control = control 72 73 def handle_unary_unary(self, request, servicer_context): 74 self._control.control() 75 if servicer_context is not None: 76 servicer_context.set_trailing_metadata((( 77 'testkey', 78 'testvalue', 79 ),)) 80 if request == _EXCEPTION_REQUEST: 81 raise _ApplicationErrorStandin() 82 return request 83 84 def handle_unary_stream(self, request, servicer_context): 85 if request == _EXCEPTION_REQUEST: 86 raise _ApplicationErrorStandin() 87 for _ in range(test_constants.STREAM_LENGTH): 88 self._control.control() 89 yield request 90 self._control.control() 91 if servicer_context is not None: 92 servicer_context.set_trailing_metadata((( 93 'testkey', 94 'testvalue', 95 ),)) 96 97 def handle_stream_unary(self, request_iterator, servicer_context): 98 if servicer_context is not None: 99 servicer_context.invocation_metadata() 100 self._control.control() 101 response_elements = [] 102 for request in request_iterator: 103 self._control.control() 104 response_elements.append(request) 105 self._control.control() 106 if servicer_context is not None: 107 servicer_context.set_trailing_metadata((( 108 'testkey', 109 'testvalue', 110 ),)) 111 if _EXCEPTION_REQUEST in response_elements: 112 raise _ApplicationErrorStandin() 113 return b''.join(response_elements) 114 115 def handle_stream_stream(self, request_iterator, servicer_context): 116 self._control.control() 117 if servicer_context is not None: 118 servicer_context.set_trailing_metadata((( 119 'testkey', 120 'testvalue', 121 ),)) 122 for request in request_iterator: 123 if request == _EXCEPTION_REQUEST: 124 raise _ApplicationErrorStandin() 125 self._control.control() 126 yield request 127 self._control.control() 128 129 130class _MethodHandler(grpc.RpcMethodHandler): 131 132 def __init__(self, request_streaming, response_streaming, 133 request_deserializer, response_serializer, unary_unary, 134 unary_stream, stream_unary, stream_stream): 135 self.request_streaming = request_streaming 136 self.response_streaming = response_streaming 137 self.request_deserializer = request_deserializer 138 self.response_serializer = response_serializer 139 self.unary_unary = unary_unary 140 self.unary_stream = unary_stream 141 self.stream_unary = stream_unary 142 self.stream_stream = stream_stream 143 144 145class _GenericHandler(grpc.GenericRpcHandler): 146 147 def __init__(self, handler): 148 self._handler = handler 149 150 def service(self, handler_call_details): 151 if handler_call_details.method == _UNARY_UNARY: 152 return _MethodHandler(False, False, None, None, 153 self._handler.handle_unary_unary, None, None, 154 None) 155 elif handler_call_details.method == _UNARY_STREAM: 156 return _MethodHandler(False, True, _DESERIALIZE_REQUEST, 157 _SERIALIZE_RESPONSE, None, 158 self._handler.handle_unary_stream, None, None) 159 elif handler_call_details.method == _STREAM_UNARY: 160 return _MethodHandler(True, False, _DESERIALIZE_REQUEST, 161 _SERIALIZE_RESPONSE, None, None, 162 self._handler.handle_stream_unary, None) 163 elif handler_call_details.method == _STREAM_STREAM: 164 return _MethodHandler(True, True, None, None, None, None, None, 165 self._handler.handle_stream_stream) 166 else: 167 return None 168 169 170def _unary_unary_multi_callable(channel): 171 return channel.unary_unary(_UNARY_UNARY) 172 173 174def _unary_stream_multi_callable(channel): 175 return channel.unary_stream(_UNARY_STREAM, 176 request_serializer=_SERIALIZE_REQUEST, 177 response_deserializer=_DESERIALIZE_RESPONSE) 178 179 180def _stream_unary_multi_callable(channel): 181 return channel.stream_unary(_STREAM_UNARY, 182 request_serializer=_SERIALIZE_REQUEST, 183 response_deserializer=_DESERIALIZE_RESPONSE) 184 185 186def _stream_stream_multi_callable(channel): 187 return channel.stream_stream(_STREAM_STREAM) 188 189 190class _ClientCallDetails( 191 collections.namedtuple( 192 '_ClientCallDetails', 193 ('method', 'timeout', 'metadata', 'credentials')), 194 grpc.ClientCallDetails): 195 pass 196 197 198class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor, 199 grpc.UnaryStreamClientInterceptor, 200 grpc.StreamUnaryClientInterceptor, 201 grpc.StreamStreamClientInterceptor): 202 203 def __init__(self, interceptor_function): 204 self._fn = interceptor_function 205 206 def intercept_unary_unary(self, continuation, client_call_details, request): 207 new_details, new_request_iterator, postprocess = self._fn( 208 client_call_details, iter((request,)), False, False) 209 response = continuation(new_details, next(new_request_iterator)) 210 return postprocess(response) if postprocess else response 211 212 def intercept_unary_stream(self, continuation, client_call_details, 213 request): 214 new_details, new_request_iterator, postprocess = self._fn( 215 client_call_details, iter((request,)), False, True) 216 response_it = continuation(new_details, new_request_iterator) 217 return postprocess(response_it) if postprocess else response_it 218 219 def intercept_stream_unary(self, continuation, client_call_details, 220 request_iterator): 221 new_details, new_request_iterator, postprocess = self._fn( 222 client_call_details, request_iterator, True, False) 223 response = continuation(new_details, next(new_request_iterator)) 224 return postprocess(response) if postprocess else response 225 226 def intercept_stream_stream(self, continuation, client_call_details, 227 request_iterator): 228 new_details, new_request_iterator, postprocess = self._fn( 229 client_call_details, request_iterator, True, True) 230 response_it = continuation(new_details, new_request_iterator) 231 return postprocess(response_it) if postprocess else response_it 232 233 234class _LoggingInterceptor(grpc.ServerInterceptor, 235 grpc.UnaryUnaryClientInterceptor, 236 grpc.UnaryStreamClientInterceptor, 237 grpc.StreamUnaryClientInterceptor, 238 grpc.StreamStreamClientInterceptor): 239 240 def __init__(self, tag, record): 241 self.tag = tag 242 self.record = record 243 244 def intercept_service(self, continuation, handler_call_details): 245 self.record.append(self.tag + ':intercept_service') 246 return continuation(handler_call_details) 247 248 def intercept_unary_unary(self, continuation, client_call_details, request): 249 self.record.append(self.tag + ':intercept_unary_unary') 250 result = continuation(client_call_details, request) 251 assert isinstance( 252 result, 253 grpc.Call), '{} ({}) is not an instance of grpc.Call'.format( 254 result, type(result)) 255 assert isinstance( 256 result, 257 grpc.Future), '{} ({}) is not an instance of grpc.Future'.format( 258 result, type(result)) 259 return result 260 261 def intercept_unary_stream(self, continuation, client_call_details, 262 request): 263 self.record.append(self.tag + ':intercept_unary_stream') 264 return continuation(client_call_details, request) 265 266 def intercept_stream_unary(self, continuation, client_call_details, 267 request_iterator): 268 self.record.append(self.tag + ':intercept_stream_unary') 269 result = continuation(client_call_details, request_iterator) 270 assert isinstance( 271 result, 272 grpc.Call), '{} is not an instance of grpc.Call'.format(result) 273 assert isinstance( 274 result, 275 grpc.Future), '{} is not an instance of grpc.Future'.format(result) 276 return result 277 278 def intercept_stream_stream(self, continuation, client_call_details, 279 request_iterator): 280 self.record.append(self.tag + ':intercept_stream_stream') 281 return continuation(client_call_details, request_iterator) 282 283 284class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor): 285 286 def intercept_unary_unary(self, ignored_continuation, 287 ignored_client_call_details, ignored_request): 288 raise test_control.Defect() 289 290 291def _wrap_request_iterator_stream_interceptor(wrapper): 292 293 def intercept_call(client_call_details, request_iterator, request_streaming, 294 ignored_response_streaming): 295 if request_streaming: 296 return client_call_details, wrapper(request_iterator), None 297 else: 298 return client_call_details, request_iterator, None 299 300 return _GenericClientInterceptor(intercept_call) 301 302 303def _append_request_header_interceptor(header, value): 304 305 def intercept_call(client_call_details, request_iterator, 306 ignored_request_streaming, ignored_response_streaming): 307 metadata = [] 308 if client_call_details.metadata: 309 metadata = list(client_call_details.metadata) 310 metadata.append(( 311 header, 312 value, 313 )) 314 client_call_details = _ClientCallDetails( 315 client_call_details.method, client_call_details.timeout, metadata, 316 client_call_details.credentials) 317 return client_call_details, request_iterator, None 318 319 return _GenericClientInterceptor(intercept_call) 320 321 322class _GenericServerInterceptor(grpc.ServerInterceptor): 323 324 def __init__(self, fn): 325 self._fn = fn 326 327 def intercept_service(self, continuation, handler_call_details): 328 return self._fn(continuation, handler_call_details) 329 330 331def _filter_server_interceptor(condition, interceptor): 332 333 def intercept_service(continuation, handler_call_details): 334 if condition(handler_call_details): 335 return interceptor.intercept_service(continuation, 336 handler_call_details) 337 return continuation(handler_call_details) 338 339 return _GenericServerInterceptor(intercept_service) 340 341 342class InterceptorTest(unittest.TestCase): 343 344 def setUp(self): 345 self._control = test_control.PauseFailControl() 346 self._handler = _Handler(self._control) 347 self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 348 349 self._record = [] 350 conditional_interceptor = _filter_server_interceptor( 351 lambda x: ('secret', '42') in x.invocation_metadata, 352 _LoggingInterceptor('s3', self._record)) 353 354 self._server = grpc.server(self._server_pool, 355 options=(('grpc.so_reuseport', 0),), 356 interceptors=( 357 _LoggingInterceptor('s1', self._record), 358 conditional_interceptor, 359 _LoggingInterceptor('s2', self._record), 360 )) 361 port = self._server.add_insecure_port('[::]:0') 362 self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) 363 self._server.start() 364 365 self._channel = grpc.insecure_channel('localhost:%d' % port) 366 367 def tearDown(self): 368 self._server.stop(None) 369 self._server_pool.shutdown(wait=True) 370 self._channel.close() 371 372 def testTripleRequestMessagesClientInterceptor(self): 373 374 def triple(request_iterator): 375 while True: 376 try: 377 item = next(request_iterator) 378 yield item 379 yield item 380 yield item 381 except StopIteration: 382 break 383 384 interceptor = _wrap_request_iterator_stream_interceptor(triple) 385 channel = grpc.intercept_channel(self._channel, interceptor) 386 requests = tuple( 387 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 388 389 multi_callable = _stream_stream_multi_callable(channel) 390 response_iterator = multi_callable( 391 iter(requests), 392 metadata=( 393 ('test', 394 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 395 396 responses = tuple(response_iterator) 397 self.assertEqual(len(responses), 3 * test_constants.STREAM_LENGTH) 398 399 multi_callable = _stream_stream_multi_callable(self._channel) 400 response_iterator = multi_callable( 401 iter(requests), 402 metadata=( 403 ('test', 404 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 405 406 responses = tuple(response_iterator) 407 self.assertEqual(len(responses), test_constants.STREAM_LENGTH) 408 409 def testDefectiveClientInterceptor(self): 410 interceptor = _DefectiveClientInterceptor() 411 defective_channel = grpc.intercept_channel(self._channel, interceptor) 412 413 request = b'\x07\x08' 414 415 multi_callable = _unary_unary_multi_callable(defective_channel) 416 call_future = multi_callable.future( 417 request, 418 metadata=(('test', 419 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 420 421 self.assertIsNotNone(call_future.exception()) 422 self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL) 423 424 def testInterceptedHeaderManipulationWithServerSideVerification(self): 425 request = b'\x07\x08' 426 427 channel = grpc.intercept_channel( 428 self._channel, _append_request_header_interceptor('secret', '42')) 429 channel = grpc.intercept_channel( 430 channel, _LoggingInterceptor('c1', self._record), 431 _LoggingInterceptor('c2', self._record)) 432 433 self._record[:] = [] 434 435 multi_callable = _unary_unary_multi_callable(channel) 436 multi_callable.with_call( 437 request, 438 metadata=( 439 ('test', 440 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) 441 442 self.assertSequenceEqual(self._record, [ 443 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 444 's1:intercept_service', 's3:intercept_service', 445 's2:intercept_service' 446 ]) 447 448 def testInterceptedUnaryRequestBlockingUnaryResponse(self): 449 request = b'\x07\x08' 450 451 self._record[:] = [] 452 453 channel = grpc.intercept_channel( 454 self._channel, _LoggingInterceptor('c1', self._record), 455 _LoggingInterceptor('c2', self._record)) 456 457 multi_callable = _unary_unary_multi_callable(channel) 458 multi_callable( 459 request, 460 metadata=(('test', 461 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 462 463 self.assertSequenceEqual(self._record, [ 464 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 465 's1:intercept_service', 's2:intercept_service' 466 ]) 467 468 def testInterceptedUnaryRequestBlockingUnaryResponseWithError(self): 469 request = _EXCEPTION_REQUEST 470 471 self._record[:] = [] 472 473 channel = grpc.intercept_channel( 474 self._channel, _LoggingInterceptor('c1', self._record), 475 _LoggingInterceptor('c2', self._record)) 476 477 multi_callable = _unary_unary_multi_callable(channel) 478 with self.assertRaises(grpc.RpcError) as exception_context: 479 multi_callable( 480 request, 481 metadata=(('test', 482 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 483 exception = exception_context.exception 484 self.assertFalse(exception.cancelled()) 485 self.assertFalse(exception.running()) 486 self.assertTrue(exception.done()) 487 with self.assertRaises(grpc.RpcError): 488 exception.result() 489 self.assertIsInstance(exception.exception(), grpc.RpcError) 490 491 def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self): 492 request = b'\x07\x08' 493 494 channel = grpc.intercept_channel( 495 self._channel, _LoggingInterceptor('c1', self._record), 496 _LoggingInterceptor('c2', self._record)) 497 498 self._record[:] = [] 499 500 multi_callable = _unary_unary_multi_callable(channel) 501 multi_callable.with_call( 502 request, 503 metadata=( 504 ('test', 505 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) 506 507 self.assertSequenceEqual(self._record, [ 508 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 509 's1:intercept_service', 's2:intercept_service' 510 ]) 511 512 def testInterceptedUnaryRequestFutureUnaryResponse(self): 513 request = b'\x07\x08' 514 515 self._record[:] = [] 516 channel = grpc.intercept_channel( 517 self._channel, _LoggingInterceptor('c1', self._record), 518 _LoggingInterceptor('c2', self._record)) 519 520 multi_callable = _unary_unary_multi_callable(channel) 521 response_future = multi_callable.future( 522 request, 523 metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),)) 524 response_future.result() 525 526 self.assertSequenceEqual(self._record, [ 527 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 528 's1:intercept_service', 's2:intercept_service' 529 ]) 530 531 def testInterceptedUnaryRequestStreamResponse(self): 532 request = b'\x37\x58' 533 534 self._record[:] = [] 535 channel = grpc.intercept_channel( 536 self._channel, _LoggingInterceptor('c1', self._record), 537 _LoggingInterceptor('c2', self._record)) 538 539 multi_callable = _unary_stream_multi_callable(channel) 540 response_iterator = multi_callable( 541 request, 542 metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),)) 543 tuple(response_iterator) 544 545 self.assertSequenceEqual(self._record, [ 546 'c1:intercept_unary_stream', 'c2:intercept_unary_stream', 547 's1:intercept_service', 's2:intercept_service' 548 ]) 549 550 def testInterceptedUnaryRequestStreamResponseWithError(self): 551 request = _EXCEPTION_REQUEST 552 553 self._record[:] = [] 554 channel = grpc.intercept_channel( 555 self._channel, _LoggingInterceptor('c1', self._record), 556 _LoggingInterceptor('c2', self._record)) 557 558 multi_callable = _unary_stream_multi_callable(channel) 559 response_iterator = multi_callable( 560 request, 561 metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),)) 562 with self.assertRaises(grpc.RpcError) as exception_context: 563 tuple(response_iterator) 564 exception = exception_context.exception 565 self.assertFalse(exception.cancelled()) 566 self.assertFalse(exception.running()) 567 self.assertTrue(exception.done()) 568 with self.assertRaises(grpc.RpcError): 569 exception.result() 570 self.assertIsInstance(exception.exception(), grpc.RpcError) 571 572 def testInterceptedStreamRequestBlockingUnaryResponse(self): 573 requests = tuple( 574 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 575 request_iterator = iter(requests) 576 577 self._record[:] = [] 578 channel = grpc.intercept_channel( 579 self._channel, _LoggingInterceptor('c1', self._record), 580 _LoggingInterceptor('c2', self._record)) 581 582 multi_callable = _stream_unary_multi_callable(channel) 583 multi_callable( 584 request_iterator, 585 metadata=(('test', 586 'InterceptedStreamRequestBlockingUnaryResponse'),)) 587 588 self.assertSequenceEqual(self._record, [ 589 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 590 's1:intercept_service', 's2:intercept_service' 591 ]) 592 593 def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self): 594 requests = tuple( 595 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 596 request_iterator = iter(requests) 597 598 self._record[:] = [] 599 channel = grpc.intercept_channel( 600 self._channel, _LoggingInterceptor('c1', self._record), 601 _LoggingInterceptor('c2', self._record)) 602 603 multi_callable = _stream_unary_multi_callable(channel) 604 multi_callable.with_call( 605 request_iterator, 606 metadata=( 607 ('test', 608 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 609 610 self.assertSequenceEqual(self._record, [ 611 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 612 's1:intercept_service', 's2:intercept_service' 613 ]) 614 615 def testInterceptedStreamRequestFutureUnaryResponse(self): 616 requests = tuple( 617 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 618 request_iterator = iter(requests) 619 620 self._record[:] = [] 621 channel = grpc.intercept_channel( 622 self._channel, _LoggingInterceptor('c1', self._record), 623 _LoggingInterceptor('c2', self._record)) 624 625 multi_callable = _stream_unary_multi_callable(channel) 626 response_future = multi_callable.future( 627 request_iterator, 628 metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),)) 629 response_future.result() 630 631 self.assertSequenceEqual(self._record, [ 632 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 633 's1:intercept_service', 's2:intercept_service' 634 ]) 635 636 def testInterceptedStreamRequestFutureUnaryResponseWithError(self): 637 requests = tuple( 638 _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH)) 639 request_iterator = iter(requests) 640 641 self._record[:] = [] 642 channel = grpc.intercept_channel( 643 self._channel, _LoggingInterceptor('c1', self._record), 644 _LoggingInterceptor('c2', self._record)) 645 646 multi_callable = _stream_unary_multi_callable(channel) 647 response_future = multi_callable.future( 648 request_iterator, 649 metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),)) 650 with self.assertRaises(grpc.RpcError) as exception_context: 651 response_future.result() 652 exception = exception_context.exception 653 self.assertFalse(exception.cancelled()) 654 self.assertFalse(exception.running()) 655 self.assertTrue(exception.done()) 656 with self.assertRaises(grpc.RpcError): 657 exception.result() 658 self.assertIsInstance(exception.exception(), grpc.RpcError) 659 660 def testInterceptedStreamRequestStreamResponse(self): 661 requests = tuple( 662 b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) 663 request_iterator = iter(requests) 664 665 self._record[:] = [] 666 channel = grpc.intercept_channel( 667 self._channel, _LoggingInterceptor('c1', self._record), 668 _LoggingInterceptor('c2', self._record)) 669 670 multi_callable = _stream_stream_multi_callable(channel) 671 response_iterator = multi_callable( 672 request_iterator, 673 metadata=(('test', 'InterceptedStreamRequestStreamResponse'),)) 674 tuple(response_iterator) 675 676 self.assertSequenceEqual(self._record, [ 677 'c1:intercept_stream_stream', 'c2:intercept_stream_stream', 678 's1:intercept_service', 's2:intercept_service' 679 ]) 680 681 def testInterceptedStreamRequestStreamResponseWithError(self): 682 requests = tuple( 683 _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH)) 684 request_iterator = iter(requests) 685 686 self._record[:] = [] 687 channel = grpc.intercept_channel( 688 self._channel, _LoggingInterceptor('c1', self._record), 689 _LoggingInterceptor('c2', self._record)) 690 691 multi_callable = _stream_stream_multi_callable(channel) 692 response_iterator = multi_callable( 693 request_iterator, 694 metadata=(('test', 'InterceptedStreamRequestStreamResponse'),)) 695 with self.assertRaises(grpc.RpcError) as exception_context: 696 tuple(response_iterator) 697 exception = exception_context.exception 698 self.assertFalse(exception.cancelled()) 699 self.assertFalse(exception.running()) 700 self.assertTrue(exception.done()) 701 with self.assertRaises(grpc.RpcError): 702 exception.result() 703 self.assertIsInstance(exception.exception(), grpc.RpcError) 704 705 706if __name__ == '__main__': 707 logging.basicConfig() 708 unittest.main(verbosity=2) 709