1import os
2import base64
3import urlparse
4import urllib2
5import BaseHTTPServer
6import unittest
7import hashlib
8
9from test import test_support
10
# mimetools and threading are loaded through test_support.import_module()
# instead of a plain import statement.
mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

# ssl is optional: when it cannot be imported the name is bound to None so
# that tests can check for it before relying on it.
try:
    import ssl
except ImportError:
    ssl = None

# Directory containing this module; the certificate files below are looked
# up next to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24
25# Loopback http server infrastructure
26
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTPServer variant tuned for loopback tests.

    The listening socket gets a very short timeout and every accepted
    connection gets a timeout of its own.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(
            self, server_address, RequestHandlerClass)

        # A short timeout on the listening socket keeps the serving loop
        # from blocking for long, which makes the server easy to stop.
        self.socket.settimeout(0.1)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Accepts one connection and returns ``(connection, peer_address)``.
        """
        connection, peer_address = self.socket.accept()

        # Bound blocking operations on the accepted connection (10 seconds)
        # so that a stalled exchange is less likely to end in a deadlock.
        connection.settimeout(10.0)

        return connection, peer_address
52
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    The server is bound to 127.0.0.1 on an OS-assigned port as soon as the
    thread object is created; the chosen port is available as ``self.port``.
    ``self.ready`` is an Event that is set once ``run()`` has started.
    """

    def __init__(self, request_handler):
        """Create the server (without serving yet).

        request_handler: callable/class used by the server to handle each
            request.  Its ``protocol_version`` attribute is set to
            "HTTP/1.0" as a side effect.
        """
        threading.Thread.__init__(self)
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running.

        Blocks until the serving thread has exited, then closes the
        listening socket so that it is not leaked between tests.
        """

        # Set the stop flag; run() checks it between requests.
        self._stop = True

        try:
            self.join()
        finally:
            # Release the listening socket even if join() raised.
            self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() is called."""
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()
79
80# Authentication infrastructure
81
82
class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Handler for performing Basic Authentication.

    GET requests are answered with 401 and a Basic challenge unless the
    Authorization header carries exactly the expected credentials, in
    which case a 200 response is sent.
    """
    # Server side values
    USER = "testUser"
    PASSWD = "testPass"
    REALM = "Test"
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    ENCODED_AUTH = base64.b64encode(USER_PASSWD)

    def __init__(self, *args, **kwargs):
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Suppress the HTTP Console log output
        pass

    def do_HEAD(self):
        """Send a 200 status line and the headers of a successful reply."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 status line with a Basic challenge for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer a GET according to the Authorization header received."""
        authorization = self.headers.getheader("Authorization")
        if authorization is None:
            self.do_AUTHHEAD()
            self.wfile.write("No Auth Header Received")
        elif authorization == "Basic " + self.ENCODED_AUTH:
            # Emit a proper status line and headers before the body; writing
            # the body alone would not be a well-formed HTTP/1.0 response.
            self.do_HEAD()
            self.wfile.write("It works!")
        else:
            # Unauthorized Request
            self.do_AUTHHEAD()
120
121
class DigestAuthHandler:
    """Handler for performing digest authentication.

    This is a helper used by a request handler, not a request handler
    itself: handle_request() inspects the Proxy-Authorization header of
    the request handler it is given and either accepts the request or
    sends a 407 challenge through that handler.
    """

    # Fields that must be present in the client's credentials before the
    # expected response can be computed and compared.
    _REQUIRED_FIELDS = ("username", "realm", "nonce", "nc", "cnonce",
                        "qop", "response")

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the mapping of user name -> password (must be a dict)."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 hex digest of a per-instance request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a credentials header value into a dict of its fields.

        Everything up to the first space (the scheme) is dropped and the
        remainder is read as comma-separated ``name=value`` pairs.  Names
        and values are stripped of surrounding whitespace and one pair of
        enclosing double quotes is removed from values.  Pieces without
        an "=" (for example after a trailing comma) are ignored.

        Limitation: the split on "," is not quote-aware, so a quoted value
        that itself contains a comma is not parsed correctly.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            if "=" not in part:
                continue
            # Split on the first "=" only: values such as a URI with a
            # query string legitimately contain further "=" characters.
            name, value = part.split("=", 1)
            value = value.strip()
            # Slices instead of indexing so an empty value is harmless.
            if value[:1] == '"' and value[-1:] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] is the expected digest.

        The expected value is MD5(HA1:nonce:nc:cnonce:qop:HA2) with
        HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri),
        using the *uri* argument rather than any uri sent by the client.
        Returns False when a required field is missing.
        """
        for field in self._REQUIRED_FIELDS:
            if field not in auth_dict:
                return False

        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        final_dict["HA1"] = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        final_dict["HA2"] = hashlib.md5(HA2_str).hexdigest()
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge via request_handler; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # Comma-separated challenge parameters, without a dangling
        # separator after the last one.
        request_handler.send_header(
            'Proxy-Authenticate',
            'Digest realm="%s", qop="%s", nonce="%s"' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add a 'Connection: close'
        # header here or not.
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned, a 407 challenge has already been sent
        through request_handler.  A nonce is accepted only once.
        """

        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization'])

        # Missing fields are treated like wrong credentials (re-challenge)
        # instead of raising KeyError inside the request handler.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        candidate_paths = (request_handler.path, request_handler.short_path)
        auth_validated = any(
            self._validate_auth(auth_dict, password,
                                request_handler.command, path)
            for path in candidate_paths)

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True
241
242# Proxy test infrastructure
243
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """A 'fake proxy' that answers every request itself, pretending the
    entire internet has gone down due to a sudden zombie invasion.  Its
    main utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # The parent's __init__() will try to call do_GET(), which needs
        # this attribute, so it has to be assigned first.
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Request logging is silenced; emit ``format % args`` from here
        # when debugging.
        pass

    def do_GET(self):
        # Only the path component of the requested URL (index 2 of the
        # urlparse result) is kept for the digest check.
        self.short_path = urlparse.urlparse(self.path, 'http')[2]

        authenticated = self.digest_auth_handler.handle_request(self)
        if not authenticated:
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write("You've reached %s!<BR>" % self.path)
        self.wfile.write("Our apologies, but our server is down due to "
                          "a sudden zombie invasion.")
273
274# Test cases
275
class BaseTestCase(unittest.TestCase):
    """Base class bracketing each test with the test_support threading
    helpers: threading_setup() before, threading_cleanup() after.
    """

    def setUp(self):
        # Keep what threading_setup() returns; tearDown() hands it back.
        thread_state = test_support.threading_setup()
        self._threads = thread_state

    def tearDown(self):
        thread_state = self._threads
        test_support.threading_cleanup(*thread_state)
282
283
class BasicAuthTests(BaseTestCase):
    """Basic authentication tests against a loopback BasicAuthHandler."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def tearDown(self):
        self.server.stop()
        super(BasicAuthTests, self).tearDown()

    def _install_auth_opener(self, password):
        """Install a global opener authenticating as USER with *password*.

        The global opener is reset to the default once the test is over so
        that it does not leak into other tests.
        """
        ah = urllib2.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, password)
        urllib2.install_opener(urllib2.build_opener(ah))
        self.addCleanup(urllib2.install_opener, None)

    def test_basic_auth_success(self):
        self._install_auth_opener(self.PASSWD)
        # Any exception other than HTTPError propagates unchanged, with
        # its original traceback.
        try:
            response = urllib2.urlopen(self.server_url)
        except urllib2.HTTPError:
            self.fail("Basic Auth Failed for url: %s" % self.server_url)
        try:
            self.assertTrue(response)
        finally:
            response.close()

    def test_basic_auth_httperror(self):
        self._install_auth_opener(self.INCORRECT_PASSWD)
        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
321
322
class ProxyAuthTests(BaseTestCase):
    """Proxy digest authentication tests run through a FakeProxyHandler
    server acting as the HTTP proxy.
    """

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()

        # Blank out the proxy bypass settings for the duration of the
        # test; the whole environment is put back afterwards.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        for variable in ('NO_PROXY', 'no_proxy'):
            os.environ[variable] = ''

        # Server side: digest authentication for a single known user.
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()

        # Client side: send http traffic through the fake proxy and answer
        # its digest challenges.
        proxies = {"http": "http://127.0.0.1:%d" % self.server.port}
        proxy_handler = urllib2.ProxyHandler(proxies)
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(proxy_handler,
                                           self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def _register_password(self, password):
        """Give the client-side digest handler *password* for USER."""
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, password)

    def _read_until_empty(self, result):
        """Read *result* until read() returns nothing, then close it."""
        while result.read():
            pass
        result.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        self._register_password(self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self._register_password(self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        self._read_until_empty(self.opener.open(self.URL))

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self._register_password(self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # Not supporting auth-int is acceptable, but URLError is the
            # only kind of exception that may come out of open() here.
            return
        if result:
            self._read_until_empty(result)
397
398
def GetRequestHandler(responses):
    """Return a new request handler class that replays *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples; each
    handled request consumes the first remaining entry.  *headers* is a
    sequence of ``(name, value)`` pairs whose values are %-formatted with
    the handler's ``port`` attribute.  The returned class records request
    paths and POST bodies in ``requests`` and the headers of the latest
    request in ``headers_received``.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            payload = self.send_head()
            if payload:
                self.wfile.write(payload)

        def do_POST(self):
            length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(length)
            # Reply first (which records the path), then record the body.
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            # Stored on the class so the test can inspect it afterwards.
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, extra_headers, payload = responses.pop(0)

            self.send_response(status)
            for name, template in extra_headers:
                self.send_header(name, template % self.port)

            if not payload:
                self.end_headers()
                return None
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            return payload

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler
439
440
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen against loopback HTTP and HTTPS servers.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  The optional
    'data' argument and redirection each get one basic test; proxies are
    only involved in that an empty ProxyHandler is installed for every test.
    """

    def setUp(self):
        # Install an opener whose ProxyHandler has no proxies configured.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        # Reset the global opener afterwards so it does not leak into
        # whatever runs after this test.
        self.addCleanup(urllib2.install_opener, None)
        super(TestUrlopen, self).setUp()

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several read methods and return the
        concatenated content.  The response is always closed.
        """
        l = []
        f = urllib2.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            l.extend(f.readlines(200))
            l.append(f.readline())
            l.append(f.read(1024))
            l.append(f.read())
        finally:
            f.close()
        return b"".join(l)

    def start_server(self, responses):
        """Start a loopback HTTP server replaying *responses*.

        The server thread is stored as self.server; the caller must stop
        it before the test method returns.  Returns the handler class,
        whose ``port`` attribute is set to the server's port.
        """
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server via test.ssl_servers.make_https_server.

        Skips the test when urllib2 has no HTTPSHandler.  Returns the
        handler class, whose ``port`` attribute is set to the server's port.
        """
        if not hasattr(urllib2, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError as exc:
                # The error object doubles as the response; keep it so the
                # body can be read below.
                error = exc
            else:
                self.fail('404 should raise URLError')

            try:
                data = error.read()
            finally:
                error.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                'get=with_feeling')
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Good cert
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                            cafile=CERT_localhost)
        self.assertEqual(data, b"we care a bit")
        # Bad cert
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)
        # Good cert, but mismatching hostname
        handler = self.start_https_server(certfile=CERT_fakehostname)
        with self.assertRaises(ssl.CertificateError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        # One-element list so the callback can store into it.
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req).close()
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for attr in ("read", "close", "info", "geturl"):
                    self.assertTrue(hasattr(open_url, attr),
                                    "object returned from "
                                    "urlopen lacks the %s attribute" % attr)
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                info_obj = open_url.info()
                self.assertIsInstance(info_obj, mimetools.Message,
                                      "object returned by 'info' is not an "
                                      "instance of mimetools.Message")
                self.assertEqual(info_obj.getsubtype(), "plain")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                url = open_url.geturl()
            finally:
                open_url.close()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for line in data:
                    self.assertEqual(line, expected_response)
            finally:
                data.close()
        finally:
            self.server.stop()

    # NOTE(review): the 'z' prefix keeps this method from being collected
    # as a test; the reason it was disabled is not recorded here.
    def ztest_line_iteration(self):
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for index, line in enumerate(data):
                    self.assertEqual(line, lines[index],
                                     "Fetched line number %s doesn't match expected:\n"
                                     "    Expected length was %s, got %s" %
                                     (index, len(lines[index]), len(line)))
            finally:
                data.close()
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))
706
def test_main():
    """Run every test class defined in this module."""
    # All of these tests talk to localhost only, so the network resource
    # flag (Lib/test/regrtest.py -u network) is deliberately NOT required
    # here.  If that rationale turns out to be wrong, call
    # test_support.requires("network") before running the tests.
    test_classes = (BasicAuthTests, ProxyAuthTests, TestUrlopen)
    test_support.run_unittest(*test_classes)
715
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    test_main()
718