1# -*- coding: utf-8 -*-
2# Copyright (c) 2017 Ansible Project
3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
4
5# Make coding more python3-ish
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
9import errno
10from itertools import product
11from io import BytesIO
12
13import pytest
14
15from ansible.module_utils._text import to_native
16from ansible.module_utils.six import PY2
17from ansible.module_utils.compat import selectors
18
19
20class OpenBytesIO(BytesIO):
21    """BytesIO with dummy close() method
22
23    So that you can inspect the content after close() was called.
24    """
25
26    def close(self):
27        pass
28
29
30@pytest.fixture
31def mock_os(mocker):
32    def mock_os_chdir(path):
33        if path == '/inaccessible':
34            raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")
35
36    def mock_os_abspath(path):
37        if path.startswith('/'):
38            return path
39        else:
40            return os.getcwd.return_value + '/' + path
41
42    os = mocker.patch('ansible.module_utils.basic.os')
43
44    os.path.expandvars.side_effect = lambda x: x
45    os.path.expanduser.side_effect = lambda x: x
46    os.environ = {'PATH': '/bin'}
47    os.getcwd.return_value = '/home/foo'
48    os.path.isdir.return_value = True
49    os.chdir.side_effect = mock_os_chdir
50    os.path.abspath.side_effect = mock_os_abspath
51
52    yield os
53
54
55class DummyFileObj():
56    def __init__(self, fileobj):
57        self.fileobj = fileobj
58
59
60class SpecialBytesIO(BytesIO):
61    def __init__(self, *args, **kwargs):
62        fh = kwargs.pop('fh', None)
63        super(SpecialBytesIO, self).__init__(*args, **kwargs)
64        self.fh = fh
65
66    def fileno(self):
67        return self.fh
68
69    # We need to do this because some of our tests create a new value for stdout and stderr
70    # The new value is able to affect the string that is returned by the subprocess stdout and
71    # stderr but by the time the test gets it, it is too late to change the SpecialBytesIO that
72    # subprocess.Popen returns for stdout and stderr.  If we could figure out how to change those as
73    # well, then we wouldn't need this.
74    def __eq__(self, other):
75        if id(self) == id(other) or self.fh == other.fileno():
76            return True
77        return False
78
79
80class DummyKey:
81    def __init__(self, fileobj):
82        self.fileobj = fileobj
83
84
85@pytest.fixture
86def mock_subprocess(mocker):
87
88    class MockSelector(selectors.BaseSelector):
89        def __init__(self):
90            super(MockSelector, self).__init__()
91            self._file_objs = []
92
93        def register(self, fileobj, events, data=None):
94            self._file_objs.append(fileobj)
95
96        def unregister(self, fileobj):
97            self._file_objs.remove(fileobj)
98
99        def select(self, timeout=None):
100            ready = []
101            for file_obj in self._file_objs:
102                ready.append((DummyKey(subprocess._output[file_obj.fileno()]), selectors.EVENT_READ))
103            return ready
104
105        def get_map(self):
106            return self._file_objs
107
108        def close(self):
109            super(MockSelector, self).close()
110            self._file_objs = []
111
112    selectors.DefaultSelector = MockSelector
113
114    subprocess = mocker.patch('ansible.module_utils.basic.subprocess')
115    subprocess._output = {mocker.sentinel.stdout: SpecialBytesIO(b'', fh=mocker.sentinel.stdout),
116                          mocker.sentinel.stderr: SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
117
118    cmd = mocker.MagicMock()
119    cmd.returncode = 0
120    cmd.stdin = OpenBytesIO()
121    cmd.stdout = subprocess._output[mocker.sentinel.stdout]
122    cmd.stderr = subprocess._output[mocker.sentinel.stderr]
123    subprocess.Popen.return_value = cmd
124
125    yield subprocess
126
127
128@pytest.fixture()
129def rc_am(mocker, am, mock_os, mock_subprocess):
130    am.fail_json = mocker.MagicMock(side_effect=SystemExit)
131    am._os = mock_os
132    am._subprocess = mock_subprocess
133    yield am
134
135
136class TestRunCommandArgs:
137    # Format is command as passed to run_command, command to Popen as list, command to Popen as string
138    ARGS_DATA = (
139                (['/bin/ls', 'a', 'b', 'c'], [b'/bin/ls', b'a', b'b', b'c'], b'/bin/ls a b c'),
140                ('/bin/ls a " b" "c "', [b'/bin/ls', b'a', b' b', b'c '], b'/bin/ls a " b" "c "'),
141    )
142
143    # pylint bug: https://github.com/PyCQA/pylint/issues/511
144    # pylint: disable=undefined-variable
145    @pytest.mark.parametrize('cmd, expected, shell, stdin',
146                             ((arg, cmd_str if sh else cmd_lst, sh, {})
147                              for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))),
148                             indirect=['stdin'])
149    def test_args(self, cmd, expected, shell, rc_am):
150        rc_am.run_command(cmd, use_unsafe_shell=shell)
151        assert rc_am._subprocess.Popen.called
152        args, kwargs = rc_am._subprocess.Popen.call_args
153        assert args == (expected, )
154        assert kwargs['shell'] == shell
155
156    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
157    def test_tuple_as_args(self, rc_am):
158        with pytest.raises(SystemExit):
159            rc_am.run_command(('ls', '/'))
160        assert rc_am.fail_json.called
161
162
163class TestRunCommandCwd:
164    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
165    def test_cwd(self, mocker, rc_am):
166        rc_am._os.getcwd.return_value = '/old'
167        rc_am.run_command('/bin/ls', cwd='/new')
168        assert rc_am._os.chdir.mock_calls == [mocker.call(b'/new'), mocker.call('/old'), ]
169
170    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
171    def test_cwd_relative_path(self, mocker, rc_am):
172        rc_am._os.getcwd.return_value = '/old'
173        rc_am.run_command('/bin/ls', cwd='sub-dir')
174        assert rc_am._os.chdir.mock_calls == [mocker.call(b'/old/sub-dir'), mocker.call('/old'), ]
175
176    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
177    def test_cwd_not_a_dir(self, mocker, rc_am):
178        rc_am._os.getcwd.return_value = '/old'
179        rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir'
180        rc_am.run_command('/bin/ls', cwd='/not-a-dir')
181        assert rc_am._os.chdir.mock_calls == [mocker.call('/old'), ]
182
183    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
184    def test_cwd_not_a_dir_noignore(self, rc_am):
185        rc_am._os.getcwd.return_value = '/old'
186        rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir'
187        with pytest.raises(SystemExit):
188            rc_am.run_command('/bin/ls', cwd='/not-a-dir', ignore_invalid_cwd=False)
189        assert rc_am.fail_json.called
190
191
192class TestRunCommandPrompt:
193    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
194    def test_prompt_bad_regex(self, rc_am):
195        with pytest.raises(SystemExit):
196            rc_am.run_command('foo', prompt_regex='[pP)assword:')
197        assert rc_am.fail_json.called
198
199    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
200    def test_prompt_no_match(self, mocker, rc_am):
201        rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello')
202        (rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:')
203        assert rc == 0
204
205    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
206    def test_prompt_match_wo_data(self, mocker, rc_am):
207        rc_am._subprocess._output = {mocker.sentinel.stdout:
208                                     SpecialBytesIO(b'Authentication required!\nEnter password: ',
209                                                    fh=mocker.sentinel.stdout),
210                                     mocker.sentinel.stderr:
211                                     SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
212        (rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None)
213        assert rc == 257
214
215
216class TestRunCommandRc:
217    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
218    def test_check_rc_false(self, rc_am):
219        rc_am._subprocess.Popen.return_value.returncode = 1
220        (rc, _, _) = rc_am.run_command('/bin/false', check_rc=False)
221        assert rc == 1
222
223    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
224    def test_check_rc_true(self, rc_am):
225        rc_am._subprocess.Popen.return_value.returncode = 1
226        with pytest.raises(SystemExit):
227            rc_am.run_command('/bin/false', check_rc=True)
228        assert rc_am.fail_json.called
229        args, kwargs = rc_am.fail_json.call_args
230        assert kwargs['rc'] == 1
231
232
233class TestRunCommandOutput:
234    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
235    def test_text_stdin(self, rc_am):
236        (rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world')
237        assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n'
238
239    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
240    def test_ascii_stdout(self, mocker, rc_am):
241        rc_am._subprocess._output = {mocker.sentinel.stdout:
242                                     SpecialBytesIO(b'hello', fh=mocker.sentinel.stdout),
243                                     mocker.sentinel.stderr:
244                                     SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
245        (rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt')
246        assert rc == 0
247        # module_utils function.  On py3 it returns text and py2 it returns
248        # bytes because it's returning native strings
249        assert stdout == 'hello'
250
251    @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
252    def test_utf8_output(self, mocker, rc_am):
253        rc_am._subprocess._output = {mocker.sentinel.stdout:
254                                     SpecialBytesIO(u'Žarn§'.encode('utf-8'),
255                                                    fh=mocker.sentinel.stdout),
256                                     mocker.sentinel.stderr:
257                                     SpecialBytesIO(u'لرئيسية'.encode('utf-8'),
258                                                    fh=mocker.sentinel.stderr)}
259        (rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly')
260        assert rc == 0
261        # module_utils function.  On py3 it returns text and py2 it returns
262        # bytes because it's returning native strings
263        assert stdout == to_native(u'Žarn§')
264        assert stderr == to_native(u'لرئيسية')
265
266
267@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
268def test_run_command_fds(mocker, rc_am):
269    subprocess_mock = mocker.patch('ansible.module_utils.basic.subprocess')
270    subprocess_mock.Popen.side_effect = AssertionError
271
272    try:
273        rc_am.run_command('synchronize', pass_fds=(101, 42))
274    except SystemExit:
275        pass
276
277    if PY2:
278        assert subprocess_mock.Popen.call_args[1]['close_fds'] is False
279        assert 'pass_fds' not in subprocess_mock.Popen.call_args[1]
280
281    else:
282        assert subprocess_mock.Popen.call_args[1]['pass_fds'] == (101, 42)
283        assert subprocess_mock.Popen.call_args[1]['close_fds'] is True
284