1import subprocess
2from subprocess import PIPE, STDOUT
3from unittest import TestCase
5from testfixtures.mock import call
6from testfixtures import ShouldRaise, compare, Replacer
8from testfixtures.popen import MockPopen, PopenBehaviour
9from testfixtures.compat import BytesLiteral, PY2
11import signal
class Tests(TestCase):
    """Exercise ``MockPopen`` directly: configuration, process API and call recording."""

    def test_command_min_args(self):
        """A command registered with no options gives pid 1234, empty output and code 0."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, expected=1234)
        compare(process.returncode, expected=None)

        out, err = process.communicate()

        # test the rest
        compare(out, expected=b'')
        compare(err, expected=b'')
        compare(process.returncode, expected=0)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate(),
        ])

    def test_command_max_args(self):
        """Positional stdout, stderr, return code and pid are all reflected by the process."""
        Popen = MockPopen()
        Popen.set_command('a command', b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, expected=345)
        compare(process.returncode, expected=None)

        out, err = process.communicate()

        # test the rest
        compare(out, expected=b'out')
        compare(err, expected=b'err')
        compare(process.returncode, expected=1)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate(),
        ])

    def test_callable_default_behaviour(self):
        """A callable default receives the command and stdin and returns the behaviour."""
        def some_callable(command, stdin):
            return PopenBehaviour(BytesLiteral(command), BytesLiteral(stdin), 1, 345, 0)

        Popen = MockPopen()
        Popen.set_default(behaviour=some_callable)

        process = Popen('a command', stdin='some stdin', stdout=PIPE, stderr=PIPE)
        compare(process.pid, expected=345)

        out, err = process.communicate()

        compare(out, expected=b'a command')
        compare(err, expected=b'some stdin')
        compare(process.returncode, expected=1)

    def test_command_is_sequence(self):
        """A command registered as a string matches a call made with the words as a list."""
        Popen = MockPopen()
        Popen.set_command('a command')

        process = Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)

        compare(process.wait(), expected=0)
        compare(Popen.mock.method_calls, expected=[
            call.Popen(['a', 'command'], stderr=PIPE, stdout=PIPE),
            call.Popen_instance.wait(),
        ])

    def test_communicate_with_input(self):
        """Input passed to ``communicate`` is recorded in the call list."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.communicate('foo')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate('foo'),
        ])

    def test_communicate_with_timeout(self):
        """``communicate`` accepts a timeout on Python 3 and rejects it on Python 2."""
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            with ShouldRaise(TypeError):
                process.communicate(timeout=1)
            with ShouldRaise(TypeError):
                process.communicate('foo', 1)
        else:
            process.communicate(timeout=1)
            process.communicate('foo', 1)
            # The recorded calls are the actual value; the literal list is what we expect.
            compare(Popen.mock.method_calls, expected=[
                call.Popen('a command'),
                call.Popen_instance.communicate(timeout=1),
                call.Popen_instance.communicate('foo', 1),
            ])

    def test_read_from_stdout(self):
        """A piped stdout exposes an integer ``fileno`` and yields the configured bytes."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        compare(process.stdout.read(), expected=b'foo')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
        ])

    def test_read_from_stderr(self):
        """A piped stderr exposes an integer ``fileno`` and yields the configured bytes."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stderr=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        # Check the stream under test (stderr), not stdout.
        self.assertTrue(isinstance(process.stderr.fileno(), int))
        compare(process.stderr.read(), expected=b'foo')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
        ])

    def test_read_from_stdout_with_stderr_redirected_check_stdout_contents(self):
        """With ``stderr=STDOUT`` both outputs are read from stdout and stderr is ``None``."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        # test stdout contents
        compare(process.stdout.read(), expected=b'foobar')
        compare(process.stderr, expected=None)

    def test_read_from_stdout_with_stderr_redirected_check_stdout_stderr_interleaved(self):
        """With ``stderr=STDOUT`` the lines of stdout and stderr are interleaved."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'o1\no2\no3\no4\n', stderr=b'e1\ne2\n')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        # test stdout contents
        compare(process.stdout.read(), expected=b'o1\ne1\no2\ne2\no3\no4\n')

    def test_communicate_with_stderr_redirected_check_stderr_is_none(self):
        """With ``stderr=STDOUT``, ``communicate`` returns merged output and ``None``."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        out, err = process.communicate()
        # test stderr is None
        compare(out, expected=b'foobar')
        compare(err, expected=None)

    def test_read_from_stdout_and_stderr(self):
        """Separately piped stdout and stderr each yield their own configured bytes."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.stdout.read(), expected=b'foo')
        compare(process.stderr.read(), expected=b'bar')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
        ])

    def test_communicate_text_mode(self):
        """``text=True`` makes ``communicate`` return text rather than bytes."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_universal_newlines(self):
        """``universal_newlines=True`` makes ``communicate`` return text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, universal_newlines=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding(self):
        """Passing ``encoding`` makes ``communicate`` return decoded text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii')
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding_with_errors(self):
        """``errors='ignore'`` drops undecodable bytes on Python 3; Python 2 keeps bytes."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'\xa3', stderr=b'\xa3')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii', errors='ignore')
        actual = process.communicate()
        # check
        if PY2:
            compare(actual, expected=(b'\xa3', b'\xa3'))
        else:
            compare(actual, expected=(u'', u''))

    def test_read_from_stdout_and_stderr_text_mode(self):
        """``text=True`` makes direct reads from the pipes return text."""
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.stdout.read(), process.stderr.read()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_write_to_stdin(self):
        """Writes to a piped stdin are recorded on both the factory and the instance."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdin=PIPE, shell=True)
        process.stdin.write('some text')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen_instance.stdin.write('some text'),
        ])
        compare(Popen.all_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen('a command', shell=True, stdin=PIPE).stdin.write('some text'),
        ])
        compare(process.mock.method_calls, expected=[
            call.stdin.write('some text'),
        ])
        compare(process.calls, expected=[
            call.stdin.write('some text'),
        ])
        # Smoke check only: building the repr of such a call must not raise.
        repr(call.stdin.write('some text'))

    def test_wait_and_return_code(self):
        """``returncode`` is ``None`` until ``wait`` returns the configured code."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        # usage
        process = Popen('a command')
        compare(process.returncode, expected=None)
        # result checking
        compare(process.wait(), expected=3)
        compare(process.returncode, expected=3)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command'),
            call.Popen_instance.wait(),
        ])

    def test_wait_timeout(self):
        """``wait`` accepts a timeout on Python 3 and rejects it on Python 2."""
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            with ShouldRaise(TypeError):
                process.wait(timeout=1)
            with ShouldRaise(TypeError):
                process.wait(1)
        else:
            process.wait(timeout=1)
            process.wait(1)
            # The recorded calls are the actual value; the literal list is what we expect.
            compare(Popen.mock.method_calls, expected=[
                call.Popen('a command'),
                call.Popen_instance.wait(timeout=1),
                call.Popen_instance.wait(1),
            ])

    def test_multiple_uses(self):
        """Two registered commands can be run one after the other from the same mock."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a')
        Popen.set_command('b command', b'b')
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        out, _ = process.communicate('foo')
        compare(out, expected=b'a')
        process = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        out, _ = process.communicate('foo')
        compare(out, expected=b'b')
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate('foo'),
            call.Popen(['b', 'command'], shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate('foo'),
        ])

    def test_send_signal(self):
        """``send_signal`` is accepted and recorded."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.send_signal(0)
        # result checking
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.send_signal(0),
        ])

    def test_terminate(self):
        """``terminate`` is accepted and recorded."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.terminate()
        # result checking
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.terminate(),
        ])

    def test_kill(self):
        """``kill`` is accepted and recorded."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.kill()
        # result checking
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.kill(),
        ])

    def test_all_signals(self):
        """``send_signal``, ``terminate`` and ``kill`` are recorded in call order."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        process.send_signal(signal.SIGINT)
        process.terminate()
        process.kill()
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command'),
            call.Popen_instance.send_signal(signal.SIGINT),
            call.Popen_instance.terminate(),
            call.Popen_instance.kill(),
        ])

    def test_poll_no_setup(self):
        """Without a ``poll_count``, ``poll`` returns ``None`` until ``wait`` has been called."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), expected=None)
        compare(process.poll(), expected=None)
        compare(process.wait(), expected=0)
        compare(process.poll(), expected=0)
        # result checking
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.wait(),
            call.Popen_instance.poll(),
        ])

    def test_poll_setup(self):
        """With ``poll_count=1`` the first ``poll`` returns ``None`` and the second the code."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', poll_count=1)
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), expected=None)
        compare(process.poll(), expected=0)
        compare(process.wait(), expected=0)
        compare(process.poll(), expected=0)
        # result checking
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.wait(),
            call.Popen_instance.poll(),
        ])

    def test_poll_until_result(self):
        """Polling in a loop with ``poll_count=2`` finishes on the third ``poll`` call."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3, poll_count=2)
        # example usage
        process = Popen('a command')
        while process.poll() is None:
            # you'd probably have a sleep here, or go off and
            # do some other work.
            pass
        # result checking
        compare(process.returncode, expected=3)
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command'),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
            call.Popen_instance.poll(),
        ])

    def test_command_not_specified(self):
        """Running a command that was never registered raises ``KeyError``."""
        Popen = MockPopen()
        with ShouldRaise(KeyError(
            "Nothing specified for command 'a command'"
        )):
            Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)

    def test_default_command_min_args(self):
        """``set_default()`` with no options gives pid 1234, empty output and code 0."""
        # setup
        Popen = MockPopen()
        Popen.set_default()
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, expected=1234)
        compare(process.returncode, expected=None)

        out, err = process.communicate()

        # test the rest
        compare(out, expected=b'')
        compare(err, expected=b'')
        compare(process.returncode, expected=0)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate(),
        ])

    def test_default_command_max_args(self):
        """Positional arguments to ``set_default`` apply to an unregistered command."""
        Popen = MockPopen()
        Popen.set_default(b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, expected=345)
        compare(process.returncode, expected=None)

        out, err = process.communicate()

        # test the rest
        compare(out, expected=b'out')
        compare(err, expected=b'err')
        compare(process.returncode, expected=1)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', stderr=PIPE, stdout=PIPE),
            call.Popen_instance.communicate(),
        ])

    def test_invalid_parameters(self):
        """An unknown keyword argument to the mock ``Popen`` raises ``TypeError``."""
        Popen = MockPopen()
        with ShouldRaise(TypeError(
                "__init__() got an unexpected keyword argument 'foo'"
        )):
            Popen(foo='bar')

    def test_invalid_method_or_attr(self):
        """Calling a method the process does not have raises ``AttributeError``."""
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            process.foo()

    def test_invalid_attribute(self):
        """Reading an attribute the process does not have raises ``AttributeError``."""
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            process.foo

    def test_invalid_communicate_call(self):
        """An unknown keyword argument to ``communicate`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "communicate() got an unexpected keyword argument 'foo'"
        )):
            process.communicate(foo='bar')

    def test_invalid_wait_call(self):
        """An unknown keyword argument to ``wait`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "wait() got an unexpected keyword argument 'foo'"
        )):
            process.wait(foo='bar')

    def test_invalid_send_signal(self):
        """An unknown keyword argument to ``send_signal`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "send_signal() got an unexpected keyword argument 'foo'"
        )):
            process.send_signal(foo='bar')

    def test_invalid_terminate(self):
        """An unknown keyword argument to ``terminate`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "terminate() got an unexpected keyword argument 'foo'"
        )):
            process.terminate(foo='bar')

    def test_invalid_kill(self):
        """An extra positional argument to ``kill`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # The interpreter's wording for arity errors differs between Python 2 and 3.
        if PY2:
            text = 'kill() takes exactly 1 argument (2 given)'
        else:
            text = 'kill() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.kill('moo')

    def test_invalid_poll(self):
        """An extra positional argument to ``poll`` raises ``TypeError``."""
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # The interpreter's wording for arity errors differs between Python 2 and 3.
        if PY2:
            text = 'poll() takes exactly 1 argument (2 given)'
        else:
            text = 'poll() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.poll('moo')

    def test_non_pipe(self):
        """Without ``PIPE`` the streams are ``None`` and ``communicate`` returns ``None``s."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        # checks
        compare(process.stdout, expected=None)
        compare(process.stderr, expected=None)
        out, err = process.communicate()
        # test the rest
        compare(out, expected=None)
        compare(err, expected=None)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command'),
            call.Popen_instance.communicate(),
        ])

    def test_use_as_context_manager(self):
        """On Python 3 the process is a context manager that waits and closes its pipes."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        if PY2:
            # On Python 2 the context-manager methods are not available.
            process = Popen('a command')
            with ShouldRaise(AttributeError):
                process.__enter__
            with ShouldRaise(AttributeError):
                process.__exit__

        else:

            # usage
            with Popen('a command', stdout=PIPE, stderr=PIPE) as process:
                # process started, no return code
                compare(process.pid, expected=1234)
                compare(process.returncode, expected=None)

                out, err = process.communicate()

            # test the rest
            compare(out, expected=b'')
            compare(err, expected=b'')
            compare(process.returncode, expected=0)

            compare(process.stdout.closed, expected=True)
            compare(process.stderr.closed, expected=True)

            # test call list
            compare(Popen.mock.method_calls, expected=[
                call.Popen('a command', stderr=PIPE, stdout=PIPE),
                call.Popen_instance.communicate(),
                call.Popen_instance.wait(),
            ])

    def test_start_new_session(self):
        """The ``start_new_session`` keyword is accepted and recorded."""
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        Popen('a command', start_new_session=True)
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', start_new_session=True),
        ])

    def test_simultaneous_processes(self):
        """Two live processes keep separate return codes and separate call records."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen.set_command('b command', b'b', returncode=2)
        process_a = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process_b = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(process_a.wait(), expected=1)
        compare(process_b.wait(), expected=2)
        a_call = call.Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        b_call = call.Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(Popen.all_calls, expected=[
            a_call,
            b_call,
            a_call.wait(),
            b_call.wait(),
        ])
        compare(process_a.mock.method_calls, expected=[
            call.wait(),
        ])
        compare(process_b.mock.method_calls, expected=[
            call.wait(),
        ])

    def test_pass_executable(self):
        """The ``executable`` keyword is accepted and recorded."""
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen('a command', executable='/foo/bar')
        compare(Popen.all_calls, expected=[
            call.Popen('a command', executable='/foo/bar'),
        ])

    def test_set_command_with_list(self):
        """A command may be registered as a list and then invoked as the same list."""
        Popen = MockPopen()
        Popen.set_command(['a', 'command'])
        Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)
        compare(Popen.all_calls, expected=[
            call.Popen(['a', 'command'], stderr=PIPE, stdout=PIPE),
        ])


class IntegrationTests(TestCase):
    """Check that ``subprocess`` helper functions work with ``Popen`` replaced by a mock."""

    def setUp(self):
        """Replace ``subprocess.Popen`` with a fresh ``MockPopen`` for each test."""
        self.popen = MockPopen()
        replacer = Replacer()
        replacer.replace('testfixtures.tests.test_popen.subprocess.Popen', self.popen)
        # Restore the original attribute after the test, whether it passes or fails.
        self.addCleanup(replacer.restore)

    def test_command_called_with_check_call_check_returncode(self):
        """``check_call`` returns the mocked return code of 0."""
        self.popen.set_command('ls')
        compare(subprocess.check_call(['ls']), expected=0)

    def test_command_called_with_check_output_check_stdout_returned(self):
        """``check_output`` returns the mocked stdout."""
        self.popen.set_command('ls', stdout=b'abc')
        compare(subprocess.check_output(['ls']), expected=b'abc')

    def test_command_called_with_check_output_stderr_to_stdout_check_returned(self):
        """``check_output`` with ``stderr=STDOUT`` returns the mocked stderr content."""
        self.popen.set_command('ls', stderr=b'xyz')
        compare(subprocess.check_output(['ls'], stderr=STDOUT), expected=b'xyz')

    def test_command_called_with_check_call_failing_command_check_exception(self):
        """A non-zero mocked return code makes the ``check_*`` helpers raise."""
        self.popen.set_command('ls', returncode=1)
        # The test name promises check_call, so exercise it explicitly.
        with self.assertRaises(subprocess.CalledProcessError):
            subprocess.check_call(['ls'])
        # Keep the original check_output assertion as well.
        with self.assertRaises(subprocess.CalledProcessError):
            subprocess.check_output(['ls'])