1import logging
2
3import pytest
4
5from pip import __version__
6from pip._internal.network.session import CI_ENVIRONMENT_VARIABLES, PipSession
7
8
def get_user_agent():
    """Return the "User-Agent" header of a newly constructed PipSession."""
    session = PipSession()
    return session.headers["User-Agent"]
11
12
def test_user_agent():
    """Check that the User-Agent header starts with "pip/<version>"."""
    expected_prefix = "pip/{}".format(__version__)

    assert get_user_agent().startswith(expected_prefix)
17
18
@pytest.mark.parametrize('name, expected_like_ci', [
    ('BUILD_BUILDID', True),
    ('BUILD_ID', True),
    ('CI', True),
    ('PIP_IS_CI', True),
    # A name that is only a prefix of one of the variable names we use.
    ('BUILD', False),
])
def test_user_agent__ci(monkeypatch, name, expected_like_ci):
    """
    Test how setting the environment variable `name` affects the "ci"
    entry of the User-Agent header.
    """
    # Unset every variable used for CI detection so that running the
    # tests under real CI doesn't make the detection always return True.
    # Relying on CI_ENVIRONMENT_VARIABLES (part of the code under test)
    # is acceptable here: this setup can only prevent false failures,
    # never cause a false pass.
    for ci_name in CI_ENVIRONMENT_VARIABLES:
        monkeypatch.delenv(ci_name, raising=False)

    # Establish the baseline: with no variable set, "ci" is null.
    baseline_agent = get_user_agent()
    assert '"ci":null' in baseline_agent
    assert '"ci":true' not in baseline_agent

    # Now set the variable under test and look at the header again.
    monkeypatch.setenv(name, 'true')
    agent_with_var = get_user_agent()
    reports_true = '"ci":true' in agent_with_var
    reports_null = '"ci":null' in agent_with_var
    assert reports_true == expected_like_ci
    assert reports_null == (not expected_like_ci)
45
46
def test_user_agent_user_data(monkeypatch):
    """
    Test that the value of PIP_USER_AGENT_USER_DATA shows up in the
    User-Agent header.
    """
    monkeypatch.setenv("PIP_USER_AGENT_USER_DATA", "some_string")
    # Use the shared helper, like the other User-Agent tests in this file.
    assert "some_string" in get_user_agent()
50
51
class TestPipSession:
    """
    Tests for PipSession: cache adapters, trusted hosts and secure origins.
    """

    def test_cache_defaults_off(self):
        """
        Test that no adapter has a cache when no cache argument is passed.
        """
        session = PipSession()

        assert not hasattr(session.adapters["http://"], "cache")
        assert not hasattr(session.adapters["https://"], "cache")

    def test_cache_is_enabled(self, tmpdir):
        """
        Test that passing cache= gives the https adapter a cache that
        points at the given directory.
        """
        cache_directory = tmpdir.joinpath("test-cache")
        session = PipSession(cache=cache_directory)

        assert hasattr(session.adapters["https://"], "cache")

        assert (
            session.adapters["https://"].cache.directory == cache_directory
        )

    def test_http_cache_is_not_enabled(self, tmpdir):
        """
        Test that the plain http adapter has no cache even with cache= set.
        """
        session = PipSession(cache=tmpdir.joinpath("test-cache"))

        assert not hasattr(session.adapters["http://"], "cache")

    def test_trusted_hosts_adapter(self, tmpdir):
        """
        Test the adapters mounted for a trusted host when a cache is given.
        """
        session = PipSession(
            cache=tmpdir.joinpath("test-cache"),
            trusted_hosts=["example.com"],
        )

        assert "https://example.com/" in session.adapters
        # Check that the "port wildcard" is present.
        assert "https://example.com:" in session.adapters
        # Check that the cache is enabled.
        assert hasattr(session.adapters["https://example.com/"], "cache")

    def test_add_trusted_host(self):
        """
        Test how add_trusted_host() updates pip_trusted_origins and the
        mounted adapters.
        """
        # Leave a gap to test how the ordering is affected.
        trusted_hosts = ['host1', 'host3']
        session = PipSession(trusted_hosts=trusted_hosts)
        trusted_host_adapter = session._trusted_host_adapter
        prefix2 = 'https://host2/'
        prefix3 = 'https://host3/'
        prefix3_wildcard = 'https://host3:'

        # Confirm some initial conditions as a baseline.
        assert session.pip_trusted_origins == [
            ('host1', None), ('host3', None)
        ]
        assert session.adapters[prefix3] is trusted_host_adapter
        assert session.adapters[prefix3_wildcard] is trusted_host_adapter

        assert prefix2 not in session.adapters

        # Test adding a new host.
        session.add_trusted_host('host2')
        assert session.pip_trusted_origins == [
            ('host1', None), ('host3', None), ('host2', None)
        ]
        # Check that prefix3 is still present.
        assert session.adapters[prefix3] is trusted_host_adapter
        assert session.adapters[prefix2] is trusted_host_adapter

        # Test that adding the same host doesn't create a duplicate.
        session.add_trusted_host('host3')
        assert session.pip_trusted_origins == [
            ('host1', None), ('host3', None), ('host2', None)
        ], 'actual: {}'.format(session.pip_trusted_origins)

        # Test adding a host that carries an explicit port.
        session.add_trusted_host('host4:8080')
        prefix4 = 'https://host4:8080/'
        assert session.pip_trusted_origins == [
            ('host1', None), ('host3', None),
            ('host2', None), ('host4', 8080)
        ]
        assert session.adapters[prefix4] is trusted_host_adapter

    def test_add_trusted_host__logging(self, caplog):
        """
        Test logging when add_trusted_host() is called.
        """
        trusted_hosts = ['host0', 'host1']
        session = PipSession(trusted_hosts=trusted_hosts)
        with caplog.at_level(logging.INFO):
            # Test adding an existing host.
            session.add_trusted_host('host1', source='somewhere')
            session.add_trusted_host('host2')
            # Test calling add_trusted_host() on the same host twice.
            session.add_trusted_host('host2')

        actual = [(r.levelname, r.message) for r in caplog.records]
        # Observe that "host0" isn't included in the logs.
        expected = [
            ('INFO', "adding trusted host: 'host1' (from somewhere)"),
            ('INFO', "adding trusted host: 'host2'"),
            ('INFO', "adding trusted host: 'host2'"),
        ]
        assert actual == expected

    def test_iter_secure_origins(self):
        """
        Test that iter_secure_origins() yields the trusted hosts last.
        """
        trusted_hosts = ['host1', 'host2', 'host3:8080']
        session = PipSession(trusted_hosts=trusted_hosts)

        actual = list(session.iter_secure_origins())
        assert len(actual) == 9
        # Spot-check that SECURE_ORIGINS is included.
        assert actual[0] == ('https', '*', '*')
        assert actual[-3:] == [
            ('*', 'host1', '*'),
            ('*', 'host2', '*'),
            ('*', 'host3', 8080)
        ]

    def test_iter_secure_origins__trusted_hosts_empty(self):
        """
        Test iter_secure_origins() after passing trusted_hosts=[].
        """
        session = PipSession(trusted_hosts=[])

        actual = list(session.iter_secure_origins())
        assert len(actual) == 6
        # Spot-check that SECURE_ORIGINS is included.
        assert actual[0] == ('https', '*', '*')

    @pytest.mark.parametrize(
        'location, trusted, expected',
        [
            ("http://pypi.org/something", [], False),
            ("https://pypi.org/something", [], True),
            ("git+http://pypi.org/something", [], False),
            ("git+https://pypi.org/something", [], True),
            ("git+ssh://git@pypi.org/something", [], True),
            ("http://localhost", [], True),
            ("http://127.0.0.1", [], True),
            ("http://example.com/something/", [], False),
            ("http://example.com/something/", ["example.com"], True),
            # Try changing the case.
            ("http://eXample.com/something/", ["example.cOm"], True),
            # Test hosts with port.
            ("http://example.com:8080/something/", ["example.com"], True),
            # Test a trusted_host with a port.
            ("http://example.com:8080/something/", ["example.com:8080"], True),
            ("http://example.com/something/", ["example.com:8080"], False),
            (
                "http://example.com:8888/something/",
                ["example.com:8080"],
                False
            ),
        ],
    )
    def test_is_secure_origin(self, caplog, location, trusted, expected):
        """
        Test the return value of is_secure_origin() and what it logs.
        """
        session = PipSession(trusted_hosts=trusted)
        actual = session.is_secure_origin(location)
        assert actual == expected

        # Logging is checked through caplog, so no mock logger is needed.
        log_records = [(r.levelname, r.message) for r in caplog.records]
        if expected:
            # Nothing should be logged for a secure origin.
            assert not log_records
            return

        # A rejected origin should log exactly one warning.
        assert len(log_records) == 1
        actual_level, actual_message = log_records[0]
        assert actual_level == 'WARNING'
        assert 'is not a trusted or secure host' in actual_message
222