1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test of gRPC Python interceptors.""" 15 16import collections 17import itertools 18import threading 19import unittest 20import logging 21import os 22from concurrent import futures 23 24import grpc 25from grpc.framework.foundation import logging_pool 26 27from tests.unit import test_common 28from tests.unit.framework.common import test_constants 29from tests.unit.framework.common import test_control 30 31_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 32_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] 33_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 34_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] 35 36_EXCEPTION_REQUEST = b'\x09\x0a' 37 38_UNARY_UNARY = '/test/UnaryUnary' 39_UNARY_STREAM = '/test/UnaryStream' 40_STREAM_UNARY = '/test/StreamUnary' 41_STREAM_STREAM = '/test/StreamStream' 42 43 44class _ApplicationErrorStandin(Exception): 45 pass 46 47 48class _Callback(object): 49 50 def __init__(self): 51 self._condition = threading.Condition() 52 self._value = None 53 self._called = False 54 55 def __call__(self, value): 56 with self._condition: 57 self._value = value 58 self._called = True 59 self._condition.notify_all() 60 61 def value(self): 62 with self._condition: 63 while not self._called: 64 self._condition.wait() 65 return self._value 66 67 68class _Handler(object): 69 70 def __init__(self, control): 71 self._control = control 72 73 def handle_unary_unary(self, request, servicer_context): 74 self._control.control() 75 if servicer_context is not None: 76 servicer_context.set_trailing_metadata((( 77 'testkey', 78 'testvalue', 79 ),)) 80 if request == _EXCEPTION_REQUEST: 81 raise _ApplicationErrorStandin() 82 return request 83 84 def handle_unary_stream(self, request, servicer_context): 85 if request == _EXCEPTION_REQUEST: 86 raise _ApplicationErrorStandin() 87 for _ in range(test_constants.STREAM_LENGTH): 88 self._control.control() 89 yield request 90 self._control.control() 91 if servicer_context is not None: 92 servicer_context.set_trailing_metadata((( 93 'testkey', 94 'testvalue', 95 ),)) 96 97 def handle_stream_unary(self, request_iterator, servicer_context): 98 if servicer_context is not None: 99 servicer_context.invocation_metadata() 100 self._control.control() 101 response_elements = [] 102 for request in request_iterator: 103 self._control.control() 104 response_elements.append(request) 105 self._control.control() 106 if servicer_context is not None: 107 servicer_context.set_trailing_metadata((( 108 'testkey', 109 'testvalue', 110 ),)) 111 if _EXCEPTION_REQUEST in response_elements: 112 raise _ApplicationErrorStandin() 113 return b''.join(response_elements) 114 115 def handle_stream_stream(self, request_iterator, servicer_context): 116 self._control.control() 117 if servicer_context is not None: 118 servicer_context.set_trailing_metadata((( 119 'testkey', 120 'testvalue', 121 ),)) 122 for request in request_iterator: 123 if request == _EXCEPTION_REQUEST: 124 raise _ApplicationErrorStandin() 125 self._control.control() 126 yield request 127 self._control.control() 128 129 130class _MethodHandler(grpc.RpcMethodHandler): 131 132 def __init__(self, request_streaming, response_streaming, 133 request_deserializer, response_serializer, unary_unary, 134 unary_stream, stream_unary, stream_stream): 135 self.request_streaming = request_streaming 136 self.response_streaming = response_streaming 137 self.request_deserializer = request_deserializer 138 self.response_serializer = response_serializer 139 self.unary_unary = unary_unary 140 self.unary_stream = unary_stream 141 self.stream_unary = stream_unary 142 self.stream_stream = stream_stream 143 144 145class _GenericHandler(grpc.GenericRpcHandler): 146 147 def __init__(self, handler): 148 self._handler = handler 149 150 def service(self, handler_call_details): 151 if handler_call_details.method == _UNARY_UNARY: 152 return _MethodHandler(False, False, None, None, 153 self._handler.handle_unary_unary, None, None, 154 None) 155 elif handler_call_details.method == _UNARY_STREAM: 156 return _MethodHandler(False, True, _DESERIALIZE_REQUEST, 157 _SERIALIZE_RESPONSE, None, 158 self._handler.handle_unary_stream, None, None) 159 elif handler_call_details.method == _STREAM_UNARY: 160 return _MethodHandler(True, False, _DESERIALIZE_REQUEST, 161 _SERIALIZE_RESPONSE, None, None, 162 self._handler.handle_stream_unary, None) 163 elif handler_call_details.method == _STREAM_STREAM: 164 return _MethodHandler(True, True, None, None, None, None, None, 165 self._handler.handle_stream_stream) 166 else: 167 return None 168 169 170def _unary_unary_multi_callable(channel): 171 return channel.unary_unary(_UNARY_UNARY) 172 173 174def _unary_stream_multi_callable(channel): 175 return channel.unary_stream(_UNARY_STREAM, 176 request_serializer=_SERIALIZE_REQUEST, 177 response_deserializer=_DESERIALIZE_RESPONSE) 178 179 180def _stream_unary_multi_callable(channel): 181 return channel.stream_unary(_STREAM_UNARY, 182 request_serializer=_SERIALIZE_REQUEST, 183 response_deserializer=_DESERIALIZE_RESPONSE) 184 185 186def _stream_stream_multi_callable(channel): 187 return channel.stream_stream(_STREAM_STREAM) 188 189 190class _ClientCallDetails( 191 collections.namedtuple( 192 '_ClientCallDetails', 193 ('method', 'timeout', 'metadata', 'credentials')), 194 grpc.ClientCallDetails): 195 pass 196 197 198class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor, 199 grpc.UnaryStreamClientInterceptor, 200 grpc.StreamUnaryClientInterceptor, 201 grpc.StreamStreamClientInterceptor): 202 203 def __init__(self, interceptor_function): 204 self._fn = interceptor_function 205 206 def intercept_unary_unary(self, continuation, client_call_details, request): 207 new_details, new_request_iterator, postprocess = self._fn( 208 client_call_details, iter((request,)), False, False) 209 response = continuation(new_details, next(new_request_iterator)) 210 return postprocess(response) if postprocess else response 211 212 def intercept_unary_stream(self, continuation, client_call_details, 213 request): 214 new_details, new_request_iterator, postprocess = self._fn( 215 client_call_details, iter((request,)), False, True) 216 response_it = continuation(new_details, new_request_iterator) 217 return postprocess(response_it) if postprocess else response_it 218 219 def intercept_stream_unary(self, continuation, client_call_details, 220 request_iterator): 221 new_details, new_request_iterator, postprocess = self._fn( 222 client_call_details, request_iterator, True, False) 223 response = continuation(new_details, next(new_request_iterator)) 224 return postprocess(response) if postprocess else response 225 226 def intercept_stream_stream(self, continuation, client_call_details, 227 request_iterator): 228 new_details, new_request_iterator, postprocess = self._fn( 229 client_call_details, request_iterator, True, True) 230 response_it = continuation(new_details, new_request_iterator) 231 return postprocess(response_it) if postprocess else response_it 232 233 234class _LoggingInterceptor(grpc.ServerInterceptor, 235 grpc.UnaryUnaryClientInterceptor, 236 grpc.UnaryStreamClientInterceptor, 237 grpc.StreamUnaryClientInterceptor, 238 grpc.StreamStreamClientInterceptor): 239 240 def __init__(self, tag, record): 241 self.tag = tag 242 self.record = record 243 244 def intercept_service(self, continuation, handler_call_details): 245 self.record.append(self.tag + ':intercept_service') 246 return continuation(handler_call_details) 247 248 def intercept_unary_unary(self, continuation, client_call_details, request): 249 self.record.append(self.tag + ':intercept_unary_unary') 250 result = continuation(client_call_details, request) 251 assert isinstance( 252 result, 253 grpc.Call), '{} ({}) is not an instance of grpc.Call'.format( 254 result, type(result)) 255 assert isinstance( 256 result, 257 grpc.Future), '{} ({}) is not an instance of grpc.Future'.format( 258 result, type(result)) 259 return result 260 261 def intercept_unary_stream(self, continuation, client_call_details, 262 request): 263 self.record.append(self.tag + ':intercept_unary_stream') 264 return continuation(client_call_details, request) 265 266 def intercept_stream_unary(self, continuation, client_call_details, 267 request_iterator): 268 self.record.append(self.tag + ':intercept_stream_unary') 269 result = continuation(client_call_details, request_iterator) 270 assert isinstance( 271 result, 272 grpc.Call), '{} is not an instance of grpc.Call'.format(result) 273 assert isinstance( 274 result, 275 grpc.Future), '{} is not an instance of grpc.Future'.format(result) 276 return result 277 278 def intercept_stream_stream(self, continuation, client_call_details, 279 request_iterator): 280 self.record.append(self.tag + ':intercept_stream_stream') 281 return continuation(client_call_details, request_iterator) 282 283 284class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor): 285 286 def intercept_unary_unary(self, ignored_continuation, 287 ignored_client_call_details, ignored_request): 288 raise test_control.Defect() 289 290 291def _wrap_request_iterator_stream_interceptor(wrapper): 292 293 def intercept_call(client_call_details, request_iterator, request_streaming, 294 ignored_response_streaming): 295 if request_streaming: 296 return client_call_details, wrapper(request_iterator), None 297 else: 298 return client_call_details, request_iterator, None 299 300 return _GenericClientInterceptor(intercept_call) 301 302 303def _append_request_header_interceptor(header, value): 304 305 def intercept_call(client_call_details, request_iterator, 306 ignored_request_streaming, ignored_response_streaming): 307 metadata = [] 308 if client_call_details.metadata: 309 metadata = list(client_call_details.metadata) 310 metadata.append(( 311 header, 312 value, 313 )) 314 client_call_details = _ClientCallDetails( 315 client_call_details.method, client_call_details.timeout, metadata, 316 client_call_details.credentials) 317 return client_call_details, request_iterator, None 318 319 return _GenericClientInterceptor(intercept_call) 320 321 322class _GenericServerInterceptor(grpc.ServerInterceptor): 323 324 def __init__(self, fn): 325 self._fn = fn 326 327 def intercept_service(self, continuation, handler_call_details): 328 return self._fn(continuation, handler_call_details) 329 330 331def _filter_server_interceptor(condition, interceptor): 332 333 def intercept_service(continuation, handler_call_details): 334 if condition(handler_call_details): 335 return interceptor.intercept_service(continuation, 336 handler_call_details) 337 return continuation(handler_call_details) 338 339 return _GenericServerInterceptor(intercept_service) 340 341 342class InterceptorTest(unittest.TestCase): 343 344 def setUp(self): 345 self._control = test_control.PauseFailControl() 346 self._handler = _Handler(self._control) 347 self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 348 349 self._record = [] 350 conditional_interceptor = _filter_server_interceptor( 351 lambda x: ('secret', '42') in x.invocation_metadata, 352 _LoggingInterceptor('s3', self._record)) 353 354 self._server = grpc.server(self._server_pool, 355 options=(('grpc.so_reuseport', 0),), 356 interceptors=( 357 _LoggingInterceptor('s1', self._record), 358 conditional_interceptor, 359 _LoggingInterceptor('s2', self._record), 360 )) 361 port = self._server.add_insecure_port('[::]:0') 362 self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) 363 self._server.start() 364 365 self._channel = grpc.insecure_channel('localhost:%d' % port) 366 367 def tearDown(self): 368 self._server.stop(None) 369 self._server_pool.shutdown(wait=True) 370 self._channel.close() 371 372 def testTripleRequestMessagesClientInterceptor(self): 373 374 def triple(request_iterator): 375 while True: 376 try: 377 item = next(request_iterator) 378 yield item 379 yield item 380 yield item 381 except StopIteration: 382 break 383 384 interceptor = _wrap_request_iterator_stream_interceptor(triple) 385 channel = grpc.intercept_channel(self._channel, interceptor) 386 requests = tuple( 387 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 388 389 multi_callable = _stream_stream_multi_callable(channel) 390 response_iterator = multi_callable( 391 iter(requests), 392 metadata=( 393 ('test', 394 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 395 396 responses = tuple(response_iterator) 397 self.assertEqual(len(responses), 3 * test_constants.STREAM_LENGTH) 398 399 multi_callable = _stream_stream_multi_callable(self._channel) 400 response_iterator = multi_callable( 401 iter(requests), 402 metadata=( 403 ('test', 404 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 405 406 responses = tuple(response_iterator) 407 self.assertEqual(len(responses), test_constants.STREAM_LENGTH) 408 409 def testDefectiveClientInterceptor(self): 410 interceptor = _DefectiveClientInterceptor() 411 defective_channel = grpc.intercept_channel(self._channel, interceptor) 412 413 request = b'\x07\x08' 414 415 multi_callable = _unary_unary_multi_callable(defective_channel) 416 call_future = multi_callable.future( 417 request, 418 metadata=(('test', 419 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 420 421 self.assertIsNotNone(call_future.exception()) 422 self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL) 423 424 def testInterceptedHeaderManipulationWithServerSideVerification(self): 425 request = b'\x07\x08' 426 427 channel = grpc.intercept_channel( 428 self._channel, _append_request_header_interceptor('secret', '42')) 429 channel = grpc.intercept_channel( 430 channel, _LoggingInterceptor('c1', self._record), 431 _LoggingInterceptor('c2', self._record)) 432 433 self._record[:] = [] 434 435 multi_callable = _unary_unary_multi_callable(channel) 436 multi_callable.with_call( 437 request, 438 metadata=( 439 ('test', 440 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) 441 442 self.assertSequenceEqual(self._record, [ 443 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 444 's1:intercept_service', 's3:intercept_service', 445 's2:intercept_service' 446 ]) 447 448 def testInterceptedUnaryRequestBlockingUnaryResponse(self): 449 request = b'\x07\x08' 450 451 self._record[:] = [] 452 453 channel = grpc.intercept_channel( 454 self._channel, _LoggingInterceptor('c1', self._record), 455 _LoggingInterceptor('c2', self._record)) 456 457 multi_callable = _unary_unary_multi_callable(channel) 458 multi_callable( 459 request, 460 metadata=(('test', 461 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 462 463 self.assertSequenceEqual(self._record, [ 464 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 465 's1:intercept_service', 's2:intercept_service' 466 ]) 467 468 def testInterceptedUnaryRequestBlockingUnaryResponseWithError(self): 469 request = _EXCEPTION_REQUEST 470 471 self._record[:] = [] 472 473 channel = grpc.intercept_channel( 474 self._channel, _LoggingInterceptor('c1', self._record), 475 _LoggingInterceptor('c2', self._record)) 476 477 multi_callable = _unary_unary_multi_callable(channel) 478 with self.assertRaises(grpc.RpcError) as exception_context: 479 multi_callable( 480 request, 481 metadata=(('test', 482 'InterceptedUnaryRequestBlockingUnaryResponse'),)) 483 exception = exception_context.exception 484 self.assertFalse(exception.cancelled()) 485 self.assertFalse(exception.running()) 486 self.assertTrue(exception.done()) 487 with self.assertRaises(grpc.RpcError): 488 exception.result() 489 self.assertIsInstance(exception.exception(), grpc.RpcError) 490 491 def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self): 492 request = b'\x07\x08' 493 494 channel = grpc.intercept_channel( 495 self._channel, _LoggingInterceptor('c1', self._record), 496 _LoggingInterceptor('c2', self._record)) 497 498 self._record[:] = [] 499 500 multi_callable = _unary_unary_multi_callable(channel) 501 multi_callable.with_call( 502 request, 503 metadata=( 504 ('test', 505 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) 506 507 self.assertSequenceEqual(self._record, [ 508 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 509 's1:intercept_service', 's2:intercept_service' 510 ]) 511 512 def testInterceptedUnaryRequestFutureUnaryResponse(self): 513 request = b'\x07\x08' 514 515 self._record[:] = [] 516 channel = grpc.intercept_channel( 517 self._channel, _LoggingInterceptor('c1', self._record), 518 _LoggingInterceptor('c2', self._record)) 519 520 multi_callable = _unary_unary_multi_callable(channel) 521 response_future = multi_callable.future( 522 request, 523 metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),)) 524 response_future.result() 525 526 self.assertSequenceEqual(self._record, [ 527 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', 528 's1:intercept_service', 's2:intercept_service' 529 ]) 530 531 def testInterceptedUnaryRequestStreamResponse(self): 532 request = b'\x37\x58' 533 534 self._record[:] = [] 535 channel = grpc.intercept_channel( 536 self._channel, _LoggingInterceptor('c1', self._record), 537 _LoggingInterceptor('c2', self._record)) 538 539 multi_callable = _unary_stream_multi_callable(channel) 540 response_iterator = multi_callable( 541 request, 542 metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),)) 543 tuple(response_iterator) 544 545 self.assertSequenceEqual(self._record, [ 546 'c1:intercept_unary_stream', 'c2:intercept_unary_stream', 547 's1:intercept_service', 's2:intercept_service' 548 ]) 549 550 # NOTE: The single-threaded unary-stream path does not support the 551 # grpc.Future interface, so this test does not apply. 552 @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"), 553 "Not supported.") 554 def testInterceptedUnaryRequestStreamResponseWithError(self): 555 request = _EXCEPTION_REQUEST 556 557 self._record[:] = [] 558 channel = grpc.intercept_channel( 559 self._channel, _LoggingInterceptor('c1', self._record), 560 _LoggingInterceptor('c2', self._record)) 561 562 multi_callable = _unary_stream_multi_callable(channel) 563 response_iterator = multi_callable( 564 request, 565 metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),)) 566 with self.assertRaises(grpc.RpcError) as exception_context: 567 tuple(response_iterator) 568 exception = exception_context.exception 569 self.assertFalse(exception.cancelled()) 570 self.assertFalse(exception.running()) 571 self.assertTrue(exception.done()) 572 with self.assertRaises(grpc.RpcError): 573 exception.result() 574 self.assertIsInstance(exception.exception(), grpc.RpcError) 575 576 def testInterceptedStreamRequestBlockingUnaryResponse(self): 577 requests = tuple( 578 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 579 request_iterator = iter(requests) 580 581 self._record[:] = [] 582 channel = grpc.intercept_channel( 583 self._channel, _LoggingInterceptor('c1', self._record), 584 _LoggingInterceptor('c2', self._record)) 585 586 multi_callable = _stream_unary_multi_callable(channel) 587 multi_callable( 588 request_iterator, 589 metadata=(('test', 590 'InterceptedStreamRequestBlockingUnaryResponse'),)) 591 592 self.assertSequenceEqual(self._record, [ 593 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 594 's1:intercept_service', 's2:intercept_service' 595 ]) 596 597 def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self): 598 requests = tuple( 599 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 600 request_iterator = iter(requests) 601 602 self._record[:] = [] 603 channel = grpc.intercept_channel( 604 self._channel, _LoggingInterceptor('c1', self._record), 605 _LoggingInterceptor('c2', self._record)) 606 607 multi_callable = _stream_unary_multi_callable(channel) 608 multi_callable.with_call( 609 request_iterator, 610 metadata=( 611 ('test', 612 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) 613 614 self.assertSequenceEqual(self._record, [ 615 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 616 's1:intercept_service', 's2:intercept_service' 617 ]) 618 619 def testInterceptedStreamRequestFutureUnaryResponse(self): 620 requests = tuple( 621 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 622 request_iterator = iter(requests) 623 624 self._record[:] = [] 625 channel = grpc.intercept_channel( 626 self._channel, _LoggingInterceptor('c1', self._record), 627 _LoggingInterceptor('c2', self._record)) 628 629 multi_callable = _stream_unary_multi_callable(channel) 630 response_future = multi_callable.future( 631 request_iterator, 632 metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),)) 633 response_future.result() 634 635 self.assertSequenceEqual(self._record, [ 636 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', 637 's1:intercept_service', 's2:intercept_service' 638 ]) 639 640 def testInterceptedStreamRequestFutureUnaryResponseWithError(self): 641 requests = tuple( 642 _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH)) 643 request_iterator = iter(requests) 644 645 self._record[:] = [] 646 channel = grpc.intercept_channel( 647 self._channel, _LoggingInterceptor('c1', self._record), 648 _LoggingInterceptor('c2', self._record)) 649 650 multi_callable = _stream_unary_multi_callable(channel) 651 response_future = multi_callable.future( 652 request_iterator, 653 metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),)) 654 with self.assertRaises(grpc.RpcError) as exception_context: 655 response_future.result() 656 exception = exception_context.exception 657 self.assertFalse(exception.cancelled()) 658 self.assertFalse(exception.running()) 659 self.assertTrue(exception.done()) 660 with self.assertRaises(grpc.RpcError): 661 exception.result() 662 self.assertIsInstance(exception.exception(), grpc.RpcError) 663 664 def testInterceptedStreamRequestStreamResponse(self): 665 requests = tuple( 666 b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) 667 request_iterator = iter(requests) 668 669 self._record[:] = [] 670 channel = grpc.intercept_channel( 671 self._channel, _LoggingInterceptor('c1', self._record), 672 _LoggingInterceptor('c2', self._record)) 673 674 multi_callable = _stream_stream_multi_callable(channel) 675 response_iterator = multi_callable( 676 request_iterator, 677 metadata=(('test', 'InterceptedStreamRequestStreamResponse'),)) 678 tuple(response_iterator) 679 680 self.assertSequenceEqual(self._record, [ 681 'c1:intercept_stream_stream', 'c2:intercept_stream_stream', 682 's1:intercept_service', 's2:intercept_service' 683 ]) 684 685 def testInterceptedStreamRequestStreamResponseWithError(self): 686 requests = tuple( 687 _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH)) 688 request_iterator = iter(requests) 689 690 self._record[:] = [] 691 channel = grpc.intercept_channel( 692 self._channel, _LoggingInterceptor('c1', self._record), 693 _LoggingInterceptor('c2', self._record)) 694 695 multi_callable = _stream_stream_multi_callable(channel) 696 response_iterator = multi_callable( 697 request_iterator, 698 metadata=(('test', 'InterceptedStreamRequestStreamResponse'),)) 699 with self.assertRaises(grpc.RpcError) as exception_context: 700 tuple(response_iterator) 701 exception = exception_context.exception 702 self.assertFalse(exception.cancelled()) 703 self.assertFalse(exception.running()) 704 self.assertTrue(exception.done()) 705 with self.assertRaises(grpc.RpcError): 706 exception.result() 707 self.assertIsInstance(exception.exception(), grpc.RpcError) 708 709 710if __name__ == '__main__': 711 logging.basicConfig() 712 unittest.main(verbosity=2) 713