import subprocess
from subprocess import PIPE, STDOUT
from unittest import TestCase

from testfixtures.mock import call
from testfixtures import ShouldRaise, compare, Replacer

from testfixtures.popen import MockPopen, PopenBehaviour
from testfixtures.compat import BytesLiteral, PY2

import signal


class Tests(TestCase):
    """Tests that call a MockPopen directly and check results and recorded calls."""

    def test_command_min_args(self):
        """A command set with no extras gives pid 1234, empty output, returncode 0."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, 1234)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'')
        compare(err, b'')
        compare(process.returncode, 0)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_command_max_args(self):
        """Positional stdout, stderr, returncode and pid passed to set_command are used."""

        Popen = MockPopen()
        Popen.set_command('a command', b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'out')
        compare(err, b'err')
        compare(process.returncode, 1)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_callable_default_behaviour(self):
        """A callable given to set_default receives the command and stdin."""
        def some_callable(command, stdin):
            # Echo the inputs back as stdout/stderr so the test can see
            # what the callable was called with.
            return PopenBehaviour(BytesLiteral(command), BytesLiteral(stdin), 1, 345, 0)

        Popen = MockPopen()
        Popen.set_default(behaviour=some_callable)

        process = Popen('a command', stdin='some stdin', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)

        out, err = process.communicate()

        compare(out, b'a command')
        compare(err, b'some stdin')
        compare(process.returncode, 1)

    def test_command_is_sequence(self):
        """A command set as a string is matched when Popen is given a list."""
        Popen = MockPopen()
        Popen.set_command('a command')

        process = Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)

        compare(process.wait(), 0)
        compare([
            call.Popen(['a', 'command'], stderr=-1, stdout=-1),
            call.Popen_instance.wait(),
        ], Popen.mock.method_calls)

    def test_communicate_with_input(self):
        """Input passed to communicate() is recorded in the call list."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        out, err = process.communicate('foo')
        # test call list
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.communicate('foo'),
        ], Popen.mock.method_calls)

    def test_communicate_with_timeout(self):
        """communicate() takes a timeout except on PY2, where TypeError is expected."""
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            with ShouldRaise(TypeError):
                process.communicate(timeout=1)
            with ShouldRaise(TypeError):
                process.communicate('foo', 1)
        else:
            process.communicate(timeout=1)
            process.communicate('foo', 1)
            # The literal list is the expectation; the recorded calls are
            # the actual value.
            compare([
                call.Popen('a command'),
                call.Popen_instance.communicate(timeout=1),
                call.Popen_instance.communicate('foo', 1),
            ], actual=Popen.mock.method_calls)

    def test_read_from_stdout(self):
        """Configured stdout bytes can be read from process.stdout."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        compare(process.stdout.read(), b'foo')
        # test call list
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
        ], Popen.mock.method_calls)

    def test_read_from_stderr(self):
        """Configured stderr bytes can be read from process.stderr."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stderr=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        # Check the stream this test is about: stderr, not stdout.
        self.assertTrue(isinstance(process.stderr.fileno(), int))
        compare(process.stderr.read(), b'foo')
        # test call list
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
        ], Popen.mock.method_calls)

    def test_read_from_stdout_with_stderr_redirected_check_stdout_contents(self):
        """With stderr=STDOUT, stdout holds both outputs and process.stderr is None."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        # test stdout contents
        compare(b'foobar', process.stdout.read())
        compare(process.stderr, None)

    def test_read_from_stdout_with_stderr_redirected_check_stdout_stderr_interleaved(self):
        """With stderr=STDOUT, multi-line stdout and stderr are interleaved by line."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'o1\no2\no3\no4\n', stderr=b'e1\ne2\n')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        # test stdout contents
        compare(b'o1\ne1\no2\ne2\no3\no4\n', process.stdout.read())

    def test_communicate_with_stderr_redirected_check_stderr_is_none(self):
        """With stderr=STDOUT, communicate() returns the merged output and None."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        out, err = process.communicate()
        # test stderr is None
        compare(out, b'foobar')
        compare(err, None)

    def test_read_from_stdout_and_stderr(self):
        """stdout and stderr can each be read when both are PIPE."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.stdout.read(), b'foo')
        compare(process.stderr.read(), b'bar')
        # test call list
        compare([
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
        ], Popen.mock.method_calls)

    def test_communicate_text_mode(self):
        """With text=True, communicate() returns text rather than bytes."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_universal_newlines(self):
        """With universal_newlines=True, communicate() returns text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, universal_newlines=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding(self):
        """With encoding='ascii', communicate() returns text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii')
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding_with_errors(self):
        """errors='ignore' drops undecodable bytes; on PY2 the raw bytes are expected."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'\xa3', stderr=b'\xa3')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii', errors='ignore')
        actual = process.communicate()
        # check
        if PY2:
            compare(actual, expected=(b'\xa3', b'\xa3'))
        else:
            compare(actual, expected=(u'', u''))

    def test_read_from_stdout_and_stderr_text_mode(self):
        """With text=True, reading the streams directly returns text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.stdout.read(), process.stderr.read()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_write_to_stdin(self):
        """Writes to process.stdin are recorded on the Popen and process call lists."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdin=PIPE, shell=True)
        process.stdin.write('some text')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen_instance.stdin.write('some text'),
        ])
        compare(Popen.all_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen('a command', shell=True, stdin=PIPE).stdin.write('some text'),
        ])
        compare(process.mock.method_calls, expected=[
            call.stdin.write('some text'),
        ])
        compare(process.calls, expected=[
            call.stdin.write('some text'),
        ])
        # No assertion here: this only checks that repr() of the call
        # object does not raise.
        repr(call.stdin.write('some text'))

    def test_wait_and_return_code(self):
        """returncode is None until wait(), which returns the configured code."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        # usage
        process = Popen('a command')
        compare(process.returncode, None)
        # result checking
        compare(process.wait(), 3)
        compare(process.returncode, 3)
        # test call list
        compare([
            call.Popen('a command'),
            call.Popen_instance.wait(),
        ], Popen.mock.method_calls)

    def test_wait_timeout(self):
        """wait() takes a timeout except on PY2, where TypeError is expected."""
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            with ShouldRaise(TypeError):
                process.wait(timeout=1)
            with ShouldRaise(TypeError):
                process.wait(1)
        else:
            process.wait(timeout=1)
            process.wait(1)
            # The literal list is the expectation; the recorded calls are
            # the actual value.
            compare([
                call.Popen('a command'),
                call.Popen_instance.wait(timeout=1),
                call.Popen_instance.wait(1)
            ], actual=Popen.mock.method_calls)

    def test_multiple_uses(self):
        """Two configured commands run one after the other return their own output."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a')
        Popen.set_command('b command', b'b')
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        out, err = process.communicate('foo')
        compare(out, b'a')
        process = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        out, err = process.communicate('foo')
        compare(out, b'b')
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.communicate('foo'),
            call.Popen(['b', 'command'], shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.communicate('foo'),
        ], Popen.mock.method_calls)

    def test_send_signal(self):
        """send_signal() is recorded with the signal it was given."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.send_signal(0)
        # result checking
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.send_signal(0),
        ], Popen.mock.method_calls)

    def test_terminate(self):
        """terminate() is recorded in the call list."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.terminate()
        # result checking
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.terminate(),
        ], Popen.mock.method_calls)

    def test_kill(self):
        """kill() is recorded in the call list."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.kill()
        # result checking
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.kill(),
        ], Popen.mock.method_calls)

    def test_all_signals(self):
        """send_signal(), terminate() and kill() are recorded in call order."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        process.send_signal(signal.SIGINT)
        process.terminate()
        process.kill()
        # test call list
        compare([
            call.Popen('a command'),
            call.Popen_instance.send_signal(signal.SIGINT),
            call.Popen_instance.terminate(),
            call.Popen_instance.kill(),
        ], Popen.mock.method_calls)

    def test_poll_no_setup(self):
        """With no poll_count set, poll() returns None until wait() has been called."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), None)
        compare(process.poll(), None)
        compare(process.wait(), 0)
        compare(process.poll(), 0)
        # result checking
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.wait(),
            call.Popen_instance.poll(),
        ], Popen.mock.method_calls)

    def test_poll_setup(self):
        """With poll_count=1, the first poll() returns None and the second returns 0."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', poll_count=1)
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), None)
        compare(process.poll(), 0)
        compare(process.wait(), 0)
        compare(process.poll(), 0)
        # result checking
        compare([
            call.Popen('a command', shell=True, stderr=-1, stdout=-1),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.wait(),
            call.Popen_instance.poll(),
        ], Popen.mock.method_calls)

    def test_poll_until_result(self):
        """Looping on poll() with poll_count=2 ends after three polls with the code set."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3, poll_count=2)
        # example usage
        process = Popen('a command')
        while process.poll() is None:
            # you'd probably have a sleep here, or go off and
            # do some other work.
            pass
        # result checking
        compare(process.returncode, 3)
        compare([
            call.Popen('a command'),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
        ], Popen.mock.method_calls)

    def test_command_not_specified(self):
        """Running a command that was never configured raises KeyError."""
        Popen = MockPopen()
        with ShouldRaise(KeyError(
            "Nothing specified for command 'a command'"
        )):
            Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)

    def test_default_command_min_args(self):
        """set_default() with no arguments gives pid 1234, empty output, returncode 0."""
        # setup
        Popen = MockPopen()
        Popen.set_default()
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, 1234)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'')
        compare(err, b'')
        compare(process.returncode, 0)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_default_command_max_args(self):
        """Positional stdout, stderr, returncode and pid passed to set_default are used."""
        Popen = MockPopen()
        Popen.set_default(b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'out')
        compare(err, b'err')
        compare(process.returncode, 1)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_invalid_parameters(self):
        """An unknown keyword argument to Popen raises TypeError."""
        Popen = MockPopen()
        with ShouldRaise(TypeError(
            "__init__() got an unexpected keyword argument 'foo'"
        )):
            Popen(foo='bar')

    def test_invalid_method_or_attr(self):
        """Calling an unknown method on the process raises AttributeError."""
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            process.foo()

    def test_invalid_attribute(self):
        """Reading an unknown attribute on the process raises AttributeError."""
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            process.foo

    def test_invalid_communicate_call(self):
        """An unknown keyword argument to communicate() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
            "communicate() got an unexpected keyword argument 'foo'"
        )):
            process.communicate(foo='bar')

    def test_invalid_wait_call(self):
        """An unknown keyword argument to wait() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
            "wait() got an unexpected keyword argument 'foo'"
        )):
            process.wait(foo='bar')

    def test_invalid_send_signal(self):
        """An unknown keyword argument to send_signal() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
            "send_signal() got an unexpected keyword argument 'foo'"
        )):
            process.send_signal(foo='bar')

    def test_invalid_terminate(self):
        """An unknown keyword argument to terminate() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
            "terminate() got an unexpected keyword argument 'foo'"
        )):
            process.terminate(foo='bar')

    def test_invalid_kill(self):
        """A positional argument to kill() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # The expected message wording differs between PY2 and later versions.
        if PY2:
            text = 'kill() takes exactly 1 argument (2 given)'
        else:
            text = 'kill() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.kill('moo')

    def test_invalid_poll(self):
        """A positional argument to poll() raises TypeError."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # The expected message wording differs between PY2 and later versions.
        if PY2:
            text = 'poll() takes exactly 1 argument (2 given)'
        else:
            text = 'poll() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.poll('moo')

    def test_non_pipe(self):
        """Without PIPE, the streams and the communicate() results are None."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        # checks
        compare(process.stdout, expected=None)
        compare(process.stderr, expected=None)
        out, err = process.communicate()
        # test the rest
        compare(out, expected=None)
        compare(err, expected=None)
        # test call list
        compare([
            call.Popen('a command'),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_use_as_context_manager(self):
        """The process works in a with statement; on PY2 it lacks __enter__/__exit__."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        if PY2:

            process = Popen('a command')
            with ShouldRaise(AttributeError):
                process.__enter__
            with ShouldRaise(AttributeError):
                process.__exit__

        else:

            # usage
            with Popen('a command', stdout=PIPE, stderr=PIPE) as process:
                # process started, no return code
                compare(process.pid, 1234)
                compare(None, process.returncode)

                out, err = process.communicate()

                # test the rest
                compare(out, b'')
                compare(err, b'')
                compare(process.returncode, 0)

            # Checked after the with block has exited.
            compare(process.stdout.closed, expected=True)
            compare(process.stderr.closed, expected=True)

            # test call list
            compare([
                call.Popen('a command', stderr=-1, stdout=-1),
                call.Popen_instance.communicate(),
                call.Popen_instance.wait(),
            ], Popen.mock.method_calls)

    def test_start_new_session(self):
        """start_new_session=True is accepted and recorded."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        Popen('a command', start_new_session=True)
        # test call list
        compare([
            call.Popen('a command', start_new_session=True),
        ], Popen.mock.method_calls)

    def test_simultaneous_processes(self):
        """Two live processes keep separate return codes and per-process call lists."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen.set_command('b command', b'b', returncode=2)
        process_a = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process_b = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(process_a.wait(), expected=1)
        compare(process_b.wait(), expected=2)
        a_call = call.Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        b_call = call.Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(Popen.all_calls, expected=[
            a_call,
            b_call,
            a_call.wait(),
            b_call.wait(),
        ])
        compare(process_a.mock.method_calls, expected=[
            call.wait()
        ])
        compare(process_b.mock.method_calls, expected=[
            call.wait()
        ])

    def test_pass_executable(self):
        """The executable keyword argument is accepted and recorded."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen('a command', executable='/foo/bar')
        compare(Popen.all_calls, expected=[
            call.Popen('a command', executable='/foo/bar')
        ])

    def test_set_command_with_list(self):
        """A command set as a list is matched when Popen is given the same list."""
        Popen = MockPopen()
        Popen.set_command(['a', 'command'])
        Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)
        compare([call.Popen(['a', 'command'], stderr=-1, stdout=-1)],
                actual=Popen.all_calls)


class IntegrationTests(TestCase):
    """Tests that call subprocess helper functions with subprocess.Popen replaced."""

    def setUp(self):
        """Replace subprocess.Popen with a MockPopen and restore it on cleanup."""
        self.popen = MockPopen()
        replacer = Replacer()
        replacer.replace('testfixtures.tests.test_popen.subprocess.Popen', self.popen)
        self.addCleanup(replacer.restore)

    def test_command_called_with_check_call_check_returncode(self):
        """check_call() returns 0 for a command set with no extra arguments."""
        self.popen.set_command('ls')
        compare(0, subprocess.check_call(['ls']))

    def test_command_called_with_check_output_check_stdout_returned(self):
        """check_output() returns the configured stdout."""
        self.popen.set_command('ls', stdout=b'abc')
        compare(b'abc', subprocess.check_output(['ls']))

    def test_command_called_with_check_output_stderr_to_stdout_check_returned(self):
        """check_output() with stderr=STDOUT returns the configured stderr."""
        self.popen.set_command('ls', stderr=b'xyz')
        compare(b'xyz', subprocess.check_output(['ls'], stderr=STDOUT))

    def test_command_called_with_check_call_failing_command_check_exception(self):
        """A returncode of 1 makes the subprocess helper raise CalledProcessError."""
        self.popen.set_command('ls', returncode=1)
        # NOTE(review): the test name says check_call but the body calls
        # check_output -- confirm which one was intended.
        with self.assertRaises(subprocess.CalledProcessError):
            subprocess.check_output(['ls'])