1import unittest 2from test import test_support 3from test import test_urllib 4 5import os 6import socket 7import StringIO 8 9import urllib2 10from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler 11import httplib 12 13try: 14 import ssl 15except ImportError: 16 ssl = None 17 18from test.test_urllib import FakeHTTPMixin 19 20 21# XXX 22# Request 23# CacheFTPHandler (hard to write) 24# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 25 26class TrivialTests(unittest.TestCase): 27 def test_trivial(self): 28 # A couple trivial tests 29 30 self.assertRaises(ValueError, urllib2.urlopen, 'bogus url') 31 32 # XXX Name hacking to get this to work on Windows. 33 fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/') 34 35 # And more hacking to get it to work on MacOS. This assumes 36 # urllib.pathname2url works, unfortunately... 37 if os.name == 'riscos': 38 import string 39 fname = os.expand(fname) 40 fname = fname.translate(string.maketrans("/.", "./")) 41 42 if os.name == 'nt': 43 file_url = "file:///%s" % fname 44 else: 45 file_url = "file://%s" % fname 46 47 f = urllib2.urlopen(file_url) 48 49 buf = f.read() 50 f.close() 51 52 def test_parse_http_list(self): 53 tests = [('a,b,c', ['a', 'b', 'c']), 54 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 55 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 56 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 57 for string, list in tests: 58 self.assertEqual(urllib2.parse_http_list(string), list) 59 60 @unittest.skipUnless(ssl, "ssl module required") 61 def test_cafile_and_context(self): 62 context = ssl.create_default_context() 63 with self.assertRaises(ValueError): 64 urllib2.urlopen( 65 "https://localhost", cafile="/nonexistent/path", context=context 66 ) 67 68 69def test_request_headers_dict(): 70 """ 71 The Request.headers dictionary is not a documented interface. It should 72 stay that way, because the complete set of headers are only accessible 73 through the .get_header(), .has_header(), .header_items() interface. 74 However, .headers pre-dates those methods, and so real code will be using 75 the dictionary. 76 77 The introduction in 2.4 of those methods was a mistake for the same reason: 78 code that previously saw all (urllib2 user)-provided headers in .headers 79 now sees only a subset (and the function interface is ugly and incomplete). 80 A better change would have been to replace .headers dict with a dict 81 subclass (or UserDict.DictMixin instance?) that preserved the .headers 82 interface and also provided access to the "unredirected" headers. It's 83 probably too late to fix that, though. 84 85 86 Check .capitalize() case normalization: 87 88 >>> url = "http://example.com" 89 >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"] 90 'blah' 91 >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"] 92 'blah' 93 94 Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError, 95 but that could be changed in future. 96 97 """ 98 99def test_request_headers_methods(): 100 """ 101 Note the case normalization of header names here, to .capitalize()-case. 102 This should be preserved for backwards-compatibility. (In the HTTP case, 103 normalization to .title()-case is done by urllib2 before sending headers to 104 httplib). 105 106 >>> url = "http://example.com" 107 >>> r = Request(url, headers={"Spam-eggs": "blah"}) 108 >>> r.has_header("Spam-eggs") 109 True 110 >>> r.header_items() 111 [('Spam-eggs', 'blah')] 112 >>> r.add_header("Foo-Bar", "baz") 113 >>> items = r.header_items() 114 >>> items.sort() 115 >>> items 116 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')] 117 118 Note that e.g. r.has_header("spam-EggS") is currently False, and 119 r.get_header("spam-EggS") returns None, but that could be changed in 120 future. 121 122 >>> r.has_header("Not-there") 123 False 124 >>> print r.get_header("Not-there") 125 None 126 >>> r.get_header("Not-there", "default") 127 'default' 128 129 """ 130 131 132def test_password_manager(self): 133 """ 134 >>> mgr = urllib2.HTTPPasswordMgr() 135 >>> add = mgr.add_password 136 >>> add("Some Realm", "http://example.com/", "joe", "password") 137 >>> add("Some Realm", "http://example.com/ni", "ni", "ni") 138 >>> add("c", "http://example.com/foo", "foo", "ni") 139 >>> add("c", "http://example.com/bar", "bar", "nini") 140 >>> add("b", "http://example.com/", "first", "blah") 141 >>> add("b", "http://example.com/", "second", "spam") 142 >>> add("a", "http://example.com", "1", "a") 143 >>> add("Some Realm", "http://c.example.com:3128", "3", "c") 144 >>> add("Some Realm", "d.example.com", "4", "d") 145 >>> add("Some Realm", "e.example.com:3128", "5", "e") 146 147 >>> mgr.find_user_password("Some Realm", "example.com") 148 ('joe', 'password') 149 >>> mgr.find_user_password("Some Realm", "http://example.com") 150 ('joe', 'password') 151 >>> mgr.find_user_password("Some Realm", "http://example.com/") 152 ('joe', 'password') 153 >>> mgr.find_user_password("Some Realm", "http://example.com/spam") 154 ('joe', 'password') 155 >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") 156 ('joe', 'password') 157 >>> mgr.find_user_password("c", "http://example.com/foo") 158 ('foo', 'ni') 159 >>> mgr.find_user_password("c", "http://example.com/bar") 160 ('bar', 'nini') 161 162 Actually, this is really undefined ATM 163## Currently, we use the highest-level path where more than one match: 164 165## >>> mgr.find_user_password("Some Realm", "http://example.com/ni") 166## ('joe', 'password') 167 168 Use latest add_password() in case of conflict: 169 170 >>> mgr.find_user_password("b", "http://example.com/") 171 ('second', 'spam') 172 173 No special relationship between a.example.com and example.com: 174 175 >>> mgr.find_user_password("a", "http://example.com/") 176 ('1', 'a') 177 >>> mgr.find_user_password("a", "http://a.example.com/") 178 (None, None) 179 180 Ports: 181 182 >>> mgr.find_user_password("Some Realm", "c.example.com") 183 (None, None) 184 >>> mgr.find_user_password("Some Realm", "c.example.com:3128") 185 ('3', 'c') 186 >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") 187 ('3', 'c') 188 >>> mgr.find_user_password("Some Realm", "d.example.com") 189 ('4', 'd') 190 >>> mgr.find_user_password("Some Realm", "e.example.com:3128") 191 ('5', 'e') 192 193 """ 194 pass 195 196 197def test_password_manager_default_port(self): 198 """ 199 >>> mgr = urllib2.HTTPPasswordMgr() 200 >>> add = mgr.add_password 201 202 The point to note here is that we can't guess the default port if there's 203 no scheme. This applies to both add_password and find_user_password. 204 205 >>> add("f", "http://g.example.com:80", "10", "j") 206 >>> add("g", "http://h.example.com", "11", "k") 207 >>> add("h", "i.example.com:80", "12", "l") 208 >>> add("i", "j.example.com", "13", "m") 209 >>> mgr.find_user_password("f", "g.example.com:100") 210 (None, None) 211 >>> mgr.find_user_password("f", "g.example.com:80") 212 ('10', 'j') 213 >>> mgr.find_user_password("f", "g.example.com") 214 (None, None) 215 >>> mgr.find_user_password("f", "http://g.example.com:100") 216 (None, None) 217 >>> mgr.find_user_password("f", "http://g.example.com:80") 218 ('10', 'j') 219 >>> mgr.find_user_password("f", "http://g.example.com") 220 ('10', 'j') 221 >>> mgr.find_user_password("g", "h.example.com") 222 ('11', 'k') 223 >>> mgr.find_user_password("g", "h.example.com:80") 224 ('11', 'k') 225 >>> mgr.find_user_password("g", "http://h.example.com:80") 226 ('11', 'k') 227 >>> mgr.find_user_password("h", "i.example.com") 228 (None, None) 229 >>> mgr.find_user_password("h", "i.example.com:80") 230 ('12', 'l') 231 >>> mgr.find_user_password("h", "http://i.example.com:80") 232 ('12', 'l') 233 >>> mgr.find_user_password("i", "j.example.com") 234 ('13', 'm') 235 >>> mgr.find_user_password("i", "j.example.com:80") 236 (None, None) 237 >>> mgr.find_user_password("i", "http://j.example.com") 238 ('13', 'm') 239 >>> mgr.find_user_password("i", "http://j.example.com:80") 240 (None, None) 241 242 """ 243 244class MockOpener: 245 addheaders = [] 246 def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 247 self.req, self.data, self.timeout = req, data, timeout 248 def error(self, proto, *args): 249 self.proto, self.args = proto, args 250 251class MockFile: 252 def read(self, count=None): pass 253 def readline(self, count=None): pass 254 def close(self): pass 255 256class MockHeaders(dict): 257 def getheaders(self, name): 258 return self.values() 259 260class MockResponse(StringIO.StringIO): 261 def __init__(self, code, msg, headers, data, url=None): 262 StringIO.StringIO.__init__(self, data) 263 self.code, self.msg, self.headers, self.url = code, msg, headers, url 264 def info(self): 265 return self.headers 266 def geturl(self): 267 return self.url 268 269class MockCookieJar: 270 def add_cookie_header(self, request): 271 self.ach_req = request 272 def extract_cookies(self, response, request): 273 self.ec_req, self.ec_r = request, response 274 275class FakeMethod: 276 def __init__(self, meth_name, action, handle): 277 self.meth_name = meth_name 278 self.handle = handle 279 self.action = action 280 def __call__(self, *args): 281 return self.handle(self.meth_name, self.action, *args) 282 283class MockHTTPResponse: 284 def __init__(self, fp, msg, status, reason): 285 self.fp = fp 286 self.msg = msg 287 self.status = status 288 self.reason = reason 289 def read(self): 290 return '' 291 292class MockHTTPClass: 293 def __init__(self): 294 self.req_headers = [] 295 self.data = None 296 self.raise_on_endheaders = False 297 self._tunnel_headers = {} 298 299 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 300 self.host = host 301 self.timeout = timeout 302 return self 303 304 def set_debuglevel(self, level): 305 self.level = level 306 307 def set_tunnel(self, host, port=None, headers=None): 308 self._tunnel_host = host 309 self._tunnel_port = port 310 if headers: 311 self._tunnel_headers = headers 312 else: 313 self._tunnel_headers.clear() 314 315 def request(self, method, url, body=None, headers=None): 316 self.method = method 317 self.selector = url 318 if headers is not None: 319 self.req_headers += headers.items() 320 self.req_headers.sort() 321 if body: 322 self.data = body 323 if self.raise_on_endheaders: 324 import socket 325 raise socket.error() 326 327 def getresponse(self): 328 return MockHTTPResponse(MockFile(), {}, 200, "OK") 329 330 def close(self): 331 pass 332 333class MockHandler: 334 # useful for testing handler machinery 335 # see add_ordered_mock_handlers() docstring 336 handler_order = 500 337 def __init__(self, methods): 338 self._define_methods(methods) 339 def _define_methods(self, methods): 340 for spec in methods: 341 if len(spec) == 2: name, action = spec 342 else: name, action = spec, None 343 meth = FakeMethod(name, action, self.handle) 344 setattr(self.__class__, name, meth) 345 def handle(self, fn_name, action, *args, **kwds): 346 self.parent.calls.append((self, fn_name, args, kwds)) 347 if action is None: 348 return None 349 elif action == "return self": 350 return self 351 elif action == "return response": 352 res = MockResponse(200, "OK", {}, "") 353 return res 354 elif action == "return request": 355 return Request("http://blah/") 356 elif action.startswith("error"): 357 code = action[action.rfind(" ")+1:] 358 try: 359 code = int(code) 360 except ValueError: 361 pass 362 res = MockResponse(200, "OK", {}, "") 363 return self.parent.error("http", args[0], res, code, "", {}) 364 elif action == "raise": 365 raise urllib2.URLError("blah") 366 assert False 367 def close(self): pass 368 def add_parent(self, parent): 369 self.parent = parent 370 self.parent.calls = [] 371 def __lt__(self, other): 372 if not hasattr(other, "handler_order"): 373 # No handler_order, leave in original order. Yuck. 374 return True 375 return self.handler_order < other.handler_order 376 377def add_ordered_mock_handlers(opener, meth_spec): 378 """Create MockHandlers and add them to an OpenerDirector. 379 380 meth_spec: list of lists of tuples and strings defining methods to define 381 on handlers. eg: 382 383 [["http_error", "ftp_open"], ["http_open"]] 384 385 defines methods .http_error() and .ftp_open() on one handler, and 386 .http_open() on another. These methods just record their arguments and 387 return None. Using a tuple instead of a string causes the method to 388 perform some action (see MockHandler.handle()), eg: 389 390 [["http_error"], [("http_open", "return request")]] 391 392 defines .http_error() on one handler (which simply returns None), and 393 .http_open() on another handler, which returns a Request object. 394 395 """ 396 handlers = [] 397 count = 0 398 for meths in meth_spec: 399 class MockHandlerSubclass(MockHandler): pass 400 h = MockHandlerSubclass(meths) 401 h.handler_order += count 402 h.add_parent(opener) 403 count = count + 1 404 handlers.append(h) 405 opener.add_handler(h) 406 return handlers 407 408def build_test_opener(*handler_instances): 409 opener = OpenerDirector() 410 for h in handler_instances: 411 opener.add_handler(h) 412 return opener 413 414class MockHTTPHandler(urllib2.BaseHandler): 415 # useful for testing redirections and auth 416 # sends supplied headers and code as first response 417 # sends 200 OK as second response 418 def __init__(self, code, headers): 419 self.code = code 420 self.headers = headers 421 self.reset() 422 def reset(self): 423 self._count = 0 424 self.requests = [] 425 def http_open(self, req): 426 import mimetools, copy 427 from StringIO import StringIO 428 self.requests.append(copy.deepcopy(req)) 429 if self._count == 0: 430 self._count = self._count + 1 431 name = httplib.responses[self.code] 432 msg = mimetools.Message(StringIO(self.headers)) 433 return self.parent.error( 434 "http", req, MockFile(), self.code, name, msg) 435 else: 436 self.req = req 437 msg = mimetools.Message(StringIO("\r\n\r\n")) 438 return MockResponse(200, "OK", msg, "", req.get_full_url()) 439 440class MockHTTPSHandler(urllib2.AbstractHTTPHandler): 441 # Useful for testing the Proxy-Authorization request by verifying the 442 # properties of httpcon 443 444 def __init__(self): 445 urllib2.AbstractHTTPHandler.__init__(self) 446 self.httpconn = MockHTTPClass() 447 448 def https_open(self, req): 449 return self.do_open(self.httpconn, req) 450 451class MockPasswordManager: 452 def add_password(self, realm, uri, user, password): 453 self.realm = realm 454 self.url = uri 455 self.user = user 456 self.password = password 457 def find_user_password(self, realm, authuri): 458 self.target_realm = realm 459 self.target_url = authuri 460 return self.user, self.password 461 462 463class OpenerDirectorTests(unittest.TestCase): 464 465 def test_add_non_handler(self): 466 class NonHandler(object): 467 pass 468 self.assertRaises(TypeError, 469 OpenerDirector().add_handler, NonHandler()) 470 471 def test_badly_named_methods(self): 472 # test work-around for three methods that accidentally follow the 473 # naming conventions for handler methods 474 # (*_open() / *_request() / *_response()) 475 476 # These used to call the accidentally-named methods, causing a 477 # TypeError in real code; here, returning self from these mock 478 # methods would either cause no exception, or AttributeError. 479 480 from urllib2 import URLError 481 482 o = OpenerDirector() 483 meth_spec = [ 484 [("do_open", "return self"), ("proxy_open", "return self")], 485 [("redirect_request", "return self")], 486 ] 487 handlers = add_ordered_mock_handlers(o, meth_spec) 488 o.add_handler(urllib2.UnknownHandler()) 489 for scheme in "do", "proxy", "redirect": 490 self.assertRaises(URLError, o.open, scheme+"://example.com/") 491 492 def test_handled(self): 493 # handler returning non-None means no more handlers will be called 494 o = OpenerDirector() 495 meth_spec = [ 496 ["http_open", "ftp_open", "http_error_302"], 497 ["ftp_open"], 498 [("http_open", "return self")], 499 [("http_open", "return self")], 500 ] 501 handlers = add_ordered_mock_handlers(o, meth_spec) 502 503 req = Request("http://example.com/") 504 r = o.open(req) 505 # Second .http_open() gets called, third doesn't, since second returned 506 # non-None. Handlers without .http_open() never get any methods called 507 # on them. 508 # In fact, second mock handler defining .http_open() returns self 509 # (instead of response), which becomes the OpenerDirector's return 510 # value. 511 self.assertEqual(r, handlers[2]) 512 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 513 for expected, got in zip(calls, o.calls): 514 handler, name, args, kwds = got 515 self.assertEqual((handler, name), expected) 516 self.assertEqual(args, (req,)) 517 518 def test_handler_order(self): 519 o = OpenerDirector() 520 handlers = [] 521 for meths, handler_order in [ 522 ([("http_open", "return self")], 500), 523 (["http_open"], 0), 524 ]: 525 class MockHandlerSubclass(MockHandler): pass 526 h = MockHandlerSubclass(meths) 527 h.handler_order = handler_order 528 handlers.append(h) 529 o.add_handler(h) 530 531 r = o.open("http://example.com/") 532 # handlers called in reverse order, thanks to their sort order 533 self.assertEqual(o.calls[0][0], handlers[1]) 534 self.assertEqual(o.calls[1][0], handlers[0]) 535 536 def test_raise(self): 537 # raising URLError stops processing of request 538 o = OpenerDirector() 539 meth_spec = [ 540 [("http_open", "raise")], 541 [("http_open", "return self")], 542 ] 543 handlers = add_ordered_mock_handlers(o, meth_spec) 544 545 req = Request("http://example.com/") 546 self.assertRaises(urllib2.URLError, o.open, req) 547 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 548 549## def test_error(self): 550## # XXX this doesn't actually seem to be used in standard library, 551## # but should really be tested anyway... 552 553 def test_http_error(self): 554 # XXX http_error_default 555 # http errors are a special case 556 o = OpenerDirector() 557 meth_spec = [ 558 [("http_open", "error 302")], 559 [("http_error_400", "raise"), "http_open"], 560 [("http_error_302", "return response"), "http_error_303", 561 "http_error"], 562 [("http_error_302")], 563 ] 564 handlers = add_ordered_mock_handlers(o, meth_spec) 565 566 class Unknown: 567 def __eq__(self, other): return True 568 569 req = Request("http://example.com/") 570 r = o.open(req) 571 assert len(o.calls) == 2 572 calls = [(handlers[0], "http_open", (req,)), 573 (handlers[2], "http_error_302", 574 (req, Unknown(), 302, "", {}))] 575 for expected, got in zip(calls, o.calls): 576 handler, method_name, args = expected 577 self.assertEqual((handler, method_name), got[:2]) 578 self.assertEqual(args, got[2]) 579 580 def test_processors(self): 581 # *_request / *_response methods get called appropriately 582 o = OpenerDirector() 583 meth_spec = [ 584 [("http_request", "return request"), 585 ("http_response", "return response")], 586 [("http_request", "return request"), 587 ("http_response", "return response")], 588 ] 589 handlers = add_ordered_mock_handlers(o, meth_spec) 590 591 req = Request("http://example.com/") 592 r = o.open(req) 593 # processor methods are called on *all* handlers that define them, 594 # not just the first handler that handles the request 595 calls = [ 596 (handlers[0], "http_request"), (handlers[1], "http_request"), 597 (handlers[0], "http_response"), (handlers[1], "http_response")] 598 599 for i, (handler, name, args, kwds) in enumerate(o.calls): 600 if i < 2: 601 # *_request 602 self.assertEqual((handler, name), calls[i]) 603 self.assertEqual(len(args), 1) 604 self.assertIsInstance(args[0], Request) 605 else: 606 # *_response 607 self.assertEqual((handler, name), calls[i]) 608 self.assertEqual(len(args), 2) 609 self.assertIsInstance(args[0], Request) 610 # response from opener.open is None, because there's no 611 # handler that defines http_open to handle it 612 if args[1] is not None: 613 self.assertIsInstance(args[1], MockResponse) 614 615 616def sanepathname2url(path): 617 import urllib 618 urlpath = urllib.pathname2url(path) 619 if os.name == "nt" and urlpath.startswith("///"): 620 urlpath = urlpath[2:] 621 # XXX don't ask me about the mac... 622 return urlpath 623 624class HandlerTests(unittest.TestCase): 625 626 def test_ftp(self): 627 class MockFTPWrapper: 628 def __init__(self, data): self.data = data 629 def retrfile(self, filename, filetype): 630 self.filename, self.filetype = filename, filetype 631 return StringIO.StringIO(self.data), len(self.data) 632 def close(self): pass 633 634 class NullFTPHandler(urllib2.FTPHandler): 635 def __init__(self, data): self.data = data 636 def connect_ftp(self, user, passwd, host, port, dirs, 637 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 638 self.user, self.passwd = user, passwd 639 self.host, self.port = host, port 640 self.dirs = dirs 641 self.ftpwrapper = MockFTPWrapper(self.data) 642 return self.ftpwrapper 643 644 import ftplib 645 data = "rheum rhaponicum" 646 h = NullFTPHandler(data) 647 o = h.parent = MockOpener() 648 649 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 650 ("ftp://localhost/foo/bar/baz.html", 651 "localhost", ftplib.FTP_PORT, "", "", "I", 652 ["foo", "bar"], "baz.html", "text/html"), 653 ("ftp://parrot@localhost/foo/bar/baz.html", 654 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 655 ["foo", "bar"], "baz.html", "text/html"), 656 ("ftp://%25parrot@localhost/foo/bar/baz.html", 657 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 658 ["foo", "bar"], "baz.html", "text/html"), 659 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 660 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 661 ["foo", "bar"], "baz.html", "text/html"), 662 ("ftp://localhost:80/foo/bar/", 663 "localhost", 80, "", "", "D", 664 ["foo", "bar"], "", None), 665 ("ftp://localhost/baz.gif;type=a", 666 "localhost", ftplib.FTP_PORT, "", "", "A", 667 [], "baz.gif", None), # XXX really this should guess image/gif 668 ]: 669 req = Request(url) 670 req.timeout = None 671 r = h.ftp_open(req) 672 # ftp authentication not yet implemented by FTPHandler 673 self.assertEqual(h.user, user) 674 self.assertEqual(h.passwd, passwd) 675 self.assertEqual(h.host, socket.gethostbyname(host)) 676 self.assertEqual(h.port, port) 677 self.assertEqual(h.dirs, dirs) 678 self.assertEqual(h.ftpwrapper.filename, filename) 679 self.assertEqual(h.ftpwrapper.filetype, type_) 680 headers = r.info() 681 self.assertEqual(headers.get("Content-type"), mimetype) 682 self.assertEqual(int(headers["Content-length"]), len(data)) 683 684 def test_file(self): 685 import rfc822, socket 686 h = urllib2.FileHandler() 687 o = h.parent = MockOpener() 688 689 TESTFN = test_support.TESTFN 690 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 691 towrite = "hello, world\n" 692 urls = [ 693 "file://localhost%s" % urlpath, 694 "file://%s" % urlpath, 695 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 696 ] 697 try: 698 localaddr = socket.gethostbyname(socket.gethostname()) 699 except socket.gaierror: 700 localaddr = '' 701 if localaddr: 702 urls.append("file://%s%s" % (localaddr, urlpath)) 703 704 for url in urls: 705 f = open(TESTFN, "wb") 706 try: 707 try: 708 f.write(towrite) 709 finally: 710 f.close() 711 712 r = h.file_open(Request(url)) 713 try: 714 data = r.read() 715 headers = r.info() 716 respurl = r.geturl() 717 finally: 718 r.close() 719 stats = os.stat(TESTFN) 720 modified = rfc822.formatdate(stats.st_mtime) 721 finally: 722 os.remove(TESTFN) 723 self.assertEqual(data, towrite) 724 self.assertEqual(headers["Content-type"], "text/plain") 725 self.assertEqual(headers["Content-length"], "13") 726 self.assertEqual(headers["Last-modified"], modified) 727 self.assertEqual(respurl, url) 728 729 for url in [ 730 "file://localhost:80%s" % urlpath, 731 "file:///file_does_not_exist.txt", 732 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 733 os.getcwd(), TESTFN), 734 "file://somerandomhost.ontheinternet.com%s/%s" % 735 (os.getcwd(), TESTFN), 736 ]: 737 try: 738 f = open(TESTFN, "wb") 739 try: 740 f.write(towrite) 741 finally: 742 f.close() 743 744 self.assertRaises(urllib2.URLError, 745 h.file_open, Request(url)) 746 finally: 747 os.remove(TESTFN) 748 749 h = urllib2.FileHandler() 750 o = h.parent = MockOpener() 751 # XXXX why does // mean ftp (and /// mean not ftp!), and where 752 # is file: scheme specified? I think this is really a bug, and 753 # what was intended was to distinguish between URLs like: 754 # file:/blah.txt (a file) 755 # file://localhost/blah.txt (a file) 756 # file:///blah.txt (a file) 757 # file://ftp.example.com/blah.txt (an ftp URL) 758 for url, ftp in [ 759 ("file://ftp.example.com//foo.txt", True), 760 ("file://ftp.example.com///foo.txt", False), 761# XXXX bug: fails with OSError, should be URLError 762 ("file://ftp.example.com/foo.txt", False), 763 ("file://somehost//foo/something.txt", True), 764 ("file://localhost//foo/something.txt", False), 765 ]: 766 req = Request(url) 767 try: 768 h.file_open(req) 769 # XXXX remove OSError when bug fixed 770 except (urllib2.URLError, OSError): 771 self.assertTrue(not ftp) 772 else: 773 self.assertTrue(o.req is req) 774 self.assertEqual(req.type, "ftp") 775 self.assertEqual(req.type == "ftp", ftp) 776 777 def test_http(self): 778 779 h = urllib2.AbstractHTTPHandler() 780 o = h.parent = MockOpener() 781 782 url = "http://example.com/" 783 for method, data in [("GET", None), ("POST", "blah")]: 784 req = Request(url, data, {"Foo": "bar"}) 785 req.timeout = None 786 req.add_unredirected_header("Spam", "eggs") 787 http = MockHTTPClass() 788 r = h.do_open(http, req) 789 790 # result attributes 791 r.read; r.readline # wrapped MockFile methods 792 r.info; r.geturl # addinfourl methods 793 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 794 hdrs = r.info() 795 hdrs.get; hdrs.has_key # r.info() gives dict from .getreply() 796 self.assertEqual(r.geturl(), url) 797 798 self.assertEqual(http.host, "example.com") 799 self.assertEqual(http.level, 0) 800 self.assertEqual(http.method, method) 801 self.assertEqual(http.selector, "/") 802 self.assertEqual(http.req_headers, 803 [("Connection", "close"), 804 ("Foo", "bar"), ("Spam", "eggs")]) 805 self.assertEqual(http.data, data) 806 807 # check socket.error converted to URLError 808 http.raise_on_endheaders = True 809 self.assertRaises(urllib2.URLError, h.do_open, http, req) 810 811 # check adding of standard headers 812 o.addheaders = [("Spam", "eggs")] 813 for data in "", None: # POST, GET 814 req = Request("http://example.com/", data) 815 r = MockResponse(200, "OK", {}, "") 816 newreq = h.do_request_(req) 817 if data is None: # GET 818 self.assertNotIn("Content-length", req.unredirected_hdrs) 819 self.assertNotIn("Content-type", req.unredirected_hdrs) 820 else: # POST 821 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 822 self.assertEqual(req.unredirected_hdrs["Content-type"], 823 "application/x-www-form-urlencoded") 824 # XXX the details of Host could be better tested 825 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 826 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 827 828 # don't clobber existing headers 829 req.add_unredirected_header("Content-length", "foo") 830 req.add_unredirected_header("Content-type", "bar") 831 req.add_unredirected_header("Host", "baz") 832 req.add_unredirected_header("Spam", "foo") 833 newreq = h.do_request_(req) 834 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 835 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 836 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 837 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 838 839 def test_http_doubleslash(self): 840 # Checks that the presence of an unnecessary double slash in a url doesn't break anything 841 # Previously, a double slash directly after the host could cause incorrect parsing of the url 842 h = urllib2.AbstractHTTPHandler() 843 o = h.parent = MockOpener() 844 845 data = "" 846 ds_urls = [ 847 "http://example.com/foo/bar/baz.html", 848 "http://example.com//foo/bar/baz.html", 849 "http://example.com/foo//bar/baz.html", 850 "http://example.com/foo/bar//baz.html", 851 ] 852 853 for ds_url in ds_urls: 854 ds_req = Request(ds_url, data) 855 856 # Check whether host is determined correctly if there is no proxy 857 np_ds_req = h.do_request_(ds_req) 858 self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com") 859 860 # Check whether host is determined correctly if there is a proxy 861 ds_req.set_proxy("someproxy:3128",None) 862 p_ds_req = h.do_request_(ds_req) 863 self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com") 864 865 def test_fixpath_in_weirdurls(self): 866 # Issue4493: urllib2 to supply '/' when to urls where path does not 867 # start with'/' 868 869 h = urllib2.AbstractHTTPHandler() 870 o = h.parent = MockOpener() 871 872 weird_url = 'http://www.python.org?getspam' 873 req = Request(weird_url) 874 newreq = h.do_request_(req) 875 self.assertEqual(newreq.get_host(),'www.python.org') 876 self.assertEqual(newreq.get_selector(),'/?getspam') 877 878 url_without_path = 'http://www.python.org' 879 req = Request(url_without_path) 880 newreq = h.do_request_(req) 881 self.assertEqual(newreq.get_host(),'www.python.org') 882 self.assertEqual(newreq.get_selector(),'') 883 884 def test_errors(self): 885 h = urllib2.HTTPErrorProcessor() 886 o = h.parent = MockOpener() 887 888 url = "http://example.com/" 889 req = Request(url) 890 # all 2xx are passed through 891 r = MockResponse(200, "OK", {}, "", url) 892 newr = h.http_response(req, r) 893 self.assertTrue(r is newr) 894 self.assertTrue(not hasattr(o, "proto")) # o.error not called 895 r = MockResponse(202, "Accepted", {}, "", url) 896 newr = h.http_response(req, r) 897 self.assertTrue(r is newr) 898 self.assertTrue(not hasattr(o, "proto")) # o.error not called 899 r = MockResponse(206, "Partial content", {}, "", url) 900 newr = h.http_response(req, r) 901 self.assertTrue(r is newr) 902 self.assertTrue(not hasattr(o, "proto")) # o.error not called 903 # anything else calls o.error (and MockOpener returns None, here) 904 r = MockResponse(502, "Bad gateway", {}, "", url) 905 self.assertTrue(h.http_response(req, r) is None) 906 self.assertEqual(o.proto, "http") # o.error called 907 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 908 909 def test_cookies(self): 910 cj = MockCookieJar() 911 h = urllib2.HTTPCookieProcessor(cj) 912 o = h.parent = MockOpener() 913 914 req = Request("http://example.com/") 915 r = MockResponse(200, "OK", {}, "") 916 newreq = h.http_request(req) 917 self.assertTrue(cj.ach_req is req is newreq) 918 self.assertEqual(req.get_origin_req_host(), "example.com") 919 self.assertTrue(not req.is_unverifiable()) 920 newr = h.http_response(req, r) 921 self.assertTrue(cj.ec_req is req) 922 self.assertTrue(cj.ec_r is r is newr) 923 924 def test_redirect(self): 925 from_url = "http://example.com/a.html" 926 to_url = "http://example.com/b.html" 927 h = urllib2.HTTPRedirectHandler() 928 o = h.parent = MockOpener() 929 930 # ordinary redirect behaviour 931 for code in 301, 302, 303, 307: 932 for data in None, "blah\nblah\n": 933 method = getattr(h, "http_error_%s" % code) 934 req = Request(from_url, data) 935 req.add_header("Nonsense", "viking=withhold") 936 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 937 if data is not None: 938 req.add_header("Content-Length", str(len(data))) 939 req.add_unredirected_header("Spam", "spam") 940 try: 941 method(req, MockFile(), code, "Blah", 942 MockHeaders({"location": to_url})) 943 except urllib2.HTTPError: 944 # 307 in response to POST requires user OK 945 self.assertEqual(code, 307) 946 self.assertIsNotNone(data) 947 self.assertEqual(o.req.get_full_url(), to_url) 948 try: 949 self.assertEqual(o.req.get_method(), "GET") 950 except AttributeError: 951 self.assertTrue(not o.req.has_data()) 952 953 # now it's a GET, there should not be headers regarding content 954 # (possibly dragged from before being a POST) 955 headers = [x.lower() for x in o.req.headers] 956 self.assertNotIn("content-length", headers) 957 self.assertNotIn("content-type", headers) 958 959 self.assertEqual(o.req.headers["Nonsense"], 960 "viking=withhold") 961 self.assertNotIn("Spam", o.req.headers) 962 self.assertNotIn("Spam", o.req.unredirected_hdrs) 963 964 # loop detection 965 req = Request(from_url) 966 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 967 def redirect(h, req, url=to_url): 968 h.http_error_302(req, MockFile(), 302, "Blah", 969 MockHeaders({"location": url})) 970 # Note that the *original* request shares the same record of 971 # redirections with the sub-requests caused by the redirections. 972 973 # detect infinite loop redirect of a URL to itself 974 req = Request(from_url, origin_req_host="example.com") 975 count = 0 976 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 977 try: 978 while 1: 979 redirect(h, req, "http://example.com/") 980 count = count + 1 981 except urllib2.HTTPError: 982 # don't stop until max_repeats, because cookies may introduce state 983 self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats) 984 985 # detect endless non-repeating chain of redirects 986 req = Request(from_url, origin_req_host="example.com") 987 count = 0 988 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 989 try: 990 while 1: 991 redirect(h, req, "http://example.com/%d" % count) 992 count = count + 1 993 except urllib2.HTTPError: 994 self.assertEqual(count, 995 urllib2.HTTPRedirectHandler.max_redirections) 996 997 def test_invalid_redirect(self): 998 from_url = "http://example.com/a.html" 999 valid_schemes = ['http', 'https', 'ftp'] 1000 invalid_schemes = ['file', 'imap', 'ldap'] 1001 schemeless_url = "example.com/b.html" 1002 h = urllib2.HTTPRedirectHandler() 1003 o = h.parent = MockOpener() 1004 req = Request(from_url) 1005 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1006 1007 for scheme in invalid_schemes: 1008 invalid_url = scheme + '://' + schemeless_url 1009 self.assertRaises(urllib2.HTTPError, h.http_error_302, 1010 req, MockFile(), 302, "Security Loophole", 1011 MockHeaders({"location": invalid_url})) 1012 1013 for scheme in valid_schemes: 1014 valid_url = scheme + '://' + schemeless_url 1015 h.http_error_302(req, MockFile(), 302, "That's fine", 1016 MockHeaders({"location": valid_url})) 1017 self.assertEqual(o.req.get_full_url(), valid_url) 1018 1019 def test_cookie_redirect(self): 1020 # cookies shouldn't leak into redirected requests 1021 from cookielib import CookieJar 1022 1023 from test.test_cookielib import interact_netscape 1024 1025 cj = CookieJar() 1026 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1027 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1028 hdeh = urllib2.HTTPDefaultErrorHandler() 1029 hrh = urllib2.HTTPRedirectHandler() 1030 cp = urllib2.HTTPCookieProcessor(cj) 1031 o = build_test_opener(hh, hdeh, hrh, cp) 1032 o.open("http://www.example.com/") 1033 self.assertTrue(not hh.req.has_header("Cookie")) 1034 1035 def test_redirect_fragment(self): 1036 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1037 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1038 hdeh = urllib2.HTTPDefaultErrorHandler() 1039 hrh = urllib2.HTTPRedirectHandler() 1040 o = build_test_opener(hh, hdeh, hrh) 1041 fp = o.open('http://www.example.com') 1042 self.assertEqual(fp.geturl(), redirected_url.strip()) 1043 1044 def test_redirect_no_path(self): 1045 # Issue 14132: Relative redirect strips original path 1046 real_class = httplib.HTTPConnection 1047 response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" 1048 httplib.HTTPConnection = test_urllib.fakehttp(response1) 1049 self.addCleanup(setattr, httplib, "HTTPConnection", real_class) 1050 urls = iter(("/path", "/path?query")) 1051 def request(conn, method, url, *pos, **kw): 1052 self.assertEqual(url, next(urls)) 1053 real_class.request(conn, method, url, *pos, **kw) 1054 # Change response for subsequent connection 1055 conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" 1056 httplib.HTTPConnection.request = request 1057 fp = urllib2.urlopen("http://python.org/path") 1058 self.assertEqual(fp.geturl(), "http://python.org/path?query") 1059 1060 def test_proxy(self): 1061 o = OpenerDirector() 1062 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1063 o.add_handler(ph) 1064 meth_spec = [ 1065 [("http_open", "return response")] 1066 ] 1067 handlers = add_ordered_mock_handlers(o, meth_spec) 1068 1069 req = Request("http://acme.example.com/") 1070 self.assertEqual(req.get_host(), "acme.example.com") 1071 r = o.open(req) 1072 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1073 1074 self.assertEqual([(handlers[0], "http_open")], 1075 [tup[0:2] for tup in o.calls]) 1076 1077 def test_proxy_no_proxy(self): 1078 os.environ['no_proxy'] = 'python.org' 1079 o = OpenerDirector() 1080 ph = urllib2.ProxyHandler(dict(http="proxy.example.com")) 1081 o.add_handler(ph) 1082 req = Request("http://www.perl.org/") 1083 self.assertEqual(req.get_host(), "www.perl.org") 1084 r = o.open(req) 1085 self.assertEqual(req.get_host(), "proxy.example.com") 1086 req = Request("http://www.python.org") 1087 self.assertEqual(req.get_host(), "www.python.org") 1088 r = o.open(req) 1089 self.assertEqual(req.get_host(), "www.python.org") 1090 del os.environ['no_proxy'] 1091 1092 1093 def test_proxy_https(self): 1094 o = OpenerDirector() 1095 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1096 o.add_handler(ph) 1097 meth_spec = [ 1098 [("https_open","return response")] 1099 ] 1100 handlers = add_ordered_mock_handlers(o, meth_spec) 1101 req = Request("https://www.example.com/") 1102 self.assertEqual(req.get_host(), "www.example.com") 1103 r = o.open(req) 1104 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1105 self.assertEqual([(handlers[0], "https_open")], 1106 [tup[0:2] for tup in o.calls]) 1107 1108 def test_proxy_https_proxy_authorization(self): 1109 o = OpenerDirector() 1110 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1111 o.add_handler(ph) 1112 https_handler = MockHTTPSHandler() 1113 o.add_handler(https_handler) 1114 req = Request("https://www.example.com/") 1115 req.add_header("Proxy-Authorization","FooBar") 1116 req.add_header("User-Agent","Grail") 1117 self.assertEqual(req.get_host(), "www.example.com") 1118 self.assertIsNone(req._tunnel_host) 1119 r = o.open(req) 1120 # Verify Proxy-Authorization gets tunneled to request. 1121 # httpsconn req_headers do not have the Proxy-Authorization header but 1122 # the req will have. 1123 self.assertNotIn(("Proxy-Authorization","FooBar"), 1124 https_handler.httpconn.req_headers) 1125 self.assertIn(("User-Agent","Grail"), 1126 https_handler.httpconn.req_headers) 1127 self.assertIsNotNone(req._tunnel_host) 1128 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1129 self.assertEqual(req.get_header("Proxy-authorization"),"FooBar") 1130 1131 def test_basic_auth(self, quote_char='"'): 1132 opener = OpenerDirector() 1133 password_manager = MockPasswordManager() 1134 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1135 realm = "ACME Widget Store" 1136 http_handler = MockHTTPHandler( 1137 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % 1138 (quote_char, realm, quote_char) ) 1139 opener.add_handler(auth_handler) 1140 opener.add_handler(http_handler) 1141 self._test_basic_auth(opener, auth_handler, "Authorization", 1142 realm, http_handler, password_manager, 1143 "http://acme.example.com/protected", 1144 "http://acme.example.com/protected" 1145 ) 1146 1147 def test_basic_auth_with_single_quoted_realm(self): 1148 self.test_basic_auth(quote_char="'") 1149 1150 def test_basic_auth_with_unquoted_realm(self): 1151 opener = OpenerDirector() 1152 password_manager = MockPasswordManager() 1153 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1154 realm = "ACME Widget Store" 1155 http_handler = MockHTTPHandler( 1156 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm) 1157 opener.add_handler(auth_handler) 1158 opener.add_handler(http_handler) 1159 msg = "Basic Auth Realm was unquoted" 1160 with test_support.check_warnings((msg, UserWarning)): 1161 self._test_basic_auth(opener, auth_handler, "Authorization", 1162 realm, http_handler, password_manager, 1163 "http://acme.example.com/protected", 1164 "http://acme.example.com/protected" 1165 ) 1166 1167 1168 def test_proxy_basic_auth(self): 1169 opener = OpenerDirector() 1170 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1171 opener.add_handler(ph) 1172 password_manager = MockPasswordManager() 1173 auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) 1174 realm = "ACME Networks" 1175 http_handler = MockHTTPHandler( 1176 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1177 opener.add_handler(auth_handler) 1178 opener.add_handler(http_handler) 1179 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1180 realm, http_handler, password_manager, 1181 "http://acme.example.com:3128/protected", 1182 "proxy.example.com:3128", 1183 ) 1184 1185 def test_basic_and_digest_auth_handlers(self): 1186 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1187 # response (http://python.org/sf/1479302), where it should instead 1188 # return None to allow another handler (especially 1189 # HTTPBasicAuthHandler) to handle the response. 1190 1191 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1192 # try digest first (since it's the strongest auth scheme), so we record 1193 # order of calls here to check digest comes first: 1194 class RecordingOpenerDirector(OpenerDirector): 1195 def __init__(self): 1196 OpenerDirector.__init__(self) 1197 self.recorded = [] 1198 def record(self, info): 1199 self.recorded.append(info) 1200 class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler): 1201 def http_error_401(self, *args, **kwds): 1202 self.parent.record("digest") 1203 urllib2.HTTPDigestAuthHandler.http_error_401(self, 1204 *args, **kwds) 1205 class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler): 1206 def http_error_401(self, *args, **kwds): 1207 self.parent.record("basic") 1208 urllib2.HTTPBasicAuthHandler.http_error_401(self, 1209 *args, **kwds) 1210 1211 opener = RecordingOpenerDirector() 1212 password_manager = MockPasswordManager() 1213 digest_handler = TestDigestAuthHandler(password_manager) 1214 basic_handler = TestBasicAuthHandler(password_manager) 1215 realm = "ACME Networks" 1216 http_handler = MockHTTPHandler( 1217 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1218 opener.add_handler(basic_handler) 1219 opener.add_handler(digest_handler) 1220 opener.add_handler(http_handler) 1221 1222 # check basic auth isn't blocked by digest handler failing 1223 self._test_basic_auth(opener, basic_handler, "Authorization", 1224 realm, http_handler, password_manager, 1225 "http://acme.example.com/protected", 1226 "http://acme.example.com/protected", 1227 ) 1228 # check digest was tried before basic (twice, because 1229 # _test_basic_auth called .open() twice) 1230 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1231 1232 def _test_basic_auth(self, opener, auth_handler, auth_header, 1233 realm, http_handler, password_manager, 1234 request_url, protected_url): 1235 import base64 1236 user, password = "wile", "coyote" 1237 1238 # .add_password() fed through to password manager 1239 auth_handler.add_password(realm, request_url, user, password) 1240 self.assertEqual(realm, password_manager.realm) 1241 self.assertEqual(request_url, password_manager.url) 1242 self.assertEqual(user, password_manager.user) 1243 self.assertEqual(password, password_manager.password) 1244 1245 r = opener.open(request_url) 1246 1247 # should have asked the password manager for the username/password 1248 self.assertEqual(password_manager.target_realm, realm) 1249 self.assertEqual(password_manager.target_url, protected_url) 1250 1251 # expect one request without authorization, then one with 1252 self.assertEqual(len(http_handler.requests), 2) 1253 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1254 userpass = '%s:%s' % (user, password) 1255 auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() 1256 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1257 auth_hdr_value) 1258 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1259 auth_hdr_value) 1260 # if the password manager can't find a password, the handler won't 1261 # handle the HTTP auth error 1262 password_manager.user = password_manager.password = None 1263 http_handler.reset() 1264 r = opener.open(request_url) 1265 self.assertEqual(len(http_handler.requests), 1) 1266 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1267 1268class MiscTests(unittest.TestCase, FakeHTTPMixin): 1269 1270 def test_build_opener(self): 1271 class MyHTTPHandler(urllib2.HTTPHandler): pass 1272 class FooHandler(urllib2.BaseHandler): 1273 def foo_open(self): pass 1274 class BarHandler(urllib2.BaseHandler): 1275 def bar_open(self): pass 1276 1277 build_opener = urllib2.build_opener 1278 1279 o = build_opener(FooHandler, BarHandler) 1280 self.opener_has_handler(o, FooHandler) 1281 self.opener_has_handler(o, BarHandler) 1282 1283 # can take a mix of classes and instances 1284 o = build_opener(FooHandler, BarHandler()) 1285 self.opener_has_handler(o, FooHandler) 1286 self.opener_has_handler(o, BarHandler) 1287 1288 # subclasses of default handlers override default handlers 1289 o = build_opener(MyHTTPHandler) 1290 self.opener_has_handler(o, MyHTTPHandler) 1291 1292 # a particular case of overriding: default handlers can be passed 1293 # in explicitly 1294 o = build_opener() 1295 self.opener_has_handler(o, urllib2.HTTPHandler) 1296 o = build_opener(urllib2.HTTPHandler) 1297 self.opener_has_handler(o, urllib2.HTTPHandler) 1298 o = build_opener(urllib2.HTTPHandler()) 1299 self.opener_has_handler(o, urllib2.HTTPHandler) 1300 1301 # Issue2670: multiple handlers sharing the same base class 1302 class MyOtherHTTPHandler(urllib2.HTTPHandler): pass 1303 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1304 self.opener_has_handler(o, MyHTTPHandler) 1305 self.opener_has_handler(o, MyOtherHTTPHandler) 1306 1307 def opener_has_handler(self, opener, handler_class): 1308 for h in opener.handlers: 1309 if h.__class__ == handler_class: 1310 break 1311 else: 1312 self.assertTrue(False) 1313 1314 def test_unsupported_algorithm(self): 1315 handler = AbstractDigestAuthHandler() 1316 with self.assertRaises(ValueError) as exc: 1317 handler.get_algorithm_impls('invalid') 1318 self.assertEqual( 1319 str(exc.exception), 1320 "Unsupported digest authentication algorithm 'invalid'" 1321 ) 1322 1323 @unittest.skipUnless(ssl, "ssl module required") 1324 def test_url_path_with_control_char_rejected(self): 1325 for char_no in range(0, 0x21) + range(0x7f, 0x100): 1326 char = chr(char_no) 1327 schemeless_url = "//localhost:7777/test%s/" % char 1328 self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") 1329 try: 1330 # We explicitly test urllib.request.urlopen() instead of the top 1331 # level 'def urlopen()' function defined in this... (quite ugly) 1332 # test suite. They use different url opening codepaths. Plain 1333 # urlopen uses FancyURLOpener which goes via a codepath that 1334 # calls urllib.parse.quote() on the URL which makes all of the 1335 # above attempts at injection within the url _path_ safe. 1336 escaped_char_repr = repr(char).replace('\\', r'\\') 1337 InvalidURL = httplib.InvalidURL 1338 with self.assertRaisesRegexp( 1339 InvalidURL, "contain control.*" + escaped_char_repr): 1340 urllib2.urlopen("http:" + schemeless_url) 1341 with self.assertRaisesRegexp( 1342 InvalidURL, "contain control.*" + escaped_char_repr): 1343 urllib2.urlopen("https:" + schemeless_url) 1344 finally: 1345 self.unfakehttp() 1346 1347 @unittest.skipUnless(ssl, "ssl module required") 1348 def test_url_path_with_newline_header_injection_rejected(self): 1349 self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") 1350 host = "localhost:7777?a=1 HTTP/1.1\r\nX-injected: header\r\nTEST: 123" 1351 schemeless_url = "//" + host + ":8080/test/?test=a" 1352 try: 1353 # We explicitly test urllib2.urlopen() instead of the top 1354 # level 'def urlopen()' function defined in this... (quite ugly) 1355 # test suite. They use different url opening codepaths. Plain 1356 # urlopen uses FancyURLOpener which goes via a codepath that 1357 # calls urllib.parse.quote() on the URL which makes all of the 1358 # above attempts at injection within the url _path_ safe. 1359 InvalidURL = httplib.InvalidURL 1360 with self.assertRaisesRegexp(InvalidURL, 1361 r"contain control.*\\r.*(found at least . .)"): 1362 urllib2.urlopen("http:{}".format(schemeless_url)) 1363 with self.assertRaisesRegexp(InvalidURL, 1364 r"contain control.*\\n"): 1365 urllib2.urlopen("https:{}".format(schemeless_url)) 1366 finally: 1367 self.unfakehttp() 1368 1369 @unittest.skipUnless(ssl, "ssl module required") 1370 def test_url_host_with_control_char_rejected(self): 1371 for char_no in list(range(0, 0x21)) + [0x7f]: 1372 char = chr(char_no) 1373 schemeless_url = "//localhost{}/test/".format(char) 1374 self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") 1375 try: 1376 escaped_char_repr = repr(char).replace('\\', r'\\') 1377 InvalidURL = httplib.InvalidURL 1378 with self.assertRaisesRegexp(InvalidURL, 1379 "contain control.*{}".format(escaped_char_repr)): 1380 urllib2.urlopen("http:{}".format(schemeless_url)) 1381 with self.assertRaisesRegexp(InvalidURL, 1382 "contain control.*{}".format(escaped_char_repr)): 1383 urllib2.urlopen("https:{}".format(schemeless_url)) 1384 finally: 1385 self.unfakehttp() 1386 1387 1388class RequestTests(unittest.TestCase): 1389 1390 def setUp(self): 1391 self.get = urllib2.Request("http://www.python.org/~jeremy/") 1392 self.post = urllib2.Request("http://www.python.org/~jeremy/", 1393 "data", 1394 headers={"X-Test": "test"}) 1395 1396 def test_method(self): 1397 self.assertEqual("POST", self.post.get_method()) 1398 self.assertEqual("GET", self.get.get_method()) 1399 1400 def test_add_data(self): 1401 self.assertTrue(not self.get.has_data()) 1402 self.assertEqual("GET", self.get.get_method()) 1403 self.get.add_data("spam") 1404 self.assertTrue(self.get.has_data()) 1405 self.assertEqual("POST", self.get.get_method()) 1406 1407 def test_get_full_url(self): 1408 self.assertEqual("http://www.python.org/~jeremy/", 1409 self.get.get_full_url()) 1410 1411 def test_selector(self): 1412 self.assertEqual("/~jeremy/", self.get.get_selector()) 1413 req = urllib2.Request("http://www.python.org/") 1414 self.assertEqual("/", req.get_selector()) 1415 1416 def test_get_type(self): 1417 self.assertEqual("http", self.get.get_type()) 1418 1419 def test_get_host(self): 1420 self.assertEqual("www.python.org", self.get.get_host()) 1421 1422 def test_get_host_unquote(self): 1423 req = urllib2.Request("http://www.%70ython.org/") 1424 self.assertEqual("www.python.org", req.get_host()) 1425 1426 def test_proxy(self): 1427 self.assertTrue(not self.get.has_proxy()) 1428 self.get.set_proxy("www.perl.org", "http") 1429 self.assertTrue(self.get.has_proxy()) 1430 self.assertEqual("www.python.org", self.get.get_origin_req_host()) 1431 self.assertEqual("www.perl.org", self.get.get_host()) 1432 1433 def test_wrapped_url(self): 1434 req = Request("<URL:http://www.python.org>") 1435 self.assertEqual("www.python.org", req.get_host()) 1436 1437 def test_url_fragment(self): 1438 req = Request("http://www.python.org/?qs=query#fragment=true") 1439 self.assertEqual("/?qs=query", req.get_selector()) 1440 req = Request("http://www.python.org/#fun=true") 1441 self.assertEqual("/", req.get_selector()) 1442 1443 # Issue 11703: geturl() omits fragment in the original URL. 1444 url = 'http://docs.python.org/library/urllib2.html#OK' 1445 req = Request(url) 1446 self.assertEqual(req.get_full_url(), url) 1447 1448 def test_private_attributes(self): 1449 self.assertFalse(hasattr(self.get, '_Request__r_xxx')) 1450 # Issue #6500: infinite recursion 1451 self.assertFalse(hasattr(self.get, '_Request__r_method')) 1452 1453 def test_HTTPError_interface(self): 1454 """ 1455 Issue 13211 reveals that HTTPError didn't implement the URLError 1456 interface even though HTTPError is a subclass of URLError. 1457 1458 >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None) 1459 >>> assert hasattr(err, 'reason') 1460 >>> err.reason 1461 'something bad happened' 1462 """ 1463 1464 def test_HTTPError_interface_call(self): 1465 """ 1466 Issue 15701= - HTTPError interface has info method available from URLError. 1467 """ 1468 err = urllib2.HTTPError(msg='something bad happened', url=None, 1469 code=None, hdrs='Content-Length:42', fp=None) 1470 self.assertTrue(hasattr(err, 'reason')) 1471 assert hasattr(err, 'reason') 1472 assert hasattr(err, 'info') 1473 assert callable(err.info) 1474 try: 1475 err.info() 1476 except AttributeError: 1477 self.fail("err.info() failed") 1478 self.assertEqual(err.info(), "Content-Length:42") 1479 1480def test_main(verbose=None): 1481 from test import test_urllib2 1482 test_support.run_doctest(test_urllib2, verbose) 1483 test_support.run_doctest(urllib2, verbose) 1484 tests = (TrivialTests, 1485 OpenerDirectorTests, 1486 HandlerTests, 1487 MiscTests, 1488 RequestTests) 1489 test_support.run_unittest(*tests) 1490 1491if __name__ == "__main__": 1492 test_main(verbose=True) 1493