1# -*- coding: utf-8 -*- 2 3"""Tests for Requests.""" 4 5from __future__ import division 6import json 7import os 8import pickle 9import collections 10import contextlib 11import warnings 12import re 13 14import io 15import requests 16import pytest 17from requests.adapters import HTTPAdapter 18from requests.auth import HTTPDigestAuth, _basic_auth_str 19from requests.compat import ( 20 Morsel, cookielib, getproxies, str, urlparse, 21 builtin_str) 22from requests.cookies import ( 23 cookiejar_from_dict, morsel_to_cookie) 24from requests.exceptions import ( 25 ConnectionError, ConnectTimeout, InvalidSchema, InvalidURL, 26 MissingSchema, ReadTimeout, Timeout, RetryError, TooManyRedirects, 27 ProxyError, InvalidHeader, UnrewindableBodyError, SSLError, InvalidProxyURL) 28from requests.models import PreparedRequest 29from requests.structures import CaseInsensitiveDict 30from requests.sessions import SessionRedirectMixin 31from requests.models import urlencode 32from requests.hooks import default_hooks 33from requests.compat import MutableMapping 34 35from .compat import StringIO, u 36from .utils import override_environ 37from urllib3.util import Timeout as Urllib3Timeout 38 39# Requests to this URL should always fail with a connection timeout (nothing 40# listening on that port) 41TARPIT = 'http://10.255.255.1' 42 43try: 44 from ssl import SSLContext 45 del SSLContext 46 HAS_MODERN_SSL = True 47except ImportError: 48 HAS_MODERN_SSL = False 49 50try: 51 requests.pyopenssl 52 HAS_PYOPENSSL = True 53except AttributeError: 54 HAS_PYOPENSSL = False 55 56 57class TestRequests: 58 59 digest_auth_algo = ('MD5', 'SHA-256', 'SHA-512') 60 61 def test_entry_points(self): 62 63 requests.session 64 requests.session().get 65 requests.session().head 66 requests.get 67 requests.head 68 requests.put 69 requests.patch 70 requests.post 71 # Not really an entry point, but people rely on it. 72 from requests.packages.urllib3.poolmanager import PoolManager 73 74 @pytest.mark.parametrize( 75 'exception, url', ( 76 (MissingSchema, 'hiwpefhipowhefopw'), 77 (InvalidSchema, 'localhost:3128'), 78 (InvalidSchema, 'localhost.localdomain:3128/'), 79 (InvalidSchema, '10.122.1.1:3128/'), 80 (InvalidURL, 'http://'), 81 )) 82 def test_invalid_url(self, exception, url): 83 with pytest.raises(exception): 84 requests.get(url) 85 86 def test_basic_building(self): 87 req = requests.Request() 88 req.url = 'http://kennethreitz.org/' 89 req.data = {'life': '42'} 90 91 pr = req.prepare() 92 assert pr.url == req.url 93 assert pr.body == 'life=42' 94 95 @pytest.mark.parametrize('method', ('GET', 'HEAD')) 96 def test_no_content_length(self, httpbin, method): 97 req = requests.Request(method, httpbin(method.lower())).prepare() 98 assert 'Content-Length' not in req.headers 99 100 @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS')) 101 def test_no_body_content_length(self, httpbin, method): 102 req = requests.Request(method, httpbin(method.lower())).prepare() 103 assert req.headers['Content-Length'] == '0' 104 105 @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS')) 106 def test_empty_content_length(self, httpbin, method): 107 req = requests.Request(method, httpbin(method.lower()), data='').prepare() 108 assert req.headers['Content-Length'] == '0' 109 110 def test_override_content_length(self, httpbin): 111 headers = { 112 'Content-Length': 'not zero' 113 } 114 r = requests.Request('POST', httpbin('post'), headers=headers).prepare() 115 assert 'Content-Length' in r.headers 116 assert r.headers['Content-Length'] == 'not zero' 117 118 def test_path_is_not_double_encoded(self): 119 request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare() 120 121 assert request.path_url == '/get/test%20case' 122 123 @pytest.mark.parametrize( 124 'url, expected', ( 125 ('http://example.com/path#fragment', 'http://example.com/path?a=b#fragment'), 126 ('http://example.com/path?key=value#fragment', 'http://example.com/path?key=value&a=b#fragment') 127 )) 128 def test_params_are_added_before_fragment(self, url, expected): 129 request = requests.Request('GET', url, params={"a": "b"}).prepare() 130 assert request.url == expected 131 132 def test_params_original_order_is_preserved_by_default(self): 133 param_ordered_dict = collections.OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1))) 134 session = requests.Session() 135 request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict) 136 prep = session.prepare_request(request) 137 assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1' 138 139 def test_params_bytes_are_encoded(self): 140 request = requests.Request('GET', 'http://example.com', 141 params=b'test=foo').prepare() 142 assert request.url == 'http://example.com/?test=foo' 143 144 def test_binary_put(self): 145 request = requests.Request('PUT', 'http://example.com', 146 data=u"ööö".encode("utf-8")).prepare() 147 assert isinstance(request.body, bytes) 148 149 def test_whitespaces_are_removed_from_url(self): 150 # Test for issue #3696 151 request = requests.Request('GET', ' http://example.com').prepare() 152 assert request.url == 'http://example.com/' 153 154 @pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://')) 155 def test_mixed_case_scheme_acceptable(self, httpbin, scheme): 156 s = requests.Session() 157 s.proxies = getproxies() 158 parts = urlparse(httpbin('get')) 159 url = scheme + parts.netloc + parts.path 160 r = requests.Request('GET', url) 161 r = s.send(r.prepare()) 162 assert r.status_code == 200, 'failed for scheme {}'.format(scheme) 163 164 def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin): 165 r = requests.Request('GET', httpbin('get')) 166 s = requests.Session() 167 s.proxies = getproxies() 168 169 r = s.send(r.prepare()) 170 171 assert r.status_code == 200 172 173 def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin): 174 r = requests.get(httpbin('redirect', '1')) 175 assert r.status_code == 200 176 assert r.history[0].status_code == 302 177 assert r.history[0].is_redirect 178 179 def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin): 180 r = requests.post(httpbin('redirect-to'), data='test', params={'url': 'post', 'status_code': 307}) 181 assert r.status_code == 200 182 assert r.history[0].status_code == 307 183 assert r.history[0].is_redirect 184 assert r.json()['data'] == 'test' 185 186 def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin): 187 byte_str = b'test' 188 r = requests.post(httpbin('redirect-to'), data=io.BytesIO(byte_str), params={'url': 'post', 'status_code': 307}) 189 assert r.status_code == 200 190 assert r.history[0].status_code == 307 191 assert r.history[0].is_redirect 192 assert r.json()['data'] == byte_str.decode('utf-8') 193 194 def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin): 195 try: 196 requests.get(httpbin('relative-redirect', '50')) 197 except TooManyRedirects as e: 198 url = httpbin('relative-redirect', '20') 199 assert e.request.url == url 200 assert e.response.url == url 201 assert len(e.response.history) == 30 202 else: 203 pytest.fail('Expected redirect to raise TooManyRedirects but it did not') 204 205 def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin): 206 s = requests.session() 207 s.max_redirects = 5 208 try: 209 s.get(httpbin('relative-redirect', '50')) 210 except TooManyRedirects as e: 211 url = httpbin('relative-redirect', '45') 212 assert e.request.url == url 213 assert e.response.url == url 214 assert len(e.response.history) == 5 215 else: 216 pytest.fail('Expected custom max number of redirects to be respected but was not') 217 218 def test_http_301_changes_post_to_get(self, httpbin): 219 r = requests.post(httpbin('status', '301')) 220 assert r.status_code == 200 221 assert r.request.method == 'GET' 222 assert r.history[0].status_code == 301 223 assert r.history[0].is_redirect 224 225 def test_http_301_doesnt_change_head_to_get(self, httpbin): 226 r = requests.head(httpbin('status', '301'), allow_redirects=True) 227 print(r.content) 228 assert r.status_code == 200 229 assert r.request.method == 'HEAD' 230 assert r.history[0].status_code == 301 231 assert r.history[0].is_redirect 232 233 def test_http_302_changes_post_to_get(self, httpbin): 234 r = requests.post(httpbin('status', '302')) 235 assert r.status_code == 200 236 assert r.request.method == 'GET' 237 assert r.history[0].status_code == 302 238 assert r.history[0].is_redirect 239 240 def test_http_302_doesnt_change_head_to_get(self, httpbin): 241 r = requests.head(httpbin('status', '302'), allow_redirects=True) 242 assert r.status_code == 200 243 assert r.request.method == 'HEAD' 244 assert r.history[0].status_code == 302 245 assert r.history[0].is_redirect 246 247 def test_http_303_changes_post_to_get(self, httpbin): 248 r = requests.post(httpbin('status', '303')) 249 assert r.status_code == 200 250 assert r.request.method == 'GET' 251 assert r.history[0].status_code == 303 252 assert r.history[0].is_redirect 253 254 def test_http_303_doesnt_change_head_to_get(self, httpbin): 255 r = requests.head(httpbin('status', '303'), allow_redirects=True) 256 assert r.status_code == 200 257 assert r.request.method == 'HEAD' 258 assert r.history[0].status_code == 303 259 assert r.history[0].is_redirect 260 261 def test_header_and_body_removal_on_redirect(self, httpbin): 262 purged_headers = ('Content-Length', 'Content-Type') 263 ses = requests.Session() 264 req = requests.Request('POST', httpbin('post'), data={'test': 'data'}) 265 prep = ses.prepare_request(req) 266 resp = ses.send(prep) 267 268 # Mimic a redirect response 269 resp.status_code = 302 270 resp.headers['location'] = 'get' 271 272 # Run request through resolve_redirects 273 next_resp = next(ses.resolve_redirects(resp, prep)) 274 assert next_resp.request.body is None 275 for header in purged_headers: 276 assert header not in next_resp.request.headers 277 278 def test_transfer_enc_removal_on_redirect(self, httpbin): 279 purged_headers = ('Transfer-Encoding', 'Content-Type') 280 ses = requests.Session() 281 req = requests.Request('POST', httpbin('post'), data=(b'x' for x in range(1))) 282 prep = ses.prepare_request(req) 283 assert 'Transfer-Encoding' in prep.headers 284 285 # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33 286 resp = requests.Response() 287 resp.raw = io.BytesIO(b'the content') 288 resp.request = prep 289 setattr(resp.raw, 'release_conn', lambda *args: args) 290 291 # Mimic a redirect response 292 resp.status_code = 302 293 resp.headers['location'] = httpbin('get') 294 295 # Run request through resolve_redirect 296 next_resp = next(ses.resolve_redirects(resp, prep)) 297 assert next_resp.request.body is None 298 for header in purged_headers: 299 assert header not in next_resp.request.headers 300 301 def test_fragment_maintained_on_redirect(self, httpbin): 302 fragment = "#view=edit&token=hunter2" 303 r = requests.get(httpbin('redirect-to?url=get')+fragment) 304 305 assert len(r.history) > 0 306 assert r.history[0].request.url == httpbin('redirect-to?url=get')+fragment 307 assert r.url == httpbin('get')+fragment 308 309 def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): 310 heads = {'User-agent': 'Mozilla/5.0'} 311 312 r = requests.get(httpbin('user-agent'), headers=heads) 313 314 assert heads['User-agent'] in r.text 315 assert r.status_code == 200 316 317 def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin): 318 heads = {'User-agent': 'Mozilla/5.0'} 319 320 r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) 321 assert r.status_code == 200 322 323 def test_set_cookie_on_301(self, httpbin): 324 s = requests.session() 325 url = httpbin('cookies/set?foo=bar') 326 s.get(url) 327 assert s.cookies['foo'] == 'bar' 328 329 def test_cookie_sent_on_redirect(self, httpbin): 330 s = requests.session() 331 s.get(httpbin('cookies/set?foo=bar')) 332 r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') 333 assert 'Cookie' in r.json()['headers'] 334 335 def test_cookie_removed_on_expire(self, httpbin): 336 s = requests.session() 337 s.get(httpbin('cookies/set?foo=bar')) 338 assert s.cookies['foo'] == 'bar' 339 s.get( 340 httpbin('response-headers'), 341 params={ 342 'Set-Cookie': 343 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT' 344 } 345 ) 346 assert 'foo' not in s.cookies 347 348 def test_cookie_quote_wrapped(self, httpbin): 349 s = requests.session() 350 s.get(httpbin('cookies/set?foo="bar:baz"')) 351 assert s.cookies['foo'] == '"bar:baz"' 352 353 def test_cookie_persists_via_api(self, httpbin): 354 s = requests.session() 355 r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) 356 assert 'foo' in r.request.headers['Cookie'] 357 assert 'foo' in r.history[0].request.headers['Cookie'] 358 359 def test_request_cookie_overrides_session_cookie(self, httpbin): 360 s = requests.session() 361 s.cookies['foo'] = 'bar' 362 r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) 363 assert r.json()['cookies']['foo'] == 'baz' 364 # Session cookie should not be modified 365 assert s.cookies['foo'] == 'bar' 366 367 def test_request_cookies_not_persisted(self, httpbin): 368 s = requests.session() 369 s.get(httpbin('cookies'), cookies={'foo': 'baz'}) 370 # Sending a request with cookies should not add cookies to the session 371 assert not s.cookies 372 373 def test_generic_cookiejar_works(self, httpbin): 374 cj = cookielib.CookieJar() 375 cookiejar_from_dict({'foo': 'bar'}, cj) 376 s = requests.session() 377 s.cookies = cj 378 r = s.get(httpbin('cookies')) 379 # Make sure the cookie was sent 380 assert r.json()['cookies']['foo'] == 'bar' 381 # Make sure the session cj is still the custom one 382 assert s.cookies is cj 383 384 def test_param_cookiejar_works(self, httpbin): 385 cj = cookielib.CookieJar() 386 cookiejar_from_dict({'foo': 'bar'}, cj) 387 s = requests.session() 388 r = s.get(httpbin('cookies'), cookies=cj) 389 # Make sure the cookie was sent 390 assert r.json()['cookies']['foo'] == 'bar' 391 392 def test_cookielib_cookiejar_on_redirect(self, httpbin): 393 """Tests resolve_redirect doesn't fail when merging cookies 394 with non-RequestsCookieJar cookiejar. 395 396 See GH #3579 397 """ 398 cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar()) 399 s = requests.Session() 400 s.cookies = cookiejar_from_dict({'cookie': 'tasty'}) 401 402 # Prepare request without using Session 403 req = requests.Request('GET', httpbin('headers'), cookies=cj) 404 prep_req = req.prepare() 405 406 # Send request and simulate redirect 407 resp = s.send(prep_req) 408 resp.status_code = 302 409 resp.headers['location'] = httpbin('get') 410 redirects = s.resolve_redirects(resp, prep_req) 411 resp = next(redirects) 412 413 # Verify CookieJar isn't being converted to RequestsCookieJar 414 assert isinstance(prep_req._cookies, cookielib.CookieJar) 415 assert isinstance(resp.request._cookies, cookielib.CookieJar) 416 assert not isinstance(resp.request._cookies, requests.cookies.RequestsCookieJar) 417 418 cookies = {} 419 for c in resp.request._cookies: 420 cookies[c.name] = c.value 421 assert cookies['foo'] == 'bar' 422 assert cookies['cookie'] == 'tasty' 423 424 def test_requests_in_history_are_not_overridden(self, httpbin): 425 resp = requests.get(httpbin('redirect/3')) 426 urls = [r.url for r in resp.history] 427 req_urls = [r.request.url for r in resp.history] 428 assert urls == req_urls 429 430 def test_history_is_always_a_list(self, httpbin): 431 """Show that even with redirects, Response.history is always a list.""" 432 resp = requests.get(httpbin('get')) 433 assert isinstance(resp.history, list) 434 resp = requests.get(httpbin('redirect/1')) 435 assert isinstance(resp.history, list) 436 assert not isinstance(resp.history, tuple) 437 438 def test_headers_on_session_with_None_are_not_sent(self, httpbin): 439 """Do not send headers in Session.headers with None values.""" 440 ses = requests.Session() 441 ses.headers['Accept-Encoding'] = None 442 req = requests.Request('GET', httpbin('get')) 443 prep = ses.prepare_request(req) 444 assert 'Accept-Encoding' not in prep.headers 445 446 def test_headers_preserve_order(self, httpbin): 447 """Preserve order when headers provided as OrderedDict.""" 448 ses = requests.Session() 449 ses.headers = collections.OrderedDict() 450 ses.headers['Accept-Encoding'] = 'identity' 451 ses.headers['First'] = '1' 452 ses.headers['Second'] = '2' 453 headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')]) 454 headers['Fifth'] = '5' 455 headers['Second'] = '222' 456 req = requests.Request('GET', httpbin('get'), headers=headers) 457 prep = ses.prepare_request(req) 458 items = list(prep.headers.items()) 459 assert items[0] == ('Accept-Encoding', 'identity') 460 assert items[1] == ('First', '1') 461 assert items[2] == ('Second', '222') 462 assert items[3] == ('Third', '3') 463 assert items[4] == ('Fourth', '4') 464 assert items[5] == ('Fifth', '5') 465 466 @pytest.mark.parametrize('key', ('User-agent', 'user-agent')) 467 def test_user_agent_transfers(self, httpbin, key): 468 469 heads = {key: 'Mozilla/5.0 (github.com/psf/requests)'} 470 471 r = requests.get(httpbin('user-agent'), headers=heads) 472 assert heads[key] in r.text 473 474 def test_HTTP_200_OK_HEAD(self, httpbin): 475 r = requests.head(httpbin('get')) 476 assert r.status_code == 200 477 478 def test_HTTP_200_OK_PUT(self, httpbin): 479 r = requests.put(httpbin('put')) 480 assert r.status_code == 200 481 482 def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin): 483 auth = ('user', 'pass') 484 url = httpbin('basic-auth', 'user', 'pass') 485 486 r = requests.get(url, auth=auth) 487 assert r.status_code == 200 488 489 r = requests.get(url) 490 assert r.status_code == 401 491 492 s = requests.session() 493 s.auth = auth 494 r = s.get(url) 495 assert r.status_code == 200 496 497 @pytest.mark.parametrize( 498 'username, password', ( 499 ('user', 'pass'), 500 (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8')), 501 (42, 42), 502 (None, None), 503 )) 504 def test_set_basicauth(self, httpbin, username, password): 505 auth = (username, password) 506 url = httpbin('get') 507 508 r = requests.Request('GET', url, auth=auth) 509 p = r.prepare() 510 511 assert p.headers['Authorization'] == _basic_auth_str(username, password) 512 513 def test_basicauth_encodes_byte_strings(self): 514 """Ensure b'test' formats as the byte string "test" rather 515 than the unicode string "b'test'" in Python 3. 516 """ 517 auth = (b'\xc5\xafsername', b'test\xc6\xb6') 518 r = requests.Request('GET', 'http://localhost', auth=auth) 519 p = r.prepare() 520 521 assert p.headers['Authorization'] == 'Basic xa9zZXJuYW1lOnRlc3TGtg==' 522 523 @pytest.mark.parametrize( 524 'url, exception', ( 525 # Connecting to an unknown domain should raise a ConnectionError 526 ('http://doesnotexist.google.com', ConnectionError), 527 # Connecting to an invalid port should raise a ConnectionError 528 ('http://localhost:1', ConnectionError), 529 # Inputing a URL that cannot be parsed should raise an InvalidURL error 530 ('http://fe80::5054:ff:fe5a:fc0', InvalidURL) 531 )) 532 def test_errors(self, url, exception): 533 with pytest.raises(exception): 534 requests.get(url, timeout=1) 535 536 def test_proxy_error(self): 537 # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError 538 with pytest.raises(ProxyError): 539 requests.get('http://localhost:1', proxies={'http': 'non-resolvable-address'}) 540 541 def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure): 542 with pytest.raises(InvalidProxyURL): 543 requests.get(httpbin_secure(), proxies={'https': 'http:/badproxyurl:3128'}) 544 545 with pytest.raises(InvalidProxyURL): 546 requests.get(httpbin(), proxies={'http': 'http://:8080'}) 547 548 with pytest.raises(InvalidProxyURL): 549 requests.get(httpbin_secure(), proxies={'https': 'https://'}) 550 551 with pytest.raises(InvalidProxyURL): 552 requests.get(httpbin(), proxies={'http': 'http:///example.com:8080'}) 553 554 def test_basicauth_with_netrc(self, httpbin): 555 auth = ('user', 'pass') 556 wrong_auth = ('wronguser', 'wrongpass') 557 url = httpbin('basic-auth', 'user', 'pass') 558 559 old_auth = requests.sessions.get_netrc_auth 560 561 try: 562 def get_netrc_auth_mock(url): 563 return auth 564 requests.sessions.get_netrc_auth = get_netrc_auth_mock 565 566 # Should use netrc and work. 567 r = requests.get(url) 568 assert r.status_code == 200 569 570 # Given auth should override and fail. 571 r = requests.get(url, auth=wrong_auth) 572 assert r.status_code == 401 573 574 s = requests.session() 575 576 # Should use netrc and work. 577 r = s.get(url) 578 assert r.status_code == 200 579 580 # Given auth should override and fail. 581 s.auth = wrong_auth 582 r = s.get(url) 583 assert r.status_code == 401 584 finally: 585 requests.sessions.get_netrc_auth = old_auth 586 587 def test_DIGEST_HTTP_200_OK_GET(self, httpbin): 588 589 for authtype in self.digest_auth_algo: 590 auth = HTTPDigestAuth('user', 'pass') 591 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype, 'never') 592 593 r = requests.get(url, auth=auth) 594 assert r.status_code == 200 595 596 r = requests.get(url) 597 assert r.status_code == 401 598 print(r.headers['WWW-Authenticate']) 599 600 s = requests.session() 601 s.auth = HTTPDigestAuth('user', 'pass') 602 r = s.get(url) 603 assert r.status_code == 200 604 605 def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin): 606 607 for authtype in self.digest_auth_algo: 608 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype) 609 auth = HTTPDigestAuth('user', 'pass') 610 r = requests.get(url) 611 assert r.cookies['fake'] == 'fake_value' 612 613 r = requests.get(url, auth=auth) 614 assert r.status_code == 200 615 616 def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin): 617 618 for authtype in self.digest_auth_algo: 619 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype) 620 auth = HTTPDigestAuth('user', 'pass') 621 s = requests.Session() 622 s.get(url, auth=auth) 623 assert s.cookies['fake'] == 'fake_value' 624 625 def test_DIGEST_STREAM(self, httpbin): 626 627 for authtype in self.digest_auth_algo: 628 auth = HTTPDigestAuth('user', 'pass') 629 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype) 630 631 r = requests.get(url, auth=auth, stream=True) 632 assert r.raw.read() != b'' 633 634 r = requests.get(url, auth=auth, stream=False) 635 assert r.raw.read() == b'' 636 637 def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin): 638 639 for authtype in self.digest_auth_algo: 640 auth = HTTPDigestAuth('user', 'wrongpass') 641 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype) 642 643 r = requests.get(url, auth=auth) 644 assert r.status_code == 401 645 646 r = requests.get(url) 647 assert r.status_code == 401 648 649 s = requests.session() 650 s.auth = auth 651 r = s.get(url) 652 assert r.status_code == 401 653 654 def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin): 655 656 for authtype in self.digest_auth_algo: 657 auth = HTTPDigestAuth('user', 'pass') 658 url = httpbin('digest-auth', 'auth', 'user', 'pass', authtype) 659 660 r = requests.get(url, auth=auth) 661 assert '"auth"' in r.request.headers['Authorization'] 662 663 def test_POSTBIN_GET_POST_FILES(self, httpbin): 664 665 url = httpbin('post') 666 requests.post(url).raise_for_status() 667 668 post1 = requests.post(url, data={'some': 'data'}) 669 assert post1.status_code == 200 670 671 with open('requirements-dev.txt') as f: 672 post2 = requests.post(url, files={'some': f}) 673 assert post2.status_code == 200 674 675 post4 = requests.post(url, data='[{"some": "json"}]') 676 assert post4.status_code == 200 677 678 with pytest.raises(ValueError): 679 requests.post(url, files=['bad file data']) 680 681 def test_invalid_files_input(self, httpbin): 682 683 url = httpbin('post') 684 post = requests.post(url, 685 files={"random-file-1": None, "random-file-2": 1}) 686 assert b'name="random-file-1"' not in post.request.body 687 assert b'name="random-file-2"' in post.request.body 688 689 def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin): 690 691 class TestStream(object): 692 def __init__(self, data): 693 self.data = data.encode() 694 self.length = len(self.data) 695 self.index = 0 696 697 def __len__(self): 698 return self.length 699 700 def read(self, size=None): 701 if size: 702 ret = self.data[self.index:self.index + size] 703 self.index += size 704 else: 705 ret = self.data[self.index:] 706 self.index = self.length 707 return ret 708 709 def tell(self): 710 return self.index 711 712 def seek(self, offset, where=0): 713 if where == 0: 714 self.index = offset 715 elif where == 1: 716 self.index += offset 717 elif where == 2: 718 self.index = self.length + offset 719 720 test = TestStream('test') 721 post1 = requests.post(httpbin('post'), data=test) 722 assert post1.status_code == 200 723 assert post1.json()['data'] == 'test' 724 725 test = TestStream('test') 726 test.seek(2) 727 post2 = requests.post(httpbin('post'), data=test) 728 assert post2.status_code == 200 729 assert post2.json()['data'] == 'st' 730 731 def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin): 732 733 url = httpbin('post') 734 requests.post(url).raise_for_status() 735 736 post1 = requests.post(url, data={'some': 'data'}) 737 assert post1.status_code == 200 738 739 with open('requirements-dev.txt') as f: 740 post2 = requests.post(url, data={'some': 'data'}, files={'some': f}) 741 assert post2.status_code == 200 742 743 post4 = requests.post(url, data='[{"some": "json"}]') 744 assert post4.status_code == 200 745 746 with pytest.raises(ValueError): 747 requests.post(url, files=['bad file data']) 748 749 def test_post_with_custom_mapping(self, httpbin): 750 class CustomMapping(MutableMapping): 751 def __init__(self, *args, **kwargs): 752 self.data = dict(*args, **kwargs) 753 754 def __delitem__(self, key): 755 del self.data[key] 756 757 def __getitem__(self, key): 758 return self.data[key] 759 760 def __setitem__(self, key, value): 761 self.data[key] = value 762 763 def __iter__(self): 764 return iter(self.data) 765 766 def __len__(self): 767 return len(self.data) 768 769 data = CustomMapping({'some': 'data'}) 770 url = httpbin('post') 771 found_json = requests.post(url, data=data).json().get('form') 772 assert found_json == {'some': 'data'} 773 774 def test_conflicting_post_params(self, httpbin): 775 url = httpbin('post') 776 with open('requirements-dev.txt') as f: 777 with pytest.raises(ValueError): 778 requests.post(url, data='[{"some": "data"}]', files={'some': f}) 779 with pytest.raises(ValueError): 780 requests.post(url, data=u('[{"some": "data"}]'), files={'some': f}) 781 782 def test_request_ok_set(self, httpbin): 783 r = requests.get(httpbin('status', '404')) 784 assert not r.ok 785 786 def test_status_raising(self, httpbin): 787 r = requests.get(httpbin('status', '404')) 788 with pytest.raises(requests.exceptions.HTTPError): 789 r.raise_for_status() 790 791 r = requests.get(httpbin('status', '500')) 792 assert not r.ok 793 794 def test_decompress_gzip(self, httpbin): 795 r = requests.get(httpbin('gzip')) 796 r.content.decode('ascii') 797 798 @pytest.mark.parametrize( 799 'url, params', ( 800 ('/get', {'foo': 'føø'}), 801 ('/get', {'føø': 'føø'}), 802 ('/get', {'føø': 'føø'}), 803 ('/get', {'foo': 'foo'}), 804 ('ø', {'foo': 'foo'}), 805 )) 806 def test_unicode_get(self, httpbin, url, params): 807 requests.get(httpbin(url), params=params) 808 809 def test_unicode_header_name(self, httpbin): 810 requests.put( 811 httpbin('put'), 812 headers={str('Content-Type'): 'application/octet-stream'}, 813 data='\xff') # compat.str is unicode. 814 815 def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle): 816 requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle) 817 818 def test_invalid_ca_certificate_path(self, httpbin_secure): 819 INVALID_PATH = '/garbage' 820 with pytest.raises(IOError) as e: 821 requests.get(httpbin_secure(), verify=INVALID_PATH) 822 assert str(e.value) == 'Could not find a suitable TLS CA certificate bundle, invalid path: {}'.format(INVALID_PATH) 823 824 def test_invalid_ssl_certificate_files(self, httpbin_secure): 825 INVALID_PATH = '/garbage' 826 with pytest.raises(IOError) as e: 827 requests.get(httpbin_secure(), cert=INVALID_PATH) 828 assert str(e.value) == 'Could not find the TLS certificate file, invalid path: {}'.format(INVALID_PATH) 829 830 with pytest.raises(IOError) as e: 831 requests.get(httpbin_secure(), cert=('.', INVALID_PATH)) 832 assert str(e.value) == 'Could not find the TLS key file, invalid path: {}'.format(INVALID_PATH) 833 834 def test_http_with_certificate(self, httpbin): 835 r = requests.get(httpbin(), cert='.') 836 assert r.status_code == 200 837 838 def test_https_warnings(self, httpbin_secure, httpbin_ca_bundle): 839 """warnings are emitted with requests.get""" 840 if HAS_MODERN_SSL or HAS_PYOPENSSL: 841 warnings_expected = () 842 else: 843 warnings_expected = ('SNIMissingWarning', 844 'InsecurePlatformWarning', 845 'SubjectAltNameWarning', ) 846 847 with pytest.warns(None) as warning_records: 848 warnings.simplefilter('always') 849 requests.get(httpbin_secure('status', '200'), 850 verify=httpbin_ca_bundle) 851 852 warning_records = [item for item in warning_records 853 if item.category.__name__ != 'ResourceWarning'] 854 855 warnings_category = tuple( 856 item.category.__name__ for item in warning_records) 857 assert warnings_category == warnings_expected 858 859 def test_certificate_failure(self, httpbin_secure): 860 """ 861 When underlying SSL problems occur, an SSLError is raised. 862 """ 863 with pytest.raises(SSLError): 864 # Our local httpbin does not have a trusted CA, so this call will 865 # fail if we use our default trust bundle. 866 requests.get(httpbin_secure('status', '200')) 867 868 def test_urlencoded_get_query_multivalued_param(self, httpbin): 869 870 r = requests.get(httpbin('get'), params={'test': ['foo', 'baz']}) 871 assert r.status_code == 200 872 assert r.url == httpbin('get?test=foo&test=baz') 873 874 def test_form_encoded_post_query_multivalued_element(self, httpbin): 875 r = requests.Request(method='POST', url=httpbin('post'), 876 data=dict(test=['foo', 'baz'])) 877 prep = r.prepare() 878 assert prep.body == 'test=foo&test=baz' 879 880 def test_different_encodings_dont_break_post(self, httpbin): 881 r = requests.post(httpbin('post'), 882 data={'stuff': json.dumps({'a': 123})}, 883 params={'blah': 'asdf1234'}, 884 files={'file': ('test_requests.py', open(__file__, 'rb'))}) 885 assert r.status_code == 200 886 887 @pytest.mark.parametrize( 888 'data', ( 889 {'stuff': u('ëlïxr')}, 890 {'stuff': u('ëlïxr').encode('utf-8')}, 891 {'stuff': 'elixr'}, 892 {'stuff': 'elixr'.encode('utf-8')}, 893 )) 894 def test_unicode_multipart_post(self, httpbin, data): 895 r = requests.post(httpbin('post'), 896 data=data, 897 files={'file': ('test_requests.py', open(__file__, 'rb'))}) 898 assert r.status_code == 200 899 900 def test_unicode_multipart_post_fieldnames(self, httpbin): 901 filename = os.path.splitext(__file__)[0] + '.py' 902 r = requests.Request( 903 method='POST', url=httpbin('post'), 904 data={'stuff'.encode('utf-8'): 'elixr'}, 905 files={'file': ('test_requests.py', open(filename, 'rb'))}) 906 prep = r.prepare() 907 assert b'name="stuff"' in prep.body 908 assert b'name="b\'stuff\'"' not in prep.body 909 910 def test_unicode_method_name(self, httpbin): 911 files = {'file': open(__file__, 'rb')} 912 r = requests.request( 913 method=u('POST'), url=httpbin('post'), files=files) 914 assert r.status_code == 200 915 916 def test_unicode_method_name_with_request_object(self, httpbin): 917 files = {'file': open(__file__, 'rb')} 918 s = requests.Session() 919 req = requests.Request(u('POST'), httpbin('post'), files=files) 920 prep = s.prepare_request(req) 921 assert isinstance(prep.method, builtin_str) 922 assert prep.method == 'POST' 923 924 resp = s.send(prep) 925 assert resp.status_code == 200 926 927 def test_non_prepared_request_error(self): 928 s = requests.Session() 929 req = requests.Request(u('POST'), '/') 930 931 with pytest.raises(ValueError) as e: 932 s.send(req) 933 assert str(e.value) == 'You can only send PreparedRequests.' 934 935 def test_custom_content_type(self, httpbin): 936 r = requests.post( 937 httpbin('post'), 938 data={'stuff': json.dumps({'a': 123})}, 939 files={ 940 'file1': ('test_requests.py', open(__file__, 'rb')), 941 'file2': ('test_requests', open(__file__, 'rb'), 942 'text/py-content-type')}) 943 assert r.status_code == 200 944 assert b"text/py-content-type" in r.request.body 945 946 def test_hook_receives_request_arguments(self, httpbin): 947 def hook(resp, **kwargs): 948 assert resp is not None 949 assert kwargs != {} 950 951 s = requests.Session() 952 r = requests.Request('GET', httpbin(), hooks={'response': hook}) 953 prep = s.prepare_request(r) 954 s.send(prep) 955 956 def test_session_hooks_are_used_with_no_request_hooks(self, httpbin): 957 hook = lambda x, *args, **kwargs: x 958 s = requests.Session() 959 s.hooks['response'].append(hook) 960 r = requests.Request('GET', httpbin()) 961 prep = s.prepare_request(r) 962 assert prep.hooks['response'] != [] 963 assert prep.hooks['response'] == [hook] 964 965 def test_session_hooks_are_overridden_by_request_hooks(self, httpbin): 966 hook1 = lambda x, *args, **kwargs: x 967 hook2 = lambda x, *args, **kwargs: x 968 assert hook1 is not hook2 969 s = requests.Session() 970 s.hooks['response'].append(hook2) 971 r = requests.Request('GET', httpbin(), hooks={'response': [hook1]}) 972 prep = s.prepare_request(r) 973 assert prep.hooks['response'] == [hook1] 974 975 def test_prepared_request_hook(self, httpbin): 976 def hook(resp, **kwargs): 977 resp.hook_working = True 978 return resp 979 980 req = requests.Request('GET', httpbin(), hooks={'response': hook}) 981 prep = req.prepare() 982 983 s = requests.Session() 984 s.proxies = getproxies() 985 resp = s.send(prep) 986 987 assert hasattr(resp, 'hook_working') 988 989 def test_prepared_from_session(self, httpbin): 990 class DummyAuth(requests.auth.AuthBase): 991 def __call__(self, r): 992 r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok' 993 return r 994 995 req = requests.Request('GET', httpbin('headers')) 996 assert not req.auth 997 998 s = requests.Session() 999 s.auth = DummyAuth() 1000 1001 prep = s.prepare_request(req) 1002 resp = s.send(prep) 1003 1004 assert resp.json()['headers'][ 1005 'Dummy-Auth-Test'] == 'dummy-auth-test-ok' 1006 1007 def test_prepare_request_with_bytestring_url(self): 1008 req = requests.Request('GET', b'https://httpbin.org/') 1009 s = requests.Session() 1010 prep = s.prepare_request(req) 1011 assert prep.url == "https://httpbin.org/" 1012 1013 def test_request_with_bytestring_host(self, httpbin): 1014 s = requests.Session() 1015 resp = s.request( 1016 'GET', 1017 httpbin('cookies/set?cookie=value'), 1018 allow_redirects=False, 1019 headers={'Host': b'httpbin.org'} 1020 ) 1021 assert resp.cookies.get('cookie') == 'value' 1022 1023 def test_links(self): 1024 r = requests.Response() 1025 r.headers = { 1026 'cache-control': 'public, max-age=60, s-maxage=60', 1027 'connection': 'keep-alive', 1028 'content-encoding': 'gzip', 1029 'content-type': 'application/json; charset=utf-8', 1030 'date': 'Sat, 26 Jan 2013 16:47:56 GMT', 1031 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"', 1032 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT', 1033 'link': ('<https://api.github.com/users/kennethreitz/repos?' 1034 'page=2&per_page=10>; rel="next", <https://api.github.' 1035 'com/users/kennethreitz/repos?page=7&per_page=10>; ' 1036 ' rel="last"'), 1037 'server': 'GitHub.com', 1038 'status': '200 OK', 1039 'vary': 'Accept', 1040 'x-content-type-options': 'nosniff', 1041 'x-github-media-type': 'github.beta', 1042 'x-ratelimit-limit': '60', 1043 'x-ratelimit-remaining': '57' 1044 } 1045 assert r.links['next']['rel'] == 'next' 1046 1047 def test_cookie_parameters(self): 1048 key = 'some_cookie' 1049 value = 'some_value' 1050 secure = True 1051 domain = 'test.com' 1052 rest = {'HttpOnly': True} 1053 1054 jar = requests.cookies.RequestsCookieJar() 1055 jar.set(key, value, secure=secure, domain=domain, rest=rest) 1056 1057 assert len(jar) == 1 1058 assert 'some_cookie' in jar 1059 1060 cookie = list(jar)[0] 1061 assert cookie.secure == secure 1062 assert cookie.domain == domain 1063 assert cookie._rest['HttpOnly'] == rest['HttpOnly'] 1064 1065 def test_cookie_as_dict_keeps_len(self): 1066 key = 'some_cookie' 1067 value = 'some_value' 1068 1069 key1 = 'some_cookie1' 1070 value1 = 'some_value1' 1071 1072 jar = requests.cookies.RequestsCookieJar() 1073 jar.set(key, value) 1074 jar.set(key1, value1) 1075 1076 d1 = dict(jar) 1077 d2 = dict(jar.iteritems()) 1078 d3 = dict(jar.items()) 1079 1080 assert len(jar) == 2 1081 assert len(d1) == 2 1082 assert len(d2) == 2 1083 assert len(d3) == 2 1084 1085 def test_cookie_as_dict_keeps_items(self): 1086 key = 'some_cookie' 1087 value = 'some_value' 1088 1089 key1 = 'some_cookie1' 1090 value1 = 'some_value1' 1091 1092 jar = requests.cookies.RequestsCookieJar() 1093 jar.set(key, value) 1094 jar.set(key1, value1) 1095 1096 d1 = dict(jar) 1097 d2 = dict(jar.iteritems()) 1098 d3 = dict(jar.items()) 1099 1100 assert d1['some_cookie'] == 'some_value' 1101 assert d2['some_cookie'] == 'some_value' 1102 assert d3['some_cookie1'] == 'some_value1' 1103 1104 def test_cookie_as_dict_keys(self): 1105 key = 'some_cookie' 1106 value = 'some_value' 1107 1108 key1 = 'some_cookie1' 1109 value1 = 'some_value1' 1110 1111 jar = requests.cookies.RequestsCookieJar() 1112 jar.set(key, value) 1113 jar.set(key1, value1) 1114 1115 keys = jar.keys() 1116 assert keys == list(keys) 1117 # make sure one can use keys multiple times 1118 assert list(keys) == list(keys) 1119 1120 def test_cookie_as_dict_values(self): 1121 key = 'some_cookie' 1122 value = 'some_value' 1123 1124 key1 = 'some_cookie1' 1125 value1 = 'some_value1' 1126 1127 jar = requests.cookies.RequestsCookieJar() 1128 jar.set(key, value) 1129 jar.set(key1, value1) 1130 1131 values = jar.values() 1132 assert values == list(values) 1133 # make sure one can use values multiple times 1134 assert list(values) == list(values) 1135 1136 def test_cookie_as_dict_items(self): 1137 key = 'some_cookie' 1138 value = 'some_value' 1139 1140 key1 = 'some_cookie1' 1141 value1 = 'some_value1' 1142 1143 jar = requests.cookies.RequestsCookieJar() 1144 jar.set(key, value) 1145 jar.set(key1, value1) 1146 1147 items = jar.items() 1148 assert items == list(items) 1149 # make sure one can use items multiple times 1150 assert list(items) == list(items) 1151 1152 def test_cookie_duplicate_names_different_domains(self): 1153 key = 'some_cookie' 1154 value = 'some_value' 1155 domain1 = 'test1.com' 1156 domain2 = 'test2.com' 1157 1158 jar = requests.cookies.RequestsCookieJar() 1159 jar.set(key, value, domain=domain1) 1160 jar.set(key, value, domain=domain2) 1161 assert key in jar 1162 items = jar.items() 1163 assert len(items) == 2 1164 1165 # Verify that CookieConflictError is raised if domain is not specified 1166 with pytest.raises(requests.cookies.CookieConflictError): 1167 jar.get(key) 1168 1169 # Verify that CookieConflictError is not raised if domain is specified 1170 cookie = jar.get(key, domain=domain1) 1171 assert cookie == value 1172 1173 def test_cookie_duplicate_names_raises_cookie_conflict_error(self): 1174 key = 'some_cookie' 1175 value = 'some_value' 1176 path = 'some_path' 1177 1178 jar = requests.cookies.RequestsCookieJar() 1179 jar.set(key, value, path=path) 1180 jar.set(key, value) 1181 with pytest.raises(requests.cookies.CookieConflictError): 1182 jar.get(key) 1183 1184 def test_cookie_policy_copy(self): 1185 class MyCookiePolicy(cookielib.DefaultCookiePolicy): 1186 pass 1187 1188 jar = requests.cookies.RequestsCookieJar() 1189 jar.set_policy(MyCookiePolicy()) 1190 assert isinstance(jar.copy().get_policy(), MyCookiePolicy) 1191 1192 def test_time_elapsed_blank(self, httpbin): 1193 r = requests.get(httpbin('get')) 1194 td = r.elapsed 1195 total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6) 1196 assert total_seconds > 0.0 1197 1198 def test_empty_response_has_content_none(self): 1199 r = requests.Response() 1200 assert r.content is None 1201 1202 def test_response_is_iterable(self): 1203 r = requests.Response() 1204 io = StringIO.StringIO('abc') 1205 read_ = io.read 1206 1207 def read_mock(amt, decode_content=None): 1208 return read_(amt) 1209 setattr(io, 'read', read_mock) 1210 r.raw = io 1211 assert next(iter(r)) 1212 io.close() 1213 1214 def test_response_decode_unicode(self): 1215 """When called with decode_unicode, Response.iter_content should always 1216 return unicode. 1217 """ 1218 r = requests.Response() 1219 r._content_consumed = True 1220 r._content = b'the content' 1221 r.encoding = 'ascii' 1222 1223 chunks = r.iter_content(decode_unicode=True) 1224 assert all(isinstance(chunk, str) for chunk in chunks) 1225 1226 # also for streaming 1227 r = requests.Response() 1228 r.raw = io.BytesIO(b'the content') 1229 r.encoding = 'ascii' 1230 chunks = r.iter_content(decode_unicode=True) 1231 assert all(isinstance(chunk, str) for chunk in chunks) 1232 1233 def test_response_reason_unicode(self): 1234 # check for unicode HTTP status 1235 r = requests.Response() 1236 r.url = u'unicode URL' 1237 r.reason = u'Komponenttia ei löydy'.encode('utf-8') 1238 r.status_code = 404 1239 r.encoding = None 1240 assert not r.ok # old behaviour - crashes here 1241 1242 def test_response_reason_unicode_fallback(self): 1243 # check raise_status falls back to ISO-8859-1 1244 r = requests.Response() 1245 r.url = 'some url' 1246 reason = u'Komponenttia ei löydy' 1247 r.reason = reason.encode('latin-1') 1248 r.status_code = 500 1249 r.encoding = None 1250 with pytest.raises(requests.exceptions.HTTPError) as e: 1251 r.raise_for_status() 1252 assert reason in e.value.args[0] 1253 1254 def test_response_chunk_size_type(self): 1255 """Ensure that chunk_size is passed as None or an integer, otherwise 1256 raise a TypeError. 1257 """ 1258 r = requests.Response() 1259 r.raw = io.BytesIO(b'the content') 1260 chunks = r.iter_content(1) 1261 assert all(len(chunk) == 1 for chunk in chunks) 1262 1263 r = requests.Response() 1264 r.raw = io.BytesIO(b'the content') 1265 chunks = r.iter_content(None) 1266 assert list(chunks) == [b'the content'] 1267 1268 r = requests.Response() 1269 r.raw = io.BytesIO(b'the content') 1270 with pytest.raises(TypeError): 1271 chunks = r.iter_content("1024") 1272 1273 def test_request_and_response_are_pickleable(self, httpbin): 1274 r = requests.get(httpbin('get')) 1275 1276 # verify we can pickle the original request 1277 assert pickle.loads(pickle.dumps(r.request)) 1278 1279 # verify we can pickle the response and that we have access to 1280 # the original request. 1281 pr = pickle.loads(pickle.dumps(r)) 1282 assert r.request.url == pr.request.url 1283 assert r.request.headers == pr.request.headers 1284 1285 def test_prepared_request_is_pickleable(self, httpbin): 1286 p = requests.Request('GET', httpbin('get')).prepare() 1287 1288 # Verify PreparedRequest can be pickled and unpickled 1289 r = pickle.loads(pickle.dumps(p)) 1290 assert r.url == p.url 1291 assert r.headers == p.headers 1292 assert r.body == p.body 1293 1294 # Verify unpickled PreparedRequest sends properly 1295 s = requests.Session() 1296 resp = s.send(r) 1297 assert resp.status_code == 200 1298 1299 def test_prepared_request_with_file_is_pickleable(self, httpbin): 1300 files = {'file': open(__file__, 'rb')} 1301 r = requests.Request('POST', httpbin('post'), files=files) 1302 p = r.prepare() 1303 1304 # Verify PreparedRequest can be pickled and unpickled 1305 r = pickle.loads(pickle.dumps(p)) 1306 assert r.url == p.url 1307 assert r.headers == p.headers 1308 assert r.body == p.body 1309 1310 # Verify unpickled PreparedRequest sends properly 1311 s = requests.Session() 1312 resp = s.send(r) 1313 assert resp.status_code == 200 1314 1315 def test_prepared_request_with_hook_is_pickleable(self, httpbin): 1316 r = requests.Request('GET', httpbin('get'), hooks=default_hooks()) 1317 p = r.prepare() 1318 1319 # Verify PreparedRequest can be pickled 1320 r = pickle.loads(pickle.dumps(p)) 1321 assert r.url == p.url 1322 assert r.headers == p.headers 1323 assert r.body == p.body 1324 assert r.hooks == p.hooks 1325 1326 # Verify unpickled PreparedRequest sends properly 1327 s = requests.Session() 1328 resp = s.send(r) 1329 assert resp.status_code == 200 1330 1331 def test_cannot_send_unprepared_requests(self, httpbin): 1332 r = requests.Request(url=httpbin()) 1333 with pytest.raises(ValueError): 1334 requests.Session().send(r) 1335 1336 def test_http_error(self): 1337 error = requests.exceptions.HTTPError() 1338 assert not error.response 1339 response = requests.Response() 1340 error = requests.exceptions.HTTPError(response=response) 1341 assert error.response == response 1342 error = requests.exceptions.HTTPError('message', response=response) 1343 assert str(error) == 'message' 1344 assert error.response == response 1345 1346 def test_session_pickling(self, httpbin): 1347 r = requests.Request('GET', httpbin('get')) 1348 s = requests.Session() 1349 1350 s = pickle.loads(pickle.dumps(s)) 1351 s.proxies = getproxies() 1352 1353 r = s.send(r.prepare()) 1354 assert r.status_code == 200 1355 1356 def test_fixes_1329(self, httpbin): 1357 """Ensure that header updates are done case-insensitively.""" 1358 s = requests.Session() 1359 s.headers.update({'ACCEPT': 'BOGUS'}) 1360 s.headers.update({'accept': 'application/json'}) 1361 r = s.get(httpbin('get')) 1362 headers = r.request.headers 1363 assert headers['accept'] == 'application/json' 1364 assert headers['Accept'] == 'application/json' 1365 assert headers['ACCEPT'] == 'application/json' 1366 1367 def test_uppercase_scheme_redirect(self, httpbin): 1368 parts = urlparse(httpbin('html')) 1369 url = "HTTP://" + parts.netloc + parts.path 1370 r = requests.get(httpbin('redirect-to'), params={'url': url}) 1371 assert r.status_code == 200 1372 assert r.url.lower() == url.lower() 1373 1374 def test_transport_adapter_ordering(self): 1375 s = requests.Session() 1376 order = ['https://', 'http://'] 1377 assert order == list(s.adapters) 1378 s.mount('http://git', HTTPAdapter()) 1379 s.mount('http://github', HTTPAdapter()) 1380 s.mount('http://github.com', HTTPAdapter()) 1381 s.mount('http://github.com/about/', HTTPAdapter()) 1382 order = [ 1383 'http://github.com/about/', 1384 'http://github.com', 1385 'http://github', 1386 'http://git', 1387 'https://', 1388 'http://', 1389 ] 1390 assert order == list(s.adapters) 1391 s.mount('http://gittip', HTTPAdapter()) 1392 s.mount('http://gittip.com', HTTPAdapter()) 1393 s.mount('http://gittip.com/about/', HTTPAdapter()) 1394 order = [ 1395 'http://github.com/about/', 1396 'http://gittip.com/about/', 1397 'http://github.com', 1398 'http://gittip.com', 1399 'http://github', 1400 'http://gittip', 1401 'http://git', 1402 'https://', 1403 'http://', 1404 ] 1405 assert order == list(s.adapters) 1406 s2 = requests.Session() 1407 s2.adapters = {'http://': HTTPAdapter()} 1408 s2.mount('https://', HTTPAdapter()) 1409 assert 'http://' in s2.adapters 1410 assert 'https://' in s2.adapters 1411 1412 def test_session_get_adapter_prefix_matching(self): 1413 prefix = 'https://example.com' 1414 more_specific_prefix = prefix + '/some/path' 1415 1416 url_matching_only_prefix = prefix + '/another/path' 1417 url_matching_more_specific_prefix = more_specific_prefix + '/longer/path' 1418 url_not_matching_prefix = 'https://another.example.com/' 1419 1420 s = requests.Session() 1421 prefix_adapter = HTTPAdapter() 1422 more_specific_prefix_adapter = HTTPAdapter() 1423 s.mount(prefix, prefix_adapter) 1424 s.mount(more_specific_prefix, more_specific_prefix_adapter) 1425 1426 assert s.get_adapter(url_matching_only_prefix) is prefix_adapter 1427 assert s.get_adapter(url_matching_more_specific_prefix) is more_specific_prefix_adapter 1428 assert s.get_adapter(url_not_matching_prefix) not in (prefix_adapter, more_specific_prefix_adapter) 1429 1430 def test_session_get_adapter_prefix_matching_mixed_case(self): 1431 mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix' 1432 url_matching_prefix = mixed_case_prefix + '/full_url' 1433 1434 s = requests.Session() 1435 my_adapter = HTTPAdapter() 1436 s.mount(mixed_case_prefix, my_adapter) 1437 1438 assert s.get_adapter(url_matching_prefix) is my_adapter 1439 1440 def test_session_get_adapter_prefix_matching_is_case_insensitive(self): 1441 mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix' 1442 url_matching_prefix_with_different_case = 'HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url' 1443 1444 s = requests.Session() 1445 my_adapter = HTTPAdapter() 1446 s.mount(mixed_case_prefix, my_adapter) 1447 1448 assert s.get_adapter(url_matching_prefix_with_different_case) is my_adapter 1449 1450 def test_header_remove_is_case_insensitive(self, httpbin): 1451 # From issue #1321 1452 s = requests.Session() 1453 s.headers['foo'] = 'bar' 1454 r = s.get(httpbin('get'), headers={'FOO': None}) 1455 assert 'foo' not in r.request.headers 1456 1457 def test_params_are_merged_case_sensitive(self, httpbin): 1458 s = requests.Session() 1459 s.params['foo'] = 'bar' 1460 r = s.get(httpbin('get'), params={'FOO': 'bar'}) 1461 assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} 1462 1463 def test_long_authinfo_in_url(self): 1464 url = 'http://{}:{}@{}:9000/path?query#frag'.format( 1465 'E8A3BE87-9E3F-4620-8858-95478E385B5B', 1466 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E', 1467 'exactly-------------sixty-----------three------------characters', 1468 ) 1469 r = requests.Request('GET', url).prepare() 1470 assert r.url == url 1471 1472 def test_header_keys_are_native(self, httpbin): 1473 headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'} 1474 r = requests.Request('GET', httpbin('get'), headers=headers) 1475 p = r.prepare() 1476 1477 # This is testing that they are builtin strings. A bit weird, but there 1478 # we go. 1479 assert 'unicode' in p.headers.keys() 1480 assert 'byte' in p.headers.keys() 1481 1482 def test_header_validation(self, httpbin): 1483 """Ensure prepare_headers regex isn't flagging valid header contents.""" 1484 headers_ok = {'foo': 'bar baz qux', 1485 'bar': u'fbbq'.encode('utf8'), 1486 'baz': '', 1487 'qux': '1'} 1488 r = requests.get(httpbin('get'), headers=headers_ok) 1489 assert r.request.headers['foo'] == headers_ok['foo'] 1490 1491 def test_header_value_not_str(self, httpbin): 1492 """Ensure the header value is of type string or bytes as 1493 per discussion in GH issue #3386 1494 """ 1495 headers_int = {'foo': 3} 1496 headers_dict = {'bar': {'foo': 'bar'}} 1497 headers_list = {'baz': ['foo', 'bar']} 1498 1499 # Test for int 1500 with pytest.raises(InvalidHeader) as excinfo: 1501 r = requests.get(httpbin('get'), headers=headers_int) 1502 assert 'foo' in str(excinfo.value) 1503 # Test for dict 1504 with pytest.raises(InvalidHeader) as excinfo: 1505 r = requests.get(httpbin('get'), headers=headers_dict) 1506 assert 'bar' in str(excinfo.value) 1507 # Test for list 1508 with pytest.raises(InvalidHeader) as excinfo: 1509 r = requests.get(httpbin('get'), headers=headers_list) 1510 assert 'baz' in str(excinfo.value) 1511 1512 def test_header_no_return_chars(self, httpbin): 1513 """Ensure that a header containing return character sequences raise an 1514 exception. Otherwise, multiple headers are created from single string. 1515 """ 1516 headers_ret = {'foo': 'bar\r\nbaz: qux'} 1517 headers_lf = {'foo': 'bar\nbaz: qux'} 1518 headers_cr = {'foo': 'bar\rbaz: qux'} 1519 1520 # Test for newline 1521 with pytest.raises(InvalidHeader): 1522 r = requests.get(httpbin('get'), headers=headers_ret) 1523 # Test for line feed 1524 with pytest.raises(InvalidHeader): 1525 r = requests.get(httpbin('get'), headers=headers_lf) 1526 # Test for carriage return 1527 with pytest.raises(InvalidHeader): 1528 r = requests.get(httpbin('get'), headers=headers_cr) 1529 1530 def test_header_no_leading_space(self, httpbin): 1531 """Ensure headers containing leading whitespace raise 1532 InvalidHeader Error before sending. 1533 """ 1534 headers_space = {'foo': ' bar'} 1535 headers_tab = {'foo': ' bar'} 1536 1537 # Test for whitespace 1538 with pytest.raises(InvalidHeader): 1539 r = requests.get(httpbin('get'), headers=headers_space) 1540 # Test for tab 1541 with pytest.raises(InvalidHeader): 1542 r = requests.get(httpbin('get'), headers=headers_tab) 1543 1544 @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo'))) 1545 def test_can_send_objects_with_files(self, httpbin, files): 1546 data = {'a': 'this is a string'} 1547 files = {'b': files} 1548 r = requests.Request('POST', httpbin('post'), data=data, files=files) 1549 p = r.prepare() 1550 assert 'multipart/form-data' in p.headers['Content-Type'] 1551 1552 def test_can_send_file_object_with_non_string_filename(self, httpbin): 1553 f = io.BytesIO() 1554 f.name = 2 1555 r = requests.Request('POST', httpbin('post'), files={'f': f}) 1556 p = r.prepare() 1557 1558 assert 'multipart/form-data' in p.headers['Content-Type'] 1559 1560 def test_autoset_header_values_are_native(self, httpbin): 1561 data = 'this is a string' 1562 length = '16' 1563 req = requests.Request('POST', httpbin('post'), data=data) 1564 p = req.prepare() 1565 1566 assert p.headers['Content-Length'] == length 1567 1568 def test_nonhttp_schemes_dont_check_URLs(self): 1569 test_urls = ( 1570 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==', 1571 'file:///etc/passwd', 1572 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431', 1573 ) 1574 for test_url in test_urls: 1575 req = requests.Request('GET', test_url) 1576 preq = req.prepare() 1577 assert test_url == preq.url 1578 1579 def test_auth_is_stripped_on_http_downgrade(self, httpbin, httpbin_secure, httpbin_ca_bundle): 1580 r = requests.get( 1581 httpbin_secure('redirect-to'), 1582 params={'url': httpbin('get')}, 1583 auth=('user', 'pass'), 1584 verify=httpbin_ca_bundle 1585 ) 1586 assert r.history[0].request.headers['Authorization'] 1587 assert 'Authorization' not in r.request.headers 1588 1589 def test_auth_is_retained_for_redirect_on_host(self, httpbin): 1590 r = requests.get(httpbin('redirect/1'), auth=('user', 'pass')) 1591 h1 = r.history[0].request.headers['Authorization'] 1592 h2 = r.request.headers['Authorization'] 1593 1594 assert h1 == h2 1595 1596 def test_should_strip_auth_host_change(self): 1597 s = requests.Session() 1598 assert s.should_strip_auth('http://example.com/foo', 'http://another.example.com/') 1599 1600 def test_should_strip_auth_http_downgrade(self): 1601 s = requests.Session() 1602 assert s.should_strip_auth('https://example.com/foo', 'http://example.com/bar') 1603 1604 def test_should_strip_auth_https_upgrade(self): 1605 s = requests.Session() 1606 assert not s.should_strip_auth('http://example.com/foo', 'https://example.com/bar') 1607 assert not s.should_strip_auth('http://example.com:80/foo', 'https://example.com/bar') 1608 assert not s.should_strip_auth('http://example.com/foo', 'https://example.com:443/bar') 1609 # Non-standard ports should trigger stripping 1610 assert s.should_strip_auth('http://example.com:8080/foo', 'https://example.com/bar') 1611 assert s.should_strip_auth('http://example.com/foo', 'https://example.com:8443/bar') 1612 1613 def test_should_strip_auth_port_change(self): 1614 s = requests.Session() 1615 assert s.should_strip_auth('http://example.com:1234/foo', 'https://example.com:4321/bar') 1616 1617 @pytest.mark.parametrize( 1618 'old_uri, new_uri', ( 1619 ('https://example.com:443/foo', 'https://example.com/bar'), 1620 ('http://example.com:80/foo', 'http://example.com/bar'), 1621 ('https://example.com/foo', 'https://example.com:443/bar'), 1622 ('http://example.com/foo', 'http://example.com:80/bar') 1623 )) 1624 def test_should_strip_auth_default_port(self, old_uri, new_uri): 1625 s = requests.Session() 1626 assert not s.should_strip_auth(old_uri, new_uri) 1627 1628 def test_manual_redirect_with_partial_body_read(self, httpbin): 1629 s = requests.Session() 1630 r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True) 1631 assert r1.is_redirect 1632 rg = s.resolve_redirects(r1, r1.request, stream=True) 1633 1634 # read only the first eight bytes of the response body, 1635 # then follow the redirect 1636 r1.iter_content(8) 1637 r2 = next(rg) 1638 assert r2.is_redirect 1639 1640 # read all of the response via iter_content, 1641 # then follow the redirect 1642 for _ in r2.iter_content(): 1643 pass 1644 r3 = next(rg) 1645 assert not r3.is_redirect 1646 1647 def test_prepare_body_position_non_stream(self): 1648 data = b'the data' 1649 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1650 assert prep._body_position is None 1651 1652 def test_rewind_body(self): 1653 data = io.BytesIO(b'the data') 1654 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1655 assert prep._body_position == 0 1656 assert prep.body.read() == b'the data' 1657 1658 # the data has all been read 1659 assert prep.body.read() == b'' 1660 1661 # rewind it back 1662 requests.utils.rewind_body(prep) 1663 assert prep.body.read() == b'the data' 1664 1665 def test_rewind_partially_read_body(self): 1666 data = io.BytesIO(b'the data') 1667 data.read(4) # read some data 1668 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1669 assert prep._body_position == 4 1670 assert prep.body.read() == b'data' 1671 1672 # the data has all been read 1673 assert prep.body.read() == b'' 1674 1675 # rewind it back 1676 requests.utils.rewind_body(prep) 1677 assert prep.body.read() == b'data' 1678 1679 def test_rewind_body_no_seek(self): 1680 class BadFileObj: 1681 def __init__(self, data): 1682 self.data = data 1683 1684 def tell(self): 1685 return 0 1686 1687 def __iter__(self): 1688 return 1689 1690 data = BadFileObj('the data') 1691 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1692 assert prep._body_position == 0 1693 1694 with pytest.raises(UnrewindableBodyError) as e: 1695 requests.utils.rewind_body(prep) 1696 1697 assert 'Unable to rewind request body' in str(e) 1698 1699 def test_rewind_body_failed_seek(self): 1700 class BadFileObj: 1701 def __init__(self, data): 1702 self.data = data 1703 1704 def tell(self): 1705 return 0 1706 1707 def seek(self, pos, whence=0): 1708 raise OSError() 1709 1710 def __iter__(self): 1711 return 1712 1713 data = BadFileObj('the data') 1714 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1715 assert prep._body_position == 0 1716 1717 with pytest.raises(UnrewindableBodyError) as e: 1718 requests.utils.rewind_body(prep) 1719 1720 assert 'error occurred when rewinding request body' in str(e) 1721 1722 def test_rewind_body_failed_tell(self): 1723 class BadFileObj: 1724 def __init__(self, data): 1725 self.data = data 1726 1727 def tell(self): 1728 raise OSError() 1729 1730 def __iter__(self): 1731 return 1732 1733 data = BadFileObj('the data') 1734 prep = requests.Request('GET', 'http://example.com', data=data).prepare() 1735 assert prep._body_position is not None 1736 1737 with pytest.raises(UnrewindableBodyError) as e: 1738 requests.utils.rewind_body(prep) 1739 1740 assert 'Unable to rewind request body' in str(e) 1741 1742 def _patch_adapter_gzipped_redirect(self, session, url): 1743 adapter = session.get_adapter(url=url) 1744 org_build_response = adapter.build_response 1745 self._patched_response = False 1746 1747 def build_response(*args, **kwargs): 1748 resp = org_build_response(*args, **kwargs) 1749 if not self._patched_response: 1750 resp.raw.headers['content-encoding'] = 'gzip' 1751 self._patched_response = True 1752 return resp 1753 1754 adapter.build_response = build_response 1755 1756 def test_redirect_with_wrong_gzipped_header(self, httpbin): 1757 s = requests.Session() 1758 url = httpbin('redirect/1') 1759 self._patch_adapter_gzipped_redirect(s, url) 1760 s.get(url) 1761 1762 @pytest.mark.parametrize( 1763 'username, password, auth_str', ( 1764 ('test', 'test', 'Basic dGVzdDp0ZXN0'), 1765 (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8'), 'Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA=='), 1766 )) 1767 def test_basic_auth_str_is_always_native(self, username, password, auth_str): 1768 s = _basic_auth_str(username, password) 1769 assert isinstance(s, builtin_str) 1770 assert s == auth_str 1771 1772 def test_requests_history_is_saved(self, httpbin): 1773 r = requests.get(httpbin('redirect/5')) 1774 total = r.history[-1].history 1775 i = 0 1776 for item in r.history: 1777 assert item.history == total[0:i] 1778 i += 1 1779 1780 def test_json_param_post_content_type_works(self, httpbin): 1781 r = requests.post( 1782 httpbin('post'), 1783 json={'life': 42} 1784 ) 1785 assert r.status_code == 200 1786 assert 'application/json' in r.request.headers['Content-Type'] 1787 assert {'life': 42} == r.json()['json'] 1788 1789 def test_json_param_post_should_not_override_data_param(self, httpbin): 1790 r = requests.Request(method='POST', url=httpbin('post'), 1791 data={'stuff': 'elixr'}, 1792 json={'music': 'flute'}) 1793 prep = r.prepare() 1794 assert 'stuff=elixr' == prep.body 1795 1796 def test_response_iter_lines(self, httpbin): 1797 r = requests.get(httpbin('stream/4'), stream=True) 1798 assert r.status_code == 200 1799 1800 it = r.iter_lines() 1801 next(it) 1802 assert len(list(it)) == 3 1803 1804 def test_response_context_manager(self, httpbin): 1805 with requests.get(httpbin('stream/4'), stream=True) as response: 1806 assert isinstance(response, requests.Response) 1807 1808 assert response.raw.closed 1809 1810 def test_unconsumed_session_response_closes_connection(self, httpbin): 1811 s = requests.session() 1812 1813 with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response: 1814 pass 1815 1816 assert response._content_consumed is False 1817 assert response.raw.closed 1818 1819 @pytest.mark.xfail 1820 def test_response_iter_lines_reentrant(self, httpbin): 1821 """Response.iter_lines() is not reentrant safe""" 1822 r = requests.get(httpbin('stream/4'), stream=True) 1823 assert r.status_code == 200 1824 1825 next(r.iter_lines()) 1826 assert len(list(r.iter_lines())) == 3 1827 1828 def test_session_close_proxy_clear(self, mocker): 1829 proxies = { 1830 'one': mocker.Mock(), 1831 'two': mocker.Mock(), 1832 } 1833 session = requests.Session() 1834 mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies) 1835 session.close() 1836 proxies['one'].clear.assert_called_once_with() 1837 proxies['two'].clear.assert_called_once_with() 1838 1839 def test_proxy_auth(self): 1840 adapter = HTTPAdapter() 1841 headers = adapter.proxy_headers("http://user:pass@httpbin.org") 1842 assert headers == {'Proxy-Authorization': 'Basic dXNlcjpwYXNz'} 1843 1844 def test_proxy_auth_empty_pass(self): 1845 adapter = HTTPAdapter() 1846 headers = adapter.proxy_headers("http://user:@httpbin.org") 1847 assert headers == {'Proxy-Authorization': 'Basic dXNlcjo='} 1848 1849 def test_response_json_when_content_is_None(self, httpbin): 1850 r = requests.get(httpbin('/status/204')) 1851 # Make sure r.content is None 1852 r.status_code = 0 1853 r._content = False 1854 r._content_consumed = False 1855 1856 assert r.content is None 1857 with pytest.raises(ValueError): 1858 r.json() 1859 1860 def test_response_without_release_conn(self): 1861 """Test `close` call for non-urllib3-like raw objects. 1862 Should work when `release_conn` attr doesn't exist on `response.raw`. 1863 """ 1864 resp = requests.Response() 1865 resp.raw = StringIO.StringIO('test') 1866 assert not resp.raw.closed 1867 resp.close() 1868 assert resp.raw.closed 1869 1870 def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin): 1871 """Ensure that a byte stream with size 0 will not set both a Content-Length 1872 and Transfer-Encoding header. 1873 """ 1874 auth = ('user', 'pass') 1875 url = httpbin('post') 1876 file_obj = io.BytesIO(b'') 1877 r = requests.Request('POST', url, auth=auth, data=file_obj) 1878 prepared_request = r.prepare() 1879 assert 'Transfer-Encoding' in prepared_request.headers 1880 assert 'Content-Length' not in prepared_request.headers 1881 1882 def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin): 1883 """Ensure that a byte stream with size > 0 will not set both a Content-Length 1884 and Transfer-Encoding header. 1885 """ 1886 auth = ('user', 'pass') 1887 url = httpbin('post') 1888 file_obj = io.BytesIO(b'test data') 1889 r = requests.Request('POST', url, auth=auth, data=file_obj) 1890 prepared_request = r.prepare() 1891 assert 'Transfer-Encoding' not in prepared_request.headers 1892 assert 'Content-Length' in prepared_request.headers 1893 1894 def test_chunked_upload_does_not_set_content_length_header(self, httpbin): 1895 """Ensure that requests with a generator body stream using 1896 Transfer-Encoding: chunked, not a Content-Length header. 1897 """ 1898 data = (i for i in [b'a', b'b', b'c']) 1899 url = httpbin('post') 1900 r = requests.Request('POST', url, data=data) 1901 prepared_request = r.prepare() 1902 assert 'Transfer-Encoding' in prepared_request.headers 1903 assert 'Content-Length' not in prepared_request.headers 1904 1905 def test_custom_redirect_mixin(self, httpbin): 1906 """Tests a custom mixin to overwrite ``get_redirect_target``. 1907 1908 Ensures a subclassed ``requests.Session`` can handle a certain type of 1909 malformed redirect responses. 1910 1911 1. original request receives a proper response: 302 redirect 1912 2. following the redirect, a malformed response is given: 1913 status code = HTTP 200 1914 location = alternate url 1915 3. the custom session catches the edge case and follows the redirect 1916 """ 1917 url_final = httpbin('html') 1918 querystring_malformed = urlencode({'location': url_final}) 1919 url_redirect_malformed = httpbin('response-headers?%s' % querystring_malformed) 1920 querystring_redirect = urlencode({'url': url_redirect_malformed}) 1921 url_redirect = httpbin('redirect-to?%s' % querystring_redirect) 1922 urls_test = [url_redirect, 1923 url_redirect_malformed, 1924 url_final, 1925 ] 1926 1927 class CustomRedirectSession(requests.Session): 1928 def get_redirect_target(self, resp): 1929 # default behavior 1930 if resp.is_redirect: 1931 return resp.headers['location'] 1932 # edge case - check to see if 'location' is in headers anyways 1933 location = resp.headers.get('location') 1934 if location and (location != resp.url): 1935 return location 1936 return None 1937 1938 session = CustomRedirectSession() 1939 r = session.get(urls_test[0]) 1940 assert len(r.history) == 2 1941 assert r.status_code == 200 1942 assert r.history[0].status_code == 302 1943 assert r.history[0].is_redirect 1944 assert r.history[1].status_code == 200 1945 assert not r.history[1].is_redirect 1946 assert r.url == urls_test[2] 1947 1948 1949class TestCaseInsensitiveDict: 1950 1951 @pytest.mark.parametrize( 1952 'cid', ( 1953 CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}), 1954 CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]), 1955 CaseInsensitiveDict(FOO='foo', BAr='bar'), 1956 )) 1957 def test_init(self, cid): 1958 assert len(cid) == 2 1959 assert 'foo' in cid 1960 assert 'bar' in cid 1961 1962 def test_docstring_example(self): 1963 cid = CaseInsensitiveDict() 1964 cid['Accept'] = 'application/json' 1965 assert cid['aCCEPT'] == 'application/json' 1966 assert list(cid) == ['Accept'] 1967 1968 def test_len(self): 1969 cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) 1970 cid['A'] = 'a' 1971 assert len(cid) == 2 1972 1973 def test_getitem(self): 1974 cid = CaseInsensitiveDict({'Spam': 'blueval'}) 1975 assert cid['spam'] == 'blueval' 1976 assert cid['SPAM'] == 'blueval' 1977 1978 def test_fixes_649(self): 1979 """__setitem__ should behave case-insensitively.""" 1980 cid = CaseInsensitiveDict() 1981 cid['spam'] = 'oneval' 1982 cid['Spam'] = 'twoval' 1983 cid['sPAM'] = 'redval' 1984 cid['SPAM'] = 'blueval' 1985 assert cid['spam'] == 'blueval' 1986 assert cid['SPAM'] == 'blueval' 1987 assert list(cid.keys()) == ['SPAM'] 1988 1989 def test_delitem(self): 1990 cid = CaseInsensitiveDict() 1991 cid['Spam'] = 'someval' 1992 del cid['sPam'] 1993 assert 'spam' not in cid 1994 assert len(cid) == 0 1995 1996 def test_contains(self): 1997 cid = CaseInsensitiveDict() 1998 cid['Spam'] = 'someval' 1999 assert 'Spam' in cid 2000 assert 'spam' in cid 2001 assert 'SPAM' in cid 2002 assert 'sPam' in cid 2003 assert 'notspam' not in cid 2004 2005 def test_get(self): 2006 cid = CaseInsensitiveDict() 2007 cid['spam'] = 'oneval' 2008 cid['SPAM'] = 'blueval' 2009 assert cid.get('spam') == 'blueval' 2010 assert cid.get('SPAM') == 'blueval' 2011 assert cid.get('sPam') == 'blueval' 2012 assert cid.get('notspam', 'default') == 'default' 2013 2014 def test_update(self): 2015 cid = CaseInsensitiveDict() 2016 cid['spam'] = 'blueval' 2017 cid.update({'sPam': 'notblueval'}) 2018 assert cid['spam'] == 'notblueval' 2019 cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) 2020 cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) 2021 assert len(cid) == 2 2022 assert cid['foo'] == 'anotherfoo' 2023 assert cid['bar'] == 'anotherbar' 2024 2025 def test_update_retains_unchanged(self): 2026 cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) 2027 cid.update({'foo': 'newfoo'}) 2028 assert cid['bar'] == 'bar' 2029 2030 def test_iter(self): 2031 cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) 2032 keys = frozenset(['Spam', 'Eggs']) 2033 assert frozenset(iter(cid)) == keys 2034 2035 def test_equality(self): 2036 cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) 2037 othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) 2038 assert cid == othercid 2039 del othercid['spam'] 2040 assert cid != othercid 2041 assert cid == {'spam': 'blueval', 'eggs': 'redval'} 2042 assert cid != object() 2043 2044 def test_setdefault(self): 2045 cid = CaseInsensitiveDict({'Spam': 'blueval'}) 2046 assert cid.setdefault('spam', 'notblueval') == 'blueval' 2047 assert cid.setdefault('notspam', 'notblueval') == 'notblueval' 2048 2049 def test_lower_items(self): 2050 cid = CaseInsensitiveDict({ 2051 'Accept': 'application/json', 2052 'user-Agent': 'requests', 2053 }) 2054 keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) 2055 lowerkeyset = frozenset(['accept', 'user-agent']) 2056 assert keyset == lowerkeyset 2057 2058 def test_preserve_key_case(self): 2059 cid = CaseInsensitiveDict({ 2060 'Accept': 'application/json', 2061 'user-Agent': 'requests', 2062 }) 2063 keyset = frozenset(['Accept', 'user-Agent']) 2064 assert frozenset(i[0] for i in cid.items()) == keyset 2065 assert frozenset(cid.keys()) == keyset 2066 assert frozenset(cid) == keyset 2067 2068 def test_preserve_last_key_case(self): 2069 cid = CaseInsensitiveDict({ 2070 'Accept': 'application/json', 2071 'user-Agent': 'requests', 2072 }) 2073 cid.update({'ACCEPT': 'application/json'}) 2074 cid['USER-AGENT'] = 'requests' 2075 keyset = frozenset(['ACCEPT', 'USER-AGENT']) 2076 assert frozenset(i[0] for i in cid.items()) == keyset 2077 assert frozenset(cid.keys()) == keyset 2078 assert frozenset(cid) == keyset 2079 2080 def test_copy(self): 2081 cid = CaseInsensitiveDict({ 2082 'Accept': 'application/json', 2083 'user-Agent': 'requests', 2084 }) 2085 cid_copy = cid.copy() 2086 assert cid == cid_copy 2087 cid['changed'] = True 2088 assert cid != cid_copy 2089 2090 2091class TestMorselToCookieExpires: 2092 """Tests for morsel_to_cookie when morsel contains expires.""" 2093 2094 def test_expires_valid_str(self): 2095 """Test case where we convert expires from string time.""" 2096 2097 morsel = Morsel() 2098 morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT' 2099 cookie = morsel_to_cookie(morsel) 2100 assert cookie.expires == 1 2101 2102 @pytest.mark.parametrize( 2103 'value, exception', ( 2104 (100, TypeError), 2105 ('woops', ValueError), 2106 )) 2107 def test_expires_invalid_int(self, value, exception): 2108 """Test case where an invalid type is passed for expires.""" 2109 morsel = Morsel() 2110 morsel['expires'] = value 2111 with pytest.raises(exception): 2112 morsel_to_cookie(morsel) 2113 2114 def test_expires_none(self): 2115 """Test case where expires is None.""" 2116 2117 morsel = Morsel() 2118 morsel['expires'] = None 2119 cookie = morsel_to_cookie(morsel) 2120 assert cookie.expires is None 2121 2122 2123class TestMorselToCookieMaxAge: 2124 2125 """Tests for morsel_to_cookie when morsel contains max-age.""" 2126 2127 def test_max_age_valid_int(self): 2128 """Test case where a valid max age in seconds is passed.""" 2129 2130 morsel = Morsel() 2131 morsel['max-age'] = 60 2132 cookie = morsel_to_cookie(morsel) 2133 assert isinstance(cookie.expires, int) 2134 2135 def test_max_age_invalid_str(self): 2136 """Test case where a invalid max age is passed.""" 2137 2138 morsel = Morsel() 2139 morsel['max-age'] = 'woops' 2140 with pytest.raises(TypeError): 2141 morsel_to_cookie(morsel) 2142 2143 2144class TestTimeout: 2145 2146 def test_stream_timeout(self, httpbin): 2147 try: 2148 requests.get(httpbin('delay/10'), timeout=2.0) 2149 except requests.exceptions.Timeout as e: 2150 assert 'Read timed out' in e.args[0].args[0] 2151 2152 @pytest.mark.parametrize( 2153 'timeout, error_text', ( 2154 ((3, 4, 5), '(connect, read)'), 2155 ('foo', 'must be an int, float or None'), 2156 )) 2157 def test_invalid_timeout(self, httpbin, timeout, error_text): 2158 with pytest.raises(ValueError) as e: 2159 requests.get(httpbin('get'), timeout=timeout) 2160 assert error_text in str(e) 2161 2162 @pytest.mark.parametrize( 2163 'timeout', ( 2164 None, 2165 Urllib3Timeout(connect=None, read=None) 2166 )) 2167 def test_none_timeout(self, httpbin, timeout): 2168 """Check that you can set None as a valid timeout value. 2169 2170 To actually test this behavior, we'd want to check that setting the 2171 timeout to None actually lets the request block past the system default 2172 timeout. However, this would make the test suite unbearably slow. 2173 Instead we verify that setting the timeout to None does not prevent the 2174 request from succeeding. 2175 """ 2176 r = requests.get(httpbin('get'), timeout=timeout) 2177 assert r.status_code == 200 2178 2179 @pytest.mark.parametrize( 2180 'timeout', ( 2181 (None, 0.1), 2182 Urllib3Timeout(connect=None, read=0.1) 2183 )) 2184 def test_read_timeout(self, httpbin, timeout): 2185 try: 2186 requests.get(httpbin('delay/10'), timeout=timeout) 2187 pytest.fail('The recv() request should time out.') 2188 except ReadTimeout: 2189 pass 2190 2191 @pytest.mark.parametrize( 2192 'timeout', ( 2193 (0.1, None), 2194 Urllib3Timeout(connect=0.1, read=None) 2195 )) 2196 def test_connect_timeout(self, timeout): 2197 try: 2198 requests.get(TARPIT, timeout=timeout) 2199 pytest.fail('The connect() request should time out.') 2200 except ConnectTimeout as e: 2201 assert isinstance(e, ConnectionError) 2202 assert isinstance(e, Timeout) 2203 2204 @pytest.mark.parametrize( 2205 'timeout', ( 2206 (0.1, 0.1), 2207 Urllib3Timeout(connect=0.1, read=0.1) 2208 )) 2209 def test_total_timeout_connect(self, timeout): 2210 try: 2211 requests.get(TARPIT, timeout=timeout) 2212 pytest.fail('The connect() request should time out.') 2213 except ConnectTimeout: 2214 pass 2215 2216 def test_encoded_methods(self, httpbin): 2217 """See: https://github.com/psf/requests/issues/2316""" 2218 r = requests.request(b'GET', httpbin('get')) 2219 assert r.ok 2220 2221 2222SendCall = collections.namedtuple('SendCall', ('args', 'kwargs')) 2223 2224 2225class RedirectSession(SessionRedirectMixin): 2226 def __init__(self, order_of_redirects): 2227 self.redirects = order_of_redirects 2228 self.calls = [] 2229 self.max_redirects = 30 2230 self.cookies = {} 2231 self.trust_env = False 2232 2233 def send(self, *args, **kwargs): 2234 self.calls.append(SendCall(args, kwargs)) 2235 return self.build_response() 2236 2237 def build_response(self): 2238 request = self.calls[-1].args[0] 2239 r = requests.Response() 2240 2241 try: 2242 r.status_code = int(self.redirects.pop(0)) 2243 except IndexError: 2244 r.status_code = 200 2245 2246 r.headers = CaseInsensitiveDict({'Location': '/'}) 2247 r.raw = self._build_raw() 2248 r.request = request 2249 return r 2250 2251 def _build_raw(self): 2252 string = StringIO.StringIO('') 2253 setattr(string, 'release_conn', lambda *args: args) 2254 return string 2255 2256 2257def test_json_encodes_as_bytes(): 2258 # urllib3 expects bodies as bytes-like objects 2259 body = {"key": "value"} 2260 p = PreparedRequest() 2261 p.prepare( 2262 method='GET', 2263 url='https://www.example.com/', 2264 json=body 2265 ) 2266 assert isinstance(p.body, bytes) 2267 2268 2269def test_requests_are_updated_each_time(httpbin): 2270 session = RedirectSession([303, 307]) 2271 prep = requests.Request('POST', httpbin('post')).prepare() 2272 r0 = session.send(prep) 2273 assert r0.request.method == 'POST' 2274 assert session.calls[-1] == SendCall((r0.request,), {}) 2275 redirect_generator = session.resolve_redirects(r0, prep) 2276 default_keyword_args = { 2277 'stream': False, 2278 'verify': True, 2279 'cert': None, 2280 'timeout': None, 2281 'allow_redirects': False, 2282 'proxies': {}, 2283 } 2284 for response in redirect_generator: 2285 assert response.request.method == 'GET' 2286 send_call = SendCall((response.request,), default_keyword_args) 2287 assert session.calls[-1] == send_call 2288 2289 2290@pytest.mark.parametrize("var,url,proxy", [ 2291 ('http_proxy', 'http://example.com', 'socks5://proxy.com:9876'), 2292 ('https_proxy', 'https://example.com', 'socks5://proxy.com:9876'), 2293 ('all_proxy', 'http://example.com', 'socks5://proxy.com:9876'), 2294 ('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'), 2295]) 2296def test_proxy_env_vars_override_default(var, url, proxy): 2297 session = requests.Session() 2298 prep = PreparedRequest() 2299 prep.prepare(method='GET', url=url) 2300 2301 kwargs = { 2302 var: proxy 2303 } 2304 scheme = urlparse(url).scheme 2305 with override_environ(**kwargs): 2306 proxies = session.rebuild_proxies(prep, {}) 2307 assert scheme in proxies 2308 assert proxies[scheme] == proxy 2309 2310 2311@pytest.mark.parametrize( 2312 'data', ( 2313 (('a', 'b'), ('c', 'd')), 2314 (('c', 'd'), ('a', 'b')), 2315 (('a', 'b'), ('c', 'd'), ('e', 'f')), 2316 )) 2317def test_data_argument_accepts_tuples(data): 2318 """Ensure that the data argument will accept tuples of strings 2319 and properly encode them. 2320 """ 2321 p = PreparedRequest() 2322 p.prepare( 2323 method='GET', 2324 url='http://www.example.com', 2325 data=data, 2326 hooks=default_hooks() 2327 ) 2328 assert p.body == urlencode(data) 2329 2330 2331@pytest.mark.parametrize( 2332 'kwargs', ( 2333 None, 2334 { 2335 'method': 'GET', 2336 'url': 'http://www.example.com', 2337 'data': 'foo=bar', 2338 'hooks': default_hooks() 2339 }, 2340 { 2341 'method': 'GET', 2342 'url': 'http://www.example.com', 2343 'data': 'foo=bar', 2344 'hooks': default_hooks(), 2345 'cookies': {'foo': 'bar'} 2346 }, 2347 { 2348 'method': 'GET', 2349 'url': u('http://www.example.com/üniçø∂é') 2350 }, 2351 )) 2352def test_prepared_copy(kwargs): 2353 p = PreparedRequest() 2354 if kwargs: 2355 p.prepare(**kwargs) 2356 copy = p.copy() 2357 for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'): 2358 assert getattr(p, attr) == getattr(copy, attr) 2359 2360 2361def test_urllib3_retries(httpbin): 2362 from urllib3.util import Retry 2363 s = requests.Session() 2364 s.mount('http://', HTTPAdapter(max_retries=Retry( 2365 total=2, status_forcelist=[500] 2366 ))) 2367 2368 with pytest.raises(RetryError): 2369 s.get(httpbin('status/500')) 2370 2371 2372def test_urllib3_pool_connection_closed(httpbin): 2373 s = requests.Session() 2374 s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0)) 2375 2376 try: 2377 s.get(httpbin('status/200')) 2378 except ConnectionError as e: 2379 assert u"Pool is closed." in str(e) 2380 2381 2382class TestPreparingURLs(object): 2383 @pytest.mark.parametrize( 2384 'url,expected', 2385 ( 2386 ('http://google.com', 'http://google.com/'), 2387 (u'http://ジェーピーニック.jp', u'http://xn--hckqz9bzb1cyrb.jp/'), 2388 (u'http://xn--n3h.net/', u'http://xn--n3h.net/'), 2389 ( 2390 u'http://ジェーピーニック.jp'.encode('utf-8'), 2391 u'http://xn--hckqz9bzb1cyrb.jp/' 2392 ), 2393 ( 2394 u'http://straße.de/straße', 2395 u'http://xn--strae-oqa.de/stra%C3%9Fe' 2396 ), 2397 ( 2398 u'http://straße.de/straße'.encode('utf-8'), 2399 u'http://xn--strae-oqa.de/stra%C3%9Fe' 2400 ), 2401 ( 2402 u'http://Königsgäßchen.de/straße', 2403 u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe' 2404 ), 2405 ( 2406 u'http://Königsgäßchen.de/straße'.encode('utf-8'), 2407 u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe' 2408 ), 2409 ( 2410 b'http://xn--n3h.net/', 2411 u'http://xn--n3h.net/' 2412 ), 2413 ( 2414 b'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', 2415 u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/' 2416 ), 2417 ( 2418 u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', 2419 u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/' 2420 ) 2421 ) 2422 ) 2423 def test_preparing_url(self, url, expected): 2424 2425 def normalize_percent_encode(x): 2426 # Helper function that normalizes equivalent 2427 # percent-encoded bytes before comparisons 2428 for c in re.findall(r'%[a-fA-F0-9]{2}', x): 2429 x = x.replace(c, c.upper()) 2430 return x 2431 2432 r = requests.Request('GET', url=url) 2433 p = r.prepare() 2434 assert normalize_percent_encode(p.url) == expected 2435 2436 @pytest.mark.parametrize( 2437 'url', 2438 ( 2439 b"http://*.google.com", 2440 b"http://*", 2441 u"http://*.google.com", 2442 u"http://*", 2443 u"http://☃.net/" 2444 ) 2445 ) 2446 def test_preparing_bad_url(self, url): 2447 r = requests.Request('GET', url=url) 2448 with pytest.raises(requests.exceptions.InvalidURL): 2449 r.prepare() 2450 2451 @pytest.mark.parametrize( 2452 'url, exception', 2453 ( 2454 ('http://localhost:-1', InvalidURL), 2455 ) 2456 ) 2457 def test_redirecting_to_bad_url(self, httpbin, url, exception): 2458 with pytest.raises(exception): 2459 r = requests.get(httpbin('redirect-to'), params={'url': url}) 2460 2461 @pytest.mark.parametrize( 2462 'input, expected', 2463 ( 2464 ( 2465 b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", 2466 u"http+unix://%2Fvar%2Frun%2Fsocket/path~", 2467 ), 2468 ( 2469 u"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", 2470 u"http+unix://%2Fvar%2Frun%2Fsocket/path~", 2471 ), 2472 ( 2473 b"mailto:user@example.org", 2474 u"mailto:user@example.org", 2475 ), 2476 ( 2477 u"mailto:user@example.org", 2478 u"mailto:user@example.org", 2479 ), 2480 ( 2481 b"data:SSDimaUgUHl0aG9uIQ==", 2482 u"data:SSDimaUgUHl0aG9uIQ==", 2483 ) 2484 ) 2485 ) 2486 def test_url_mutation(self, input, expected): 2487 """ 2488 This test validates that we correctly exclude some URLs from 2489 preparation, and that we handle others. Specifically, it tests that 2490 any URL whose scheme doesn't begin with "http" is left alone, and 2491 those whose scheme *does* begin with "http" are mutated. 2492 """ 2493 r = requests.Request('GET', url=input) 2494 p = r.prepare() 2495 assert p.url == expected 2496 2497 @pytest.mark.parametrize( 2498 'input, params, expected', 2499 ( 2500 ( 2501 b"http+unix://%2Fvar%2Frun%2Fsocket/path", 2502 {"key": "value"}, 2503 u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", 2504 ), 2505 ( 2506 u"http+unix://%2Fvar%2Frun%2Fsocket/path", 2507 {"key": "value"}, 2508 u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", 2509 ), 2510 ( 2511 b"mailto:user@example.org", 2512 {"key": "value"}, 2513 u"mailto:user@example.org", 2514 ), 2515 ( 2516 u"mailto:user@example.org", 2517 {"key": "value"}, 2518 u"mailto:user@example.org", 2519 ), 2520 ) 2521 ) 2522 def test_parameters_for_nonstandard_schemes(self, input, params, expected): 2523 """ 2524 Setting parameters for nonstandard schemes is allowed if those schemes 2525 begin with "http", and is forbidden otherwise. 2526 """ 2527 r = requests.Request('GET', url=input, params=params) 2528 p = r.prepare() 2529 assert p.url == expected 2530