1import errno
2import unittest
3from test import support
4from test.support import os_helper
5from test.support import socket_helper
6from test.test_urllib2 import sanepathname2url
7
8import os
9import socket
10import urllib.error
11import urllib.request
12import sys
13
# Tests in this module contact real remote hosts.  test.support.requires()
# refuses to continue (per the test.support documentation it raises
# ResourceDenied) unless the "network" test resource has been enabled.
support.requires("network")
15
16
17def _retry_thrice(func, exc, *args, **kwargs):
18    for i in range(3):
19        try:
20            return func(*args, **kwargs)
21        except exc as e:
22            last_exc = e
23            continue
24    raise last_exc
25
def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs *func* through ``_retry_thrice``.

    The returned callable accepts any positional and keyword arguments and
    forwards them to *func*; *exc* is the exception class (or tuple of
    classes) that triggers another attempt.
    """
    def wrapped(*call_args, **call_kwargs):
        outcome = _retry_thrice(func, exc, *call_args, **call_kwargs)
        return outcome
    return wrapped
30
# bpo-35411: on Travis CI the FTP tests of test_urllib2net fail at random
# with "425 Security: Bad IP connecting", so they are skipped whenever the
# TRAVIS environment variable is present.
skip_ftp_test_on_travis = unittest.skipIf(
    'TRAVIS' in os.environ,
    'bpo-35411: skip FTP test on Travis CI',
)
36
37
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: this is urllib.request.urlopen wrapped so
# that a call raising urllib.error.URLError is attempted again (see
# _wrap_with_retry_thrice for the retry policy).
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)
42
43
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class (or tuple of classes) to match.  Each keyword
    argument names an attribute that the raised exception instance must have,
    with exactly that value, for the exception to count as a match.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The *raised* type must derive from the configured one.  Testing the
        # relationship the other way round would make e.g. a manager built
        # for OSError ignore ConnectionResetError and TimeoutError.
        if type_ is None or not issubclass(type_, self.exc):
            return None
        for attr, expected in self.attrs.items():
            if not hasattr(value, attr) or getattr(value, attr) != expected:
                # Attribute missing or different: not the transient failure
                # this manager is looking for, so let it propagate.
                return None
        # Imported here so the class does not rely on a module-level
        # ``ResourceDenied`` name being in scope.
        from test.support import ResourceDenied
        raise ResourceDenied("an optional resource is not available")
68
# Context managers that raise ResourceDenied when various issues
# with the internet connection manifest themselves as exceptions.
# Each one matches OSError together with a specific errno value;
# socket_peer_reset and ioerror_peer_reset are configured identically.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
75
76
class AuthTests(unittest.TestCase):
    """Tests for urllib authentication features (no test is currently enabled)."""

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
82#
83#    def test_basic_auth(self):
84#        import http.client
85#
86#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
87#        test_hostport = "www.python.org"
88#        test_realm = 'Test Realm'
89#        test_user = 'test.test_urllib2net'
90#        test_password = 'blah'
91#
92#        # failure
93#        try:
94#            _urlopen_with_retry(test_url)
95#        except urllib2.HTTPError, exc:
96#            self.assertEqual(exc.code, 401)
97#        else:
98#            self.fail("urlopen() should have failed with 401")
99#
100#        # success
101#        auth_handler = urllib2.HTTPBasicAuthHandler()
102#        auth_handler.add_password(test_realm, test_hostport,
103#                                  test_user, test_password)
104#        opener = urllib2.build_opener(auth_handler)
105#        f = opener.open('http://localhost/')
106#        response = _urlopen_with_retry("http://www.python.org/")
107#
108#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
109#        # reasons, let's not implement it!  (it's already implemented for proxy
110#        # specification strings (that is, URLs or authorities specifying a
111#        # proxy), so we must keep that)
112#        self.assertRaises(http.client.InvalidURL,
113#                          urllib2.urlopen, "http://evil:thing@example.com")
114
115
class CloseSocketTest(unittest.TestCase):
    """Closing a urlopen() response must close the object behind ``fp``."""

    def test_close(self):
        # urlopen() leaves a module-level opener behind; reset it afterwards.
        self.addCleanup(urllib.request.urlcleanup)

        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target):
            reply = _urlopen_with_retry(target)
            underlying = reply.fp
            # Still open right after the request ...
            self.assertFalse(underlying.closed)
            reply.close()
            # ... and closed once the response object has been closed.
            self.assertTrue(underlying.closed)
131
class OtherNetworkTests(unittest.TestCase):
    """Smoke tests that open real ftp:, file: and http: URLs.

    Every response and file opened here is closed deterministically, so a
    failing assertion or a network error does not leave sockets or file
    handles behind for later tests.
    """

    def setUp(self):
        if 0:  # for debugging
            import logging
            # NOTE(review): _test_urls() logs through the "test_urllib2"
            # logger, not "test_urllib2net" -- confirm which name is intended
            # before relying on this handler.
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    @skip_ftp_test_on_travis
    def test_ftp(self):
        """Fetch an existing FTP file and expect URLError for a missing one."""
        urls = [
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        """Open a freshly written local file via file: and reject bad paths."""
        TESTFN = os_helper.TESTFN
        try:
            # The with-block closes the handle even if write() fails.
            with open(TESTFN, 'w') as f:
                f.write('hi there\n')
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            # unlink() tolerates a missing file, so a failed open() above is
            # not masked by a second error raised during cleanup.
            os_helper.unlink(TESTFN)

        self.assertRaises(ValueError, urllib.request.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        """The URL fragment is preserved in the response's geturl()."""
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with socket_helper.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/index.html#frag")

    def test_redirect_url_withfrag(self):
        """A fragment supplied by a redirect target shows up in geturl()."""
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with socket_helper.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        """Opening a request fills in default headers but keeps custom ones."""
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            # Only the effect on ``request`` matters; close each response
            # straight away instead of leaking its socket.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com' # mangles Connection:close

        with socket_helper.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                # Adjacent literals rather than a backslash continuation, so
                # the message carries no run of indentation spaces.
                self.fail("urlopen failed for site not sending "
                          "Connection:close")
            else:
                self.assertTrue(res)

            with urllib.request.urlopen(URL) as req:
                res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        """Open every entry of *urls* and read the whole response.

        Each entry is either a URL string or a ``(url, data, expected_error)``
        tuple; ``data`` is handed to the opener as its second positional
        argument.  When ``expected_error`` is set, an OSError raised while
        opening must be an instance of it; without it, such an error
        propagates.  *handlers* are passed to urllib.request.build_opener().
        With *retry* true, opening is wrapped in _wrap_with_retry_thrice for
        urllib.error.URLError.
        """
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with socket_helper.transient_internet(url):
                    try:
                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
                    # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        try:
                            with time_out, \
                                 socket_peer_reset, \
                                 ioerror_peer_reset:
                                buf = f.read()
                                debug("read %d bytes" % len(buf))
                        except TimeoutError:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        finally:
                            # Close the response whatever read() raised, not
                            # only after success or a timeout.
                            f.close()
                time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra opener handlers: one CacheFTPHandler.

        The handler's cache is cleared at test cleanup and its timeout is
        set to 1 through setTimeout().
        """
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers
294
295
class TimeoutTest(unittest.TestCase):
    """Check the timeout set on the socket behind a urlopen() response.

    For both HTTP and FTP four situations are covered: no timeout anywhere,
    a process-wide default timeout only, a default overridden by an explicit
    ``timeout=None``, and an explicit numeric timeout.
    """

    FTP_HOST = 'ftp://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    # -- helpers ----------------------------------------------------------

    def _open(self, url, **urlopen_kwargs):
        """Open *url* with retries and register the response for closing."""
        response = _urlopen_with_retry(url, **urlopen_kwargs)
        self.addCleanup(response.close)
        return response

    def _open_with_default_timeout(self, url, seconds, **urlopen_kwargs):
        """Open *url* while the global socket default timeout is *seconds*.

        The default is reset to None afterwards, whether or not opening
        succeeded.
        """
        socket.setdefaulttimeout(seconds)
        try:
            return self._open(url, **urlopen_kwargs)
        finally:
            socket.setdefaulttimeout(None)

    @staticmethod
    def _http_socket_timeout(response):
        """Timeout of the socket reached through an HTTP response."""
        return response.fp.raw._sock.gettimeout()

    @staticmethod
    def _ftp_socket_timeout(response):
        """Timeout of the socket reached through an FTP response."""
        return response.fp.fp.raw._sock.gettimeout()

    # -- HTTP -------------------------------------------------------------

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target, timeout=None):
            response = self._open(target)
            self.assertIsNone(self._http_socket_timeout(response))

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target):
            response = self._open_with_default_timeout(target, 60)
            self.assertEqual(self._http_socket_timeout(response), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target):
            response = self._open_with_default_timeout(target, 60,
                                                       timeout=None)
            self.assertIsNone(self._http_socket_timeout(response))

    def test_http_timeout(self):
        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target):
            response = self._open(target, timeout=120)
            self.assertEqual(self._http_socket_timeout(response), 120)

    # -- FTP --------------------------------------------------------------

    @skip_ftp_test_on_travis
    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
            response = self._open(self.FTP_HOST)
            self.assertIsNone(self._ftp_socket_timeout(response))

    @skip_ftp_test_on_travis
    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            response = self._open_with_default_timeout(self.FTP_HOST, 60)
            self.assertEqual(self._ftp_socket_timeout(response), 60)

    @skip_ftp_test_on_travis
    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            response = self._open_with_default_timeout(self.FTP_HOST, 60,
                                                       timeout=None)
            self.assertIsNone(self._ftp_socket_timeout(response))

    @skip_ftp_test_on_travis
    def test_ftp_timeout(self):
        with socket_helper.transient_internet(self.FTP_HOST):
            response = self._open(self.FTP_HOST, timeout=60)
            self.assertEqual(self._ftp_socket_timeout(response), 60)
380
381
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
384