1import unittest
2from test import test_support
3from test.test_urllib2 import sanepathname2url
4
5import socket
6import urllib2
7import os
8import sys
9
10TIMEOUT = 60  # seconds
11
12
13def _retry_thrice(func, exc, *args, **kwargs):
14    for i in range(3):
15        try:
16            return func(*args, **kwargs)
17        except exc, last_exc:
18            continue
19        except:
20            raise
21    raise last_exc
22
23def _wrap_with_retry_thrice(func, exc):
24    def wrapped(*args, **kwargs):
25        return _retry_thrice(func, exc, *args, **kwargs)
26    return wrapped
27
28# bpo-35411: FTP tests of test_urllib2net randomly fail
29# with "425 Security: Bad IP connecting" on Travis CI
30skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ,
31                                          'bpo-35411: skip FTP test '
32                                          'on Travis CI')
33
34
35# Connecting to remote hosts is flaky.  Make it more robust by retrying
36# the connection several times.
37_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
38
39
40class AuthTests(unittest.TestCase):
41    """Tests urllib2 authentication features."""
42
43## Disabled at the moment since there is no page under python.org which
44## could be used to HTTP authentication.
45#
46#    def test_basic_auth(self):
47#        import httplib
48#
49#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
50#        test_hostport = "www.python.org"
51#        test_realm = 'Test Realm'
52#        test_user = 'test.test_urllib2net'
53#        test_password = 'blah'
54#
55#        # failure
56#        try:
57#            _urlopen_with_retry(test_url)
58#        except urllib2.HTTPError, exc:
59#            self.assertEqual(exc.code, 401)
60#        else:
61#            self.fail("urlopen() should have failed with 401")
62#
63#        # success
64#        auth_handler = urllib2.HTTPBasicAuthHandler()
65#        auth_handler.add_password(test_realm, test_hostport,
66#                                  test_user, test_password)
67#        opener = urllib2.build_opener(auth_handler)
68#        f = opener.open('http://localhost/')
69#        response = _urlopen_with_retry("http://www.python.org/")
70#
71#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
72#        # reasons, let's not implement it!  (it's already implemented for proxy
73#        # specification strings (that is, URLs or authorities specifying a
74#        # proxy), so we must keep that)
75#        self.assertRaises(httplib.InvalidURL,
76#                          urllib2.urlopen, "http://evil:thing@example.com")
77
78
79class CloseSocketTest(unittest.TestCase):
80
81    def test_close(self):
82        import httplib
83
84        # calling .close() on urllib2's response objects should close the
85        # underlying socket
86
87        # delve deep into response to fetch socket._socketobject
88        response = _urlopen_with_retry(test_support.TEST_HTTP_URL)
89        abused_fileobject = response.fp
90        self.assertIs(abused_fileobject.__class__, socket._fileobject)
91        httpresponse = abused_fileobject._sock
92        self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
93        fileobject = httpresponse.fp
94        self.assertIs(fileobject.__class__, socket._fileobject)
95
96        self.assertTrue(not fileobject.closed)
97        response.close()
98        self.assertTrue(fileobject.closed)
99
100class OtherNetworkTests(unittest.TestCase):
101    def setUp(self):
102        if 0:  # for debugging
103            import logging
104            logger = logging.getLogger("test_urllib2net")
105            logger.addHandler(logging.StreamHandler())
106
107    # XXX The rest of these tests aren't very good -- they don't check much.
108    # They do sometimes catch some major disasters, though.
109
110    @skip_ftp_test_on_travis
111    def test_ftp(self):
112        urls = [
113            'ftp://www.pythontest.net/README',
114            ('ftp://www.pythontest.net/non-existent-file',
115             None, urllib2.URLError),
116            ]
117        self._test_urls(urls, self._extra_handlers())
118
119    def test_file(self):
120        TESTFN = test_support.TESTFN
121        f = open(TESTFN, 'w')
122        try:
123            f.write('hi there\n')
124            f.close()
125            urls = [
126                'file:'+sanepathname2url(os.path.abspath(TESTFN)),
127                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
128                ]
129            self._test_urls(urls, self._extra_handlers(), retry=True)
130        finally:
131            os.remove(TESTFN)
132
133        self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
134
135    # XXX Following test depends on machine configurations that are internal
136    # to CNRI.  Need to set up a public server with the right authentication
137    # configuration for test purposes.
138
139##     def test_cnri(self):
140##         if socket.gethostname() == 'bitdiddle':
141##             localhost = 'bitdiddle.cnri.reston.va.us'
142##         elif socket.gethostname() == 'bitdiddle.concentric.net':
143##             localhost = 'localhost'
144##         else:
145##             localhost = None
146##         if localhost is not None:
147##             urls = [
148##                 'file://%s/etc/passwd' % localhost,
149##                 'http://%s/simple/' % localhost,
150##                 'http://%s/digest/' % localhost,
151##                 'http://%s/not/found.h' % localhost,
152##                 ]
153
154##             bauth = HTTPBasicAuthHandler()
155##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
156##                                'password')
157##             dauth = HTTPDigestAuthHandler()
158##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
159##                                'password')
160
161##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
162
163    def test_urlwithfrag(self):
164        urlwith_frag = "http://www.pythontest.net/index.html#frag"
165        with test_support.transient_internet(urlwith_frag):
166            req = urllib2.Request(urlwith_frag)
167            res = urllib2.urlopen(req)
168            self.assertEqual(res.geturl(),
169                    "http://www.pythontest.net/index.html#frag")
170
171    def test_fileno(self):
172        req = urllib2.Request(test_support.TEST_HTTP_URL)
173        opener = urllib2.build_opener()
174        res = opener.open(req)
175        try:
176            res.fileno()
177        except AttributeError:
178            self.fail("HTTPResponse object should return a valid fileno")
179        finally:
180            res.close()
181
182    def test_custom_headers(self):
183        url = test_support.TEST_HTTP_URL
184        with test_support.transient_internet(url):
185            opener = urllib2.build_opener()
186            request = urllib2.Request(url)
187            self.assertFalse(request.header_items())
188            opener.open(request)
189            self.assertTrue(request.header_items())
190            self.assertTrue(request.has_header('User-agent'))
191            request.add_header('User-Agent','Test-Agent')
192            opener.open(request)
193            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
194
195    @unittest.skip('XXX: http://www.imdb.com is gone')
196    def test_sites_no_connection_close(self):
197        # Some sites do not send Connection: close header.
198        # Verify that those work properly. (#issue12576)
199
200        URL = 'http://www.imdb.com' # No Connection:close
201        with test_support.transient_internet(URL):
202            req = urllib2.urlopen(URL)
203            res = req.read()
204            self.assertTrue(res)
205
206    def _test_urls(self, urls, handlers, retry=True):
207        import time
208        import logging
209        debug = logging.getLogger("test_urllib2").debug
210
211        urlopen = urllib2.build_opener(*handlers).open
212        if retry:
213            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
214
215        for url in urls:
216            if isinstance(url, tuple):
217                url, req, expected_err = url
218            else:
219                req = expected_err = None
220            with test_support.transient_internet(url):
221                debug(url)
222                try:
223                    f = urlopen(url, req, TIMEOUT)
224                except EnvironmentError as err:
225                    debug(err)
226                    if expected_err:
227                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
228                               (expected_err, url, req, type(err), err))
229                        self.assertIsInstance(err, expected_err, msg)
230                except urllib2.URLError as err:
231                    if isinstance(err[0], socket.timeout):
232                        print >>sys.stderr, "<timeout: %s>" % url
233                        continue
234                    else:
235                        raise
236                else:
237                    try:
238                        with test_support.transient_internet(url):
239                            buf = f.read()
240                            debug("read %d bytes" % len(buf))
241                    except socket.timeout:
242                        print >>sys.stderr, "<timeout: %s>" % url
243                    f.close()
244            debug("******** next url coming up...")
245            time.sleep(0.1)
246
247    def _extra_handlers(self):
248        handlers = []
249
250        cfh = urllib2.CacheFTPHandler()
251        self.addCleanup(cfh.clear_cache)
252        cfh.setTimeout(1)
253        handlers.append(cfh)
254
255        return handlers
256
257
258class TimeoutTest(unittest.TestCase):
259    def test_http_basic(self):
260        self.assertIsNone(socket.getdefaulttimeout())
261        url = test_support.TEST_HTTP_URL
262        with test_support.transient_internet(url, timeout=None):
263            u = _urlopen_with_retry(url)
264            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
265
266    def test_http_default_timeout(self):
267        self.assertIsNone(socket.getdefaulttimeout())
268        url = test_support.TEST_HTTP_URL
269        with test_support.transient_internet(url):
270            socket.setdefaulttimeout(60)
271            try:
272                u = _urlopen_with_retry(url)
273            finally:
274                socket.setdefaulttimeout(None)
275            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
276
277    def test_http_no_timeout(self):
278        self.assertIsNone(socket.getdefaulttimeout())
279        url = test_support.TEST_HTTP_URL
280        with test_support.transient_internet(url):
281            socket.setdefaulttimeout(60)
282            try:
283                u = _urlopen_with_retry(url, timeout=None)
284            finally:
285                socket.setdefaulttimeout(None)
286            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
287
288    def test_http_timeout(self):
289        url = test_support.TEST_HTTP_URL
290        with test_support.transient_internet(url):
291            u = _urlopen_with_retry(url, timeout=120)
292            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
293
294    FTP_HOST = 'ftp://www.pythontest.net/'
295
296    @skip_ftp_test_on_travis
297    def test_ftp_basic(self):
298        self.assertIsNone(socket.getdefaulttimeout())
299        with test_support.transient_internet(self.FTP_HOST, timeout=None):
300            u = _urlopen_with_retry(self.FTP_HOST)
301            self.assertIsNone(u.fp.fp._sock.gettimeout())
302
303    @skip_ftp_test_on_travis
304    def test_ftp_default_timeout(self):
305        self.assertIsNone(socket.getdefaulttimeout())
306        with test_support.transient_internet(self.FTP_HOST):
307            socket.setdefaulttimeout(60)
308            try:
309                u = _urlopen_with_retry(self.FTP_HOST)
310            finally:
311                socket.setdefaulttimeout(None)
312            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
313
314    @skip_ftp_test_on_travis
315    def test_ftp_no_timeout(self):
316        self.assertIsNone(socket.getdefaulttimeout(),)
317        with test_support.transient_internet(self.FTP_HOST):
318            socket.setdefaulttimeout(60)
319            try:
320                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
321            finally:
322                socket.setdefaulttimeout(None)
323            self.assertIsNone(u.fp.fp._sock.gettimeout())
324
325    @skip_ftp_test_on_travis
326    def test_ftp_timeout(self):
327        with test_support.transient_internet(self.FTP_HOST):
328            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
329            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
330
331
332def test_main():
333    test_support.requires("network")
334    test_support.run_unittest(AuthTests,
335                              OtherNetworkTests,
336                              CloseSocketTest,
337                              TimeoutTest,
338                              )
339
340if __name__ == "__main__":
341    test_main()
342