1import unittest
2from test import support
3from test import test_urllib
4
5import os
6import io
7import socket
8import array
9import sys
10import tempfile
11import subprocess
12
13import urllib.request
14# The proxy bypass method imported below has logic specific to the OSX
15# proxy config data structure but is testable on all platforms.
16from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler,
17                            HTTPPasswordMgrWithPriorAuth, _parse_proxy,
18                            _proxy_bypass_macosx_sysconf,
19                            AbstractDigestAuthHandler)
20from urllib.parse import urlparse
21import urllib.error
22import http.client
23
24# XXX
25# Request
26# CacheFTPHandler (hard to write)
27# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
28
29
30class TrivialTests(unittest.TestCase):
31
32    def test___all__(self):
33        # Verify which names are exposed
34        for module in 'request', 'response', 'parse', 'error', 'robotparser':
35            context = {}
36            exec('from urllib.%s import *' % module, context)
37            del context['__builtins__']
38            if module == 'request' and os.name == 'nt':
39                u, p = context.pop('url2pathname'), context.pop('pathname2url')
40                self.assertEqual(u.__module__, 'nturl2path')
41                self.assertEqual(p.__module__, 'nturl2path')
42            for k, v in context.items():
43                self.assertEqual(v.__module__, 'urllib.%s' % module,
44                    "%r is exposed in 'urllib.%s' but defined in %r" %
45                    (k, module, v.__module__))
46
47    def test_trivial(self):
48        # A couple trivial tests
49
50        self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url')
51
52        # XXX Name hacking to get this to work on Windows.
53        fname = os.path.abspath(urllib.request.__file__).replace(os.sep, '/')
54
55        if os.name == 'nt':
56            file_url = "file:///%s" % fname
57        else:
58            file_url = "file://%s" % fname
59
60        with urllib.request.urlopen(file_url) as f:
61            f.read()
62
63    def test_parse_http_list(self):
64        tests = [
65            ('a,b,c', ['a', 'b', 'c']),
66            ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
67            ('a, b, "c", "d", "e,f", g, h',
68             ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
69            ('a="b\\"c", d="e\\,f", g="h\\\\i"',
70             ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
71        for string, list in tests:
72            self.assertEqual(urllib.request.parse_http_list(string), list)
73
74    def test_URLError_reasonstr(self):
75        err = urllib.error.URLError('reason')
76        self.assertIn(err.reason, str(err))
77
78
79class RequestHdrsTests(unittest.TestCase):
80
81    def test_request_headers_dict(self):
82        """
83        The Request.headers dictionary is not a documented interface.  It
84        should stay that way, because the complete set of headers are only
85        accessible through the .get_header(), .has_header(), .header_items()
86        interface.  However, .headers pre-dates those methods, and so real code
87        will be using the dictionary.
88
89        The introduction in 2.4 of those methods was a mistake for the same
90        reason: code that previously saw all (urllib2 user)-provided headers in
91        .headers now sees only a subset.
92
93        """
94        url = "http://example.com"
95        self.assertEqual(Request(url,
96                                 headers={"Spam-eggs": "blah"}
97                                 ).headers["Spam-eggs"], "blah")
98        self.assertEqual(Request(url,
99                                 headers={"spam-EggS": "blah"}
100                                 ).headers["Spam-eggs"], "blah")
101
102    def test_request_headers_methods(self):
103        """
104        Note the case normalization of header names here, to
105        .capitalize()-case.  This should be preserved for
106        backwards-compatibility.  (In the HTTP case, normalization to
107        .title()-case is done by urllib2 before sending headers to
108        http.client).
109
110        Note that e.g. r.has_header("spam-EggS") is currently False, and
111        r.get_header("spam-EggS") returns None, but that could be changed in
112        future.
113
114        Method r.remove_header should remove items both from r.headers and
115        r.unredirected_hdrs dictionaries
116        """
117        url = "http://example.com"
118        req = Request(url, headers={"Spam-eggs": "blah"})
119        self.assertTrue(req.has_header("Spam-eggs"))
120        self.assertEqual(req.header_items(), [('Spam-eggs', 'blah')])
121
122        req.add_header("Foo-Bar", "baz")
123        self.assertEqual(sorted(req.header_items()),
124                         [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')])
125        self.assertFalse(req.has_header("Not-there"))
126        self.assertIsNone(req.get_header("Not-there"))
127        self.assertEqual(req.get_header("Not-there", "default"), "default")
128
129        req.remove_header("Spam-eggs")
130        self.assertFalse(req.has_header("Spam-eggs"))
131
132        req.add_unredirected_header("Unredirected-spam", "Eggs")
133        self.assertTrue(req.has_header("Unredirected-spam"))
134
135        req.remove_header("Unredirected-spam")
136        self.assertFalse(req.has_header("Unredirected-spam"))
137
138    def test_password_manager(self):
139        mgr = urllib.request.HTTPPasswordMgr()
140        add = mgr.add_password
141        find_user_pass = mgr.find_user_password
142
143        add("Some Realm", "http://example.com/", "joe", "password")
144        add("Some Realm", "http://example.com/ni", "ni", "ni")
145        add("Some Realm", "http://c.example.com:3128", "3", "c")
146        add("Some Realm", "d.example.com", "4", "d")
147        add("Some Realm", "e.example.com:3128", "5", "e")
148
149        # For the same realm, password set the highest path is the winner.
150        self.assertEqual(find_user_pass("Some Realm", "example.com"),
151                         ('joe', 'password'))
152        self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"),
153                         ('joe', 'password'))
154        self.assertEqual(find_user_pass("Some Realm", "http://example.com"),
155                         ('joe', 'password'))
156        self.assertEqual(find_user_pass("Some Realm", "http://example.com/"),
157                         ('joe', 'password'))
158        self.assertEqual(find_user_pass("Some Realm",
159                                        "http://example.com/spam"),
160                         ('joe', 'password'))
161
162        self.assertEqual(find_user_pass("Some Realm",
163                                        "http://example.com/spam/spam"),
164                         ('joe', 'password'))
165
166        # You can have different passwords for different paths.
167
168        add("c", "http://example.com/foo", "foo", "ni")
169        add("c", "http://example.com/bar", "bar", "nini")
170
171        self.assertEqual(find_user_pass("c", "http://example.com/foo"),
172                         ('foo', 'ni'))
173
174        self.assertEqual(find_user_pass("c", "http://example.com/bar"),
175                         ('bar', 'nini'))
176
177        # For the same path, newer password should be considered.
178
179        add("b", "http://example.com/", "first", "blah")
180        add("b", "http://example.com/", "second", "spam")
181
182        self.assertEqual(find_user_pass("b", "http://example.com/"),
183                         ('second', 'spam'))
184
185        # No special relationship between a.example.com and example.com:
186
187        add("a", "http://example.com", "1", "a")
188        self.assertEqual(find_user_pass("a", "http://example.com/"),
189                         ('1', 'a'))
190
191        self.assertEqual(find_user_pass("a", "http://a.example.com/"),
192                         (None, None))
193
194        # Ports:
195
196        self.assertEqual(find_user_pass("Some Realm", "c.example.com"),
197                         (None, None))
198        self.assertEqual(find_user_pass("Some Realm", "c.example.com:3128"),
199                         ('3', 'c'))
200        self.assertEqual(
201            find_user_pass("Some Realm", "http://c.example.com:3128"),
202            ('3', 'c'))
203        self.assertEqual(find_user_pass("Some Realm", "d.example.com"),
204                         ('4', 'd'))
205        self.assertEqual(find_user_pass("Some Realm", "e.example.com:3128"),
206                         ('5', 'e'))
207
208    def test_password_manager_default_port(self):
209        """
210        The point to note here is that we can't guess the default port if
211        there's no scheme.  This applies to both add_password and
212        find_user_password.
213        """
214        mgr = urllib.request.HTTPPasswordMgr()
215        add = mgr.add_password
216        find_user_pass = mgr.find_user_password
217        add("f", "http://g.example.com:80", "10", "j")
218        add("g", "http://h.example.com", "11", "k")
219        add("h", "i.example.com:80", "12", "l")
220        add("i", "j.example.com", "13", "m")
221        self.assertEqual(find_user_pass("f", "g.example.com:100"),
222                         (None, None))
223        self.assertEqual(find_user_pass("f", "g.example.com:80"),
224                         ('10', 'j'))
225        self.assertEqual(find_user_pass("f", "g.example.com"),
226                         (None, None))
227        self.assertEqual(find_user_pass("f", "http://g.example.com:100"),
228                         (None, None))
229        self.assertEqual(find_user_pass("f", "http://g.example.com:80"),
230                         ('10', 'j'))
231        self.assertEqual(find_user_pass("f", "http://g.example.com"),
232                         ('10', 'j'))
233        self.assertEqual(find_user_pass("g", "h.example.com"), ('11', 'k'))
234        self.assertEqual(find_user_pass("g", "h.example.com:80"), ('11', 'k'))
235        self.assertEqual(find_user_pass("g", "http://h.example.com:80"),
236                         ('11', 'k'))
237        self.assertEqual(find_user_pass("h", "i.example.com"), (None, None))
238        self.assertEqual(find_user_pass("h", "i.example.com:80"), ('12', 'l'))
239        self.assertEqual(find_user_pass("h", "http://i.example.com:80"),
240                         ('12', 'l'))
241        self.assertEqual(find_user_pass("i", "j.example.com"), ('13', 'm'))
242        self.assertEqual(find_user_pass("i", "j.example.com:80"),
243                         (None, None))
244        self.assertEqual(find_user_pass("i", "http://j.example.com"),
245                         ('13', 'm'))
246        self.assertEqual(find_user_pass("i", "http://j.example.com:80"),
247                         (None, None))
248
249
250class MockOpener:
251    addheaders = []
252
253    def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
254        self.req, self.data, self.timeout = req, data, timeout
255
256    def error(self, proto, *args):
257        self.proto, self.args = proto, args
258
259
260class MockFile:
261    def read(self, count=None):
262        pass
263
264    def readline(self, count=None):
265        pass
266
267    def close(self):
268        pass
269
270
271class MockHeaders(dict):
272    def getheaders(self, name):
273        return list(self.values())
274
275
276class MockResponse(io.StringIO):
277    def __init__(self, code, msg, headers, data, url=None):
278        io.StringIO.__init__(self, data)
279        self.code, self.msg, self.headers, self.url = code, msg, headers, url
280
281    def info(self):
282        return self.headers
283
284    def geturl(self):
285        return self.url
286
287
288class MockCookieJar:
289    def add_cookie_header(self, request):
290        self.ach_req = request
291
292    def extract_cookies(self, response, request):
293        self.ec_req, self.ec_r = request, response
294
295
296class FakeMethod:
297    def __init__(self, meth_name, action, handle):
298        self.meth_name = meth_name
299        self.handle = handle
300        self.action = action
301
302    def __call__(self, *args):
303        return self.handle(self.meth_name, self.action, *args)
304
305
306class MockHTTPResponse(io.IOBase):
307    def __init__(self, fp, msg, status, reason):
308        self.fp = fp
309        self.msg = msg
310        self.status = status
311        self.reason = reason
312        self.code = 200
313
314    def read(self):
315        return ''
316
317    def info(self):
318        return {}
319
320    def geturl(self):
321        return self.url
322
323
324class MockHTTPClass:
325    def __init__(self):
326        self.level = 0
327        self.req_headers = []
328        self.data = None
329        self.raise_on_endheaders = False
330        self.sock = None
331        self._tunnel_headers = {}
332
333    def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
334        self.host = host
335        self.timeout = timeout
336        return self
337
338    def set_debuglevel(self, level):
339        self.level = level
340
341    def set_tunnel(self, host, port=None, headers=None):
342        self._tunnel_host = host
343        self._tunnel_port = port
344        if headers:
345            self._tunnel_headers = headers
346        else:
347            self._tunnel_headers.clear()
348
349    def request(self, method, url, body=None, headers=None, *,
350                encode_chunked=False):
351        self.method = method
352        self.selector = url
353        if headers is not None:
354            self.req_headers += headers.items()
355        self.req_headers.sort()
356        if body:
357            self.data = body
358        self.encode_chunked = encode_chunked
359        if self.raise_on_endheaders:
360            raise OSError()
361
362    def getresponse(self):
363        return MockHTTPResponse(MockFile(), {}, 200, "OK")
364
365    def close(self):
366        pass
367
368
369class MockHandler:
370    # useful for testing handler machinery
371    # see add_ordered_mock_handlers() docstring
372    handler_order = 500
373
374    def __init__(self, methods):
375        self._define_methods(methods)
376
377    def _define_methods(self, methods):
378        for spec in methods:
379            if len(spec) == 2:
380                name, action = spec
381            else:
382                name, action = spec, None
383            meth = FakeMethod(name, action, self.handle)
384            setattr(self.__class__, name, meth)
385
386    def handle(self, fn_name, action, *args, **kwds):
387        self.parent.calls.append((self, fn_name, args, kwds))
388        if action is None:
389            return None
390        elif action == "return self":
391            return self
392        elif action == "return response":
393            res = MockResponse(200, "OK", {}, "")
394            return res
395        elif action == "return request":
396            return Request("http://blah/")
397        elif action.startswith("error"):
398            code = action[action.rfind(" ")+1:]
399            try:
400                code = int(code)
401            except ValueError:
402                pass
403            res = MockResponse(200, "OK", {}, "")
404            return self.parent.error("http", args[0], res, code, "", {})
405        elif action == "raise":
406            raise urllib.error.URLError("blah")
407        assert False
408
409    def close(self):
410        pass
411
412    def add_parent(self, parent):
413        self.parent = parent
414        self.parent.calls = []
415
416    def __lt__(self, other):
417        if not hasattr(other, "handler_order"):
418            # No handler_order, leave in original order.  Yuck.
419            return True
420        return self.handler_order < other.handler_order
421
422
423def add_ordered_mock_handlers(opener, meth_spec):
424    """Create MockHandlers and add them to an OpenerDirector.
425
426    meth_spec: list of lists of tuples and strings defining methods to define
427    on handlers.  eg:
428
429    [["http_error", "ftp_open"], ["http_open"]]
430
431    defines methods .http_error() and .ftp_open() on one handler, and
432    .http_open() on another.  These methods just record their arguments and
433    return None.  Using a tuple instead of a string causes the method to
434    perform some action (see MockHandler.handle()), eg:
435
436    [["http_error"], [("http_open", "return request")]]
437
438    defines .http_error() on one handler (which simply returns None), and
439    .http_open() on another handler, which returns a Request object.
440
441    """
442    handlers = []
443    count = 0
444    for meths in meth_spec:
445        class MockHandlerSubclass(MockHandler):
446            pass
447
448        h = MockHandlerSubclass(meths)
449        h.handler_order += count
450        h.add_parent(opener)
451        count = count + 1
452        handlers.append(h)
453        opener.add_handler(h)
454    return handlers
455
456
457def build_test_opener(*handler_instances):
458    opener = OpenerDirector()
459    for h in handler_instances:
460        opener.add_handler(h)
461    return opener
462
463
464class MockHTTPHandler(urllib.request.BaseHandler):
465    # useful for testing redirections and auth
466    # sends supplied headers and code as first response
467    # sends 200 OK as second response
468    def __init__(self, code, headers):
469        self.code = code
470        self.headers = headers
471        self.reset()
472
473    def reset(self):
474        self._count = 0
475        self.requests = []
476
477    def http_open(self, req):
478        import email, copy
479        self.requests.append(copy.deepcopy(req))
480        if self._count == 0:
481            self._count = self._count + 1
482            name = http.client.responses[self.code]
483            msg = email.message_from_string(self.headers)
484            return self.parent.error(
485                "http", req, MockFile(), self.code, name, msg)
486        else:
487            self.req = req
488            msg = email.message_from_string("\r\n\r\n")
489            return MockResponse(200, "OK", msg, "", req.get_full_url())
490
491
492class MockHTTPSHandler(urllib.request.AbstractHTTPHandler):
493    # Useful for testing the Proxy-Authorization request by verifying the
494    # properties of httpcon
495
496    def __init__(self, debuglevel=0):
497        urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel)
498        self.httpconn = MockHTTPClass()
499
500    def https_open(self, req):
501        return self.do_open(self.httpconn, req)
502
503
504class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler):
505    # useful for testing auth
506    # sends supplied code response
507    # checks if auth header is specified in request
508    def __init__(self, code):
509        self.code = code
510        self.has_auth_header = False
511
512    def reset(self):
513        self.has_auth_header = False
514
515    def http_open(self, req):
516        if req.has_header('Authorization'):
517            self.has_auth_header = True
518        name = http.client.responses[self.code]
519        return MockResponse(self.code, name, MockFile(), "", req.get_full_url())
520
521
522
523class MockPasswordManager:
524    def add_password(self, realm, uri, user, password):
525        self.realm = realm
526        self.url = uri
527        self.user = user
528        self.password = password
529
530    def find_user_password(self, realm, authuri):
531        self.target_realm = realm
532        self.target_url = authuri
533        return self.user, self.password
534
535
536class OpenerDirectorTests(unittest.TestCase):
537
538    def test_add_non_handler(self):
539        class NonHandler(object):
540            pass
541        self.assertRaises(TypeError,
542                          OpenerDirector().add_handler, NonHandler())
543
544    def test_badly_named_methods(self):
545        # test work-around for three methods that accidentally follow the
546        # naming conventions for handler methods
547        # (*_open() / *_request() / *_response())
548
549        # These used to call the accidentally-named methods, causing a
550        # TypeError in real code; here, returning self from these mock
551        # methods would either cause no exception, or AttributeError.
552
553        from urllib.error import URLError
554
555        o = OpenerDirector()
556        meth_spec = [
557            [("do_open", "return self"), ("proxy_open", "return self")],
558            [("redirect_request", "return self")],
559            ]
560        add_ordered_mock_handlers(o, meth_spec)
561        o.add_handler(urllib.request.UnknownHandler())
562        for scheme in "do", "proxy", "redirect":
563            self.assertRaises(URLError, o.open, scheme+"://example.com/")
564
565    def test_handled(self):
566        # handler returning non-None means no more handlers will be called
567        o = OpenerDirector()
568        meth_spec = [
569            ["http_open", "ftp_open", "http_error_302"],
570            ["ftp_open"],
571            [("http_open", "return self")],
572            [("http_open", "return self")],
573            ]
574        handlers = add_ordered_mock_handlers(o, meth_spec)
575
576        req = Request("http://example.com/")
577        r = o.open(req)
578        # Second .http_open() gets called, third doesn't, since second returned
579        # non-None.  Handlers without .http_open() never get any methods called
580        # on them.
581        # In fact, second mock handler defining .http_open() returns self
582        # (instead of response), which becomes the OpenerDirector's return
583        # value.
584        self.assertEqual(r, handlers[2])
585        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
586        for expected, got in zip(calls, o.calls):
587            handler, name, args, kwds = got
588            self.assertEqual((handler, name), expected)
589            self.assertEqual(args, (req,))
590
591    def test_handler_order(self):
592        o = OpenerDirector()
593        handlers = []
594        for meths, handler_order in [([("http_open", "return self")], 500),
595                                     (["http_open"], 0)]:
596            class MockHandlerSubclass(MockHandler):
597                pass
598
599            h = MockHandlerSubclass(meths)
600            h.handler_order = handler_order
601            handlers.append(h)
602            o.add_handler(h)
603
604        o.open("http://example.com/")
605        # handlers called in reverse order, thanks to their sort order
606        self.assertEqual(o.calls[0][0], handlers[1])
607        self.assertEqual(o.calls[1][0], handlers[0])
608
609    def test_raise(self):
610        # raising URLError stops processing of request
611        o = OpenerDirector()
612        meth_spec = [
613            [("http_open", "raise")],
614            [("http_open", "return self")],
615            ]
616        handlers = add_ordered_mock_handlers(o, meth_spec)
617
618        req = Request("http://example.com/")
619        self.assertRaises(urllib.error.URLError, o.open, req)
620        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
621
622    def test_http_error(self):
623        # XXX http_error_default
624        # http errors are a special case
625        o = OpenerDirector()
626        meth_spec = [
627            [("http_open", "error 302")],
628            [("http_error_400", "raise"), "http_open"],
629            [("http_error_302", "return response"), "http_error_303",
630             "http_error"],
631            [("http_error_302")],
632            ]
633        handlers = add_ordered_mock_handlers(o, meth_spec)
634
635        class Unknown:
636            def __eq__(self, other):
637                return True
638
639        req = Request("http://example.com/")
640        o.open(req)
641        assert len(o.calls) == 2
642        calls = [(handlers[0], "http_open", (req,)),
643                 (handlers[2], "http_error_302",
644                  (req, Unknown(), 302, "", {}))]
645        for expected, got in zip(calls, o.calls):
646            handler, method_name, args = expected
647            self.assertEqual((handler, method_name), got[:2])
648            self.assertEqual(args, got[2])
649
650    def test_processors(self):
651        # *_request / *_response methods get called appropriately
652        o = OpenerDirector()
653        meth_spec = [
654            [("http_request", "return request"),
655             ("http_response", "return response")],
656            [("http_request", "return request"),
657             ("http_response", "return response")],
658            ]
659        handlers = add_ordered_mock_handlers(o, meth_spec)
660
661        req = Request("http://example.com/")
662        o.open(req)
663        # processor methods are called on *all* handlers that define them,
664        # not just the first handler that handles the request
665        calls = [
666            (handlers[0], "http_request"), (handlers[1], "http_request"),
667            (handlers[0], "http_response"), (handlers[1], "http_response")]
668
669        for i, (handler, name, args, kwds) in enumerate(o.calls):
670            if i < 2:
671                # *_request
672                self.assertEqual((handler, name), calls[i])
673                self.assertEqual(len(args), 1)
674                self.assertIsInstance(args[0], Request)
675            else:
676                # *_response
677                self.assertEqual((handler, name), calls[i])
678                self.assertEqual(len(args), 2)
679                self.assertIsInstance(args[0], Request)
680                # response from opener.open is None, because there's no
681                # handler that defines http_open to handle it
682                if args[1] is not None:
683                    self.assertIsInstance(args[1], MockResponse)
684
685
686def sanepathname2url(path):
687    try:
688        path.encode("utf-8")
689    except UnicodeEncodeError:
690        raise unittest.SkipTest("path is not encodable to utf8")
691    urlpath = urllib.request.pathname2url(path)
692    if os.name == "nt" and urlpath.startswith("///"):
693        urlpath = urlpath[2:]
694    # XXX don't ask me about the mac...
695    return urlpath
696
697
698class HandlerTests(unittest.TestCase):
699
700    def test_ftp(self):
701        class MockFTPWrapper:
702            def __init__(self, data):
703                self.data = data
704
705            def retrfile(self, filename, filetype):
706                self.filename, self.filetype = filename, filetype
707                return io.StringIO(self.data), len(self.data)
708
709            def close(self):
710                pass
711
712        class NullFTPHandler(urllib.request.FTPHandler):
713            def __init__(self, data):
714                self.data = data
715
716            def connect_ftp(self, user, passwd, host, port, dirs,
717                            timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
718                self.user, self.passwd = user, passwd
719                self.host, self.port = host, port
720                self.dirs = dirs
721                self.ftpwrapper = MockFTPWrapper(self.data)
722                return self.ftpwrapper
723
724        import ftplib
725        data = "rheum rhaponicum"
726        h = NullFTPHandler(data)
727        h.parent = MockOpener()
728
729        for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
730            ("ftp://localhost/foo/bar/baz.html",
731             "localhost", ftplib.FTP_PORT, "", "", "I",
732             ["foo", "bar"], "baz.html", "text/html"),
733            ("ftp://parrot@localhost/foo/bar/baz.html",
734             "localhost", ftplib.FTP_PORT, "parrot", "", "I",
735             ["foo", "bar"], "baz.html", "text/html"),
736            ("ftp://%25parrot@localhost/foo/bar/baz.html",
737             "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
738             ["foo", "bar"], "baz.html", "text/html"),
739            ("ftp://%2542parrot@localhost/foo/bar/baz.html",
740             "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
741             ["foo", "bar"], "baz.html", "text/html"),
742            ("ftp://localhost:80/foo/bar/",
743             "localhost", 80, "", "", "D",
744             ["foo", "bar"], "", None),
745            ("ftp://localhost/baz.gif;type=a",
746             "localhost", ftplib.FTP_PORT, "", "", "A",
747             [], "baz.gif", None),  # XXX really this should guess image/gif
748            ]:
749            req = Request(url)
750            req.timeout = None
751            r = h.ftp_open(req)
752            # ftp authentication not yet implemented by FTPHandler
753            self.assertEqual(h.user, user)
754            self.assertEqual(h.passwd, passwd)
755            self.assertEqual(h.host, socket.gethostbyname(host))
756            self.assertEqual(h.port, port)
757            self.assertEqual(h.dirs, dirs)
758            self.assertEqual(h.ftpwrapper.filename, filename)
759            self.assertEqual(h.ftpwrapper.filetype, type_)
760            headers = r.info()
761            self.assertEqual(headers.get("Content-type"), mimetype)
762            self.assertEqual(int(headers["Content-length"]), len(data))
763
764    def test_file(self):
765        import email.utils
766        h = urllib.request.FileHandler()
767        o = h.parent = MockOpener()
768
769        TESTFN = support.TESTFN
770        urlpath = sanepathname2url(os.path.abspath(TESTFN))
771        towrite = b"hello, world\n"
772        urls = [
773            "file://localhost%s" % urlpath,
774            "file://%s" % urlpath,
775            "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
776            ]
777        try:
778            localaddr = socket.gethostbyname(socket.gethostname())
779        except socket.gaierror:
780            localaddr = ''
781        if localaddr:
782            urls.append("file://%s%s" % (localaddr, urlpath))
783
784        for url in urls:
785            f = open(TESTFN, "wb")
786            try:
787                try:
788                    f.write(towrite)
789                finally:
790                    f.close()
791
792                r = h.file_open(Request(url))
793                try:
794                    data = r.read()
795                    headers = r.info()
796                    respurl = r.geturl()
797                finally:
798                    r.close()
799                stats = os.stat(TESTFN)
800                modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
801            finally:
802                os.remove(TESTFN)
803            self.assertEqual(data, towrite)
804            self.assertEqual(headers["Content-type"], "text/plain")
805            self.assertEqual(headers["Content-length"], "13")
806            self.assertEqual(headers["Last-modified"], modified)
807            self.assertEqual(respurl, url)
808
809        for url in [
810            "file://localhost:80%s" % urlpath,
811            "file:///file_does_not_exist.txt",
812            "file://not-a-local-host.com//dir/file.txt",
813            "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
814                                   os.getcwd(), TESTFN),
815            "file://somerandomhost.ontheinternet.com%s/%s" %
816            (os.getcwd(), TESTFN),
817            ]:
818            try:
819                f = open(TESTFN, "wb")
820                try:
821                    f.write(towrite)
822                finally:
823                    f.close()
824
825                self.assertRaises(urllib.error.URLError,
826                                  h.file_open, Request(url))
827            finally:
828                os.remove(TESTFN)
829
830        h = urllib.request.FileHandler()
831        o = h.parent = MockOpener()
832        # XXXX why does // mean ftp (and /// mean not ftp!), and where
833        #  is file: scheme specified?  I think this is really a bug, and
834        #  what was intended was to distinguish between URLs like:
835        # file:/blah.txt (a file)
836        # file://localhost/blah.txt (a file)
837        # file:///blah.txt (a file)
838        # file://ftp.example.com/blah.txt (an ftp URL)
839        for url, ftp in [
840            ("file://ftp.example.com//foo.txt", False),
841            ("file://ftp.example.com///foo.txt", False),
842            ("file://ftp.example.com/foo.txt", False),
843            ("file://somehost//foo/something.txt", False),
844            ("file://localhost//foo/something.txt", False),
845            ]:
846            req = Request(url)
847            try:
848                h.file_open(req)
849            except urllib.error.URLError:
850                self.assertFalse(ftp)
851            else:
852                self.assertIs(o.req, req)
853                self.assertEqual(req.type, "ftp")
854            self.assertEqual(req.type == "ftp", ftp)
855
856    def test_http(self):
857
858        h = urllib.request.AbstractHTTPHandler()
859        o = h.parent = MockOpener()
860
861        url = "http://example.com/"
862        for method, data in [("GET", None), ("POST", b"blah")]:
863            req = Request(url, data, {"Foo": "bar"})
864            req.timeout = None
865            req.add_unredirected_header("Spam", "eggs")
866            http = MockHTTPClass()
867            r = h.do_open(http, req)
868
869            # result attributes
870            r.read; r.readline  # wrapped MockFile methods
871            r.info; r.geturl  # addinfourl methods
872            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
873            hdrs = r.info()
874            hdrs.get; hdrs.__contains__  # r.info() gives dict from .getreply()
875            self.assertEqual(r.geturl(), url)
876
877            self.assertEqual(http.host, "example.com")
878            self.assertEqual(http.level, 0)
879            self.assertEqual(http.method, method)
880            self.assertEqual(http.selector, "/")
881            self.assertEqual(http.req_headers,
882                             [("Connection", "close"),
883                              ("Foo", "bar"), ("Spam", "eggs")])
884            self.assertEqual(http.data, data)
885
886        # check OSError converted to URLError
887        http.raise_on_endheaders = True
888        self.assertRaises(urllib.error.URLError, h.do_open, http, req)
889
890        # Check for TypeError on POST data which is str.
891        req = Request("http://example.com/","badpost")
892        self.assertRaises(TypeError, h.do_request_, req)
893
894        # check adding of standard headers
895        o.addheaders = [("Spam", "eggs")]
896        for data in b"", None:  # POST, GET
897            req = Request("http://example.com/", data)
898            r = MockResponse(200, "OK", {}, "")
899            newreq = h.do_request_(req)
900            if data is None:  # GET
901                self.assertNotIn("Content-length", req.unredirected_hdrs)
902                self.assertNotIn("Content-type", req.unredirected_hdrs)
903            else:  # POST
904                self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
905                self.assertEqual(req.unredirected_hdrs["Content-type"],
906                             "application/x-www-form-urlencoded")
907            # XXX the details of Host could be better tested
908            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
909            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
910
911            # don't clobber existing headers
912            req.add_unredirected_header("Content-length", "foo")
913            req.add_unredirected_header("Content-type", "bar")
914            req.add_unredirected_header("Host", "baz")
915            req.add_unredirected_header("Spam", "foo")
916            newreq = h.do_request_(req)
917            self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
918            self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
919            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
920            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
921
922    def test_http_body_file(self):
923        # A regular file - chunked encoding is used unless Content Length is
924        # already set.
925
926        h = urllib.request.AbstractHTTPHandler()
927        o = h.parent = MockOpener()
928
929        file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
930        file_path = file_obj.name
931        file_obj.close()
932        self.addCleanup(os.unlink, file_path)
933
934        with open(file_path, "rb") as f:
935            req = Request("http://example.com/", f, {})
936            newreq = h.do_request_(req)
937            te = newreq.get_header('Transfer-encoding')
938            self.assertEqual(te, "chunked")
939            self.assertFalse(newreq.has_header('Content-length'))
940
941        with open(file_path, "rb") as f:
942            req = Request("http://example.com/", f, {"Content-Length": 30})
943            newreq = h.do_request_(req)
944            self.assertEqual(int(newreq.get_header('Content-length')), 30)
945            self.assertFalse(newreq.has_header("Transfer-encoding"))
946
947    def test_http_body_fileobj(self):
948        # A file object - chunked encoding is used
949        # unless Content Length is already set.
950        # (Note that there are some subtle differences to a regular
951        # file, that is why we are testing both cases.)
952
953        h = urllib.request.AbstractHTTPHandler()
954        o = h.parent = MockOpener()
955        file_obj = io.BytesIO()
956
957        req = Request("http://example.com/", file_obj, {})
958        newreq = h.do_request_(req)
959        self.assertEqual(newreq.get_header('Transfer-encoding'), 'chunked')
960        self.assertFalse(newreq.has_header('Content-length'))
961
962        headers = {"Content-Length": 30}
963        req = Request("http://example.com/", file_obj, headers)
964        newreq = h.do_request_(req)
965        self.assertEqual(int(newreq.get_header('Content-length')), 30)
966        self.assertFalse(newreq.has_header("Transfer-encoding"))
967
968        file_obj.close()
969
970    def test_http_body_pipe(self):
971        # A file reading from a pipe.
972        # A pipe cannot be seek'ed.  There is no way to determine the
973        # content length up front.  Thus, do_request_() should fall
974        # back to Transfer-encoding chunked.
975
976        h = urllib.request.AbstractHTTPHandler()
977        o = h.parent = MockOpener()
978
979        cmd = [sys.executable, "-c", r"pass"]
980        for headers in {}, {"Content-Length": 30}:
981            with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
982                req = Request("http://example.com/", proc.stdout, headers)
983                newreq = h.do_request_(req)
984                if not headers:
985                    self.assertEqual(newreq.get_header('Content-length'), None)
986                    self.assertEqual(newreq.get_header('Transfer-encoding'),
987                                     'chunked')
988                else:
989                    self.assertEqual(int(newreq.get_header('Content-length')),
990                                     30)
991
992    def test_http_body_iterable(self):
993        # Generic iterable.  There is no way to determine the content
994        # length up front.  Fall back to Transfer-encoding chunked.
995
996        h = urllib.request.AbstractHTTPHandler()
997        o = h.parent = MockOpener()
998
999        def iterable_body():
1000            yield b"one"
1001
1002        for headers in {}, {"Content-Length": 11}:
1003            req = Request("http://example.com/", iterable_body(), headers)
1004            newreq = h.do_request_(req)
1005            if not headers:
1006                self.assertEqual(newreq.get_header('Content-length'), None)
1007                self.assertEqual(newreq.get_header('Transfer-encoding'),
1008                                 'chunked')
1009            else:
1010                self.assertEqual(int(newreq.get_header('Content-length')), 11)
1011
1012    def test_http_body_empty_seq(self):
1013        # Zero-length iterable body should be treated like any other iterable
1014        h = urllib.request.AbstractHTTPHandler()
1015        h.parent = MockOpener()
1016        req = h.do_request_(Request("http://example.com/", ()))
1017        self.assertEqual(req.get_header("Transfer-encoding"), "chunked")
1018        self.assertFalse(req.has_header("Content-length"))
1019
1020    def test_http_body_array(self):
1021        # array.array Iterable - Content Length is calculated
1022
1023        h = urllib.request.AbstractHTTPHandler()
1024        o = h.parent = MockOpener()
1025
1026        iterable_array = array.array("I",[1,2,3,4])
1027
1028        for headers in {}, {"Content-Length": 16}:
1029            req = Request("http://example.com/", iterable_array, headers)
1030            newreq = h.do_request_(req)
1031            self.assertEqual(int(newreq.get_header('Content-length')),16)
1032
1033    def test_http_handler_debuglevel(self):
1034        o = OpenerDirector()
1035        h = MockHTTPSHandler(debuglevel=1)
1036        o.add_handler(h)
1037        o.open("https://www.example.com")
1038        self.assertEqual(h._debuglevel, 1)
1039
1040    def test_http_doubleslash(self):
1041        # Checks the presence of any unnecessary double slash in url does not
1042        # break anything. Previously, a double slash directly after the host
1043        # could cause incorrect parsing.
1044        h = urllib.request.AbstractHTTPHandler()
1045        h.parent = MockOpener()
1046
1047        data = b""
1048        ds_urls = [
1049            "http://example.com/foo/bar/baz.html",
1050            "http://example.com//foo/bar/baz.html",
1051            "http://example.com/foo//bar/baz.html",
1052            "http://example.com/foo/bar//baz.html"
1053            ]
1054
1055        for ds_url in ds_urls:
1056            ds_req = Request(ds_url, data)
1057
1058            # Check whether host is determined correctly if there is no proxy
1059            np_ds_req = h.do_request_(ds_req)
1060            self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com")
1061
1062            # Check whether host is determined correctly if there is a proxy
1063            ds_req.set_proxy("someproxy:3128", None)
1064            p_ds_req = h.do_request_(ds_req)
1065            self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")
1066
1067    def test_full_url_setter(self):
1068        # Checks to ensure that components are set correctly after setting the
1069        # full_url of a Request object
1070
1071        urls = [
1072            'http://example.com?foo=bar#baz',
1073            'http://example.com?foo=bar&spam=eggs#bash',
1074            'http://example.com',
1075        ]
1076
1077        # testing a reusable request instance, but the url parameter is
1078        # required, so just use a dummy one to instantiate
1079        r = Request('http://example.com')
1080        for url in urls:
1081            r.full_url = url
1082            parsed = urlparse(url)
1083
1084            self.assertEqual(r.get_full_url(), url)
1085            # full_url setter uses splittag to split into components.
1086            # splittag sets the fragment as None while urlparse sets it to ''
1087            self.assertEqual(r.fragment or '', parsed.fragment)
1088            self.assertEqual(urlparse(r.get_full_url()).query, parsed.query)
1089
1090    def test_full_url_deleter(self):
1091        r = Request('http://www.example.com')
1092        del r.full_url
1093        self.assertIsNone(r.full_url)
1094        self.assertIsNone(r.fragment)
1095        self.assertEqual(r.selector, '')
1096
1097    def test_fixpath_in_weirdurls(self):
1098        # Issue4493: urllib2 to supply '/' when to urls where path does not
1099        # start with'/'
1100
1101        h = urllib.request.AbstractHTTPHandler()
1102        h.parent = MockOpener()
1103
1104        weird_url = 'http://www.python.org?getspam'
1105        req = Request(weird_url)
1106        newreq = h.do_request_(req)
1107        self.assertEqual(newreq.host, 'www.python.org')
1108        self.assertEqual(newreq.selector, '/?getspam')
1109
1110        url_without_path = 'http://www.python.org'
1111        req = Request(url_without_path)
1112        newreq = h.do_request_(req)
1113        self.assertEqual(newreq.host, 'www.python.org')
1114        self.assertEqual(newreq.selector, '')
1115
1116    def test_errors(self):
1117        h = urllib.request.HTTPErrorProcessor()
1118        o = h.parent = MockOpener()
1119
1120        url = "http://example.com/"
1121        req = Request(url)
1122        # all 2xx are passed through
1123        r = MockResponse(200, "OK", {}, "", url)
1124        newr = h.http_response(req, r)
1125        self.assertIs(r, newr)
1126        self.assertFalse(hasattr(o, "proto"))  # o.error not called
1127        r = MockResponse(202, "Accepted", {}, "", url)
1128        newr = h.http_response(req, r)
1129        self.assertIs(r, newr)
1130        self.assertFalse(hasattr(o, "proto"))  # o.error not called
1131        r = MockResponse(206, "Partial content", {}, "", url)
1132        newr = h.http_response(req, r)
1133        self.assertIs(r, newr)
1134        self.assertFalse(hasattr(o, "proto"))  # o.error not called
1135        # anything else calls o.error (and MockOpener returns None, here)
1136        r = MockResponse(502, "Bad gateway", {}, "", url)
1137        self.assertIsNone(h.http_response(req, r))
1138        self.assertEqual(o.proto, "http")  # o.error called
1139        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
1140
1141    def test_cookies(self):
1142        cj = MockCookieJar()
1143        h = urllib.request.HTTPCookieProcessor(cj)
1144        h.parent = MockOpener()
1145
1146        req = Request("http://example.com/")
1147        r = MockResponse(200, "OK", {}, "")
1148        newreq = h.http_request(req)
1149        self.assertIs(cj.ach_req, req)
1150        self.assertIs(cj.ach_req, newreq)
1151        self.assertEqual(req.origin_req_host, "example.com")
1152        self.assertFalse(req.unverifiable)
1153        newr = h.http_response(req, r)
1154        self.assertIs(cj.ec_req, req)
1155        self.assertIs(cj.ec_r, r)
1156        self.assertIs(r, newr)
1157
1158    def test_redirect(self):
1159        from_url = "http://example.com/a.html"
1160        to_url = "http://example.com/b.html"
1161        h = urllib.request.HTTPRedirectHandler()
1162        o = h.parent = MockOpener()
1163
1164        # ordinary redirect behaviour
1165        for code in 301, 302, 303, 307:
1166            for data in None, "blah\nblah\n":
1167                method = getattr(h, "http_error_%s" % code)
1168                req = Request(from_url, data)
1169                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1170                req.add_header("Nonsense", "viking=withhold")
1171                if data is not None:
1172                    req.add_header("Content-Length", str(len(data)))
1173                req.add_unredirected_header("Spam", "spam")
1174                try:
1175                    method(req, MockFile(), code, "Blah",
1176                           MockHeaders({"location": to_url}))
1177                except urllib.error.HTTPError:
1178                    # 307 in response to POST requires user OK
1179                    self.assertEqual(code, 307)
1180                    self.assertIsNotNone(data)
1181                self.assertEqual(o.req.get_full_url(), to_url)
1182                try:
1183                    self.assertEqual(o.req.get_method(), "GET")
1184                except AttributeError:
1185                    self.assertFalse(o.req.data)
1186
1187                # now it's a GET, there should not be headers regarding content
1188                # (possibly dragged from before being a POST)
1189                headers = [x.lower() for x in o.req.headers]
1190                self.assertNotIn("content-length", headers)
1191                self.assertNotIn("content-type", headers)
1192
1193                self.assertEqual(o.req.headers["Nonsense"],
1194                                 "viking=withhold")
1195                self.assertNotIn("Spam", o.req.headers)
1196                self.assertNotIn("Spam", o.req.unredirected_hdrs)
1197
1198        # loop detection
1199        req = Request(from_url)
1200        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1201
1202        def redirect(h, req, url=to_url):
1203            h.http_error_302(req, MockFile(), 302, "Blah",
1204                             MockHeaders({"location": url}))
1205        # Note that the *original* request shares the same record of
1206        # redirections with the sub-requests caused by the redirections.
1207
1208        # detect infinite loop redirect of a URL to itself
1209        req = Request(from_url, origin_req_host="example.com")
1210        count = 0
1211        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1212        try:
1213            while 1:
1214                redirect(h, req, "http://example.com/")
1215                count = count + 1
1216        except urllib.error.HTTPError:
1217            # don't stop until max_repeats, because cookies may introduce state
1218            self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats)
1219
1220        # detect endless non-repeating chain of redirects
1221        req = Request(from_url, origin_req_host="example.com")
1222        count = 0
1223        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1224        try:
1225            while 1:
1226                redirect(h, req, "http://example.com/%d" % count)
1227                count = count + 1
1228        except urllib.error.HTTPError:
1229            self.assertEqual(count,
1230                             urllib.request.HTTPRedirectHandler.max_redirections)
1231
1232    def test_invalid_redirect(self):
1233        from_url = "http://example.com/a.html"
1234        valid_schemes = ['http','https','ftp']
1235        invalid_schemes = ['file','imap','ldap']
1236        schemeless_url = "example.com/b.html"
1237        h = urllib.request.HTTPRedirectHandler()
1238        o = h.parent = MockOpener()
1239        req = Request(from_url)
1240        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1241
1242        for scheme in invalid_schemes:
1243            invalid_url = scheme + '://' + schemeless_url
1244            self.assertRaises(urllib.error.HTTPError, h.http_error_302,
1245                    req, MockFile(), 302, "Security Loophole",
1246                    MockHeaders({"location": invalid_url}))
1247
1248        for scheme in valid_schemes:
1249            valid_url = scheme + '://' + schemeless_url
1250            h.http_error_302(req, MockFile(), 302, "That's fine",
1251                MockHeaders({"location": valid_url}))
1252            self.assertEqual(o.req.get_full_url(), valid_url)
1253
1254    def test_relative_redirect(self):
1255        from_url = "http://example.com/a.html"
1256        relative_url = "/b.html"
1257        h = urllib.request.HTTPRedirectHandler()
1258        o = h.parent = MockOpener()
1259        req = Request(from_url)
1260        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1261
1262        valid_url = urllib.parse.urljoin(from_url,relative_url)
1263        h.http_error_302(req, MockFile(), 302, "That's fine",
1264            MockHeaders({"location": valid_url}))
1265        self.assertEqual(o.req.get_full_url(), valid_url)
1266
1267    def test_cookie_redirect(self):
1268        # cookies shouldn't leak into redirected requests
1269        from http.cookiejar import CookieJar
1270        from test.test_http_cookiejar import interact_netscape
1271
1272        cj = CookieJar()
1273        interact_netscape(cj, "http://www.example.com/", "spam=eggs")
1274        hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
1275        hdeh = urllib.request.HTTPDefaultErrorHandler()
1276        hrh = urllib.request.HTTPRedirectHandler()
1277        cp = urllib.request.HTTPCookieProcessor(cj)
1278        o = build_test_opener(hh, hdeh, hrh, cp)
1279        o.open("http://www.example.com/")
1280        self.assertFalse(hh.req.has_header("Cookie"))
1281
1282    def test_redirect_fragment(self):
1283        redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
1284        hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
1285        hdeh = urllib.request.HTTPDefaultErrorHandler()
1286        hrh = urllib.request.HTTPRedirectHandler()
1287        o = build_test_opener(hh, hdeh, hrh)
1288        fp = o.open('http://www.example.com')
1289        self.assertEqual(fp.geturl(), redirected_url.strip())
1290
1291    def test_redirect_no_path(self):
1292        # Issue 14132: Relative redirect strips original path
1293        real_class = http.client.HTTPConnection
1294        response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
1295        http.client.HTTPConnection = test_urllib.fakehttp(response1)
1296        self.addCleanup(setattr, http.client, "HTTPConnection", real_class)
1297        urls = iter(("/path", "/path?query"))
1298        def request(conn, method, url, *pos, **kw):
1299            self.assertEqual(url, next(urls))
1300            real_class.request(conn, method, url, *pos, **kw)
1301            # Change response for subsequent connection
1302            conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
1303        http.client.HTTPConnection.request = request
1304        fp = urllib.request.urlopen("http://python.org/path")
1305        self.assertEqual(fp.geturl(), "http://python.org/path?query")
1306
1307    def test_redirect_encoding(self):
1308        # Some characters in the redirect target may need special handling,
1309        # but most ASCII characters should be treated as already encoded
1310        class Handler(urllib.request.HTTPHandler):
1311            def http_open(self, req):
1312                result = self.do_open(self.connection, req)
1313                self.last_buf = self.connection.buf
1314                # Set up a normal response for the next request
1315                self.connection = test_urllib.fakehttp(
1316                    b'HTTP/1.1 200 OK\r\n'
1317                    b'Content-Length: 3\r\n'
1318                    b'\r\n'
1319                    b'123'
1320                )
1321                return result
1322        handler = Handler()
1323        opener = urllib.request.build_opener(handler)
1324        tests = (
1325            (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'),
1326            (b'/spaced%20path/', b'/spaced%20path/'),
1327            (b'/spaced path/', b'/spaced%20path/'),
1328            (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'),
1329        )
1330        for [location, result] in tests:
1331            with self.subTest(repr(location)):
1332                handler.connection = test_urllib.fakehttp(
1333                    b'HTTP/1.1 302 Redirect\r\n'
1334                    b'Location: ' + location + b'\r\n'
1335                    b'\r\n'
1336                )
1337                response = opener.open('http://example.com/')
1338                expected = b'GET ' + result + b' '
1339                request = handler.last_buf
1340                self.assertTrue(request.startswith(expected), repr(request))
1341
1342    def test_proxy(self):
1343        u = "proxy.example.com:3128"
1344        for d in dict(http=u), dict(HTTP=u):
1345            o = OpenerDirector()
1346            ph = urllib.request.ProxyHandler(d)
1347            o.add_handler(ph)
1348            meth_spec = [
1349                [("http_open", "return response")]
1350                ]
1351            handlers = add_ordered_mock_handlers(o, meth_spec)
1352
1353            req = Request("http://acme.example.com/")
1354            self.assertEqual(req.host, "acme.example.com")
1355            o.open(req)
1356            self.assertEqual(req.host, u)
1357            self.assertEqual([(handlers[0], "http_open")],
1358                             [tup[0:2] for tup in o.calls])
1359
1360    def test_proxy_no_proxy(self):
1361        os.environ['no_proxy'] = 'python.org'
1362        o = OpenerDirector()
1363        ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
1364        o.add_handler(ph)
1365        req = Request("http://www.perl.org/")
1366        self.assertEqual(req.host, "www.perl.org")
1367        o.open(req)
1368        self.assertEqual(req.host, "proxy.example.com")
1369        req = Request("http://www.python.org")
1370        self.assertEqual(req.host, "www.python.org")
1371        o.open(req)
1372        self.assertEqual(req.host, "www.python.org")
1373        del os.environ['no_proxy']
1374
1375    def test_proxy_no_proxy_all(self):
1376        os.environ['no_proxy'] = '*'
1377        o = OpenerDirector()
1378        ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
1379        o.add_handler(ph)
1380        req = Request("http://www.python.org")
1381        self.assertEqual(req.host, "www.python.org")
1382        o.open(req)
1383        self.assertEqual(req.host, "www.python.org")
1384        del os.environ['no_proxy']
1385
1386    def test_proxy_https(self):
1387        o = OpenerDirector()
1388        ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128"))
1389        o.add_handler(ph)
1390        meth_spec = [
1391            [("https_open", "return response")]
1392        ]
1393        handlers = add_ordered_mock_handlers(o, meth_spec)
1394
1395        req = Request("https://www.example.com/")
1396        self.assertEqual(req.host, "www.example.com")
1397        o.open(req)
1398        self.assertEqual(req.host, "proxy.example.com:3128")
1399        self.assertEqual([(handlers[0], "https_open")],
1400                         [tup[0:2] for tup in o.calls])
1401
1402    def test_proxy_https_proxy_authorization(self):
1403        o = OpenerDirector()
1404        ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128'))
1405        o.add_handler(ph)
1406        https_handler = MockHTTPSHandler()
1407        o.add_handler(https_handler)
1408        req = Request("https://www.example.com/")
1409        req.add_header("Proxy-Authorization", "FooBar")
1410        req.add_header("User-Agent", "Grail")
1411        self.assertEqual(req.host, "www.example.com")
1412        self.assertIsNone(req._tunnel_host)
1413        o.open(req)
1414        # Verify Proxy-Authorization gets tunneled to request.
1415        # httpsconn req_headers do not have the Proxy-Authorization header but
1416        # the req will have.
1417        self.assertNotIn(("Proxy-Authorization", "FooBar"),
1418                         https_handler.httpconn.req_headers)
1419        self.assertIn(("User-Agent", "Grail"),
1420                      https_handler.httpconn.req_headers)
1421        self.assertIsNotNone(req._tunnel_host)
1422        self.assertEqual(req.host, "proxy.example.com:3128")
1423        self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
1424
1425    @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX")
1426    def test_osx_proxy_bypass(self):
1427        bypass = {
1428            'exclude_simple': False,
1429            'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10',
1430                           '10.0/16']
1431        }
1432        # Check hosts that should trigger the proxy bypass
1433        for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1',
1434                     '10.0.0.1'):
1435            self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
1436                            'expected bypass of %s to be True' % host)
1437        # Check hosts that should not trigger the proxy bypass
1438        for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1',
1439                'notinbypass'):
1440            self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
1441                             'expected bypass of %s to be False' % host)
1442
1443        # Check the exclude_simple flag
1444        bypass = {'exclude_simple': True, 'exceptions': []}
1445        self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass))
1446
1447        # Check that invalid prefix lengths are ignored
1448        bypass = {
1449            'exclude_simple': False,
1450            'exceptions': [ '10.0.0.0/40', '172.19.10.0/24' ]
1451        }
1452        host = '172.19.10.5'
1453        self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
1454                        'expected bypass of %s to be True' % host)
1455        host = '10.0.1.5'
1456        self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
1457                        'expected bypass of %s to be False' % host)
1458
1459    def check_basic_auth(self, headers, realm):
1460        with self.subTest(realm=realm, headers=headers):
1461            opener = OpenerDirector()
1462            password_manager = MockPasswordManager()
1463            auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager)
1464            body = '\r\n'.join(headers) + '\r\n\r\n'
1465            http_handler = MockHTTPHandler(401, body)
1466            opener.add_handler(auth_handler)
1467            opener.add_handler(http_handler)
1468            self._test_basic_auth(opener, auth_handler, "Authorization",
1469                                  realm, http_handler, password_manager,
1470                                  "http://acme.example.com/protected",
1471                                  "http://acme.example.com/protected")
1472
1473    def test_basic_auth(self):
1474        realm = "realm2@example.com"
1475        realm2 = "realm2@example.com"
1476        basic = f'Basic realm="{realm}"'
1477        basic2 = f'Basic realm="{realm2}"'
1478        other_no_realm = 'Otherscheme xxx'
1479        digest = (f'Digest realm="{realm2}", '
1480                  f'qop="auth, auth-int", '
1481                  f'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", '
1482                  f'opaque="5ccc069c403ebaf9f0171e9517f40e41"')
1483        for realm_str in (
1484            # test "quote" and 'quote'
1485            f'Basic realm="{realm}"',
1486            f"Basic realm='{realm}'",
1487
1488            # charset is ignored
1489            f'Basic realm="{realm}", charset="UTF-8"',
1490
1491            # Multiple challenges per header
1492            f'{basic}, {basic2}',
1493            f'{basic}, {other_no_realm}',
1494            f'{other_no_realm}, {basic}',
1495            f'{basic}, {digest}',
1496            f'{digest}, {basic}',
1497        ):
1498            headers = [f'WWW-Authenticate: {realm_str}']
1499            self.check_basic_auth(headers, realm)
1500
1501        # no quote: expect a warning
1502        with support.check_warnings(("Basic Auth Realm was unquoted",
1503                                     UserWarning)):
1504            headers = [f'WWW-Authenticate: Basic realm={realm}']
1505            self.check_basic_auth(headers, realm)
1506
1507        # Multiple headers: one challenge per header.
1508        # Use the first Basic realm.
1509        for challenges in (
1510            [basic,  basic2],
1511            [basic,  digest],
1512            [digest, basic],
1513        ):
1514            headers = [f'WWW-Authenticate: {challenge}'
1515                       for challenge in challenges]
1516            self.check_basic_auth(headers, realm)
1517
1518    def test_proxy_basic_auth(self):
1519        opener = OpenerDirector()
1520        ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128"))
1521        opener.add_handler(ph)
1522        password_manager = MockPasswordManager()
1523        auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager)
1524        realm = "ACME Networks"
1525        http_handler = MockHTTPHandler(
1526            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1527        opener.add_handler(auth_handler)
1528        opener.add_handler(http_handler)
1529        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
1530                              realm, http_handler, password_manager,
1531                              "http://acme.example.com:3128/protected",
1532                              "proxy.example.com:3128",
1533                              )
1534
1535    def test_basic_and_digest_auth_handlers(self):
1536        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
1537        # response (http://python.org/sf/1479302), where it should instead
1538        # return None to allow another handler (especially
1539        # HTTPBasicAuthHandler) to handle the response.
1540
1541        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
1542        # try digest first (since it's the strongest auth scheme), so we record
1543        # order of calls here to check digest comes first:
1544        class RecordingOpenerDirector(OpenerDirector):
1545            def __init__(self):
1546                OpenerDirector.__init__(self)
1547                self.recorded = []
1548
1549            def record(self, info):
1550                self.recorded.append(info)
1551
1552        class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler):
1553            def http_error_401(self, *args, **kwds):
1554                self.parent.record("digest")
1555                urllib.request.HTTPDigestAuthHandler.http_error_401(self,
1556                                                             *args, **kwds)
1557
1558        class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler):
1559            def http_error_401(self, *args, **kwds):
1560                self.parent.record("basic")
1561                urllib.request.HTTPBasicAuthHandler.http_error_401(self,
1562                                                            *args, **kwds)
1563
1564        opener = RecordingOpenerDirector()
1565        password_manager = MockPasswordManager()
1566        digest_handler = TestDigestAuthHandler(password_manager)
1567        basic_handler = TestBasicAuthHandler(password_manager)
1568        realm = "ACME Networks"
1569        http_handler = MockHTTPHandler(
1570            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1571        opener.add_handler(basic_handler)
1572        opener.add_handler(digest_handler)
1573        opener.add_handler(http_handler)
1574
1575        # check basic auth isn't blocked by digest handler failing
1576        self._test_basic_auth(opener, basic_handler, "Authorization",
1577                              realm, http_handler, password_manager,
1578                              "http://acme.example.com/protected",
1579                              "http://acme.example.com/protected",
1580                              )
1581        # check digest was tried before basic (twice, because
1582        # _test_basic_auth called .open() twice)
1583        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
1584
1585    def test_unsupported_auth_digest_handler(self):
1586        opener = OpenerDirector()
1587        # While using DigestAuthHandler
1588        digest_auth_handler = urllib.request.HTTPDigestAuthHandler(None)
1589        http_handler = MockHTTPHandler(
1590            401, 'WWW-Authenticate: Kerberos\r\n\r\n')
1591        opener.add_handler(digest_auth_handler)
1592        opener.add_handler(http_handler)
1593        self.assertRaises(ValueError, opener.open, "http://www.example.com")
1594
1595    def test_unsupported_auth_basic_handler(self):
1596        # While using BasicAuthHandler
1597        opener = OpenerDirector()
1598        basic_auth_handler = urllib.request.HTTPBasicAuthHandler(None)
1599        http_handler = MockHTTPHandler(
1600            401, 'WWW-Authenticate: NTLM\r\n\r\n')
1601        opener.add_handler(basic_auth_handler)
1602        opener.add_handler(http_handler)
1603        self.assertRaises(ValueError, opener.open, "http://www.example.com")
1604
1605    def _test_basic_auth(self, opener, auth_handler, auth_header,
1606                         realm, http_handler, password_manager,
1607                         request_url, protected_url):
1608        import base64
1609        user, password = "wile", "coyote"
1610
1611        # .add_password() fed through to password manager
1612        auth_handler.add_password(realm, request_url, user, password)
1613        self.assertEqual(realm, password_manager.realm)
1614        self.assertEqual(request_url, password_manager.url)
1615        self.assertEqual(user, password_manager.user)
1616        self.assertEqual(password, password_manager.password)
1617
1618        opener.open(request_url)
1619
1620        # should have asked the password manager for the username/password
1621        self.assertEqual(password_manager.target_realm, realm)
1622        self.assertEqual(password_manager.target_url, protected_url)
1623
1624        # expect one request without authorization, then one with
1625        self.assertEqual(len(http_handler.requests), 2)
1626        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1627        userpass = bytes('%s:%s' % (user, password), "ascii")
1628        auth_hdr_value = ('Basic ' +
1629            base64.encodebytes(userpass).strip().decode())
1630        self.assertEqual(http_handler.requests[1].get_header(auth_header),
1631                         auth_hdr_value)
1632        self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
1633                         auth_hdr_value)
1634        # if the password manager can't find a password, the handler won't
1635        # handle the HTTP auth error
1636        password_manager.user = password_manager.password = None
1637        http_handler.reset()
1638        opener.open(request_url)
1639        self.assertEqual(len(http_handler.requests), 1)
1640        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1641
1642    def test_basic_prior_auth_auto_send(self):
1643        # Assume already authenticated if is_authenticated=True
1644        # for APIs like Github that don't return 401
1645
1646        user, password = "wile", "coyote"
1647        request_url = "http://acme.example.com/protected"
1648
1649        http_handler = MockHTTPHandlerCheckAuth(200)
1650
1651        pwd_manager = HTTPPasswordMgrWithPriorAuth()
1652        auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
1653        auth_prior_handler.add_password(
1654            None, request_url, user, password, is_authenticated=True)
1655
1656        is_auth = pwd_manager.is_authenticated(request_url)
1657        self.assertTrue(is_auth)
1658
1659        opener = OpenerDirector()
1660        opener.add_handler(auth_prior_handler)
1661        opener.add_handler(http_handler)
1662
1663        opener.open(request_url)
1664
1665        # expect request to be sent with auth header
1666        self.assertTrue(http_handler.has_auth_header)
1667
1668    def test_basic_prior_auth_send_after_first_success(self):
1669        # Auto send auth header after authentication is successful once
1670
1671        user, password = 'wile', 'coyote'
1672        request_url = 'http://acme.example.com/protected'
1673        realm = 'ACME'
1674
1675        pwd_manager = HTTPPasswordMgrWithPriorAuth()
1676        auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
1677        auth_prior_handler.add_password(realm, request_url, user, password)
1678
1679        is_auth = pwd_manager.is_authenticated(request_url)
1680        self.assertFalse(is_auth)
1681
1682        opener = OpenerDirector()
1683        opener.add_handler(auth_prior_handler)
1684
1685        http_handler = MockHTTPHandler(
1686            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None)
1687        opener.add_handler(http_handler)
1688
1689        opener.open(request_url)
1690
1691        is_auth = pwd_manager.is_authenticated(request_url)
1692        self.assertTrue(is_auth)
1693
1694        http_handler = MockHTTPHandlerCheckAuth(200)
1695        self.assertFalse(http_handler.has_auth_header)
1696
1697        opener = OpenerDirector()
1698        opener.add_handler(auth_prior_handler)
1699        opener.add_handler(http_handler)
1700
1701        # After getting 200 from MockHTTPHandler
1702        # Next request sends header in the first request
1703        opener.open(request_url)
1704
1705        # expect request to be sent with auth header
1706        self.assertTrue(http_handler.has_auth_header)
1707
1708    def test_http_closed(self):
1709        """Test the connection is cleaned up when the response is closed"""
1710        for (transfer, data) in (
1711            ("Connection: close", b"data"),
1712            ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"),
1713            ("Content-Length: 4", b"data"),
1714        ):
1715            header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer)
1716            conn = test_urllib.fakehttp(header.encode() + data)
1717            handler = urllib.request.AbstractHTTPHandler()
1718            req = Request("http://dummy/")
1719            req.timeout = None
1720            with handler.do_open(conn, req) as resp:
1721                resp.read()
1722            self.assertTrue(conn.fakesock.closed,
1723                "Connection not closed with {!r}".format(transfer))
1724
1725    def test_invalid_closed(self):
1726        """Test the connection is cleaned up after an invalid response"""
1727        conn = test_urllib.fakehttp(b"")
1728        handler = urllib.request.AbstractHTTPHandler()
1729        req = Request("http://dummy/")
1730        req.timeout = None
1731        with self.assertRaises(http.client.BadStatusLine):
1732            handler.do_open(conn, req)
1733        self.assertTrue(conn.fakesock.closed, "Connection not closed")
1734
1735
1736class MiscTests(unittest.TestCase):
1737
1738    def opener_has_handler(self, opener, handler_class):
1739        self.assertTrue(any(h.__class__ == handler_class
1740                            for h in opener.handlers))
1741
1742    def test_build_opener(self):
1743        class MyHTTPHandler(urllib.request.HTTPHandler):
1744            pass
1745
1746        class FooHandler(urllib.request.BaseHandler):
1747            def foo_open(self):
1748                pass
1749
1750        class BarHandler(urllib.request.BaseHandler):
1751            def bar_open(self):
1752                pass
1753
1754        build_opener = urllib.request.build_opener
1755
1756        o = build_opener(FooHandler, BarHandler)
1757        self.opener_has_handler(o, FooHandler)
1758        self.opener_has_handler(o, BarHandler)
1759
1760        # can take a mix of classes and instances
1761        o = build_opener(FooHandler, BarHandler())
1762        self.opener_has_handler(o, FooHandler)
1763        self.opener_has_handler(o, BarHandler)
1764
1765        # subclasses of default handlers override default handlers
1766        o = build_opener(MyHTTPHandler)
1767        self.opener_has_handler(o, MyHTTPHandler)
1768
1769        # a particular case of overriding: default handlers can be passed
1770        # in explicitly
1771        o = build_opener()
1772        self.opener_has_handler(o, urllib.request.HTTPHandler)
1773        o = build_opener(urllib.request.HTTPHandler)
1774        self.opener_has_handler(o, urllib.request.HTTPHandler)
1775        o = build_opener(urllib.request.HTTPHandler())
1776        self.opener_has_handler(o, urllib.request.HTTPHandler)
1777
1778        # Issue2670: multiple handlers sharing the same base class
1779        class MyOtherHTTPHandler(urllib.request.HTTPHandler):
1780            pass
1781
1782        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
1783        self.opener_has_handler(o, MyHTTPHandler)
1784        self.opener_has_handler(o, MyOtherHTTPHandler)
1785
1786    @unittest.skipUnless(support.is_resource_enabled('network'),
1787                         'test requires network access')
1788    def test_issue16464(self):
1789        with support.transient_internet("http://www.example.com/"):
1790            opener = urllib.request.build_opener()
1791            request = urllib.request.Request("http://www.example.com/")
1792            self.assertEqual(None, request.data)
1793
1794            opener.open(request, "1".encode("us-ascii"))
1795            self.assertEqual(b"1", request.data)
1796            self.assertEqual("1", request.get_header("Content-length"))
1797
1798            opener.open(request, "1234567890".encode("us-ascii"))
1799            self.assertEqual(b"1234567890", request.data)
1800            self.assertEqual("10", request.get_header("Content-length"))
1801
1802    def test_HTTPError_interface(self):
1803        """
1804        Issue 13211 reveals that HTTPError didn't implement the URLError
1805        interface even though HTTPError is a subclass of URLError.
1806        """
1807        msg = 'something bad happened'
1808        url = code = fp = None
1809        hdrs = 'Content-Length: 42'
1810        err = urllib.error.HTTPError(url, code, msg, hdrs, fp)
1811        self.assertTrue(hasattr(err, 'reason'))
1812        self.assertEqual(err.reason, 'something bad happened')
1813        self.assertTrue(hasattr(err, 'headers'))
1814        self.assertEqual(err.headers, 'Content-Length: 42')
1815        expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg)
1816        self.assertEqual(str(err), expected_errmsg)
1817        expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)
1818        self.assertEqual(repr(err), expected_errmsg)
1819
1820    def test_parse_proxy(self):
1821        parse_proxy_test_cases = [
1822            ('proxy.example.com',
1823             (None, None, None, 'proxy.example.com')),
1824            ('proxy.example.com:3128',
1825             (None, None, None, 'proxy.example.com:3128')),
1826            ('proxy.example.com', (None, None, None, 'proxy.example.com')),
1827            ('proxy.example.com:3128',
1828             (None, None, None, 'proxy.example.com:3128')),
1829            # The authority component may optionally include userinfo
1830            # (assumed to be # username:password):
1831            ('joe:password@proxy.example.com',
1832             (None, 'joe', 'password', 'proxy.example.com')),
1833            ('joe:password@proxy.example.com:3128',
1834             (None, 'joe', 'password', 'proxy.example.com:3128')),
1835            #Examples with URLS
1836            ('http://proxy.example.com/',
1837             ('http', None, None, 'proxy.example.com')),
1838            ('http://proxy.example.com:3128/',
1839             ('http', None, None, 'proxy.example.com:3128')),
1840            ('http://joe:password@proxy.example.com/',
1841             ('http', 'joe', 'password', 'proxy.example.com')),
1842            ('http://joe:password@proxy.example.com:3128',
1843             ('http', 'joe', 'password', 'proxy.example.com:3128')),
1844            # Everything after the authority is ignored
1845            ('ftp://joe:password@proxy.example.com/rubbish:3128',
1846             ('ftp', 'joe', 'password', 'proxy.example.com')),
1847            # Test for no trailing '/' case
1848            ('http://joe:password@proxy.example.com',
1849             ('http', 'joe', 'password', 'proxy.example.com')),
1850            # Testcases with '/' character in username, password
1851            ('http://user/name:password@localhost:22',
1852             ('http', 'user/name', 'password', 'localhost:22')),
1853            ('http://username:pass/word@localhost:22',
1854             ('http', 'username', 'pass/word', 'localhost:22')),
1855            ('http://user/name:pass/word@localhost:22',
1856             ('http', 'user/name', 'pass/word', 'localhost:22')),
1857        ]
1858
1859
1860        for tc, expected in parse_proxy_test_cases:
1861            self.assertEqual(_parse_proxy(tc), expected)
1862
1863        self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'),
1864
1865    def test_unsupported_algorithm(self):
1866        handler = AbstractDigestAuthHandler()
1867        with self.assertRaises(ValueError) as exc:
1868            handler.get_algorithm_impls('invalid')
1869        self.assertEqual(
1870            str(exc.exception),
1871            "Unsupported digest authentication algorithm 'invalid'"
1872        )
1873
1874
1875class RequestTests(unittest.TestCase):
1876    class PutRequest(Request):
1877        method = 'PUT'
1878
1879    def setUp(self):
1880        self.get = Request("http://www.python.org/~jeremy/")
1881        self.post = Request("http://www.python.org/~jeremy/",
1882                            "data",
1883                            headers={"X-Test": "test"})
1884        self.head = Request("http://www.python.org/~jeremy/", method='HEAD')
1885        self.put = self.PutRequest("http://www.python.org/~jeremy/")
1886        self.force_post = self.PutRequest("http://www.python.org/~jeremy/",
1887            method="POST")
1888
1889    def test_method(self):
1890        self.assertEqual("POST", self.post.get_method())
1891        self.assertEqual("GET", self.get.get_method())
1892        self.assertEqual("HEAD", self.head.get_method())
1893        self.assertEqual("PUT", self.put.get_method())
1894        self.assertEqual("POST", self.force_post.get_method())
1895
1896    def test_data(self):
1897        self.assertFalse(self.get.data)
1898        self.assertEqual("GET", self.get.get_method())
1899        self.get.data = "spam"
1900        self.assertTrue(self.get.data)
1901        self.assertEqual("POST", self.get.get_method())
1902
1903    # issue 16464
1904    # if we change data we need to remove content-length header
1905    # (cause it's most probably calculated for previous value)
1906    def test_setting_data_should_remove_content_length(self):
1907        self.assertNotIn("Content-length", self.get.unredirected_hdrs)
1908        self.get.add_unredirected_header("Content-length", 42)
1909        self.assertEqual(42, self.get.unredirected_hdrs["Content-length"])
1910        self.get.data = "spam"
1911        self.assertNotIn("Content-length", self.get.unredirected_hdrs)
1912
1913    # issue 17485 same for deleting data.
1914    def test_deleting_data_should_remove_content_length(self):
1915        self.assertNotIn("Content-length", self.get.unredirected_hdrs)
1916        self.get.data = 'foo'
1917        self.get.add_unredirected_header("Content-length", 3)
1918        self.assertEqual(3, self.get.unredirected_hdrs["Content-length"])
1919        del self.get.data
1920        self.assertNotIn("Content-length", self.get.unredirected_hdrs)
1921
1922    def test_get_full_url(self):
1923        self.assertEqual("http://www.python.org/~jeremy/",
1924                         self.get.get_full_url())
1925
1926    def test_selector(self):
1927        self.assertEqual("/~jeremy/", self.get.selector)
1928        req = Request("http://www.python.org/")
1929        self.assertEqual("/", req.selector)
1930
1931    def test_get_type(self):
1932        self.assertEqual("http", self.get.type)
1933
1934    def test_get_host(self):
1935        self.assertEqual("www.python.org", self.get.host)
1936
1937    def test_get_host_unquote(self):
1938        req = Request("http://www.%70ython.org/")
1939        self.assertEqual("www.python.org", req.host)
1940
1941    def test_proxy(self):
1942        self.assertFalse(self.get.has_proxy())
1943        self.get.set_proxy("www.perl.org", "http")
1944        self.assertTrue(self.get.has_proxy())
1945        self.assertEqual("www.python.org", self.get.origin_req_host)
1946        self.assertEqual("www.perl.org", self.get.host)
1947
1948    def test_wrapped_url(self):
1949        req = Request("<URL:http://www.python.org>")
1950        self.assertEqual("www.python.org", req.host)
1951
1952    def test_url_fragment(self):
1953        req = Request("http://www.python.org/?qs=query#fragment=true")
1954        self.assertEqual("/?qs=query", req.selector)
1955        req = Request("http://www.python.org/#fun=true")
1956        self.assertEqual("/", req.selector)
1957
1958        # Issue 11703: geturl() omits fragment in the original URL.
1959        url = 'http://docs.python.org/library/urllib2.html#OK'
1960        req = Request(url)
1961        self.assertEqual(req.get_full_url(), url)
1962
1963    def test_url_fullurl_get_full_url(self):
1964        urls = ['http://docs.python.org',
1965                'http://docs.python.org/library/urllib2.html#OK',
1966                'http://www.python.org/?qs=query#fragment=true']
1967        for url in urls:
1968            req = Request(url)
1969            self.assertEqual(req.get_full_url(), req.full_url)
1970
1971
1972if __name__ == "__main__":
1973    unittest.main()
1974