1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16import contextlib
17import distutils.spawn
18import errno
19import os
20import shutil
21import subprocess
22import sys
23import tempfile
24import threading
25import unittest
26
27import grpc
28import grpc.experimental
29from six import moves
30
31import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
32import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
33import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
34import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
35from tests.unit import test_common
36from tests.unit.framework.common import test_constants
37
38# Identifiers of entities we expect to find in the generated module.
39STUB_IDENTIFIER = 'TestServiceStub'
40SERVICER_IDENTIFIER = 'TestServiceServicer'
41ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server'
42
43
44class _ServicerMethods(object):
45
46    def __init__(self):
47        self._condition = threading.Condition()
48        self._paused = False
49        self._fail = False
50
51    @contextlib.contextmanager
52    def pause(self):  # pylint: disable=invalid-name
53        with self._condition:
54            self._paused = True
55        yield
56        with self._condition:
57            self._paused = False
58            self._condition.notify_all()
59
60    @contextlib.contextmanager
61    def fail(self):  # pylint: disable=invalid-name
62        with self._condition:
63            self._fail = True
64        yield
65        with self._condition:
66            self._fail = False
67
68    def _control(self):  # pylint: disable=invalid-name
69        with self._condition:
70            if self._fail:
71                raise ValueError()
72            while self._paused:
73                self._condition.wait()
74
75    def UnaryCall(self, request, unused_rpc_context):
76        response = response_pb2.SimpleResponse()
77        response.payload.payload_type = payload_pb2.COMPRESSABLE
78        response.payload.payload_compressable = 'a' * request.response_size
79        self._control()
80        return response
81
82    def StreamingOutputCall(self, request, unused_rpc_context):
83        for parameter in request.response_parameters:
84            response = response_pb2.StreamingOutputCallResponse()
85            response.payload.payload_type = payload_pb2.COMPRESSABLE
86            response.payload.payload_compressable = 'a' * parameter.size
87            self._control()
88            yield response
89
90    def StreamingInputCall(self, request_iter, unused_rpc_context):
91        response = response_pb2.StreamingInputCallResponse()
92        aggregated_payload_size = 0
93        for request in request_iter:
94            aggregated_payload_size += len(request.payload.payload_compressable)
95        response.aggregated_payload_size = aggregated_payload_size
96        self._control()
97        return response
98
99    def FullDuplexCall(self, request_iter, unused_rpc_context):
100        for request in request_iter:
101            for parameter in request.response_parameters:
102                response = response_pb2.StreamingOutputCallResponse()
103                response.payload.payload_type = payload_pb2.COMPRESSABLE
104                response.payload.payload_compressable = 'a' * parameter.size
105                self._control()
106                yield response
107
108    def HalfDuplexCall(self, request_iter, unused_rpc_context):
109        responses = []
110        for request in request_iter:
111            for parameter in request.response_parameters:
112                response = response_pb2.StreamingOutputCallResponse()
113                response.payload.payload_type = payload_pb2.COMPRESSABLE
114                response.payload.payload_compressable = 'a' * parameter.size
115                self._control()
116                responses.append(response)
117        for response in responses:
118            yield response
119
120
121class _Service(
122        collections.namedtuple('_Service', (
123            'servicer_methods',
124            'server',
125            'stub',
126        ))):
127    """A live and running service.
128
129  Attributes:
130    servicer_methods: The _ServicerMethods servicing RPCs.
131    server: The grpc.Server servicing RPCs.
132    stub: A stub on which to invoke RPCs.
133  """
134
135
136def _CreateService():
137    """Provides a servicer backend and a stub.
138
139  Returns:
140    A _Service with which to test RPCs.
141  """
142    servicer_methods = _ServicerMethods()
143
144    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
145
146        def UnaryCall(self, request, context):
147            return servicer_methods.UnaryCall(request, context)
148
149        def StreamingOutputCall(self, request, context):
150            return servicer_methods.StreamingOutputCall(request, context)
151
152        def StreamingInputCall(self, request_iterator, context):
153            return servicer_methods.StreamingInputCall(request_iterator,
154                                                       context)
155
156        def FullDuplexCall(self, request_iterator, context):
157            return servicer_methods.FullDuplexCall(request_iterator, context)
158
159        def HalfDuplexCall(self, request_iterator, context):
160            return servicer_methods.HalfDuplexCall(request_iterator, context)
161
162    server = test_common.test_server()
163    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
164                                                                 server)
165    port = server.add_insecure_port('[::]:0')
166    server.start()
167    channel = grpc.insecure_channel('localhost:{}'.format(port))
168    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
169    return _Service(servicer_methods, server, stub)
170
171
172def _CreateIncompleteService():
173    """Provides a servicer backend that fails to implement methods and its stub.
174
175  Returns:
176    A _Service with which to test RPCs. The returned _Service's
177      servicer_methods implements none of the methods required of it.
178  """
179
180    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
181        pass
182
183    server = test_common.test_server()
184    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
185                                                                 server)
186    port = server.add_insecure_port('[::]:0')
187    server.start()
188    channel = grpc.insecure_channel('localhost:{}'.format(port))
189    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
190    return _Service(None, server, stub)
191
192
193def _streaming_input_request_iterator():
194    for _ in range(3):
195        request = request_pb2.StreamingInputCallRequest()
196        request.payload.payload_type = payload_pb2.COMPRESSABLE
197        request.payload.payload_compressable = 'a'
198        yield request
199
200
201def _streaming_output_request():
202    request = request_pb2.StreamingOutputCallRequest()
203    sizes = [1, 2, 3]
204    request.response_parameters.add(size=sizes[0], interval_us=0)
205    request.response_parameters.add(size=sizes[1], interval_us=0)
206    request.response_parameters.add(size=sizes[2], interval_us=0)
207    return request
208
209
210def _full_duplex_request_iterator():
211    request = request_pb2.StreamingOutputCallRequest()
212    request.response_parameters.add(size=1, interval_us=0)
213    yield request
214    request = request_pb2.StreamingOutputCallRequest()
215    request.response_parameters.add(size=2, interval_us=0)
216    request.response_parameters.add(size=3, interval_us=0)
217    yield request
218
219
220class PythonPluginTest(unittest.TestCase):
221    """Test case for the gRPC Python protoc-plugin.
222
223  While reading these tests, remember that the futures API
224  (`stub.method.future()`) only gives futures for the *response-unary*
225  methods and does not exist for response-streaming methods.
226  """
227
228    def testImportAttributes(self):
229        # check that we can access the generated module and its members.
230        self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
231        self.assertIsNotNone(
232            getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None))
233        self.assertIsNotNone(
234            getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))
235
236    def testUpDown(self):
237        service = _CreateService()
238        self.assertIsNotNone(service.servicer_methods)
239        self.assertIsNotNone(service.server)
240        self.assertIsNotNone(service.stub)
241        service.server.stop(None)
242
243    def testIncompleteServicer(self):
244        service = _CreateIncompleteService()
245        request = request_pb2.SimpleRequest(response_size=13)
246        with self.assertRaises(grpc.RpcError) as exception_context:
247            service.stub.UnaryCall(request)
248        self.assertIs(exception_context.exception.code(),
249                      grpc.StatusCode.UNIMPLEMENTED)
250        service.server.stop(None)
251
252    def testUnaryCall(self):
253        service = _CreateService()
254        request = request_pb2.SimpleRequest(response_size=13)
255        response = service.stub.UnaryCall(request)
256        expected_response = service.servicer_methods.UnaryCall(
257            request, 'not a real context!')
258        self.assertEqual(expected_response, response)
259        service.server.stop(None)
260
261    def testUnaryCallFuture(self):
262        service = _CreateService()
263        request = request_pb2.SimpleRequest(response_size=13)
264        # Check that the call does not block waiting for the server to respond.
265        with service.servicer_methods.pause():
266            response_future = service.stub.UnaryCall.future(request)
267        response = response_future.result()
268        expected_response = service.servicer_methods.UnaryCall(
269            request, 'not a real RpcContext!')
270        self.assertEqual(expected_response, response)
271        service.server.stop(None)
272
273    def testUnaryCallFutureExpired(self):
274        service = _CreateService()
275        request = request_pb2.SimpleRequest(response_size=13)
276        with service.servicer_methods.pause():
277            response_future = service.stub.UnaryCall.future(
278                request, timeout=test_constants.SHORT_TIMEOUT)
279            with self.assertRaises(grpc.RpcError) as exception_context:
280                response_future.result()
281        self.assertIs(exception_context.exception.code(),
282                      grpc.StatusCode.DEADLINE_EXCEEDED)
283        self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
284        service.server.stop(None)
285
286    def testUnaryCallFutureCancelled(self):
287        service = _CreateService()
288        request = request_pb2.SimpleRequest(response_size=13)
289        with service.servicer_methods.pause():
290            response_future = service.stub.UnaryCall.future(request)
291            response_future.cancel()
292        self.assertTrue(response_future.cancelled())
293        self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
294        service.server.stop(None)
295
296    def testUnaryCallFutureFailed(self):
297        service = _CreateService()
298        request = request_pb2.SimpleRequest(response_size=13)
299        with service.servicer_methods.fail():
300            response_future = service.stub.UnaryCall.future(request)
301            self.assertIsNotNone(response_future.exception())
302        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
303        service.server.stop(None)
304
305    def testStreamingOutputCall(self):
306        service = _CreateService()
307        request = _streaming_output_request()
308        responses = service.stub.StreamingOutputCall(request)
309        expected_responses = service.servicer_methods.StreamingOutputCall(
310            request, 'not a real RpcContext!')
311        for expected_response, response in moves.zip_longest(
312                expected_responses, responses):
313            self.assertEqual(expected_response, response)
314        service.server.stop(None)
315
316    def testStreamingOutputCallExpired(self):
317        service = _CreateService()
318        request = _streaming_output_request()
319        with service.servicer_methods.pause():
320            responses = service.stub.StreamingOutputCall(
321                request, timeout=test_constants.SHORT_TIMEOUT)
322            with self.assertRaises(grpc.RpcError) as exception_context:
323                list(responses)
324        self.assertIs(exception_context.exception.code(),
325                      grpc.StatusCode.DEADLINE_EXCEEDED)
326        service.server.stop(None)
327
328    def testStreamingOutputCallCancelled(self):
329        service = _CreateService()
330        request = _streaming_output_request()
331        responses = service.stub.StreamingOutputCall(request)
332        next(responses)
333        responses.cancel()
334        with self.assertRaises(grpc.RpcError) as exception_context:
335            next(responses)
336        self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
337        service.server.stop(None)
338
339    def testStreamingOutputCallFailed(self):
340        service = _CreateService()
341        request = _streaming_output_request()
342        with service.servicer_methods.fail():
343            responses = service.stub.StreamingOutputCall(request)
344            self.assertIsNotNone(responses)
345            with self.assertRaises(grpc.RpcError) as exception_context:
346                next(responses)
347        self.assertIs(exception_context.exception.code(),
348                      grpc.StatusCode.UNKNOWN)
349        service.server.stop(None)
350
351    def testStreamingInputCall(self):
352        service = _CreateService()
353        response = service.stub.StreamingInputCall(
354            _streaming_input_request_iterator())
355        expected_response = service.servicer_methods.StreamingInputCall(
356            _streaming_input_request_iterator(), 'not a real RpcContext!')
357        self.assertEqual(expected_response, response)
358        service.server.stop(None)
359
360    def testStreamingInputCallFuture(self):
361        service = _CreateService()
362        with service.servicer_methods.pause():
363            response_future = service.stub.StreamingInputCall.future(
364                _streaming_input_request_iterator())
365        response = response_future.result()
366        expected_response = service.servicer_methods.StreamingInputCall(
367            _streaming_input_request_iterator(), 'not a real RpcContext!')
368        self.assertEqual(expected_response, response)
369        service.server.stop(None)
370
371    def testStreamingInputCallFutureExpired(self):
372        service = _CreateService()
373        with service.servicer_methods.pause():
374            response_future = service.stub.StreamingInputCall.future(
375                _streaming_input_request_iterator(),
376                timeout=test_constants.SHORT_TIMEOUT)
377            with self.assertRaises(grpc.RpcError) as exception_context:
378                response_future.result()
379        self.assertIsInstance(response_future.exception(), grpc.RpcError)
380        self.assertIs(response_future.exception().code(),
381                      grpc.StatusCode.DEADLINE_EXCEEDED)
382        self.assertIs(exception_context.exception.code(),
383                      grpc.StatusCode.DEADLINE_EXCEEDED)
384        service.server.stop(None)
385
386    def testStreamingInputCallFutureCancelled(self):
387        service = _CreateService()
388        with service.servicer_methods.pause():
389            response_future = service.stub.StreamingInputCall.future(
390                _streaming_input_request_iterator())
391            response_future.cancel()
392        self.assertTrue(response_future.cancelled())
393        with self.assertRaises(grpc.FutureCancelledError):
394            response_future.result()
395        service.server.stop(None)
396
397    def testStreamingInputCallFutureFailed(self):
398        service = _CreateService()
399        with service.servicer_methods.fail():
400            response_future = service.stub.StreamingInputCall.future(
401                _streaming_input_request_iterator())
402            self.assertIsNotNone(response_future.exception())
403            self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
404        service.server.stop(None)
405
406    def testFullDuplexCall(self):
407        service = _CreateService()
408        responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
409        expected_responses = service.servicer_methods.FullDuplexCall(
410            _full_duplex_request_iterator(), 'not a real RpcContext!')
411        for expected_response, response in moves.zip_longest(
412                expected_responses, responses):
413            self.assertEqual(expected_response, response)
414        service.server.stop(None)
415
416    def testFullDuplexCallExpired(self):
417        request_iterator = _full_duplex_request_iterator()
418        service = _CreateService()
419        with service.servicer_methods.pause():
420            responses = service.stub.FullDuplexCall(
421                request_iterator, timeout=test_constants.SHORT_TIMEOUT)
422            with self.assertRaises(grpc.RpcError) as exception_context:
423                list(responses)
424        self.assertIs(exception_context.exception.code(),
425                      grpc.StatusCode.DEADLINE_EXCEEDED)
426        service.server.stop(None)
427
428    def testFullDuplexCallCancelled(self):
429        service = _CreateService()
430        request_iterator = _full_duplex_request_iterator()
431        responses = service.stub.FullDuplexCall(request_iterator)
432        next(responses)
433        responses.cancel()
434        with self.assertRaises(grpc.RpcError) as exception_context:
435            next(responses)
436        self.assertIs(exception_context.exception.code(),
437                      grpc.StatusCode.CANCELLED)
438        service.server.stop(None)
439
440    def testFullDuplexCallFailed(self):
441        request_iterator = _full_duplex_request_iterator()
442        service = _CreateService()
443        with service.servicer_methods.fail():
444            responses = service.stub.FullDuplexCall(request_iterator)
445            with self.assertRaises(grpc.RpcError) as exception_context:
446                next(responses)
447        self.assertIs(exception_context.exception.code(),
448                      grpc.StatusCode.UNKNOWN)
449        service.server.stop(None)
450
451    def testHalfDuplexCall(self):
452        service = _CreateService()
453
454        def half_duplex_request_iterator():
455            request = request_pb2.StreamingOutputCallRequest()
456            request.response_parameters.add(size=1, interval_us=0)
457            yield request
458            request = request_pb2.StreamingOutputCallRequest()
459            request.response_parameters.add(size=2, interval_us=0)
460            request.response_parameters.add(size=3, interval_us=0)
461            yield request
462
463        responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
464        expected_responses = service.servicer_methods.HalfDuplexCall(
465            half_duplex_request_iterator(), 'not a real RpcContext!')
466        for expected_response, response in moves.zip_longest(
467                expected_responses, responses):
468            self.assertEqual(expected_response, response)
469        service.server.stop(None)
470
471    def testHalfDuplexCallWedged(self):
472        condition = threading.Condition()
473        wait_cell = [False]
474
475        @contextlib.contextmanager
476        def wait():  # pylint: disable=invalid-name
477            # Where's Python 3's 'nonlocal' statement when you need it?
478            with condition:
479                wait_cell[0] = True
480            yield
481            with condition:
482                wait_cell[0] = False
483                condition.notify_all()
484
485        def half_duplex_request_iterator():
486            request = request_pb2.StreamingOutputCallRequest()
487            request.response_parameters.add(size=1, interval_us=0)
488            yield request
489            with condition:
490                while wait_cell[0]:
491                    condition.wait()
492
493        service = _CreateService()
494        with wait():
495            responses = service.stub.HalfDuplexCall(
496                half_duplex_request_iterator(),
497                timeout=test_constants.SHORT_TIMEOUT)
498            # half-duplex waits for the client to send all info
499            with self.assertRaises(grpc.RpcError) as exception_context:
500                next(responses)
501        self.assertIs(exception_context.exception.code(),
502                      grpc.StatusCode.DEADLINE_EXCEEDED)
503        service.server.stop(None)
504
505
506@unittest.skipIf(sys.version_info[0] < 3 or sys.version_info[1] < 6,
507                 "Unsupported on Python 2.")
508class SimpleStubsPluginTest(unittest.TestCase):
509    servicer_methods = _ServicerMethods()
510
511    class Servicer(service_pb2_grpc.TestServiceServicer):
512
513        def UnaryCall(self, request, context):
514            return SimpleStubsPluginTest.servicer_methods.UnaryCall(
515                request, context)
516
517        def StreamingOutputCall(self, request, context):
518            return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
519                request, context)
520
521        def StreamingInputCall(self, request_iterator, context):
522            return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
523                request_iterator, context)
524
525        def FullDuplexCall(self, request_iterator, context):
526            return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
527                request_iterator, context)
528
529        def HalfDuplexCall(self, request_iterator, context):
530            return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
531                request_iterator, context)
532
533    def setUp(self):
534        super(SimpleStubsPluginTest, self).setUp()
535        self._server = test_common.test_server()
536        service_pb2_grpc.add_TestServiceServicer_to_server(
537            self.Servicer(), self._server)
538        self._port = self._server.add_insecure_port('[::]:0')
539        self._server.start()
540        self._target = 'localhost:{}'.format(self._port)
541
542    def tearDown(self):
543        self._server.stop(None)
544        super(SimpleStubsPluginTest, self).tearDown()
545
546    def testUnaryCall(self):
547        request = request_pb2.SimpleRequest(response_size=13)
548        response = service_pb2_grpc.TestService.UnaryCall(
549            request,
550            self._target,
551            channel_credentials=grpc.experimental.insecure_channel_credentials(
552            ),
553            wait_for_ready=True)
554        expected_response = self.servicer_methods.UnaryCall(
555            request, 'not a real context!')
556        self.assertEqual(expected_response, response)
557
558    def testUnaryCallInsecureSugar(self):
559        request = request_pb2.SimpleRequest(response_size=13)
560        response = service_pb2_grpc.TestService.UnaryCall(request,
561                                                          self._target,
562                                                          insecure=True,
563                                                          wait_for_ready=True)
564        expected_response = self.servicer_methods.UnaryCall(
565            request, 'not a real context!')
566        self.assertEqual(expected_response, response)
567
568    def testStreamingOutputCall(self):
569        request = _streaming_output_request()
570        expected_responses = self.servicer_methods.StreamingOutputCall(
571            request, 'not a real RpcContext!')
572        responses = service_pb2_grpc.TestService.StreamingOutputCall(
573            request,
574            self._target,
575            channel_credentials=grpc.experimental.insecure_channel_credentials(
576            ),
577            wait_for_ready=True)
578        for expected_response, response in moves.zip_longest(
579                expected_responses, responses):
580            self.assertEqual(expected_response, response)
581
582    def testStreamingInputCall(self):
583        response = service_pb2_grpc.TestService.StreamingInputCall(
584            _streaming_input_request_iterator(),
585            self._target,
586            channel_credentials=grpc.experimental.insecure_channel_credentials(
587            ),
588            wait_for_ready=True)
589        expected_response = self.servicer_methods.StreamingInputCall(
590            _streaming_input_request_iterator(), 'not a real RpcContext!')
591        self.assertEqual(expected_response, response)
592
593    def testFullDuplexCall(self):
594        responses = service_pb2_grpc.TestService.FullDuplexCall(
595            _full_duplex_request_iterator(),
596            self._target,
597            channel_credentials=grpc.experimental.insecure_channel_credentials(
598            ),
599            wait_for_ready=True)
600        expected_responses = self.servicer_methods.FullDuplexCall(
601            _full_duplex_request_iterator(), 'not a real RpcContext!')
602        for expected_response, response in moves.zip_longest(
603                expected_responses, responses):
604            self.assertEqual(expected_response, response)
605
606    def testHalfDuplexCall(self):
607
608        def half_duplex_request_iterator():
609            request = request_pb2.StreamingOutputCallRequest()
610            request.response_parameters.add(size=1, interval_us=0)
611            yield request
612            request = request_pb2.StreamingOutputCallRequest()
613            request.response_parameters.add(size=2, interval_us=0)
614            request.response_parameters.add(size=3, interval_us=0)
615            yield request
616
617        responses = service_pb2_grpc.TestService.HalfDuplexCall(
618            half_duplex_request_iterator(),
619            self._target,
620            channel_credentials=grpc.experimental.insecure_channel_credentials(
621            ),
622            wait_for_ready=True)
623        expected_responses = self.servicer_methods.HalfDuplexCall(
624            half_duplex_request_iterator(), 'not a real RpcContext!')
625        for expected_response, response in moves.zip_longest(
626                expected_responses, responses):
627            self.assertEqual(expected_response, response)
628
629
630class ModuleMainTest(unittest.TestCase):
631    """Test case for running `python -m grpc_tools.protoc`.
632    """
633
634    def test_clean_output(self):
635        if sys.executable is None:
636            raise unittest.SkipTest(
637                "Running on a interpreter that cannot be invoked from the CLI.")
638        proto_dir_path = os.path.join("src", "proto")
639        test_proto_path = os.path.join(proto_dir_path, "grpc", "testing",
640                                       "empty.proto")
641        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
642        work_dir = tempfile.mkdtemp()
643        try:
644            invocation = (sys.executable, "-m", "grpc_tools.protoc",
645                          "--proto_path", proto_dir_path, "--python_out",
646                          work_dir, "--grpc_python_out", work_dir,
647                          test_proto_path)
648            proc = subprocess.Popen(invocation,
649                                    stdout=streams[0],
650                                    stderr=streams[1])
651            proc.wait()
652            outs = []
653            for stream in streams:
654                stream.seek(0)
655                self.assertEqual(0, len(stream.read()))
656            self.assertEqual(0, proc.returncode)
657        except Exception:  # pylint: disable=broad-except
658            shutil.rmtree(work_dir)
659
660
661if __name__ == '__main__':
662    unittest.main(verbosity=2)
663