1import unittest
2from test import test_support
3from test import test_urllib
4
5import os
6import socket
7import StringIO
8
9import urllib2
10from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler
11import httplib
12
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18from test.test_urllib import FakeHTTPMixin
19
20
21# XXX
22# Request
23# CacheFTPHandler (hard to write)
24# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
25
26class TrivialTests(unittest.TestCase):
27    def test_trivial(self):
28        # A couple trivial tests
29
30        self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
31
32        # XXX Name hacking to get this to work on Windows.
33        fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/')
34
35        # And more hacking to get it to work on MacOS. This assumes
36        # urllib.pathname2url works, unfortunately...
37        if os.name == 'riscos':
38            import string
39            fname = os.expand(fname)
40            fname = fname.translate(string.maketrans("/.", "./"))
41
42        if os.name == 'nt':
43            file_url = "file:///%s" % fname
44        else:
45            file_url = "file://%s" % fname
46
47        f = urllib2.urlopen(file_url)
48
49        buf = f.read()
50        f.close()
51
52    def test_parse_http_list(self):
53        tests = [('a,b,c', ['a', 'b', 'c']),
54                 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
55                 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
56                 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
57        for string, list in tests:
58            self.assertEqual(urllib2.parse_http_list(string), list)
59
60    @unittest.skipUnless(ssl, "ssl module required")
61    def test_cafile_and_context(self):
62        context = ssl.create_default_context()
63        with self.assertRaises(ValueError):
64            urllib2.urlopen(
65                "https://localhost", cafile="/nonexistent/path", context=context
66            )
67
68
69def test_request_headers_dict():
70    """
71    The Request.headers dictionary is not a documented interface.  It should
72    stay that way, because the complete set of headers are only accessible
73    through the .get_header(), .has_header(), .header_items() interface.
74    However, .headers pre-dates those methods, and so real code will be using
75    the dictionary.
76
77    The introduction in 2.4 of those methods was a mistake for the same reason:
78    code that previously saw all (urllib2 user)-provided headers in .headers
79    now sees only a subset (and the function interface is ugly and incomplete).
80    A better change would have been to replace .headers dict with a dict
81    subclass (or UserDict.DictMixin instance?)  that preserved the .headers
82    interface and also provided access to the "unredirected" headers.  It's
83    probably too late to fix that, though.
84
85
86    Check .capitalize() case normalization:
87
88    >>> url = "http://example.com"
89    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
90    'blah'
91    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
92    'blah'
93
94    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
95    but that could be changed in future.
96
97    """
98
99def test_request_headers_methods():
100    """
101    Note the case normalization of header names here, to .capitalize()-case.
102    This should be preserved for backwards-compatibility.  (In the HTTP case,
103    normalization to .title()-case is done by urllib2 before sending headers to
104    httplib).
105
106    >>> url = "http://example.com"
107    >>> r = Request(url, headers={"Spam-eggs": "blah"})
108    >>> r.has_header("Spam-eggs")
109    True
110    >>> r.header_items()
111    [('Spam-eggs', 'blah')]
112    >>> r.add_header("Foo-Bar", "baz")
113    >>> items = r.header_items()
114    >>> items.sort()
115    >>> items
116    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]
117
118    Note that e.g. r.has_header("spam-EggS") is currently False, and
119    r.get_header("spam-EggS") returns None, but that could be changed in
120    future.
121
122    >>> r.has_header("Not-there")
123    False
124    >>> print r.get_header("Not-there")
125    None
126    >>> r.get_header("Not-there", "default")
127    'default'
128
129    """
130
131
132def test_password_manager(self):
133    """
134    >>> mgr = urllib2.HTTPPasswordMgr()
135    >>> add = mgr.add_password
136    >>> add("Some Realm", "http://example.com/", "joe", "password")
137    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
138    >>> add("c", "http://example.com/foo", "foo", "ni")
139    >>> add("c", "http://example.com/bar", "bar", "nini")
140    >>> add("b", "http://example.com/", "first", "blah")
141    >>> add("b", "http://example.com/", "second", "spam")
142    >>> add("a", "http://example.com", "1", "a")
143    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
144    >>> add("Some Realm", "d.example.com", "4", "d")
145    >>> add("Some Realm", "e.example.com:3128", "5", "e")
146
147    >>> mgr.find_user_password("Some Realm", "example.com")
148    ('joe', 'password')
149    >>> mgr.find_user_password("Some Realm", "http://example.com")
150    ('joe', 'password')
151    >>> mgr.find_user_password("Some Realm", "http://example.com/")
152    ('joe', 'password')
153    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
154    ('joe', 'password')
155    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
156    ('joe', 'password')
157    >>> mgr.find_user_password("c", "http://example.com/foo")
158    ('foo', 'ni')
159    >>> mgr.find_user_password("c", "http://example.com/bar")
160    ('bar', 'nini')
161
162    Actually, this is really undefined ATM
163##     Currently, we use the highest-level path where more than one match:
164
165##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
166##     ('joe', 'password')
167
168    Use latest add_password() in case of conflict:
169
170    >>> mgr.find_user_password("b", "http://example.com/")
171    ('second', 'spam')
172
173    No special relationship between a.example.com and example.com:
174
175    >>> mgr.find_user_password("a", "http://example.com/")
176    ('1', 'a')
177    >>> mgr.find_user_password("a", "http://a.example.com/")
178    (None, None)
179
180    Ports:
181
182    >>> mgr.find_user_password("Some Realm", "c.example.com")
183    (None, None)
184    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
185    ('3', 'c')
186    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
187    ('3', 'c')
188    >>> mgr.find_user_password("Some Realm", "d.example.com")
189    ('4', 'd')
190    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
191    ('5', 'e')
192
193    """
194    pass
195
196
197def test_password_manager_default_port(self):
198    """
199    >>> mgr = urllib2.HTTPPasswordMgr()
200    >>> add = mgr.add_password
201
202    The point to note here is that we can't guess the default port if there's
203    no scheme.  This applies to both add_password and find_user_password.
204
205    >>> add("f", "http://g.example.com:80", "10", "j")
206    >>> add("g", "http://h.example.com", "11", "k")
207    >>> add("h", "i.example.com:80", "12", "l")
208    >>> add("i", "j.example.com", "13", "m")
209    >>> mgr.find_user_password("f", "g.example.com:100")
210    (None, None)
211    >>> mgr.find_user_password("f", "g.example.com:80")
212    ('10', 'j')
213    >>> mgr.find_user_password("f", "g.example.com")
214    (None, None)
215    >>> mgr.find_user_password("f", "http://g.example.com:100")
216    (None, None)
217    >>> mgr.find_user_password("f", "http://g.example.com:80")
218    ('10', 'j')
219    >>> mgr.find_user_password("f", "http://g.example.com")
220    ('10', 'j')
221    >>> mgr.find_user_password("g", "h.example.com")
222    ('11', 'k')
223    >>> mgr.find_user_password("g", "h.example.com:80")
224    ('11', 'k')
225    >>> mgr.find_user_password("g", "http://h.example.com:80")
226    ('11', 'k')
227    >>> mgr.find_user_password("h", "i.example.com")
228    (None, None)
229    >>> mgr.find_user_password("h", "i.example.com:80")
230    ('12', 'l')
231    >>> mgr.find_user_password("h", "http://i.example.com:80")
232    ('12', 'l')
233    >>> mgr.find_user_password("i", "j.example.com")
234    ('13', 'm')
235    >>> mgr.find_user_password("i", "j.example.com:80")
236    (None, None)
237    >>> mgr.find_user_password("i", "http://j.example.com")
238    ('13', 'm')
239    >>> mgr.find_user_password("i", "http://j.example.com:80")
240    (None, None)
241
242    """
243
244class MockOpener:
245    addheaders = []
246    def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
247        self.req, self.data, self.timeout  = req, data, timeout
248    def error(self, proto, *args):
249        self.proto, self.args = proto, args
250
251class MockFile:
252    def read(self, count=None): pass
253    def readline(self, count=None): pass
254    def close(self): pass
255
256class MockHeaders(dict):
257    def getheaders(self, name):
258        return self.values()
259
260class MockResponse(StringIO.StringIO):
261    def __init__(self, code, msg, headers, data, url=None):
262        StringIO.StringIO.__init__(self, data)
263        self.code, self.msg, self.headers, self.url = code, msg, headers, url
264    def info(self):
265        return self.headers
266    def geturl(self):
267        return self.url
268
269class MockCookieJar:
270    def add_cookie_header(self, request):
271        self.ach_req = request
272    def extract_cookies(self, response, request):
273        self.ec_req, self.ec_r = request, response
274
275class FakeMethod:
276    def __init__(self, meth_name, action, handle):
277        self.meth_name = meth_name
278        self.handle = handle
279        self.action = action
280    def __call__(self, *args):
281        return self.handle(self.meth_name, self.action, *args)
282
283class MockHTTPResponse:
284    def __init__(self, fp, msg, status, reason):
285        self.fp = fp
286        self.msg = msg
287        self.status = status
288        self.reason = reason
289    def read(self):
290        return ''
291
292class MockHTTPClass:
293    def __init__(self):
294        self.req_headers = []
295        self.data = None
296        self.raise_on_endheaders = False
297        self._tunnel_headers = {}
298
299    def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
300        self.host = host
301        self.timeout = timeout
302        return self
303
304    def set_debuglevel(self, level):
305        self.level = level
306
307    def set_tunnel(self, host, port=None, headers=None):
308        self._tunnel_host = host
309        self._tunnel_port = port
310        if headers:
311            self._tunnel_headers = headers
312        else:
313            self._tunnel_headers.clear()
314
315    def request(self, method, url, body=None, headers=None):
316        self.method = method
317        self.selector = url
318        if headers is not None:
319            self.req_headers += headers.items()
320        self.req_headers.sort()
321        if body:
322            self.data = body
323        if self.raise_on_endheaders:
324            import socket
325            raise socket.error()
326
327    def getresponse(self):
328        return MockHTTPResponse(MockFile(), {}, 200, "OK")
329
330    def close(self):
331        pass
332
333class MockHandler:
334    # useful for testing handler machinery
335    # see add_ordered_mock_handlers() docstring
336    handler_order = 500
337    def __init__(self, methods):
338        self._define_methods(methods)
339    def _define_methods(self, methods):
340        for spec in methods:
341            if len(spec) == 2: name, action = spec
342            else: name, action = spec, None
343            meth = FakeMethod(name, action, self.handle)
344            setattr(self.__class__, name, meth)
345    def handle(self, fn_name, action, *args, **kwds):
346        self.parent.calls.append((self, fn_name, args, kwds))
347        if action is None:
348            return None
349        elif action == "return self":
350            return self
351        elif action == "return response":
352            res = MockResponse(200, "OK", {}, "")
353            return res
354        elif action == "return request":
355            return Request("http://blah/")
356        elif action.startswith("error"):
357            code = action[action.rfind(" ")+1:]
358            try:
359                code = int(code)
360            except ValueError:
361                pass
362            res = MockResponse(200, "OK", {}, "")
363            return self.parent.error("http", args[0], res, code, "", {})
364        elif action == "raise":
365            raise urllib2.URLError("blah")
366        assert False
367    def close(self): pass
368    def add_parent(self, parent):
369        self.parent = parent
370        self.parent.calls = []
371    def __lt__(self, other):
372        if not hasattr(other, "handler_order"):
373            # No handler_order, leave in original order.  Yuck.
374            return True
375        return self.handler_order < other.handler_order
376
377def add_ordered_mock_handlers(opener, meth_spec):
378    """Create MockHandlers and add them to an OpenerDirector.
379
380    meth_spec: list of lists of tuples and strings defining methods to define
381    on handlers.  eg:
382
383    [["http_error", "ftp_open"], ["http_open"]]
384
385    defines methods .http_error() and .ftp_open() on one handler, and
386    .http_open() on another.  These methods just record their arguments and
387    return None.  Using a tuple instead of a string causes the method to
388    perform some action (see MockHandler.handle()), eg:
389
390    [["http_error"], [("http_open", "return request")]]
391
392    defines .http_error() on one handler (which simply returns None), and
393    .http_open() on another handler, which returns a Request object.
394
395    """
396    handlers = []
397    count = 0
398    for meths in meth_spec:
399        class MockHandlerSubclass(MockHandler): pass
400        h = MockHandlerSubclass(meths)
401        h.handler_order += count
402        h.add_parent(opener)
403        count = count + 1
404        handlers.append(h)
405        opener.add_handler(h)
406    return handlers
407
408def build_test_opener(*handler_instances):
409    opener = OpenerDirector()
410    for h in handler_instances:
411        opener.add_handler(h)
412    return opener
413
414class MockHTTPHandler(urllib2.BaseHandler):
415    # useful for testing redirections and auth
416    # sends supplied headers and code as first response
417    # sends 200 OK as second response
418    def __init__(self, code, headers):
419        self.code = code
420        self.headers = headers
421        self.reset()
422    def reset(self):
423        self._count = 0
424        self.requests = []
425    def http_open(self, req):
426        import mimetools, copy
427        from StringIO import StringIO
428        self.requests.append(copy.deepcopy(req))
429        if self._count == 0:
430            self._count = self._count + 1
431            name = httplib.responses[self.code]
432            msg = mimetools.Message(StringIO(self.headers))
433            return self.parent.error(
434                "http", req, MockFile(), self.code, name, msg)
435        else:
436            self.req = req
437            msg = mimetools.Message(StringIO("\r\n\r\n"))
438            return MockResponse(200, "OK", msg, "", req.get_full_url())
439
440class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
441    # Useful for testing the Proxy-Authorization request by verifying the
442    # properties of httpcon
443
444    def __init__(self):
445        urllib2.AbstractHTTPHandler.__init__(self)
446        self.httpconn = MockHTTPClass()
447
448    def https_open(self, req):
449        return self.do_open(self.httpconn, req)
450
451class MockPasswordManager:
452    def add_password(self, realm, uri, user, password):
453        self.realm = realm
454        self.url = uri
455        self.user = user
456        self.password = password
457    def find_user_password(self, realm, authuri):
458        self.target_realm = realm
459        self.target_url = authuri
460        return self.user, self.password
461
462
463class OpenerDirectorTests(unittest.TestCase):
464
465    def test_add_non_handler(self):
466        class NonHandler(object):
467            pass
468        self.assertRaises(TypeError,
469                          OpenerDirector().add_handler, NonHandler())
470
471    def test_badly_named_methods(self):
472        # test work-around for three methods that accidentally follow the
473        # naming conventions for handler methods
474        # (*_open() / *_request() / *_response())
475
476        # These used to call the accidentally-named methods, causing a
477        # TypeError in real code; here, returning self from these mock
478        # methods would either cause no exception, or AttributeError.
479
480        from urllib2 import URLError
481
482        o = OpenerDirector()
483        meth_spec = [
484            [("do_open", "return self"), ("proxy_open", "return self")],
485            [("redirect_request", "return self")],
486            ]
487        handlers = add_ordered_mock_handlers(o, meth_spec)
488        o.add_handler(urllib2.UnknownHandler())
489        for scheme in "do", "proxy", "redirect":
490            self.assertRaises(URLError, o.open, scheme+"://example.com/")
491
492    def test_handled(self):
493        # handler returning non-None means no more handlers will be called
494        o = OpenerDirector()
495        meth_spec = [
496            ["http_open", "ftp_open", "http_error_302"],
497            ["ftp_open"],
498            [("http_open", "return self")],
499            [("http_open", "return self")],
500            ]
501        handlers = add_ordered_mock_handlers(o, meth_spec)
502
503        req = Request("http://example.com/")
504        r = o.open(req)
505        # Second .http_open() gets called, third doesn't, since second returned
506        # non-None.  Handlers without .http_open() never get any methods called
507        # on them.
508        # In fact, second mock handler defining .http_open() returns self
509        # (instead of response), which becomes the OpenerDirector's return
510        # value.
511        self.assertEqual(r, handlers[2])
512        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
513        for expected, got in zip(calls, o.calls):
514            handler, name, args, kwds = got
515            self.assertEqual((handler, name), expected)
516            self.assertEqual(args, (req,))
517
518    def test_handler_order(self):
519        o = OpenerDirector()
520        handlers = []
521        for meths, handler_order in [
522            ([("http_open", "return self")], 500),
523            (["http_open"], 0),
524            ]:
525            class MockHandlerSubclass(MockHandler): pass
526            h = MockHandlerSubclass(meths)
527            h.handler_order = handler_order
528            handlers.append(h)
529            o.add_handler(h)
530
531        r = o.open("http://example.com/")
532        # handlers called in reverse order, thanks to their sort order
533        self.assertEqual(o.calls[0][0], handlers[1])
534        self.assertEqual(o.calls[1][0], handlers[0])
535
536    def test_raise(self):
537        # raising URLError stops processing of request
538        o = OpenerDirector()
539        meth_spec = [
540            [("http_open", "raise")],
541            [("http_open", "return self")],
542            ]
543        handlers = add_ordered_mock_handlers(o, meth_spec)
544
545        req = Request("http://example.com/")
546        self.assertRaises(urllib2.URLError, o.open, req)
547        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
548
549##     def test_error(self):
550##         # XXX this doesn't actually seem to be used in standard library,
551##         #  but should really be tested anyway...
552
553    def test_http_error(self):
554        # XXX http_error_default
555        # http errors are a special case
556        o = OpenerDirector()
557        meth_spec = [
558            [("http_open", "error 302")],
559            [("http_error_400", "raise"), "http_open"],
560            [("http_error_302", "return response"), "http_error_303",
561             "http_error"],
562            [("http_error_302")],
563            ]
564        handlers = add_ordered_mock_handlers(o, meth_spec)
565
566        class Unknown:
567            def __eq__(self, other): return True
568
569        req = Request("http://example.com/")
570        r = o.open(req)
571        assert len(o.calls) == 2
572        calls = [(handlers[0], "http_open", (req,)),
573                 (handlers[2], "http_error_302",
574                  (req, Unknown(), 302, "", {}))]
575        for expected, got in zip(calls, o.calls):
576            handler, method_name, args = expected
577            self.assertEqual((handler, method_name), got[:2])
578            self.assertEqual(args, got[2])
579
580    def test_processors(self):
581        # *_request / *_response methods get called appropriately
582        o = OpenerDirector()
583        meth_spec = [
584            [("http_request", "return request"),
585             ("http_response", "return response")],
586            [("http_request", "return request"),
587             ("http_response", "return response")],
588            ]
589        handlers = add_ordered_mock_handlers(o, meth_spec)
590
591        req = Request("http://example.com/")
592        r = o.open(req)
593        # processor methods are called on *all* handlers that define them,
594        # not just the first handler that handles the request
595        calls = [
596            (handlers[0], "http_request"), (handlers[1], "http_request"),
597            (handlers[0], "http_response"), (handlers[1], "http_response")]
598
599        for i, (handler, name, args, kwds) in enumerate(o.calls):
600            if i < 2:
601                # *_request
602                self.assertEqual((handler, name), calls[i])
603                self.assertEqual(len(args), 1)
604                self.assertIsInstance(args[0], Request)
605            else:
606                # *_response
607                self.assertEqual((handler, name), calls[i])
608                self.assertEqual(len(args), 2)
609                self.assertIsInstance(args[0], Request)
610                # response from opener.open is None, because there's no
611                # handler that defines http_open to handle it
612                if args[1] is not None:
613                    self.assertIsInstance(args[1], MockResponse)
614
615
616def sanepathname2url(path):
617    import urllib
618    urlpath = urllib.pathname2url(path)
619    if os.name == "nt" and urlpath.startswith("///"):
620        urlpath = urlpath[2:]
621    # XXX don't ask me about the mac...
622    return urlpath
623
624class HandlerTests(unittest.TestCase):
625
626    def test_ftp(self):
627        class MockFTPWrapper:
628            def __init__(self, data): self.data = data
629            def retrfile(self, filename, filetype):
630                self.filename, self.filetype = filename, filetype
631                return StringIO.StringIO(self.data), len(self.data)
632            def close(self): pass
633
634        class NullFTPHandler(urllib2.FTPHandler):
635            def __init__(self, data): self.data = data
636            def connect_ftp(self, user, passwd, host, port, dirs,
637                            timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
638                self.user, self.passwd = user, passwd
639                self.host, self.port = host, port
640                self.dirs = dirs
641                self.ftpwrapper = MockFTPWrapper(self.data)
642                return self.ftpwrapper
643
644        import ftplib
645        data = "rheum rhaponicum"
646        h = NullFTPHandler(data)
647        o = h.parent = MockOpener()
648
649        for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
650            ("ftp://localhost/foo/bar/baz.html",
651             "localhost", ftplib.FTP_PORT, "", "", "I",
652             ["foo", "bar"], "baz.html", "text/html"),
653            ("ftp://parrot@localhost/foo/bar/baz.html",
654             "localhost", ftplib.FTP_PORT, "parrot", "", "I",
655             ["foo", "bar"], "baz.html", "text/html"),
656            ("ftp://%25parrot@localhost/foo/bar/baz.html",
657             "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
658             ["foo", "bar"], "baz.html", "text/html"),
659            ("ftp://%2542parrot@localhost/foo/bar/baz.html",
660             "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
661             ["foo", "bar"], "baz.html", "text/html"),
662            ("ftp://localhost:80/foo/bar/",
663             "localhost", 80, "", "", "D",
664             ["foo", "bar"], "", None),
665            ("ftp://localhost/baz.gif;type=a",
666             "localhost", ftplib.FTP_PORT, "", "", "A",
667             [], "baz.gif", None),  # XXX really this should guess image/gif
668            ]:
669            req = Request(url)
670            req.timeout = None
671            r = h.ftp_open(req)
672            # ftp authentication not yet implemented by FTPHandler
673            self.assertEqual(h.user, user)
674            self.assertEqual(h.passwd, passwd)
675            self.assertEqual(h.host, socket.gethostbyname(host))
676            self.assertEqual(h.port, port)
677            self.assertEqual(h.dirs, dirs)
678            self.assertEqual(h.ftpwrapper.filename, filename)
679            self.assertEqual(h.ftpwrapper.filetype, type_)
680            headers = r.info()
681            self.assertEqual(headers.get("Content-type"), mimetype)
682            self.assertEqual(int(headers["Content-length"]), len(data))
683
684    def test_file(self):
685        import rfc822, socket
686        h = urllib2.FileHandler()
687        o = h.parent = MockOpener()
688
689        TESTFN = test_support.TESTFN
690        urlpath = sanepathname2url(os.path.abspath(TESTFN))
691        towrite = "hello, world\n"
692        urls = [
693            "file://localhost%s" % urlpath,
694            "file://%s" % urlpath,
695            "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
696            ]
697        try:
698            localaddr = socket.gethostbyname(socket.gethostname())
699        except socket.gaierror:
700            localaddr = ''
701        if localaddr:
702            urls.append("file://%s%s" % (localaddr, urlpath))
703
704        for url in urls:
705            f = open(TESTFN, "wb")
706            try:
707                try:
708                    f.write(towrite)
709                finally:
710                    f.close()
711
712                r = h.file_open(Request(url))
713                try:
714                    data = r.read()
715                    headers = r.info()
716                    respurl = r.geturl()
717                finally:
718                    r.close()
719                stats = os.stat(TESTFN)
720                modified = rfc822.formatdate(stats.st_mtime)
721            finally:
722                os.remove(TESTFN)
723            self.assertEqual(data, towrite)
724            self.assertEqual(headers["Content-type"], "text/plain")
725            self.assertEqual(headers["Content-length"], "13")
726            self.assertEqual(headers["Last-modified"], modified)
727            self.assertEqual(respurl, url)
728
729        for url in [
730            "file://localhost:80%s" % urlpath,
731            "file:///file_does_not_exist.txt",
732            "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
733                                   os.getcwd(), TESTFN),
734            "file://somerandomhost.ontheinternet.com%s/%s" %
735            (os.getcwd(), TESTFN),
736            ]:
737            try:
738                f = open(TESTFN, "wb")
739                try:
740                    f.write(towrite)
741                finally:
742                    f.close()
743
744                self.assertRaises(urllib2.URLError,
745                                  h.file_open, Request(url))
746            finally:
747                os.remove(TESTFN)
748
749        h = urllib2.FileHandler()
750        o = h.parent = MockOpener()
751        # XXXX why does // mean ftp (and /// mean not ftp!), and where
752        #  is file: scheme specified?  I think this is really a bug, and
753        #  what was intended was to distinguish between URLs like:
754        # file:/blah.txt (a file)
755        # file://localhost/blah.txt (a file)
756        # file:///blah.txt (a file)
757        # file://ftp.example.com/blah.txt (an ftp URL)
758        for url, ftp in [
759            ("file://ftp.example.com//foo.txt", True),
760            ("file://ftp.example.com///foo.txt", False),
761# XXXX bug: fails with OSError, should be URLError
762            ("file://ftp.example.com/foo.txt", False),
763            ("file://somehost//foo/something.txt", True),
764            ("file://localhost//foo/something.txt", False),
765            ]:
766            req = Request(url)
767            try:
768                h.file_open(req)
769            # XXXX remove OSError when bug fixed
770            except (urllib2.URLError, OSError):
771                self.assertTrue(not ftp)
772            else:
773                self.assertTrue(o.req is req)
774                self.assertEqual(req.type, "ftp")
775            self.assertEqual(req.type == "ftp", ftp)
776
777    def test_http(self):
778
779        h = urllib2.AbstractHTTPHandler()
780        o = h.parent = MockOpener()
781
782        url = "http://example.com/"
783        for method, data in [("GET", None), ("POST", "blah")]:
784            req = Request(url, data, {"Foo": "bar"})
785            req.timeout = None
786            req.add_unredirected_header("Spam", "eggs")
787            http = MockHTTPClass()
788            r = h.do_open(http, req)
789
790            # result attributes
791            r.read; r.readline  # wrapped MockFile methods
792            r.info; r.geturl  # addinfourl methods
793            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
794            hdrs = r.info()
795            hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
796            self.assertEqual(r.geturl(), url)
797
798            self.assertEqual(http.host, "example.com")
799            self.assertEqual(http.level, 0)
800            self.assertEqual(http.method, method)
801            self.assertEqual(http.selector, "/")
802            self.assertEqual(http.req_headers,
803                             [("Connection", "close"),
804                              ("Foo", "bar"), ("Spam", "eggs")])
805            self.assertEqual(http.data, data)
806
807        # check socket.error converted to URLError
808        http.raise_on_endheaders = True
809        self.assertRaises(urllib2.URLError, h.do_open, http, req)
810
811        # check adding of standard headers
812        o.addheaders = [("Spam", "eggs")]
813        for data in "", None:  # POST, GET
814            req = Request("http://example.com/", data)
815            r = MockResponse(200, "OK", {}, "")
816            newreq = h.do_request_(req)
817            if data is None:  # GET
818                self.assertNotIn("Content-length", req.unredirected_hdrs)
819                self.assertNotIn("Content-type", req.unredirected_hdrs)
820            else:  # POST
821                self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
822                self.assertEqual(req.unredirected_hdrs["Content-type"],
823                             "application/x-www-form-urlencoded")
824            # XXX the details of Host could be better tested
825            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
826            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
827
828            # don't clobber existing headers
829            req.add_unredirected_header("Content-length", "foo")
830            req.add_unredirected_header("Content-type", "bar")
831            req.add_unredirected_header("Host", "baz")
832            req.add_unredirected_header("Spam", "foo")
833            newreq = h.do_request_(req)
834            self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
835            self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
836            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
837            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
838
839    def test_http_doubleslash(self):
840        # Checks that the presence of an unnecessary double slash in a url doesn't break anything
841        # Previously, a double slash directly after the host could cause incorrect parsing of the url
842        h = urllib2.AbstractHTTPHandler()
843        o = h.parent = MockOpener()
844
845        data = ""
846        ds_urls = [
847            "http://example.com/foo/bar/baz.html",
848            "http://example.com//foo/bar/baz.html",
849            "http://example.com/foo//bar/baz.html",
850            "http://example.com/foo/bar//baz.html",
851        ]
852
853        for ds_url in ds_urls:
854            ds_req = Request(ds_url, data)
855
856            # Check whether host is determined correctly if there is no proxy
857            np_ds_req = h.do_request_(ds_req)
858            self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
859
860            # Check whether host is determined correctly if there is a proxy
861            ds_req.set_proxy("someproxy:3128",None)
862            p_ds_req = h.do_request_(ds_req)
863            self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
864
865    def test_fixpath_in_weirdurls(self):
866        # Issue4493: urllib2 to supply '/' when to urls where path does not
867        # start with'/'
868
869        h = urllib2.AbstractHTTPHandler()
870        o = h.parent = MockOpener()
871
872        weird_url = 'http://www.python.org?getspam'
873        req = Request(weird_url)
874        newreq = h.do_request_(req)
875        self.assertEqual(newreq.get_host(),'www.python.org')
876        self.assertEqual(newreq.get_selector(),'/?getspam')
877
878        url_without_path = 'http://www.python.org'
879        req = Request(url_without_path)
880        newreq = h.do_request_(req)
881        self.assertEqual(newreq.get_host(),'www.python.org')
882        self.assertEqual(newreq.get_selector(),'')
883
884    def test_errors(self):
885        h = urllib2.HTTPErrorProcessor()
886        o = h.parent = MockOpener()
887
888        url = "http://example.com/"
889        req = Request(url)
890        # all 2xx are passed through
891        r = MockResponse(200, "OK", {}, "", url)
892        newr = h.http_response(req, r)
893        self.assertTrue(r is newr)
894        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
895        r = MockResponse(202, "Accepted", {}, "", url)
896        newr = h.http_response(req, r)
897        self.assertTrue(r is newr)
898        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
899        r = MockResponse(206, "Partial content", {}, "", url)
900        newr = h.http_response(req, r)
901        self.assertTrue(r is newr)
902        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
903        # anything else calls o.error (and MockOpener returns None, here)
904        r = MockResponse(502, "Bad gateway", {}, "", url)
905        self.assertTrue(h.http_response(req, r) is None)
906        self.assertEqual(o.proto, "http")  # o.error called
907        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
908
909    def test_cookies(self):
910        cj = MockCookieJar()
911        h = urllib2.HTTPCookieProcessor(cj)
912        o = h.parent = MockOpener()
913
914        req = Request("http://example.com/")
915        r = MockResponse(200, "OK", {}, "")
916        newreq = h.http_request(req)
917        self.assertTrue(cj.ach_req is req is newreq)
918        self.assertEqual(req.get_origin_req_host(), "example.com")
919        self.assertTrue(not req.is_unverifiable())
920        newr = h.http_response(req, r)
921        self.assertTrue(cj.ec_req is req)
922        self.assertTrue(cj.ec_r is r is newr)
923
924    def test_redirect(self):
925        from_url = "http://example.com/a.html"
926        to_url = "http://example.com/b.html"
927        h = urllib2.HTTPRedirectHandler()
928        o = h.parent = MockOpener()
929
930        # ordinary redirect behaviour
931        for code in 301, 302, 303, 307:
932            for data in None, "blah\nblah\n":
933                method = getattr(h, "http_error_%s" % code)
934                req = Request(from_url, data)
935                req.add_header("Nonsense", "viking=withhold")
936                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
937                if data is not None:
938                    req.add_header("Content-Length", str(len(data)))
939                req.add_unredirected_header("Spam", "spam")
940                try:
941                    method(req, MockFile(), code, "Blah",
942                           MockHeaders({"location": to_url}))
943                except urllib2.HTTPError:
944                    # 307 in response to POST requires user OK
945                    self.assertEqual(code, 307)
946                    self.assertIsNotNone(data)
947                self.assertEqual(o.req.get_full_url(), to_url)
948                try:
949                    self.assertEqual(o.req.get_method(), "GET")
950                except AttributeError:
951                    self.assertTrue(not o.req.has_data())
952
953                # now it's a GET, there should not be headers regarding content
954                # (possibly dragged from before being a POST)
955                headers = [x.lower() for x in o.req.headers]
956                self.assertNotIn("content-length", headers)
957                self.assertNotIn("content-type", headers)
958
959                self.assertEqual(o.req.headers["Nonsense"],
960                                 "viking=withhold")
961                self.assertNotIn("Spam", o.req.headers)
962                self.assertNotIn("Spam", o.req.unredirected_hdrs)
963
964        # loop detection
965        req = Request(from_url)
966        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
967        def redirect(h, req, url=to_url):
968            h.http_error_302(req, MockFile(), 302, "Blah",
969                             MockHeaders({"location": url}))
970        # Note that the *original* request shares the same record of
971        # redirections with the sub-requests caused by the redirections.
972
973        # detect infinite loop redirect of a URL to itself
974        req = Request(from_url, origin_req_host="example.com")
975        count = 0
976        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
977        try:
978            while 1:
979                redirect(h, req, "http://example.com/")
980                count = count + 1
981        except urllib2.HTTPError:
982            # don't stop until max_repeats, because cookies may introduce state
983            self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)
984
985        # detect endless non-repeating chain of redirects
986        req = Request(from_url, origin_req_host="example.com")
987        count = 0
988        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
989        try:
990            while 1:
991                redirect(h, req, "http://example.com/%d" % count)
992                count = count + 1
993        except urllib2.HTTPError:
994            self.assertEqual(count,
995                             urllib2.HTTPRedirectHandler.max_redirections)
996
997    def test_invalid_redirect(self):
998        from_url = "http://example.com/a.html"
999        valid_schemes = ['http', 'https', 'ftp']
1000        invalid_schemes = ['file', 'imap', 'ldap']
1001        schemeless_url = "example.com/b.html"
1002        h = urllib2.HTTPRedirectHandler()
1003        o = h.parent = MockOpener()
1004        req = Request(from_url)
1005        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1006
1007        for scheme in invalid_schemes:
1008            invalid_url = scheme + '://' + schemeless_url
1009            self.assertRaises(urllib2.HTTPError, h.http_error_302,
1010                              req, MockFile(), 302, "Security Loophole",
1011                              MockHeaders({"location": invalid_url}))
1012
1013        for scheme in valid_schemes:
1014            valid_url = scheme + '://' + schemeless_url
1015            h.http_error_302(req, MockFile(), 302, "That's fine",
1016                MockHeaders({"location": valid_url}))
1017            self.assertEqual(o.req.get_full_url(), valid_url)
1018
1019    def test_cookie_redirect(self):
1020        # cookies shouldn't leak into redirected requests
1021        from cookielib import CookieJar
1022
1023        from test.test_cookielib import interact_netscape
1024
1025        cj = CookieJar()
1026        interact_netscape(cj, "http://www.example.com/", "spam=eggs")
1027        hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
1028        hdeh = urllib2.HTTPDefaultErrorHandler()
1029        hrh = urllib2.HTTPRedirectHandler()
1030        cp = urllib2.HTTPCookieProcessor(cj)
1031        o = build_test_opener(hh, hdeh, hrh, cp)
1032        o.open("http://www.example.com/")
1033        self.assertTrue(not hh.req.has_header("Cookie"))
1034
1035    def test_redirect_fragment(self):
1036        redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
1037        hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
1038        hdeh = urllib2.HTTPDefaultErrorHandler()
1039        hrh = urllib2.HTTPRedirectHandler()
1040        o = build_test_opener(hh, hdeh, hrh)
1041        fp = o.open('http://www.example.com')
1042        self.assertEqual(fp.geturl(), redirected_url.strip())
1043
1044    def test_redirect_no_path(self):
1045        # Issue 14132: Relative redirect strips original path
1046        real_class = httplib.HTTPConnection
1047        response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
1048        httplib.HTTPConnection = test_urllib.fakehttp(response1)
1049        self.addCleanup(setattr, httplib, "HTTPConnection", real_class)
1050        urls = iter(("/path", "/path?query"))
1051        def request(conn, method, url, *pos, **kw):
1052            self.assertEqual(url, next(urls))
1053            real_class.request(conn, method, url, *pos, **kw)
1054            # Change response for subsequent connection
1055            conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
1056        httplib.HTTPConnection.request = request
1057        fp = urllib2.urlopen("http://python.org/path")
1058        self.assertEqual(fp.geturl(), "http://python.org/path?query")
1059
1060    def test_proxy(self):
1061        o = OpenerDirector()
1062        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
1063        o.add_handler(ph)
1064        meth_spec = [
1065            [("http_open", "return response")]
1066            ]
1067        handlers = add_ordered_mock_handlers(o, meth_spec)
1068
1069        req = Request("http://acme.example.com/")
1070        self.assertEqual(req.get_host(), "acme.example.com")
1071        r = o.open(req)
1072        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1073
1074        self.assertEqual([(handlers[0], "http_open")],
1075                         [tup[0:2] for tup in o.calls])
1076
1077    def test_proxy_no_proxy(self):
1078        os.environ['no_proxy'] = 'python.org'
1079        o = OpenerDirector()
1080        ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
1081        o.add_handler(ph)
1082        req = Request("http://www.perl.org/")
1083        self.assertEqual(req.get_host(), "www.perl.org")
1084        r = o.open(req)
1085        self.assertEqual(req.get_host(), "proxy.example.com")
1086        req = Request("http://www.python.org")
1087        self.assertEqual(req.get_host(), "www.python.org")
1088        r = o.open(req)
1089        self.assertEqual(req.get_host(), "www.python.org")
1090        del os.environ['no_proxy']
1091
1092
1093    def test_proxy_https(self):
1094        o = OpenerDirector()
1095        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
1096        o.add_handler(ph)
1097        meth_spec = [
1098            [("https_open","return response")]
1099        ]
1100        handlers = add_ordered_mock_handlers(o, meth_spec)
1101        req = Request("https://www.example.com/")
1102        self.assertEqual(req.get_host(), "www.example.com")
1103        r = o.open(req)
1104        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1105        self.assertEqual([(handlers[0], "https_open")],
1106                         [tup[0:2] for tup in o.calls])
1107
1108    def test_proxy_https_proxy_authorization(self):
1109        o = OpenerDirector()
1110        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
1111        o.add_handler(ph)
1112        https_handler = MockHTTPSHandler()
1113        o.add_handler(https_handler)
1114        req = Request("https://www.example.com/")
1115        req.add_header("Proxy-Authorization","FooBar")
1116        req.add_header("User-Agent","Grail")
1117        self.assertEqual(req.get_host(), "www.example.com")
1118        self.assertIsNone(req._tunnel_host)
1119        r = o.open(req)
1120        # Verify Proxy-Authorization gets tunneled to request.
1121        # httpsconn req_headers do not have the Proxy-Authorization header but
1122        # the req will have.
1123        self.assertNotIn(("Proxy-Authorization","FooBar"),
1124                         https_handler.httpconn.req_headers)
1125        self.assertIn(("User-Agent","Grail"),
1126                      https_handler.httpconn.req_headers)
1127        self.assertIsNotNone(req._tunnel_host)
1128        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1129        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
1130
1131    def test_basic_auth(self, quote_char='"'):
1132        opener = OpenerDirector()
1133        password_manager = MockPasswordManager()
1134        auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
1135        realm = "ACME Widget Store"
1136        http_handler = MockHTTPHandler(
1137            401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
1138            (quote_char, realm, quote_char) )
1139        opener.add_handler(auth_handler)
1140        opener.add_handler(http_handler)
1141        self._test_basic_auth(opener, auth_handler, "Authorization",
1142                              realm, http_handler, password_manager,
1143                              "http://acme.example.com/protected",
1144                              "http://acme.example.com/protected"
1145                             )
1146
1147    def test_basic_auth_with_single_quoted_realm(self):
1148        self.test_basic_auth(quote_char="'")
1149
1150    def test_basic_auth_with_unquoted_realm(self):
1151        opener = OpenerDirector()
1152        password_manager = MockPasswordManager()
1153        auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
1154        realm = "ACME Widget Store"
1155        http_handler = MockHTTPHandler(
1156            401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
1157        opener.add_handler(auth_handler)
1158        opener.add_handler(http_handler)
1159        msg = "Basic Auth Realm was unquoted"
1160        with test_support.check_warnings((msg, UserWarning)):
1161            self._test_basic_auth(opener, auth_handler, "Authorization",
1162                                  realm, http_handler, password_manager,
1163                                  "http://acme.example.com/protected",
1164                                  "http://acme.example.com/protected"
1165                                 )
1166
1167
1168    def test_proxy_basic_auth(self):
1169        opener = OpenerDirector()
1170        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
1171        opener.add_handler(ph)
1172        password_manager = MockPasswordManager()
1173        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
1174        realm = "ACME Networks"
1175        http_handler = MockHTTPHandler(
1176            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1177        opener.add_handler(auth_handler)
1178        opener.add_handler(http_handler)
1179        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
1180                              realm, http_handler, password_manager,
1181                              "http://acme.example.com:3128/protected",
1182                              "proxy.example.com:3128",
1183                              )
1184
1185    def test_basic_and_digest_auth_handlers(self):
1186        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
1187        # response (http://python.org/sf/1479302), where it should instead
1188        # return None to allow another handler (especially
1189        # HTTPBasicAuthHandler) to handle the response.
1190
1191        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
1192        # try digest first (since it's the strongest auth scheme), so we record
1193        # order of calls here to check digest comes first:
1194        class RecordingOpenerDirector(OpenerDirector):
1195            def __init__(self):
1196                OpenerDirector.__init__(self)
1197                self.recorded = []
1198            def record(self, info):
1199                self.recorded.append(info)
1200        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
1201            def http_error_401(self, *args, **kwds):
1202                self.parent.record("digest")
1203                urllib2.HTTPDigestAuthHandler.http_error_401(self,
1204                                                             *args, **kwds)
1205        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
1206            def http_error_401(self, *args, **kwds):
1207                self.parent.record("basic")
1208                urllib2.HTTPBasicAuthHandler.http_error_401(self,
1209                                                            *args, **kwds)
1210
1211        opener = RecordingOpenerDirector()
1212        password_manager = MockPasswordManager()
1213        digest_handler = TestDigestAuthHandler(password_manager)
1214        basic_handler = TestBasicAuthHandler(password_manager)
1215        realm = "ACME Networks"
1216        http_handler = MockHTTPHandler(
1217            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1218        opener.add_handler(basic_handler)
1219        opener.add_handler(digest_handler)
1220        opener.add_handler(http_handler)
1221
1222        # check basic auth isn't blocked by digest handler failing
1223        self._test_basic_auth(opener, basic_handler, "Authorization",
1224                              realm, http_handler, password_manager,
1225                              "http://acme.example.com/protected",
1226                              "http://acme.example.com/protected",
1227                              )
1228        # check digest was tried before basic (twice, because
1229        # _test_basic_auth called .open() twice)
1230        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
1231
1232    def _test_basic_auth(self, opener, auth_handler, auth_header,
1233                         realm, http_handler, password_manager,
1234                         request_url, protected_url):
1235        import base64
1236        user, password = "wile", "coyote"
1237
1238        # .add_password() fed through to password manager
1239        auth_handler.add_password(realm, request_url, user, password)
1240        self.assertEqual(realm, password_manager.realm)
1241        self.assertEqual(request_url, password_manager.url)
1242        self.assertEqual(user, password_manager.user)
1243        self.assertEqual(password, password_manager.password)
1244
1245        r = opener.open(request_url)
1246
1247        # should have asked the password manager for the username/password
1248        self.assertEqual(password_manager.target_realm, realm)
1249        self.assertEqual(password_manager.target_url, protected_url)
1250
1251        # expect one request without authorization, then one with
1252        self.assertEqual(len(http_handler.requests), 2)
1253        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1254        userpass = '%s:%s' % (user, password)
1255        auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
1256        self.assertEqual(http_handler.requests[1].get_header(auth_header),
1257                         auth_hdr_value)
1258        self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
1259                         auth_hdr_value)
1260        # if the password manager can't find a password, the handler won't
1261        # handle the HTTP auth error
1262        password_manager.user = password_manager.password = None
1263        http_handler.reset()
1264        r = opener.open(request_url)
1265        self.assertEqual(len(http_handler.requests), 1)
1266        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1267
1268class MiscTests(unittest.TestCase, FakeHTTPMixin):
1269
1270    def test_build_opener(self):
1271        class MyHTTPHandler(urllib2.HTTPHandler): pass
1272        class FooHandler(urllib2.BaseHandler):
1273            def foo_open(self): pass
1274        class BarHandler(urllib2.BaseHandler):
1275            def bar_open(self): pass
1276
1277        build_opener = urllib2.build_opener
1278
1279        o = build_opener(FooHandler, BarHandler)
1280        self.opener_has_handler(o, FooHandler)
1281        self.opener_has_handler(o, BarHandler)
1282
1283        # can take a mix of classes and instances
1284        o = build_opener(FooHandler, BarHandler())
1285        self.opener_has_handler(o, FooHandler)
1286        self.opener_has_handler(o, BarHandler)
1287
1288        # subclasses of default handlers override default handlers
1289        o = build_opener(MyHTTPHandler)
1290        self.opener_has_handler(o, MyHTTPHandler)
1291
1292        # a particular case of overriding: default handlers can be passed
1293        # in explicitly
1294        o = build_opener()
1295        self.opener_has_handler(o, urllib2.HTTPHandler)
1296        o = build_opener(urllib2.HTTPHandler)
1297        self.opener_has_handler(o, urllib2.HTTPHandler)
1298        o = build_opener(urllib2.HTTPHandler())
1299        self.opener_has_handler(o, urllib2.HTTPHandler)
1300
1301        # Issue2670: multiple handlers sharing the same base class
1302        class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
1303        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
1304        self.opener_has_handler(o, MyHTTPHandler)
1305        self.opener_has_handler(o, MyOtherHTTPHandler)
1306
1307    def opener_has_handler(self, opener, handler_class):
1308        for h in opener.handlers:
1309            if h.__class__ == handler_class:
1310                break
1311        else:
1312            self.assertTrue(False)
1313
1314    def test_unsupported_algorithm(self):
1315        handler = AbstractDigestAuthHandler()
1316        with self.assertRaises(ValueError) as exc:
1317            handler.get_algorithm_impls('invalid')
1318        self.assertEqual(
1319            str(exc.exception),
1320            "Unsupported digest authentication algorithm 'invalid'"
1321        )
1322
1323    @unittest.skipUnless(ssl, "ssl module required")
1324    def test_url_path_with_control_char_rejected(self):
1325        for char_no in range(0, 0x21) + range(0x7f, 0x100):
1326            char = chr(char_no)
1327            schemeless_url = "//localhost:7777/test%s/" % char
1328            self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.")
1329            try:
1330                # We explicitly test urllib.request.urlopen() instead of the top
1331                # level 'def urlopen()' function defined in this... (quite ugly)
1332                # test suite.  They use different url opening codepaths.  Plain
1333                # urlopen uses FancyURLOpener which goes via a codepath that
1334                # calls urllib.parse.quote() on the URL which makes all of the
1335                # above attempts at injection within the url _path_ safe.
1336                escaped_char_repr = repr(char).replace('\\', r'\\')
1337                InvalidURL = httplib.InvalidURL
1338                with self.assertRaisesRegexp(
1339                    InvalidURL, "contain control.*" + escaped_char_repr):
1340                    urllib2.urlopen("http:" + schemeless_url)
1341                with self.assertRaisesRegexp(
1342                    InvalidURL, "contain control.*" + escaped_char_repr):
1343                    urllib2.urlopen("https:" + schemeless_url)
1344            finally:
1345                self.unfakehttp()
1346
1347    @unittest.skipUnless(ssl, "ssl module required")
1348    def test_url_path_with_newline_header_injection_rejected(self):
1349        self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.")
1350        host = "localhost:7777?a=1 HTTP/1.1\r\nX-injected: header\r\nTEST: 123"
1351        schemeless_url = "//" + host + ":8080/test/?test=a"
1352        try:
1353            # We explicitly test urllib2.urlopen() instead of the top
1354            # level 'def urlopen()' function defined in this... (quite ugly)
1355            # test suite.  They use different url opening codepaths.  Plain
1356            # urlopen uses FancyURLOpener which goes via a codepath that
1357            # calls urllib.parse.quote() on the URL which makes all of the
1358            # above attempts at injection within the url _path_ safe.
1359            InvalidURL = httplib.InvalidURL
1360            with self.assertRaisesRegexp(InvalidURL,
1361                    r"contain control.*\\r.*(found at least . .)"):
1362                urllib2.urlopen("http:{}".format(schemeless_url))
1363            with self.assertRaisesRegexp(InvalidURL,
1364                    r"contain control.*\\n"):
1365                urllib2.urlopen("https:{}".format(schemeless_url))
1366        finally:
1367            self.unfakehttp()
1368
1369    @unittest.skipUnless(ssl, "ssl module required")
1370    def test_url_host_with_control_char_rejected(self):
1371        for char_no in list(range(0, 0x21)) + [0x7f]:
1372            char = chr(char_no)
1373            schemeless_url = "//localhost{}/test/".format(char)
1374            self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.")
1375            try:
1376                escaped_char_repr = repr(char).replace('\\', r'\\')
1377                InvalidURL = httplib.InvalidURL
1378                with self.assertRaisesRegexp(InvalidURL,
1379                    "contain control.*{}".format(escaped_char_repr)):
1380                        urllib2.urlopen("http:{}".format(schemeless_url))
1381                with self.assertRaisesRegexp(InvalidURL,
1382                    "contain control.*{}".format(escaped_char_repr)):
1383                        urllib2.urlopen("https:{}".format(schemeless_url))
1384            finally:
1385                self.unfakehttp()
1386
1387
1388class RequestTests(unittest.TestCase):
1389
1390    def setUp(self):
1391        self.get = urllib2.Request("http://www.python.org/~jeremy/")
1392        self.post = urllib2.Request("http://www.python.org/~jeremy/",
1393                                    "data",
1394                                    headers={"X-Test": "test"})
1395
1396    def test_method(self):
1397        self.assertEqual("POST", self.post.get_method())
1398        self.assertEqual("GET", self.get.get_method())
1399
1400    def test_add_data(self):
1401        self.assertTrue(not self.get.has_data())
1402        self.assertEqual("GET", self.get.get_method())
1403        self.get.add_data("spam")
1404        self.assertTrue(self.get.has_data())
1405        self.assertEqual("POST", self.get.get_method())
1406
1407    def test_get_full_url(self):
1408        self.assertEqual("http://www.python.org/~jeremy/",
1409                         self.get.get_full_url())
1410
1411    def test_selector(self):
1412        self.assertEqual("/~jeremy/", self.get.get_selector())
1413        req = urllib2.Request("http://www.python.org/")
1414        self.assertEqual("/", req.get_selector())
1415
1416    def test_get_type(self):
1417        self.assertEqual("http", self.get.get_type())
1418
1419    def test_get_host(self):
1420        self.assertEqual("www.python.org", self.get.get_host())
1421
1422    def test_get_host_unquote(self):
1423        req = urllib2.Request("http://www.%70ython.org/")
1424        self.assertEqual("www.python.org", req.get_host())
1425
1426    def test_proxy(self):
1427        self.assertTrue(not self.get.has_proxy())
1428        self.get.set_proxy("www.perl.org", "http")
1429        self.assertTrue(self.get.has_proxy())
1430        self.assertEqual("www.python.org", self.get.get_origin_req_host())
1431        self.assertEqual("www.perl.org", self.get.get_host())
1432
1433    def test_wrapped_url(self):
1434        req = Request("<URL:http://www.python.org>")
1435        self.assertEqual("www.python.org", req.get_host())
1436
1437    def test_url_fragment(self):
1438        req = Request("http://www.python.org/?qs=query#fragment=true")
1439        self.assertEqual("/?qs=query", req.get_selector())
1440        req = Request("http://www.python.org/#fun=true")
1441        self.assertEqual("/", req.get_selector())
1442
1443        # Issue 11703: geturl() omits fragment in the original URL.
1444        url = 'http://docs.python.org/library/urllib2.html#OK'
1445        req = Request(url)
1446        self.assertEqual(req.get_full_url(), url)
1447
1448    def test_private_attributes(self):
1449        self.assertFalse(hasattr(self.get, '_Request__r_xxx'))
1450        # Issue #6500: infinite recursion
1451        self.assertFalse(hasattr(self.get, '_Request__r_method'))
1452
1453    def test_HTTPError_interface(self):
1454        """
1455        Issue 13211 reveals that HTTPError didn't implement the URLError
1456        interface even though HTTPError is a subclass of URLError.
1457
1458        >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
1459        >>> assert hasattr(err, 'reason')
1460        >>> err.reason
1461        'something bad happened'
1462        """
1463
1464    def test_HTTPError_interface_call(self):
1465        """
1466        Issue 15701= - HTTPError interface has info method available from URLError.
1467        """
1468        err = urllib2.HTTPError(msg='something bad happened', url=None,
1469                                code=None, hdrs='Content-Length:42', fp=None)
1470        self.assertTrue(hasattr(err, 'reason'))
1471        assert hasattr(err, 'reason')
1472        assert hasattr(err, 'info')
1473        assert callable(err.info)
1474        try:
1475            err.info()
1476        except AttributeError:
1477            self.fail("err.info() failed")
1478        self.assertEqual(err.info(), "Content-Length:42")
1479
1480
1481def test_main(verbose=None):
1482    # gevent: disabled doctests. they hang on OSX on Travis.
1483    # from test import test_urllib2
1484    # test_support.run_doctest(test_urllib2, verbose)
1485    # test_support.run_doctest(urllib2, verbose)
1486    tests = (TrivialTests,
1487             OpenerDirectorTests,
1488             HandlerTests,
1489             MiscTests,
1490             RequestTests)
1491    test_support.run_unittest(*tests)
1492
1493if __name__ == "__main__":
1494    test_main(verbose=True)
1495