1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test of gRPC Python interceptors."""
15
16import collections
17import itertools
18import threading
19import unittest
20import logging
21import os
22from concurrent import futures
23
24import grpc
25from grpc.framework.foundation import logging_pool
26
27from tests.unit import test_common
28from tests.unit.framework.common import test_constants
29from tests.unit.framework.common import test_control
30
# Toy (de)serializers: a request is doubled and a response is tripled when
# serialized, and each deserializer keeps a single copy again.
def _SERIALIZE_REQUEST(bytestring):
    """Returns ``bytestring`` repeated twice."""
    return bytestring * 2


def _DESERIALIZE_REQUEST(bytestring):
    """Returns the second half of ``bytestring``."""
    midpoint = len(bytestring) // 2
    return bytestring[midpoint:]


def _SERIALIZE_RESPONSE(bytestring):
    """Returns ``bytestring`` repeated three times."""
    return bytestring * 3


def _DESERIALIZE_RESPONSE(bytestring):
    """Returns the first third of ``bytestring``."""
    one_third = len(bytestring) // 3
    return bytestring[:one_third]


# Request payload that makes the handlers in this file raise
# _ApplicationErrorStandin.
_EXCEPTION_REQUEST = b'\x09\x0a'

# Fully-qualified method names, one per RPC arity.
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
42
43
class _ApplicationErrorStandin(Exception):
    """Exception raised by the test handlers to simulate an application error."""
46
47
class _Callback(object):
    """Callable that stores its latest argument and lets threads wait for it."""

    def __init__(self):
        self._called = False
        self._value = None
        self._condition = threading.Condition()

    def __call__(self, value):
        """Stores ``value`` and wakes every thread blocked in ``value()``."""
        condition = self._condition
        with condition:
            self._value, self._called = value, True
            condition.notify_all()

    def value(self):
        """Blocks until the callback has been called; returns the stored value."""
        condition = self._condition
        with condition:
            # Loop so that a wake-up without a completed call keeps waiting.
            while True:
                if self._called:
                    return self._value
                condition.wait()
66
67
class _Handler(object):
    """Server-side behaviors for the four RPC arities used by these tests.

    Every method calls ``control.control()`` on the injected control object
    at fixed points, echoes the request data back, and raises
    ``_ApplicationErrorStandin`` when it sees ``_EXCEPTION_REQUEST``.
    """

    # Trailing metadata attached whenever a servicer context is supplied.
    _TRAILING_METADATA = ((
        'testkey',
        'testvalue',
    ),)

    def __init__(self, control):
        self._control = control

    def _attach_trailing_metadata(self, servicer_context):
        """Sets the test trailing metadata unless the context is None."""
        if servicer_context is None:
            return
        servicer_context.set_trailing_metadata(self._TRAILING_METADATA)

    def handle_unary_unary(self, request, servicer_context):
        """Returns ``request`` unchanged, or raises for the exception request."""
        self._control.control()
        self._attach_trailing_metadata(servicer_context)
        if request == _EXCEPTION_REQUEST:
            raise _ApplicationErrorStandin()
        return request

    def handle_unary_stream(self, request, servicer_context):
        """Yields ``request`` ``test_constants.STREAM_LENGTH`` times."""
        if request == _EXCEPTION_REQUEST:
            raise _ApplicationErrorStandin()
        for _ in range(test_constants.STREAM_LENGTH):
            self._control.control()
            yield request
        self._control.control()
        # Trailing metadata is only set once the whole stream has been yielded.
        self._attach_trailing_metadata(servicer_context)

    def handle_stream_unary(self, request_iterator, servicer_context):
        """Returns the concatenation of every request in the stream."""
        if servicer_context is not None:
            servicer_context.invocation_metadata()
        self._control.control()
        received = []
        for request in request_iterator:
            self._control.control()
            received.append(request)
        self._control.control()
        self._attach_trailing_metadata(servicer_context)
        # The error is raised only after the request stream has been drained.
        if _EXCEPTION_REQUEST in received:
            raise _ApplicationErrorStandin()
        return b''.join(received)

    def handle_stream_stream(self, request_iterator, servicer_context):
        """Yields each request back as it arrives."""
        self._control.control()
        self._attach_trailing_metadata(servicer_context)
        for request in request_iterator:
            if request == _EXCEPTION_REQUEST:
                raise _ApplicationErrorStandin()
            self._control.control()
            yield request
        self._control.control()
128
129
class _MethodHandler(grpc.RpcMethodHandler):
    """Plain attribute holder implementing ``grpc.RpcMethodHandler``.

    Each constructor argument is stored unchanged under the attribute of the
    same name.
    """

    def __init__(self, request_streaming, response_streaming,
                 request_deserializer, response_serializer, unary_unary,
                 unary_stream, stream_unary, stream_stream):
        # Arity flags.
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming

        # Message conversion hooks.
        self.request_deserializer = request_deserializer
        self.response_serializer = response_serializer

        # Behaviors, one slot per arity.
        self.unary_unary = unary_unary
        self.unary_stream = unary_stream
        self.stream_unary = stream_unary
        self.stream_stream = stream_stream
143
144
class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the four test method names to the matching ``_Handler`` method."""

    def __init__(self, handler):
        self._handler = handler

    def service(self, handler_call_details):
        """Returns the method handler for the called method, or None.

        The unary-stream and stream-unary methods are registered with this
        module's request deserializer and response serializer; the other two
        are registered with ``None`` for both.
        """
        method = handler_call_details.method
        if method == _UNARY_UNARY:
            return _MethodHandler(
                request_streaming=False,
                response_streaming=False,
                request_deserializer=None,
                response_serializer=None,
                unary_unary=self._handler.handle_unary_unary,
                unary_stream=None,
                stream_unary=None,
                stream_stream=None)
        if method == _UNARY_STREAM:
            return _MethodHandler(
                request_streaming=False,
                response_streaming=True,
                request_deserializer=_DESERIALIZE_REQUEST,
                response_serializer=_SERIALIZE_RESPONSE,
                unary_unary=None,
                unary_stream=self._handler.handle_unary_stream,
                stream_unary=None,
                stream_stream=None)
        if method == _STREAM_UNARY:
            return _MethodHandler(
                request_streaming=True,
                response_streaming=False,
                request_deserializer=_DESERIALIZE_REQUEST,
                response_serializer=_SERIALIZE_RESPONSE,
                unary_unary=None,
                unary_stream=None,
                stream_unary=self._handler.handle_stream_unary,
                stream_stream=None)
        if method == _STREAM_STREAM:
            return _MethodHandler(
                request_streaming=True,
                response_streaming=True,
                request_deserializer=None,
                response_serializer=None,
                unary_unary=None,
                unary_stream=None,
                stream_unary=None,
                stream_stream=self._handler.handle_stream_stream)
        # Unknown method: this handler does not service it.
        return None
168
169
def _unary_unary_multi_callable(channel):
    """Returns ``channel``'s unary-unary multi-callable for ``_UNARY_UNARY``."""
    multi_callable = channel.unary_unary(_UNARY_UNARY)
    return multi_callable
172
173
def _unary_stream_multi_callable(channel):
    """Returns ``channel``'s unary-stream multi-callable for ``_UNARY_STREAM``.

    Requests and responses go through this module's custom serializer pair.
    """
    multi_callable = channel.unary_stream(
        _UNARY_STREAM,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE)
    return multi_callable
178
179
def _stream_unary_multi_callable(channel):
    """Returns ``channel``'s stream-unary multi-callable for ``_STREAM_UNARY``.

    Requests and responses go through this module's custom serializer pair.
    """
    multi_callable = channel.stream_unary(
        _STREAM_UNARY,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE)
    return multi_callable
184
185
def _stream_stream_multi_callable(channel):
    """Returns ``channel``'s stream-stream multi-callable for ``_STREAM_STREAM``."""
    multi_callable = channel.stream_stream(_STREAM_STREAM)
    return multi_callable
188
189
class _ClientCallDetails(
        collections.namedtuple(
            '_ClientCallDetails',
            ('method', 'timeout', 'metadata', 'credentials')),
        grpc.ClientCallDetails):
    """Namedtuple-backed ``grpc.ClientCallDetails``.

    Lets an interceptor build a modified copy of the call details it was
    handed, e.g. with extra metadata.
    """
196
197
class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor,
                                grpc.UnaryStreamClientInterceptor,
                                grpc.StreamUnaryClientInterceptor,
                                grpc.StreamStreamClientInterceptor):
    """Adapts one interceptor function to all four client interceptor arities.

    The wrapped function is called as ``fn(client_call_details,
    request_iterator, request_streaming, response_streaming)``. A unary
    request is presented to it as a one-element iterator. It must return a
    ``(client_call_details, request_iterator, postprocess)`` triple, where
    ``postprocess`` is either falsy or a callable applied to whatever the
    continuation returns.

    The continuation is always given a request of the arity the RPC actually
    has: a single message for unary-request calls and the request iterator
    for streaming-request calls.
    """

    def __init__(self, interceptor_function):
        self._fn = interceptor_function

    def intercept_unary_unary(self, continuation, client_call_details, request):
        new_details, new_request_iterator, postprocess = self._fn(
            client_call_details, iter((request,)), False, False)
        # Unwrap the one-element iterator back into a single request.
        response = continuation(new_details, next(new_request_iterator))
        return postprocess(response) if postprocess else response

    def intercept_unary_stream(self, continuation, client_call_details,
                               request):
        new_details, new_request_iterator, postprocess = self._fn(
            client_call_details, iter((request,)), False, True)
        # The request side is unary, so the continuation needs the single
        # request message and not the iterator wrapped around it.
        response_it = continuation(new_details, next(new_request_iterator))
        return postprocess(response_it) if postprocess else response_it

    def intercept_stream_unary(self, continuation, client_call_details,
                               request_iterator):
        new_details, new_request_iterator, postprocess = self._fn(
            client_call_details, request_iterator, True, False)
        # The request side is streaming, so the whole iterator is forwarded;
        # taking next() here would send one message in place of the stream.
        response = continuation(new_details, new_request_iterator)
        return postprocess(response) if postprocess else response

    def intercept_stream_stream(self, continuation, client_call_details,
                                request_iterator):
        new_details, new_request_iterator, postprocess = self._fn(
            client_call_details, request_iterator, True, True)
        response_it = continuation(new_details, new_request_iterator)
        return postprocess(response_it) if postprocess else response_it
232
233
class _LoggingInterceptor(grpc.ServerInterceptor,
                          grpc.UnaryUnaryClientInterceptor,
                          grpc.UnaryStreamClientInterceptor,
                          grpc.StreamUnaryClientInterceptor,
                          grpc.StreamStreamClientInterceptor):
    """Interceptor that appends ``'<tag>:<hook name>'`` to a shared record.

    Every hook logs its own name and then hands the call to the continuation
    unchanged. The unary-response client hooks additionally assert that the
    continuation's result is both a ``grpc.Call`` and a ``grpc.Future``.
    """

    def __init__(self, tag, record):
        self.tag = tag
        self.record = record

    def _log(self, suffix):
        """Appends this interceptor's tag followed by ``suffix`` to the record."""
        self.record.append(self.tag + suffix)

    def intercept_service(self, continuation, handler_call_details):
        self._log(':intercept_service')
        return continuation(handler_call_details)

    def intercept_unary_unary(self, continuation, client_call_details, request):
        self._log(':intercept_unary_unary')
        result = continuation(client_call_details, request)
        assert isinstance(result, grpc.Call), (
            '{} ({}) is not an instance of grpc.Call'.format(
                result, type(result)))
        assert isinstance(result, grpc.Future), (
            '{} ({}) is not an instance of grpc.Future'.format(
                result, type(result)))
        return result

    def intercept_unary_stream(self, continuation, client_call_details,
                               request):
        self._log(':intercept_unary_stream')
        return continuation(client_call_details, request)

    def intercept_stream_unary(self, continuation, client_call_details,
                               request_iterator):
        self._log(':intercept_stream_unary')
        result = continuation(client_call_details, request_iterator)
        assert isinstance(result, grpc.Call), (
            '{} is not an instance of grpc.Call'.format(result))
        assert isinstance(result, grpc.Future), (
            '{} is not an instance of grpc.Future'.format(result))
        return result

    def intercept_stream_stream(self, continuation, client_call_details,
                                request_iterator):
        self._log(':intercept_stream_stream')
        return continuation(client_call_details, request_iterator)
282
283
class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Unary-unary client interceptor that always raises ``test_control.Defect``."""

    def intercept_unary_unary(self, ignored_continuation,
                              ignored_client_call_details, ignored_request):
        # The continuation is never invoked; the interceptor itself fails.
        defect = test_control.Defect()
        raise defect
289
290
def _wrap_request_iterator_stream_interceptor(wrapper):
    """Builds an interceptor applying ``wrapper`` to streaming request iterators.

    Calls with a unary request are passed through untouched.
    """

    def intercept_call(client_call_details, request_iterator, request_streaming,
                       ignored_response_streaming):
        outgoing = (wrapper(request_iterator)
                    if request_streaming else request_iterator)
        return client_call_details, outgoing, None

    return _GenericClientInterceptor(intercept_call)
301
302
def _append_request_header_interceptor(header, value):
    """Builds an interceptor appending ``(header, value)`` to call metadata."""

    def intercept_call(client_call_details, request_iterator,
                       ignored_request_streaming, ignored_response_streaming):
        existing = client_call_details.metadata
        # Copy any caller-supplied metadata so the original is not mutated.
        metadata = list(existing) if existing else []
        metadata.append((header, value))
        updated_details = _ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=metadata,
            credentials=client_call_details.credentials)
        return updated_details, request_iterator, None

    return _GenericClientInterceptor(intercept_call)
320
321
class _GenericServerInterceptor(grpc.ServerInterceptor):
    """Server interceptor that delegates ``intercept_service`` to a function."""

    def __init__(self, fn):
        self._delegate = fn

    def intercept_service(self, continuation, handler_call_details):
        """Returns ``fn(continuation, handler_call_details)``."""
        delegate = self._delegate
        return delegate(continuation, handler_call_details)
329
330
def _filter_server_interceptor(condition, interceptor):
    """Builds a server interceptor that applies ``interceptor`` conditionally.

    ``interceptor`` only sees calls whose handler call details satisfy
    ``condition``; every other call goes straight to the continuation.
    """

    def intercept_service(continuation, handler_call_details):
        if not condition(handler_call_details):
            return continuation(handler_call_details)
        return interceptor.intercept_service(continuation, handler_call_details)

    return _GenericServerInterceptor(intercept_service)
340
341
class InterceptorTest(unittest.TestCase):
    """Exercises client and server interceptors against an in-process server.

    Test methods carry ``#`` comments rather than docstrings so that verbose
    unittest output keeps showing the method names.
    """

    def setUp(self):
        self._control = test_control.PauseFailControl()
        self._handler = _Handler(self._control)
        self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)

        # Shared log that every logging interceptor appends to.
        self._record = []

        def carries_secret(handler_call_details):
            return ('secret', '42') in handler_call_details.invocation_metadata

        # 's3' only logs calls whose metadata contains ('secret', '42').
        conditional_interceptor = _filter_server_interceptor(
            carries_secret, _LoggingInterceptor('s3', self._record))
        server_interceptors = (
            _LoggingInterceptor('s1', self._record),
            conditional_interceptor,
            _LoggingInterceptor('s2', self._record),
        )

        self._server = grpc.server(self._server_pool,
                                   options=(('grpc.so_reuseport', 0),),
                                   interceptors=server_interceptors)
        port = self._server.add_insecure_port('[::]:0')
        self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)

    def tearDown(self):
        self._server.stop(None)
        self._server_pool.shutdown(wait=True)
        self._channel.close()

    def _with_logging_interceptors(self, channel):
        """Wraps ``channel`` with logging client interceptors 'c1' and 'c2'."""
        return grpc.intercept_channel(
            channel, _LoggingInterceptor('c1', self._record),
            _LoggingInterceptor('c2', self._record))

    def _logged_channel(self):
        """Empties the record and returns the test channel wrapped with loggers."""
        self._record[:] = []
        return self._with_logging_interceptors(self._channel)

    @staticmethod
    def _repeated(payload):
        """Returns a tuple holding ``payload`` STREAM_LENGTH times."""
        return (payload,) * test_constants.STREAM_LENGTH

    def _assert_terminated_rpc_error(self, rpc_error):
        """Checks the future-style state of an RpcError raised by a failed RPC."""
        self.assertFalse(rpc_error.cancelled())
        self.assertFalse(rpc_error.running())
        self.assertTrue(rpc_error.done())
        with self.assertRaises(grpc.RpcError):
            rpc_error.result()
        self.assertIsInstance(rpc_error.exception(), grpc.RpcError)

    def testTripleRequestMessagesClientInterceptor(self):
        # An interceptor that emits every request three times must triple the
        # number of echoed responses; the plain channel must not.

        def triple(request_iterator):
            for item in request_iterator:
                for _ in range(3):
                    yield item

        tripling_channel = grpc.intercept_channel(
            self._channel, _wrap_request_iterator_stream_interceptor(triple))
        requests = self._repeated(b'\x07\x08')
        metadata = ((
            'test',
            'InterceptedStreamRequestBlockingUnaryResponseWithCall',
        ),)

        for channel, expected_count in (
            (tripling_channel, 3 * test_constants.STREAM_LENGTH),
            (self._channel, test_constants.STREAM_LENGTH),
        ):
            multi_callable = _stream_stream_multi_callable(channel)
            response_iterator = multi_callable(iter(requests),
                                               metadata=metadata)
            responses = tuple(response_iterator)
            self.assertEqual(len(responses), expected_count)

    def testDefectiveClientInterceptor(self):
        # An interceptor that raises must surface as an INTERNAL failure.
        defective_channel = grpc.intercept_channel(
            self._channel, _DefectiveClientInterceptor())

        multi_callable = _unary_unary_multi_callable(defective_channel)
        call_future = multi_callable.future(
            b'\x07\x08',
            metadata=((
                'test',
                'InterceptedUnaryRequestBlockingUnaryResponse',
            ),))

        self.assertIsNotNone(call_future.exception())
        self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL)

    def testInterceptedHeaderManipulationWithServerSideVerification(self):
        # The appended ('secret', '42') header must enable server
        # interceptor 's3', which then logs between 's1' and 's2'.
        header_channel = grpc.intercept_channel(
            self._channel, _append_request_header_interceptor('secret', '42'))
        channel = self._with_logging_interceptors(header_channel)

        self._record[:] = []

        multi_callable = _unary_unary_multi_callable(channel)
        multi_callable.with_call(
            b'\x07\x08',
            metadata=((
                'test',
                'InterceptedUnaryRequestBlockingUnaryResponseWithCall',
            ),))

        self.assertSequenceEqual(self._record, [
            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
            's1:intercept_service', 's3:intercept_service',
            's2:intercept_service'
        ])

    def testInterceptedUnaryRequestBlockingUnaryResponse(self):
        channel = self._logged_channel()

        multi_callable = _unary_unary_multi_callable(channel)
        multi_callable(
            b'\x07\x08',
            metadata=((
                'test',
                'InterceptedUnaryRequestBlockingUnaryResponse',
            ),))

        self.assertSequenceEqual(self._record, [
            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedUnaryRequestBlockingUnaryResponseWithError(self):
        channel = self._logged_channel()

        multi_callable = _unary_unary_multi_callable(channel)
        with self.assertRaises(grpc.RpcError) as exception_context:
            multi_callable(
                _EXCEPTION_REQUEST,
                metadata=((
                    'test',
                    'InterceptedUnaryRequestBlockingUnaryResponse',
                ),))
        self._assert_terminated_rpc_error(exception_context.exception)

    def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self):
        channel = self._logged_channel()

        multi_callable = _unary_unary_multi_callable(channel)
        multi_callable.with_call(
            b'\x07\x08',
            metadata=((
                'test',
                'InterceptedUnaryRequestBlockingUnaryResponseWithCall',
            ),))

        self.assertSequenceEqual(self._record, [
            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedUnaryRequestFutureUnaryResponse(self):
        channel = self._logged_channel()

        multi_callable = _unary_unary_multi_callable(channel)
        response_future = multi_callable.future(
            b'\x07\x08',
            metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),))
        response_future.result()

        self.assertSequenceEqual(self._record, [
            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedUnaryRequestStreamResponse(self):
        channel = self._logged_channel()

        multi_callable = _unary_stream_multi_callable(channel)
        response_iterator = multi_callable(
            b'\x37\x58',
            metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),))
        # Drain the stream so the RPC runs to completion.
        tuple(response_iterator)

        self.assertSequenceEqual(self._record, [
            'c1:intercept_unary_stream', 'c2:intercept_unary_stream',
            's1:intercept_service', 's2:intercept_service'
        ])

    # NOTE: The single-threaded unary-stream path does not support the
    # grpc.Future interface, so this test does not apply.
    @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"),
                     "Not supported.")
    def testInterceptedUnaryRequestStreamResponseWithError(self):
        channel = self._logged_channel()

        multi_callable = _unary_stream_multi_callable(channel)
        response_iterator = multi_callable(
            _EXCEPTION_REQUEST,
            metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),))
        with self.assertRaises(grpc.RpcError) as exception_context:
            tuple(response_iterator)
        self._assert_terminated_rpc_error(exception_context.exception)

    def testInterceptedStreamRequestBlockingUnaryResponse(self):
        request_iterator = iter(self._repeated(b'\x07\x08'))
        channel = self._logged_channel()

        multi_callable = _stream_unary_multi_callable(channel)
        multi_callable(
            request_iterator,
            metadata=((
                'test',
                'InterceptedStreamRequestBlockingUnaryResponse',
            ),))

        self.assertSequenceEqual(self._record, [
            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self):
        request_iterator = iter(self._repeated(b'\x07\x08'))
        channel = self._logged_channel()

        multi_callable = _stream_unary_multi_callable(channel)
        multi_callable.with_call(
            request_iterator,
            metadata=((
                'test',
                'InterceptedStreamRequestBlockingUnaryResponseWithCall',
            ),))

        self.assertSequenceEqual(self._record, [
            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedStreamRequestFutureUnaryResponse(self):
        request_iterator = iter(self._repeated(b'\x07\x08'))
        channel = self._logged_channel()

        multi_callable = _stream_unary_multi_callable(channel)
        response_future = multi_callable.future(
            request_iterator,
            metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),))
        response_future.result()

        self.assertSequenceEqual(self._record, [
            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedStreamRequestFutureUnaryResponseWithError(self):
        request_iterator = iter(self._repeated(_EXCEPTION_REQUEST))
        channel = self._logged_channel()

        multi_callable = _stream_unary_multi_callable(channel)
        response_future = multi_callable.future(
            request_iterator,
            metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),))
        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self._assert_terminated_rpc_error(exception_context.exception)

    def testInterceptedStreamRequestStreamResponse(self):
        request_iterator = iter(self._repeated(b'\x77\x58'))
        channel = self._logged_channel()

        multi_callable = _stream_stream_multi_callable(channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(('test', 'InterceptedStreamRequestStreamResponse'),))
        # Drain the stream so the RPC runs to completion.
        tuple(response_iterator)

        self.assertSequenceEqual(self._record, [
            'c1:intercept_stream_stream', 'c2:intercept_stream_stream',
            's1:intercept_service', 's2:intercept_service'
        ])

    def testInterceptedStreamRequestStreamResponseWithError(self):
        request_iterator = iter(self._repeated(_EXCEPTION_REQUEST))
        channel = self._logged_channel()

        multi_callable = _stream_stream_multi_callable(channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(('test', 'InterceptedStreamRequestStreamResponse'),))
        with self.assertRaises(grpc.RpcError) as exception_context:
            tuple(response_iterator)
        self._assert_terminated_rpc_error(exception_context.exception)
708
709
# Script entry point: run this module's tests when it is executed directly.
if __name__ == '__main__':
    # Set up the default logging configuration before the tests run.
    logging.basicConfig()
    # verbosity=2 makes unittest report every test individually.
    unittest.main(verbosity=2)
713