1# -*- coding: utf-8 -*- 2# Copyright (c) 2017 Ansible Project 3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 4 5# Make coding more python3-ish 6from __future__ import (absolute_import, division, print_function) 7__metaclass__ = type 8 9import errno 10from itertools import product 11from io import BytesIO 12 13import pytest 14 15from ansible.module_utils._text import to_native 16from ansible.module_utils.six import PY2 17from ansible.module_utils.compat import selectors 18 19 20class OpenBytesIO(BytesIO): 21 """BytesIO with dummy close() method 22 23 So that you can inspect the content after close() was called. 24 """ 25 26 def close(self): 27 pass 28 29 30@pytest.fixture 31def mock_os(mocker): 32 def mock_os_chdir(path): 33 if path == '/inaccessible': 34 raise OSError(errno.EPERM, "Permission denied: '/inaccessible'") 35 36 def mock_os_abspath(path): 37 if path.startswith('/'): 38 return path 39 else: 40 return os.getcwd.return_value + '/' + path 41 42 os = mocker.patch('ansible.module_utils.basic.os') 43 44 os.path.expandvars.side_effect = lambda x: x 45 os.path.expanduser.side_effect = lambda x: x 46 os.environ = {'PATH': '/bin'} 47 os.getcwd.return_value = '/home/foo' 48 os.path.isdir.return_value = True 49 os.chdir.side_effect = mock_os_chdir 50 os.path.abspath.side_effect = mock_os_abspath 51 52 yield os 53 54 55class DummyFileObj(): 56 def __init__(self, fileobj): 57 self.fileobj = fileobj 58 59 60class SpecialBytesIO(BytesIO): 61 def __init__(self, *args, **kwargs): 62 fh = kwargs.pop('fh', None) 63 super(SpecialBytesIO, self).__init__(*args, **kwargs) 64 self.fh = fh 65 66 def fileno(self): 67 return self.fh 68 69 # We need to do this because some of our tests create a new value for stdout and stderr 70 # The new value is able to affect the string that is returned by the subprocess stdout and 71 # stderr but by the time the test gets it, it is too late to change the SpecialBytesIO that 72 # subprocess.Popen returns for stdout and stderr. If we could figure out how to change those as 73 # well, then we wouldn't need this. 74 def __eq__(self, other): 75 if id(self) == id(other) or self.fh == other.fileno(): 76 return True 77 return False 78 79 80class DummyKey: 81 def __init__(self, fileobj): 82 self.fileobj = fileobj 83 84 85@pytest.fixture 86def mock_subprocess(mocker): 87 88 class MockSelector(selectors.BaseSelector): 89 def __init__(self): 90 super(MockSelector, self).__init__() 91 self._file_objs = [] 92 93 def register(self, fileobj, events, data=None): 94 self._file_objs.append(fileobj) 95 96 def unregister(self, fileobj): 97 self._file_objs.remove(fileobj) 98 99 def select(self, timeout=None): 100 ready = [] 101 for file_obj in self._file_objs: 102 ready.append((DummyKey(subprocess._output[file_obj.fileno()]), selectors.EVENT_READ)) 103 return ready 104 105 def get_map(self): 106 return self._file_objs 107 108 def close(self): 109 super(MockSelector, self).close() 110 self._file_objs = [] 111 112 selectors.DefaultSelector = MockSelector 113 114 subprocess = mocker.patch('ansible.module_utils.basic.subprocess') 115 subprocess._output = {mocker.sentinel.stdout: SpecialBytesIO(b'', fh=mocker.sentinel.stdout), 116 mocker.sentinel.stderr: SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} 117 118 cmd = mocker.MagicMock() 119 cmd.returncode = 0 120 cmd.stdin = OpenBytesIO() 121 cmd.stdout = subprocess._output[mocker.sentinel.stdout] 122 cmd.stderr = subprocess._output[mocker.sentinel.stderr] 123 subprocess.Popen.return_value = cmd 124 125 yield subprocess 126 127 128@pytest.fixture() 129def rc_am(mocker, am, mock_os, mock_subprocess): 130 am.fail_json = mocker.MagicMock(side_effect=SystemExit) 131 am._os = mock_os 132 am._subprocess = mock_subprocess 133 yield am 134 135 136class TestRunCommandArgs: 137 # Format is command as passed to run_command, command to Popen as list, command to Popen as string 138 ARGS_DATA = ( 139 (['/bin/ls', 'a', 'b', 'c'], [b'/bin/ls', b'a', b'b', b'c'], b'/bin/ls a b c'), 140 ('/bin/ls a " b" "c "', [b'/bin/ls', b'a', b' b', b'c '], b'/bin/ls a " b" "c "'), 141 ) 142 143 # pylint bug: https://github.com/PyCQA/pylint/issues/511 144 # pylint: disable=undefined-variable 145 @pytest.mark.parametrize('cmd, expected, shell, stdin', 146 ((arg, cmd_str if sh else cmd_lst, sh, {}) 147 for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))), 148 indirect=['stdin']) 149 def test_args(self, cmd, expected, shell, rc_am): 150 rc_am.run_command(cmd, use_unsafe_shell=shell) 151 assert rc_am._subprocess.Popen.called 152 args, kwargs = rc_am._subprocess.Popen.call_args 153 assert args == (expected, ) 154 assert kwargs['shell'] == shell 155 156 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 157 def test_tuple_as_args(self, rc_am): 158 with pytest.raises(SystemExit): 159 rc_am.run_command(('ls', '/')) 160 assert rc_am.fail_json.called 161 162 163class TestRunCommandCwd: 164 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 165 def test_cwd(self, mocker, rc_am): 166 rc_am._os.getcwd.return_value = '/old' 167 rc_am.run_command('/bin/ls', cwd='/new') 168 assert rc_am._os.chdir.mock_calls == [mocker.call(b'/new'), mocker.call('/old'), ] 169 170 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 171 def test_cwd_relative_path(self, mocker, rc_am): 172 rc_am._os.getcwd.return_value = '/old' 173 rc_am.run_command('/bin/ls', cwd='sub-dir') 174 assert rc_am._os.chdir.mock_calls == [mocker.call(b'/old/sub-dir'), mocker.call('/old'), ] 175 176 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 177 def test_cwd_not_a_dir(self, mocker, rc_am): 178 rc_am._os.getcwd.return_value = '/old' 179 rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir' 180 rc_am.run_command('/bin/ls', cwd='/not-a-dir') 181 assert rc_am._os.chdir.mock_calls == [mocker.call('/old'), ] 182 183 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 184 def test_cwd_not_a_dir_noignore(self, rc_am): 185 rc_am._os.getcwd.return_value = '/old' 186 rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir' 187 with pytest.raises(SystemExit): 188 rc_am.run_command('/bin/ls', cwd='/not-a-dir', ignore_invalid_cwd=False) 189 assert rc_am.fail_json.called 190 191 192class TestRunCommandPrompt: 193 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 194 def test_prompt_bad_regex(self, rc_am): 195 with pytest.raises(SystemExit): 196 rc_am.run_command('foo', prompt_regex='[pP)assword:') 197 assert rc_am.fail_json.called 198 199 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 200 def test_prompt_no_match(self, mocker, rc_am): 201 rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello') 202 (rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:') 203 assert rc == 0 204 205 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 206 def test_prompt_match_wo_data(self, mocker, rc_am): 207 rc_am._subprocess._output = {mocker.sentinel.stdout: 208 SpecialBytesIO(b'Authentication required!\nEnter password: ', 209 fh=mocker.sentinel.stdout), 210 mocker.sentinel.stderr: 211 SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} 212 (rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None) 213 assert rc == 257 214 215 216class TestRunCommandRc: 217 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 218 def test_check_rc_false(self, rc_am): 219 rc_am._subprocess.Popen.return_value.returncode = 1 220 (rc, _, _) = rc_am.run_command('/bin/false', check_rc=False) 221 assert rc == 1 222 223 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 224 def test_check_rc_true(self, rc_am): 225 rc_am._subprocess.Popen.return_value.returncode = 1 226 with pytest.raises(SystemExit): 227 rc_am.run_command('/bin/false', check_rc=True) 228 assert rc_am.fail_json.called 229 args, kwargs = rc_am.fail_json.call_args 230 assert kwargs['rc'] == 1 231 232 233class TestRunCommandOutput: 234 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 235 def test_text_stdin(self, rc_am): 236 (rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world') 237 assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n' 238 239 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 240 def test_ascii_stdout(self, mocker, rc_am): 241 rc_am._subprocess._output = {mocker.sentinel.stdout: 242 SpecialBytesIO(b'hello', fh=mocker.sentinel.stdout), 243 mocker.sentinel.stderr: 244 SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} 245 (rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt') 246 assert rc == 0 247 # module_utils function. On py3 it returns text and py2 it returns 248 # bytes because it's returning native strings 249 assert stdout == 'hello' 250 251 @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 252 def test_utf8_output(self, mocker, rc_am): 253 rc_am._subprocess._output = {mocker.sentinel.stdout: 254 SpecialBytesIO(u'Žarn§'.encode('utf-8'), 255 fh=mocker.sentinel.stdout), 256 mocker.sentinel.stderr: 257 SpecialBytesIO(u'لرئيسية'.encode('utf-8'), 258 fh=mocker.sentinel.stderr)} 259 (rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly') 260 assert rc == 0 261 # module_utils function. On py3 it returns text and py2 it returns 262 # bytes because it's returning native strings 263 assert stdout == to_native(u'Žarn§') 264 assert stderr == to_native(u'لرئيسية') 265 266 267@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) 268def test_run_command_fds(mocker, rc_am): 269 subprocess_mock = mocker.patch('ansible.module_utils.basic.subprocess') 270 subprocess_mock.Popen.side_effect = AssertionError 271 272 try: 273 rc_am.run_command('synchronize', pass_fds=(101, 42)) 274 except SystemExit: 275 pass 276 277 if PY2: 278 assert subprocess_mock.Popen.call_args[1]['close_fds'] is False 279 assert 'pass_fds' not in subprocess_mock.Popen.call_args[1] 280 281 else: 282 assert subprocess_mock.Popen.call_args[1]['pass_fds'] == (101, 42) 283 assert subprocess_mock.Popen.call_args[1]['close_fds'] is True 284