# vim:fileencoding=utf-8
"""Tests for urllib2-level functionality.

This is urllib2's tests (most of which came from mechanize originally), plus
some extra tests added, and modifications from bug fixes and feature additions
to mechanize.
"""

# TODO:
# Request
# CacheFTPHandler (hard to write)
# parse_keqv_list, parse_http_list

import os
import sys
import unittest
from io import BytesIO

import mechanize

from mechanize._response import test_response
from mechanize import HTTPRedirectHandler, \
    HTTPEquivProcessor, HTTPRefreshProcessor, \
    HTTPCookieProcessor, HTTPRefererProcessor, \
    HTTPErrorProcessor, HTTPHandler
from mechanize import OpenerDirector, build_opener, Request
from mechanize._urllib2_fork import AbstractHTTPHandler, normalize_url, AbstractBasicAuthHandler
from mechanize._util import write_file

import mechanize._response
import mechanize._sockettimeout as _sockettimeout
import mechanize._testcase
import mechanize._urllib2_fork
from mechanize._mechanize import sanepathname2url
from mechanize.polyglot import create_response_info, iteritems

# from logging import getLogger, DEBUG
# l = getLogger("mechanize")
# l.setLevel(DEBUG)


class TrivialTests(mechanize._testcase.TestCase):
    """Smoke tests for urlopen() and the low-level parsing helpers."""

    def test_trivial(self):
        # A couple trivial tests

        self.assertRaises(ValueError, mechanize.urlopen, 'bogus url')

        fname = os.path.join(self.make_temp_dir(), "test.txt")
        data = b'data'
        write_file(fname, data)
        if os.sep == '\\':
            # Windows: prefix the drive-letter path so the file: URL below
            # keeps an empty authority component ("file:///C:/...").
            fname = '/' + fname
        file_url = "file://" + fname
        try:
            f = mechanize.urlopen(file_url)
        except Exception as e:
            raise ValueError('Failed to open URL: {} for fname: {} with error: {}'.format(file_url, fname, e))
        self.assertEqual(f.read(), data)
        f.close()

    def test_parse_http_list(self):
        # (input, expected-parse) pairs, covering quoted commas and
        # backslash escapes inside quoted strings.
        tests = [('a,b,c', ['a', 'b', 'c']),
                 ('path"o,l"og"i"cal, example',
                  ['path"o,l"og"i"cal', 'example']),
                 ('a, b, "c", "d", "e,f", g, h',
                  ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
                 ('a="b\\"c", d="e\\,f", g="h\\\\i"',
                  ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
        for string, list in tests:
            self.assertEqual(
                mechanize._urllib2_fork.parse_http_list(string), list)

    def test_parse_authreq(self):
        # Pathological challenge strings must not match the auth-challenge
        # regex (regression guard against catastrophic matching).
        for bad in (",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,",):
            self.assertIsNone(AbstractBasicAuthHandler.rx.search(bad))


def test_request_headers_dict():
    """
    The Request.headers dictionary is not a documented interface. It should
    stay that way, because the complete set of headers are only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?) that preserved the .headers
    interface and also provided access to the "unredirected" headers. It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """


def test_request_headers_methods():
    """
    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility. (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print(r.get_header("Not-there"))
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """


# NOTE(review): module-level doctest carrier; the unused `self` parameter is
# historical (this was presumably once a method) and is never supplied.
def test_password_manager(self):
    """
    >>> mgr = mechanize.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
## Currently, we use the highest-level path where more than one match:

## >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
## ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass


# NOTE(review): module-level doctest carrier; `self` is unused (see above).
def test_password_manager_default_port(self):
    """
    >>> mgr = mechanize.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme. This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """


class MockOpener:
    """Records open()/error() arguments instead of performing any I/O."""
    addheaders = []
    finalize_request_headers = None

    def open(self,
             req,
             data=None,
             timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
        self.req, self.data, self.timeout = req, data, timeout

    def error(self, proto, *args):
        self.proto, self.args = proto, args


class MockFile:
    """File-like stub whose read/readline/close all do nothing."""

    def read(self, count=None):
        pass

    def readline(self, count=None):
        pass

    def close(self):
        pass

    def __iter__(self):
        # Empty iterator.
        for i in ():
            yield i


def http_message(mapping):
    """Build a response-info message object from a header mapping.

    >>> http_message({"Content-Type": "text/html"}).items()
    [('content-type', 'text/html')]

    """
    f = []
    for kv in iteritems(mapping):
        f.append("%s: %s" % kv)
    f.append("")
    msg = "\r\n".join(f)
    if not isinstance(msg, bytes):
        msg = msg.encode('iso-8859-1')
    msg = create_response_info(BytesIO(msg))
    return msg


class MockResponse(BytesIO):
    """In-memory response object exposing code/msg/headers/url."""

    def __init__(self, code, msg, headers, data, url=None):
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        BytesIO.__init__(self, data)
        self.code, self.msg, self.headers, self.url = code, msg, headers, url

    def info(self):
        return self.headers

    def geturl(self):
        return self.url


class MockCookieJar:
    """Cookie-jar stub that records the arguments it was called with."""

    def add_cookie_header(self, request, unverifiable=False):
        self.ach_req, self.ach_u = request, unverifiable

    def extract_cookies(self, response, request, unverifiable=False):
        self.ec_req, self.ec_r, self.ec_u = request, response, unverifiable


class FakeMethod:
    """Callable that forwards (meth_name, action, *args) to a handler."""

    def __init__(self, meth_name, action, handle):
        self.meth_name = meth_name
        self.handle = handle
        self.action = action

    def __call__(self, *args):
        return self.handle(self.meth_name, self.action, *args)


class MockHandler:
    # useful for testing handler machinery
    # see add_ordered_mock_handlers() docstring
    handler_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        # Each spec is either a method name (action None) or a
        # (name, action) pair; the method is installed on the *class*.
        for spec in methods:
            if len(spec) == 2:
                name, action = spec
            else:
                name, action = spec, None
            meth = FakeMethod(name, action, self.handle)
            setattr(self.__class__, name, meth)

    def handle(self, fn_name, action, *args, **kwds):
        # Record the call, then interpret the action mini-language:
        # None, "return self", "return response", "return request",
        # "error NNN", or "raise".
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            code = action[action.rfind(" ") + 1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise mechanize.URLError("blah")
        assert False

    def close(self):
        pass

    def add_parent(self, parent):
        self.parent = parent
        self.parent.calls = []

    def __lt__(self, other):
        if not hasattr(other, "handler_order"):
            # Try to preserve the old behavior of having custom classes
            # inserted after default ones (works only for custom user
            # classes which are not aware of handler_order).
            return True
        return self.handler_order < other.handler_order


def add_ordered_mock_handlers(opener, meth_spec):
    """Create MockHandlers and add them to an OpenerDirector.

    meth_spec: list of lists of tuples and strings defining methods to define
    on handlers. eg:

    [["http_error", "ftp_open"], ["http_open"]]

    defines methods .http_error() and .ftp_open() on one handler, and
    .http_open() on another. These methods just record their arguments and
    return None. Using a tuple instead of a string causes the method to
    perform some action (see MockHandler.handle()), eg:

    [["http_error"], [("http_open", "return request")]]

    defines .http_error() on one handler (which simply returns None), and
    .http_open() on another handler, which returns a Request object.

    """
    handlers = []
    count = 0
    for meths in meth_spec:

        # Fresh subclass per spec: _define_methods mutates the class, so
        # sharing MockHandler itself would leak methods between handlers.
        class MockHandlerSubclass(MockHandler):
            pass

        h = MockHandlerSubclass(meths)
        h.handler_order += count
        h.add_parent(opener)
        count = count + 1
        handlers.append(h)
        opener.add_handler(h)
    return handlers


def build_test_opener(*handler_instances):
    """Return an OpenerDirector with the given handlers installed."""
    opener = OpenerDirector()
    for h in handler_instances:
        opener.add_handler(h)
    return opener


class MockHTTPHandler(mechanize.BaseHandler):
    # useful for testing redirections and auth
    # sends supplied headers and code as first response
    # sends 200 OK as second response

    def __init__(self, code, headers):
        self.code = code
        self.headers = headers
        self.reset()

    def reset(self):
        self._count = 0
        self.requests = []

    def http_open(self, req):
        import copy
        self.requests.append(copy.deepcopy(req))
        if self._count == 0:
            self._count = self._count + 1
            name = "Not important"
            msg = create_response_info(BytesIO(
                self.headers.encode('iso-8859-1')))
            return self.parent.error("http", req,
                                     test_response(), self.code, name, msg)
        else:
            self.req = req
            return test_response("", [], req.get_full_url())


class MockHTTPResponse:
    """Minimal httplib-response stand-in (empty body, fixed status)."""

    def __init__(self, fp, msg, status, reason):
        self.fp = fp
        self.msg = msg
        self.status = status
        self.reason = reason

    def read(self):
        return b''

    def readinto(self, b):
        pass

    def close(self):
        self.fp = None


class MockHTTPClass:
    """httplib.HTTPConnection stand-in recording the request it was given."""

    def __init__(self):
        self.req_headers = []
        self.data = None
        self.raise_on_endheaders = False
        self._tunnel_headers = {}

    def __call__(self, host, timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
        # Instances double as the connection *class*: calling one returns
        # itself, after recording host and timeout.
        self.host = host
        self.timeout = timeout
        return self

    def set_debuglevel(self, level):
        self.level = level

    def set_tunnel(self, host, port=None, headers=None):
        self._tunnel_host = host
        self._tunnel_port = port
        if headers:
            self._tunnel_headers = headers
        else:
            self._tunnel_headers.clear()

    def request(self, method, url, body=None, headers={}):
        self.method = method
        self.selector = url
        self.req_headers += list(iteritems(headers))
        self.req_headers.sort()
        if body:
            self.data = body
        if self.raise_on_endheaders:
            import socket

            raise socket.error()

    def getresponse(self):
        return MockHTTPResponse(MockFile(), {}, 200, "OK")


class MockHTTPSHandler(AbstractHTTPHandler):
    # Useful for testing the Proxy-Authorization request by verifying the
    # properties of httpcon
    httpconn = MockHTTPClass()

    def https_open(self, req):
        return self.do_open(self.httpconn, req)


class OpenerDirectorTests(unittest.TestCase):
    def test_add_non_handler(self):
        class NonHandler(object):
            pass

        self.assertRaises(TypeError, OpenerDirector().add_handler,
                          NonHandler())

    def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from mechanize import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
        ]
        add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(mechanize.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme + "://example.com/")

    def test_handled(self):
        # handler returning non-None means no more handlers will be called
        o = OpenerDirector()
        meth_spec = [
            ["http_open", "ftp_open", "http_error_302"],
            ["ftp_open"],
            [("http_open", "return self")],
            [("http_open", "return self")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # Second .http_open() gets called, third doesn't, since second returned
        # non-None. Handlers without .http_open() never get any methods called
        # on them.
        # In fact, second mock handler defining .http_open() returns self
        # (instead of response), which becomes the OpenerDirector's return
        # value.
        self.assertEqual(r, handlers[2])
        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
        for expected, got in zip(calls, o.calls):
            handler, name, args, kwds = got
            self.assertEqual((handler, name), expected)
            self.assertEqual(args, (req, ))

    def test_reindex_handlers(self):
        o = OpenerDirector()

        class MockHandler:
            def add_parent(self, parent):
                pass

            def close(self):
                pass

            def __lt__(self, other):
                return self.handler_order < other.handler_order

        # this first class is here as an obscure regression test for bug
        # encountered during development: if something manages to get through
        # to _maybe_reindex_handlers, make sure it's properly removed and
        # doesn't affect adding of subsequent handlers
        class NonHandler(MockHandler):
            handler_order = 1

        class Handler(MockHandler):
            handler_order = 2

            def http_open(self):
                pass

        class Processor(MockHandler):
            handler_order = 3

            def any_response(self):
                pass

            def http_response(self):
                pass

        o.add_handler(NonHandler())
        h = Handler()
        o.add_handler(h)
        p = Processor()
        o.add_handler(p)
        o._maybe_reindex_handlers()
        self.assertEqual(o.handle_open, {"http": [h]})
        self.assertEqual(len(list(o.process_response.keys())), 1)
        self.assertEqual(list(o.process_response["http"]), [p])
        self.assertEqual(list(o._any_response), [p])
        self.assertEqual(o.handlers, [h, p])

    def test_handler_order(self):
        o = OpenerDirector()
        handlers = []
        for meths, handler_order in [
                ([("http_open", "return self")], 500),
                (["http_open"], 0),
        ]:

            class MockHandlerSubclass(MockHandler):
                pass

            h = MockHandlerSubclass(meths)
            h.handler_order = handler_order
            handlers.append(h)
            o.add_handler(h)

        o.open("http://example.com/")
        # handlers called in reverse order, thanks to their sort order
        self.assertEqual(o.calls[0][0], handlers[1])
        self.assertEqual(o.calls[1][0], handlers[0])

    def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(mechanize.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req, ), {})])

#    def test_error(self):
#        XXX this doesn't actually seem to be used in standard library,
#        but should really be tested anyway...

    def test_http_error(self):
        # XXX http_error_default
        # http errors are a special case
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "error 302")],
            [("http_error_400", "raise"), "http_open"],
            [("http_error_302", "return response"), "http_error_303",
             "http_error"],
            [("http_error_302")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        assert len(o.calls) == 2
        ignore = object()
        calls = [(handlers[0], "http_open", (req, )), (
            handlers[2], "http_error_302", (req, ignore, 302, "", {}))]
        for expected, got in zip(calls, o.calls):
            handler, method_name, args = expected
            self.assertEqual((handler, method_name), got[:2])
            self.assertEqual(len(args), len(got[2]))
            for a, b in zip(args, got[2]):
                if a is not ignore:
                    self.assertEqual(a, b)

    def test_http_error_raised(self):
        # should get an HTTPError if an HTTP handler raises a non-200 response
        # XXX it worries me that this is the only test that excercises the else
        # branch in HTTPDefaultErrorHandler
        from mechanize import _response
        o = mechanize.OpenerDirector()
        o.add_handler(mechanize.HTTPErrorProcessor())
        o.add_handler(mechanize.HTTPDefaultErrorHandler())

        class HTTPHandler(AbstractHTTPHandler):
            def http_open(self, req):
                return _response.test_response(code=302)

        o.add_handler(HTTPHandler())
        self.assertRaises(mechanize.HTTPError, o.open, "http://example.com/")

    def test_processors(self):
        # *_request / *_response methods get called appropriately
        o = OpenerDirector()
        meth_spec = [
            [("http_request", "return request"),
             ("http_response", "return response")],
            [("http_request", "return request"),
             ("http_response", "return response")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        # processor methods are called on *all* handlers that define them,
        # not just the first handler that handles the request
        calls = [(handlers[0], "http_request"), (handlers[1], "http_request"),
                 (handlers[0], "http_response"),
                 (handlers[1], "http_response")]

        self.assertEqual(len(o.calls), len(calls))
        for i, (handler, name, args, kwds) in enumerate(o.calls):
            if i < 2:
                # *_request
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 1)
                self.assertTrue(isinstance(args[0], Request))
            else:
                # *_response
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 2)
                self.assertTrue(isinstance(args[0], Request))
                # response from opener.open is None, because there's no
                # handler that defines http_open to handle it
                self.assertTrue(args[1] is None or
                                isinstance(args[1], MockResponse))

    def test_any(self):
        # XXXXX two handlers case: ordering
        o = OpenerDirector()
        meth_spec = [[
            ("http_request", "return request"),
            ("http_response", "return response"),
            ("ftp_request", "return request"),
            ("ftp_response", "return response"),
            ("any_request", "return request"),
            ("any_response", "return response"),
        ]]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        handler = handlers[0]

        for scheme in ["http", "ftp"]:
            o.calls = []
            req = Request("%s://example.com/" % scheme)
            o.open(req)

            calls = [
                (handler, "any_request"),
                (handler, ("%s_request" % scheme)),
                (handler, "any_response"),
                (handler, ("%s_response" % scheme)),
            ]
            self.assertEqual(len(o.calls), len(calls))
            for i, ((handler, name, args, kwds), calls) in (
                    enumerate(zip(o.calls, calls))):
                if i < 2:
                    # *_request
                    self.assertTrue((handler, name) == calls)
                    self.assertTrue(len(args) == 1)
                    self.assertTrue(isinstance(args[0], Request))
                else:
                    # *_response
                    self.assertTrue((handler, name) == calls)
                    self.assertTrue(len(args) == 2)
                    self.assertTrue(isinstance(args[0], Request))
                    # response from opener.open is None, because there's no
                    # handler that defines http_open to handle it
                    self.assertTrue(args[1] is None or isinstance(
                        args[1], MockResponse))


class MockRobotFileParserClass:
    """robots.txt parser stand-in; records calls, can_fetch is settable."""

    def __init__(self):
        self.calls = []
        self._can_fetch = True

    def clear(self):
        self.calls = []

    def __call__(self):
        self.calls.append("__call__")
        return self

    def set_url(self, url):
        self.calls.append(("set_url", url))

    def set_timeout(self, timeout):
        self.calls.append(("set_timeout", timeout))

    def set_opener(self, opener):
        self.calls.append(("set_opener", opener))

    def read(self):
        self.calls.append("read")

    def can_fetch(self, ua, url):
        self.calls.append(("can_fetch", ua, url))
        return self._can_fetch

class MockPasswordManager:
    """Password manager stub: remembers one credential, returns it always."""

    def add_password(self, realm, uri, user, password):
        self.realm = realm
        self.url = uri
        self.user = user
        self.password = password

    def find_user_password(self, realm, authuri):
        self.target_realm = realm
        self.target_url = authuri
        return self.user, self.password


class HandlerTests(mechanize._testcase.TestCase):
    def test_ftp(self):
        class MockFTPWrapper:
            def __init__(self, data):
                self.data = data

            def retrfile(self, filename, filetype):
                self.filename, self.filetype = filename, filetype
                data = self.data if isinstance(
                    self.data, bytes) else self.data.encode('utf-8')
                return BytesIO(data), len(self.data)

        class NullFTPHandler(mechanize.FTPHandler):
            def __init__(self, data):
                self.data = data

            def connect_ftp(self, user, passwd, host, port, dirs, timeout):
                self.user, self.passwd = user, passwd
                self.host, self.port = host, port
                self.dirs = dirs
                self.timeout = timeout
                self.ftpwrapper = MockFTPWrapper(self.data)
                return self.ftpwrapper

        import ftplib
        import socket
        data = "rheum rhaponicum"
        h = NullFTPHandler(data)
        h.parent = MockOpener()

        for url, host, port, type_, dirs, timeout, filename, mimetype in [
            ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT,
             "I", ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
             "baz.html", "text/html"),
            ("ftp://localhost:80/foo/bar/", "localhost", 80, "D",
             ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "", None),
            ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT,
             "A", [], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "baz.gif",
             None),  # TODO: really this should guess image/gif
        ]:
            req = Request(url, timeout=timeout)
            r = h.ftp_open(req)
            # ftp authentication not yet implemented by FTPHandler
            self.assertTrue(h.user == h.passwd == "")
            self.assertEqual(h.host, socket.gethostbyname(host))
            self.assertEqual(h.port, port)
            self.assertEqual(h.dirs, dirs)
            if sys.version_info >= (2, 6):
                self.assertEqual(h.timeout, timeout)
            self.assertEqual(h.ftpwrapper.filename, filename)
            self.assertEqual(h.ftpwrapper.filetype, type_)
            headers = r.info()
            self.assertEqual(headers.get("Content-type"), mimetype)
            self.assertEqual(int(headers["Content-length"]), len(data))
883 884 def test_file(self): 885 from email.utils import formatdate 886 import socket 887 h = mechanize.FileHandler() 888 o = h.parent = MockOpener() 889 890 temp_file = os.path.join(self.make_temp_dir(), "test.txt") 891 urlpath = sanepathname2url(os.path.abspath(temp_file)) 892 towrite = b"hello, world\n" 893 try: 894 fqdn = socket.gethostbyname(socket.gethostname()) 895 except socket.gaierror: 896 fqdn = "localhost" 897 for url in [ 898 "file://localhost%s" % urlpath, "file://%s" % urlpath, 899 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 900 "file://%s%s" % (fqdn, urlpath) 901 ]: 902 write_file(temp_file, towrite) 903 r = h.file_open(Request(url)) 904 try: 905 data = r.read() 906 headers = r.info() 907 r.geturl() 908 finally: 909 r.close() 910 stats = os.stat(temp_file) 911 modified = formatdate(stats.st_mtime, usegmt=True) 912 self.assertEqual(data, towrite) 913 self.assertEqual(headers["Content-type"], "text/plain") 914 self.assertEqual(headers["Content-length"], "13") 915 self.assertEqual(headers["Last-modified"], modified) 916 917 for url in [ 918 "file://localhost:80%s" % urlpath, 919 "file:///file_does_not_exist.txt", 920 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 921 sanepathname2url(os.getcwd()), 922 temp_file), 923 "file://somerandomhost.ontheinternet.com%s/%s" % ( 924 sanepathname2url(os.getcwd()), temp_file), 925 ]: 926 write_file(temp_file, towrite) 927 self.assertRaises(mechanize.URLError, h.file_open, Request(url)) 928 929 h = mechanize.FileHandler() 930 o = h.parent = MockOpener() 931 # XXXX why does // mean ftp (and /// mean not ftp!), and where 932 # is file: scheme specified? 
I think this is really a bug, and 933 # what was intended was to distinguish between URLs like: 934 # file:/blah.txt (a file) 935 # file://localhost/blah.txt (a file) 936 # file:///blah.txt (a file) 937 # file://ftp.example.com/blah.txt (an ftp URL) 938 for url, ftp in [ 939 ("file://ftp.example.com//foo.txt", True), 940 ("file://ftp.example.com///foo.txt", False), 941 # XXXX bug: fails with OSError, should be URLError 942 ("file://ftp.example.com/foo.txt", False), 943 ]: 944 req = Request(url) 945 try: 946 h.file_open(req) 947 # XXXX remove OSError when bug fixed 948 except (mechanize.URLError, OSError): 949 self.assertFalse(ftp) 950 else: 951 self.assertTrue(o.req is req) 952 self.assertEqual(req.type, "ftp") 953 954 def test_http(self): 955 class MockHTTPResponse: 956 def __init__(self, fp, msg, status, reason): 957 self.fp = fp 958 self.msg = msg 959 self.status = status 960 self.reason = reason 961 962 def read(self): 963 return b'' 964 965 def readinto(self, b): 966 pass 967 968 def close(self): 969 self.fp = None 970 971 class MockHTTPClass: 972 def __init__(self): 973 self.req_headers = [] 974 self.data = None 975 self.raise_on_endheaders = False 976 977 def __call__(self, 978 host, 979 timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT): 980 self.host = host 981 self.timeout = timeout 982 return self 983 984 def set_debuglevel(self, level): 985 self.level = level 986 987 def request(self, method, url, body=None, headers={}): 988 self.method = method 989 self.selector = url 990 self.req_headers += list(iteritems(headers)) 991 if body: 992 self.data = body 993 if self.raise_on_endheaders: 994 import socket 995 raise socket.error() 996 997 def getresponse(self): 998 return MockHTTPResponse(MockFile(), {}, 200, "OK") 999 1000 h = AbstractHTTPHandler() 1001 o = h.parent = MockOpener() 1002 1003 url = "http://example.com/" 1004 for method, data in [("GET", None), ("POST", "blah")]: 1005 req = Request(url, data, {"Foo": "bar"}) 1006 req.add_header('Order', '1') 1007 
            # (continuation of the do_open test: one iteration per
            # (method, data) pair; `h`, `o`, `url`, `req` are set above)
            req.add_unredirected_header("Spam", "eggs")
            http = MockHTTPClass()
            r = h.do_open(http, req)

            # result attributes -- attribute access alone checks presence
            r.read
            r.readline  # wrapped MockFile methods
            r.info
            r.geturl  # addinfourl methods
            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
            hdrs = r.info()
            hdrs.get
            hdrs.__contains__  # r.info() gives dict from .getreply()
            self.assertEqual(r.geturl(), url)

            # the mock connection recorded what do_open sent
            self.assertEqual(http.host, "example.com")
            self.assertEqual(http.level, 0)
            self.assertEqual(http.method, method)
            self.assertEqual(http.selector, "/")
            self.assertEqual(
                http.req_headers,
                [('Foo', 'bar'), ('Order', '1'),
                 ('Spam', 'eggs'), ('Connection', 'close')]
            )
            self.assertEqual(http.data, data)

        # check socket.error converted to URLError
        http.raise_on_endheaders = True
        self.assertRaises(mechanize.URLError, h.do_open, http, req)

        # check adding of standard headers
        o.addheaders = [("Spam", "eggs")]
        for data in "", None:  # POST, GET
            req = Request("http://example.com/", data)
            r = MockResponse(200, "OK", {}, "")
            h.do_request_(req)
            if data is None:  # GET
                self.assertTrue("Content-length" not in req.unredirected_hdrs)
                self.assertTrue("Content-type" not in req.unredirected_hdrs)
            else:  # POST
                self.assertEqual(req.unredirected_hdrs["Content-Length"], "0")
                self.assertEqual(req.unredirected_hdrs["Content-Type"],
                                 "application/x-www-form-urlencoded")
            # XXX the details of Host could be better tested
            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")

            # don't clobber existing headers
            req.add_unredirected_header("Content-Length", "foo")
            req.add_unredirected_header("Content-Type", "bar")
            req.add_unredirected_header("Host", "baz")
            req.add_unredirected_header("Spam", "foo")
            h.do_request_(req)
            self.assertEqual(req.unredirected_hdrs["Content-Length"], "foo")
            self.assertEqual(req.unredirected_hdrs["Content-Type"], "bar")
            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")

    def test_http_double_slash(self):
        """An extra slash in the path must not corrupt Host computation."""
        # Checks that the presence of an unnecessary double slash in a url
        # doesn't break anything Previously, a double slash directly after the
        # host could cause incorrect parsing of the url
        h = AbstractHTTPHandler()
        h.parent = MockOpener()

        data = ""
        ds_urls = [
            "http://example.com/foo/bar/baz.html",
            "http://example.com//foo/bar/baz.html",
            "http://example.com/foo//bar/baz.html",
            "http://example.com/foo/bar//baz.html",
        ]

        for ds_url in ds_urls:
            ds_req = Request(ds_url, data)

            # Check whether host is determined correctly if there is no proxy
            np_ds_req = h.do_request_(ds_req)
            self.assertEqual(np_ds_req.unredirected_hdrs["Host"],
                             "example.com")

            # Check whether host is determined correctly if there is a proxy
            ds_req.set_proxy("someproxy:3128", None)
            p_ds_req = h.do_request_(ds_req)
            self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")

    def test_errors(self):
        """HTTPErrorProcessor passes 2xx through, routes others to error()."""
        h = HTTPErrorProcessor()
        o = h.parent = MockOpener()

        req = Request("http://example.com")
        # all 2xx are passed through
        r = mechanize._response.test_response()
        newr = h.http_response(req, r)
        self.assertTrue(r is newr)
        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
        r = mechanize._response.test_response(code=202, msg="Accepted")
        newr = h.http_response(req, r)
        self.assertTrue(r is newr)
        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
        r = mechanize._response.test_response(code=206, msg="Partial content")
        newr = h.http_response(req, r)
        self.assertTrue(r is newr)
        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        r = mechanize._response.test_response(code=502, msg="Bad gateway")
        self.assertTrue(h.http_response(req, r) is None)
        self.assertEqual(o.proto, "http")  # o.error called
        self.assertEqual(o.args[:4], (req, r, 502, "Bad gateway"))

    def test_referer(self):
        """Referer is captured from responses and added to later requests."""
        h = HTTPRefererProcessor()
        h.parent = MockOpener()

        # normal case
        url = "http://example.com/"
        req = Request(url)
        r = MockResponse(200, "OK", {}, "", url)
        newr = h.http_response(req, r)
        self.assertTrue(r is newr)
        self.assertTrue(h.referer == url)
        newreq = h.http_request(req)
        self.assertTrue(req is newreq)
        self.assertTrue(req.unredirected_hdrs["Referer"] == url)
        # don't clobber existing Referer
        ref = "http://set.by.user.com/"
        req.add_unredirected_header("Referer", ref)
        newreq = h.http_request(req)
        self.assertTrue(req is newreq)
        self.assertTrue(req.unredirected_hdrs["Referer"] == ref)

    def test_raise_http_errors(self):
        """Unhandled error responses surface as mechanize.HTTPError."""
        # HTTPDefaultErrorHandler should raise HTTPError if no error handler
        # handled the error response
        from mechanize import _response
        h = mechanize.HTTPDefaultErrorHandler()

        url = "http://example.com"
        code = 500
        msg = "Error"
        request = mechanize.Request(url)
        response = _response.test_response(url=url, code=code, msg=msg)

        # case 1. it's not an HTTPError: a new HTTPError wrapping the
        # response must be raised
        try:
            h.http_error_default(request, response, code, msg, response.info())
        except mechanize.HTTPError as exc:
            self.assertTrue(exc is not response)
            self.assertTrue(exc.fp is response)
        else:
            self.assertTrue(False)

        # case 2. response object is already an HTTPError, so just re-raise it
        error = mechanize.HTTPError(url, code, msg, "fake headers", response)
        try:
            h.http_error_default(request, error, code, msg, error.info())
        except mechanize.HTTPError as exc:
            self.assertTrue(exc is error)
        else:
            self.assertTrue(False)

    def test_robots(self):
        """robots.txt parser lifecycle: fetch, cache per host, 403 on deny."""
        # XXX useragent
        from mechanize import HTTPRobotRulesProcessor
        opener = OpenerDirector()
        rfpc = MockRobotFileParserClass()
        h = HTTPRobotRulesProcessor(rfpc)
        opener.add_handler(h)

        url = "http://example.com:80/foo/bar.html"
        req = Request(url)
        # first time: initialise and set up robots.txt parser before checking
        # whether OK to fetch URL
        h.http_request(req)
        self.assertEqual(rfpc.calls, [
            "__call__",
            ("set_opener", opener),
            ("set_url", "http://example.com:80/robots.txt"),
            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
            "read",
            ("can_fetch", "", url),
        ])
        # second time: just use existing parser
        rfpc.clear()
        req = Request(url)
        h.http_request(req)
        self.assertTrue(rfpc.calls == [
            ("can_fetch", "", url),
        ])
        # different URL on same server: same again
        rfpc.clear()
        url = "http://example.com:80/blah.html"
        req = Request(url)
        h.http_request(req)
        self.assertTrue(rfpc.calls == [
            ("can_fetch", "", url),
        ])
        # disallowed URL
        rfpc.clear()
        rfpc._can_fetch = False
        url = "http://example.com:80/rhubarb.html"
        req = Request(url)
        # NOTE(review): no `else: self.fail()` after this try/except -- if
        # http_request does NOT raise, the disallowed-URL case silently
        # passes; consider tightening.
        try:
            h.http_request(req)
        except mechanize.HTTPError as e:
            self.assertTrue(e.request == req)
            self.assertTrue(e.code == 403)
        # new host: reload robots.txt (even though the host and port are
        # unchanged, we treat this as a new host because
        # "example.com" != "example.com:80")
        rfpc.clear()
        rfpc._can_fetch = True
        url = "http://example.com/rhubarb.html"
        req = Request(url)
        h.http_request(req)
        self.assertEqual(rfpc.calls, [
            "__call__",
            ("set_opener", opener),
            ("set_url", "http://example.com/robots.txt"),
            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
            "read",
            ("can_fetch", "", url),
        ])
        # https url -> should fetch robots.txt from https url too
        rfpc.clear()
        url = "https://example.org/rhubarb.html"
        req = Request(url)
        h.http_request(req)
        self.assertEqual(rfpc.calls, [
            "__call__",
            ("set_opener", opener),
            ("set_url", "https://example.org/robots.txt"),
            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
            "read",
            ("can_fetch", "", url),
        ])
        # non-HTTP URL -> ignore robots.txt
        rfpc.clear()
        url = "ftp://example.com/"
        req = Request(url)
        h.http_request(req)
        self.assertTrue(rfpc.calls == [])

    def test_redirected_robots_txt(self):
        # redirected robots.txt fetch shouldn't result in another attempted
        # robots.txt fetch to check the redirection is allowed!
        # NOTE(review): redundant -- mechanize is already imported at
        # module level
        import mechanize
        from mechanize import (
            HTTPDefaultErrorHandler, HTTPRedirectHandler,
            HTTPRobotRulesProcessor)

        class MockHTTPHandler(mechanize.BaseHandler):
            # records every request; 302-redirects the robots.txt fetch,
            # allows everything else
            def __init__(self):
                self.requests = []

            def http_open(self, req):
                import copy
                self.requests.append(copy.deepcopy(req))
                if req.get_full_url() == "http://example.com/robots.txt":
                    hdr = b"Location: http://example.com/en/robots.txt\r\n\r\n"
                    msg = create_response_info(BytesIO(hdr))
                    return self.parent.error("http", req,
                                             test_response(), 302, "Blah", msg)
                else:
                    return test_response("Allow: *", [], req.get_full_url())

        hh = MockHTTPHandler()
        hdeh = HTTPDefaultErrorHandler()
        hrh = HTTPRedirectHandler()
        rh = HTTPRobotRulesProcessor()
        o = build_test_opener(hh, hdeh, hrh, rh)
        o.open("http://example.com/")
        # exactly one robots.txt fetch plus its redirect target, then the
        # actual page -- no recursive robots.txt check for the redirect
        self.assertEqual([req.get_full_url() for req in hh.requests], [
            "http://example.com/robots.txt",
            "http://example.com/en/robots.txt",
            "http://example.com/",
        ])

    def test_cookies(self):
        """Requests/responses are routed through the cookie jar."""
        cj = MockCookieJar()
        h = HTTPCookieProcessor(cj)
        h.parent = MockOpener()

        req = Request("http://example.com/")
        r = MockResponse(200, "OK", {}, "")
        newreq = h.http_request(req)
        self.assertTrue(cj.ach_req is req is newreq)
        self.assertEqual(req.get_origin_req_host(), "example.com")
        self.assertFalse(cj.ach_u)
        newr = h.http_response(req, r)
        self.assertTrue(cj.ec_req is req)
        self.assertTrue(cj.ec_r is r is newr)
        self.assertFalse(cj.ec_u)

    def test_http_equiv(self):
        """http-equiv <meta> tags are merged into the response headers."""
        h = HTTPEquivProcessor()
        h.parent = MockOpener()

        data = ('<html><HEad>'
                '<Meta httP-equiv="RefResh" coNtent="spam&Eggs">'
                '</Head></html>')
        headers = [
            ("Foo", "Bar"),
            ("Content-type", "text/html"),
            ("Refresh", "blah"),
        ]
        url = "http://example.com/"
        req = Request(url)
        r = mechanize._response.make_response(data, headers, url, 200, "OK")
        newr = h.http_response(req, r)

        new_headers = newr.info()
        self.assertEqual(new_headers["Foo"], "Bar")
        self.assertEqual(new_headers["Refresh"], "spam&Eggs")
        # both the real header and the meta-derived one are kept
        self.assertEqual(
            new_headers.getheaders("Refresh"), ["blah", "spam&Eggs"])

    def test_refresh(self):
        """Valid Refresh headers trigger error('refresh', ...); bad ones don't crash."""
        # XXX test processor constructor optional args
        h = HTTPRefreshProcessor(max_time=None, honor_time=False)

        for val, valid in [
                ('0; url="http://example.com/foo/"', True),
                ("2", True),
                # in the past, this failed with UnboundLocalError
                ('0; "http://example.com/foo/"', False),
        ]:
            o = h.parent = MockOpener()
            req = Request("http://example.com/")
            headers = http_message({"refresh": val})
            r = MockResponse(200, "OK", headers, "", "http://example.com/")
            h.http_response(req, r)
            if valid:
                self.assertEqual(o.proto, "http")
                self.assertEqual(o.args, (req, r, "refresh", "OK", headers))

    def test_refresh_honor_time(self):
        """honor_time/max_time control whether and how long we sleep."""
        class SleepTester:
            # records the sleep duration instead of sleeping
            def __init__(self, test, seconds):
                self._test = test
                if seconds == 0:
                    seconds = None  # don't expect a sleep for 0 seconds
                self._expected = seconds
                self._got = None

            def sleep(self, seconds):
                self._got = seconds

            def verify(self):
                self._test.assertEqual(self._expected, self._got)

        class Opener:
            called = False

            def error(self, *args, **kwds):
                self.called = True

        def test(rp, header, refresh_after):
            # refresh_after is None -> no refresh expected at all
            expect_refresh = refresh_after is not None
            opener = Opener()
            rp.parent = opener
            st = SleepTester(self, refresh_after)
            rp._sleep = st.sleep
            rp.http_response(
                Request("http://example.com"),
                test_response(headers=[("Refresh", header)],
                              url="http://example.com/"), )
            self.assertEqual(expect_refresh, opener.called)
            st.verify()

        # by default, only zero-time refreshes are honoured
        test(HTTPRefreshProcessor(), "0", 0)
        test(HTTPRefreshProcessor(), "2", None)

        # if requested, more than zero seconds are allowed
        test(HTTPRefreshProcessor(max_time=None), "2", 2)
        test(HTTPRefreshProcessor(max_time=30), "2", 2)

        # no sleep if we don't "honor_time"
        test(HTTPRefreshProcessor(max_time=30, honor_time=False), "2", 0)

        # request for too-long wait before refreshing --> no refresh occurs
        test(HTTPRefreshProcessor(max_time=30), "60", None)

    def test_redirect(self):
        """Redirect codes rewrite the request; loops are detected (below)."""
        from_url = "http://example.com/a.html"
        to_url = "http://example.com/b.html"
        h = HTTPRedirectHandler()
        o = h.parent = MockOpener()

        # ordinary redirect behaviour
        for code in 301, 302, 303, 307, "refresh":
            for data in None, "blah\nblah\n":
                method = getattr(h, "http_error_%s" % code)
                req = Request(from_url, data)
                req.add_header("Nonsense", "viking=withhold")
                req.add_unredirected_header("Spam", "spam")
                req.origin_req_host = "example.com"  # XXX
                try:
                    method(req,
                           MockFile(), code, "Blah",
                           http_message({
                               "location": to_url
                           }))
                except mechanize.HTTPError:
                    # 307 in response to POST requires user OK
                    self.assertEqual(code, 307)
                    self.assertTrue(data is not None)
                self.assertEqual(o.req.get_full_url(), to_url)
                try:
                    self.assertEqual(o.req.get_method(), "GET")
                except AttributeError:
                    self.assertFalse(o.req.has_data())

                # now it's a GET, there should not be headers regarding content
                # (possibly dragged from before being a POST)
                headers = [x.lower() for x in o.req.headers]
                self.assertTrue("content-length" not in headers)
                self.assertTrue("content-type" not in headers)

                self.assertEqual(o.req.headers["Nonsense"], "viking=withhold")
                self.assertTrue("Spam" not in o.req.headers)
                self.assertTrue("Spam" not in o.req.unredirected_hdrs)
        # loop detection
        req = Request(from_url)

        def redirect(h, req, url=to_url):
            # drive one 302 through the handler
            h.http_error_302(req,
                             MockFile(), 302, "Blah",
                             http_message({
                                 "location": url
                             }))

        # Note that the *original* request shares the same record of
        # redirections with the sub-requests caused by the redirections.

        # detect infinite loop redirect of a URL to itself
        # NOTE(review): if no HTTPError is ever raised these while-loops
        # would spin forever; an `else: self.fail()` cannot apply here, but
        # a bounded loop would be safer.
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        try:
            while 1:
                redirect(h, req, "http://example.com/")
                count = count + 1
        except mechanize.HTTPError:
            # don't stop until max_repeats, because cookies may introduce state
            self.assertEqual(count, HTTPRedirectHandler.max_repeats)

        # detect endless non-repeating chain of redirects
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        try:
            while 1:
                redirect(h, req, "http://example.com/%d" % count)
                count = count + 1
        except mechanize.HTTPError:
            self.assertEqual(count, HTTPRedirectHandler.max_redirections)

    def test_redirect_bad_uri(self):
        """Space and | in a Location URI are percent-encoded before use."""
        # bad URIs should be cleaned up before redirection
        from mechanize._response import test_html_response
        from_url = "http://example.com/a.html"
        bad_to_url = "http://example.com/b. |html"
        good_to_url = "http://example.com/b.%20%7Chtml"

        h = HTTPRedirectHandler()
        o = h.parent = MockOpener()

        req = Request(from_url)
        h.http_error_302(
            req,
            test_html_response(),
            302,
            "Blah",
            http_message({
                "location": bad_to_url
            }), )
        self.assertEqual(o.req.get_full_url(), good_to_url)

    def test_refresh_bad_uri(self):
        """Same URI clean-up applies to Refresh-header redirects."""
        # bad URIs should be cleaned up before redirection
        from mechanize._response import test_html_response
        bad_to_url = "http://example.com/b. |html"
        good_to_url = "http://example.com/b.%20%7Chtml"

        h = HTTPRefreshProcessor(max_time=None, honor_time=False)
        o = h.parent = MockOpener()

        req = Request("http://example.com/")
        r = test_html_response(
            headers=[("refresh", '0; url="%s"' % bad_to_url)])
        h.http_response(req, r)
        headers = o.args[-1]
        self.assertEqual(headers["Location"], good_to_url)

    def test_cookie_redirect(self):
        # cookies shouldn't leak into redirected requests
        from mechanize import (
            CookieJar, HTTPCookieProcessor, HTTPDefaultErrorHandler,
            HTTPRedirectHandler)

        from test.test_cookies import interact_netscape

        cj = CookieJar()
        interact_netscape(cj, "http://www.example.com/", "spam=eggs")
        # redirect example.com -> cracker.com; the cookie set for
        # example.com must not follow
        hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
        hdeh = HTTPDefaultErrorHandler()
        hrh = HTTPRedirectHandler()
        cp = HTTPCookieProcessor(cj)
        o = build_test_opener(hh, hdeh, hrh, cp)
        o.open("http://www.example.com/")
        self.assertFalse(hh.req.has_header("Cookie"))

    def test_proxy(self):
        """ProxyHandler rewrites the request host to the configured proxy."""
        o = OpenerDirector()
        ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
        o.add_handler(ph)
        meth_spec = [[("http_open", "return response")]]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        o._maybe_reindex_handlers()

        req = Request("http://acme.example.com/")
        self.assertEqual(req.get_host(), "acme.example.com")
        o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        self.assertEqual([(handlers[0], "http_open")],
                         [tup[0:2] for tup in o.calls])

    def test_proxy_no_proxy(self):
        """Hosts matching $no_proxy bypass the proxy."""
        self.monkey_patch_environ("no_proxy", "python.org")
        o = OpenerDirector()
        ph = mechanize.ProxyHandler(dict(http="proxy.example.com"))
        o.add_handler(ph)
        req = Request("http://www.perl.org/")
        self.assertEqual(req.get_host(), "www.perl.org")
        o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com")
        req = Request("http://www.python.org")
        self.assertEqual(req.get_host(), "www.python.org")
        o.open(req)
        # NOTE(review): Python 2.5 is long dead -- this version guard is
        # always true and could be removed
        if sys.version_info >= (2, 6):
            # no_proxy environment variable not supported in python 2.5
            self.assertEqual(req.get_host(), "www.python.org")

    def test_proxy_custom_proxy_bypass(self):
        """A user-supplied proxy_bypass callable overrides $no_proxy."""
        self.monkey_patch_environ("no_proxy",
                                  mechanize._testcase.MonkeyPatcher.Unset)

        def proxy_bypass(hostname):
            return hostname == "noproxy.com"

        o = OpenerDirector()
        ph = mechanize.ProxyHandler(
            dict(http="proxy.example.com"), proxy_bypass=proxy_bypass)

        def is_proxied(url):
            o.add_handler(ph)
            req = Request(url)
            o.open(req)
            return req.has_proxy()

        self.assertTrue(is_proxied("http://example.com"))
        self.assertFalse(is_proxied("http://noproxy.com"))

    def test_proxy_https(self):
        """https scheme proxying rewrites the host like http does."""
        o = OpenerDirector()
        ph = mechanize.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        meth_spec = [[("https_open", "return response")]]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        req = Request("https://www.example.com/")
        self.assertEqual(req.get_host(), "www.example.com")
        o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual([(handlers[0], "https_open")],
                         [tup[0:2] for tup in o.calls])

    def test_basic_auth(self, quote_char='"'):
        """Basic auth round-trip; quote_char varies the realm quoting."""
        opener = OpenerDirector()
        password_manager = MockPasswordManager()
        auth_handler = mechanize.HTTPBasicAuthHandler(password_manager)
        realm = "ACME Widget Store"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
            (quote_char, realm, quote_char))
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(
            opener,
            auth_handler,
            "Authorization",
            realm,
            http_handler,
            password_manager,
            "http://acme.example.com/protected",
            "http://acme.example.com/protected", )

    def test_basic_auth_with_single_quoted_realm(self):
        """Same round-trip, realm quoted with ' instead of "."""
        self.test_basic_auth(quote_char="'")

    def test_proxy_basic_auth(self):
        """407 + Proxy-Authenticate drives the proxy auth handler."""
        opener = OpenerDirector()
        ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(ph)
        password_manager = MockPasswordManager()
        auth_handler = mechanize.ProxyBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(
            opener,
            auth_handler,
            "Proxy-authorization",
            realm,
            http_handler,
            password_manager,
            "http://acme.example.com:3128/protected",
            "proxy.example.com:3128", )

    def test_proxy_https_proxy_authorization(self):
        """Proxy-Authorization is sent on the CONNECT tunnel, not the request."""
        o = OpenerDirector()
        ph = mechanize.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        https_handler = MockHTTPSHandler()
        o.add_handler(https_handler)
        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization", "FooBar")
        req.add_header("User-Agent", "Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)
        o.open(req)
        # Verify Proxy-Authorization gets tunneled to request.
        # httpsconn req_headers do not have the Proxy-Authorization header but
        # the req will have.
        self.assertNotIn(
            ("Proxy-Authorization", "FooBar"),
            https_handler.httpconn.req_headers)
        self.assertIn(
            ("User-Agent", "Grail"), https_handler.httpconn.req_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")

    def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/1479302, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            def __init__(self):
                OpenerDirector.__init__(self)
                self.recorded = []

            def record(self, info):
                self.recorded.append(info)

        class TestDigestAuthHandler(mechanize.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                mechanize.HTTPDigestAuthHandler.http_error_401(self, *args,
                                                               **kwds)

        class TestBasicAuthHandler(mechanize.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                mechanize.HTTPBasicAuthHandler.http_error_401(self, *args,
                                                              **kwds)

        opener = RecordingOpenerDirector()
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(digest_handler)
        opener.add_handler(basic_handler)
        opener.add_handler(http_handler)
        opener._maybe_reindex_handlers()

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(
            opener,
            basic_handler,
            "Authorization",
            realm,
            http_handler,
            password_manager,
            "http://acme.example.com/protected",
            "http://acme.example.com/protected", )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"] * 2)

    def _test_basic_auth(self, opener, auth_handler, auth_header, realm,
                         http_handler, password_manager, request_url,
                         protected_url):
        """Shared Basic-auth driver: challenge, retry with credentials,
        then verify no retry happens when no password is known."""
        import base64
        user, password = "wile", "coyote"

        # .add_password() fed through to password manager
        auth_handler.add_password(realm, request_url, user, password)
        self.assertEqual(realm, password_manager.realm)
        self.assertEqual(request_url, password_manager.url)
        self.assertEqual(user, password_manager.user)
        self.assertEqual(password, password_manager.password)

        opener.open(request_url)

        # should have asked the password manager for the username/password
        self.assertEqual(password_manager.target_realm, realm)
        self.assertEqual(password_manager.target_url, protected_url)

        # expect one request without authorization, then one with
        self.assertEqual(len(http_handler.requests), 2)
        self.assertFalse(http_handler.requests[0].has_header(auth_header))
        userpass = ('%s:%s' % (user, password)).encode('utf-8')
        auth_hdr_value = b'Basic ' + base64.b64encode(userpass).strip()
        self.assertEqual(http_handler.requests[1].get_header(auth_header),
                         auth_hdr_value.decode('ascii'))

        # if the password manager can't find a password, the handler won't
        # handle the HTTP auth error
        password_manager.user = password_manager.password = None
        http_handler.reset()
        opener.open(request_url)
        self.assertEqual(len(http_handler.requests), 1)
        self.assertFalse(http_handler.requests[0].has_header(auth_header))


class HeadParserTests(unittest.TestCase):
    """Tests for HTTPEquivParser's extraction of http-equiv <meta> pairs."""

    def test(self):
        from mechanize import HTTPEquivParser
        # (html bytes, expected list of (header, value) byte pairs)
        htmls = [
            (
                b"""<meta http-equiv=refresh content="1; http://example.com/">
                """, [(b"refresh", b"1; http://example.com/")]),

            (
                b"""
                <html><head><title>\xea</title>
                <meta http-equiv="refresh" content="1; http://example.com/">
                <meta name="spam" content="eggs">
                <meta content="b\ar" http-equiv="f&Newline;oo">
                <p> <!-- p is not allowed in head, so parsing should stop -->
                <meta http-equiv="moo" content="cow">
                </html>
                """,
                [
                    (b"refresh", b"1; http://example.com/"),
                    (b"f\noo", b"b\\ar")
                ]),

            (
                b"""<meta http-equiv="refresh">
                """, []),

        ]
        for html, result in htmls:
            headers = HTTPEquivParser(html)()
            self.assertEqual(result, headers)


# A/B/C/D form a small diamond hierarchy (D -> C, B -> A); fixture
# classes for MRO-related behaviour.
class A:
    def a(self):
        pass


class B(A):
    def a(self):
        pass

    def b(self):
        pass


class C(A):
    def c(self):
        pass


class D(C, B):
    def a(self):
        pass

    def d(self):
        pass


class FunctionTests(unittest.TestCase):
    """Tests for the module-level build_opener() factory."""

    def test_build_opener(self):
        class MyHTTPHandler(HTTPHandler):
            pass

        class FooHandler(mechanize.BaseHandler):
            def foo_open(self):
                pass

        class BarHandler(mechanize.BaseHandler):
            def bar_open(self):
                pass

        o = build_opener(FooHandler, BarHandler)
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # can take a mix of classes and instances
        o = build_opener(FooHandler, BarHandler())
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # subclasses of default handlers override default handlers
        o = build_opener(MyHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)

        # a particular case of overriding: default handlers can be passed
        # in explicitly
        o = build_opener()
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler)
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler())
        self.opener_has_handler(o, HTTPHandler)

        # Issue2670: multiple handlers sharing the same base class
        class MyOtherHTTPHandler(HTTPHandler):
            pass

        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)
        self.opener_has_handler(o, MyOtherHTTPHandler)

    def opener_has_handler(self, opener, handler_class):
        # fail unless opener holds an instance of exactly handler_class
        # (subclasses do not count, hence == not isinstance)
        for h in opener.handlers:
            if h.__class__ == handler_class:
                break
        else:
            self.assertTrue(False)


class RequestTests(unittest.TestCase):
    """Tests for the mechanize.Request object's accessors."""

    def setUp(self):
        # one GET-style and one POST-style (has data) request
        self.get = Request("http://www.python.org/~jeremy/")
        self.post = Request(
            "http://www.python.org/~jeremy/",
            "data",
            headers={"X-Test": "test"})

    def test_method(self):
        self.assertEqual("POST", self.post.get_method())
        self.assertEqual("GET", self.get.get_method())

    def test_add_data(self):
        # adding data flips the implied method from GET to POST
        self.assertTrue(not self.get.has_data())
        self.assertEqual("GET", self.get.get_method())
        self.get.add_data("spam")
        self.assertTrue(self.get.has_data())
        self.assertEqual("POST", self.get.get_method())

    def test_get_full_url(self):
        self.assertEqual("http://www.python.org/~jeremy/",
                         self.get.get_full_url())

    def test_selector(self):
        self.assertEqual("/~jeremy/", self.get.get_selector())
        req = Request("http://www.python.org/")
        self.assertEqual("/", req.get_selector())

    def test_normalize_url(self):
        def t(x, expected=None):
            # expected defaults to x itself (already-normalized input)
            self.assertEqual(normalize_url(x), expected or x)

        t('https://simple.com/moo%7Ese')
        t('https://ex.com/Spört', 'https://ex.com/Sp%C3%B6rt')
        t('https://ex.com/Sp%C3%B6rt')

    def test_get_type(self):
        self.assertEqual("http", self.get.get_type())

    def test_get_host(self):
        self.assertEqual("www.python.org", self.get.get_host())

    def test_get_host_unquote(self):
        # %70 == 'p'; host is unquoted before use
        req = Request("http://www.%70ython.org/")
        self.assertEqual("www.python.org", req.get_host())

    def test_proxy(self):
        self.assertTrue(not self.get.has_proxy())
        self.get.set_proxy("www.perl.org", "http")
        self.assertTrue(self.get.has_proxy())
        # origin host is preserved; get_host() now reports the proxy
        self.assertEqual("www.python.org", self.get.get_origin_req_host())
        self.assertEqual("www.perl.org", self.get.get_host())

    def test_data(self):
        # dict data is form-encoded; for GET it moves into the query string
        r = Request('https://example.com', data={'a': 1})
        self.assertEqual(r.get_method(), 'POST')
        self.assertEqual(r.get_data(), 'a=1')
        r = Request('https://example.com', data={'a': 1}, method='GET')
        self.assertEqual(r.get_method(), 'GET')
        self.assertEqual(r.get_data(), None)
        self.assertEqual(r.get_full_url(), 'https://example.com?a=1')


if __name__ == "__main__":
    import doctest
    doctest.testmod()  # run the module-level doctest suites first
    unittest.main()