1import unittest 2from test import support 3from test import test_urllib 4 5import os 6import io 7import socket 8import array 9import sys 10import tempfile 11import subprocess 12 13import urllib.request 14# The proxy bypass method imported below has logic specific to the OSX 15# proxy config data structure but is testable on all platforms. 16from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler, 17 HTTPPasswordMgrWithPriorAuth, _parse_proxy, 18 _proxy_bypass_macosx_sysconf, 19 AbstractDigestAuthHandler) 20from urllib.parse import urlparse 21import urllib.error 22import http.client 23 24# XXX 25# Request 26# CacheFTPHandler (hard to write) 27# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 28 29 30class TrivialTests(unittest.TestCase): 31 32 def test___all__(self): 33 # Verify which names are exposed 34 for module in 'request', 'response', 'parse', 'error', 'robotparser': 35 context = {} 36 exec('from urllib.%s import *' % module, context) 37 del context['__builtins__'] 38 if module == 'request' and os.name == 'nt': 39 u, p = context.pop('url2pathname'), context.pop('pathname2url') 40 self.assertEqual(u.__module__, 'nturl2path') 41 self.assertEqual(p.__module__, 'nturl2path') 42 for k, v in context.items(): 43 self.assertEqual(v.__module__, 'urllib.%s' % module, 44 "%r is exposed in 'urllib.%s' but defined in %r" % 45 (k, module, v.__module__)) 46 47 def test_trivial(self): 48 # A couple trivial tests 49 50 self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url') 51 52 # XXX Name hacking to get this to work on Windows. 53 fname = os.path.abspath(urllib.request.__file__).replace(os.sep, '/') 54 55 if os.name == 'nt': 56 file_url = "file:///%s" % fname 57 else: 58 file_url = "file://%s" % fname 59 60 with urllib.request.urlopen(file_url) as f: 61 f.read() 62 63 def test_parse_http_list(self): 64 tests = [ 65 ('a,b,c', ['a', 'b', 'c']), 66 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 67 ('a, b, "c", "d", "e,f", g, h', 68 ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 69 ('a="b\\"c", d="e\\,f", g="h\\\\i"', 70 ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 71 for string, list in tests: 72 self.assertEqual(urllib.request.parse_http_list(string), list) 73 74 def test_URLError_reasonstr(self): 75 err = urllib.error.URLError('reason') 76 self.assertIn(err.reason, str(err)) 77 78 79class RequestHdrsTests(unittest.TestCase): 80 81 def test_request_headers_dict(self): 82 """ 83 The Request.headers dictionary is not a documented interface. It 84 should stay that way, because the complete set of headers are only 85 accessible through the .get_header(), .has_header(), .header_items() 86 interface. However, .headers pre-dates those methods, and so real code 87 will be using the dictionary. 88 89 The introduction in 2.4 of those methods was a mistake for the same 90 reason: code that previously saw all (urllib2 user)-provided headers in 91 .headers now sees only a subset. 92 93 """ 94 url = "http://example.com" 95 self.assertEqual(Request(url, 96 headers={"Spam-eggs": "blah"} 97 ).headers["Spam-eggs"], "blah") 98 self.assertEqual(Request(url, 99 headers={"spam-EggS": "blah"} 100 ).headers["Spam-eggs"], "blah") 101 102 def test_request_headers_methods(self): 103 """ 104 Note the case normalization of header names here, to 105 .capitalize()-case. This should be preserved for 106 backwards-compatibility. (In the HTTP case, normalization to 107 .title()-case is done by urllib2 before sending headers to 108 http.client). 109 110 Note that e.g. r.has_header("spam-EggS") is currently False, and 111 r.get_header("spam-EggS") returns None, but that could be changed in 112 future. 113 114 Method r.remove_header should remove items both from r.headers and 115 r.unredirected_hdrs dictionaries 116 """ 117 url = "http://example.com" 118 req = Request(url, headers={"Spam-eggs": "blah"}) 119 self.assertTrue(req.has_header("Spam-eggs")) 120 self.assertEqual(req.header_items(), [('Spam-eggs', 'blah')]) 121 122 req.add_header("Foo-Bar", "baz") 123 self.assertEqual(sorted(req.header_items()), 124 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]) 125 self.assertFalse(req.has_header("Not-there")) 126 self.assertIsNone(req.get_header("Not-there")) 127 self.assertEqual(req.get_header("Not-there", "default"), "default") 128 129 req.remove_header("Spam-eggs") 130 self.assertFalse(req.has_header("Spam-eggs")) 131 132 req.add_unredirected_header("Unredirected-spam", "Eggs") 133 self.assertTrue(req.has_header("Unredirected-spam")) 134 135 req.remove_header("Unredirected-spam") 136 self.assertFalse(req.has_header("Unredirected-spam")) 137 138 def test_password_manager(self): 139 mgr = urllib.request.HTTPPasswordMgr() 140 add = mgr.add_password 141 find_user_pass = mgr.find_user_password 142 143 add("Some Realm", "http://example.com/", "joe", "password") 144 add("Some Realm", "http://example.com/ni", "ni", "ni") 145 add("Some Realm", "http://c.example.com:3128", "3", "c") 146 add("Some Realm", "d.example.com", "4", "d") 147 add("Some Realm", "e.example.com:3128", "5", "e") 148 149 # For the same realm, password set the highest path is the winner. 150 self.assertEqual(find_user_pass("Some Realm", "example.com"), 151 ('joe', 'password')) 152 self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"), 153 ('joe', 'password')) 154 self.assertEqual(find_user_pass("Some Realm", "http://example.com"), 155 ('joe', 'password')) 156 self.assertEqual(find_user_pass("Some Realm", "http://example.com/"), 157 ('joe', 'password')) 158 self.assertEqual(find_user_pass("Some Realm", 159 "http://example.com/spam"), 160 ('joe', 'password')) 161 162 self.assertEqual(find_user_pass("Some Realm", 163 "http://example.com/spam/spam"), 164 ('joe', 'password')) 165 166 # You can have different passwords for different paths. 167 168 add("c", "http://example.com/foo", "foo", "ni") 169 add("c", "http://example.com/bar", "bar", "nini") 170 171 self.assertEqual(find_user_pass("c", "http://example.com/foo"), 172 ('foo', 'ni')) 173 174 self.assertEqual(find_user_pass("c", "http://example.com/bar"), 175 ('bar', 'nini')) 176 177 # For the same path, newer password should be considered. 178 179 add("b", "http://example.com/", "first", "blah") 180 add("b", "http://example.com/", "second", "spam") 181 182 self.assertEqual(find_user_pass("b", "http://example.com/"), 183 ('second', 'spam')) 184 185 # No special relationship between a.example.com and example.com: 186 187 add("a", "http://example.com", "1", "a") 188 self.assertEqual(find_user_pass("a", "http://example.com/"), 189 ('1', 'a')) 190 191 self.assertEqual(find_user_pass("a", "http://a.example.com/"), 192 (None, None)) 193 194 # Ports: 195 196 self.assertEqual(find_user_pass("Some Realm", "c.example.com"), 197 (None, None)) 198 self.assertEqual(find_user_pass("Some Realm", "c.example.com:3128"), 199 ('3', 'c')) 200 self.assertEqual( 201 find_user_pass("Some Realm", "http://c.example.com:3128"), 202 ('3', 'c')) 203 self.assertEqual(find_user_pass("Some Realm", "d.example.com"), 204 ('4', 'd')) 205 self.assertEqual(find_user_pass("Some Realm", "e.example.com:3128"), 206 ('5', 'e')) 207 208 def test_password_manager_default_port(self): 209 """ 210 The point to note here is that we can't guess the default port if 211 there's no scheme. This applies to both add_password and 212 find_user_password. 213 """ 214 mgr = urllib.request.HTTPPasswordMgr() 215 add = mgr.add_password 216 find_user_pass = mgr.find_user_password 217 add("f", "http://g.example.com:80", "10", "j") 218 add("g", "http://h.example.com", "11", "k") 219 add("h", "i.example.com:80", "12", "l") 220 add("i", "j.example.com", "13", "m") 221 self.assertEqual(find_user_pass("f", "g.example.com:100"), 222 (None, None)) 223 self.assertEqual(find_user_pass("f", "g.example.com:80"), 224 ('10', 'j')) 225 self.assertEqual(find_user_pass("f", "g.example.com"), 226 (None, None)) 227 self.assertEqual(find_user_pass("f", "http://g.example.com:100"), 228 (None, None)) 229 self.assertEqual(find_user_pass("f", "http://g.example.com:80"), 230 ('10', 'j')) 231 self.assertEqual(find_user_pass("f", "http://g.example.com"), 232 ('10', 'j')) 233 self.assertEqual(find_user_pass("g", "h.example.com"), ('11', 'k')) 234 self.assertEqual(find_user_pass("g", "h.example.com:80"), ('11', 'k')) 235 self.assertEqual(find_user_pass("g", "http://h.example.com:80"), 236 ('11', 'k')) 237 self.assertEqual(find_user_pass("h", "i.example.com"), (None, None)) 238 self.assertEqual(find_user_pass("h", "i.example.com:80"), ('12', 'l')) 239 self.assertEqual(find_user_pass("h", "http://i.example.com:80"), 240 ('12', 'l')) 241 self.assertEqual(find_user_pass("i", "j.example.com"), ('13', 'm')) 242 self.assertEqual(find_user_pass("i", "j.example.com:80"), 243 (None, None)) 244 self.assertEqual(find_user_pass("i", "http://j.example.com"), 245 ('13', 'm')) 246 self.assertEqual(find_user_pass("i", "http://j.example.com:80"), 247 (None, None)) 248 249 250class MockOpener: 251 addheaders = [] 252 253 def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 254 self.req, self.data, self.timeout = req, data, timeout 255 256 def error(self, proto, *args): 257 self.proto, self.args = proto, args 258 259 260class MockFile: 261 def read(self, count=None): 262 pass 263 264 def readline(self, count=None): 265 pass 266 267 def close(self): 268 pass 269 270 271class MockHeaders(dict): 272 def getheaders(self, name): 273 return list(self.values()) 274 275 276class MockResponse(io.StringIO): 277 def __init__(self, code, msg, headers, data, url=None): 278 io.StringIO.__init__(self, data) 279 self.code, self.msg, self.headers, self.url = code, msg, headers, url 280 281 def info(self): 282 return self.headers 283 284 def geturl(self): 285 return self.url 286 287 288class MockCookieJar: 289 def add_cookie_header(self, request): 290 self.ach_req = request 291 292 def extract_cookies(self, response, request): 293 self.ec_req, self.ec_r = request, response 294 295 296class FakeMethod: 297 def __init__(self, meth_name, action, handle): 298 self.meth_name = meth_name 299 self.handle = handle 300 self.action = action 301 302 def __call__(self, *args): 303 return self.handle(self.meth_name, self.action, *args) 304 305 306class MockHTTPResponse(io.IOBase): 307 def __init__(self, fp, msg, status, reason): 308 self.fp = fp 309 self.msg = msg 310 self.status = status 311 self.reason = reason 312 self.code = 200 313 314 def read(self): 315 return '' 316 317 def info(self): 318 return {} 319 320 def geturl(self): 321 return self.url 322 323 324class MockHTTPClass: 325 def __init__(self): 326 self.level = 0 327 self.req_headers = [] 328 self.data = None 329 self.raise_on_endheaders = False 330 self.sock = None 331 self._tunnel_headers = {} 332 333 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 334 self.host = host 335 self.timeout = timeout 336 return self 337 338 def set_debuglevel(self, level): 339 self.level = level 340 341 def set_tunnel(self, host, port=None, headers=None): 342 self._tunnel_host = host 343 self._tunnel_port = port 344 if headers: 345 self._tunnel_headers = headers 346 else: 347 self._tunnel_headers.clear() 348 349 def request(self, method, url, body=None, headers=None, *, 350 encode_chunked=False): 351 self.method = method 352 self.selector = url 353 if headers is not None: 354 self.req_headers += headers.items() 355 self.req_headers.sort() 356 if body: 357 self.data = body 358 self.encode_chunked = encode_chunked 359 if self.raise_on_endheaders: 360 raise OSError() 361 362 def getresponse(self): 363 return MockHTTPResponse(MockFile(), {}, 200, "OK") 364 365 def close(self): 366 pass 367 368 369class MockHandler: 370 # useful for testing handler machinery 371 # see add_ordered_mock_handlers() docstring 372 handler_order = 500 373 374 def __init__(self, methods): 375 self._define_methods(methods) 376 377 def _define_methods(self, methods): 378 for spec in methods: 379 if len(spec) == 2: 380 name, action = spec 381 else: 382 name, action = spec, None 383 meth = FakeMethod(name, action, self.handle) 384 setattr(self.__class__, name, meth) 385 386 def handle(self, fn_name, action, *args, **kwds): 387 self.parent.calls.append((self, fn_name, args, kwds)) 388 if action is None: 389 return None 390 elif action == "return self": 391 return self 392 elif action == "return response": 393 res = MockResponse(200, "OK", {}, "") 394 return res 395 elif action == "return request": 396 return Request("http://blah/") 397 elif action.startswith("error"): 398 code = action[action.rfind(" ")+1:] 399 try: 400 code = int(code) 401 except ValueError: 402 pass 403 res = MockResponse(200, "OK", {}, "") 404 return self.parent.error("http", args[0], res, code, "", {}) 405 elif action == "raise": 406 raise urllib.error.URLError("blah") 407 assert False 408 409 def close(self): 410 pass 411 412 def add_parent(self, parent): 413 self.parent = parent 414 self.parent.calls = [] 415 416 def __lt__(self, other): 417 if not hasattr(other, "handler_order"): 418 # No handler_order, leave in original order. Yuck. 419 return True 420 return self.handler_order < other.handler_order 421 422 423def add_ordered_mock_handlers(opener, meth_spec): 424 """Create MockHandlers and add them to an OpenerDirector. 425 426 meth_spec: list of lists of tuples and strings defining methods to define 427 on handlers. eg: 428 429 [["http_error", "ftp_open"], ["http_open"]] 430 431 defines methods .http_error() and .ftp_open() on one handler, and 432 .http_open() on another. These methods just record their arguments and 433 return None. Using a tuple instead of a string causes the method to 434 perform some action (see MockHandler.handle()), eg: 435 436 [["http_error"], [("http_open", "return request")]] 437 438 defines .http_error() on one handler (which simply returns None), and 439 .http_open() on another handler, which returns a Request object. 440 441 """ 442 handlers = [] 443 count = 0 444 for meths in meth_spec: 445 class MockHandlerSubclass(MockHandler): 446 pass 447 448 h = MockHandlerSubclass(meths) 449 h.handler_order += count 450 h.add_parent(opener) 451 count = count + 1 452 handlers.append(h) 453 opener.add_handler(h) 454 return handlers 455 456 457def build_test_opener(*handler_instances): 458 opener = OpenerDirector() 459 for h in handler_instances: 460 opener.add_handler(h) 461 return opener 462 463 464class MockHTTPHandler(urllib.request.BaseHandler): 465 # useful for testing redirections and auth 466 # sends supplied headers and code as first response 467 # sends 200 OK as second response 468 def __init__(self, code, headers): 469 self.code = code 470 self.headers = headers 471 self.reset() 472 473 def reset(self): 474 self._count = 0 475 self.requests = [] 476 477 def http_open(self, req): 478 import email, copy 479 self.requests.append(copy.deepcopy(req)) 480 if self._count == 0: 481 self._count = self._count + 1 482 name = http.client.responses[self.code] 483 msg = email.message_from_string(self.headers) 484 return self.parent.error( 485 "http", req, MockFile(), self.code, name, msg) 486 else: 487 self.req = req 488 msg = email.message_from_string("\r\n\r\n") 489 return MockResponse(200, "OK", msg, "", req.get_full_url()) 490 491 492class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): 493 # Useful for testing the Proxy-Authorization request by verifying the 494 # properties of httpcon 495 496 def __init__(self, debuglevel=0): 497 urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel) 498 self.httpconn = MockHTTPClass() 499 500 def https_open(self, req): 501 return self.do_open(self.httpconn, req) 502 503 504class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler): 505 # useful for testing auth 506 # sends supplied code response 507 # checks if auth header is specified in request 508 def __init__(self, code): 509 self.code = code 510 self.has_auth_header = False 511 512 def reset(self): 513 self.has_auth_header = False 514 515 def http_open(self, req): 516 if req.has_header('Authorization'): 517 self.has_auth_header = True 518 name = http.client.responses[self.code] 519 return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) 520 521 522 523class MockPasswordManager: 524 def add_password(self, realm, uri, user, password): 525 self.realm = realm 526 self.url = uri 527 self.user = user 528 self.password = password 529 530 def find_user_password(self, realm, authuri): 531 self.target_realm = realm 532 self.target_url = authuri 533 return self.user, self.password 534 535 536class OpenerDirectorTests(unittest.TestCase): 537 538 def test_add_non_handler(self): 539 class NonHandler(object): 540 pass 541 self.assertRaises(TypeError, 542 OpenerDirector().add_handler, NonHandler()) 543 544 def test_badly_named_methods(self): 545 # test work-around for three methods that accidentally follow the 546 # naming conventions for handler methods 547 # (*_open() / *_request() / *_response()) 548 549 # These used to call the accidentally-named methods, causing a 550 # TypeError in real code; here, returning self from these mock 551 # methods would either cause no exception, or AttributeError. 552 553 from urllib.error import URLError 554 555 o = OpenerDirector() 556 meth_spec = [ 557 [("do_open", "return self"), ("proxy_open", "return self")], 558 [("redirect_request", "return self")], 559 ] 560 add_ordered_mock_handlers(o, meth_spec) 561 o.add_handler(urllib.request.UnknownHandler()) 562 for scheme in "do", "proxy", "redirect": 563 self.assertRaises(URLError, o.open, scheme+"://example.com/") 564 565 def test_handled(self): 566 # handler returning non-None means no more handlers will be called 567 o = OpenerDirector() 568 meth_spec = [ 569 ["http_open", "ftp_open", "http_error_302"], 570 ["ftp_open"], 571 [("http_open", "return self")], 572 [("http_open", "return self")], 573 ] 574 handlers = add_ordered_mock_handlers(o, meth_spec) 575 576 req = Request("http://example.com/") 577 r = o.open(req) 578 # Second .http_open() gets called, third doesn't, since second returned 579 # non-None. Handlers without .http_open() never get any methods called 580 # on them. 581 # In fact, second mock handler defining .http_open() returns self 582 # (instead of response), which becomes the OpenerDirector's return 583 # value. 584 self.assertEqual(r, handlers[2]) 585 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 586 for expected, got in zip(calls, o.calls): 587 handler, name, args, kwds = got 588 self.assertEqual((handler, name), expected) 589 self.assertEqual(args, (req,)) 590 591 def test_handler_order(self): 592 o = OpenerDirector() 593 handlers = [] 594 for meths, handler_order in [([("http_open", "return self")], 500), 595 (["http_open"], 0)]: 596 class MockHandlerSubclass(MockHandler): 597 pass 598 599 h = MockHandlerSubclass(meths) 600 h.handler_order = handler_order 601 handlers.append(h) 602 o.add_handler(h) 603 604 o.open("http://example.com/") 605 # handlers called in reverse order, thanks to their sort order 606 self.assertEqual(o.calls[0][0], handlers[1]) 607 self.assertEqual(o.calls[1][0], handlers[0]) 608 609 def test_raise(self): 610 # raising URLError stops processing of request 611 o = OpenerDirector() 612 meth_spec = [ 613 [("http_open", "raise")], 614 [("http_open", "return self")], 615 ] 616 handlers = add_ordered_mock_handlers(o, meth_spec) 617 618 req = Request("http://example.com/") 619 self.assertRaises(urllib.error.URLError, o.open, req) 620 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 621 622 def test_http_error(self): 623 # XXX http_error_default 624 # http errors are a special case 625 o = OpenerDirector() 626 meth_spec = [ 627 [("http_open", "error 302")], 628 [("http_error_400", "raise"), "http_open"], 629 [("http_error_302", "return response"), "http_error_303", 630 "http_error"], 631 [("http_error_302")], 632 ] 633 handlers = add_ordered_mock_handlers(o, meth_spec) 634 635 class Unknown: 636 def __eq__(self, other): 637 return True 638 639 req = Request("http://example.com/") 640 o.open(req) 641 assert len(o.calls) == 2 642 calls = [(handlers[0], "http_open", (req,)), 643 (handlers[2], "http_error_302", 644 (req, Unknown(), 302, "", {}))] 645 for expected, got in zip(calls, o.calls): 646 handler, method_name, args = expected 647 self.assertEqual((handler, method_name), got[:2]) 648 self.assertEqual(args, got[2]) 649 650 def test_processors(self): 651 # *_request / *_response methods get called appropriately 652 o = OpenerDirector() 653 meth_spec = [ 654 [("http_request", "return request"), 655 ("http_response", "return response")], 656 [("http_request", "return request"), 657 ("http_response", "return response")], 658 ] 659 handlers = add_ordered_mock_handlers(o, meth_spec) 660 661 req = Request("http://example.com/") 662 o.open(req) 663 # processor methods are called on *all* handlers that define them, 664 # not just the first handler that handles the request 665 calls = [ 666 (handlers[0], "http_request"), (handlers[1], "http_request"), 667 (handlers[0], "http_response"), (handlers[1], "http_response")] 668 669 for i, (handler, name, args, kwds) in enumerate(o.calls): 670 if i < 2: 671 # *_request 672 self.assertEqual((handler, name), calls[i]) 673 self.assertEqual(len(args), 1) 674 self.assertIsInstance(args[0], Request) 675 else: 676 # *_response 677 self.assertEqual((handler, name), calls[i]) 678 self.assertEqual(len(args), 2) 679 self.assertIsInstance(args[0], Request) 680 # response from opener.open is None, because there's no 681 # handler that defines http_open to handle it 682 if args[1] is not None: 683 self.assertIsInstance(args[1], MockResponse) 684 685 686def sanepathname2url(path): 687 try: 688 path.encode("utf-8") 689 except UnicodeEncodeError: 690 raise unittest.SkipTest("path is not encodable to utf8") 691 urlpath = urllib.request.pathname2url(path) 692 if os.name == "nt" and urlpath.startswith("///"): 693 urlpath = urlpath[2:] 694 # XXX don't ask me about the mac... 695 return urlpath 696 697 698class HandlerTests(unittest.TestCase): 699 700 def test_ftp(self): 701 class MockFTPWrapper: 702 def __init__(self, data): 703 self.data = data 704 705 def retrfile(self, filename, filetype): 706 self.filename, self.filetype = filename, filetype 707 return io.StringIO(self.data), len(self.data) 708 709 def close(self): 710 pass 711 712 class NullFTPHandler(urllib.request.FTPHandler): 713 def __init__(self, data): 714 self.data = data 715 716 def connect_ftp(self, user, passwd, host, port, dirs, 717 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 718 self.user, self.passwd = user, passwd 719 self.host, self.port = host, port 720 self.dirs = dirs 721 self.ftpwrapper = MockFTPWrapper(self.data) 722 return self.ftpwrapper 723 724 import ftplib 725 data = "rheum rhaponicum" 726 h = NullFTPHandler(data) 727 h.parent = MockOpener() 728 729 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 730 ("ftp://localhost/foo/bar/baz.html", 731 "localhost", ftplib.FTP_PORT, "", "", "I", 732 ["foo", "bar"], "baz.html", "text/html"), 733 ("ftp://parrot@localhost/foo/bar/baz.html", 734 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 735 ["foo", "bar"], "baz.html", "text/html"), 736 ("ftp://%25parrot@localhost/foo/bar/baz.html", 737 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 738 ["foo", "bar"], "baz.html", "text/html"), 739 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 740 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 741 ["foo", "bar"], "baz.html", "text/html"), 742 ("ftp://localhost:80/foo/bar/", 743 "localhost", 80, "", "", "D", 744 ["foo", "bar"], "", None), 745 ("ftp://localhost/baz.gif;type=a", 746 "localhost", ftplib.FTP_PORT, "", "", "A", 747 [], "baz.gif", None), # XXX really this should guess image/gif 748 ]: 749 req = Request(url) 750 req.timeout = None 751 r = h.ftp_open(req) 752 # ftp authentication not yet implemented by FTPHandler 753 self.assertEqual(h.user, user) 754 self.assertEqual(h.passwd, passwd) 755 self.assertEqual(h.host, socket.gethostbyname(host)) 756 self.assertEqual(h.port, port) 757 self.assertEqual(h.dirs, dirs) 758 self.assertEqual(h.ftpwrapper.filename, filename) 759 self.assertEqual(h.ftpwrapper.filetype, type_) 760 headers = r.info() 761 self.assertEqual(headers.get("Content-type"), mimetype) 762 self.assertEqual(int(headers["Content-length"]), len(data)) 763 764 def test_file(self): 765 import email.utils 766 h = urllib.request.FileHandler() 767 o = h.parent = MockOpener() 768 769 TESTFN = support.TESTFN 770 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 771 towrite = b"hello, world\n" 772 urls = [ 773 "file://localhost%s" % urlpath, 774 "file://%s" % urlpath, 775 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 776 ] 777 try: 778 localaddr = socket.gethostbyname(socket.gethostname()) 779 except socket.gaierror: 780 localaddr = '' 781 if localaddr: 782 urls.append("file://%s%s" % (localaddr, urlpath)) 783 784 for url in urls: 785 f = open(TESTFN, "wb") 786 try: 787 try: 788 f.write(towrite) 789 finally: 790 f.close() 791 792 r = h.file_open(Request(url)) 793 try: 794 data = r.read() 795 headers = r.info() 796 respurl = r.geturl() 797 finally: 798 r.close() 799 stats = os.stat(TESTFN) 800 modified = email.utils.formatdate(stats.st_mtime, usegmt=True) 801 finally: 802 os.remove(TESTFN) 803 self.assertEqual(data, towrite) 804 self.assertEqual(headers["Content-type"], "text/plain") 805 self.assertEqual(headers["Content-length"], "13") 806 self.assertEqual(headers["Last-modified"], modified) 807 self.assertEqual(respurl, url) 808 809 for url in [ 810 "file://localhost:80%s" % urlpath, 811 "file:///file_does_not_exist.txt", 812 "file://not-a-local-host.com//dir/file.txt", 813 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 814 os.getcwd(), TESTFN), 815 "file://somerandomhost.ontheinternet.com%s/%s" % 816 (os.getcwd(), TESTFN), 817 ]: 818 try: 819 f = open(TESTFN, "wb") 820 try: 821 f.write(towrite) 822 finally: 823 f.close() 824 825 self.assertRaises(urllib.error.URLError, 826 h.file_open, Request(url)) 827 finally: 828 os.remove(TESTFN) 829 830 h = urllib.request.FileHandler() 831 o = h.parent = MockOpener() 832 # XXXX why does // mean ftp (and /// mean not ftp!), and where 833 # is file: scheme specified? I think this is really a bug, and 834 # what was intended was to distinguish between URLs like: 835 # file:/blah.txt (a file) 836 # file://localhost/blah.txt (a file) 837 # file:///blah.txt (a file) 838 # file://ftp.example.com/blah.txt (an ftp URL) 839 for url, ftp in [ 840 ("file://ftp.example.com//foo.txt", False), 841 ("file://ftp.example.com///foo.txt", False), 842 ("file://ftp.example.com/foo.txt", False), 843 ("file://somehost//foo/something.txt", False), 844 ("file://localhost//foo/something.txt", False), 845 ]: 846 req = Request(url) 847 try: 848 h.file_open(req) 849 except urllib.error.URLError: 850 self.assertFalse(ftp) 851 else: 852 self.assertIs(o.req, req) 853 self.assertEqual(req.type, "ftp") 854 self.assertEqual(req.type == "ftp", ftp) 855 856 def test_http(self): 857 858 h = urllib.request.AbstractHTTPHandler() 859 o = h.parent = MockOpener() 860 861 url = "http://example.com/" 862 for method, data in [("GET", None), ("POST", b"blah")]: 863 req = Request(url, data, {"Foo": "bar"}) 864 req.timeout = None 865 req.add_unredirected_header("Spam", "eggs") 866 http = MockHTTPClass() 867 r = h.do_open(http, req) 868 869 # result attributes 870 r.read; r.readline # wrapped MockFile methods 871 r.info; r.geturl # addinfourl methods 872 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 873 hdrs = r.info() 874 hdrs.get; hdrs.__contains__ # r.info() gives dict from .getreply() 875 self.assertEqual(r.geturl(), url) 876 877 self.assertEqual(http.host, "example.com") 878 self.assertEqual(http.level, 0) 879 self.assertEqual(http.method, method) 880 self.assertEqual(http.selector, "/") 881 self.assertEqual(http.req_headers, 882 [("Connection", "close"), 883 ("Foo", "bar"), ("Spam", "eggs")]) 884 self.assertEqual(http.data, data) 885 886 # check OSError converted to URLError 887 http.raise_on_endheaders = True 888 self.assertRaises(urllib.error.URLError, h.do_open, http, req) 889 890 # Check for TypeError on POST data which is str. 891 req = Request("http://example.com/","badpost") 892 self.assertRaises(TypeError, h.do_request_, req) 893 894 # check adding of standard headers 895 o.addheaders = [("Spam", "eggs")] 896 for data in b"", None: # POST, GET 897 req = Request("http://example.com/", data) 898 r = MockResponse(200, "OK", {}, "") 899 newreq = h.do_request_(req) 900 if data is None: # GET 901 self.assertNotIn("Content-length", req.unredirected_hdrs) 902 self.assertNotIn("Content-type", req.unredirected_hdrs) 903 else: # POST 904 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 905 self.assertEqual(req.unredirected_hdrs["Content-type"], 906 "application/x-www-form-urlencoded") 907 # XXX the details of Host could be better tested 908 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 909 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 910 911 # don't clobber existing headers 912 req.add_unredirected_header("Content-length", "foo") 913 req.add_unredirected_header("Content-type", "bar") 914 req.add_unredirected_header("Host", "baz") 915 req.add_unredirected_header("Spam", "foo") 916 newreq = h.do_request_(req) 917 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 918 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 919 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 920 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 921 922 def test_http_body_file(self): 923 # A regular file - chunked encoding is used unless Content Length is 924 # already set. 925 926 h = urllib.request.AbstractHTTPHandler() 927 o = h.parent = MockOpener() 928 929 file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False) 930 file_path = file_obj.name 931 file_obj.close() 932 self.addCleanup(os.unlink, file_path) 933 934 with open(file_path, "rb") as f: 935 req = Request("http://example.com/", f, {}) 936 newreq = h.do_request_(req) 937 te = newreq.get_header('Transfer-encoding') 938 self.assertEqual(te, "chunked") 939 self.assertFalse(newreq.has_header('Content-length')) 940 941 with open(file_path, "rb") as f: 942 req = Request("http://example.com/", f, {"Content-Length": 30}) 943 newreq = h.do_request_(req) 944 self.assertEqual(int(newreq.get_header('Content-length')), 30) 945 self.assertFalse(newreq.has_header("Transfer-encoding")) 946 947 def test_http_body_fileobj(self): 948 # A file object - chunked encoding is used 949 # unless Content Length is already set. 950 # (Note that there are some subtle differences to a regular 951 # file, that is why we are testing both cases.) 952 953 h = urllib.request.AbstractHTTPHandler() 954 o = h.parent = MockOpener() 955 file_obj = io.BytesIO() 956 957 req = Request("http://example.com/", file_obj, {}) 958 newreq = h.do_request_(req) 959 self.assertEqual(newreq.get_header('Transfer-encoding'), 'chunked') 960 self.assertFalse(newreq.has_header('Content-length')) 961 962 headers = {"Content-Length": 30} 963 req = Request("http://example.com/", file_obj, headers) 964 newreq = h.do_request_(req) 965 self.assertEqual(int(newreq.get_header('Content-length')), 30) 966 self.assertFalse(newreq.has_header("Transfer-encoding")) 967 968 file_obj.close() 969 970 def test_http_body_pipe(self): 971 # A file reading from a pipe. 972 # A pipe cannot be seek'ed. There is no way to determine the 973 # content length up front. Thus, do_request_() should fall 974 # back to Transfer-encoding chunked. 975 976 h = urllib.request.AbstractHTTPHandler() 977 o = h.parent = MockOpener() 978 979 cmd = [sys.executable, "-c", r"pass"] 980 for headers in {}, {"Content-Length": 30}: 981 with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc: 982 req = Request("http://example.com/", proc.stdout, headers) 983 newreq = h.do_request_(req) 984 if not headers: 985 self.assertEqual(newreq.get_header('Content-length'), None) 986 self.assertEqual(newreq.get_header('Transfer-encoding'), 987 'chunked') 988 else: 989 self.assertEqual(int(newreq.get_header('Content-length')), 990 30) 991 992 def test_http_body_iterable(self): 993 # Generic iterable. There is no way to determine the content 994 # length up front. Fall back to Transfer-encoding chunked. 995 996 h = urllib.request.AbstractHTTPHandler() 997 o = h.parent = MockOpener() 998 999 def iterable_body(): 1000 yield b"one" 1001 1002 for headers in {}, {"Content-Length": 11}: 1003 req = Request("http://example.com/", iterable_body(), headers) 1004 newreq = h.do_request_(req) 1005 if not headers: 1006 self.assertEqual(newreq.get_header('Content-length'), None) 1007 self.assertEqual(newreq.get_header('Transfer-encoding'), 1008 'chunked') 1009 else: 1010 self.assertEqual(int(newreq.get_header('Content-length')), 11) 1011 1012 def test_http_body_empty_seq(self): 1013 # Zero-length iterable body should be treated like any other iterable 1014 h = urllib.request.AbstractHTTPHandler() 1015 h.parent = MockOpener() 1016 req = h.do_request_(Request("http://example.com/", ())) 1017 self.assertEqual(req.get_header("Transfer-encoding"), "chunked") 1018 self.assertFalse(req.has_header("Content-length")) 1019 1020 def test_http_body_array(self): 1021 # array.array Iterable - Content Length is calculated 1022 1023 h = urllib.request.AbstractHTTPHandler() 1024 o = h.parent = MockOpener() 1025 1026 iterable_array = array.array("I",[1,2,3,4]) 1027 1028 for headers in {}, {"Content-Length": 16}: 1029 req = Request("http://example.com/", iterable_array, headers) 1030 newreq = h.do_request_(req) 1031 self.assertEqual(int(newreq.get_header('Content-length')),16) 1032 1033 def test_http_handler_debuglevel(self): 1034 o = OpenerDirector() 1035 h = MockHTTPSHandler(debuglevel=1) 1036 o.add_handler(h) 1037 o.open("https://www.example.com") 1038 self.assertEqual(h._debuglevel, 1) 1039 1040 def test_http_doubleslash(self): 1041 # Checks the presence of any unnecessary double slash in url does not 1042 # break anything. Previously, a double slash directly after the host 1043 # could cause incorrect parsing. 1044 h = urllib.request.AbstractHTTPHandler() 1045 h.parent = MockOpener() 1046 1047 data = b"" 1048 ds_urls = [ 1049 "http://example.com/foo/bar/baz.html", 1050 "http://example.com//foo/bar/baz.html", 1051 "http://example.com/foo//bar/baz.html", 1052 "http://example.com/foo/bar//baz.html" 1053 ] 1054 1055 for ds_url in ds_urls: 1056 ds_req = Request(ds_url, data) 1057 1058 # Check whether host is determined correctly if there is no proxy 1059 np_ds_req = h.do_request_(ds_req) 1060 self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com") 1061 1062 # Check whether host is determined correctly if there is a proxy 1063 ds_req.set_proxy("someproxy:3128", None) 1064 p_ds_req = h.do_request_(ds_req) 1065 self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com") 1066 1067 def test_full_url_setter(self): 1068 # Checks to ensure that components are set correctly after setting the 1069 # full_url of a Request object 1070 1071 urls = [ 1072 'http://example.com?foo=bar#baz', 1073 'http://example.com?foo=bar&spam=eggs#bash', 1074 'http://example.com', 1075 ] 1076 1077 # testing a reusable request instance, but the url parameter is 1078 # required, so just use a dummy one to instantiate 1079 r = Request('http://example.com') 1080 for url in urls: 1081 r.full_url = url 1082 parsed = urlparse(url) 1083 1084 self.assertEqual(r.get_full_url(), url) 1085 # full_url setter uses splittag to split into components. 1086 # splittag sets the fragment as None while urlparse sets it to '' 1087 self.assertEqual(r.fragment or '', parsed.fragment) 1088 self.assertEqual(urlparse(r.get_full_url()).query, parsed.query) 1089 1090 def test_full_url_deleter(self): 1091 r = Request('http://www.example.com') 1092 del r.full_url 1093 self.assertIsNone(r.full_url) 1094 self.assertIsNone(r.fragment) 1095 self.assertEqual(r.selector, '') 1096 1097 def test_fixpath_in_weirdurls(self): 1098 # Issue4493: urllib2 to supply '/' when to urls where path does not 1099 # start with'/' 1100 1101 h = urllib.request.AbstractHTTPHandler() 1102 h.parent = MockOpener() 1103 1104 weird_url = 'http://www.python.org?getspam' 1105 req = Request(weird_url) 1106 newreq = h.do_request_(req) 1107 self.assertEqual(newreq.host, 'www.python.org') 1108 self.assertEqual(newreq.selector, '/?getspam') 1109 1110 url_without_path = 'http://www.python.org' 1111 req = Request(url_without_path) 1112 newreq = h.do_request_(req) 1113 self.assertEqual(newreq.host, 'www.python.org') 1114 self.assertEqual(newreq.selector, '') 1115 1116 def test_errors(self): 1117 h = urllib.request.HTTPErrorProcessor() 1118 o = h.parent = MockOpener() 1119 1120 url = "http://example.com/" 1121 req = Request(url) 1122 # all 2xx are passed through 1123 r = MockResponse(200, "OK", {}, "", url) 1124 newr = h.http_response(req, r) 1125 self.assertIs(r, newr) 1126 self.assertFalse(hasattr(o, "proto")) # o.error not called 1127 r = MockResponse(202, "Accepted", {}, "", url) 1128 newr = h.http_response(req, r) 1129 self.assertIs(r, newr) 1130 self.assertFalse(hasattr(o, "proto")) # o.error not called 1131 r = MockResponse(206, "Partial content", {}, "", url) 1132 newr = h.http_response(req, r) 1133 self.assertIs(r, newr) 1134 self.assertFalse(hasattr(o, "proto")) # o.error not called 1135 # anything else calls o.error (and MockOpener returns None, here) 1136 r = MockResponse(502, "Bad gateway", {}, "", url) 1137 self.assertIsNone(h.http_response(req, r)) 1138 self.assertEqual(o.proto, "http") # o.error called 1139 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 1140 1141 def test_cookies(self): 1142 cj = MockCookieJar() 1143 h = urllib.request.HTTPCookieProcessor(cj) 1144 h.parent = MockOpener() 1145 1146 req = Request("http://example.com/") 1147 r = MockResponse(200, "OK", {}, "") 1148 newreq = h.http_request(req) 1149 self.assertIs(cj.ach_req, req) 1150 self.assertIs(cj.ach_req, newreq) 1151 self.assertEqual(req.origin_req_host, "example.com") 1152 self.assertFalse(req.unverifiable) 1153 newr = h.http_response(req, r) 1154 self.assertIs(cj.ec_req, req) 1155 self.assertIs(cj.ec_r, r) 1156 self.assertIs(r, newr) 1157 1158 def test_redirect(self): 1159 from_url = "http://example.com/a.html" 1160 to_url = "http://example.com/b.html" 1161 h = urllib.request.HTTPRedirectHandler() 1162 o = h.parent = MockOpener() 1163 1164 # ordinary redirect behaviour 1165 for code in 301, 302, 303, 307: 1166 for data in None, "blah\nblah\n": 1167 method = getattr(h, "http_error_%s" % code) 1168 req = Request(from_url, data) 1169 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1170 req.add_header("Nonsense", "viking=withhold") 1171 if data is not None: 1172 req.add_header("Content-Length", str(len(data))) 1173 req.add_unredirected_header("Spam", "spam") 1174 try: 1175 method(req, MockFile(), code, "Blah", 1176 MockHeaders({"location": to_url})) 1177 except urllib.error.HTTPError: 1178 # 307 in response to POST requires user OK 1179 self.assertEqual(code, 307) 1180 self.assertIsNotNone(data) 1181 self.assertEqual(o.req.get_full_url(), to_url) 1182 try: 1183 self.assertEqual(o.req.get_method(), "GET") 1184 except AttributeError: 1185 self.assertFalse(o.req.data) 1186 1187 # now it's a GET, there should not be headers regarding content 1188 # (possibly dragged from before being a POST) 1189 headers = [x.lower() for x in o.req.headers] 1190 self.assertNotIn("content-length", headers) 1191 self.assertNotIn("content-type", headers) 1192 1193 self.assertEqual(o.req.headers["Nonsense"], 1194 "viking=withhold") 1195 self.assertNotIn("Spam", o.req.headers) 1196 self.assertNotIn("Spam", o.req.unredirected_hdrs) 1197 1198 # loop detection 1199 req = Request(from_url) 1200 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1201 1202 def redirect(h, req, url=to_url): 1203 h.http_error_302(req, MockFile(), 302, "Blah", 1204 MockHeaders({"location": url})) 1205 # Note that the *original* request shares the same record of 1206 # redirections with the sub-requests caused by the redirections. 1207 1208 # detect infinite loop redirect of a URL to itself 1209 req = Request(from_url, origin_req_host="example.com") 1210 count = 0 1211 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1212 try: 1213 while 1: 1214 redirect(h, req, "http://example.com/") 1215 count = count + 1 1216 except urllib.error.HTTPError: 1217 # don't stop until max_repeats, because cookies may introduce state 1218 self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats) 1219 1220 # detect endless non-repeating chain of redirects 1221 req = Request(from_url, origin_req_host="example.com") 1222 count = 0 1223 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1224 try: 1225 while 1: 1226 redirect(h, req, "http://example.com/%d" % count) 1227 count = count + 1 1228 except urllib.error.HTTPError: 1229 self.assertEqual(count, 1230 urllib.request.HTTPRedirectHandler.max_redirections) 1231 1232 def test_invalid_redirect(self): 1233 from_url = "http://example.com/a.html" 1234 valid_schemes = ['http','https','ftp'] 1235 invalid_schemes = ['file','imap','ldap'] 1236 schemeless_url = "example.com/b.html" 1237 h = urllib.request.HTTPRedirectHandler() 1238 o = h.parent = MockOpener() 1239 req = Request(from_url) 1240 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1241 1242 for scheme in invalid_schemes: 1243 invalid_url = scheme + '://' + schemeless_url 1244 self.assertRaises(urllib.error.HTTPError, h.http_error_302, 1245 req, MockFile(), 302, "Security Loophole", 1246 MockHeaders({"location": invalid_url})) 1247 1248 for scheme in valid_schemes: 1249 valid_url = scheme + '://' + schemeless_url 1250 h.http_error_302(req, MockFile(), 302, "That's fine", 1251 MockHeaders({"location": valid_url})) 1252 self.assertEqual(o.req.get_full_url(), valid_url) 1253 1254 def test_relative_redirect(self): 1255 from_url = "http://example.com/a.html" 1256 relative_url = "/b.html" 1257 h = urllib.request.HTTPRedirectHandler() 1258 o = h.parent = MockOpener() 1259 req = Request(from_url) 1260 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1261 1262 valid_url = urllib.parse.urljoin(from_url,relative_url) 1263 h.http_error_302(req, MockFile(), 302, "That's fine", 1264 MockHeaders({"location": valid_url})) 1265 self.assertEqual(o.req.get_full_url(), valid_url) 1266 1267 def test_cookie_redirect(self): 1268 # cookies shouldn't leak into redirected requests 1269 from http.cookiejar import CookieJar 1270 from test.test_http_cookiejar import interact_netscape 1271 1272 cj = CookieJar() 1273 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1274 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1275 hdeh = urllib.request.HTTPDefaultErrorHandler() 1276 hrh = urllib.request.HTTPRedirectHandler() 1277 cp = urllib.request.HTTPCookieProcessor(cj) 1278 o = build_test_opener(hh, hdeh, hrh, cp) 1279 o.open("http://www.example.com/") 1280 self.assertFalse(hh.req.has_header("Cookie")) 1281 1282 def test_redirect_fragment(self): 1283 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1284 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1285 hdeh = urllib.request.HTTPDefaultErrorHandler() 1286 hrh = urllib.request.HTTPRedirectHandler() 1287 o = build_test_opener(hh, hdeh, hrh) 1288 fp = o.open('http://www.example.com') 1289 self.assertEqual(fp.geturl(), redirected_url.strip()) 1290 1291 def test_redirect_no_path(self): 1292 # Issue 14132: Relative redirect strips original path 1293 real_class = http.client.HTTPConnection 1294 response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" 1295 http.client.HTTPConnection = test_urllib.fakehttp(response1) 1296 self.addCleanup(setattr, http.client, "HTTPConnection", real_class) 1297 urls = iter(("/path", "/path?query")) 1298 def request(conn, method, url, *pos, **kw): 1299 self.assertEqual(url, next(urls)) 1300 real_class.request(conn, method, url, *pos, **kw) 1301 # Change response for subsequent connection 1302 conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" 1303 http.client.HTTPConnection.request = request 1304 fp = urllib.request.urlopen("http://python.org/path") 1305 self.assertEqual(fp.geturl(), "http://python.org/path?query") 1306 1307 def test_redirect_encoding(self): 1308 # Some characters in the redirect target may need special handling, 1309 # but most ASCII characters should be treated as already encoded 1310 class Handler(urllib.request.HTTPHandler): 1311 def http_open(self, req): 1312 result = self.do_open(self.connection, req) 1313 self.last_buf = self.connection.buf 1314 # Set up a normal response for the next request 1315 self.connection = test_urllib.fakehttp( 1316 b'HTTP/1.1 200 OK\r\n' 1317 b'Content-Length: 3\r\n' 1318 b'\r\n' 1319 b'123' 1320 ) 1321 return result 1322 handler = Handler() 1323 opener = urllib.request.build_opener(handler) 1324 tests = ( 1325 (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'), 1326 (b'/spaced%20path/', b'/spaced%20path/'), 1327 (b'/spaced path/', b'/spaced%20path/'), 1328 (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'), 1329 ) 1330 for [location, result] in tests: 1331 with self.subTest(repr(location)): 1332 handler.connection = test_urllib.fakehttp( 1333 b'HTTP/1.1 302 Redirect\r\n' 1334 b'Location: ' + location + b'\r\n' 1335 b'\r\n' 1336 ) 1337 response = opener.open('http://example.com/') 1338 expected = b'GET ' + result + b' ' 1339 request = handler.last_buf 1340 self.assertTrue(request.startswith(expected), repr(request)) 1341 1342 def test_proxy(self): 1343 u = "proxy.example.com:3128" 1344 for d in dict(http=u), dict(HTTP=u): 1345 o = OpenerDirector() 1346 ph = urllib.request.ProxyHandler(d) 1347 o.add_handler(ph) 1348 meth_spec = [ 1349 [("http_open", "return response")] 1350 ] 1351 handlers = add_ordered_mock_handlers(o, meth_spec) 1352 1353 req = Request("http://acme.example.com/") 1354 self.assertEqual(req.host, "acme.example.com") 1355 o.open(req) 1356 self.assertEqual(req.host, u) 1357 self.assertEqual([(handlers[0], "http_open")], 1358 [tup[0:2] for tup in o.calls]) 1359 1360 def test_proxy_no_proxy(self): 1361 os.environ['no_proxy'] = 'python.org' 1362 o = OpenerDirector() 1363 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1364 o.add_handler(ph) 1365 req = Request("http://www.perl.org/") 1366 self.assertEqual(req.host, "www.perl.org") 1367 o.open(req) 1368 self.assertEqual(req.host, "proxy.example.com") 1369 req = Request("http://www.python.org") 1370 self.assertEqual(req.host, "www.python.org") 1371 o.open(req) 1372 self.assertEqual(req.host, "www.python.org") 1373 del os.environ['no_proxy'] 1374 1375 def test_proxy_no_proxy_all(self): 1376 os.environ['no_proxy'] = '*' 1377 o = OpenerDirector() 1378 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1379 o.add_handler(ph) 1380 req = Request("http://www.python.org") 1381 self.assertEqual(req.host, "www.python.org") 1382 o.open(req) 1383 self.assertEqual(req.host, "www.python.org") 1384 del os.environ['no_proxy'] 1385 1386 def test_proxy_https(self): 1387 o = OpenerDirector() 1388 ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) 1389 o.add_handler(ph) 1390 meth_spec = [ 1391 [("https_open", "return response")] 1392 ] 1393 handlers = add_ordered_mock_handlers(o, meth_spec) 1394 1395 req = Request("https://www.example.com/") 1396 self.assertEqual(req.host, "www.example.com") 1397 o.open(req) 1398 self.assertEqual(req.host, "proxy.example.com:3128") 1399 self.assertEqual([(handlers[0], "https_open")], 1400 [tup[0:2] for tup in o.calls]) 1401 1402 def test_proxy_https_proxy_authorization(self): 1403 o = OpenerDirector() 1404 ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128')) 1405 o.add_handler(ph) 1406 https_handler = MockHTTPSHandler() 1407 o.add_handler(https_handler) 1408 req = Request("https://www.example.com/") 1409 req.add_header("Proxy-Authorization", "FooBar") 1410 req.add_header("User-Agent", "Grail") 1411 self.assertEqual(req.host, "www.example.com") 1412 self.assertIsNone(req._tunnel_host) 1413 o.open(req) 1414 # Verify Proxy-Authorization gets tunneled to request. 1415 # httpsconn req_headers do not have the Proxy-Authorization header but 1416 # the req will have. 1417 self.assertNotIn(("Proxy-Authorization", "FooBar"), 1418 https_handler.httpconn.req_headers) 1419 self.assertIn(("User-Agent", "Grail"), 1420 https_handler.httpconn.req_headers) 1421 self.assertIsNotNone(req._tunnel_host) 1422 self.assertEqual(req.host, "proxy.example.com:3128") 1423 self.assertEqual(req.get_header("Proxy-authorization"), "FooBar") 1424 1425 @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") 1426 def test_osx_proxy_bypass(self): 1427 bypass = { 1428 'exclude_simple': False, 1429 'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10', 1430 '10.0/16'] 1431 } 1432 # Check hosts that should trigger the proxy bypass 1433 for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1', 1434 '10.0.0.1'): 1435 self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass), 1436 'expected bypass of %s to be True' % host) 1437 # Check hosts that should not trigger the proxy bypass 1438 for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1', 1439 'notinbypass'): 1440 self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass), 1441 'expected bypass of %s to be False' % host) 1442 1443 # Check the exclude_simple flag 1444 bypass = {'exclude_simple': True, 'exceptions': []} 1445 self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass)) 1446 1447 # Check that invalid prefix lengths are ignored 1448 bypass = { 1449 'exclude_simple': False, 1450 'exceptions': [ '10.0.0.0/40', '172.19.10.0/24' ] 1451 } 1452 host = '172.19.10.5' 1453 self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass), 1454 'expected bypass of %s to be True' % host) 1455 host = '10.0.1.5' 1456 self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass), 1457 'expected bypass of %s to be False' % host) 1458 1459 def check_basic_auth(self, headers, realm): 1460 with self.subTest(realm=realm, headers=headers): 1461 opener = OpenerDirector() 1462 password_manager = MockPasswordManager() 1463 auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager) 1464 body = '\r\n'.join(headers) + '\r\n\r\n' 1465 http_handler = MockHTTPHandler(401, body) 1466 opener.add_handler(auth_handler) 1467 opener.add_handler(http_handler) 1468 self._test_basic_auth(opener, auth_handler, "Authorization", 1469 realm, http_handler, password_manager, 1470 "http://acme.example.com/protected", 1471 "http://acme.example.com/protected") 1472 1473 def test_basic_auth(self): 1474 realm = "realm2@example.com" 1475 realm2 = "realm2@example.com" 1476 basic = f'Basic realm="{realm}"' 1477 basic2 = f'Basic realm="{realm2}"' 1478 other_no_realm = 'Otherscheme xxx' 1479 digest = (f'Digest realm="{realm2}", ' 1480 f'qop="auth, auth-int", ' 1481 f'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", ' 1482 f'opaque="5ccc069c403ebaf9f0171e9517f40e41"') 1483 for realm_str in ( 1484 # test "quote" and 'quote' 1485 f'Basic realm="{realm}"', 1486 f"Basic realm='{realm}'", 1487 1488 # charset is ignored 1489 f'Basic realm="{realm}", charset="UTF-8"', 1490 1491 # Multiple challenges per header 1492 f'{basic}, {basic2}', 1493 f'{basic}, {other_no_realm}', 1494 f'{other_no_realm}, {basic}', 1495 f'{basic}, {digest}', 1496 f'{digest}, {basic}', 1497 ): 1498 headers = [f'WWW-Authenticate: {realm_str}'] 1499 self.check_basic_auth(headers, realm) 1500 1501 # no quote: expect a warning 1502 with support.check_warnings(("Basic Auth Realm was unquoted", 1503 UserWarning)): 1504 headers = [f'WWW-Authenticate: Basic realm={realm}'] 1505 self.check_basic_auth(headers, realm) 1506 1507 # Multiple headers: one challenge per header. 1508 # Use the first Basic realm. 1509 for challenges in ( 1510 [basic, basic2], 1511 [basic, digest], 1512 [digest, basic], 1513 ): 1514 headers = [f'WWW-Authenticate: {challenge}' 1515 for challenge in challenges] 1516 self.check_basic_auth(headers, realm) 1517 1518 def test_proxy_basic_auth(self): 1519 opener = OpenerDirector() 1520 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128")) 1521 opener.add_handler(ph) 1522 password_manager = MockPasswordManager() 1523 auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager) 1524 realm = "ACME Networks" 1525 http_handler = MockHTTPHandler( 1526 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1527 opener.add_handler(auth_handler) 1528 opener.add_handler(http_handler) 1529 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1530 realm, http_handler, password_manager, 1531 "http://acme.example.com:3128/protected", 1532 "proxy.example.com:3128", 1533 ) 1534 1535 def test_basic_and_digest_auth_handlers(self): 1536 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1537 # response (http://python.org/sf/1479302), where it should instead 1538 # return None to allow another handler (especially 1539 # HTTPBasicAuthHandler) to handle the response. 1540 1541 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1542 # try digest first (since it's the strongest auth scheme), so we record 1543 # order of calls here to check digest comes first: 1544 class RecordingOpenerDirector(OpenerDirector): 1545 def __init__(self): 1546 OpenerDirector.__init__(self) 1547 self.recorded = [] 1548 1549 def record(self, info): 1550 self.recorded.append(info) 1551 1552 class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): 1553 def http_error_401(self, *args, **kwds): 1554 self.parent.record("digest") 1555 urllib.request.HTTPDigestAuthHandler.http_error_401(self, 1556 *args, **kwds) 1557 1558 class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): 1559 def http_error_401(self, *args, **kwds): 1560 self.parent.record("basic") 1561 urllib.request.HTTPBasicAuthHandler.http_error_401(self, 1562 *args, **kwds) 1563 1564 opener = RecordingOpenerDirector() 1565 password_manager = MockPasswordManager() 1566 digest_handler = TestDigestAuthHandler(password_manager) 1567 basic_handler = TestBasicAuthHandler(password_manager) 1568 realm = "ACME Networks" 1569 http_handler = MockHTTPHandler( 1570 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1571 opener.add_handler(basic_handler) 1572 opener.add_handler(digest_handler) 1573 opener.add_handler(http_handler) 1574 1575 # check basic auth isn't blocked by digest handler failing 1576 self._test_basic_auth(opener, basic_handler, "Authorization", 1577 realm, http_handler, password_manager, 1578 "http://acme.example.com/protected", 1579 "http://acme.example.com/protected", 1580 ) 1581 # check digest was tried before basic (twice, because 1582 # _test_basic_auth called .open() twice) 1583 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1584 1585 def test_unsupported_auth_digest_handler(self): 1586 opener = OpenerDirector() 1587 # While using DigestAuthHandler 1588 digest_auth_handler = urllib.request.HTTPDigestAuthHandler(None) 1589 http_handler = MockHTTPHandler( 1590 401, 'WWW-Authenticate: Kerberos\r\n\r\n') 1591 opener.add_handler(digest_auth_handler) 1592 opener.add_handler(http_handler) 1593 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1594 1595 def test_unsupported_auth_basic_handler(self): 1596 # While using BasicAuthHandler 1597 opener = OpenerDirector() 1598 basic_auth_handler = urllib.request.HTTPBasicAuthHandler(None) 1599 http_handler = MockHTTPHandler( 1600 401, 'WWW-Authenticate: NTLM\r\n\r\n') 1601 opener.add_handler(basic_auth_handler) 1602 opener.add_handler(http_handler) 1603 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1604 1605 def _test_basic_auth(self, opener, auth_handler, auth_header, 1606 realm, http_handler, password_manager, 1607 request_url, protected_url): 1608 import base64 1609 user, password = "wile", "coyote" 1610 1611 # .add_password() fed through to password manager 1612 auth_handler.add_password(realm, request_url, user, password) 1613 self.assertEqual(realm, password_manager.realm) 1614 self.assertEqual(request_url, password_manager.url) 1615 self.assertEqual(user, password_manager.user) 1616 self.assertEqual(password, password_manager.password) 1617 1618 opener.open(request_url) 1619 1620 # should have asked the password manager for the username/password 1621 self.assertEqual(password_manager.target_realm, realm) 1622 self.assertEqual(password_manager.target_url, protected_url) 1623 1624 # expect one request without authorization, then one with 1625 self.assertEqual(len(http_handler.requests), 2) 1626 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1627 userpass = bytes('%s:%s' % (user, password), "ascii") 1628 auth_hdr_value = ('Basic ' + 1629 base64.encodebytes(userpass).strip().decode()) 1630 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1631 auth_hdr_value) 1632 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1633 auth_hdr_value) 1634 # if the password manager can't find a password, the handler won't 1635 # handle the HTTP auth error 1636 password_manager.user = password_manager.password = None 1637 http_handler.reset() 1638 opener.open(request_url) 1639 self.assertEqual(len(http_handler.requests), 1) 1640 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1641 1642 def test_basic_prior_auth_auto_send(self): 1643 # Assume already authenticated if is_authenticated=True 1644 # for APIs like Github that don't return 401 1645 1646 user, password = "wile", "coyote" 1647 request_url = "http://acme.example.com/protected" 1648 1649 http_handler = MockHTTPHandlerCheckAuth(200) 1650 1651 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1652 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1653 auth_prior_handler.add_password( 1654 None, request_url, user, password, is_authenticated=True) 1655 1656 is_auth = pwd_manager.is_authenticated(request_url) 1657 self.assertTrue(is_auth) 1658 1659 opener = OpenerDirector() 1660 opener.add_handler(auth_prior_handler) 1661 opener.add_handler(http_handler) 1662 1663 opener.open(request_url) 1664 1665 # expect request to be sent with auth header 1666 self.assertTrue(http_handler.has_auth_header) 1667 1668 def test_basic_prior_auth_send_after_first_success(self): 1669 # Auto send auth header after authentication is successful once 1670 1671 user, password = 'wile', 'coyote' 1672 request_url = 'http://acme.example.com/protected' 1673 realm = 'ACME' 1674 1675 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1676 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1677 auth_prior_handler.add_password(realm, request_url, user, password) 1678 1679 is_auth = pwd_manager.is_authenticated(request_url) 1680 self.assertFalse(is_auth) 1681 1682 opener = OpenerDirector() 1683 opener.add_handler(auth_prior_handler) 1684 1685 http_handler = MockHTTPHandler( 1686 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None) 1687 opener.add_handler(http_handler) 1688 1689 opener.open(request_url) 1690 1691 is_auth = pwd_manager.is_authenticated(request_url) 1692 self.assertTrue(is_auth) 1693 1694 http_handler = MockHTTPHandlerCheckAuth(200) 1695 self.assertFalse(http_handler.has_auth_header) 1696 1697 opener = OpenerDirector() 1698 opener.add_handler(auth_prior_handler) 1699 opener.add_handler(http_handler) 1700 1701 # After getting 200 from MockHTTPHandler 1702 # Next request sends header in the first request 1703 opener.open(request_url) 1704 1705 # expect request to be sent with auth header 1706 self.assertTrue(http_handler.has_auth_header) 1707 1708 def test_http_closed(self): 1709 """Test the connection is cleaned up when the response is closed""" 1710 for (transfer, data) in ( 1711 ("Connection: close", b"data"), 1712 ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"), 1713 ("Content-Length: 4", b"data"), 1714 ): 1715 header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer) 1716 conn = test_urllib.fakehttp(header.encode() + data) 1717 handler = urllib.request.AbstractHTTPHandler() 1718 req = Request("http://dummy/") 1719 req.timeout = None 1720 with handler.do_open(conn, req) as resp: 1721 resp.read() 1722 self.assertTrue(conn.fakesock.closed, 1723 "Connection not closed with {!r}".format(transfer)) 1724 1725 def test_invalid_closed(self): 1726 """Test the connection is cleaned up after an invalid response""" 1727 conn = test_urllib.fakehttp(b"") 1728 handler = urllib.request.AbstractHTTPHandler() 1729 req = Request("http://dummy/") 1730 req.timeout = None 1731 with self.assertRaises(http.client.BadStatusLine): 1732 handler.do_open(conn, req) 1733 self.assertTrue(conn.fakesock.closed, "Connection not closed") 1734 1735 1736class MiscTests(unittest.TestCase): 1737 1738 def opener_has_handler(self, opener, handler_class): 1739 self.assertTrue(any(h.__class__ == handler_class 1740 for h in opener.handlers)) 1741 1742 def test_build_opener(self): 1743 class MyHTTPHandler(urllib.request.HTTPHandler): 1744 pass 1745 1746 class FooHandler(urllib.request.BaseHandler): 1747 def foo_open(self): 1748 pass 1749 1750 class BarHandler(urllib.request.BaseHandler): 1751 def bar_open(self): 1752 pass 1753 1754 build_opener = urllib.request.build_opener 1755 1756 o = build_opener(FooHandler, BarHandler) 1757 self.opener_has_handler(o, FooHandler) 1758 self.opener_has_handler(o, BarHandler) 1759 1760 # can take a mix of classes and instances 1761 o = build_opener(FooHandler, BarHandler()) 1762 self.opener_has_handler(o, FooHandler) 1763 self.opener_has_handler(o, BarHandler) 1764 1765 # subclasses of default handlers override default handlers 1766 o = build_opener(MyHTTPHandler) 1767 self.opener_has_handler(o, MyHTTPHandler) 1768 1769 # a particular case of overriding: default handlers can be passed 1770 # in explicitly 1771 o = build_opener() 1772 self.opener_has_handler(o, urllib.request.HTTPHandler) 1773 o = build_opener(urllib.request.HTTPHandler) 1774 self.opener_has_handler(o, urllib.request.HTTPHandler) 1775 o = build_opener(urllib.request.HTTPHandler()) 1776 self.opener_has_handler(o, urllib.request.HTTPHandler) 1777 1778 # Issue2670: multiple handlers sharing the same base class 1779 class MyOtherHTTPHandler(urllib.request.HTTPHandler): 1780 pass 1781 1782 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1783 self.opener_has_handler(o, MyHTTPHandler) 1784 self.opener_has_handler(o, MyOtherHTTPHandler) 1785 1786 @unittest.skipUnless(support.is_resource_enabled('network'), 1787 'test requires network access') 1788 def test_issue16464(self): 1789 with support.transient_internet("http://www.example.com/"): 1790 opener = urllib.request.build_opener() 1791 request = urllib.request.Request("http://www.example.com/") 1792 self.assertEqual(None, request.data) 1793 1794 opener.open(request, "1".encode("us-ascii")) 1795 self.assertEqual(b"1", request.data) 1796 self.assertEqual("1", request.get_header("Content-length")) 1797 1798 opener.open(request, "1234567890".encode("us-ascii")) 1799 self.assertEqual(b"1234567890", request.data) 1800 self.assertEqual("10", request.get_header("Content-length")) 1801 1802 def test_HTTPError_interface(self): 1803 """ 1804 Issue 13211 reveals that HTTPError didn't implement the URLError 1805 interface even though HTTPError is a subclass of URLError. 1806 """ 1807 msg = 'something bad happened' 1808 url = code = fp = None 1809 hdrs = 'Content-Length: 42' 1810 err = urllib.error.HTTPError(url, code, msg, hdrs, fp) 1811 self.assertTrue(hasattr(err, 'reason')) 1812 self.assertEqual(err.reason, 'something bad happened') 1813 self.assertTrue(hasattr(err, 'headers')) 1814 self.assertEqual(err.headers, 'Content-Length: 42') 1815 expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) 1816 self.assertEqual(str(err), expected_errmsg) 1817 expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg) 1818 self.assertEqual(repr(err), expected_errmsg) 1819 1820 def test_parse_proxy(self): 1821 parse_proxy_test_cases = [ 1822 ('proxy.example.com', 1823 (None, None, None, 'proxy.example.com')), 1824 ('proxy.example.com:3128', 1825 (None, None, None, 'proxy.example.com:3128')), 1826 ('proxy.example.com', (None, None, None, 'proxy.example.com')), 1827 ('proxy.example.com:3128', 1828 (None, None, None, 'proxy.example.com:3128')), 1829 # The authority component may optionally include userinfo 1830 # (assumed to be # username:password): 1831 ('joe:password@proxy.example.com', 1832 (None, 'joe', 'password', 'proxy.example.com')), 1833 ('joe:password@proxy.example.com:3128', 1834 (None, 'joe', 'password', 'proxy.example.com:3128')), 1835 #Examples with URLS 1836 ('http://proxy.example.com/', 1837 ('http', None, None, 'proxy.example.com')), 1838 ('http://proxy.example.com:3128/', 1839 ('http', None, None, 'proxy.example.com:3128')), 1840 ('http://joe:password@proxy.example.com/', 1841 ('http', 'joe', 'password', 'proxy.example.com')), 1842 ('http://joe:password@proxy.example.com:3128', 1843 ('http', 'joe', 'password', 'proxy.example.com:3128')), 1844 # Everything after the authority is ignored 1845 ('ftp://joe:password@proxy.example.com/rubbish:3128', 1846 ('ftp', 'joe', 'password', 'proxy.example.com')), 1847 # Test for no trailing '/' case 1848 ('http://joe:password@proxy.example.com', 1849 ('http', 'joe', 'password', 'proxy.example.com')), 1850 # Testcases with '/' character in username, password 1851 ('http://user/name:password@localhost:22', 1852 ('http', 'user/name', 'password', 'localhost:22')), 1853 ('http://username:pass/word@localhost:22', 1854 ('http', 'username', 'pass/word', 'localhost:22')), 1855 ('http://user/name:pass/word@localhost:22', 1856 ('http', 'user/name', 'pass/word', 'localhost:22')), 1857 ] 1858 1859 1860 for tc, expected in parse_proxy_test_cases: 1861 self.assertEqual(_parse_proxy(tc), expected) 1862 1863 self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), 1864 1865 def test_unsupported_algorithm(self): 1866 handler = AbstractDigestAuthHandler() 1867 with self.assertRaises(ValueError) as exc: 1868 handler.get_algorithm_impls('invalid') 1869 self.assertEqual( 1870 str(exc.exception), 1871 "Unsupported digest authentication algorithm 'invalid'" 1872 ) 1873 1874 1875class RequestTests(unittest.TestCase): 1876 class PutRequest(Request): 1877 method = 'PUT' 1878 1879 def setUp(self): 1880 self.get = Request("http://www.python.org/~jeremy/") 1881 self.post = Request("http://www.python.org/~jeremy/", 1882 "data", 1883 headers={"X-Test": "test"}) 1884 self.head = Request("http://www.python.org/~jeremy/", method='HEAD') 1885 self.put = self.PutRequest("http://www.python.org/~jeremy/") 1886 self.force_post = self.PutRequest("http://www.python.org/~jeremy/", 1887 method="POST") 1888 1889 def test_method(self): 1890 self.assertEqual("POST", self.post.get_method()) 1891 self.assertEqual("GET", self.get.get_method()) 1892 self.assertEqual("HEAD", self.head.get_method()) 1893 self.assertEqual("PUT", self.put.get_method()) 1894 self.assertEqual("POST", self.force_post.get_method()) 1895 1896 def test_data(self): 1897 self.assertFalse(self.get.data) 1898 self.assertEqual("GET", self.get.get_method()) 1899 self.get.data = "spam" 1900 self.assertTrue(self.get.data) 1901 self.assertEqual("POST", self.get.get_method()) 1902 1903 # issue 16464 1904 # if we change data we need to remove content-length header 1905 # (cause it's most probably calculated for previous value) 1906 def test_setting_data_should_remove_content_length(self): 1907 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1908 self.get.add_unredirected_header("Content-length", 42) 1909 self.assertEqual(42, self.get.unredirected_hdrs["Content-length"]) 1910 self.get.data = "spam" 1911 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1912 1913 # issue 17485 same for deleting data. 1914 def test_deleting_data_should_remove_content_length(self): 1915 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1916 self.get.data = 'foo' 1917 self.get.add_unredirected_header("Content-length", 3) 1918 self.assertEqual(3, self.get.unredirected_hdrs["Content-length"]) 1919 del self.get.data 1920 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1921 1922 def test_get_full_url(self): 1923 self.assertEqual("http://www.python.org/~jeremy/", 1924 self.get.get_full_url()) 1925 1926 def test_selector(self): 1927 self.assertEqual("/~jeremy/", self.get.selector) 1928 req = Request("http://www.python.org/") 1929 self.assertEqual("/", req.selector) 1930 1931 def test_get_type(self): 1932 self.assertEqual("http", self.get.type) 1933 1934 def test_get_host(self): 1935 self.assertEqual("www.python.org", self.get.host) 1936 1937 def test_get_host_unquote(self): 1938 req = Request("http://www.%70ython.org/") 1939 self.assertEqual("www.python.org", req.host) 1940 1941 def test_proxy(self): 1942 self.assertFalse(self.get.has_proxy()) 1943 self.get.set_proxy("www.perl.org", "http") 1944 self.assertTrue(self.get.has_proxy()) 1945 self.assertEqual("www.python.org", self.get.origin_req_host) 1946 self.assertEqual("www.perl.org", self.get.host) 1947 1948 def test_wrapped_url(self): 1949 req = Request("<URL:http://www.python.org>") 1950 self.assertEqual("www.python.org", req.host) 1951 1952 def test_url_fragment(self): 1953 req = Request("http://www.python.org/?qs=query#fragment=true") 1954 self.assertEqual("/?qs=query", req.selector) 1955 req = Request("http://www.python.org/#fun=true") 1956 self.assertEqual("/", req.selector) 1957 1958 # Issue 11703: geturl() omits fragment in the original URL. 1959 url = 'http://docs.python.org/library/urllib2.html#OK' 1960 req = Request(url) 1961 self.assertEqual(req.get_full_url(), url) 1962 1963 def test_url_fullurl_get_full_url(self): 1964 urls = ['http://docs.python.org', 1965 'http://docs.python.org/library/urllib2.html#OK', 1966 'http://www.python.org/?qs=query#fragment=true'] 1967 for url in urls: 1968 req = Request(url) 1969 self.assertEqual(req.get_full_url(), req.full_url) 1970 1971 1972if __name__ == "__main__": 1973 unittest.main() 1974