# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the modules generated by the gRPC Python protoc plugin."""

import collections
import contextlib
import distutils.spawn
import errno
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import unittest

import grpc
import grpc.experimental
from six import moves

import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants

# Identifiers of entities we expect to find in the generated module.
STUB_IDENTIFIER = 'TestServiceStub'
SERVICER_IDENTIFIER = 'TestServiceServicer'
ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server'


class _ServicerMethods(object):
    """Reference implementations of the test service's five RPC methods.

    Every method calls `_control()` before producing a response, so a test
    can make in-flight RPCs block (`pause()`) or raise (`fail()`).
    """

    def __init__(self):
        # Guards _paused and _fail; also used to wake handlers blocked in
        # _control() once a pause ends.
        self._condition = threading.Condition()
        self._paused = False
        self._fail = False

    @contextlib.contextmanager
    def pause(self):  # pylint: disable=invalid-name
        """Make `_control()` block for the duration of the `with` body."""
        with self._condition:
            self._paused = True
        try:
            yield
        finally:
            # Always un-pause, even if the body raised; otherwise threads
            # waiting in _control() would never be released.
            with self._condition:
                self._paused = False
                self._condition.notify_all()

    @contextlib.contextmanager
    def fail(self):  # pylint: disable=invalid-name
        """Make `_control()` raise ValueError for the duration of the body."""
        with self._condition:
            self._fail = True
        try:
            yield
        finally:
            # Always clear the flag so a failing assertion in the body does
            # not leave the servicer permanently failing.
            with self._condition:
                self._fail = False

    def _control(self):  # pylint: disable=invalid-name
        """Raise ValueError if failing; otherwise block while paused."""
        with self._condition:
            if self._fail:
                raise ValueError()
            while self._paused:
                self._condition.wait()

    def UnaryCall(self, request, unused_rpc_context):
        """Return one response whose payload is `request.response_size` 'a's."""
        response = response_pb2.SimpleResponse()
        response.payload.payload_type = payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = 'a' * request.response_size
        self._control()
        return response

    def StreamingOutputCall(self, request, unused_rpc_context):
        """Yield one response per entry in `request.response_parameters`."""
        for parameter in request.response_parameters:
            response = response_pb2.StreamingOutputCallResponse()
            response.payload.payload_type = payload_pb2.COMPRESSABLE
            response.payload.payload_compressable = 'a' * parameter.size
            self._control()
            yield response

    def StreamingInputCall(self, request_iter, unused_rpc_context):
        """Return the summed payload length of all requests in the stream."""
        response = response_pb2.StreamingInputCallResponse()
        aggregated_payload_size = 0
        for request in request_iter:
            aggregated_payload_size += len(request.payload.payload_compressable)
        response.aggregated_payload_size = aggregated_payload_size
        self._control()
        return response

    def FullDuplexCall(self, request_iter, unused_rpc_context):
        """Yield responses for each request as soon as it is read."""
        for request in request_iter:
            for parameter in request.response_parameters:
                response = response_pb2.StreamingOutputCallResponse()
                response.payload.payload_type = payload_pb2.COMPRESSABLE
                response.payload.payload_compressable = 'a' * parameter.size
                self._control()
                yield response

    def HalfDuplexCall(self, request_iter, unused_rpc_context):
        """Consume every request first, then yield all responses."""
        responses = []
        for request in request_iter:
            for parameter in request.response_parameters:
                response = response_pb2.StreamingOutputCallResponse()
                response.payload.payload_type = payload_pb2.COMPRESSABLE
                response.payload.payload_compressable = 'a' * parameter.size
                self._control()
                responses.append(response)
        for response in responses:
            yield response


class _Service(
        collections.namedtuple('_Service', (
            'servicer_methods',
            'server',
            'stub',
        ))):
    """A live and running service.

    Attributes:
      servicer_methods: The _ServicerMethods servicing RPCs.
      server: The grpc.Server servicing RPCs.
      stub: A stub on which to invoke RPCs.
    """


def _CreateService():
    """Provides a servicer backend and a stub.

    Returns:
      A _Service with which to test RPCs.
    """
    servicer_methods = _ServicerMethods()

    # Thin adapter: the generated servicer base class is looked up by name and
    # every RPC is forwarded to the shared _ServicerMethods instance.
    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):

        def UnaryCall(self, request, context):
            return servicer_methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return servicer_methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iterator, context):
            return servicer_methods.StreamingInputCall(request_iterator,
                                                       context)

        def FullDuplexCall(self, request_iterator, context):
            return servicer_methods.FullDuplexCall(request_iterator, context)

        def HalfDuplexCall(self, request_iterator, context):
            return servicer_methods.HalfDuplexCall(request_iterator, context)

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
                                                                 server)
    # Port 0 is requested; the port actually bound is returned.
    port = server.add_insecure_port('[::]:0')
    server.start()
    channel = grpc.insecure_channel('localhost:{}'.format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(servicer_methods, server, stub)


def _CreateIncompleteService():
    """Provides a servicer backend that fails to implement methods and its stub.

    Returns:
      A _Service with which to test RPCs. The returned _Service's
        servicer_methods implements none of the methods required of it.
    """

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        pass

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
                                                                 server)
    port = server.add_insecure_port('[::]:0')
    server.start()
    channel = grpc.insecure_channel('localhost:{}'.format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(None, server, stub)


def _streaming_input_request_iterator():
    """Yield three StreamingInputCallRequests, each with a payload of 'a'."""
    for _ in range(3):
        request = request_pb2.StreamingInputCallRequest()
        request.payload.payload_type = payload_pb2.COMPRESSABLE
        request.payload.payload_compressable = 'a'
        yield request


def _streaming_output_request():
    """Return one StreamingOutputCallRequest asking for sizes 1, 2 and 3."""
    request = request_pb2.StreamingOutputCallRequest()
    sizes = [1, 2, 3]
    request.response_parameters.add(size=sizes[0], interval_us=0)
    request.response_parameters.add(size=sizes[1], interval_us=0)
    request.response_parameters.add(size=sizes[2], interval_us=0)
    return request


def _full_duplex_request_iterator():
    """Yield two requests: one asking for size 1, one for sizes 2 and 3."""
    request = request_pb2.StreamingOutputCallRequest()
    request.response_parameters.add(size=1, interval_us=0)
    yield request
    request = request_pb2.StreamingOutputCallRequest()
    request.response_parameters.add(size=2, interval_us=0)
    request.response_parameters.add(size=3, interval_us=0)
    yield request


class PythonPluginTest(unittest.TestCase):
    """Test case for the gRPC Python protoc-plugin.

    While reading these tests, remember that the futures API
    (`stub.method.future()`) only gives futures for the *response-unary*
    methods and does not exist for response-streaming methods.
    """

    def testImportAttributes(self):
        # check that we can access the generated module and its members.
        self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))

    def testUpDown(self):
        service = _CreateService()
        self.assertIsNotNone(service.servicer_methods)
        self.assertIsNotNone(service.server)
        self.assertIsNotNone(service.stub)
        service.server.stop(None)

    def testIncompleteServicer(self):
        service = _CreateIncompleteService()
        request = request_pb2.SimpleRequest(response_size=13)
        with self.assertRaises(grpc.RpcError) as exception_context:
            service.stub.UnaryCall(request)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNIMPLEMENTED)
        service.server.stop(None)

    def testUnaryCall(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        response = service.stub.UnaryCall(request)
        # The expected value comes from calling the servicer logic directly.
        expected_response = service.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testUnaryCallFuture(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        # Check that the call does not block waiting for the server to respond.
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
        response = response_future.result()
        expected_response = service.servicer_methods.UnaryCall(
            request, 'not a real RpcContext!')
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testUnaryCallFutureExpired(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(
                request, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)

    def testUnaryCallFutureCancelled(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
        service.server.stop(None)

    def testUnaryCallFutureFailed(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.fail():
            response_future = service.stub.UnaryCall.future(request)
            self.assertIsNotNone(response_future.exception())
        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testStreamingOutputCall(self):
        service = _CreateService()
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        expected_responses = service.servicer_methods.StreamingOutputCall(
            request, 'not a real RpcContext!')
        # zip_longest pads with None, so a length mismatch fails assertEqual.
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingOutputCallExpired(self):
        service = _CreateService()
        request = _streaming_output_request()
        with service.servicer_methods.pause():
            responses = service.stub.StreamingOutputCall(
                request, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)

    def testStreamingOutputCallCancelled(self):
        service = _CreateService()
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError):
            next(responses)
        self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
        service.server.stop(None)

    def testStreamingOutputCallFailed(self):
        service = _CreateService()
        request = _streaming_output_request()
        with service.servicer_methods.fail():
            responses = service.stub.StreamingOutputCall(request)
            self.assertIsNotNone(responses)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testStreamingInputCall(self):
        service = _CreateService()
        response = service.stub.StreamingInputCall(
            _streaming_input_request_iterator())
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingInputCallFuture(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
        response = response_future.result()
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingInputCallFutureExpired(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIs(response_future.exception().code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)

    def testStreamingInputCallFutureCancelled(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        service.server.stop(None)

    def testStreamingInputCallFutureFailed(self):
        service = _CreateService()
        with service.servicer_methods.fail():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
            self.assertIsNotNone(response_future.exception())
        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testFullDuplexCall(self):
        service = _CreateService()
        responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
        expected_responses = service.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testFullDuplexCallExpired(self):
        request_iterator = _full_duplex_request_iterator()
        service = _CreateService()
        with service.servicer_methods.pause():
            responses = service.stub.FullDuplexCall(
                request_iterator, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)

    def testFullDuplexCallCancelled(self):
        service = _CreateService()
        request_iterator = _full_duplex_request_iterator()
        responses = service.stub.FullDuplexCall(request_iterator)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError) as exception_context:
            next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.CANCELLED)
        service.server.stop(None)

    def testFullDuplexCallFailed(self):
        request_iterator = _full_duplex_request_iterator()
        service = _CreateService()
        with service.servicer_methods.fail():
            responses = service.stub.FullDuplexCall(request_iterator)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testHalfDuplexCall(self):
        service = _CreateService()

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
        expected_responses = service.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testHalfDuplexCallWedged(self):
        condition = threading.Condition()
        wait_cell = [False]

        @contextlib.contextmanager
        def wait():  # pylint: disable=invalid-name
            # Where's Python 3's 'nonlocal' statement when you need it?
            with condition:
                wait_cell[0] = True
            try:
                yield
            finally:
                # Always release the request iterator, even when an assertion
                # in the body fails, so it cannot stay blocked forever.
                with condition:
                    wait_cell[0] = False
                    condition.notify_all()

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            # Stall after the first request so the request stream never
            # finishes while wait() is active.
            with condition:
                while wait_cell[0]:
                    condition.wait()

        service = _CreateService()
        with wait():
            responses = service.stub.HalfDuplexCall(
                half_duplex_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT)
            # half-duplex waits for the client to send all info
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)


# Compare the version as a tuple: testing major and minor independently would
# also skip on any future major version whose minor number is below 6.
@unittest.skipIf(sys.version_info < (3, 6),
                 "Unsupported on Python versions older than 3.6.")
class SimpleStubsPluginTest(unittest.TestCase):
    """Exercises the generated `TestService` class's per-call functions.

    Each test invokes an RPC through `service_pb2_grpc.TestService` against a
    server started in `setUp`, and compares the result with the output of
    calling `_ServicerMethods` directly.
    """

    # Shared by all tests in the class; the nested Servicer forwards to it.
    servicer_methods = _ServicerMethods()

    class Servicer(service_pb2_grpc.TestServiceServicer):

        def UnaryCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.UnaryCall(
                request, context)

        def StreamingOutputCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
                request, context)

        def StreamingInputCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
                request_iterator, context)

        def FullDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
                request_iterator, context)

        def HalfDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
                request_iterator, context)

    def setUp(self):
        super(SimpleStubsPluginTest, self).setUp()
        self._server = test_common.test_server()
        service_pb2_grpc.add_TestServiceServicer_to_server(
            self.Servicer(), self._server)
        self._port = self._server.add_insecure_port('[::]:0')
        self._server.start()
        self._target = 'localhost:{}'.format(self._port)

    def tearDown(self):
        self._server.stop(None)
        super(SimpleStubsPluginTest, self).tearDown()

    def testUnaryCall(self):
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_response = self.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)

    def testUnaryCallInsecureSugar(self):
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(request,
                                                          self._target,
                                                          insecure=True,
                                                          wait_for_ready=True)
        expected_response = self.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)

    def testStreamingOutputCall(self):
        request = _streaming_output_request()
        expected_responses = self.servicer_methods.StreamingOutputCall(
            request, 'not a real RpcContext!')
        responses = service_pb2_grpc.TestService.StreamingOutputCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testStreamingInputCall(self):
        response = service_pb2_grpc.TestService.StreamingInputCall(
            _streaming_input_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_response = self.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)

    def testFullDuplexCall(self):
        responses = service_pb2_grpc.TestService.FullDuplexCall(
            _full_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_responses = self.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testHalfDuplexCall(self):

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service_pb2_grpc.TestService.HalfDuplexCall(
            half_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_responses = self.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)


class ModuleMainTest(unittest.TestCase):
    """Test case for running `python -m grpc_tools.protoc`.
    """

    def test_clean_output(self):
        """Run protoc on empty.proto; expect exit code 0 and no output.

        The proto paths are relative, so the result depends on the working
        directory the test is run from.
        """
        if sys.executable is None:
            raise unittest.SkipTest(
                "Running on a interpreter that cannot be invoked from the CLI.")
        proto_dir_path = os.path.join("src", "proto")
        test_proto_path = os.path.join(proto_dir_path, "grpc", "testing",
                                       "empty.proto")
        work_dir = tempfile.mkdtemp()
        # try/finally rather than `except Exception`: the directory is removed
        # on success too, and assertion failures propagate to the runner.
        try:
            with tempfile.TemporaryFile() as stdout_stream, \
                    tempfile.TemporaryFile() as stderr_stream:
                invocation = (sys.executable, "-m", "grpc_tools.protoc",
                              "--proto_path", proto_dir_path, "--python_out",
                              work_dir, "--grpc_python_out", work_dir,
                              test_proto_path)
                proc = subprocess.Popen(invocation,
                                        stdout=stdout_stream,
                                        stderr=stderr_stream)
                proc.wait()
                for stream in (stdout_stream, stderr_stream):
                    stream.seek(0)
                    self.assertEqual(0, len(stream.read()))
                self.assertEqual(0, proc.returncode)
        finally:
            shutil.rmtree(work_dir)


if __name__ == '__main__':
    unittest.main(verbosity=2)