1"""
2    :codeauthor: Nicole Thomas <nicole@saltstack.com>
3
4    Unit tests for the salt.modules.cmdmod module
5"""
6
7import builtins
8import logging
9import os
10import re
11import sys
12import tempfile
13
14import pytest
15import salt.modules.cmdmod as cmdmod
16import salt.utils.files
17import salt.utils.platform
18import salt.utils.stringutils
19from salt.exceptions import CommandExecutionError
20from salt.log.setup import LOG_LEVELS
21from tests.support.mock import MagicMock, Mock, MockTimedProc, mock_open, patch
22from tests.support.runtests import RUNTIME_VARS
23
24DEFAULT_SHELL = "foo/bar"
25MOCK_SHELL_FILE = "# List of acceptable shells\n\n/bin/bash\n"
26
27
28@pytest.fixture
29def configure_loader_modules():
30    return {cmdmod: {}}
31
32
33@pytest.fixture(scope="module")
34def mock_loglevels():
35    return {
36        "info": "foo",
37        "all": "bar",
38        "critical": "bar",
39        "trace": "bar",
40        "garbage": "bar",
41        "error": "bar",
42        "debug": "bar",
43        "warning": "bar",
44        "quiet": "bar",
45    }
46
47
48def test_render_cmd_no_template():
49    """
50    Tests return when template=None
51    """
52    assert cmdmod._render_cmd("foo", "bar", None) == ("foo", "bar")
53
54
55def test_render_cmd_unavailable_engine():
56    """
57    Tests CommandExecutionError raised when template isn't in the
58    template registry
59    """
60    with pytest.raises(CommandExecutionError):
61        cmdmod._render_cmd("boo", "bar", "baz")
62
63
64def test_check_loglevel_bad_level(mock_loglevels):
65    """
66    Tests return of providing an invalid loglevel option
67    """
68    with patch.dict(LOG_LEVELS, mock_loglevels):
69        assert cmdmod._check_loglevel(level="bad_loglevel") == "foo"
70
71
72def test_check_loglevel_bad_level_not_str(mock_loglevels):
73    """
74    Tests the return of providing an invalid loglevel option that is not a string
75    """
76    with patch.dict(LOG_LEVELS, mock_loglevels):
77        assert cmdmod._check_loglevel(level=1000) == "foo"
78
79
80def test_check_loglevel_quiet(mock_loglevels):
81    """
82    Tests the return of providing a loglevel of 'quiet'
83    """
84    with patch.dict(LOG_LEVELS, mock_loglevels):
85        assert cmdmod._check_loglevel(level="quiet") is None
86
87
88def test_parse_env_not_env():
89    """
90    Tests the return of an env that is not an env
91    """
92    assert cmdmod._parse_env(None) == {}
93
94
95def test_parse_env_list():
96    """
97    Tests the return of an env that is a list
98    """
99    ret = {"foo": None, "bar": None}
100    assert ret == cmdmod._parse_env(["foo", "bar"])
101
102
103def test_parse_env_dict():
104    """
105    Test the return of an env that is not a dict
106    """
107    assert cmdmod._parse_env("test") == {}
108
109
110def test_run_shell_is_not_file():
111    """
112    Tests error raised when shell is not available after _is_valid_shell error msg
113    and os.path.isfile returns False
114    """
115    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
116        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
117            with patch("os.path.isfile", MagicMock(return_value=False)):
118                with pytest.raises(CommandExecutionError):
119                    cmdmod._run("foo", "bar")
120
121
122def test_run_shell_file_no_access():
123    """
124    Tests error raised when shell is not available after _is_valid_shell error msg,
125    os.path.isfile returns True, but os.access returns False
126    """
127    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
128        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
129            with patch("os.path.isfile", MagicMock(return_value=True)):
130                with patch("os.access", MagicMock(return_value=False)):
131                    with pytest.raises(CommandExecutionError):
132                        cmdmod._run("foo", "bar")
133
134
135def test_run_runas_with_windows():
136    """
137    Tests error raised when runas is passed on windows
138    """
139    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
140        with patch("salt.utils.platform.is_windows", MagicMock(return_value=True)):
141            with patch(
142                "salt.utils.win_chcp.get_codepage_id", MagicMock(return_value=65001)
143            ):
144                with patch.dict(cmdmod.__grains__, {"os": "fake_os"}):
145                    with pytest.raises(CommandExecutionError):
146                        cmdmod._run("foo", "bar", runas="baz")
147
148
149def test_run_with_tuple():
150    """
151    Tests return when cmd is a tuple
152    """
153    mock_true = MagicMock(return_value=True)
154    with patch("salt.modules.cmdmod._is_valid_shell", mock_true):
155        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
156            with patch("os.path.isfile", mock_true):
157                with patch("os.access", mock_true):
158                    cmdmod._run(("echo", "foo"), python_shell=True, cwd="/")
159
160
161def test_run_user_not_available():
162    """
163    Tests return when runas user is not available
164    """
165    mock_true = MagicMock(return_value=True)
166    with patch("salt.modules.cmdmod._is_valid_shell", mock_true):
167        with patch("os.path.isfile", mock_true):
168            with patch("os.access", mock_true):
169                with pytest.raises(CommandExecutionError):
170                    cmdmod._run("foo", "bar", runas="baz")
171
172
173def test_run_zero_umask():
174    """
175    Tests error raised when umask is set to zero
176    """
177    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
178        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
179            with patch("os.path.isfile", MagicMock(return_value=True)):
180                with patch("os.access", MagicMock(return_value=True)):
181                    with pytest.raises(CommandExecutionError):
182                        cmdmod._run("foo", "bar", umask=0)
183
184
185def test_run_invalid_umask():
186    """
187    Tests error raised when an invalid umask is given
188    """
189    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
190        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
191            with patch("os.path.isfile", MagicMock(return_value=True)):
192                with patch("os.access", MagicMock(return_value=True)):
193                    pytest.raises(
194                        CommandExecutionError,
195                        cmdmod._run,
196                        "foo",
197                        "bar",
198                        umask="baz",
199                    )
200
201
202def test_run_invalid_cwd_not_abs_path():
203    """
204    Tests error raised when cwd is not an absolute path
205    """
206    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
207        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
208            with patch("os.path.isfile", MagicMock(return_value=True)):
209                with patch("os.access", MagicMock(return_value=True)):
210                    with pytest.raises(CommandExecutionError):
211                        cmdmod._run("foo", "bar")
212
213
214def test_run_invalid_cwd_not_dir():
215    """
216    Tests error raised when cwd is not a dir
217    """
218    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
219        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
220            with patch("os.path.isfile", MagicMock(return_value=True)):
221                with patch("os.access", MagicMock(return_value=True)):
222                    with patch("os.path.isabs", MagicMock(return_value=True)):
223                        with pytest.raises(CommandExecutionError):
224                            cmdmod._run("foo", "bar")
225
226
227def test_run_no_vt_os_error():
228    """
229    Tests error raised when not useing vt and OSError is provided
230    """
231    expected_error = "expect error"
232    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
233        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
234            with patch("os.path.isfile", MagicMock(return_value=True)):
235                with patch("os.access", MagicMock(return_value=True)):
236                    with patch(
237                        "salt.utils.timed_subprocess.TimedProc",
238                        MagicMock(side_effect=OSError(expected_error)),
239                    ):
240                        with pytest.raises(CommandExecutionError) as error:
241                            cmdmod.run("foo", cwd="/")
242                        assert error.value.args[0].endswith(expected_error)
243
244
245def test_run_no_vt_io_error():
246    """
247    Tests error raised when not useing vt and IOError is provided
248    """
249    expected_error = "expect error"
250    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
251        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
252            with patch("os.path.isfile", MagicMock(return_value=True)):
253                with patch("os.access", MagicMock(return_value=True)):
254                    with patch(
255                        "salt.utils.timed_subprocess.TimedProc",
256                        MagicMock(side_effect=IOError(expected_error)),
257                    ):
258                        with pytest.raises(CommandExecutionError) as error:
259                            cmdmod.run("foo", cwd="/")
260                        assert error.value.args[0].endswith(expected_error)
261
262
263@pytest.mark.skip(reason="Test breaks unittests runs")
264@pytest.mark.skip_on_windows
265def test_run():
266    """
267    Tests end result when a command is not found
268    """
269    with patch("salt.modules.cmdmod._is_valid_shell", MagicMock(return_value=True)):
270        with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
271            with patch("os.path.isfile", MagicMock(return_value=True)):
272                with patch("os.access", MagicMock(return_value=True)):
273                    ret = cmdmod._run("foo", cwd=os.getcwd(), use_vt=True).get("stderr")
274                    assert "foo" in ret
275
276
277@pytest.mark.skip_unless_on_windows
278def test_powershell():
279    """
280    Tests cmd.powershell with a string value output
281    """
282    mock_run = {"pid": 1234, "retcode": 0, "stderr": "", "stdout": '"foo"'}
283    with patch("salt.modules.cmdmod._run", return_value=mock_run):
284        ret = cmdmod.powershell("Set-ExecutionPolicy RemoteSigned")
285        assert ret == "foo"
286
287
288@pytest.mark.skip_unless_on_windows
289def test_powershell_empty():
290    """
291    Tests cmd.powershell when the output is an empty string
292    """
293    mock_run = {"pid": 1234, "retcode": 0, "stderr": "", "stdout": ""}
294    with patch("salt.modules.cmdmod._run", return_value=mock_run):
295        ret = cmdmod.powershell("Set-ExecutionPolicy RemoteSigned")
296        assert ret == {}
297
298
299def test_is_valid_shell_windows():
300    """
301    Tests return if running on windows
302    """
303    with patch("salt.utils.platform.is_windows", MagicMock(return_value=True)):
304        assert cmdmod._is_valid_shell("foo")
305
306
307@pytest.mark.skip_on_windows
308def test_is_valid_shell_none():
309    """
310    Tests return of when os.path.exists(/etc/shells) isn't available
311    """
312    with patch("os.path.exists", MagicMock(return_value=False)):
313        assert cmdmod._is_valid_shell("foo") is None
314
315
316def test_is_valid_shell_available():
317    """
318    Tests return when provided shell is available
319    """
320    with patch("os.path.exists", MagicMock(return_value=True)):
321        with patch("salt.utils.files.fopen", mock_open(read_data=MOCK_SHELL_FILE)):
322            assert cmdmod._is_valid_shell("/bin/bash")
323
324
325@pytest.mark.skip_on_windows
326def test_is_valid_shell_unavailable():
327    """
328    Tests return when provided shell is not available
329    """
330    with patch("os.path.exists", MagicMock(return_value=True)):
331        with patch("salt.utils.files.fopen", mock_open(read_data=MOCK_SHELL_FILE)):
332            assert not cmdmod._is_valid_shell("foo")
333
334
335@pytest.mark.skip_on_windows
336def test_os_environment_remains_intact():
337    """
338    Make sure the OS environment is not tainted after running a command
339    that specifies runas.
340    """
341    with patch("pwd.getpwnam") as getpwnam_mock:
342        with patch("subprocess.Popen") as popen_mock:
343            environment = os.environ.copy()
344
345            popen_mock.return_value = Mock(
346                communicate=lambda *args, **kwags: [b"", None],
347                pid=lambda: 1,
348                retcode=0,
349            )
350
351            with patch.dict(
352                cmdmod.__grains__, {"os": "Darwin", "os_family": "Solaris"}
353            ):
354                if sys.platform.startswith(("freebsd", "openbsd")):
355                    shell = "/bin/sh"
356                else:
357                    shell = "/bin/bash"
358
359                cmdmod._run(
360                    "ls", cwd=tempfile.gettempdir(), runas="foobar", shell=shell
361                )
362
363                environment2 = os.environ.copy()
364
365                assert environment == environment2
366
367                if not salt.utils.platform.is_darwin():
368                    getpwnam_mock.assert_called_with("foobar")
369
370
371@pytest.mark.skip_unless_on_darwin
372def test_shell_properly_handled_on_macOS():
373    """
374    cmd.run should invoke a new bash login only
375    when bash is the default shell for the selected user
376    """
377
378    class _CommandHandler:
379        """
380        Class for capturing cmd
381        """
382
383        def __init__(self):
384            self.cmd = None
385
386        def clear(self):
387            self.cmd = None
388
389    cmd_handler = _CommandHandler()
390
391    def mock_proc(__cmd__, **kwargs):
392        cmd_handler.cmd = " ".join(__cmd__)
393        return MagicMock(return_value=MockTimedProc(stdout=None, stderr=None))
394
395    with patch("pwd.getpwnam") as getpwnam_mock:
396        with patch("salt.utils.timed_subprocess.TimedProc", mock_proc):
397
398            # User default shell is '/usr/local/bin/bash'
399            user_default_shell = "/usr/local/bin/bash"
400            with patch.dict(
401                cmdmod.__salt__,
402                {"user.info": MagicMock(return_value={"shell": user_default_shell})},
403            ):
404
405                cmd_handler.clear()
406                cmdmod._run(
407                    "ls", cwd=tempfile.gettempdir(), runas="foobar", use_vt=False
408                )
409
410                assert re.search(
411                    "{} -l -c".format(user_default_shell), cmd_handler.cmd
412                ), "cmd invokes right bash session on macOS"
413
414            # User default shell is '/bin/zsh'
415            user_default_shell = "/bin/zsh"
416            with patch.dict(
417                cmdmod.__salt__,
418                {"user.info": MagicMock(return_value={"shell": user_default_shell})},
419            ):
420
421                cmd_handler.clear()
422                cmdmod._run(
423                    "ls", cwd=tempfile.gettempdir(), runas="foobar", use_vt=False
424                )
425
426                assert not re.search(
427                    "bash -l -c", cmd_handler.cmd
428                ), "cmd does not invoke user shell on macOS"
429
430
431def test_run_cwd_doesnt_exist_issue_7154():
432    """
433    cmd.run should fail and raise
434    salt.exceptions.CommandExecutionError if the cwd dir does not
435    exist
436    """
437    cmd = "echo OHAI"
438    cwd = "/path/to/nowhere"
439    with pytest.raises(CommandExecutionError):
440        cmdmod.run_all(cmd, cwd=cwd)
441
442
443@pytest.mark.skip_on_darwin
444@pytest.mark.skip_on_windows
445def test_run_cwd_in_combination_with_runas():
446    """
447    cmd.run executes command in the cwd directory
448    when the runas parameter is specified
449    """
450    cmd = "pwd"
451    cwd = "/tmp"
452    runas = os.getlogin()
453
454    with patch.dict(cmdmod.__grains__, {"os": "Darwin", "os_family": "Solaris"}):
455        stdout = cmdmod._run(cmd, cwd=cwd, runas=runas).get("stdout")
456    assert stdout == cwd
457
458
459def test_run_all_binary_replace():
460    """
461    Test for failed decoding of binary data, for instance when doing
462    something silly like using dd to read from /dev/urandom and write to
463    /dev/stdout.
464    """
465    # Since we're using unicode_literals, read the random bytes from a file
466    rand_bytes_file = os.path.join(RUNTIME_VARS.BASE_FILES, "random_bytes")
467    with salt.utils.files.fopen(rand_bytes_file, "rb") as fp_:
468        stdout_bytes = fp_.read()
469
470    # kitchen-salt uses unix2dos on all the files before copying them over
471    # to the vm that will be running the tests. It skips binary files though
472    # The file specified in `rand_bytes_file` is detected as binary so the
473    # Unix-style line ending remains. This should account for that.
474    stdout_bytes = stdout_bytes.rstrip() + os.linesep.encode()
475
476    # stdout with the non-decodable bits replaced with the unicode
477    # replacement character U+FFFD.
478    stdout_unicode = "\ufffd\x1b\ufffd\ufffd" + os.linesep
479    stderr_bytes = (
480        os.linesep.encode().join(
481            [
482                b"1+0 records in",
483                b"1+0 records out",
484                b"4 bytes copied, 9.1522e-05 s, 43.7 kB/s",
485            ]
486        )
487        + os.linesep.encode()
488    )
489    stderr_unicode = stderr_bytes.decode()
490
491    proc = MagicMock(
492        return_value=MockTimedProc(stdout=stdout_bytes, stderr=stderr_bytes)
493    )
494    with patch("salt.utils.timed_subprocess.TimedProc", proc):
495        ret = cmdmod.run_all(
496            "dd if=/dev/urandom of=/dev/stdout bs=4 count=1", rstrip=False
497        )
498
499    assert ret["stdout"] == stdout_unicode
500    assert ret["stderr"] == stderr_unicode
501
502
503def test_run_all_none():
504    """
505    Tests cases when proc.stdout or proc.stderr are None. These should be
506    caught and replaced with empty strings.
507    """
508    proc = MagicMock(return_value=MockTimedProc(stdout=None, stderr=None))
509    with patch("salt.utils.timed_subprocess.TimedProc", proc):
510        ret = cmdmod.run_all("some command", rstrip=False)
511
512    assert ret["stdout"] == ""
513    assert ret["stderr"] == ""
514
515
516def test_run_all_unicode():
517    """
518    Ensure that unicode stdout and stderr are decoded properly
519    """
520    stdout_unicode = "Here is some unicode: спам"
521    stderr_unicode = "Here is some unicode: яйца"
522    stdout_bytes = stdout_unicode.encode("utf-8")
523    stderr_bytes = stderr_unicode.encode("utf-8")
524
525    proc = MagicMock(
526        return_value=MockTimedProc(stdout=stdout_bytes, stderr=stderr_bytes)
527    )
528
529    with patch("salt.utils.timed_subprocess.TimedProc", proc), patch.object(
530        builtins, "__salt_system_encoding__", "utf-8"
531    ):
532        ret = cmdmod.run_all("some command", rstrip=False)
533
534    assert ret["stdout"] == stdout_unicode
535    assert ret["stderr"] == stderr_unicode
536
537
538def test_run_all_output_encoding():
539    """
540    Test that specifying the output encoding works as expected
541    """
542    stdout = "Æ"
543    stdout_latin1_enc = stdout.encode("latin1")
544
545    proc = MagicMock(return_value=MockTimedProc(stdout=stdout_latin1_enc))
546
547    with patch("salt.utils.timed_subprocess.TimedProc", proc), patch.object(
548        builtins, "__salt_system_encoding__", "utf-8"
549    ):
550        ret = cmdmod.run_all("some command", output_encoding="latin1")
551
552    assert ret["stdout"] == stdout
553
554
555def test_run_all_output_loglevel_quiet(caplog):
556    """
557    Test that specifying quiet for loglevel
558    does not log the command.
559    """
560    stdout = b"test"
561    proc = MagicMock(return_value=MockTimedProc(stdout=stdout))
562
563    msg = "Executing command 'some command' in directory"
564    with patch("salt.utils.timed_subprocess.TimedProc", proc):
565        with caplog.at_level(logging.DEBUG, logger="salt.modules.cmdmod"):
566            ret = cmdmod.run_all("some command", output_loglevel="quiet")
567        assert msg not in caplog.text
568
569    assert ret["stdout"] == salt.utils.stringutils.to_unicode(stdout)
570
571
572def test_run_all_output_loglevel_debug(caplog):
573    """
574    Test that specifying debug for loglevel
575    does log the command.
576    """
577    stdout = b"test"
578    proc = MagicMock(return_value=MockTimedProc(stdout=stdout))
579
580    msg = "Executing command 'some' in directory"
581    with patch("salt.utils.timed_subprocess.TimedProc", proc):
582        with caplog.at_level(logging.DEBUG, logger="salt.modules.cmdmod"):
583            ret = cmdmod.run_all("some command", output_loglevel="debug")
584        assert msg in caplog.text
585
586    assert ret["stdout"] == salt.utils.stringutils.to_unicode(stdout)
587
588
589def test_run_chroot_mount():
590    """
591    Test cmdmod.run_chroot mount / umount balance
592    """
593    mock_mount = MagicMock()
594    mock_umount = MagicMock()
595    mock_run_all = MagicMock()
596    with patch.dict(
597        cmdmod.__salt__, {"mount.mount": mock_mount, "mount.umount": mock_umount}
598    ):
599        with patch("salt.modules.cmdmod.run_all", mock_run_all):
600            cmdmod.run_chroot("/mnt", "cmd")
601            assert mock_mount.call_count == 3
602            assert mock_umount.call_count == 3
603
604
605def test_run_chroot_mount_bind():
606    """
607    Test cmdmod.run_chroot mount / umount balance with bind mount
608    """
609    mock_mount = MagicMock()
610    mock_umount = MagicMock()
611    mock_run_all = MagicMock()
612    with patch.dict(
613        cmdmod.__salt__, {"mount.mount": mock_mount, "mount.umount": mock_umount}
614    ):
615        with patch("salt.modules.cmdmod.run_all", mock_run_all):
616            cmdmod.run_chroot("/mnt", "cmd", binds=["/var"])
617            assert mock_mount.call_count == 4
618            assert mock_umount.call_count == 4
619
620
621@pytest.mark.skip_on_windows
622def test_run_chroot_runas():
623    """
624    Test run_chroot when a runas parameter is provided
625    """
626    with patch.dict(
627        cmdmod.__salt__, {"mount.mount": MagicMock(), "mount.umount": MagicMock()}
628    ):
629        with patch("salt.modules.cmdmod.run_all") as run_all_mock:
630            cmdmod.run_chroot("/mnt", "ls", runas="foobar", shell="/bin/sh")
631    run_all_mock.assert_called_with(
632        "chroot --userspec foobar: /mnt /bin/sh -c ls",
633        bg=False,
634        clean_env=False,
635        cwd=None,
636        env=None,
637        ignore_retcode=False,
638        log_callback=None,
639        output_encoding=None,
640        output_loglevel="quiet",
641        pillar=None,
642        pillarenv=None,
643        python_shell=True,
644        reset_system_locale=True,
645        rstrip=True,
646        saltenv="base",
647        shell="/bin/sh",
648        stdin=None,
649        success_retcodes=None,
650        success_stdout=None,
651        success_stderr=None,
652        template=None,
653        timeout=None,
654        umask=None,
655        use_vt=False,
656    )
657
658
659def test_cve_2021_25284(caplog):
660    proc = MagicMock(
661        return_value=MockTimedProc(stdout=b"foo", stderr=b"wtf", returncode=2)
662    )
663    with patch("salt.utils.timed_subprocess.TimedProc", proc):
664        with caplog.at_level(logging.DEBUG, logger="salt.modules.cmdmod"):
665            cmdmod.run("testcmd -p ImAPassword", output_loglevel="error")
666        assert "ImAPassword" not in caplog.text
667
668
669def test__log_cmd_str():
670    "_log_cmd function handles strings"
671    assert cmdmod._log_cmd("foo bar") == "foo"
672
673
674def test__log_cmd_list():
675    "_log_cmd function handles lists"
676    assert cmdmod._log_cmd(["foo", "bar"]) == "foo"
677
678
679def test_log_cmd_tuple():
680    "_log_cmd function handles tuples"
681    assert cmdmod._log_cmd(("foo", "bar")) == "foo"
682
683
684def test_log_cmd_non_str_tuple_list():
685    "_log_cmd function casts objects to strings"
686
687    class cmd:
688        def __init__(self, cmd):
689            self.cmd = cmd
690
691        def __str__(self):
692            return self.cmd
693
694    assert cmdmod._log_cmd(cmd("foo bar")) == "foo"
695