1import subprocess
2from subprocess import PIPE, STDOUT
3from unittest import TestCase
4
5from testfixtures.mock import call
6from testfixtures import ShouldRaise, compare, Replacer
7
8from testfixtures.popen import MockPopen, PopenBehaviour
9from testfixtures.compat import BytesLiteral, PY2
10
11import signal
12
13
class Tests(TestCase):
    """
    Unit tests that drive a ``MockPopen`` directly.

    Each test builds its own ``MockPopen``, usually registers canned
    behaviour with ``set_command`` or ``set_default``, exercises the
    process object returned by calling the mock, and then checks the
    observed results and/or the calls the mock recorded.
    """

    def test_command_min_args(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, 1234)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'')
        compare(err, b'')
        compare(process.returncode, 0)
        # test call list
        compare([
                call.Popen('a command', stderr=-1, stdout=-1),
                call.Popen_instance.communicate(),
                ], Popen.mock.method_calls)

    def test_command_max_args(self):
        # every optional positional of set_command supplied:
        # stdout, stderr, returncode, pid
        Popen = MockPopen()
        Popen.set_command('a command', b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'out')
        compare(err, b'err')
        compare(process.returncode, 1)
        # test call list
        compare([
                call.Popen('a command', stderr=-1, stdout=-1),
                call.Popen_instance.communicate(),
                ], Popen.mock.method_calls)

    def test_callable_default_behaviour(self):
        # the default behaviour may be a callable that receives the command
        # and stdin and returns a PopenBehaviour; here it echoes them back
        # as stdout and stderr respectively.
        def some_callable(command, stdin):
            return PopenBehaviour(BytesLiteral(command), BytesLiteral(stdin), 1, 345, 0)

        Popen = MockPopen()
        Popen.set_default(behaviour=some_callable)

        process = Popen('a command', stdin='some stdin', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)

        out, err = process.communicate()

        compare(out, b'a command')
        compare(err, b'some stdin')
        compare(process.returncode, 1)

    def test_command_is_sequence(self):
        # a command registered as a string is matched when invoked as a list
        Popen = MockPopen()
        Popen.set_command('a command')

        process = Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)

        compare(process.wait(), 0)
        compare([
                call.Popen(['a', 'command'], stderr=-1, stdout=-1),
                call.Popen_instance.wait(),
                ], Popen.mock.method_calls)

    def test_communicate_with_input(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        # only the recorded call matters here, so the result is discarded
        process.communicate('foo')
        # test call list
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.communicate('foo'),
                ], Popen.mock.method_calls)

    def test_communicate_with_timeout(self):
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            # no timeout parameter is accepted on Python 2
            with ShouldRaise(TypeError):
                process.communicate(timeout=1)
            with ShouldRaise(TypeError):
                process.communicate('foo', 1)
        else:
            process.communicate(timeout=1)
            process.communicate('foo', 1)
            # recorded calls are the actual value, the literal is expected
            compare(Popen.mock.method_calls, expected=[
                call.Popen('a command'),
                call.Popen_instance.communicate(timeout=1),
                call.Popen_instance.communicate('foo', 1),
            ])

    def test_read_from_stdout(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        compare(process.stdout.read(), b'foo')
        # test call list
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                ], Popen.mock.method_calls)

    def test_read_from_stderr(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stderr=b'foo')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        # stderr is the stream under test, so that is the pipe whose
        # file descriptor is checked
        self.assertTrue(isinstance(process.stderr.fileno(), int))
        compare(process.stderr.read(), b'foo')
        # test call list
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                ], Popen.mock.method_calls)

    def test_read_from_stdout_with_stderr_redirected_check_stdout_contents(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        # test stdout contents
        compare(b'foobar', process.stdout.read())
        compare(process.stderr, None)

    def test_read_from_stdout_with_stderr_redirected_check_stdout_stderr_interleaved(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'o1\no2\no3\no4\n', stderr=b'e1\ne2\n')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        self.assertTrue(isinstance(process.stdout.fileno(), int))
        # test stdout contents: lines from the two streams alternate until
        # the shorter one is exhausted
        compare(b'o1\ne1\no2\ne2\no3\no4\n', process.stdout.read())

    def test_communicate_with_stderr_redirected_check_stderr_is_none(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=STDOUT, shell=True)
        out, err = process.communicate()
        # test stderr is None
        compare(out, b'foobar')
        compare(err, None)

    def test_read_from_stdout_and_stderr(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.stdout.read(), b'foo')
        compare(process.stderr.read(), b'bar')
        # test call list
        compare([
                call.Popen('a command', shell=True, stderr=PIPE, stdout=PIPE),
                ], Popen.mock.method_calls)

    def test_communicate_text_mode(self):
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_universal_newlines(self):
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, universal_newlines=True)
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding(self):
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii')
        actual = process.communicate()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_communicate_encoding_with_errors(self):
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'\xa3', stderr=b'\xa3')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, encoding='ascii', errors='ignore')
        actual = process.communicate()
        # check
        if PY2:
            # bytes come back untouched on Python 2
            compare(actual, expected=(b'\xa3', b'\xa3'))
        else:
            # the non-ascii byte is dropped because errors='ignore'
            compare(actual, expected=(u'', u''))

    def test_read_from_stdout_and_stderr_text_mode(self):
        Popen = MockPopen()
        Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, text=True)
        actual = process.stdout.read(), process.stderr.read()
        # check
        compare(actual, expected=(u'foo', u'bar'))

    def test_write_to_stdin(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdin=PIPE, shell=True)
        process.stdin.write('some text')
        # test call list
        compare(Popen.mock.method_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen_instance.stdin.write('some text'),
        ])
        compare(Popen.all_calls, expected=[
            call.Popen('a command', shell=True, stdin=PIPE),
            call.Popen('a command', shell=True, stdin=PIPE).stdin.write('some text'),
        ])
        compare(process.mock.method_calls, expected=[
            call.stdin.write('some text'),
        ])
        compare(process.calls, expected=[
            call.stdin.write('some text'),
        ])
        # smoke check: building the repr of such a call must not raise
        repr(call.stdin.write('some text'))

    def test_wait_and_return_code(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        # usage
        process = Popen('a command')
        compare(process.returncode, None)
        # result checking
        compare(process.wait(), 3)
        compare(process.returncode, 3)
        # test call list
        compare([
                call.Popen('a command'),
                call.Popen_instance.wait(),
                ], Popen.mock.method_calls)

    def test_wait_timeout(self):
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3)
        process = Popen('a command')
        if PY2:
            # no timeout parameter is accepted on Python 2
            with ShouldRaise(TypeError):
                process.wait(timeout=1)
            with ShouldRaise(TypeError):
                process.wait(1)
        else:
            process.wait(timeout=1)
            process.wait(1)
            # recorded calls are the actual value, the literal is expected
            compare(Popen.mock.method_calls, expected=[
                call.Popen('a command'),
                call.Popen_instance.wait(timeout=1),
                call.Popen_instance.wait(1),
            ])

    def test_multiple_uses(self):
        # two commands registered on one mock, run one after the other
        Popen = MockPopen()
        Popen.set_command('a command', b'a')
        Popen.set_command('b command', b'b')
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        out, err = process.communicate('foo')
        compare(out, b'a')
        process = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        out, err = process.communicate('foo')
        compare(out, b'b')
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.communicate('foo'),
                call.Popen(['b', 'command'], shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.communicate('foo'),
                ], Popen.mock.method_calls)

    def test_send_signal(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.send_signal(0)
        # result checking
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.send_signal(0),
                ], Popen.mock.method_calls)

    def test_terminate(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.terminate()
        # result checking
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.terminate(),
                ], Popen.mock.method_calls)

    def test_kill(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process.kill()
        # result checking
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.kill(),
                ], Popen.mock.method_calls)

    def test_all_signals(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        process.send_signal(signal.SIGINT)
        process.terminate()
        process.kill()
        # test call list
        compare([
                call.Popen('a command'),
                call.Popen_instance.send_signal(signal.SIGINT),
                call.Popen_instance.terminate(),
                call.Popen_instance.kill(),
                ], Popen.mock.method_calls)

    def test_poll_no_setup(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage: with no poll_count given, poll() keeps returning None
        # until wait() has been called
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), None)
        compare(process.poll(), None)
        compare(process.wait(), 0)
        compare(process.poll(), 0)
        # result checking
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.poll(),
                call.Popen_instance.poll(),
                call.Popen_instance.wait(),
                call.Popen_instance.poll(),
                ], Popen.mock.method_calls)

    def test_poll_setup(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', poll_count=1)
        # usage: with poll_count=1 the first poll() returns None and the
        # second returns the return code
        process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        compare(process.poll(), None)
        compare(process.poll(), 0)
        compare(process.wait(), 0)
        compare(process.poll(), 0)
        # result checking
        compare([
                call.Popen('a command', shell=True, stderr=-1, stdout=-1),
                call.Popen_instance.poll(),
                call.Popen_instance.poll(),
                call.Popen_instance.wait(),
                call.Popen_instance.poll(),
                ], Popen.mock.method_calls)

    def test_poll_until_result(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command', returncode=3, poll_count=2)
        # example usage
        process = Popen('a command')
        while process.poll() is None:
            # you'd probably have a sleep here, or go off and
            # do some other work.
            pass
        # result checking
        compare(process.returncode, 3)
        compare([
                call.Popen('a command'),
                call.Popen_instance.poll(),
                call.Popen_instance.poll(),
                call.Popen_instance.poll(),
                ], Popen.mock.method_calls)

    def test_command_not_specified(self):
        # nothing registered and no default set, so the call is rejected
        Popen = MockPopen()
        with ShouldRaise(KeyError(
            "Nothing specified for command 'a command'"
        )):
            Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)

    def test_default_command_min_args(self):
        # setup
        Popen = MockPopen()
        Popen.set_default()
        # usage
        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        # process started, no return code
        compare(process.pid, 1234)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'')
        compare(err, b'')
        compare(process.returncode, 0)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_default_command_max_args(self):
        # every optional positional of set_default supplied:
        # stdout, stderr, returncode, pid
        Popen = MockPopen()
        Popen.set_default(b'out', b'err', 1, 345)

        process = Popen('a command', stdout=PIPE, stderr=PIPE)
        compare(process.pid, 345)
        compare(None, process.returncode)

        out, err = process.communicate()

        # test the rest
        compare(out, b'out')
        compare(err, b'err')
        compare(process.returncode, 1)
        # test call list
        compare([
            call.Popen('a command', stderr=-1, stdout=-1),
            call.Popen_instance.communicate(),
        ], Popen.mock.method_calls)

    def test_invalid_parameters(self):
        # keyword arguments that Popen does not take are rejected
        Popen = MockPopen()
        with ShouldRaise(TypeError(
                "__init__() got an unexpected keyword argument 'foo'"
        )):
            Popen(foo='bar')

    def test_invalid_method_or_attr(self):
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            process.foo()

    def test_invalid_attribute(self):
        Popen = MockPopen()
        Popen.set_command('command')
        process = Popen('command')
        with ShouldRaise(AttributeError):
            # plain attribute access, no call, is enough to raise
            process.foo

    def test_invalid_communicate_call(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "communicate() got an unexpected keyword argument 'foo'"
        )):
            process.communicate(foo='bar')

    def test_invalid_wait_call(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "wait() got an unexpected keyword argument 'foo'"
        )):
            process.wait(foo='bar')

    def test_invalid_send_signal(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "send_signal() got an unexpected keyword argument 'foo'"
        )):
            process.send_signal(foo='bar')

    def test_invalid_terminate(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        with ShouldRaise(TypeError(
                "terminate() got an unexpected keyword argument 'foo'"
        )):
            process.terminate(foo='bar')

    def test_invalid_kill(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # the interpreter's wording for a surplus positional differs by version
        if PY2:
            text = 'kill() takes exactly 1 argument (2 given)'
        else:
            text = 'kill() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.kill('moo')

    def test_invalid_poll(self):
        Popen = MockPopen()
        Popen.set_command('bar')
        process = Popen('bar')
        # the interpreter's wording for a surplus positional differs by version
        if PY2:
            text = 'poll() takes exactly 1 argument (2 given)'
        else:
            text = 'poll() takes 1 positional argument but 2 were given'
        with ShouldRaise(TypeError(text)):
            process.poll('moo')

    def test_non_pipe(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        process = Popen('a command')
        # checks
        compare(process.stdout, expected=None)
        compare(process.stderr, expected=None)
        out, err = process.communicate()
        # test the rest
        compare(out, expected=None)
        compare(err, expected=None)
        # test call list
        compare([
                call.Popen('a command'),
                call.Popen_instance.communicate(),
                ], Popen.mock.method_calls)

    def test_use_as_context_manager(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        if PY2:

            # the context manager protocol is not offered on Python 2
            process = Popen('a command')
            with ShouldRaise(AttributeError):
                process.__enter__
            with ShouldRaise(AttributeError):
                process.__exit__

        else:

            # usage
            with Popen('a command', stdout=PIPE, stderr=PIPE) as process:
                # process started, no return code
                compare(process.pid, 1234)
                compare(None, process.returncode)

                out, err = process.communicate()

            # test the rest
            compare(out, b'')
            compare(err, b'')
            compare(process.returncode, 0)

            # leaving the block closes both pipes
            compare(process.stdout.closed, expected=True)
            compare(process.stderr.closed, expected=True)

            # test call list; the trailing wait() is recorded on exit
            compare([
                call.Popen('a command', stderr=-1, stdout=-1),
                call.Popen_instance.communicate(),
                call.Popen_instance.wait(),
            ], Popen.mock.method_calls)

    def test_start_new_session(self):
        # setup
        Popen = MockPopen()
        Popen.set_command('a command')
        # usage
        Popen('a command', start_new_session=True)
        # test call list
        compare([
            call.Popen('a command', start_new_session=True),
        ], Popen.mock.method_calls)

    def test_simultaneous_processes(self):
        # two processes alive at once keep separate return codes and
        # separate per-process call records
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen.set_command('b command', b'b', returncode=2)
        process_a = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        process_b = Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(process_a.wait(), expected=1)
        compare(process_b.wait(), expected=2)
        a_call = call.Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
        b_call = call.Popen(['b', 'command'], stdout=PIPE, stderr=PIPE, shell=True)
        compare(Popen.all_calls, expected=[
                a_call,
                b_call,
                a_call.wait(),
                b_call.wait(),
        ])
        compare(process_a.mock.method_calls, expected=[
            call.wait()
        ])
        compare(process_b.mock.method_calls, expected=[
            call.wait()
        ])

    def test_pass_executable(self):
        Popen = MockPopen()
        Popen.set_command('a command', b'a', returncode=1)
        Popen('a command', executable='/foo/bar')
        compare(Popen.all_calls, expected=[
            call.Popen('a command', executable='/foo/bar')
        ])

    def test_set_command_with_list(self):
        # a command may also be registered as a list
        Popen = MockPopen()
        Popen.set_command(['a', 'command'])
        Popen(['a', 'command'], stdout=PIPE, stderr=PIPE)
        compare([call.Popen(['a', 'command'], stderr=-1, stdout=-1)],
                actual=Popen.all_calls)
656
657
class IntegrationTests(TestCase):
    """
    Checks that the ``subprocess`` convenience functions behave as expected
    when ``subprocess.Popen`` has been replaced with a ``MockPopen``.
    """

    def setUp(self):
        self.popen = MockPopen()
        replacer = Replacer()
        # the dotted path reaches ``subprocess.Popen`` through the
        # ``subprocess`` name imported by testfixtures.tests.test_popen
        # (presumably this module; confirm against the file's location)
        replacer.replace('testfixtures.tests.test_popen.subprocess.Popen', self.popen)
        # put the real Popen back after every test, pass or fail
        self.addCleanup(replacer.restore)

    def test_command_called_with_check_call_check_returncode(self):
        self.popen.set_command('ls')
        compare(0, subprocess.check_call(['ls']))

    def test_command_called_with_check_output_check_stdout_returned(self):
        self.popen.set_command('ls', stdout=b'abc')
        compare(b'abc', subprocess.check_output(['ls']))

    def test_command_called_with_check_output_stderr_to_stdout_check_returned(self):
        self.popen.set_command('ls', stderr=b'xyz')
        compare(b'xyz', subprocess.check_output(['ls'], stderr=STDOUT))

    def test_command_called_with_check_call_failing_command_check_exception(self):
        # exercises check_call, as the test name says
        self.popen.set_command('ls', returncode=1)
        with self.assertRaises(subprocess.CalledProcessError):
            subprocess.check_call(['ls'])

    def test_command_called_with_check_output_failing_command_check_exception(self):
        # the same failure path through check_output
        self.popen.set_command('ls', returncode=1)
        with self.assertRaises(subprocess.CalledProcessError):
            subprocess.check_output(['ls'])
682