1# vim:fileencoding=utf-8
2"""Tests for urllib2-level functionality.
3
4This is urllib2's tests (most of which came from mechanize originally), plus
5some extra tests added, and modifications from bug fixes and feature additions
6to mechanize.
7"""
8
9# TODO:
10# Request
11# CacheFTPHandler (hard to write)
12# parse_keqv_list, parse_http_list
13
14import os
15import sys
16import unittest
17from io import BytesIO
18
19import mechanize
20
21from mechanize._response import test_response
22from mechanize import HTTPRedirectHandler, \
23    HTTPEquivProcessor, HTTPRefreshProcessor, \
24    HTTPCookieProcessor, HTTPRefererProcessor, \
25    HTTPErrorProcessor, HTTPHandler
26from mechanize import OpenerDirector, build_opener, Request
27from mechanize._urllib2_fork import AbstractHTTPHandler, normalize_url, AbstractBasicAuthHandler
28from mechanize._util import write_file
29
30import mechanize._response
31import mechanize._sockettimeout as _sockettimeout
32import mechanize._testcase
33import mechanize._urllib2_fork
34from mechanize._mechanize import sanepathname2url
35from mechanize.polyglot import create_response_info, iteritems
36
37# from logging import getLogger, DEBUG
38# l = getLogger("mechanize")
39# l.setLevel(DEBUG)
40
41
class TrivialTests(mechanize._testcase.TestCase):
    # Smoke tests for urlopen() and the header-list parsing helpers.

    def test_trivial(self):
        # A couple trivial tests

        self.assertRaises(ValueError, mechanize.urlopen, 'bogus url')

        fname = os.path.join(self.make_temp_dir(), "test.txt")
        data = b'data'
        write_file(fname, data)
        if os.sep == '\\':
            # Where the path separator is a backslash, prepend a slash so
            # the path component of the file: URL starts with '/'.
            fname = '/' + fname
        file_url = "file://" + fname
        try:
            f = mechanize.urlopen(file_url)
        except Exception as e:
            # Re-raise with the URL and path included to ease debugging.
            raise ValueError(
                'Failed to open URL: {} for fname: {} with error: {}'.format(
                    file_url, fname, e))
        try:
            self.assertEqual(f.read(), data)
        finally:
            # Close the response even when the assertion above fails.
            f.close()

    def test_parse_http_list(self):
        # Each case pairs a raw header value with the items that
        # parse_http_list() is expected to split it into.
        tests = [
            ('a,b,c', ['a', 'b', 'c']),
            ('path"o,l"og"i"cal, example',
             ['path"o,l"og"i"cal', 'example']),
            ('a, b, "c", "d", "e,f", g, h',
             ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
            ('a="b\\"c", d="e\\,f", g="h\\\\i"',
             ['a="b"c"', 'd="e,f"', 'g="h\\i"']),
        ]
        for header_value, expected in tests:
            self.assertEqual(
                mechanize._urllib2_fork.parse_http_list(header_value),
                expected)

    def test_parse_authreq(self):
        # A long run of commas must not match the auth-header regex.
        for bad in (",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,",):
            self.assertIsNone(AbstractBasicAuthHandler.rx.search(bad))
75
76
def test_request_headers_dict():
    """
    Doctest: Request.headers keys are normalized with .capitalize().

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?)  that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
106
107
def test_request_headers_methods():
    """
    Doctest: the has_header() / get_header() / header_items() accessors.

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print(r.get_header("Not-there"))
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
139
140
def test_password_manager(self):
    """
    Doctest: HTTPPasswordMgr lookups by realm and URI (paths, hosts, ports).

    >>> mgr = mechanize.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
204
205
def test_password_manager_default_port(self):
    """
    Doctest: how HTTPPasswordMgr treats default ports with and without a
    URL scheme.

    >>> mgr = mechanize.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
252
253
class MockOpener:
    """Opener stand-in that records the arguments given to open()/error()."""

    addheaders = []
    finalize_request_headers = None

    def open(self, req, data=None,
             timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
        """Store the request, data and timeout on the instance."""
        self.req = req
        self.data = data
        self.timeout = timeout

    def error(self, proto, *args):
        """Store the protocol and the remaining positional arguments."""
        self.proto = proto
        self.args = args
266
267
class MockFile:
    """File-like object whose methods do nothing and which iterates empty."""

    def read(self, count=None):
        return None

    def readline(self, count=None):
        return None

    def close(self):
        return None

    def __iter__(self):
        # The unreachable yield makes this a generator function, so
        # iterating a MockFile yields no items.
        return
        yield
281
282
def http_message(mapping):
    """
    Build a response-info object from a mapping of header names to values.

    >>> http_message({"Content-Type": "text/html"}).items()
    [('content-type', 'text/html')]

    """
    # One "Name: value" line per header, then an empty trailing element so
    # the joined text ends with CRLF.
    header_lines = ["%s: %s" % item for item in iteritems(mapping)]
    header_lines.append("")
    raw = "\r\n".join(header_lines)
    if not isinstance(raw, bytes):
        raw = raw.encode('iso-8859-1')
    return create_response_info(BytesIO(raw))
298
299
class MockResponse(BytesIO):
    """In-memory response body that also carries code, msg, headers and URL."""

    def __init__(self, code, msg, headers, data, url=None):
        # Text bodies are stored as their UTF-8 encoding.
        payload = data if isinstance(data, bytes) else data.encode('utf-8')
        super(MockResponse, self).__init__(payload)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url

    def info(self):
        """Return the headers object given to the constructor."""
        return self.headers

    def geturl(self):
        """Return the URL given to the constructor (None by default)."""
        return self.url
312
313
class MockCookieJar:
    """Cookie-jar double that only records the arguments it receives."""

    def add_cookie_header(self, request, unverifiable=False):
        self.ach_req = request
        self.ach_u = unverifiable

    def extract_cookies(self, response, request, unverifiable=False):
        self.ec_req = request
        self.ec_r = response
        self.ec_u = unverifiable
320
321
class FakeMethod:
    """Callable that forwards to a handle function, tagging each call with
    a method name and an action."""

    def __init__(self, meth_name, action, handle):
        self.meth_name, self.action, self.handle = meth_name, action, handle

    def __call__(self, *args):
        # Prefix the stored name and action to the caller's arguments.
        forwarded = (self.meth_name, self.action) + args
        return self.handle(*forwarded)
330
331
class MockHandler:
    # useful for testing handler machinery
    # see add_ordered_mock_handlers() docstring
    handler_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        """Install a FakeMethod on this instance's class for each spec.

        A spec of length two is unpacked as (name, action); anything else is
        taken as a bare method name with no action.
        """
        for spec in methods:
            name, action = spec if len(spec) == 2 else (spec, None)
            setattr(self.__class__, name,
                    FakeMethod(name, action, self.handle))

    def handle(self, fn_name, action, *args, **kwds):
        """Record the call on the parent, then carry out *action*."""
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        if action == "return self":
            return self
        if action == "return response":
            return MockResponse(200, "OK", {}, "")
        if action == "return request":
            return Request("http://blah/")
        if action.startswith("error"):
            # The error code is whatever follows the last space; it stays a
            # string when it is not an integer.
            code = action.rsplit(" ", 1)[-1]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        if action == "raise":
            raise mechanize.URLError("blah")
        assert False

    def close(self):
        pass

    def add_parent(self, parent):
        self.parent = parent
        # Adding a handler resets the parent's recorded call list.
        self.parent.calls = []

    def __lt__(self, other):
        if hasattr(other, "handler_order"):
            return self.handler_order < other.handler_order
        # Try to preserve the old behavior of having custom classes
        # inserted after default ones (works only for custom user
        # classes which are not aware of handler_order).
        return True
386
387
def add_ordered_mock_handlers(opener, meth_spec):
    """Create MockHandlers and add them to an OpenerDirector.

    meth_spec: list of lists of tuples and strings defining methods to define
    on handlers.  eg:

    [["http_error", "ftp_open"], ["http_open"]]

    defines methods .http_error() and .ftp_open() on one handler, and
    .http_open() on another.  These methods just record their arguments and
    return None.  Using a tuple instead of a string causes the method to
    perform some action (see MockHandler.handle()), eg:

    [["http_error"], [("http_open", "return request")]]

    defines .http_error() on one handler (which simply returns None), and
    .http_open() on another handler, which returns a Request object.

    Returns the list of created handlers, in meth_spec order.

    """
    created = []
    for position, meths in enumerate(meth_spec):

        # A fresh subclass per handler, because MockHandler installs its
        # fake methods on the class rather than on the instance.
        class MockHandlerSubclass(MockHandler):
            pass

        handler = MockHandlerSubclass(meths)
        # Later handlers get a higher handler_order, so they sort later.
        handler.handler_order += position
        handler.add_parent(opener)
        created.append(handler)
        opener.add_handler(handler)
    return created
421
422
def build_test_opener(*handler_instances):
    """Return a new OpenerDirector with each given handler added in order."""
    director = OpenerDirector()
    for handler in handler_instances:
        director.add_handler(handler)
    return director
428
429
class MockHTTPHandler(mechanize.BaseHandler):
    # useful for testing redirections and auth
    # sends supplied headers and code as first response
    # sends 200 OK as second response

    def __init__(self, code, headers):
        self.code, self.headers = code, headers
        self.reset()

    def reset(self):
        """Forget all recorded requests and restart the response sequence."""
        self.requests = []
        self._count = 0

    def http_open(self, req):
        import copy
        # Keep a snapshot of every request seen.
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            # Every request after the first gets a plain test response.
            self.req = req
            return test_response("", [], req.get_full_url())

        # First request: report the configured code and headers as an error.
        self._count += 1
        name = "Not important"
        raw_headers = BytesIO(self.headers.encode('iso-8859-1'))
        msg = create_response_info(raw_headers)
        return self.parent.error(
            "http", req, test_response(), self.code, name, msg)
457
458
class MockHTTPResponse:
    """Bare-bones HTTP response: stores its arguments and reads as empty."""

    def __init__(self, fp, msg, status, reason):
        self.fp, self.msg = fp, msg
        self.status, self.reason = status, reason

    def read(self):
        """Return an empty bytes body."""
        return b''

    def readinto(self, b):
        """Leave *b* untouched and return None."""
        return None

    def close(self):
        """Drop the reference to the underlying file object."""
        self.fp = None
474
475
class MockHTTPClass:
    """Mock HTTP connection factory that records what it is asked to send.

    Calling an instance stores the host and timeout and returns that same
    instance, so one object acts as both the connection "class" and the
    connection it creates.
    """

    def __init__(self):
        self.req_headers = []
        self.data = None
        # When true, request() raises socket.error after recording the call.
        self.raise_on_endheaders = False
        self._tunnel_headers = {}

    def __call__(self, host, timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
        self.host = host
        self.timeout = timeout
        return self

    def set_debuglevel(self, level):
        self.level = level

    def set_tunnel(self, host, port=None, headers=None):
        self._tunnel_host = host
        self._tunnel_port = port
        if headers:
            self._tunnel_headers = headers
        else:
            self._tunnel_headers.clear()

    def request(self, method, url, body=None, headers=None):
        """Record the method, selector, headers and (non-empty) body.

        headers defaults to None rather than a shared mutable dict; omitting
        it still means "no headers".  Recorded headers accumulate across
        calls in self.req_headers and are kept sorted.
        """
        if headers is None:
            headers = {}
        self.method = method
        self.selector = url
        self.req_headers += list(iteritems(headers))
        self.req_headers.sort()
        if body:
            self.data = body
        if self.raise_on_endheaders:
            import socket

            raise socket.error()

    def getresponse(self):
        """Return a canned 200 OK MockHTTPResponse."""
        return MockHTTPResponse(MockFile(), {}, 200, "OK")
513
514
class MockHTTPSHandler(AbstractHTTPHandler):
    # Useful for testing the Proxy-Authorization request by verifying the
    # properties of httpconn
    httpconn = MockHTTPClass()

    def https_open(self, req):
        """Open *req* via do_open() using the class-level mock connection."""
        connection_factory = self.httpconn
        return self.do_open(connection_factory, req)
522
523
class OpenerDirectorTests(unittest.TestCase):
    # Tests of how OpenerDirector dispatches to handlers and processors,
    # using MockHandler instances that record every call on the opener.

    def test_add_non_handler(self):
        class NonHandler(object):
            pass

        self.assertRaises(TypeError, OpenerDirector().add_handler,
                          NonHandler())

    def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from mechanize import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
        ]
        add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(mechanize.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme + "://example.com/")

    def test_handled(self):
        # handler returning non-None means no more handlers will be called
        o = OpenerDirector()
        meth_spec = [
            ["http_open", "ftp_open", "http_error_302"],
            ["ftp_open"],
            [("http_open", "return self")],
            [("http_open", "return self")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # Second .http_open() gets called, third doesn't, since second returned
        # non-None.  Handlers without .http_open() never get any methods called
        # on them.
        # In fact, second mock handler defining .http_open() returns self
        # (instead of response), which becomes the OpenerDirector's return
        # value.
        self.assertEqual(r, handlers[2])
        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
        for expected, got in zip(calls, o.calls):
            handler, name, args, kwds = got
            self.assertEqual((handler, name), expected)
            self.assertEqual(args, (req, ))

    def test_reindex_handlers(self):
        o = OpenerDirector()

        # Minimal local handler base; within this test it shadows the
        # module-level MockHandler.
        class MockHandler:
            def add_parent(self, parent):
                pass

            def close(self):
                pass

            def __lt__(self, other):
                return self.handler_order < other.handler_order

        # this first class is here as an obscure regression test for bug
        # encountered during development: if something manages to get through
        # to _maybe_reindex_handlers, make sure it's properly removed and
        # doesn't affect adding of subsequent handlers
        class NonHandler(MockHandler):
            handler_order = 1

        class Handler(MockHandler):
            handler_order = 2

            def http_open(self):
                pass

        class Processor(MockHandler):
            handler_order = 3

            def any_response(self):
                pass

            def http_response(self):
                pass

        o.add_handler(NonHandler())
        h = Handler()
        o.add_handler(h)
        p = Processor()
        o.add_handler(p)
        o._maybe_reindex_handlers()
        self.assertEqual(o.handle_open, {"http": [h]})
        self.assertEqual(len(list(o.process_response.keys())), 1)
        self.assertEqual(list(o.process_response["http"]), [p])
        self.assertEqual(list(o._any_response), [p])
        self.assertEqual(o.handlers, [h, p])

    def test_handler_order(self):
        o = OpenerDirector()
        handlers = []
        for meths, handler_order in [
            ([("http_open", "return self")], 500),
            (["http_open"], 0),
        ]:

            class MockHandlerSubclass(MockHandler):
                pass

            h = MockHandlerSubclass(meths)
            h.handler_order = handler_order
            handlers.append(h)
            o.add_handler(h)

        o.open("http://example.com/")
        # handlers called in reverse order, thanks to their sort order
        self.assertEqual(o.calls[0][0], handlers[1])
        self.assertEqual(o.calls[1][0], handlers[0])

    def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(mechanize.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req, ), {})])

# def test_error(self):
# XXX this doesn't actually seem to be used in standard library,
# but should really be tested anyway...

    def test_http_error(self):
        # XXX http_error_default
        # http errors are a special case
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "error 302")],
            [("http_error_400", "raise"), "http_open"],
            [("http_error_302", "return response"), "http_error_303",
             "http_error"],
            [("http_error_302")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        # Use a unittest assertion, not a bare assert: it still runs under
        # python -O and reports both values on failure.
        self.assertEqual(len(o.calls), 2)
        # Sentinel marking argument positions that are not compared.
        ignore = object()
        calls = [(handlers[0], "http_open", (req, )), (
            handlers[2], "http_error_302", (req, ignore, 302, "", {}))]
        for expected, got in zip(calls, o.calls):
            handler, method_name, args = expected
            self.assertEqual((handler, method_name), got[:2])
            self.assertEqual(len(args), len(got[2]))
            for a, b in zip(args, got[2]):
                if a is not ignore:
                    self.assertEqual(a, b)

    def test_http_error_raised(self):
        # should get an HTTPError if an HTTP handler raises a non-200 response
        # XXX it worries me that this is the only test that exercises the else
        # branch in HTTPDefaultErrorHandler
        from mechanize import _response
        o = mechanize.OpenerDirector()
        o.add_handler(mechanize.HTTPErrorProcessor())
        o.add_handler(mechanize.HTTPDefaultErrorHandler())

        class HTTPHandler(AbstractHTTPHandler):
            def http_open(self, req):
                return _response.test_response(code=302)

        o.add_handler(HTTPHandler())
        self.assertRaises(mechanize.HTTPError, o.open, "http://example.com/")

    def test_processors(self):
        # *_request / *_response methods get called appropriately
        o = OpenerDirector()
        meth_spec = [
            [("http_request", "return request"),
             ("http_response", "return response")],
            [("http_request", "return request"),
             ("http_response", "return response")],
        ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        # processor methods are called on *all* handlers that define them,
        # not just the first handler that handles the request
        calls = [(handlers[0], "http_request"), (handlers[1], "http_request"),
                 (handlers[0], "http_response"),
                 (handlers[1], "http_response")]

        self.assertEqual(len(o.calls), len(calls))
        for i, (handler, name, args, kwds) in enumerate(o.calls):
            if i < 2:
                # *_request
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 1)
                self.assertTrue(isinstance(args[0], Request))
            else:
                # *_response
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 2)
                self.assertTrue(isinstance(args[0], Request))
                # response from opener.open is None, because there's no
                # handler that defines http_open to handle it
                self.assertTrue(args[1] is None or
                                isinstance(args[1], MockResponse))

    def test_any(self):
        # XXXXX two handlers case: ordering
        o = OpenerDirector()
        meth_spec = [[
            ("http_request", "return request"),
            ("http_response", "return response"),
            ("ftp_request", "return request"),
            ("ftp_response", "return response"),
            ("any_request", "return request"),
            ("any_response", "return response"),
        ]]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        handler = handlers[0]

        for scheme in ["http", "ftp"]:
            o.calls = []
            req = Request("%s://example.com/" % scheme)
            o.open(req)

            calls = [
                (handler, "any_request"),
                (handler, ("%s_request" % scheme)),
                (handler, "any_response"),
                (handler, ("%s_response" % scheme)),
            ]
            self.assertEqual(len(o.calls), len(calls))
            # Distinct loop names, so the outer `handler` and `calls` are
            # not rebound while iterating.
            for i, ((got_handler, name, args, kwds), expected) in (
                    enumerate(zip(o.calls, calls))):
                if i < 2:
                    # *_request
                    self.assertEqual((got_handler, name), expected)
                    self.assertEqual(len(args), 1)
                    self.assertTrue(isinstance(args[0], Request))
                else:
                    # *_response
                    self.assertEqual((got_handler, name), expected)
                    self.assertEqual(len(args), 2)
                    self.assertTrue(isinstance(args[0], Request))
                    # response from opener.open is None, because there's no
                    # handler that defines http_open to handle it
                    self.assertTrue(args[1] is None or isinstance(
                        args[1], MockResponse))
785
786
class MockRobotFileParserClass:
    """Robot-file parser double that logs every call made on it.

    Calling an instance returns the instance itself; can_fetch() answers
    with the current value of the _can_fetch attribute.
    """

    def __init__(self):
        self.calls = []
        self._can_fetch = True

    def _record(self, entry):
        # Append one entry to the call log.
        self.calls.append(entry)

    def clear(self):
        """Replace the call log with a new empty list."""
        self.calls = []

    def __call__(self):
        self._record("__call__")
        return self

    def set_url(self, url):
        self._record(("set_url", url))

    def set_timeout(self, timeout):
        self._record(("set_timeout", timeout))

    def set_opener(self, opener):
        self._record(("set_opener", opener))

    def read(self):
        self._record("read")

    def can_fetch(self, ua, url):
        self._record(("can_fetch", ua, url))
        return self._can_fetch
814
815
class MockPasswordManager:
    """Password manager double holding a single credential.

    add_password() stores its arguments; find_user_password() records what
    was asked for and returns the stored user and password regardless.
    """

    def add_password(self, realm, uri, user, password):
        self.realm, self.url = realm, uri
        self.user, self.password = user, password

    def find_user_password(self, realm, authuri):
        self.target_realm, self.target_url = realm, authuri
        return (self.user, self.password)
827
828
829class HandlerTests(mechanize._testcase.TestCase):
830    def test_ftp(self):
831        class MockFTPWrapper:
832            def __init__(self, data):
833                self.data = data
834
835            def retrfile(self, filename, filetype):
836                self.filename, self.filetype = filename, filetype
837                data = self.data if isinstance(
838                        self.data, bytes) else self.data.encode('utf-8')
839                return BytesIO(data), len(self.data)
840
841        class NullFTPHandler(mechanize.FTPHandler):
842            def __init__(self, data):
843                self.data = data
844
845            def connect_ftp(self, user, passwd, host, port, dirs, timeout):
846                self.user, self.passwd = user, passwd
847                self.host, self.port = host, port
848                self.dirs = dirs
849                self.timeout = timeout
850                self.ftpwrapper = MockFTPWrapper(self.data)
851                return self.ftpwrapper
852
853        import ftplib
854        import socket
855        data = "rheum rhaponicum"
856        h = NullFTPHandler(data)
857        h.parent = MockOpener()
858
859        for url, host, port, type_, dirs, timeout, filename, mimetype in [
860            ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT,
861             "I", ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
862             "baz.html", "text/html"),
863            ("ftp://localhost:80/foo/bar/", "localhost", 80, "D",
864             ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "", None),
865            ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT,
866             "A", [], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "baz.gif",
867             None),  # TODO: really this should guess image/gif
868        ]:
869            req = Request(url, timeout=timeout)
870            r = h.ftp_open(req)
871            # ftp authentication not yet implemented by FTPHandler
872            self.assertTrue(h.user == h.passwd == "")
873            self.assertEqual(h.host, socket.gethostbyname(host))
874            self.assertEqual(h.port, port)
875            self.assertEqual(h.dirs, dirs)
876            if sys.version_info >= (2, 6):
877                self.assertEqual(h.timeout, timeout)
878            self.assertEqual(h.ftpwrapper.filename, filename)
879            self.assertEqual(h.ftpwrapper.filetype, type_)
880            headers = r.info()
881            self.assertEqual(headers.get("Content-type"), mimetype)
882            self.assertEqual(int(headers["Content-length"]), len(data))
883
884    def test_file(self):
885        from email.utils import formatdate
886        import socket
887        h = mechanize.FileHandler()
888        o = h.parent = MockOpener()
889
890        temp_file = os.path.join(self.make_temp_dir(), "test.txt")
891        urlpath = sanepathname2url(os.path.abspath(temp_file))
892        towrite = b"hello, world\n"
893        try:
894            fqdn = socket.gethostbyname(socket.gethostname())
895        except socket.gaierror:
896            fqdn = "localhost"
897        for url in [
898                "file://localhost%s" % urlpath, "file://%s" % urlpath,
899                "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
900                "file://%s%s" % (fqdn, urlpath)
901        ]:
902            write_file(temp_file, towrite)
903            r = h.file_open(Request(url))
904            try:
905                data = r.read()
906                headers = r.info()
907                r.geturl()
908            finally:
909                r.close()
910            stats = os.stat(temp_file)
911            modified = formatdate(stats.st_mtime, usegmt=True)
912            self.assertEqual(data, towrite)
913            self.assertEqual(headers["Content-type"], "text/plain")
914            self.assertEqual(headers["Content-length"], "13")
915            self.assertEqual(headers["Last-modified"], modified)
916
917        for url in [
918                "file://localhost:80%s" % urlpath,
919                "file:///file_does_not_exist.txt",
920                "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
921                                       sanepathname2url(os.getcwd()),
922                                       temp_file),
923                "file://somerandomhost.ontheinternet.com%s/%s" % (
924                    sanepathname2url(os.getcwd()), temp_file),
925        ]:
926            write_file(temp_file, towrite)
927            self.assertRaises(mechanize.URLError, h.file_open, Request(url))
928
929        h = mechanize.FileHandler()
930        o = h.parent = MockOpener()
931        # XXXX why does // mean ftp (and /// mean not ftp!), and where
932        #  is file: scheme specified?  I think this is really a bug, and
933        #  what was intended was to distinguish between URLs like:
934        # file:/blah.txt (a file)
935        # file://localhost/blah.txt (a file)
936        # file:///blah.txt (a file)
937        # file://ftp.example.com/blah.txt (an ftp URL)
938        for url, ftp in [
939            ("file://ftp.example.com//foo.txt", True),
940            ("file://ftp.example.com///foo.txt", False),
941                # XXXX bug: fails with OSError, should be URLError
942            ("file://ftp.example.com/foo.txt", False),
943        ]:
944            req = Request(url)
945            try:
946                h.file_open(req)
947            # XXXX remove OSError when bug fixed
948            except (mechanize.URLError, OSError):
949                self.assertFalse(ftp)
950            else:
951                self.assertTrue(o.req is req)
952                self.assertEqual(req.type, "ftp")
953
954    def test_http(self):
955        class MockHTTPResponse:
956            def __init__(self, fp, msg, status, reason):
957                self.fp = fp
958                self.msg = msg
959                self.status = status
960                self.reason = reason
961
962            def read(self):
963                return b''
964
965            def readinto(self, b):
966                pass
967
968            def close(self):
969                self.fp = None
970
971        class MockHTTPClass:
972            def __init__(self):
973                self.req_headers = []
974                self.data = None
975                self.raise_on_endheaders = False
976
977            def __call__(self,
978                         host,
979                         timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
980                self.host = host
981                self.timeout = timeout
982                return self
983
984            def set_debuglevel(self, level):
985                self.level = level
986
987            def request(self, method, url, body=None, headers={}):
988                self.method = method
989                self.selector = url
990                self.req_headers += list(iteritems(headers))
991                if body:
992                    self.data = body
993                if self.raise_on_endheaders:
994                    import socket
995                    raise socket.error()
996
997            def getresponse(self):
998                return MockHTTPResponse(MockFile(), {}, 200, "OK")
999
1000        h = AbstractHTTPHandler()
1001        o = h.parent = MockOpener()
1002
1003        url = "http://example.com/"
1004        for method, data in [("GET", None), ("POST", "blah")]:
1005            req = Request(url, data, {"Foo": "bar"})
1006            req.add_header('Order', '1')
1007            req.add_unredirected_header("Spam", "eggs")
1008            http = MockHTTPClass()
1009            r = h.do_open(http, req)
1010
1011            # result attributes
1012            r.read
1013            r.readline  # wrapped MockFile methods
1014            r.info
1015            r.geturl  # addinfourl methods
1016            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
1017            hdrs = r.info()
1018            hdrs.get
1019            hdrs.__contains__  # r.info() gives dict from .getreply()
1020            self.assertEqual(r.geturl(), url)
1021
1022            self.assertEqual(http.host, "example.com")
1023            self.assertEqual(http.level, 0)
1024            self.assertEqual(http.method, method)
1025            self.assertEqual(http.selector, "/")
1026            self.assertEqual(
1027                http.req_headers,
1028                [('Foo', 'bar'), ('Order', '1'),
1029                    ('Spam', 'eggs'), ('Connection', 'close')]
1030            )
1031            self.assertEqual(http.data, data)
1032
1033        # check socket.error converted to URLError
1034        http.raise_on_endheaders = True
1035        self.assertRaises(mechanize.URLError, h.do_open, http, req)
1036
1037        # check adding of standard headers
1038        o.addheaders = [("Spam", "eggs")]
1039        for data in "", None:  # POST, GET
1040            req = Request("http://example.com/", data)
1041            r = MockResponse(200, "OK", {}, "")
1042            h.do_request_(req)
1043            if data is None:  # GET
1044                self.assertTrue("Content-length" not in req.unredirected_hdrs)
1045                self.assertTrue("Content-type" not in req.unredirected_hdrs)
1046            else:  # POST
1047                self.assertEqual(req.unredirected_hdrs["Content-Length"], "0")
1048                self.assertEqual(req.unredirected_hdrs["Content-Type"],
1049                                 "application/x-www-form-urlencoded")
1050            # XXX the details of Host could be better tested
1051            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
1052            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
1053
1054            # don't clobber existing headers
1055            req.add_unredirected_header("Content-Length", "foo")
1056            req.add_unredirected_header("Content-Type", "bar")
1057            req.add_unredirected_header("Host", "baz")
1058            req.add_unredirected_header("Spam", "foo")
1059            h.do_request_(req)
1060            self.assertEqual(req.unredirected_hdrs["Content-Length"], "foo")
1061            self.assertEqual(req.unredirected_hdrs["Content-Type"], "bar")
1062            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
1063            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
1064
1065    def test_http_double_slash(self):
1066        # Checks that the presence of an unnecessary double slash in a url
1067        # doesn't break anything Previously, a double slash directly after the
1068        # host could cause incorrect parsing of the url
1069        h = AbstractHTTPHandler()
1070        h.parent = MockOpener()
1071
1072        data = ""
1073        ds_urls = [
1074            "http://example.com/foo/bar/baz.html",
1075            "http://example.com//foo/bar/baz.html",
1076            "http://example.com/foo//bar/baz.html",
1077            "http://example.com/foo/bar//baz.html",
1078        ]
1079
1080        for ds_url in ds_urls:
1081            ds_req = Request(ds_url, data)
1082
1083            # Check whether host is determined correctly if there is no proxy
1084            np_ds_req = h.do_request_(ds_req)
1085            self.assertEqual(np_ds_req.unredirected_hdrs["Host"],
1086                             "example.com")
1087
1088            # Check whether host is determined correctly if there is a proxy
1089            ds_req.set_proxy("someproxy:3128", None)
1090            p_ds_req = h.do_request_(ds_req)
1091            self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")
1092
1093    def test_errors(self):
1094        h = HTTPErrorProcessor()
1095        o = h.parent = MockOpener()
1096
1097        req = Request("http://example.com")
1098        # all 2xx are passed through
1099        r = mechanize._response.test_response()
1100        newr = h.http_response(req, r)
1101        self.assertTrue(r is newr)
1102        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
1103        r = mechanize._response.test_response(code=202, msg="Accepted")
1104        newr = h.http_response(req, r)
1105        self.assertTrue(r is newr)
1106        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
1107        r = mechanize._response.test_response(code=206, msg="Partial content")
1108        newr = h.http_response(req, r)
1109        self.assertTrue(r is newr)
1110        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
1111        # anything else calls o.error (and MockOpener returns None, here)
1112        r = mechanize._response.test_response(code=502, msg="Bad gateway")
1113        self.assertTrue(h.http_response(req, r) is None)
1114        self.assertEqual(o.proto, "http")  # o.error called
1115        self.assertEqual(o.args[:4], (req, r, 502, "Bad gateway"))
1116
1117    def test_referer(self):
1118        h = HTTPRefererProcessor()
1119        h.parent = MockOpener()
1120
1121        # normal case
1122        url = "http://example.com/"
1123        req = Request(url)
1124        r = MockResponse(200, "OK", {}, "", url)
1125        newr = h.http_response(req, r)
1126        self.assertTrue(r is newr)
1127        self.assertTrue(h.referer == url)
1128        newreq = h.http_request(req)
1129        self.assertTrue(req is newreq)
1130        self.assertTrue(req.unredirected_hdrs["Referer"] == url)
1131        # don't clobber existing Referer
1132        ref = "http://set.by.user.com/"
1133        req.add_unredirected_header("Referer", ref)
1134        newreq = h.http_request(req)
1135        self.assertTrue(req is newreq)
1136        self.assertTrue(req.unredirected_hdrs["Referer"] == ref)
1137
1138    def test_raise_http_errors(self):
1139        # HTTPDefaultErrorHandler should raise HTTPError if no error handler
1140        # handled the error response
1141        from mechanize import _response
1142        h = mechanize.HTTPDefaultErrorHandler()
1143
1144        url = "http://example.com"
1145        code = 500
1146        msg = "Error"
1147        request = mechanize.Request(url)
1148        response = _response.test_response(url=url, code=code, msg=msg)
1149
1150        # case 1. it's not an HTTPError
1151        try:
1152            h.http_error_default(request, response, code, msg, response.info())
1153        except mechanize.HTTPError as exc:
1154            self.assertTrue(exc is not response)
1155            self.assertTrue(exc.fp is response)
1156        else:
1157            self.assertTrue(False)
1158
1159        # case 2. response object is already an HTTPError, so just re-raise it
1160        error = mechanize.HTTPError(url, code, msg, "fake headers", response)
1161        try:
1162            h.http_error_default(request, error, code, msg, error.info())
1163        except mechanize.HTTPError as exc:
1164            self.assertTrue(exc is error)
1165        else:
1166            self.assertTrue(False)
1167
1168    def test_robots(self):
1169        # XXX useragent
1170        from mechanize import HTTPRobotRulesProcessor
1171        opener = OpenerDirector()
1172        rfpc = MockRobotFileParserClass()
1173        h = HTTPRobotRulesProcessor(rfpc)
1174        opener.add_handler(h)
1175
1176        url = "http://example.com:80/foo/bar.html"
1177        req = Request(url)
1178        # first time: initialise and set up robots.txt parser before checking
1179        #  whether OK to fetch URL
1180        h.http_request(req)
1181        self.assertEqual(rfpc.calls, [
1182            "__call__",
1183            ("set_opener", opener),
1184            ("set_url", "http://example.com:80/robots.txt"),
1185            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
1186            "read",
1187            ("can_fetch", "", url),
1188        ])
1189        # second time: just use existing parser
1190        rfpc.clear()
1191        req = Request(url)
1192        h.http_request(req)
1193        self.assertTrue(rfpc.calls == [
1194            ("can_fetch", "", url),
1195        ])
1196        # different URL on same server: same again
1197        rfpc.clear()
1198        url = "http://example.com:80/blah.html"
1199        req = Request(url)
1200        h.http_request(req)
1201        self.assertTrue(rfpc.calls == [
1202            ("can_fetch", "", url),
1203        ])
1204        # disallowed URL
1205        rfpc.clear()
1206        rfpc._can_fetch = False
1207        url = "http://example.com:80/rhubarb.html"
1208        req = Request(url)
1209        try:
1210            h.http_request(req)
1211        except mechanize.HTTPError as e:
1212            self.assertTrue(e.request == req)
1213            self.assertTrue(e.code == 403)
1214        # new host: reload robots.txt (even though the host and port are
1215        #  unchanged, we treat this as a new host because
1216        #  "example.com" != "example.com:80")
1217        rfpc.clear()
1218        rfpc._can_fetch = True
1219        url = "http://example.com/rhubarb.html"
1220        req = Request(url)
1221        h.http_request(req)
1222        self.assertEqual(rfpc.calls, [
1223            "__call__",
1224            ("set_opener", opener),
1225            ("set_url", "http://example.com/robots.txt"),
1226            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
1227            "read",
1228            ("can_fetch", "", url),
1229        ])
1230        # https url -> should fetch robots.txt from https url too
1231        rfpc.clear()
1232        url = "https://example.org/rhubarb.html"
1233        req = Request(url)
1234        h.http_request(req)
1235        self.assertEqual(rfpc.calls, [
1236            "__call__",
1237            ("set_opener", opener),
1238            ("set_url", "https://example.org/robots.txt"),
1239            ("set_timeout", _sockettimeout._GLOBAL_DEFAULT_TIMEOUT),
1240            "read",
1241            ("can_fetch", "", url),
1242        ])
1243        # non-HTTP URL -> ignore robots.txt
1244        rfpc.clear()
1245        url = "ftp://example.com/"
1246        req = Request(url)
1247        h.http_request(req)
1248        self.assertTrue(rfpc.calls == [])
1249
1250    def test_redirected_robots_txt(self):
1251        # redirected robots.txt fetch shouldn't result in another attempted
1252        # robots.txt fetch to check the redirection is allowed!
1253        import mechanize
1254        from mechanize import (
1255                HTTPDefaultErrorHandler, HTTPRedirectHandler,
1256                HTTPRobotRulesProcessor)
1257
1258        class MockHTTPHandler(mechanize.BaseHandler):
1259            def __init__(self):
1260                self.requests = []
1261
1262            def http_open(self, req):
1263                import copy
1264                self.requests.append(copy.deepcopy(req))
1265                if req.get_full_url() == "http://example.com/robots.txt":
1266                    hdr = b"Location: http://example.com/en/robots.txt\r\n\r\n"
1267                    msg = create_response_info(BytesIO(hdr))
1268                    return self.parent.error("http", req,
1269                                             test_response(), 302, "Blah", msg)
1270                else:
1271                    return test_response("Allow: *", [], req.get_full_url())
1272
1273        hh = MockHTTPHandler()
1274        hdeh = HTTPDefaultErrorHandler()
1275        hrh = HTTPRedirectHandler()
1276        rh = HTTPRobotRulesProcessor()
1277        o = build_test_opener(hh, hdeh, hrh, rh)
1278        o.open("http://example.com/")
1279        self.assertEqual([req.get_full_url() for req in hh.requests], [
1280            "http://example.com/robots.txt",
1281            "http://example.com/en/robots.txt",
1282            "http://example.com/",
1283        ])
1284
1285    def test_cookies(self):
1286        cj = MockCookieJar()
1287        h = HTTPCookieProcessor(cj)
1288        h.parent = MockOpener()
1289
1290        req = Request("http://example.com/")
1291        r = MockResponse(200, "OK", {}, "")
1292        newreq = h.http_request(req)
1293        self.assertTrue(cj.ach_req is req is newreq)
1294        self.assertEqual(req.get_origin_req_host(), "example.com")
1295        self.assertFalse(cj.ach_u)
1296        newr = h.http_response(req, r)
1297        self.assertTrue(cj.ec_req is req)
1298        self.assertTrue(cj.ec_r is r is newr)
1299        self.assertFalse(cj.ec_u)
1300
1301    def test_http_equiv(self):
1302        h = HTTPEquivProcessor()
1303        h.parent = MockOpener()
1304
1305        data = ('<html><HEad>'
1306                '<Meta httP-equiv="RefResh" coNtent="spam&amp;Eggs">'
1307                '</Head></html>')
1308        headers = [
1309            ("Foo", "Bar"),
1310            ("Content-type", "text/html"),
1311            ("Refresh", "blah"),
1312        ]
1313        url = "http://example.com/"
1314        req = Request(url)
1315        r = mechanize._response.make_response(data, headers, url, 200, "OK")
1316        newr = h.http_response(req, r)
1317
1318        new_headers = newr.info()
1319        self.assertEqual(new_headers["Foo"], "Bar")
1320        self.assertEqual(new_headers["Refresh"], "spam&Eggs")
1321        self.assertEqual(
1322            new_headers.getheaders("Refresh"), ["blah", "spam&Eggs"])
1323
1324    def test_refresh(self):
1325        # XXX test processor constructor optional args
1326        h = HTTPRefreshProcessor(max_time=None, honor_time=False)
1327
1328        for val, valid in [
1329            ('0; url="http://example.com/foo/"', True),
1330            ("2", True),
1331                # in the past, this failed with UnboundLocalError
1332            ('0; "http://example.com/foo/"', False),
1333        ]:
1334            o = h.parent = MockOpener()
1335            req = Request("http://example.com/")
1336            headers = http_message({"refresh": val})
1337            r = MockResponse(200, "OK", headers, "", "http://example.com/")
1338            h.http_response(req, r)
1339            if valid:
1340                self.assertEqual(o.proto, "http")
1341                self.assertEqual(o.args, (req, r, "refresh", "OK", headers))
1342
1343    def test_refresh_honor_time(self):
1344        class SleepTester:
1345            def __init__(self, test, seconds):
1346                self._test = test
1347                if seconds == 0:
1348                    seconds = None  # don't expect a sleep for 0 seconds
1349                self._expected = seconds
1350                self._got = None
1351
1352            def sleep(self, seconds):
1353                self._got = seconds
1354
1355            def verify(self):
1356                self._test.assertEqual(self._expected, self._got)
1357
1358        class Opener:
1359            called = False
1360
1361            def error(self, *args, **kwds):
1362                self.called = True
1363
1364        def test(rp, header, refresh_after):
1365            expect_refresh = refresh_after is not None
1366            opener = Opener()
1367            rp.parent = opener
1368            st = SleepTester(self, refresh_after)
1369            rp._sleep = st.sleep
1370            rp.http_response(
1371                Request("http://example.com"),
1372                test_response(headers=[("Refresh", header)], url="http://example.com/"), )
1373            self.assertEqual(expect_refresh, opener.called)
1374            st.verify()
1375
1376        # by default, only zero-time refreshes are honoured
1377        test(HTTPRefreshProcessor(), "0", 0)
1378        test(HTTPRefreshProcessor(), "2", None)
1379
1380        # if requested, more than zero seconds are allowed
1381        test(HTTPRefreshProcessor(max_time=None), "2", 2)
1382        test(HTTPRefreshProcessor(max_time=30), "2", 2)
1383
1384        # no sleep if we don't "honor_time"
1385        test(HTTPRefreshProcessor(max_time=30, honor_time=False), "2", 0)
1386
1387        # request for too-long wait before refreshing --> no refresh occurs
1388        test(HTTPRefreshProcessor(max_time=30), "60", None)
1389
1390    def test_redirect(self):
1391        from_url = "http://example.com/a.html"
1392        to_url = "http://example.com/b.html"
1393        h = HTTPRedirectHandler()
1394        o = h.parent = MockOpener()
1395
1396        # ordinary redirect behaviour
1397        for code in 301, 302, 303, 307, "refresh":
1398            for data in None, "blah\nblah\n":
1399                method = getattr(h, "http_error_%s" % code)
1400                req = Request(from_url, data)
1401                req.add_header("Nonsense", "viking=withhold")
1402                req.add_unredirected_header("Spam", "spam")
1403                req.origin_req_host = "example.com"  # XXX
1404                try:
1405                    method(req,
1406                           MockFile(), code, "Blah",
1407                           http_message({
1408                               "location": to_url
1409                           }))
1410                except mechanize.HTTPError:
1411                    # 307 in response to POST requires user OK
1412                    self.assertEqual(code, 307)
1413                    self.assertTrue(data is not None)
1414                self.assertEqual(o.req.get_full_url(), to_url)
1415                try:
1416                    self.assertEqual(o.req.get_method(), "GET")
1417                except AttributeError:
1418                    self.assertFalse(o.req.has_data())
1419
1420                # now it's a GET, there should not be headers regarding content
1421                # (possibly dragged from before being a POST)
1422                headers = [x.lower() for x in o.req.headers]
1423                self.assertTrue("content-length" not in headers)
1424                self.assertTrue("content-type" not in headers)
1425
1426                self.assertEqual(o.req.headers["Nonsense"], "viking=withhold")
1427                self.assertTrue("Spam" not in o.req.headers)
1428                self.assertTrue("Spam" not in o.req.unredirected_hdrs)
1429
1430        # loop detection
1431        req = Request(from_url)
1432
1433        def redirect(h, req, url=to_url):
1434            h.http_error_302(req,
1435                             MockFile(), 302, "Blah",
1436                             http_message({
1437                                 "location": url
1438                             }))
1439
1440        # Note that the *original* request shares the same record of
1441        # redirections with the sub-requests caused by the redirections.
1442
1443        # detect infinite loop redirect of a URL to itself
1444        req = Request(from_url, origin_req_host="example.com")
1445        count = 0
1446        try:
1447            while 1:
1448                redirect(h, req, "http://example.com/")
1449                count = count + 1
1450        except mechanize.HTTPError:
1451            # don't stop until max_repeats, because cookies may introduce state
1452            self.assertEqual(count, HTTPRedirectHandler.max_repeats)
1453
1454        # detect endless non-repeating chain of redirects
1455        req = Request(from_url, origin_req_host="example.com")
1456        count = 0
1457        try:
1458            while 1:
1459                redirect(h, req, "http://example.com/%d" % count)
1460                count = count + 1
1461        except mechanize.HTTPError:
1462            self.assertEqual(count, HTTPRedirectHandler.max_redirections)
1463
1464    def test_redirect_bad_uri(self):
1465        # bad URIs should be cleaned up before redirection
1466        from mechanize._response import test_html_response
1467        from_url = "http://example.com/a.html"
1468        bad_to_url = "http://example.com/b. |html"
1469        good_to_url = "http://example.com/b.%20%7Chtml"
1470
1471        h = HTTPRedirectHandler()
1472        o = h.parent = MockOpener()
1473
1474        req = Request(from_url)
1475        h.http_error_302(
1476            req,
1477            test_html_response(),
1478            302,
1479            "Blah",
1480            http_message({
1481                "location": bad_to_url
1482            }), )
1483        self.assertEqual(o.req.get_full_url(), good_to_url)
1484
1485    def test_refresh_bad_uri(self):
1486        # bad URIs should be cleaned up before redirection
1487        from mechanize._response import test_html_response
1488        bad_to_url = "http://example.com/b. |html"
1489        good_to_url = "http://example.com/b.%20%7Chtml"
1490
1491        h = HTTPRefreshProcessor(max_time=None, honor_time=False)
1492        o = h.parent = MockOpener()
1493
1494        req = Request("http://example.com/")
1495        r = test_html_response(
1496            headers=[("refresh", '0; url="%s"' % bad_to_url)])
1497        h.http_response(req, r)
1498        headers = o.args[-1]
1499        self.assertEqual(headers["Location"], good_to_url)
1500
1501    def test_cookie_redirect(self):
1502        # cookies shouldn't leak into redirected requests
1503        from mechanize import (
1504                CookieJar, HTTPCookieProcessor, HTTPDefaultErrorHandler,
1505                HTTPRedirectHandler)
1506
1507        from test.test_cookies import interact_netscape
1508
1509        cj = CookieJar()
1510        interact_netscape(cj, "http://www.example.com/", "spam=eggs")
1511        hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
1512        hdeh = HTTPDefaultErrorHandler()
1513        hrh = HTTPRedirectHandler()
1514        cp = HTTPCookieProcessor(cj)
1515        o = build_test_opener(hh, hdeh, hrh, cp)
1516        o.open("http://www.example.com/")
1517        self.assertFalse(hh.req.has_header("Cookie"))
1518
1519    def test_proxy(self):
1520        o = OpenerDirector()
1521        ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
1522        o.add_handler(ph)
1523        meth_spec = [[("http_open", "return response")]]
1524        handlers = add_ordered_mock_handlers(o, meth_spec)
1525
1526        o._maybe_reindex_handlers()
1527
1528        req = Request("http://acme.example.com/")
1529        self.assertEqual(req.get_host(), "acme.example.com")
1530        o.open(req)
1531        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1532
1533        self.assertEqual([(handlers[0], "http_open")],
1534                         [tup[0:2] for tup in o.calls])
1535
1536    def test_proxy_no_proxy(self):
1537        self.monkey_patch_environ("no_proxy", "python.org")
1538        o = OpenerDirector()
1539        ph = mechanize.ProxyHandler(dict(http="proxy.example.com"))
1540        o.add_handler(ph)
1541        req = Request("http://www.perl.org/")
1542        self.assertEqual(req.get_host(), "www.perl.org")
1543        o.open(req)
1544        self.assertEqual(req.get_host(), "proxy.example.com")
1545        req = Request("http://www.python.org")
1546        self.assertEqual(req.get_host(), "www.python.org")
1547        o.open(req)
1548        if sys.version_info >= (2, 6):
1549            # no_proxy environment variable not supported in python 2.5
1550            self.assertEqual(req.get_host(), "www.python.org")
1551
1552    def test_proxy_custom_proxy_bypass(self):
1553        self.monkey_patch_environ("no_proxy",
1554                                  mechanize._testcase.MonkeyPatcher.Unset)
1555
1556        def proxy_bypass(hostname):
1557            return hostname == "noproxy.com"
1558
1559        o = OpenerDirector()
1560        ph = mechanize.ProxyHandler(
1561            dict(http="proxy.example.com"), proxy_bypass=proxy_bypass)
1562
1563        def is_proxied(url):
1564            o.add_handler(ph)
1565            req = Request(url)
1566            o.open(req)
1567            return req.has_proxy()
1568
1569        self.assertTrue(is_proxied("http://example.com"))
1570        self.assertFalse(is_proxied("http://noproxy.com"))
1571
1572    def test_proxy_https(self):
1573        o = OpenerDirector()
1574        ph = mechanize.ProxyHandler(dict(https='proxy.example.com:3128'))
1575        o.add_handler(ph)
1576        meth_spec = [[("https_open", "return response")]]
1577        handlers = add_ordered_mock_handlers(o, meth_spec)
1578        req = Request("https://www.example.com/")
1579        self.assertEqual(req.get_host(), "www.example.com")
1580        o.open(req)
1581        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1582        self.assertEqual([(handlers[0], "https_open")],
1583                         [tup[0:2] for tup in o.calls])
1584
1585    def test_basic_auth(self, quote_char='"'):
1586        opener = OpenerDirector()
1587        password_manager = MockPasswordManager()
1588        auth_handler = mechanize.HTTPBasicAuthHandler(password_manager)
1589        realm = "ACME Widget Store"
1590        http_handler = MockHTTPHandler(
1591            401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
1592            (quote_char, realm, quote_char))
1593        opener.add_handler(auth_handler)
1594        opener.add_handler(http_handler)
1595        self._test_basic_auth(
1596            opener,
1597            auth_handler,
1598            "Authorization",
1599            realm,
1600            http_handler,
1601            password_manager,
1602            "http://acme.example.com/protected",
1603            "http://acme.example.com/protected", )
1604
1605    def test_basic_auth_with_single_quoted_realm(self):
1606        self.test_basic_auth(quote_char="'")
1607
1608    def test_proxy_basic_auth(self):
1609        opener = OpenerDirector()
1610        ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
1611        opener.add_handler(ph)
1612        password_manager = MockPasswordManager()
1613        auth_handler = mechanize.ProxyBasicAuthHandler(password_manager)
1614        realm = "ACME Networks"
1615        http_handler = MockHTTPHandler(
1616            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1617        opener.add_handler(auth_handler)
1618        opener.add_handler(http_handler)
1619        self._test_basic_auth(
1620            opener,
1621            auth_handler,
1622            "Proxy-authorization",
1623            realm,
1624            http_handler,
1625            password_manager,
1626            "http://acme.example.com:3128/protected",
1627            "proxy.example.com:3128", )
1628
1629    def test_proxy_https_proxy_authorization(self):
1630        o = OpenerDirector()
1631        ph = mechanize.ProxyHandler(dict(https='proxy.example.com:3128'))
1632        o.add_handler(ph)
1633        https_handler = MockHTTPSHandler()
1634        o.add_handler(https_handler)
1635        req = Request("https://www.example.com/")
1636        req.add_header("Proxy-Authorization", "FooBar")
1637        req.add_header("User-Agent", "Grail")
1638        self.assertEqual(req.get_host(), "www.example.com")
1639        self.assertIsNone(req._tunnel_host)
1640        o.open(req)
1641        # Verify Proxy-Authorization gets tunneled to request.
1642        # httpsconn req_headers do not have the Proxy-Authorization header but
1643        # the req will have.
1644        self.assertNotIn(
1645            ("Proxy-Authorization", "FooBar"),
1646            https_handler.httpconn.req_headers)
1647        self.assertIn(
1648            ("User-Agent", "Grail"), https_handler.httpconn.req_headers)
1649        self.assertIsNotNone(req._tunnel_host)
1650        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1651        self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
1652
1653    def test_basic_and_digest_auth_handlers(self):
1654        # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
1655        # response (http://python.org/sf/1479302), where it should instead
1656        # return None to allow another handler (especially
1657        # HTTPBasicAuthHandler) to handle the response.
1658
1659        # Also (http://python.org/sf/1479302, RFC 2617 section 1.2), we must
1660        # try digest first (since it's the strongest auth scheme), so we record
1661        # order of calls here to check digest comes first:
1662        class RecordingOpenerDirector(OpenerDirector):
1663            def __init__(self):
1664                OpenerDirector.__init__(self)
1665                self.recorded = []
1666
1667            def record(self, info):
1668                self.recorded.append(info)
1669
1670        class TestDigestAuthHandler(mechanize.HTTPDigestAuthHandler):
1671            def http_error_401(self, *args, **kwds):
1672                self.parent.record("digest")
1673                mechanize.HTTPDigestAuthHandler.http_error_401(self, *args,
1674                                                               **kwds)
1675
1676        class TestBasicAuthHandler(mechanize.HTTPBasicAuthHandler):
1677            def http_error_401(self, *args, **kwds):
1678                self.parent.record("basic")
1679                mechanize.HTTPBasicAuthHandler.http_error_401(self, *args,
1680                                                              **kwds)
1681
1682        opener = RecordingOpenerDirector()
1683        password_manager = MockPasswordManager()
1684        digest_handler = TestDigestAuthHandler(password_manager)
1685        basic_handler = TestBasicAuthHandler(password_manager)
1686        realm = "ACME Networks"
1687        http_handler = MockHTTPHandler(
1688            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1689        opener.add_handler(digest_handler)
1690        opener.add_handler(basic_handler)
1691        opener.add_handler(http_handler)
1692        opener._maybe_reindex_handlers()
1693
1694        # check basic auth isn't blocked by digest handler failing
1695        self._test_basic_auth(
1696            opener,
1697            basic_handler,
1698            "Authorization",
1699            realm,
1700            http_handler,
1701            password_manager,
1702            "http://acme.example.com/protected",
1703            "http://acme.example.com/protected", )
1704        # check digest was tried before basic (twice, because
1705        # _test_basic_auth called .open() twice)
1706        self.assertEqual(opener.recorded, ["digest", "basic"] * 2)
1707
1708    def _test_basic_auth(self, opener, auth_handler, auth_header, realm,
1709                         http_handler, password_manager, request_url,
1710                         protected_url):
1711        import base64
1712        user, password = "wile", "coyote"
1713
1714        # .add_password() fed through to password manager
1715        auth_handler.add_password(realm, request_url, user, password)
1716        self.assertEqual(realm, password_manager.realm)
1717        self.assertEqual(request_url, password_manager.url)
1718        self.assertEqual(user, password_manager.user)
1719        self.assertEqual(password, password_manager.password)
1720
1721        opener.open(request_url)
1722
1723        # should have asked the password manager for the username/password
1724        self.assertEqual(password_manager.target_realm, realm)
1725        self.assertEqual(password_manager.target_url, protected_url)
1726
1727        # expect one request without authorization, then one with
1728        self.assertEqual(len(http_handler.requests), 2)
1729        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1730        userpass = ('%s:%s' % (user, password)).encode('utf-8')
1731        auth_hdr_value = b'Basic ' + base64.b64encode(userpass).strip()
1732        self.assertEqual(http_handler.requests[1].get_header(auth_header),
1733                         auth_hdr_value.decode('ascii'))
1734
1735        # if the password manager can't find a password, the handler won't
1736        # handle the HTTP auth error
1737        password_manager.user = password_manager.password = None
1738        http_handler.reset()
1739        opener.open(request_url)
1740        self.assertEqual(len(http_handler.requests), 1)
1741        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1742
1743
class HeadParserTests(unittest.TestCase):
    # Each case pairs raw HTML bytes with the (name, value) byte-string
    # pairs that calling HTTPEquivParser on them is expected to return.

    def test(self):
        from mechanize import HTTPEquivParser
        cases = (
            # bare <meta> with an unquoted http-equiv value
            (
                b"""<meta http-equiv=refresh content="1; http://example.com/">
                """, [(b"refresh", b"1; http://example.com/")]),

            # non-ASCII byte in the title, a name= meta that is not
            # reported, character references in attribute values, and a <p>
            # after which the later meta tag is not reported
            (
                b"""
                <html><head><title>\xea</title>
                <meta http-equiv="refresh" content="1; http://example.com/">
                <meta name="spam" content="eggs">
                <meta content="b&bsol;ar" http-equiv="f&Newline;oo">
                <p> <!-- p is not allowed in head, so parsing should stop -->
                <meta http-equiv="moo" content="cow">
                </html>
                """,
                [
                    (b"refresh", b"1; http://example.com/"),
                    (b"f\noo", b"b\\ar")
                ]),

            # http-equiv without a content attribute yields nothing
            (
                b"""<meta http-equiv="refresh">
                """, []),

        )
        for markup, expected in cases:
            parse = HTTPEquivParser(markup)
            self.assertEqual(expected, parse())
1775
1776
class A:
    """Root of the small A/B/C/D class hierarchy defined in this module."""

    def a(self):
        """Do nothing and return None."""
        return None
1780
1781
class B(A):
    """Subclass of A that overrides a() and adds b(); both are no-ops."""

    def a(self):
        """Do nothing and return None."""
        return None

    def b(self):
        """Do nothing and return None."""
        return None
1788
1789
class C(A):
    """Subclass of A that inherits a() unchanged and adds a no-op c()."""

    def c(self):
        """Do nothing and return None."""
        return None
1793
1794
class D(C, B):
    """Joins C and B (in that base order); overrides a() and adds d()."""

    def a(self):
        """Do nothing and return None."""
        return None

    def d(self):
        """Do nothing and return None."""
        return None
1801
1802
class FunctionTests(unittest.TestCase):
    # Tests for the module-level build_opener() helper.

    def test_build_opener(self):
        # build_opener() must install every handler it is given, whether it
        # is passed as a class or as an instance.
        class MyHTTPHandler(HTTPHandler):
            pass

        class FooHandler(mechanize.BaseHandler):
            def foo_open(self):
                pass

        class BarHandler(mechanize.BaseHandler):
            def bar_open(self):
                pass

        o = build_opener(FooHandler, BarHandler)
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # can take a mix of classes and instances
        o = build_opener(FooHandler, BarHandler())
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # subclasses of default handlers override default handlers
        o = build_opener(MyHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)

        # a particular case of overriding: default handlers can be passed
        # in explicitly
        o = build_opener()
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler)
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler())
        self.opener_has_handler(o, HTTPHandler)

        # Issue2670: multiple handlers sharing the same base class
        class MyOtherHTTPHandler(HTTPHandler):
            pass

        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)
        self.opener_has_handler(o, MyOtherHTTPHandler)

    def opener_has_handler(self, opener, handler_class):
        # Fail the running test unless one of opener.handlers has exactly
        # handler_class as its class.  The comparison is on __class__ rather
        # than isinstance(), so an instance of a subclass does not satisfy a
        # check for its base class.
        if any(h.__class__ == handler_class for h in opener.handlers):
            return
        # self.fail() with a message instead of assertTrue(False), so the
        # failure report says which handler is missing and what was found.
        self.fail("no handler of class %s in opener; handlers are: %r" % (
            handler_class.__name__, opener.handlers))
1852
1853
class RequestTests(unittest.TestCase):
    # Tests for the accessor methods of Request and for normalize_url().

    def setUp(self):
        # self.get is built from a URL only; self.post additionally gets a
        # data argument and one extra header.
        self.get = Request("http://www.python.org/~jeremy/")
        self.post = Request(
            "http://www.python.org/~jeremy/",
            "data",
            headers={"X-Test": "test"})

    def test_method(self):
        # a request with data reports POST, one without reports GET
        self.assertEqual("POST", self.post.get_method())
        self.assertEqual("GET", self.get.get_method())

    def test_add_data(self):
        # adding data to a data-less request flips it from GET to POST
        self.assertFalse(self.get.has_data())
        self.assertEqual("GET", self.get.get_method())
        self.get.add_data("spam")
        self.assertTrue(self.get.has_data())
        self.assertEqual("POST", self.get.get_method())

    def test_get_full_url(self):
        self.assertEqual("http://www.python.org/~jeremy/",
                         self.get.get_full_url())

    def test_selector(self):
        # the selector is the path part of the URL
        self.assertEqual("/~jeremy/", self.get.get_selector())
        req = Request("http://www.python.org/")
        self.assertEqual("/", req.get_selector())

    def test_normalize_url(self):
        def t(x, expected=None):
            # with no explicit expectation the URL must come back unchanged
            self.assertEqual(normalize_url(x), expected or x)

        t('https://simple.com/moo%7Ese')
        t('https://ex.com/Spört', 'https://ex.com/Sp%C3%B6rt')
        t('https://ex.com/Sp%C3%B6rt')

    def test_get_type(self):
        self.assertEqual("http", self.get.get_type())

    def test_get_host(self):
        self.assertEqual("www.python.org", self.get.get_host())

    def test_get_host_unquote(self):
        # percent-escapes in the host part are decoded (%70 is "p")
        req = Request("http://www.%70ython.org/")
        self.assertEqual("www.python.org", req.get_host())

    def test_proxy(self):
        # after set_proxy() the host is the proxy, while the origin request
        # host still names the original server
        self.assertFalse(self.get.has_proxy())
        self.get.set_proxy("www.perl.org", "http")
        self.assertTrue(self.get.has_proxy())
        self.assertEqual("www.python.org", self.get.get_origin_req_host())
        self.assertEqual("www.perl.org", self.get.get_host())

    def test_data(self):
        # dict data is urlencoded into the body for the default method ...
        r = Request('https://example.com', data={'a': 1})
        self.assertEqual(r.get_method(), 'POST')
        self.assertEqual(r.get_data(), 'a=1')
        # ... and into the query string when the method is forced to GET
        r = Request('https://example.com', data={'a': 1}, method='GET')
        self.assertEqual(r.get_method(), 'GET')
        self.assertIsNone(r.get_data())
        self.assertEqual(r.get_full_url(), 'https://example.com?a=1')
1915
1916
if __name__ == "__main__":
    # When executed as a script: run doctest over this module first, then
    # hand control to unittest's command-line runner.
    import doctest

    doctest.testmod()
    unittest.main()
1921