1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test of gRPC Python interceptors."""
15
16import collections
17from concurrent import futures
18import itertools
19import logging
20import os
21import threading
22import unittest
23
24import grpc
25from grpc.framework.foundation import logging_pool
26
27from tests.unit import test_common
28from tests.unit.framework.common import test_constants
29from tests.unit.framework.common import test_control
30
31_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
32_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
33_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
34_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
35
36_EXCEPTION_REQUEST = b'\x09\x0a'
37
38_UNARY_UNARY = '/test/UnaryUnary'
39_UNARY_STREAM = '/test/UnaryStream'
40_STREAM_UNARY = '/test/StreamUnary'
41_STREAM_STREAM = '/test/StreamStream'
42
43
44class _ApplicationErrorStandin(Exception):
45    pass
46
47
48class _Callback(object):
49
50    def __init__(self):
51        self._condition = threading.Condition()
52        self._value = None
53        self._called = False
54
55    def __call__(self, value):
56        with self._condition:
57            self._value = value
58            self._called = True
59            self._condition.notify_all()
60
61    def value(self):
62        with self._condition:
63            while not self._called:
64                self._condition.wait()
65            return self._value
66
67
68class _Handler(object):
69
70    def __init__(self, control):
71        self._control = control
72
73    def handle_unary_unary(self, request, servicer_context):
74        self._control.control()
75        if servicer_context is not None:
76            servicer_context.set_trailing_metadata(((
77                'testkey',
78                'testvalue',
79            ),))
80        if request == _EXCEPTION_REQUEST:
81            raise _ApplicationErrorStandin()
82        return request
83
84    def handle_unary_stream(self, request, servicer_context):
85        if request == _EXCEPTION_REQUEST:
86            raise _ApplicationErrorStandin()
87        for _ in range(test_constants.STREAM_LENGTH):
88            self._control.control()
89            yield request
90        self._control.control()
91        if servicer_context is not None:
92            servicer_context.set_trailing_metadata(((
93                'testkey',
94                'testvalue',
95            ),))
96
97    def handle_stream_unary(self, request_iterator, servicer_context):
98        if servicer_context is not None:
99            servicer_context.invocation_metadata()
100        self._control.control()
101        response_elements = []
102        for request in request_iterator:
103            self._control.control()
104            response_elements.append(request)
105        self._control.control()
106        if servicer_context is not None:
107            servicer_context.set_trailing_metadata(((
108                'testkey',
109                'testvalue',
110            ),))
111        if _EXCEPTION_REQUEST in response_elements:
112            raise _ApplicationErrorStandin()
113        return b''.join(response_elements)
114
115    def handle_stream_stream(self, request_iterator, servicer_context):
116        self._control.control()
117        if servicer_context is not None:
118            servicer_context.set_trailing_metadata(((
119                'testkey',
120                'testvalue',
121            ),))
122        for request in request_iterator:
123            if request == _EXCEPTION_REQUEST:
124                raise _ApplicationErrorStandin()
125            self._control.control()
126            yield request
127        self._control.control()
128
129
130class _MethodHandler(grpc.RpcMethodHandler):
131
132    def __init__(self, request_streaming, response_streaming,
133                 request_deserializer, response_serializer, unary_unary,
134                 unary_stream, stream_unary, stream_stream):
135        self.request_streaming = request_streaming
136        self.response_streaming = response_streaming
137        self.request_deserializer = request_deserializer
138        self.response_serializer = response_serializer
139        self.unary_unary = unary_unary
140        self.unary_stream = unary_stream
141        self.stream_unary = stream_unary
142        self.stream_stream = stream_stream
143
144
145class _GenericHandler(grpc.GenericRpcHandler):
146
147    def __init__(self, handler):
148        self._handler = handler
149
150    def service(self, handler_call_details):
151        if handler_call_details.method == _UNARY_UNARY:
152            return _MethodHandler(False, False, None, None,
153                                  self._handler.handle_unary_unary, None, None,
154                                  None)
155        elif handler_call_details.method == _UNARY_STREAM:
156            return _MethodHandler(False, True, _DESERIALIZE_REQUEST,
157                                  _SERIALIZE_RESPONSE, None,
158                                  self._handler.handle_unary_stream, None, None)
159        elif handler_call_details.method == _STREAM_UNARY:
160            return _MethodHandler(True, False, _DESERIALIZE_REQUEST,
161                                  _SERIALIZE_RESPONSE, None, None,
162                                  self._handler.handle_stream_unary, None)
163        elif handler_call_details.method == _STREAM_STREAM:
164            return _MethodHandler(True, True, None, None, None, None, None,
165                                  self._handler.handle_stream_stream)
166        else:
167            return None
168
169
170def _unary_unary_multi_callable(channel):
171    return channel.unary_unary(_UNARY_UNARY)
172
173
174def _unary_stream_multi_callable(channel):
175    return channel.unary_stream(_UNARY_STREAM,
176                                request_serializer=_SERIALIZE_REQUEST,
177                                response_deserializer=_DESERIALIZE_RESPONSE)
178
179
180def _stream_unary_multi_callable(channel):
181    return channel.stream_unary(_STREAM_UNARY,
182                                request_serializer=_SERIALIZE_REQUEST,
183                                response_deserializer=_DESERIALIZE_RESPONSE)
184
185
186def _stream_stream_multi_callable(channel):
187    return channel.stream_stream(_STREAM_STREAM)
188
189
190class _ClientCallDetails(
191        collections.namedtuple(
192            '_ClientCallDetails',
193            ('method', 'timeout', 'metadata', 'credentials')),
194        grpc.ClientCallDetails):
195    pass
196
197
198class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor,
199                                grpc.UnaryStreamClientInterceptor,
200                                grpc.StreamUnaryClientInterceptor,
201                                grpc.StreamStreamClientInterceptor):
202
203    def __init__(self, interceptor_function):
204        self._fn = interceptor_function
205
206    def intercept_unary_unary(self, continuation, client_call_details, request):
207        new_details, new_request_iterator, postprocess = self._fn(
208            client_call_details, iter((request,)), False, False)
209        response = continuation(new_details, next(new_request_iterator))
210        return postprocess(response) if postprocess else response
211
212    def intercept_unary_stream(self, continuation, client_call_details,
213                               request):
214        new_details, new_request_iterator, postprocess = self._fn(
215            client_call_details, iter((request,)), False, True)
216        response_it = continuation(new_details, new_request_iterator)
217        return postprocess(response_it) if postprocess else response_it
218
219    def intercept_stream_unary(self, continuation, client_call_details,
220                               request_iterator):
221        new_details, new_request_iterator, postprocess = self._fn(
222            client_call_details, request_iterator, True, False)
223        response = continuation(new_details, next(new_request_iterator))
224        return postprocess(response) if postprocess else response
225
226    def intercept_stream_stream(self, continuation, client_call_details,
227                                request_iterator):
228        new_details, new_request_iterator, postprocess = self._fn(
229            client_call_details, request_iterator, True, True)
230        response_it = continuation(new_details, new_request_iterator)
231        return postprocess(response_it) if postprocess else response_it
232
233
234class _LoggingInterceptor(grpc.ServerInterceptor,
235                          grpc.UnaryUnaryClientInterceptor,
236                          grpc.UnaryStreamClientInterceptor,
237                          grpc.StreamUnaryClientInterceptor,
238                          grpc.StreamStreamClientInterceptor):
239
240    def __init__(self, tag, record):
241        self.tag = tag
242        self.record = record
243
244    def intercept_service(self, continuation, handler_call_details):
245        self.record.append(self.tag + ':intercept_service')
246        return continuation(handler_call_details)
247
248    def intercept_unary_unary(self, continuation, client_call_details, request):
249        self.record.append(self.tag + ':intercept_unary_unary')
250        result = continuation(client_call_details, request)
251        assert isinstance(
252            result,
253            grpc.Call), '{} ({}) is not an instance of grpc.Call'.format(
254                result, type(result))
255        assert isinstance(
256            result,
257            grpc.Future), '{} ({}) is not an instance of grpc.Future'.format(
258                result, type(result))
259        return result
260
261    def intercept_unary_stream(self, continuation, client_call_details,
262                               request):
263        self.record.append(self.tag + ':intercept_unary_stream')
264        return continuation(client_call_details, request)
265
266    def intercept_stream_unary(self, continuation, client_call_details,
267                               request_iterator):
268        self.record.append(self.tag + ':intercept_stream_unary')
269        result = continuation(client_call_details, request_iterator)
270        assert isinstance(
271            result,
272            grpc.Call), '{} is not an instance of grpc.Call'.format(result)
273        assert isinstance(
274            result,
275            grpc.Future), '{} is not an instance of grpc.Future'.format(result)
276        return result
277
278    def intercept_stream_stream(self, continuation, client_call_details,
279                                request_iterator):
280        self.record.append(self.tag + ':intercept_stream_stream')
281        return continuation(client_call_details, request_iterator)
282
283
284class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor):
285
286    def intercept_unary_unary(self, ignored_continuation,
287                              ignored_client_call_details, ignored_request):
288        raise test_control.Defect()
289
290
291def _wrap_request_iterator_stream_interceptor(wrapper):
292
293    def intercept_call(client_call_details, request_iterator, request_streaming,
294                       ignored_response_streaming):
295        if request_streaming:
296            return client_call_details, wrapper(request_iterator), None
297        else:
298            return client_call_details, request_iterator, None
299
300    return _GenericClientInterceptor(intercept_call)
301
302
303def _append_request_header_interceptor(header, value):
304
305    def intercept_call(client_call_details, request_iterator,
306                       ignored_request_streaming, ignored_response_streaming):
307        metadata = []
308        if client_call_details.metadata:
309            metadata = list(client_call_details.metadata)
310        metadata.append((
311            header,
312            value,
313        ))
314        client_call_details = _ClientCallDetails(
315            client_call_details.method, client_call_details.timeout, metadata,
316            client_call_details.credentials)
317        return client_call_details, request_iterator, None
318
319    return _GenericClientInterceptor(intercept_call)
320
321
322class _GenericServerInterceptor(grpc.ServerInterceptor):
323
324    def __init__(self, fn):
325        self._fn = fn
326
327    def intercept_service(self, continuation, handler_call_details):
328        return self._fn(continuation, handler_call_details)
329
330
331def _filter_server_interceptor(condition, interceptor):
332
333    def intercept_service(continuation, handler_call_details):
334        if condition(handler_call_details):
335            return interceptor.intercept_service(continuation,
336                                                 handler_call_details)
337        return continuation(handler_call_details)
338
339    return _GenericServerInterceptor(intercept_service)
340
341
342class InterceptorTest(unittest.TestCase):
343
344    def setUp(self):
345        self._control = test_control.PauseFailControl()
346        self._handler = _Handler(self._control)
347        self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
348
349        self._record = []
350        conditional_interceptor = _filter_server_interceptor(
351            lambda x: ('secret', '42') in x.invocation_metadata,
352            _LoggingInterceptor('s3', self._record))
353
354        self._server = grpc.server(self._server_pool,
355                                   options=(('grpc.so_reuseport', 0),),
356                                   interceptors=(
357                                       _LoggingInterceptor('s1', self._record),
358                                       conditional_interceptor,
359                                       _LoggingInterceptor('s2', self._record),
360                                   ))
361        port = self._server.add_insecure_port('[::]:0')
362        self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
363        self._server.start()
364
365        self._channel = grpc.insecure_channel('localhost:%d' % port)
366
367    def tearDown(self):
368        self._server.stop(None)
369        self._server_pool.shutdown(wait=True)
370        self._channel.close()
371
372    def testTripleRequestMessagesClientInterceptor(self):
373
374        def triple(request_iterator):
375            while True:
376                try:
377                    item = next(request_iterator)
378                    yield item
379                    yield item
380                    yield item
381                except StopIteration:
382                    break
383
384        interceptor = _wrap_request_iterator_stream_interceptor(triple)
385        channel = grpc.intercept_channel(self._channel, interceptor)
386        requests = tuple(
387            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
388
389        multi_callable = _stream_stream_multi_callable(channel)
390        response_iterator = multi_callable(
391            iter(requests),
392            metadata=(
393                ('test',
394                 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
395
396        responses = tuple(response_iterator)
397        self.assertEqual(len(responses), 3 * test_constants.STREAM_LENGTH)
398
399        multi_callable = _stream_stream_multi_callable(self._channel)
400        response_iterator = multi_callable(
401            iter(requests),
402            metadata=(
403                ('test',
404                 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
405
406        responses = tuple(response_iterator)
407        self.assertEqual(len(responses), test_constants.STREAM_LENGTH)
408
409    def testDefectiveClientInterceptor(self):
410        interceptor = _DefectiveClientInterceptor()
411        defective_channel = grpc.intercept_channel(self._channel, interceptor)
412
413        request = b'\x07\x08'
414
415        multi_callable = _unary_unary_multi_callable(defective_channel)
416        call_future = multi_callable.future(
417            request,
418            metadata=(('test',
419                       'InterceptedUnaryRequestBlockingUnaryResponse'),))
420
421        self.assertIsNotNone(call_future.exception())
422        self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL)
423
424    def testInterceptedHeaderManipulationWithServerSideVerification(self):
425        request = b'\x07\x08'
426
427        channel = grpc.intercept_channel(
428            self._channel, _append_request_header_interceptor('secret', '42'))
429        channel = grpc.intercept_channel(
430            channel, _LoggingInterceptor('c1', self._record),
431            _LoggingInterceptor('c2', self._record))
432
433        self._record[:] = []
434
435        multi_callable = _unary_unary_multi_callable(channel)
436        multi_callable.with_call(
437            request,
438            metadata=(
439                ('test',
440                 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),))
441
442        self.assertSequenceEqual(self._record, [
443            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
444            's1:intercept_service', 's3:intercept_service',
445            's2:intercept_service'
446        ])
447
448    def testInterceptedUnaryRequestBlockingUnaryResponse(self):
449        request = b'\x07\x08'
450
451        self._record[:] = []
452
453        channel = grpc.intercept_channel(
454            self._channel, _LoggingInterceptor('c1', self._record),
455            _LoggingInterceptor('c2', self._record))
456
457        multi_callable = _unary_unary_multi_callable(channel)
458        multi_callable(
459            request,
460            metadata=(('test',
461                       'InterceptedUnaryRequestBlockingUnaryResponse'),))
462
463        self.assertSequenceEqual(self._record, [
464            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
465            's1:intercept_service', 's2:intercept_service'
466        ])
467
468    def testInterceptedUnaryRequestBlockingUnaryResponseWithError(self):
469        request = _EXCEPTION_REQUEST
470
471        self._record[:] = []
472
473        channel = grpc.intercept_channel(
474            self._channel, _LoggingInterceptor('c1', self._record),
475            _LoggingInterceptor('c2', self._record))
476
477        multi_callable = _unary_unary_multi_callable(channel)
478        with self.assertRaises(grpc.RpcError) as exception_context:
479            multi_callable(
480                request,
481                metadata=(('test',
482                           'InterceptedUnaryRequestBlockingUnaryResponse'),))
483        exception = exception_context.exception
484        self.assertFalse(exception.cancelled())
485        self.assertFalse(exception.running())
486        self.assertTrue(exception.done())
487        with self.assertRaises(grpc.RpcError):
488            exception.result()
489        self.assertIsInstance(exception.exception(), grpc.RpcError)
490
491    def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self):
492        request = b'\x07\x08'
493
494        channel = grpc.intercept_channel(
495            self._channel, _LoggingInterceptor('c1', self._record),
496            _LoggingInterceptor('c2', self._record))
497
498        self._record[:] = []
499
500        multi_callable = _unary_unary_multi_callable(channel)
501        multi_callable.with_call(
502            request,
503            metadata=(
504                ('test',
505                 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),))
506
507        self.assertSequenceEqual(self._record, [
508            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
509            's1:intercept_service', 's2:intercept_service'
510        ])
511
512    def testInterceptedUnaryRequestFutureUnaryResponse(self):
513        request = b'\x07\x08'
514
515        self._record[:] = []
516        channel = grpc.intercept_channel(
517            self._channel, _LoggingInterceptor('c1', self._record),
518            _LoggingInterceptor('c2', self._record))
519
520        multi_callable = _unary_unary_multi_callable(channel)
521        response_future = multi_callable.future(
522            request,
523            metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),))
524        response_future.result()
525
526        self.assertSequenceEqual(self._record, [
527            'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
528            's1:intercept_service', 's2:intercept_service'
529        ])
530
531    def testInterceptedUnaryRequestStreamResponse(self):
532        request = b'\x37\x58'
533
534        self._record[:] = []
535        channel = grpc.intercept_channel(
536            self._channel, _LoggingInterceptor('c1', self._record),
537            _LoggingInterceptor('c2', self._record))
538
539        multi_callable = _unary_stream_multi_callable(channel)
540        response_iterator = multi_callable(
541            request,
542            metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),))
543        tuple(response_iterator)
544
545        self.assertSequenceEqual(self._record, [
546            'c1:intercept_unary_stream', 'c2:intercept_unary_stream',
547            's1:intercept_service', 's2:intercept_service'
548        ])
549
550    def testInterceptedUnaryRequestStreamResponseWithError(self):
551        request = _EXCEPTION_REQUEST
552
553        self._record[:] = []
554        channel = grpc.intercept_channel(
555            self._channel, _LoggingInterceptor('c1', self._record),
556            _LoggingInterceptor('c2', self._record))
557
558        multi_callable = _unary_stream_multi_callable(channel)
559        response_iterator = multi_callable(
560            request,
561            metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),))
562        with self.assertRaises(grpc.RpcError) as exception_context:
563            tuple(response_iterator)
564        exception = exception_context.exception
565        self.assertFalse(exception.cancelled())
566        self.assertFalse(exception.running())
567        self.assertTrue(exception.done())
568        with self.assertRaises(grpc.RpcError):
569            exception.result()
570        self.assertIsInstance(exception.exception(), grpc.RpcError)
571
572    def testInterceptedStreamRequestBlockingUnaryResponse(self):
573        requests = tuple(
574            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
575        request_iterator = iter(requests)
576
577        self._record[:] = []
578        channel = grpc.intercept_channel(
579            self._channel, _LoggingInterceptor('c1', self._record),
580            _LoggingInterceptor('c2', self._record))
581
582        multi_callable = _stream_unary_multi_callable(channel)
583        multi_callable(
584            request_iterator,
585            metadata=(('test',
586                       'InterceptedStreamRequestBlockingUnaryResponse'),))
587
588        self.assertSequenceEqual(self._record, [
589            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
590            's1:intercept_service', 's2:intercept_service'
591        ])
592
593    def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self):
594        requests = tuple(
595            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
596        request_iterator = iter(requests)
597
598        self._record[:] = []
599        channel = grpc.intercept_channel(
600            self._channel, _LoggingInterceptor('c1', self._record),
601            _LoggingInterceptor('c2', self._record))
602
603        multi_callable = _stream_unary_multi_callable(channel)
604        multi_callable.with_call(
605            request_iterator,
606            metadata=(
607                ('test',
608                 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
609
610        self.assertSequenceEqual(self._record, [
611            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
612            's1:intercept_service', 's2:intercept_service'
613        ])
614
615    def testInterceptedStreamRequestFutureUnaryResponse(self):
616        requests = tuple(
617            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
618        request_iterator = iter(requests)
619
620        self._record[:] = []
621        channel = grpc.intercept_channel(
622            self._channel, _LoggingInterceptor('c1', self._record),
623            _LoggingInterceptor('c2', self._record))
624
625        multi_callable = _stream_unary_multi_callable(channel)
626        response_future = multi_callable.future(
627            request_iterator,
628            metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),))
629        response_future.result()
630
631        self.assertSequenceEqual(self._record, [
632            'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
633            's1:intercept_service', 's2:intercept_service'
634        ])
635
636    def testInterceptedStreamRequestFutureUnaryResponseWithError(self):
637        requests = tuple(
638            _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH))
639        request_iterator = iter(requests)
640
641        self._record[:] = []
642        channel = grpc.intercept_channel(
643            self._channel, _LoggingInterceptor('c1', self._record),
644            _LoggingInterceptor('c2', self._record))
645
646        multi_callable = _stream_unary_multi_callable(channel)
647        response_future = multi_callable.future(
648            request_iterator,
649            metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),))
650        with self.assertRaises(grpc.RpcError) as exception_context:
651            response_future.result()
652        exception = exception_context.exception
653        self.assertFalse(exception.cancelled())
654        self.assertFalse(exception.running())
655        self.assertTrue(exception.done())
656        with self.assertRaises(grpc.RpcError):
657            exception.result()
658        self.assertIsInstance(exception.exception(), grpc.RpcError)
659
660    def testInterceptedStreamRequestStreamResponse(self):
661        requests = tuple(
662            b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH))
663        request_iterator = iter(requests)
664
665        self._record[:] = []
666        channel = grpc.intercept_channel(
667            self._channel, _LoggingInterceptor('c1', self._record),
668            _LoggingInterceptor('c2', self._record))
669
670        multi_callable = _stream_stream_multi_callable(channel)
671        response_iterator = multi_callable(
672            request_iterator,
673            metadata=(('test', 'InterceptedStreamRequestStreamResponse'),))
674        tuple(response_iterator)
675
676        self.assertSequenceEqual(self._record, [
677            'c1:intercept_stream_stream', 'c2:intercept_stream_stream',
678            's1:intercept_service', 's2:intercept_service'
679        ])
680
681    def testInterceptedStreamRequestStreamResponseWithError(self):
682        requests = tuple(
683            _EXCEPTION_REQUEST for _ in range(test_constants.STREAM_LENGTH))
684        request_iterator = iter(requests)
685
686        self._record[:] = []
687        channel = grpc.intercept_channel(
688            self._channel, _LoggingInterceptor('c1', self._record),
689            _LoggingInterceptor('c2', self._record))
690
691        multi_callable = _stream_stream_multi_callable(channel)
692        response_iterator = multi_callable(
693            request_iterator,
694            metadata=(('test', 'InterceptedStreamRequestStreamResponse'),))
695        with self.assertRaises(grpc.RpcError) as exception_context:
696            tuple(response_iterator)
697        exception = exception_context.exception
698        self.assertFalse(exception.cancelled())
699        self.assertFalse(exception.running())
700        self.assertTrue(exception.done())
701        with self.assertRaises(grpc.RpcError):
702            exception.result()
703        self.assertIsInstance(exception.exception(), grpc.RpcError)
704
705
706if __name__ == '__main__':
707    logging.basicConfig()
708    unittest.main(verbosity=2)
709