1import logging
2import os
3
4import pytest
5from mock import Mock, patch
6
7from pip._internal.cli.base_command import Command
8from pip._internal.cli.status_codes import SUCCESS
9from pip._internal.utils import temp_dir
10from pip._internal.utils.logging import BrokenStdoutLoggingError
11from pip._internal.utils.temp_dir import TempDirectory
12
13
14@pytest.fixture
15def fixed_time(utc):
16    with patch('time.time', lambda: 1547704837.040001):
17        yield
18
19
20class FakeCommand(Command):
21
22    _name = 'fake'
23
24    def __init__(self, run_func=None, error=False):
25        if error:
26            def run_func():
27                raise SystemExit(1)
28
29        self.run_func = run_func
30        super(FakeCommand, self).__init__(self._name, self._name)
31
32    def main(self, args):
33        args.append("--disable-pip-version-check")
34        return super(FakeCommand, self).main(args)
35
36    def run(self, options, args):
37        logging.getLogger("pip.tests").info("fake")
38        # Return SUCCESS from run if run_func is not provided
39        if self.run_func:
40            return self.run_func()
41        else:
42            return SUCCESS
43
44
45class FakeCommandWithUnicode(FakeCommand):
46    _name = 'fake_unicode'
47
48    def run(self, options, args):
49        logging.getLogger("pip.tests").info(b"bytes here \xE9")
50        logging.getLogger("pip.tests").info(
51            b"unicode here \xC3\xA9".decode("utf-8")
52        )
53
54
55class TestCommand(object):
56
57    def call_main(self, capsys, args):
58        """
59        Call command.main(), and return the command's stderr.
60        """
61        def raise_broken_stdout():
62            raise BrokenStdoutLoggingError()
63
64        cmd = FakeCommand(run_func=raise_broken_stdout)
65        status = cmd.main(args)
66        assert status == 1
67        stderr = capsys.readouterr().err
68
69        return stderr
70
71    def test_raise_broken_stdout(self, capsys):
72        """
73        Test raising BrokenStdoutLoggingError.
74        """
75        stderr = self.call_main(capsys, [])
76
77        assert stderr.rstrip() == 'ERROR: Pipe to stdout was broken'
78
79    def test_raise_broken_stdout__debug_logging(self, capsys):
80        """
81        Test raising BrokenStdoutLoggingError with debug logging enabled.
82        """
83        stderr = self.call_main(capsys, ['-v'])
84
85        assert 'ERROR: Pipe to stdout was broken' in stderr
86        assert 'Traceback (most recent call last):' in stderr
87
88
89@patch('pip._internal.cli.req_command.Command.handle_pip_version_check')
90def test_handle_pip_version_check_called(mock_handle_version_check):
91    """
92    Check that Command.handle_pip_version_check() is called.
93    """
94    cmd = FakeCommand()
95    cmd.main([])
96    mock_handle_version_check.assert_called_once()
97
98
99def test_log_command_success(fixed_time, tmpdir):
100    """Test the --log option logs when command succeeds."""
101    cmd = FakeCommand()
102    log_path = tmpdir.joinpath('log')
103    cmd.main(['fake', '--log', log_path])
104    with open(log_path) as f:
105        assert f.read().rstrip() == '2019-01-17T06:00:37,040 fake'
106
107
108def test_log_command_error(fixed_time, tmpdir):
109    """Test the --log option logs when command fails."""
110    cmd = FakeCommand(error=True)
111    log_path = tmpdir.joinpath('log')
112    cmd.main(['fake', '--log', log_path])
113    with open(log_path) as f:
114        assert f.read().startswith('2019-01-17T06:00:37,040 fake')
115
116
117def test_log_file_command_error(fixed_time, tmpdir):
118    """Test the --log-file option logs (when there's an error)."""
119    cmd = FakeCommand(error=True)
120    log_file_path = tmpdir.joinpath('log_file')
121    cmd.main(['fake', '--log-file', log_file_path])
122    with open(log_file_path) as f:
123        assert f.read().startswith('2019-01-17T06:00:37,040 fake')
124
125
126def test_log_unicode_messages(fixed_time, tmpdir):
127    """Tests that logging bytestrings and unicode objects
128    don't break logging.
129    """
130    cmd = FakeCommandWithUnicode()
131    log_path = tmpdir.joinpath('log')
132    cmd.main(['fake_unicode', '--log', log_path])
133
134
135@pytest.mark.no_auto_tempdir_manager
136def test_base_command_provides_tempdir_helpers():
137    assert temp_dir._tempdir_manager is None
138    assert temp_dir._tempdir_registry is None
139
140    def assert_helpers_set(options, args):
141        assert temp_dir._tempdir_manager is not None
142        assert temp_dir._tempdir_registry is not None
143        return SUCCESS
144
145    c = Command("fake", "fake")
146    c.run = Mock(side_effect=assert_helpers_set)
147    assert c.main(["fake"]) == SUCCESS
148    c.run.assert_called_once()
149
150
151not_deleted = "not_deleted"
152
153
154@pytest.mark.parametrize("kind,exists", [
155    (not_deleted, True), ("deleted", False)
156])
157@pytest.mark.no_auto_tempdir_manager
158def test_base_command_global_tempdir_cleanup(kind, exists):
159    assert temp_dir._tempdir_manager is None
160    assert temp_dir._tempdir_registry is None
161
162    class Holder(object):
163        value = None
164
165    def create_temp_dirs(options, args):
166        c.tempdir_registry.set_delete(not_deleted, False)
167        Holder.value = TempDirectory(kind=kind, globally_managed=True).path
168        return SUCCESS
169
170    c = Command("fake", "fake")
171    c.run = Mock(side_effect=create_temp_dirs)
172    assert c.main(["fake"]) == SUCCESS
173    c.run.assert_called_once()
174    assert os.path.exists(Holder.value) == exists
175
176
177@pytest.mark.parametrize("kind,exists", [
178    (not_deleted, True), ("deleted", False)
179])
180@pytest.mark.no_auto_tempdir_manager
181def test_base_command_local_tempdir_cleanup(kind, exists):
182    assert temp_dir._tempdir_manager is None
183    assert temp_dir._tempdir_registry is None
184
185    def create_temp_dirs(options, args):
186        c.tempdir_registry.set_delete(not_deleted, False)
187
188        with TempDirectory(kind=kind) as d:
189            path = d.path
190            assert os.path.exists(path)
191        assert os.path.exists(path) == exists
192        return SUCCESS
193
194    c = Command("fake", "fake")
195    c.run = Mock(side_effect=create_temp_dirs)
196    assert c.main(["fake"]) == SUCCESS
197    c.run.assert_called_once()
198