1# -*- coding: utf-8 -*-
2
3"""Tests for Requests."""
4
5from __future__ import division
6import json
7import os
8import pickle
9import collections
10import contextlib
11import warnings
12import re
13
14import io
15import requests
16import pytest
17from requests.adapters import HTTPAdapter
18from requests.auth import HTTPDigestAuth, _basic_auth_str
19from requests.compat import (
20    Morsel, cookielib, getproxies, str, urlparse,
21    builtin_str)
22from requests.cookies import (
23    cookiejar_from_dict, morsel_to_cookie)
24from requests.exceptions import (
25    ConnectionError, ConnectTimeout, InvalidSchema, InvalidURL,
26    MissingSchema, ReadTimeout, Timeout, RetryError, TooManyRedirects,
27    ProxyError, InvalidHeader, UnrewindableBodyError, SSLError, InvalidProxyURL)
28from requests.models import PreparedRequest
29from requests.structures import CaseInsensitiveDict
30from requests.sessions import SessionRedirectMixin
31from requests.models import urlencode
32from requests.hooks import default_hooks
33from requests.compat import MutableMapping
34
35from .compat import StringIO, u
36from .utils import override_environ
37from urllib3.util import Timeout as Urllib3Timeout
38
# Target used by timeout tests: per the original note, requests sent here are
# expected to fail with a connection timeout because nothing is listening.
TARPIT = 'http://10.255.255.1'

# Record whether ``ssl.SSLContext`` can be imported; the imported name itself
# is discarded right away.
try:
    from ssl import SSLContext
except ImportError:
    HAS_MODERN_SSL = False
else:
    del SSLContext
    HAS_MODERN_SSL = True

# Record whether the ``requests`` package exposes a ``pyopenssl`` attribute.
try:
    requests.pyopenssl
except AttributeError:
    HAS_PYOPENSSL = False
else:
    HAS_PYOPENSSL = True
55
56
57class TestRequests:
58
59    digest_auth_algo = ('MD5', 'SHA-256', 'SHA-512')
60
61    def test_entry_points(self):
62
63        requests.session
64        requests.session().get
65        requests.session().head
66        requests.get
67        requests.head
68        requests.put
69        requests.patch
70        requests.post
71        # Not really an entry point, but people rely on it.
72        from requests.packages.urllib3.poolmanager import PoolManager
73
74    @pytest.mark.parametrize(
75        'exception, url', (
76            (MissingSchema, 'hiwpefhipowhefopw'),
77            (InvalidSchema, 'localhost:3128'),
78            (InvalidSchema, 'localhost.localdomain:3128/'),
79            (InvalidSchema, '10.122.1.1:3128/'),
80            (InvalidURL, 'http://'),
81        ))
82    def test_invalid_url(self, exception, url):
83        with pytest.raises(exception):
84            requests.get(url)
85
86    def test_basic_building(self):
87        req = requests.Request()
88        req.url = 'http://kennethreitz.org/'
89        req.data = {'life': '42'}
90
91        pr = req.prepare()
92        assert pr.url == req.url
93        assert pr.body == 'life=42'
94
95    @pytest.mark.parametrize('method', ('GET', 'HEAD'))
96    def test_no_content_length(self, httpbin, method):
97        req = requests.Request(method, httpbin(method.lower())).prepare()
98        assert 'Content-Length' not in req.headers
99
100    @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS'))
101    def test_no_body_content_length(self, httpbin, method):
102        req = requests.Request(method, httpbin(method.lower())).prepare()
103        assert req.headers['Content-Length'] == '0'
104
105    @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS'))
106    def test_empty_content_length(self, httpbin, method):
107        req = requests.Request(method, httpbin(method.lower()), data='').prepare()
108        assert req.headers['Content-Length'] == '0'
109
110    def test_override_content_length(self, httpbin):
111        headers = {
112            'Content-Length': 'not zero'
113        }
114        r = requests.Request('POST', httpbin('post'), headers=headers).prepare()
115        assert 'Content-Length' in r.headers
116        assert r.headers['Content-Length'] == 'not zero'
117
118    def test_path_is_not_double_encoded(self):
119        request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
120
121        assert request.path_url == '/get/test%20case'
122
123    @pytest.mark.parametrize(
124        'url, expected', (
125            ('http://example.com/path#fragment', 'http://example.com/path?a=b#fragment'),
126            ('http://example.com/path?key=value#fragment', 'http://example.com/path?key=value&a=b#fragment')
127        ))
128    def test_params_are_added_before_fragment(self, url, expected):
129        request = requests.Request('GET', url, params={"a": "b"}).prepare()
130        assert request.url == expected
131
132    def test_params_original_order_is_preserved_by_default(self):
133        param_ordered_dict = collections.OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1)))
134        session = requests.Session()
135        request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict)
136        prep = session.prepare_request(request)
137        assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
138
139    def test_params_bytes_are_encoded(self):
140        request = requests.Request('GET', 'http://example.com',
141                                   params=b'test=foo').prepare()
142        assert request.url == 'http://example.com/?test=foo'
143
144    def test_binary_put(self):
145        request = requests.Request('PUT', 'http://example.com',
146                                   data=u"ööö".encode("utf-8")).prepare()
147        assert isinstance(request.body, bytes)
148
149    def test_whitespaces_are_removed_from_url(self):
150        # Test for issue #3696
151        request = requests.Request('GET', ' http://example.com').prepare()
152        assert request.url == 'http://example.com/'
153
154    @pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://'))
155    def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
156        s = requests.Session()
157        s.proxies = getproxies()
158        parts = urlparse(httpbin('get'))
159        url = scheme + parts.netloc + parts.path
160        r = requests.Request('GET', url)
161        r = s.send(r.prepare())
162        assert r.status_code == 200, 'failed for scheme {}'.format(scheme)
163
164    def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
165        r = requests.Request('GET', httpbin('get'))
166        s = requests.Session()
167        s.proxies = getproxies()
168
169        r = s.send(r.prepare())
170
171        assert r.status_code == 200
172
173    def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
174        r = requests.get(httpbin('redirect', '1'))
175        assert r.status_code == 200
176        assert r.history[0].status_code == 302
177        assert r.history[0].is_redirect
178
179    def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin):
180        r = requests.post(httpbin('redirect-to'), data='test', params={'url': 'post', 'status_code': 307})
181        assert r.status_code == 200
182        assert r.history[0].status_code == 307
183        assert r.history[0].is_redirect
184        assert r.json()['data'] == 'test'
185
186    def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin):
187        byte_str = b'test'
188        r = requests.post(httpbin('redirect-to'), data=io.BytesIO(byte_str), params={'url': 'post', 'status_code': 307})
189        assert r.status_code == 200
190        assert r.history[0].status_code == 307
191        assert r.history[0].is_redirect
192        assert r.json()['data'] == byte_str.decode('utf-8')
193
194    def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
195        try:
196            requests.get(httpbin('relative-redirect', '50'))
197        except TooManyRedirects as e:
198            url = httpbin('relative-redirect', '20')
199            assert e.request.url == url
200            assert e.response.url == url
201            assert len(e.response.history) == 30
202        else:
203            pytest.fail('Expected redirect to raise TooManyRedirects but it did not')
204
205    def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
206        s = requests.session()
207        s.max_redirects = 5
208        try:
209            s.get(httpbin('relative-redirect', '50'))
210        except TooManyRedirects as e:
211            url = httpbin('relative-redirect', '45')
212            assert e.request.url == url
213            assert e.response.url == url
214            assert len(e.response.history) == 5
215        else:
216            pytest.fail('Expected custom max number of redirects to be respected but was not')
217
218    def test_http_301_changes_post_to_get(self, httpbin):
219        r = requests.post(httpbin('status', '301'))
220        assert r.status_code == 200
221        assert r.request.method == 'GET'
222        assert r.history[0].status_code == 301
223        assert r.history[0].is_redirect
224
225    def test_http_301_doesnt_change_head_to_get(self, httpbin):
226        r = requests.head(httpbin('status', '301'), allow_redirects=True)
227        print(r.content)
228        assert r.status_code == 200
229        assert r.request.method == 'HEAD'
230        assert r.history[0].status_code == 301
231        assert r.history[0].is_redirect
232
233    def test_http_302_changes_post_to_get(self, httpbin):
234        r = requests.post(httpbin('status', '302'))
235        assert r.status_code == 200
236        assert r.request.method == 'GET'
237        assert r.history[0].status_code == 302
238        assert r.history[0].is_redirect
239
240    def test_http_302_doesnt_change_head_to_get(self, httpbin):
241        r = requests.head(httpbin('status', '302'), allow_redirects=True)
242        assert r.status_code == 200
243        assert r.request.method == 'HEAD'
244        assert r.history[0].status_code == 302
245        assert r.history[0].is_redirect
246
247    def test_http_303_changes_post_to_get(self, httpbin):
248        r = requests.post(httpbin('status', '303'))
249        assert r.status_code == 200
250        assert r.request.method == 'GET'
251        assert r.history[0].status_code == 303
252        assert r.history[0].is_redirect
253
254    def test_http_303_doesnt_change_head_to_get(self, httpbin):
255        r = requests.head(httpbin('status', '303'), allow_redirects=True)
256        assert r.status_code == 200
257        assert r.request.method == 'HEAD'
258        assert r.history[0].status_code == 303
259        assert r.history[0].is_redirect
260
261    def test_header_and_body_removal_on_redirect(self, httpbin):
262        purged_headers = ('Content-Length', 'Content-Type')
263        ses = requests.Session()
264        req = requests.Request('POST', httpbin('post'), data={'test': 'data'})
265        prep = ses.prepare_request(req)
266        resp = ses.send(prep)
267
268        # Mimic a redirect response
269        resp.status_code = 302
270        resp.headers['location'] = 'get'
271
272        # Run request through resolve_redirects
273        next_resp = next(ses.resolve_redirects(resp, prep))
274        assert next_resp.request.body is None
275        for header in purged_headers:
276            assert header not in next_resp.request.headers
277
278    def test_transfer_enc_removal_on_redirect(self, httpbin):
279        purged_headers = ('Transfer-Encoding', 'Content-Type')
280        ses = requests.Session()
281        req = requests.Request('POST', httpbin('post'), data=(b'x' for x in range(1)))
282        prep = ses.prepare_request(req)
283        assert 'Transfer-Encoding' in prep.headers
284
285        # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
286        resp = requests.Response()
287        resp.raw = io.BytesIO(b'the content')
288        resp.request = prep
289        setattr(resp.raw, 'release_conn', lambda *args: args)
290
291        # Mimic a redirect response
292        resp.status_code = 302
293        resp.headers['location'] = httpbin('get')
294
295        # Run request through resolve_redirect
296        next_resp = next(ses.resolve_redirects(resp, prep))
297        assert next_resp.request.body is None
298        for header in purged_headers:
299            assert header not in next_resp.request.headers
300
301    def test_fragment_maintained_on_redirect(self, httpbin):
302        fragment = "#view=edit&token=hunter2"
303        r = requests.get(httpbin('redirect-to?url=get')+fragment)
304
305        assert len(r.history) > 0
306        assert r.history[0].request.url == httpbin('redirect-to?url=get')+fragment
307        assert r.url == httpbin('get')+fragment
308
309    def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
310        heads = {'User-agent': 'Mozilla/5.0'}
311
312        r = requests.get(httpbin('user-agent'), headers=heads)
313
314        assert heads['User-agent'] in r.text
315        assert r.status_code == 200
316
317    def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
318        heads = {'User-agent': 'Mozilla/5.0'}
319
320        r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
321        assert r.status_code == 200
322
323    def test_set_cookie_on_301(self, httpbin):
324        s = requests.session()
325        url = httpbin('cookies/set?foo=bar')
326        s.get(url)
327        assert s.cookies['foo'] == 'bar'
328
329    def test_cookie_sent_on_redirect(self, httpbin):
330        s = requests.session()
331        s.get(httpbin('cookies/set?foo=bar'))
332        r = s.get(httpbin('redirect/1'))  # redirects to httpbin('get')
333        assert 'Cookie' in r.json()['headers']
334
335    def test_cookie_removed_on_expire(self, httpbin):
336        s = requests.session()
337        s.get(httpbin('cookies/set?foo=bar'))
338        assert s.cookies['foo'] == 'bar'
339        s.get(
340            httpbin('response-headers'),
341            params={
342                'Set-Cookie':
343                    'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
344            }
345        )
346        assert 'foo' not in s.cookies
347
348    def test_cookie_quote_wrapped(self, httpbin):
349        s = requests.session()
350        s.get(httpbin('cookies/set?foo="bar:baz"'))
351        assert s.cookies['foo'] == '"bar:baz"'
352
353    def test_cookie_persists_via_api(self, httpbin):
354        s = requests.session()
355        r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
356        assert 'foo' in r.request.headers['Cookie']
357        assert 'foo' in r.history[0].request.headers['Cookie']
358
359    def test_request_cookie_overrides_session_cookie(self, httpbin):
360        s = requests.session()
361        s.cookies['foo'] = 'bar'
362        r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
363        assert r.json()['cookies']['foo'] == 'baz'
364        # Session cookie should not be modified
365        assert s.cookies['foo'] == 'bar'
366
367    def test_request_cookies_not_persisted(self, httpbin):
368        s = requests.session()
369        s.get(httpbin('cookies'), cookies={'foo': 'baz'})
370        # Sending a request with cookies should not add cookies to the session
371        assert not s.cookies
372
373    def test_generic_cookiejar_works(self, httpbin):
374        cj = cookielib.CookieJar()
375        cookiejar_from_dict({'foo': 'bar'}, cj)
376        s = requests.session()
377        s.cookies = cj
378        r = s.get(httpbin('cookies'))
379        # Make sure the cookie was sent
380        assert r.json()['cookies']['foo'] == 'bar'
381        # Make sure the session cj is still the custom one
382        assert s.cookies is cj
383
384    def test_param_cookiejar_works(self, httpbin):
385        cj = cookielib.CookieJar()
386        cookiejar_from_dict({'foo': 'bar'}, cj)
387        s = requests.session()
388        r = s.get(httpbin('cookies'), cookies=cj)
389        # Make sure the cookie was sent
390        assert r.json()['cookies']['foo'] == 'bar'
391
392    def test_cookielib_cookiejar_on_redirect(self, httpbin):
393        """Tests resolve_redirect doesn't fail when merging cookies
394        with non-RequestsCookieJar cookiejar.
395
396        See GH #3579
397        """
398        cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar())
399        s = requests.Session()
400        s.cookies = cookiejar_from_dict({'cookie': 'tasty'})
401
402        # Prepare request without using Session
403        req = requests.Request('GET', httpbin('headers'), cookies=cj)
404        prep_req = req.prepare()
405
406        # Send request and simulate redirect
407        resp = s.send(prep_req)
408        resp.status_code = 302
409        resp.headers['location'] = httpbin('get')
410        redirects = s.resolve_redirects(resp, prep_req)
411        resp = next(redirects)
412
413        # Verify CookieJar isn't being converted to RequestsCookieJar
414        assert isinstance(prep_req._cookies, cookielib.CookieJar)
415        assert isinstance(resp.request._cookies, cookielib.CookieJar)
416        assert not isinstance(resp.request._cookies, requests.cookies.RequestsCookieJar)
417
418        cookies = {}
419        for c in resp.request._cookies:
420            cookies[c.name] = c.value
421        assert cookies['foo'] == 'bar'
422        assert cookies['cookie'] == 'tasty'
423
424    def test_requests_in_history_are_not_overridden(self, httpbin):
425        resp = requests.get(httpbin('redirect/3'))
426        urls = [r.url for r in resp.history]
427        req_urls = [r.request.url for r in resp.history]
428        assert urls == req_urls
429
430    def test_history_is_always_a_list(self, httpbin):
431        """Show that even with redirects, Response.history is always a list."""
432        resp = requests.get(httpbin('get'))
433        assert isinstance(resp.history, list)
434        resp = requests.get(httpbin('redirect/1'))
435        assert isinstance(resp.history, list)
436        assert not isinstance(resp.history, tuple)
437
438    def test_headers_on_session_with_None_are_not_sent(self, httpbin):
439        """Do not send headers in Session.headers with None values."""
440        ses = requests.Session()
441        ses.headers['Accept-Encoding'] = None
442        req = requests.Request('GET', httpbin('get'))
443        prep = ses.prepare_request(req)
444        assert 'Accept-Encoding' not in prep.headers
445
446    def test_headers_preserve_order(self, httpbin):
447        """Preserve order when headers provided as OrderedDict."""
448        ses = requests.Session()
449        ses.headers = collections.OrderedDict()
450        ses.headers['Accept-Encoding'] = 'identity'
451        ses.headers['First'] = '1'
452        ses.headers['Second'] = '2'
453        headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')])
454        headers['Fifth'] = '5'
455        headers['Second'] = '222'
456        req = requests.Request('GET', httpbin('get'), headers=headers)
457        prep = ses.prepare_request(req)
458        items = list(prep.headers.items())
459        assert items[0] == ('Accept-Encoding', 'identity')
460        assert items[1] == ('First', '1')
461        assert items[2] == ('Second', '222')
462        assert items[3] == ('Third', '3')
463        assert items[4] == ('Fourth', '4')
464        assert items[5] == ('Fifth', '5')
465
466    @pytest.mark.parametrize('key', ('User-agent', 'user-agent'))
467    def test_user_agent_transfers(self, httpbin, key):
468
469        heads = {key: 'Mozilla/5.0 (github.com/psf/requests)'}
470
471        r = requests.get(httpbin('user-agent'), headers=heads)
472        assert heads[key] in r.text
473
474    def test_HTTP_200_OK_HEAD(self, httpbin):
475        r = requests.head(httpbin('get'))
476        assert r.status_code == 200
477
478    def test_HTTP_200_OK_PUT(self, httpbin):
479        r = requests.put(httpbin('put'))
480        assert r.status_code == 200
481
482    def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
483        auth = ('user', 'pass')
484        url = httpbin('basic-auth', 'user', 'pass')
485
486        r = requests.get(url, auth=auth)
487        assert r.status_code == 200
488
489        r = requests.get(url)
490        assert r.status_code == 401
491
492        s = requests.session()
493        s.auth = auth
494        r = s.get(url)
495        assert r.status_code == 200
496
497    @pytest.mark.parametrize(
498        'username, password', (
499            ('user', 'pass'),
500            (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8')),
501            (42, 42),
502            (None, None),
503        ))
504    def test_set_basicauth(self, httpbin, username, password):
505        auth = (username, password)
506        url = httpbin('get')
507
508        r = requests.Request('GET', url, auth=auth)
509        p = r.prepare()
510
511        assert p.headers['Authorization'] == _basic_auth_str(username, password)
512
513    def test_basicauth_encodes_byte_strings(self):
514        """Ensure b'test' formats as the byte string "test" rather
515        than the unicode string "b'test'" in Python 3.
516        """
517        auth = (b'\xc5\xafsername', b'test\xc6\xb6')
518        r = requests.Request('GET', 'http://localhost', auth=auth)
519        p = r.prepare()
520
521        assert p.headers['Authorization'] == 'Basic xa9zZXJuYW1lOnRlc3TGtg=='
522
523    @pytest.mark.parametrize(
524        'url, exception', (
525            # Connecting to an unknown domain should raise a ConnectionError
526            ('http://doesnotexist.google.com', ConnectionError),
527            # Connecting to an invalid port should raise a ConnectionError
528            ('http://localhost:1', ConnectionError),
529            # Inputing a URL that cannot be parsed should raise an InvalidURL error
530            ('http://fe80::5054:ff:fe5a:fc0', InvalidURL)
531        ))
532    def test_errors(self, url, exception):
533        with pytest.raises(exception):
534            requests.get(url, timeout=1)
535
536    def test_proxy_error(self):
537        # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
538        with pytest.raises(ProxyError):
539            requests.get('http://localhost:1', proxies={'http': 'non-resolvable-address'})
540
541    def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure):
542        with pytest.raises(InvalidProxyURL):
543            requests.get(httpbin_secure(), proxies={'https': 'http:/badproxyurl:3128'})
544
545        with pytest.raises(InvalidProxyURL):
546            requests.get(httpbin(), proxies={'http': 'http://:8080'})
547
548        with pytest.raises(InvalidProxyURL):
549            requests.get(httpbin_secure(), proxies={'https': 'https://'})
550
551        with pytest.raises(InvalidProxyURL):
552            requests.get(httpbin(), proxies={'http': 'http:///example.com:8080'})
553
554    def test_basicauth_with_netrc(self, httpbin):
555        auth = ('user', 'pass')
556        wrong_auth = ('wronguser', 'wrongpass')
557        url = httpbin('basic-auth', 'user', 'pass')
558
559        old_auth = requests.sessions.get_netrc_auth
560
561        try:
562            def get_netrc_auth_mock(url):
563                return auth
564            requests.sessions.get_netrc_auth = get_netrc_auth_mock
565
566            # Should use netrc and work.
567            r = requests.get(url)
568            assert r.status_code == 200
569
570            # Given auth should override and fail.
571            r = requests.get(url, auth=wrong_auth)
572            assert r.status_code == 401
573
574            s = requests.session()
575
576            # Should use netrc and work.
577            r = s.get(url)
578            assert r.status_code == 200
579
580            # Given auth should override and fail.
581            s.auth = wrong_auth
582            r = s.get(url)
583            assert r.status_code == 401
584        finally:
585            requests.sessions.get_netrc_auth = old_auth
586
587    def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
588
589        for authtype in self.digest_auth_algo:
590            auth = HTTPDigestAuth('user', 'pass')
591            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype, 'never')
592
593            r = requests.get(url, auth=auth)
594            assert r.status_code == 200
595
596            r = requests.get(url)
597            assert r.status_code == 401
598            print(r.headers['WWW-Authenticate'])
599
600            s = requests.session()
601            s.auth = HTTPDigestAuth('user', 'pass')
602            r = s.get(url)
603            assert r.status_code == 200
604
605    def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
606
607        for authtype in self.digest_auth_algo:
608            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype)
609            auth = HTTPDigestAuth('user', 'pass')
610            r = requests.get(url)
611            assert r.cookies['fake'] == 'fake_value'
612
613            r = requests.get(url, auth=auth)
614            assert r.status_code == 200
615
616    def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
617
618        for authtype in self.digest_auth_algo:
619            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype)
620            auth = HTTPDigestAuth('user', 'pass')
621            s = requests.Session()
622            s.get(url, auth=auth)
623            assert s.cookies['fake'] == 'fake_value'
624
625    def test_DIGEST_STREAM(self, httpbin):
626
627        for authtype in self.digest_auth_algo:
628            auth = HTTPDigestAuth('user', 'pass')
629            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype)
630
631            r = requests.get(url, auth=auth, stream=True)
632            assert r.raw.read() != b''
633
634            r = requests.get(url, auth=auth, stream=False)
635            assert r.raw.read() == b''
636
637    def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
638
639        for authtype in self.digest_auth_algo:
640            auth = HTTPDigestAuth('user', 'wrongpass')
641            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype)
642
643            r = requests.get(url, auth=auth)
644            assert r.status_code == 401
645
646            r = requests.get(url)
647            assert r.status_code == 401
648
649            s = requests.session()
650            s.auth = auth
651            r = s.get(url)
652            assert r.status_code == 401
653
654    def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
655
656        for authtype in self.digest_auth_algo:
657            auth = HTTPDigestAuth('user', 'pass')
658            url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype)
659
660            r = requests.get(url, auth=auth)
661            assert '"auth"' in r.request.headers['Authorization']
662
663    def test_POSTBIN_GET_POST_FILES(self, httpbin):
664
665        url = httpbin('post')
666        requests.post(url).raise_for_status()
667
668        post1 = requests.post(url, data={'some': 'data'})
669        assert post1.status_code == 200
670
671        with open('requirements-dev.txt') as f:
672            post2 = requests.post(url, files={'some': f})
673        assert post2.status_code == 200
674
675        post4 = requests.post(url, data='[{"some": "json"}]')
676        assert post4.status_code == 200
677
678        with pytest.raises(ValueError):
679            requests.post(url, files=['bad file data'])
680
681    def test_invalid_files_input(self, httpbin):
682
683        url = httpbin('post')
684        post = requests.post(url,
685                             files={"random-file-1": None, "random-file-2": 1})
686        assert b'name="random-file-1"' not in post.request.body
687        assert b'name="random-file-2"' in post.request.body
688
689    def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
690
691        class TestStream(object):
692            def __init__(self, data):
693                self.data = data.encode()
694                self.length = len(self.data)
695                self.index = 0
696
697            def __len__(self):
698                return self.length
699
700            def read(self, size=None):
701                if size:
702                    ret = self.data[self.index:self.index + size]
703                    self.index += size
704                else:
705                    ret = self.data[self.index:]
706                    self.index = self.length
707                return ret
708
709            def tell(self):
710                return self.index
711
712            def seek(self, offset, where=0):
713                if where == 0:
714                    self.index = offset
715                elif where == 1:
716                    self.index += offset
717                elif where == 2:
718                    self.index = self.length + offset
719
720        test = TestStream('test')
721        post1 = requests.post(httpbin('post'), data=test)
722        assert post1.status_code == 200
723        assert post1.json()['data'] == 'test'
724
725        test = TestStream('test')
726        test.seek(2)
727        post2 = requests.post(httpbin('post'), data=test)
728        assert post2.status_code == 200
729        assert post2.json()['data'] == 'st'
730
731    def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
732
733        url = httpbin('post')
734        requests.post(url).raise_for_status()
735
736        post1 = requests.post(url, data={'some': 'data'})
737        assert post1.status_code == 200
738
739        with open('requirements-dev.txt') as f:
740            post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
741        assert post2.status_code == 200
742
743        post4 = requests.post(url, data='[{"some": "json"}]')
744        assert post4.status_code == 200
745
746        with pytest.raises(ValueError):
747            requests.post(url, files=['bad file data'])
748
749    def test_post_with_custom_mapping(self, httpbin):
750        class CustomMapping(MutableMapping):
751            def __init__(self, *args, **kwargs):
752                self.data = dict(*args, **kwargs)
753
754            def __delitem__(self, key):
755                del self.data[key]
756
757            def __getitem__(self, key):
758                return self.data[key]
759
760            def __setitem__(self, key, value):
761                self.data[key] = value
762
763            def __iter__(self):
764                return iter(self.data)
765
766            def __len__(self):
767                return len(self.data)
768
769        data = CustomMapping({'some': 'data'})
770        url = httpbin('post')
771        found_json = requests.post(url, data=data).json().get('form')
772        assert found_json == {'some': 'data'}
773
774    def test_conflicting_post_params(self, httpbin):
775        url = httpbin('post')
776        with open('requirements-dev.txt') as f:
777            with pytest.raises(ValueError):
778                requests.post(url, data='[{"some": "data"}]', files={'some': f})
779            with pytest.raises(ValueError):
780                requests.post(url, data=u('[{"some": "data"}]'), files={'some': f})
781
782    def test_request_ok_set(self, httpbin):
783        r = requests.get(httpbin('status', '404'))
784        assert not r.ok
785
786    def test_status_raising(self, httpbin):
787        r = requests.get(httpbin('status', '404'))
788        with pytest.raises(requests.exceptions.HTTPError):
789            r.raise_for_status()
790
791        r = requests.get(httpbin('status', '500'))
792        assert not r.ok
793
794    def test_decompress_gzip(self, httpbin):
795        r = requests.get(httpbin('gzip'))
796        r.content.decode('ascii')
797
798    @pytest.mark.parametrize(
799        'url, params', (
800            ('/get', {'foo': 'føø'}),
801            ('/get', {'føø': 'føø'}),
802            ('/get', {'føø': 'føø'}),
803            ('/get', {'foo': 'foo'}),
804            ('ø', {'foo': 'foo'}),
805        ))
806    def test_unicode_get(self, httpbin, url, params):
807        requests.get(httpbin(url), params=params)
808
809    def test_unicode_header_name(self, httpbin):
810        requests.put(
811            httpbin('put'),
812            headers={str('Content-Type'): 'application/octet-stream'},
813            data='\xff')  # compat.str is unicode.
814
815    def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
816        requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle)
817
818    def test_invalid_ca_certificate_path(self, httpbin_secure):
819        INVALID_PATH = '/garbage'
820        with pytest.raises(IOError) as e:
821            requests.get(httpbin_secure(), verify=INVALID_PATH)
822        assert str(e.value) == 'Could not find a suitable TLS CA certificate bundle, invalid path: {}'.format(INVALID_PATH)
823
824    def test_invalid_ssl_certificate_files(self, httpbin_secure):
825        INVALID_PATH = '/garbage'
826        with pytest.raises(IOError) as e:
827            requests.get(httpbin_secure(), cert=INVALID_PATH)
828        assert str(e.value) == 'Could not find the TLS certificate file, invalid path: {}'.format(INVALID_PATH)
829
830        with pytest.raises(IOError) as e:
831            requests.get(httpbin_secure(), cert=('.', INVALID_PATH))
832        assert str(e.value) == 'Could not find the TLS key file, invalid path: {}'.format(INVALID_PATH)
833
834    def test_http_with_certificate(self, httpbin):
835        r = requests.get(httpbin(), cert='.')
836        assert r.status_code == 200
837
838    def test_https_warnings(self, httpbin_secure, httpbin_ca_bundle):
839        """warnings are emitted with requests.get"""
840        if HAS_MODERN_SSL or HAS_PYOPENSSL:
841            warnings_expected = ()
842        else:
843            warnings_expected = ('SNIMissingWarning',
844                                 'InsecurePlatformWarning',
845                                 'SubjectAltNameWarning', )
846
847        with pytest.warns(None) as warning_records:
848            warnings.simplefilter('always')
849            requests.get(httpbin_secure('status', '200'),
850                         verify=httpbin_ca_bundle)
851
852        warning_records = [item for item in warning_records
853                           if item.category.__name__ != 'ResourceWarning']
854
855        warnings_category = tuple(
856            item.category.__name__ for item in warning_records)
857        assert warnings_category == warnings_expected
858
859    def test_certificate_failure(self, httpbin_secure):
860        """
861        When underlying SSL problems occur, an SSLError is raised.
862        """
863        with pytest.raises(SSLError):
864            # Our local httpbin does not have a trusted CA, so this call will
865            # fail if we use our default trust bundle.
866            requests.get(httpbin_secure('status', '200'))
867
868    def test_urlencoded_get_query_multivalued_param(self, httpbin):
869
870        r = requests.get(httpbin('get'), params={'test': ['foo', 'baz']})
871        assert r.status_code == 200
872        assert r.url == httpbin('get?test=foo&test=baz')
873
874    def test_form_encoded_post_query_multivalued_element(self, httpbin):
875        r = requests.Request(method='POST', url=httpbin('post'),
876                             data=dict(test=['foo', 'baz']))
877        prep = r.prepare()
878        assert prep.body == 'test=foo&test=baz'
879
880    def test_different_encodings_dont_break_post(self, httpbin):
881        r = requests.post(httpbin('post'),
882            data={'stuff': json.dumps({'a': 123})},
883            params={'blah': 'asdf1234'},
884            files={'file': ('test_requests.py', open(__file__, 'rb'))})
885        assert r.status_code == 200
886
887    @pytest.mark.parametrize(
888        'data', (
889            {'stuff': u('ëlïxr')},
890            {'stuff': u('ëlïxr').encode('utf-8')},
891            {'stuff': 'elixr'},
892            {'stuff': 'elixr'.encode('utf-8')},
893        ))
894    def test_unicode_multipart_post(self, httpbin, data):
895        r = requests.post(httpbin('post'),
896            data=data,
897            files={'file': ('test_requests.py', open(__file__, 'rb'))})
898        assert r.status_code == 200
899
900    def test_unicode_multipart_post_fieldnames(self, httpbin):
901        filename = os.path.splitext(__file__)[0] + '.py'
902        r = requests.Request(
903            method='POST', url=httpbin('post'),
904            data={'stuff'.encode('utf-8'): 'elixr'},
905            files={'file': ('test_requests.py', open(filename, 'rb'))})
906        prep = r.prepare()
907        assert b'name="stuff"' in prep.body
908        assert b'name="b\'stuff\'"' not in prep.body
909
910    def test_unicode_method_name(self, httpbin):
911        files = {'file': open(__file__, 'rb')}
912        r = requests.request(
913            method=u('POST'), url=httpbin('post'), files=files)
914        assert r.status_code == 200
915
916    def test_unicode_method_name_with_request_object(self, httpbin):
917        files = {'file': open(__file__, 'rb')}
918        s = requests.Session()
919        req = requests.Request(u('POST'), httpbin('post'), files=files)
920        prep = s.prepare_request(req)
921        assert isinstance(prep.method, builtin_str)
922        assert prep.method == 'POST'
923
924        resp = s.send(prep)
925        assert resp.status_code == 200
926
927    def test_non_prepared_request_error(self):
928        s = requests.Session()
929        req = requests.Request(u('POST'), '/')
930
931        with pytest.raises(ValueError) as e:
932            s.send(req)
933        assert str(e.value) == 'You can only send PreparedRequests.'
934
935    def test_custom_content_type(self, httpbin):
936        r = requests.post(
937            httpbin('post'),
938            data={'stuff': json.dumps({'a': 123})},
939            files={
940                'file1': ('test_requests.py', open(__file__, 'rb')),
941                'file2': ('test_requests', open(__file__, 'rb'),
942                    'text/py-content-type')})
943        assert r.status_code == 200
944        assert b"text/py-content-type" in r.request.body
945
946    def test_hook_receives_request_arguments(self, httpbin):
947        def hook(resp, **kwargs):
948            assert resp is not None
949            assert kwargs != {}
950
951        s = requests.Session()
952        r = requests.Request('GET', httpbin(), hooks={'response': hook})
953        prep = s.prepare_request(r)
954        s.send(prep)
955
956    def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
957        hook = lambda x, *args, **kwargs: x
958        s = requests.Session()
959        s.hooks['response'].append(hook)
960        r = requests.Request('GET', httpbin())
961        prep = s.prepare_request(r)
962        assert prep.hooks['response'] != []
963        assert prep.hooks['response'] == [hook]
964
965    def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
966        hook1 = lambda x, *args, **kwargs: x
967        hook2 = lambda x, *args, **kwargs: x
968        assert hook1 is not hook2
969        s = requests.Session()
970        s.hooks['response'].append(hook2)
971        r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
972        prep = s.prepare_request(r)
973        assert prep.hooks['response'] == [hook1]
974
975    def test_prepared_request_hook(self, httpbin):
976        def hook(resp, **kwargs):
977            resp.hook_working = True
978            return resp
979
980        req = requests.Request('GET', httpbin(), hooks={'response': hook})
981        prep = req.prepare()
982
983        s = requests.Session()
984        s.proxies = getproxies()
985        resp = s.send(prep)
986
987        assert hasattr(resp, 'hook_working')
988
989    def test_prepared_from_session(self, httpbin):
990        class DummyAuth(requests.auth.AuthBase):
991            def __call__(self, r):
992                r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
993                return r
994
995        req = requests.Request('GET', httpbin('headers'))
996        assert not req.auth
997
998        s = requests.Session()
999        s.auth = DummyAuth()
1000
1001        prep = s.prepare_request(req)
1002        resp = s.send(prep)
1003
1004        assert resp.json()['headers'][
1005            'Dummy-Auth-Test'] == 'dummy-auth-test-ok'
1006
1007    def test_prepare_request_with_bytestring_url(self):
1008        req = requests.Request('GET', b'https://httpbin.org/')
1009        s = requests.Session()
1010        prep = s.prepare_request(req)
1011        assert prep.url == "https://httpbin.org/"
1012
1013    def test_request_with_bytestring_host(self, httpbin):
1014        s = requests.Session()
1015        resp = s.request(
1016            'GET',
1017            httpbin('cookies/set?cookie=value'),
1018            allow_redirects=False,
1019            headers={'Host': b'httpbin.org'}
1020        )
1021        assert resp.cookies.get('cookie') == 'value'
1022
1023    def test_links(self):
1024        r = requests.Response()
1025        r.headers = {
1026            'cache-control': 'public, max-age=60, s-maxage=60',
1027            'connection': 'keep-alive',
1028            'content-encoding': 'gzip',
1029            'content-type': 'application/json; charset=utf-8',
1030            'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
1031            'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
1032            'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
1033            'link': ('<https://api.github.com/users/kennethreitz/repos?'
1034                     'page=2&per_page=10>; rel="next", <https://api.github.'
1035                     'com/users/kennethreitz/repos?page=7&per_page=10>; '
1036                     ' rel="last"'),
1037            'server': 'GitHub.com',
1038            'status': '200 OK',
1039            'vary': 'Accept',
1040            'x-content-type-options': 'nosniff',
1041            'x-github-media-type': 'github.beta',
1042            'x-ratelimit-limit': '60',
1043            'x-ratelimit-remaining': '57'
1044        }
1045        assert r.links['next']['rel'] == 'next'
1046
1047    def test_cookie_parameters(self):
1048        key = 'some_cookie'
1049        value = 'some_value'
1050        secure = True
1051        domain = 'test.com'
1052        rest = {'HttpOnly': True}
1053
1054        jar = requests.cookies.RequestsCookieJar()
1055        jar.set(key, value, secure=secure, domain=domain, rest=rest)
1056
1057        assert len(jar) == 1
1058        assert 'some_cookie' in jar
1059
1060        cookie = list(jar)[0]
1061        assert cookie.secure == secure
1062        assert cookie.domain == domain
1063        assert cookie._rest['HttpOnly'] == rest['HttpOnly']
1064
1065    def test_cookie_as_dict_keeps_len(self):
1066        key = 'some_cookie'
1067        value = 'some_value'
1068
1069        key1 = 'some_cookie1'
1070        value1 = 'some_value1'
1071
1072        jar = requests.cookies.RequestsCookieJar()
1073        jar.set(key, value)
1074        jar.set(key1, value1)
1075
1076        d1 = dict(jar)
1077        d2 = dict(jar.iteritems())
1078        d3 = dict(jar.items())
1079
1080        assert len(jar) == 2
1081        assert len(d1) == 2
1082        assert len(d2) == 2
1083        assert len(d3) == 2
1084
1085    def test_cookie_as_dict_keeps_items(self):
1086        key = 'some_cookie'
1087        value = 'some_value'
1088
1089        key1 = 'some_cookie1'
1090        value1 = 'some_value1'
1091
1092        jar = requests.cookies.RequestsCookieJar()
1093        jar.set(key, value)
1094        jar.set(key1, value1)
1095
1096        d1 = dict(jar)
1097        d2 = dict(jar.iteritems())
1098        d3 = dict(jar.items())
1099
1100        assert d1['some_cookie'] == 'some_value'
1101        assert d2['some_cookie'] == 'some_value'
1102        assert d3['some_cookie1'] == 'some_value1'
1103
1104    def test_cookie_as_dict_keys(self):
1105        key = 'some_cookie'
1106        value = 'some_value'
1107
1108        key1 = 'some_cookie1'
1109        value1 = 'some_value1'
1110
1111        jar = requests.cookies.RequestsCookieJar()
1112        jar.set(key, value)
1113        jar.set(key1, value1)
1114
1115        keys = jar.keys()
1116        assert keys == list(keys)
1117        # make sure one can use keys multiple times
1118        assert list(keys) == list(keys)
1119
1120    def test_cookie_as_dict_values(self):
1121        key = 'some_cookie'
1122        value = 'some_value'
1123
1124        key1 = 'some_cookie1'
1125        value1 = 'some_value1'
1126
1127        jar = requests.cookies.RequestsCookieJar()
1128        jar.set(key, value)
1129        jar.set(key1, value1)
1130
1131        values = jar.values()
1132        assert values == list(values)
1133        # make sure one can use values multiple times
1134        assert list(values) == list(values)
1135
1136    def test_cookie_as_dict_items(self):
1137        key = 'some_cookie'
1138        value = 'some_value'
1139
1140        key1 = 'some_cookie1'
1141        value1 = 'some_value1'
1142
1143        jar = requests.cookies.RequestsCookieJar()
1144        jar.set(key, value)
1145        jar.set(key1, value1)
1146
1147        items = jar.items()
1148        assert items == list(items)
1149        # make sure one can use items multiple times
1150        assert list(items) == list(items)
1151
1152    def test_cookie_duplicate_names_different_domains(self):
1153        key = 'some_cookie'
1154        value = 'some_value'
1155        domain1 = 'test1.com'
1156        domain2 = 'test2.com'
1157
1158        jar = requests.cookies.RequestsCookieJar()
1159        jar.set(key, value, domain=domain1)
1160        jar.set(key, value, domain=domain2)
1161        assert key in jar
1162        items = jar.items()
1163        assert len(items) == 2
1164
1165        # Verify that CookieConflictError is raised if domain is not specified
1166        with pytest.raises(requests.cookies.CookieConflictError):
1167            jar.get(key)
1168
1169        # Verify that CookieConflictError is not raised if domain is specified
1170        cookie = jar.get(key, domain=domain1)
1171        assert cookie == value
1172
1173    def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
1174        key = 'some_cookie'
1175        value = 'some_value'
1176        path = 'some_path'
1177
1178        jar = requests.cookies.RequestsCookieJar()
1179        jar.set(key, value, path=path)
1180        jar.set(key, value)
1181        with pytest.raises(requests.cookies.CookieConflictError):
1182            jar.get(key)
1183
1184    def test_cookie_policy_copy(self):
1185        class MyCookiePolicy(cookielib.DefaultCookiePolicy):
1186            pass
1187
1188        jar = requests.cookies.RequestsCookieJar()
1189        jar.set_policy(MyCookiePolicy())
1190        assert isinstance(jar.copy().get_policy(), MyCookiePolicy)
1191
1192    def test_time_elapsed_blank(self, httpbin):
1193        r = requests.get(httpbin('get'))
1194        td = r.elapsed
1195        total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6)
1196        assert total_seconds > 0.0
1197
1198    def test_empty_response_has_content_none(self):
1199        r = requests.Response()
1200        assert r.content is None
1201
1202    def test_response_is_iterable(self):
1203        r = requests.Response()
1204        io = StringIO.StringIO('abc')
1205        read_ = io.read
1206
1207        def read_mock(amt, decode_content=None):
1208            return read_(amt)
1209        setattr(io, 'read', read_mock)
1210        r.raw = io
1211        assert next(iter(r))
1212        io.close()
1213
1214    def test_response_decode_unicode(self):
1215        """When called with decode_unicode, Response.iter_content should always
1216        return unicode.
1217        """
1218        r = requests.Response()
1219        r._content_consumed = True
1220        r._content = b'the content'
1221        r.encoding = 'ascii'
1222
1223        chunks = r.iter_content(decode_unicode=True)
1224        assert all(isinstance(chunk, str) for chunk in chunks)
1225
1226        # also for streaming
1227        r = requests.Response()
1228        r.raw = io.BytesIO(b'the content')
1229        r.encoding = 'ascii'
1230        chunks = r.iter_content(decode_unicode=True)
1231        assert all(isinstance(chunk, str) for chunk in chunks)
1232
1233    def test_response_reason_unicode(self):
1234        # check for unicode HTTP status
1235        r = requests.Response()
1236        r.url = u'unicode URL'
1237        r.reason = u'Komponenttia ei löydy'.encode('utf-8')
1238        r.status_code = 404
1239        r.encoding = None
1240        assert not r.ok  # old behaviour - crashes here
1241
1242    def test_response_reason_unicode_fallback(self):
1243        # check raise_status falls back to ISO-8859-1
1244        r = requests.Response()
1245        r.url = 'some url'
1246        reason = u'Komponenttia ei löydy'
1247        r.reason = reason.encode('latin-1')
1248        r.status_code = 500
1249        r.encoding = None
1250        with pytest.raises(requests.exceptions.HTTPError) as e:
1251            r.raise_for_status()
1252        assert reason in e.value.args[0]
1253
1254    def test_response_chunk_size_type(self):
1255        """Ensure that chunk_size is passed as None or an integer, otherwise
1256        raise a TypeError.
1257        """
1258        r = requests.Response()
1259        r.raw = io.BytesIO(b'the content')
1260        chunks = r.iter_content(1)
1261        assert all(len(chunk) == 1 for chunk in chunks)
1262
1263        r = requests.Response()
1264        r.raw = io.BytesIO(b'the content')
1265        chunks = r.iter_content(None)
1266        assert list(chunks) == [b'the content']
1267
1268        r = requests.Response()
1269        r.raw = io.BytesIO(b'the content')
1270        with pytest.raises(TypeError):
1271            chunks = r.iter_content("1024")
1272
1273    def test_request_and_response_are_pickleable(self, httpbin):
1274        r = requests.get(httpbin('get'))
1275
1276        # verify we can pickle the original request
1277        assert pickle.loads(pickle.dumps(r.request))
1278
1279        # verify we can pickle the response and that we have access to
1280        # the original request.
1281        pr = pickle.loads(pickle.dumps(r))
1282        assert r.request.url == pr.request.url
1283        assert r.request.headers == pr.request.headers
1284
1285    def test_prepared_request_is_pickleable(self, httpbin):
1286        p = requests.Request('GET', httpbin('get')).prepare()
1287
1288        # Verify PreparedRequest can be pickled and unpickled
1289        r = pickle.loads(pickle.dumps(p))
1290        assert r.url == p.url
1291        assert r.headers == p.headers
1292        assert r.body == p.body
1293
1294        # Verify unpickled PreparedRequest sends properly
1295        s = requests.Session()
1296        resp = s.send(r)
1297        assert resp.status_code == 200
1298
1299    def test_prepared_request_with_file_is_pickleable(self, httpbin):
1300        files = {'file': open(__file__, 'rb')}
1301        r = requests.Request('POST', httpbin('post'), files=files)
1302        p = r.prepare()
1303
1304        # Verify PreparedRequest can be pickled and unpickled
1305        r = pickle.loads(pickle.dumps(p))
1306        assert r.url == p.url
1307        assert r.headers == p.headers
1308        assert r.body == p.body
1309
1310        # Verify unpickled PreparedRequest sends properly
1311        s = requests.Session()
1312        resp = s.send(r)
1313        assert resp.status_code == 200
1314
1315    def test_prepared_request_with_hook_is_pickleable(self, httpbin):
1316        r = requests.Request('GET', httpbin('get'), hooks=default_hooks())
1317        p = r.prepare()
1318
1319        # Verify PreparedRequest can be pickled
1320        r = pickle.loads(pickle.dumps(p))
1321        assert r.url == p.url
1322        assert r.headers == p.headers
1323        assert r.body == p.body
1324        assert r.hooks == p.hooks
1325
1326        # Verify unpickled PreparedRequest sends properly
1327        s = requests.Session()
1328        resp = s.send(r)
1329        assert resp.status_code == 200
1330
1331    def test_cannot_send_unprepared_requests(self, httpbin):
1332        r = requests.Request(url=httpbin())
1333        with pytest.raises(ValueError):
1334            requests.Session().send(r)
1335
1336    def test_http_error(self):
1337        error = requests.exceptions.HTTPError()
1338        assert not error.response
1339        response = requests.Response()
1340        error = requests.exceptions.HTTPError(response=response)
1341        assert error.response == response
1342        error = requests.exceptions.HTTPError('message', response=response)
1343        assert str(error) == 'message'
1344        assert error.response == response
1345
1346    def test_session_pickling(self, httpbin):
1347        r = requests.Request('GET', httpbin('get'))
1348        s = requests.Session()
1349
1350        s = pickle.loads(pickle.dumps(s))
1351        s.proxies = getproxies()
1352
1353        r = s.send(r.prepare())
1354        assert r.status_code == 200
1355
1356    def test_fixes_1329(self, httpbin):
1357        """Ensure that header updates are done case-insensitively."""
1358        s = requests.Session()
1359        s.headers.update({'ACCEPT': 'BOGUS'})
1360        s.headers.update({'accept': 'application/json'})
1361        r = s.get(httpbin('get'))
1362        headers = r.request.headers
1363        assert headers['accept'] == 'application/json'
1364        assert headers['Accept'] == 'application/json'
1365        assert headers['ACCEPT'] == 'application/json'
1366
1367    def test_uppercase_scheme_redirect(self, httpbin):
1368        parts = urlparse(httpbin('html'))
1369        url = "HTTP://" + parts.netloc + parts.path
1370        r = requests.get(httpbin('redirect-to'), params={'url': url})
1371        assert r.status_code == 200
1372        assert r.url.lower() == url.lower()
1373
1374    def test_transport_adapter_ordering(self):
1375        s = requests.Session()
1376        order = ['https://', 'http://']
1377        assert order == list(s.adapters)
1378        s.mount('http://git', HTTPAdapter())
1379        s.mount('http://github', HTTPAdapter())
1380        s.mount('http://github.com', HTTPAdapter())
1381        s.mount('http://github.com/about/', HTTPAdapter())
1382        order = [
1383            'http://github.com/about/',
1384            'http://github.com',
1385            'http://github',
1386            'http://git',
1387            'https://',
1388            'http://',
1389        ]
1390        assert order == list(s.adapters)
1391        s.mount('http://gittip', HTTPAdapter())
1392        s.mount('http://gittip.com', HTTPAdapter())
1393        s.mount('http://gittip.com/about/', HTTPAdapter())
1394        order = [
1395            'http://github.com/about/',
1396            'http://gittip.com/about/',
1397            'http://github.com',
1398            'http://gittip.com',
1399            'http://github',
1400            'http://gittip',
1401            'http://git',
1402            'https://',
1403            'http://',
1404        ]
1405        assert order == list(s.adapters)
1406        s2 = requests.Session()
1407        s2.adapters = {'http://': HTTPAdapter()}
1408        s2.mount('https://', HTTPAdapter())
1409        assert 'http://' in s2.adapters
1410        assert 'https://' in s2.adapters
1411
1412    def test_session_get_adapter_prefix_matching(self):
1413        prefix = 'https://example.com'
1414        more_specific_prefix = prefix + '/some/path'
1415
1416        url_matching_only_prefix = prefix + '/another/path'
1417        url_matching_more_specific_prefix = more_specific_prefix + '/longer/path'
1418        url_not_matching_prefix = 'https://another.example.com/'
1419
1420        s = requests.Session()
1421        prefix_adapter = HTTPAdapter()
1422        more_specific_prefix_adapter = HTTPAdapter()
1423        s.mount(prefix, prefix_adapter)
1424        s.mount(more_specific_prefix, more_specific_prefix_adapter)
1425
1426        assert s.get_adapter(url_matching_only_prefix) is prefix_adapter
1427        assert s.get_adapter(url_matching_more_specific_prefix) is more_specific_prefix_adapter
1428        assert s.get_adapter(url_not_matching_prefix) not in (prefix_adapter, more_specific_prefix_adapter)
1429
1430    def test_session_get_adapter_prefix_matching_mixed_case(self):
1431        mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix'
1432        url_matching_prefix = mixed_case_prefix + '/full_url'
1433
1434        s = requests.Session()
1435        my_adapter = HTTPAdapter()
1436        s.mount(mixed_case_prefix, my_adapter)
1437
1438        assert s.get_adapter(url_matching_prefix) is my_adapter
1439
1440    def test_session_get_adapter_prefix_matching_is_case_insensitive(self):
1441        mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix'
1442        url_matching_prefix_with_different_case = 'HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url'
1443
1444        s = requests.Session()
1445        my_adapter = HTTPAdapter()
1446        s.mount(mixed_case_prefix, my_adapter)
1447
1448        assert s.get_adapter(url_matching_prefix_with_different_case) is my_adapter
1449
1450    def test_header_remove_is_case_insensitive(self, httpbin):
1451        # From issue #1321
1452        s = requests.Session()
1453        s.headers['foo'] = 'bar'
1454        r = s.get(httpbin('get'), headers={'FOO': None})
1455        assert 'foo' not in r.request.headers
1456
1457    def test_params_are_merged_case_sensitive(self, httpbin):
1458        s = requests.Session()
1459        s.params['foo'] = 'bar'
1460        r = s.get(httpbin('get'), params={'FOO': 'bar'})
1461        assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
1462
1463    def test_long_authinfo_in_url(self):
1464        url = 'http://{}:{}@{}:9000/path?query#frag'.format(
1465            'E8A3BE87-9E3F-4620-8858-95478E385B5B',
1466            'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
1467            'exactly-------------sixty-----------three------------characters',
1468        )
1469        r = requests.Request('GET', url).prepare()
1470        assert r.url == url
1471
1472    def test_header_keys_are_native(self, httpbin):
1473        headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
1474        r = requests.Request('GET', httpbin('get'), headers=headers)
1475        p = r.prepare()
1476
1477        # This is testing that they are builtin strings. A bit weird, but there
1478        # we go.
1479        assert 'unicode' in p.headers.keys()
1480        assert 'byte' in p.headers.keys()
1481
1482    def test_header_validation(self, httpbin):
1483        """Ensure prepare_headers regex isn't flagging valid header contents."""
1484        headers_ok = {'foo': 'bar baz qux',
1485                      'bar': u'fbbq'.encode('utf8'),
1486                      'baz': '',
1487                      'qux': '1'}
1488        r = requests.get(httpbin('get'), headers=headers_ok)
1489        assert r.request.headers['foo'] == headers_ok['foo']
1490
1491    def test_header_value_not_str(self, httpbin):
1492        """Ensure the header value is of type string or bytes as
1493        per discussion in GH issue #3386
1494        """
1495        headers_int = {'foo': 3}
1496        headers_dict = {'bar': {'foo': 'bar'}}
1497        headers_list = {'baz': ['foo', 'bar']}
1498
1499        # Test for int
1500        with pytest.raises(InvalidHeader) as excinfo:
1501            r = requests.get(httpbin('get'), headers=headers_int)
1502        assert 'foo' in str(excinfo.value)
1503        # Test for dict
1504        with pytest.raises(InvalidHeader) as excinfo:
1505            r = requests.get(httpbin('get'), headers=headers_dict)
1506        assert 'bar' in str(excinfo.value)
1507        # Test for list
1508        with pytest.raises(InvalidHeader) as excinfo:
1509            r = requests.get(httpbin('get'), headers=headers_list)
1510        assert 'baz' in str(excinfo.value)
1511
1512    def test_header_no_return_chars(self, httpbin):
1513        """Ensure that a header containing return character sequences raise an
1514        exception. Otherwise, multiple headers are created from single string.
1515        """
1516        headers_ret = {'foo': 'bar\r\nbaz: qux'}
1517        headers_lf = {'foo': 'bar\nbaz: qux'}
1518        headers_cr = {'foo': 'bar\rbaz: qux'}
1519
1520        # Test for newline
1521        with pytest.raises(InvalidHeader):
1522            r = requests.get(httpbin('get'), headers=headers_ret)
1523        # Test for line feed
1524        with pytest.raises(InvalidHeader):
1525            r = requests.get(httpbin('get'), headers=headers_lf)
1526        # Test for carriage return
1527        with pytest.raises(InvalidHeader):
1528            r = requests.get(httpbin('get'), headers=headers_cr)
1529
1530    def test_header_no_leading_space(self, httpbin):
1531        """Ensure headers containing leading whitespace raise
1532        InvalidHeader Error before sending.
1533        """
1534        headers_space = {'foo': ' bar'}
1535        headers_tab = {'foo': '   bar'}
1536
1537        # Test for whitespace
1538        with pytest.raises(InvalidHeader):
1539            r = requests.get(httpbin('get'), headers=headers_space)
1540        # Test for tab
1541        with pytest.raises(InvalidHeader):
1542            r = requests.get(httpbin('get'), headers=headers_tab)
1543
1544    @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo')))
1545    def test_can_send_objects_with_files(self, httpbin, files):
1546        data = {'a': 'this is a string'}
1547        files = {'b': files}
1548        r = requests.Request('POST', httpbin('post'), data=data, files=files)
1549        p = r.prepare()
1550        assert 'multipart/form-data' in p.headers['Content-Type']
1551
1552    def test_can_send_file_object_with_non_string_filename(self, httpbin):
1553        f = io.BytesIO()
1554        f.name = 2
1555        r = requests.Request('POST', httpbin('post'), files={'f': f})
1556        p = r.prepare()
1557
1558        assert 'multipart/form-data' in p.headers['Content-Type']
1559
1560    def test_autoset_header_values_are_native(self, httpbin):
1561        data = 'this is a string'
1562        length = '16'
1563        req = requests.Request('POST', httpbin('post'), data=data)
1564        p = req.prepare()
1565
1566        assert p.headers['Content-Length'] == length
1567
1568    def test_nonhttp_schemes_dont_check_URLs(self):
1569        test_urls = (
1570            'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
1571            'file:///etc/passwd',
1572            'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
1573        )
1574        for test_url in test_urls:
1575            req = requests.Request('GET', test_url)
1576            preq = req.prepare()
1577            assert test_url == preq.url
1578
1579    def test_auth_is_stripped_on_http_downgrade(self, httpbin, httpbin_secure, httpbin_ca_bundle):
1580        r = requests.get(
1581            httpbin_secure('redirect-to'),
1582            params={'url': httpbin('get')},
1583            auth=('user', 'pass'),
1584            verify=httpbin_ca_bundle
1585        )
1586        assert r.history[0].request.headers['Authorization']
1587        assert 'Authorization' not in r.request.headers
1588
1589    def test_auth_is_retained_for_redirect_on_host(self, httpbin):
1590        r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
1591        h1 = r.history[0].request.headers['Authorization']
1592        h2 = r.request.headers['Authorization']
1593
1594        assert h1 == h2
1595
1596    def test_should_strip_auth_host_change(self):
1597        s = requests.Session()
1598        assert s.should_strip_auth('http://example.com/foo', 'http://another.example.com/')
1599
1600    def test_should_strip_auth_http_downgrade(self):
1601        s = requests.Session()
1602        assert s.should_strip_auth('https://example.com/foo', 'http://example.com/bar')
1603
1604    def test_should_strip_auth_https_upgrade(self):
1605        s = requests.Session()
1606        assert not s.should_strip_auth('http://example.com/foo', 'https://example.com/bar')
1607        assert not s.should_strip_auth('http://example.com:80/foo', 'https://example.com/bar')
1608        assert not s.should_strip_auth('http://example.com/foo', 'https://example.com:443/bar')
1609        # Non-standard ports should trigger stripping
1610        assert s.should_strip_auth('http://example.com:8080/foo', 'https://example.com/bar')
1611        assert s.should_strip_auth('http://example.com/foo', 'https://example.com:8443/bar')
1612
1613    def test_should_strip_auth_port_change(self):
1614        s = requests.Session()
1615        assert s.should_strip_auth('http://example.com:1234/foo', 'https://example.com:4321/bar')
1616
1617    @pytest.mark.parametrize(
1618        'old_uri, new_uri', (
1619            ('https://example.com:443/foo', 'https://example.com/bar'),
1620            ('http://example.com:80/foo', 'http://example.com/bar'),
1621            ('https://example.com/foo', 'https://example.com:443/bar'),
1622            ('http://example.com/foo', 'http://example.com:80/bar')
1623        ))
1624    def test_should_strip_auth_default_port(self, old_uri, new_uri):
1625        s = requests.Session()
1626        assert not s.should_strip_auth(old_uri, new_uri)
1627
1628    def test_manual_redirect_with_partial_body_read(self, httpbin):
1629        s = requests.Session()
1630        r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
1631        assert r1.is_redirect
1632        rg = s.resolve_redirects(r1, r1.request, stream=True)
1633
1634        # read only the first eight bytes of the response body,
1635        # then follow the redirect
1636        r1.iter_content(8)
1637        r2 = next(rg)
1638        assert r2.is_redirect
1639
1640        # read all of the response via iter_content,
1641        # then follow the redirect
1642        for _ in r2.iter_content():
1643            pass
1644        r3 = next(rg)
1645        assert not r3.is_redirect
1646
1647    def test_prepare_body_position_non_stream(self):
1648        data = b'the data'
1649        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1650        assert prep._body_position is None
1651
1652    def test_rewind_body(self):
1653        data = io.BytesIO(b'the data')
1654        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1655        assert prep._body_position == 0
1656        assert prep.body.read() == b'the data'
1657
1658        # the data has all been read
1659        assert prep.body.read() == b''
1660
1661        # rewind it back
1662        requests.utils.rewind_body(prep)
1663        assert prep.body.read() == b'the data'
1664
1665    def test_rewind_partially_read_body(self):
1666        data = io.BytesIO(b'the data')
1667        data.read(4)  # read some data
1668        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1669        assert prep._body_position == 4
1670        assert prep.body.read() == b'data'
1671
1672        # the data has all been read
1673        assert prep.body.read() == b''
1674
1675        # rewind it back
1676        requests.utils.rewind_body(prep)
1677        assert prep.body.read() == b'data'
1678
1679    def test_rewind_body_no_seek(self):
1680        class BadFileObj:
1681            def __init__(self, data):
1682                self.data = data
1683
1684            def tell(self):
1685                return 0
1686
1687            def __iter__(self):
1688                return
1689
1690        data = BadFileObj('the data')
1691        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1692        assert prep._body_position == 0
1693
1694        with pytest.raises(UnrewindableBodyError) as e:
1695            requests.utils.rewind_body(prep)
1696
1697        assert 'Unable to rewind request body' in str(e)
1698
1699    def test_rewind_body_failed_seek(self):
1700        class BadFileObj:
1701            def __init__(self, data):
1702                self.data = data
1703
1704            def tell(self):
1705                return 0
1706
1707            def seek(self, pos, whence=0):
1708                raise OSError()
1709
1710            def __iter__(self):
1711                return
1712
1713        data = BadFileObj('the data')
1714        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1715        assert prep._body_position == 0
1716
1717        with pytest.raises(UnrewindableBodyError) as e:
1718            requests.utils.rewind_body(prep)
1719
1720        assert 'error occurred when rewinding request body' in str(e)
1721
1722    def test_rewind_body_failed_tell(self):
1723        class BadFileObj:
1724            def __init__(self, data):
1725                self.data = data
1726
1727            def tell(self):
1728                raise OSError()
1729
1730            def __iter__(self):
1731                return
1732
1733        data = BadFileObj('the data')
1734        prep = requests.Request('GET', 'http://example.com', data=data).prepare()
1735        assert prep._body_position is not None
1736
1737        with pytest.raises(UnrewindableBodyError) as e:
1738            requests.utils.rewind_body(prep)
1739
1740        assert 'Unable to rewind request body' in str(e)
1741
1742    def _patch_adapter_gzipped_redirect(self, session, url):
1743        adapter = session.get_adapter(url=url)
1744        org_build_response = adapter.build_response
1745        self._patched_response = False
1746
1747        def build_response(*args, **kwargs):
1748            resp = org_build_response(*args, **kwargs)
1749            if not self._patched_response:
1750                resp.raw.headers['content-encoding'] = 'gzip'
1751                self._patched_response = True
1752            return resp
1753
1754        adapter.build_response = build_response
1755
1756    def test_redirect_with_wrong_gzipped_header(self, httpbin):
1757        s = requests.Session()
1758        url = httpbin('redirect/1')
1759        self._patch_adapter_gzipped_redirect(s, url)
1760        s.get(url)
1761
1762    @pytest.mark.parametrize(
1763        'username, password, auth_str', (
1764            ('test', 'test', 'Basic dGVzdDp0ZXN0'),
1765            (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8'), 'Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA=='),
1766        ))
1767    def test_basic_auth_str_is_always_native(self, username, password, auth_str):
1768        s = _basic_auth_str(username, password)
1769        assert isinstance(s, builtin_str)
1770        assert s == auth_str
1771
1772    def test_requests_history_is_saved(self, httpbin):
1773        r = requests.get(httpbin('redirect/5'))
1774        total = r.history[-1].history
1775        i = 0
1776        for item in r.history:
1777            assert item.history == total[0:i]
1778            i += 1
1779
1780    def test_json_param_post_content_type_works(self, httpbin):
1781        r = requests.post(
1782            httpbin('post'),
1783            json={'life': 42}
1784        )
1785        assert r.status_code == 200
1786        assert 'application/json' in r.request.headers['Content-Type']
1787        assert {'life': 42} == r.json()['json']
1788
1789    def test_json_param_post_should_not_override_data_param(self, httpbin):
1790        r = requests.Request(method='POST', url=httpbin('post'),
1791                             data={'stuff': 'elixr'},
1792                             json={'music': 'flute'})
1793        prep = r.prepare()
1794        assert 'stuff=elixr' == prep.body
1795
1796    def test_response_iter_lines(self, httpbin):
1797        r = requests.get(httpbin('stream/4'), stream=True)
1798        assert r.status_code == 200
1799
1800        it = r.iter_lines()
1801        next(it)
1802        assert len(list(it)) == 3
1803
1804    def test_response_context_manager(self, httpbin):
1805        with requests.get(httpbin('stream/4'), stream=True) as response:
1806            assert isinstance(response, requests.Response)
1807
1808        assert response.raw.closed
1809
1810    def test_unconsumed_session_response_closes_connection(self, httpbin):
1811        s = requests.session()
1812
1813        with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
1814            pass
1815
1816        assert response._content_consumed is False
1817        assert response.raw.closed
1818
1819    @pytest.mark.xfail
1820    def test_response_iter_lines_reentrant(self, httpbin):
1821        """Response.iter_lines() is not reentrant safe"""
1822        r = requests.get(httpbin('stream/4'), stream=True)
1823        assert r.status_code == 200
1824
1825        next(r.iter_lines())
1826        assert len(list(r.iter_lines())) == 3
1827
1828    def test_session_close_proxy_clear(self, mocker):
1829        proxies = {
1830          'one': mocker.Mock(),
1831          'two': mocker.Mock(),
1832        }
1833        session = requests.Session()
1834        mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
1835        session.close()
1836        proxies['one'].clear.assert_called_once_with()
1837        proxies['two'].clear.assert_called_once_with()
1838
1839    def test_proxy_auth(self):
1840        adapter = HTTPAdapter()
1841        headers = adapter.proxy_headers("http://user:pass@httpbin.org")
1842        assert headers == {'Proxy-Authorization': 'Basic dXNlcjpwYXNz'}
1843
1844    def test_proxy_auth_empty_pass(self):
1845        adapter = HTTPAdapter()
1846        headers = adapter.proxy_headers("http://user:@httpbin.org")
1847        assert headers == {'Proxy-Authorization': 'Basic dXNlcjo='}
1848
1849    def test_response_json_when_content_is_None(self, httpbin):
1850        r = requests.get(httpbin('/status/204'))
1851        # Make sure r.content is None
1852        r.status_code = 0
1853        r._content = False
1854        r._content_consumed = False
1855
1856        assert r.content is None
1857        with pytest.raises(ValueError):
1858            r.json()
1859
1860    def test_response_without_release_conn(self):
1861        """Test `close` call for non-urllib3-like raw objects.
1862        Should work when `release_conn` attr doesn't exist on `response.raw`.
1863        """
1864        resp = requests.Response()
1865        resp.raw = StringIO.StringIO('test')
1866        assert not resp.raw.closed
1867        resp.close()
1868        assert resp.raw.closed
1869
1870    def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin):
1871        """Ensure that a byte stream with size 0 will not set both a Content-Length
1872        and Transfer-Encoding header.
1873        """
1874        auth = ('user', 'pass')
1875        url = httpbin('post')
1876        file_obj = io.BytesIO(b'')
1877        r = requests.Request('POST', url, auth=auth, data=file_obj)
1878        prepared_request = r.prepare()
1879        assert 'Transfer-Encoding' in prepared_request.headers
1880        assert 'Content-Length' not in prepared_request.headers
1881
1882    def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin):
1883        """Ensure that a byte stream with size > 0 will not set both a Content-Length
1884        and Transfer-Encoding header.
1885        """
1886        auth = ('user', 'pass')
1887        url = httpbin('post')
1888        file_obj = io.BytesIO(b'test data')
1889        r = requests.Request('POST', url, auth=auth, data=file_obj)
1890        prepared_request = r.prepare()
1891        assert 'Transfer-Encoding' not in prepared_request.headers
1892        assert 'Content-Length' in prepared_request.headers
1893
1894    def test_chunked_upload_does_not_set_content_length_header(self, httpbin):
1895        """Ensure that requests with a generator body stream using
1896        Transfer-Encoding: chunked, not a Content-Length header.
1897        """
1898        data = (i for i in [b'a', b'b', b'c'])
1899        url = httpbin('post')
1900        r = requests.Request('POST', url, data=data)
1901        prepared_request = r.prepare()
1902        assert 'Transfer-Encoding' in prepared_request.headers
1903        assert 'Content-Length' not in prepared_request.headers
1904
1905    def test_custom_redirect_mixin(self, httpbin):
1906        """Tests a custom mixin to overwrite ``get_redirect_target``.
1907
1908        Ensures a subclassed ``requests.Session`` can handle a certain type of
1909        malformed redirect responses.
1910
1911        1. original request receives a proper response: 302 redirect
1912        2. following the redirect, a malformed response is given:
1913            status code = HTTP 200
1914            location = alternate url
1915        3. the custom session catches the edge case and follows the redirect
1916        """
1917        url_final = httpbin('html')
1918        querystring_malformed = urlencode({'location': url_final})
1919        url_redirect_malformed = httpbin('response-headers?%s' % querystring_malformed)
1920        querystring_redirect = urlencode({'url': url_redirect_malformed})
1921        url_redirect = httpbin('redirect-to?%s' % querystring_redirect)
1922        urls_test = [url_redirect,
1923                     url_redirect_malformed,
1924                     url_final,
1925                     ]
1926
1927        class CustomRedirectSession(requests.Session):
1928            def get_redirect_target(self, resp):
1929                # default behavior
1930                if resp.is_redirect:
1931                    return resp.headers['location']
1932                # edge case - check to see if 'location' is in headers anyways
1933                location = resp.headers.get('location')
1934                if location and (location != resp.url):
1935                    return location
1936                return None
1937
1938        session = CustomRedirectSession()
1939        r = session.get(urls_test[0])
1940        assert len(r.history) == 2
1941        assert r.status_code == 200
1942        assert r.history[0].status_code == 302
1943        assert r.history[0].is_redirect
1944        assert r.history[1].status_code == 200
1945        assert not r.history[1].is_redirect
1946        assert r.url == urls_test[2]
1947
1948
class TestCaseInsensitiveDict:
    """Behavioural tests for ``CaseInsensitiveDict``."""

    @pytest.mark.parametrize(
        'cid', (
            CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}),
            CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]),
            CaseInsensitiveDict(FOO='foo', BAr='bar'),
        ))
    def test_init(self, cid):
        """Mapping, pair-list and keyword construction each yield two entries."""
        assert len(cid) == 2
        for lowered_key in ('foo', 'bar'):
            assert lowered_key in cid

    def test_docstring_example(self):
        """Lookup ignores case while iteration yields the key as it was set."""
        headers = CaseInsensitiveDict()
        headers['Accept'] = 'application/json'
        assert headers['aCCEPT'] == 'application/json'
        assert list(headers) == ['Accept']

    def test_len(self):
        """Setting a key that differs only in case does not add an entry."""
        mapping = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        mapping['A'] = 'a'
        assert len(mapping) == 2

    def test_getitem(self):
        """Item access succeeds with lower-case and upper-case spellings."""
        mapping = CaseInsensitiveDict({'Spam': 'blueval'})
        for spelling in ('spam', 'SPAM'):
            assert mapping[spelling] == 'blueval'

    def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        mapping = CaseInsensitiveDict()
        assignments = (
            ('spam', 'oneval'),
            ('Spam', 'twoval'),
            ('sPAM', 'redval'),
            ('SPAM', 'blueval'),
        )
        for key, value in assignments:
            mapping[key] = value
        assert mapping['spam'] == 'blueval'
        assert mapping['SPAM'] == 'blueval'
        assert list(mapping.keys()) == ['SPAM']

    def test_delitem(self):
        """Deleting with a differently cased key removes the entry."""
        mapping = CaseInsensitiveDict()
        mapping['Spam'] = 'someval'
        del mapping['sPam']
        assert 'spam' not in mapping
        assert len(mapping) == 0

    def test_contains(self):
        """Membership ignores case; an unrelated key is absent."""
        mapping = CaseInsensitiveDict()
        mapping['Spam'] = 'someval'
        for spelling in ('Spam', 'spam', 'SPAM', 'sPam'):
            assert spelling in mapping
        assert 'notspam' not in mapping

    def test_get(self):
        """``get`` ignores case and honours the default for missing keys."""
        mapping = CaseInsensitiveDict()
        mapping['spam'] = 'oneval'
        mapping['SPAM'] = 'blueval'
        for spelling in ('spam', 'SPAM', 'sPam'):
            assert mapping.get(spelling) == 'blueval'
        assert mapping.get('notspam', 'default') == 'default'

    def test_update(self):
        """``update`` overwrites entries whose keys differ only in case."""
        single = CaseInsensitiveDict()
        single['spam'] = 'blueval'
        single.update({'sPam': 'notblueval'})
        assert single['spam'] == 'notblueval'

        pair = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        pair.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
        assert len(pair) == 2
        assert pair['foo'] == 'anotherfoo'
        assert pair['bar'] == 'anotherbar'

    def test_update_retains_unchanged(self):
        """Keys not named in ``update`` keep their values."""
        mapping = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
        mapping.update({'foo': 'newfoo'})
        assert mapping['bar'] == 'bar'

    def test_iter(self):
        """Iteration yields the keys with their original casing."""
        mapping = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
        expected_keys = frozenset(['Spam', 'Eggs'])
        assert frozenset(iter(mapping)) == expected_keys

    def test_equality(self):
        """Equality ignores key case and works against plain dicts."""
        left = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
        right = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
        assert left == right
        del right['spam']
        assert left != right
        assert left == {'spam': 'blueval', 'eggs': 'redval'}
        assert left != object()

    def test_setdefault(self):
        """``setdefault`` returns the existing value, or stores the default."""
        mapping = CaseInsensitiveDict({'Spam': 'blueval'})
        existing = mapping.setdefault('spam', 'notblueval')
        assert existing == 'blueval'
        created = mapping.setdefault('notspam', 'notblueval')
        assert created == 'notblueval'

    def test_lower_items(self):
        """``lower_items`` yields lower-cased keys."""
        mapping = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        seen_keys = frozenset(key for key, _ in mapping.lower_items())
        expected_keys = frozenset(['accept', 'user-agent'])
        assert seen_keys == expected_keys

    def test_preserve_key_case(self):
        """``items``, ``keys`` and iteration all keep the original key case."""
        mapping = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        expected_keys = frozenset(['Accept', 'user-Agent'])
        assert frozenset(pair[0] for pair in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_preserve_last_key_case(self):
        """After re-setting a key, the most recently used casing is reported."""
        mapping = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        mapping.update({'ACCEPT': 'application/json'})
        mapping['USER-AGENT'] = 'requests'
        expected_keys = frozenset(['ACCEPT', 'USER-AGENT'])
        assert frozenset(pair[0] for pair in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_copy(self):
        """A copy compares equal and is unaffected by later changes to the source."""
        source = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        duplicate = source.copy()
        assert source == duplicate
        source['changed'] = True
        assert source != duplicate
2089
2090
class TestMorselToCookieExpires:
    """Tests for morsel_to_cookie when morsel contains expires."""

    def test_expires_valid_str(self):
        """A date string one second after the epoch converts to ``expires == 1``."""
        source = Morsel()
        source['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
        converted = morsel_to_cookie(source)
        assert converted.expires == 1

    @pytest.mark.parametrize(
        'value, exception', (
            (100, TypeError),
            ('woops', ValueError),
        ))
    def test_expires_invalid_int(self, value, exception):
        """An integer raises ``TypeError``; an unparseable string raises ``ValueError``."""
        source = Morsel()
        source['expires'] = value
        with pytest.raises(exception):
            morsel_to_cookie(source)

    def test_expires_none(self):
        """``expires`` set to ``None`` is carried over as ``None``."""
        source = Morsel()
        source['expires'] = None
        converted = morsel_to_cookie(source)
        assert converted.expires is None
2121
2122
class TestMorselToCookieMaxAge:
    """Tests for morsel_to_cookie when morsel contains max-age."""

    def test_max_age_valid_int(self):
        """An integer ``max-age`` of 60 produces an integer ``expires``."""
        source = Morsel()
        source['max-age'] = 60
        converted = morsel_to_cookie(source)
        assert isinstance(converted.expires, int)

    def test_max_age_invalid_str(self):
        """A non-numeric ``max-age`` raises ``TypeError``."""
        source = Morsel()
        source['max-age'] = 'woops'
        with pytest.raises(TypeError):
            morsel_to_cookie(source)
2142
2143
class TestTimeout:
    """Tests for how ``timeout`` arguments are validated and enforced."""

    def test_stream_timeout(self, httpbin):
        """A 2 second timeout against ``delay/10`` raises a read timeout.

        ``pytest.raises`` makes the test fail when no timeout is raised at
        all, instead of passing without checking anything.
        """
        with pytest.raises(requests.exceptions.Timeout) as excinfo:
            requests.get(httpbin('delay/10'), timeout=2.0)
        # The message is read from the first argument of the exception that
        # the requests exception carries as its own first argument.
        assert 'Read timed out' in excinfo.value.args[0].args[0]

    @pytest.mark.parametrize(
        'timeout, error_text', (
            ((3, 4, 5), '(connect, read)'),
            ('foo', 'must be an int, float or None'),
        ))
    def test_invalid_timeout(self, httpbin, timeout, error_text):
        """Malformed timeout values raise ``ValueError`` with a helpful message."""
        with pytest.raises(ValueError) as e:
            requests.get(httpbin('get'), timeout=timeout)
        # Inspect the exception's own message rather than the string form of
        # pytest's ExceptionInfo wrapper.
        assert error_text in str(e.value)

    @pytest.mark.parametrize(
        'timeout', (
            None,
            Urllib3Timeout(connect=None, read=None)
        ))
    def test_none_timeout(self, httpbin, timeout):
        """Check that you can set None as a valid timeout value.

        To actually test this behavior, we'd want to check that setting the
        timeout to None actually lets the request block past the system default
        timeout. However, this would make the test suite unbearably slow.
        Instead we verify that setting the timeout to None does not prevent the
        request from succeeding.
        """
        r = requests.get(httpbin('get'), timeout=timeout)
        assert r.status_code == 200

    @pytest.mark.parametrize(
        'timeout', (
            (None, 0.1),
            Urllib3Timeout(connect=None, read=0.1)
        ))
    def test_read_timeout(self, httpbin, timeout):
        """A 0.1 second read timeout against ``delay/10`` raises ``ReadTimeout``."""
        with pytest.raises(ReadTimeout):
            requests.get(httpbin('delay/10'), timeout=timeout)

    @pytest.mark.parametrize(
        'timeout', (
            (0.1, None),
            Urllib3Timeout(connect=0.1, read=None)
        ))
    def test_connect_timeout(self, timeout):
        """Connecting to ``TARPIT`` raises ``ConnectTimeout``.

        The raised exception must be an instance of both ``ConnectionError``
        and ``Timeout``.
        """
        with pytest.raises(ConnectTimeout) as excinfo:
            requests.get(TARPIT, timeout=timeout)
        assert isinstance(excinfo.value, ConnectionError)
        assert isinstance(excinfo.value, Timeout)

    @pytest.mark.parametrize(
        'timeout', (
            (0.1, 0.1),
            Urllib3Timeout(connect=0.1, read=0.1)
        ))
    def test_total_timeout_connect(self, timeout):
        """With connect and read timeouts both set, ``TARPIT`` raises ``ConnectTimeout``."""
        with pytest.raises(ConnectTimeout):
            requests.get(TARPIT, timeout=timeout)

    def test_encoded_methods(self, httpbin):
        """See: https://github.com/psf/requests/issues/2316"""
        r = requests.request(b'GET', httpbin('get'))
        assert r.ok
2220
2221
# Record of one ``send`` invocation: the positional ``args`` tuple and the
# ``kwargs`` dict it was called with.
SendCall = collections.namedtuple('SendCall', ('args', 'kwargs'))
2223
2224
class RedirectSession(SessionRedirectMixin):
    """Minimal session stand-in that replays a scripted list of status codes.

    Every ``send`` call is recorded in ``self.calls`` as a ``SendCall``, and
    the response it returns takes its status code from the front of
    ``order_of_redirects`` (200 once that list is exhausted).
    """

    def __init__(self, order_of_redirects):
        self.redirects = order_of_redirects
        self.calls = []
        self.max_redirects = 30
        self.cookies = {}
        self.trust_env = False

    def send(self, *args, **kwargs):
        """Record the call and return the next scripted response."""
        recorded = SendCall(args, kwargs)
        self.calls.append(recorded)
        return self.build_response()

    def build_response(self):
        """Build a ``Response`` for the most recently recorded request.

        The response always carries ``Location: /`` and a raw object from
        ``_build_raw``.
        """
        latest_request = self.calls[-1].args[0]
        response = requests.Response()

        try:
            status = int(self.redirects.pop(0))
        except IndexError:
            # The script has run out of entries.
            status = 200
        response.status_code = status

        response.headers = CaseInsensitiveDict({'Location': '/'})
        response.raw = self._build_raw()
        response.request = latest_request
        return response

    def _build_raw(self):
        """Return an empty ``StringIO`` given a ``release_conn`` attribute."""
        raw = StringIO.StringIO('')
        # The attached callable simply returns the arguments it receives.
        raw.release_conn = lambda *args: args
        return raw
2255
2256
def test_json_encodes_as_bytes():
    """A request prepared with ``json=`` ends up with a ``bytes`` body."""
    # urllib3 expects bodies as bytes-like objects
    payload = {"key": "value"}
    prepared = PreparedRequest()
    prepared.prepare(method='GET', url='https://www.example.com/', json=payload)
    assert isinstance(prepared.body, bytes)
2267
2268
def test_requests_are_updated_each_time(httpbin):
    """Each hop from ``resolve_redirects`` sends a fresh GET request.

    The session is scripted with statuses 303 and 307.  The initial request
    is a POST sent with no keyword arguments; every redirected hop must be a
    GET and must be passed to ``send`` with the expected keyword arguments.
    """
    session = RedirectSession([303, 307])
    prepared = requests.Request('POST', httpbin('post')).prepare()

    initial = session.send(prepared)
    assert initial.request.method == 'POST'
    assert session.calls[-1] == SendCall((initial.request,), {})

    expected_kwargs = dict(
        stream=False,
        verify=True,
        cert=None,
        timeout=None,
        allow_redirects=False,
        proxies={},
    )
    for response in session.resolve_redirects(initial, prepared):
        assert response.request.method == 'GET'
        expected_call = SendCall((response.request,), expected_kwargs)
        assert session.calls[-1] == expected_call
2288
2289
@pytest.mark.parametrize("var,url,proxy", [
    ('http_proxy', 'http://example.com', 'socks5://proxy.com:9876'),
    ('https_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
    ('all_proxy', 'http://example.com', 'socks5://proxy.com:9876'),
    ('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
])
def test_proxy_env_vars_override_default(var, url, proxy):
    """``rebuild_proxies`` picks up a proxy from the named environment variable.

    With ``var`` set to ``proxy`` in the environment and an empty proxies
    dict passed in, the result must map the URL's scheme to ``proxy``.
    """
    session = requests.Session()
    prepared = PreparedRequest()
    prepared.prepare(method='GET', url=url)

    scheme = urlparse(url).scheme
    with override_environ(**{var: proxy}):
        rebuilt = session.rebuild_proxies(prepared, {})
        assert scheme in rebuilt
        assert rebuilt[scheme] == proxy
2309
2310
@pytest.mark.parametrize(
    'data', (
        (('a', 'b'), ('c', 'd')),
        (('c', 'd'), ('a', 'b')),
        (('a', 'b'), ('c', 'd'), ('e', 'f')),
    ))
def test_data_argument_accepts_tuples(data):
    """A tuple of string pairs passed as ``data`` is form-encoded.

    The prepared body must equal ``urlencode(data)``.
    """
    prepared = PreparedRequest()
    prepared.prepare(
        method='GET',
        url='http://www.example.com',
        data=data,
        hooks=default_hooks(),
    )
    expected_body = urlencode(data)
    assert prepared.body == expected_body
2329
2330
@pytest.mark.parametrize(
    'kwargs', (
        None,
        {
            'method': 'GET',
            'url': 'http://www.example.com',
            'data': 'foo=bar',
            'hooks': default_hooks()
        },
        {
            'method': 'GET',
            'url': 'http://www.example.com',
            'data': 'foo=bar',
            'hooks': default_hooks(),
            'cookies': {'foo': 'bar'}
        },
        {
            'method': 'GET',
            'url': u('http://www.example.com/üniçø∂é')
        },
    ))
def test_prepared_copy(kwargs):
    """``PreparedRequest.copy`` reproduces the listed attributes.

    A ``None`` parameter leaves the request unprepared before it is copied.
    """
    original = PreparedRequest()
    if kwargs:
        original.prepare(**kwargs)
    duplicate = original.copy()

    compared = ('method', 'url', 'headers', '_cookies', 'body', 'hooks')
    for name in compared:
        assert getattr(original, name) == getattr(duplicate, name)
2359
2360
def test_urllib3_retries(httpbin):
    """A urllib3 ``Retry`` policy that forces retries on 500 ends in ``RetryError``."""
    from urllib3.util import Retry

    retry_policy = Retry(total=2, status_forcelist=[500])
    session = requests.Session()
    session.mount('http://', HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin('status/500'))
2370
2371
def test_urllib3_pool_connection_closed(httpbin):
    """A ``ConnectionError`` from a zero-sized pool mentions the closed pool.

    NOTE(review): if the request raises nothing, this test passes without
    checking anything.
    """
    session = requests.Session()
    zero_pool_adapter = HTTPAdapter(pool_connections=0, pool_maxsize=0)
    session.mount('http://', zero_pool_adapter)

    try:
        session.get(httpbin('status/200'))
    except ConnectionError as err:
        assert u"Pool is closed." in str(err)
2380
2381
2382class TestPreparingURLs(object):
2383    @pytest.mark.parametrize(
2384        'url,expected',
2385        (
2386            ('http://google.com', 'http://google.com/'),
2387            (u'http://ジェーピーニック.jp', u'http://xn--hckqz9bzb1cyrb.jp/'),
2388            (u'http://xn--n3h.net/', u'http://xn--n3h.net/'),
2389            (
2390                u'http://ジェーピーニック.jp'.encode('utf-8'),
2391                u'http://xn--hckqz9bzb1cyrb.jp/'
2392            ),
2393            (
2394                u'http://straße.de/straße',
2395                u'http://xn--strae-oqa.de/stra%C3%9Fe'
2396            ),
2397            (
2398                u'http://straße.de/straße'.encode('utf-8'),
2399                u'http://xn--strae-oqa.de/stra%C3%9Fe'
2400            ),
2401            (
2402                u'http://Königsgäßchen.de/straße',
2403                u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe'
2404            ),
2405            (
2406                u'http://Königsgäßchen.de/straße'.encode('utf-8'),
2407                u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe'
2408            ),
2409            (
2410                b'http://xn--n3h.net/',
2411                u'http://xn--n3h.net/'
2412            ),
2413            (
2414                b'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
2415                u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/'
2416            ),
2417            (
2418                u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
2419                u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/'
2420            )
2421        )
2422    )
2423    def test_preparing_url(self, url, expected):
2424
2425        def normalize_percent_encode(x):
2426            # Helper function that normalizes equivalent
2427            # percent-encoded bytes before comparisons
2428            for c in re.findall(r'%[a-fA-F0-9]{2}', x):
2429                x = x.replace(c, c.upper())
2430            return x
2431
2432        r = requests.Request('GET', url=url)
2433        p = r.prepare()
2434        assert normalize_percent_encode(p.url) == expected
2435
2436    @pytest.mark.parametrize(
2437        'url',
2438        (
2439            b"http://*.google.com",
2440            b"http://*",
2441            u"http://*.google.com",
2442            u"http://*",
2443            u"http://☃.net/"
2444        )
2445    )
2446    def test_preparing_bad_url(self, url):
2447        r = requests.Request('GET', url=url)
2448        with pytest.raises(requests.exceptions.InvalidURL):
2449            r.prepare()
2450
2451    @pytest.mark.parametrize(
2452        'url, exception',
2453        (
2454            ('http://localhost:-1', InvalidURL),
2455        )
2456    )
2457    def test_redirecting_to_bad_url(self, httpbin, url, exception):
2458        with pytest.raises(exception):
2459            r = requests.get(httpbin('redirect-to'), params={'url': url})
2460
2461    @pytest.mark.parametrize(
2462        'input, expected',
2463        (
2464            (
2465                b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
2466                u"http+unix://%2Fvar%2Frun%2Fsocket/path~",
2467            ),
2468            (
2469                u"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
2470                u"http+unix://%2Fvar%2Frun%2Fsocket/path~",
2471            ),
2472            (
2473                b"mailto:user@example.org",
2474                u"mailto:user@example.org",
2475            ),
2476            (
2477                u"mailto:user@example.org",
2478                u"mailto:user@example.org",
2479            ),
2480            (
2481                b"data:SSDimaUgUHl0aG9uIQ==",
2482                u"data:SSDimaUgUHl0aG9uIQ==",
2483            )
2484        )
2485    )
2486    def test_url_mutation(self, input, expected):
2487        """
2488        This test validates that we correctly exclude some URLs from
2489        preparation, and that we handle others. Specifically, it tests that
2490        any URL whose scheme doesn't begin with "http" is left alone, and
2491        those whose scheme *does* begin with "http" are mutated.
2492        """
2493        r = requests.Request('GET', url=input)
2494        p = r.prepare()
2495        assert p.url == expected
2496
2497    @pytest.mark.parametrize(
2498        'input, params, expected',
2499        (
2500            (
2501                b"http+unix://%2Fvar%2Frun%2Fsocket/path",
2502                {"key": "value"},
2503                u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
2504            ),
2505            (
2506                u"http+unix://%2Fvar%2Frun%2Fsocket/path",
2507                {"key": "value"},
2508                u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
2509            ),
2510            (
2511                b"mailto:user@example.org",
2512                {"key": "value"},
2513                u"mailto:user@example.org",
2514            ),
2515            (
2516                u"mailto:user@example.org",
2517                {"key": "value"},
2518                u"mailto:user@example.org",
2519            ),
2520        )
2521    )
2522    def test_parameters_for_nonstandard_schemes(self, input, params, expected):
2523        """
2524        Setting parameters for nonstandard schemes is allowed if those schemes
2525        begin with "http", and is forbidden otherwise.
2526        """
2527        r = requests.Request('GET', url=input, params=params)
2528        p = r.prepare()
2529        assert p.url == expected
2530