1import errno 2import unittest 3from test import support 4from test.support import os_helper 5from test.support import socket_helper 6from test.test_urllib2 import sanepathname2url 7 8import os 9import socket 10import urllib.error 11import urllib.request 12import sys 13 14support.requires("network") 15 16 17def _retry_thrice(func, exc, *args, **kwargs): 18 for i in range(3): 19 try: 20 return func(*args, **kwargs) 21 except exc as e: 22 last_exc = e 23 continue 24 raise last_exc 25 26def _wrap_with_retry_thrice(func, exc): 27 def wrapped(*args, **kwargs): 28 return _retry_thrice(func, exc, *args, **kwargs) 29 return wrapped 30 31# bpo-35411: FTP tests of test_urllib2net randomly fail 32# with "425 Security: Bad IP connecting" on Travis CI 33skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ, 34 'bpo-35411: skip FTP test ' 35 'on Travis CI') 36 37 38# Connecting to remote hosts is flaky. Make it more robust by retrying 39# the connection several times. 40_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen, 41 urllib.error.URLError) 42 43 44class TransientResource(object): 45 46 """Raise ResourceDenied if an exception is raised while the context manager 47 is in effect that matches the specified exception and attributes.""" 48 49 def __init__(self, exc, **kwargs): 50 self.exc = exc 51 self.attrs = kwargs 52 53 def __enter__(self): 54 return self 55 56 def __exit__(self, type_=None, value=None, traceback=None): 57 """If type_ is a subclass of self.exc and value has attributes matching 58 self.attrs, raise ResourceDenied. Otherwise let the exception 59 propagate (if any).""" 60 if type_ is not None and issubclass(self.exc, type_): 61 for attr, attr_value in self.attrs.items(): 62 if not hasattr(value, attr): 63 break 64 if getattr(value, attr) != attr_value: 65 break 66 else: 67 raise ResourceDenied("an optional resource is not available") 68 69# Context managers that raise ResourceDenied when various issues 70# with the internet connection manifest themselves as exceptions. 71# XXX deprecate these and use transient_internet() instead 72time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) 73socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 74ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 75 76 77class AuthTests(unittest.TestCase): 78 """Tests urllib2 authentication features.""" 79 80## Disabled at the moment since there is no page under python.org which 81## could be used to HTTP authentication. 82# 83# def test_basic_auth(self): 84# import http.client 85# 86# test_url = "http://www.python.org/test/test_urllib2/basic_auth" 87# test_hostport = "www.python.org" 88# test_realm = 'Test Realm' 89# test_user = 'test.test_urllib2net' 90# test_password = 'blah' 91# 92# # failure 93# try: 94# _urlopen_with_retry(test_url) 95# except urllib2.HTTPError, exc: 96# self.assertEqual(exc.code, 401) 97# else: 98# self.fail("urlopen() should have failed with 401") 99# 100# # success 101# auth_handler = urllib2.HTTPBasicAuthHandler() 102# auth_handler.add_password(test_realm, test_hostport, 103# test_user, test_password) 104# opener = urllib2.build_opener(auth_handler) 105# f = opener.open('http://localhost/') 106# response = _urlopen_with_retry("http://www.python.org/") 107# 108# # The 'userinfo' URL component is deprecated by RFC 3986 for security 109# # reasons, let's not implement it! (it's already implemented for proxy 110# # specification strings (that is, URLs or authorities specifying a 111# # proxy), so we must keep that) 112# self.assertRaises(http.client.InvalidURL, 113# urllib2.urlopen, "http://evil:thing@example.com") 114 115 116class CloseSocketTest(unittest.TestCase): 117 118 def test_close(self): 119 # clear _opener global variable 120 self.addCleanup(urllib.request.urlcleanup) 121 122 # calling .close() on urllib2's response objects should close the 123 # underlying socket 124 url = support.TEST_HTTP_URL 125 with socket_helper.transient_internet(url): 126 response = _urlopen_with_retry(url) 127 sock = response.fp 128 self.assertFalse(sock.closed) 129 response.close() 130 self.assertTrue(sock.closed) 131 132class OtherNetworkTests(unittest.TestCase): 133 def setUp(self): 134 if 0: # for debugging 135 import logging 136 logger = logging.getLogger("test_urllib2net") 137 logger.addHandler(logging.StreamHandler()) 138 139 # XXX The rest of these tests aren't very good -- they don't check much. 140 # They do sometimes catch some major disasters, though. 141 142 @skip_ftp_test_on_travis 143 def test_ftp(self): 144 urls = [ 145 'ftp://www.pythontest.net/README', 146 ('ftp://www.pythontest.net/non-existent-file', 147 None, urllib.error.URLError), 148 ] 149 self._test_urls(urls, self._extra_handlers()) 150 151 def test_file(self): 152 TESTFN = os_helper.TESTFN 153 f = open(TESTFN, 'w') 154 try: 155 f.write('hi there\n') 156 f.close() 157 urls = [ 158 'file:' + sanepathname2url(os.path.abspath(TESTFN)), 159 ('file:///nonsensename/etc/passwd', None, 160 urllib.error.URLError), 161 ] 162 self._test_urls(urls, self._extra_handlers(), retry=True) 163 finally: 164 os.remove(TESTFN) 165 166 self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file') 167 168 # XXX Following test depends on machine configurations that are internal 169 # to CNRI. Need to set up a public server with the right authentication 170 # configuration for test purposes. 171 172## def test_cnri(self): 173## if socket.gethostname() == 'bitdiddle': 174## localhost = 'bitdiddle.cnri.reston.va.us' 175## elif socket.gethostname() == 'bitdiddle.concentric.net': 176## localhost = 'localhost' 177## else: 178## localhost = None 179## if localhost is not None: 180## urls = [ 181## 'file://%s/etc/passwd' % localhost, 182## 'http://%s/simple/' % localhost, 183## 'http://%s/digest/' % localhost, 184## 'http://%s/not/found.h' % localhost, 185## ] 186 187## bauth = HTTPBasicAuthHandler() 188## bauth.add_password('basic_test_realm', localhost, 'jhylton', 189## 'password') 190## dauth = HTTPDigestAuthHandler() 191## dauth.add_password('digest_test_realm', localhost, 'jhylton', 192## 'password') 193 194## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 195 196 def test_urlwithfrag(self): 197 urlwith_frag = "http://www.pythontest.net/index.html#frag" 198 with socket_helper.transient_internet(urlwith_frag): 199 req = urllib.request.Request(urlwith_frag) 200 res = urllib.request.urlopen(req) 201 self.assertEqual(res.geturl(), 202 "http://www.pythontest.net/index.html#frag") 203 204 def test_redirect_url_withfrag(self): 205 redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/" 206 with socket_helper.transient_internet(redirect_url_with_frag): 207 req = urllib.request.Request(redirect_url_with_frag) 208 res = urllib.request.urlopen(req) 209 self.assertEqual(res.geturl(), 210 "http://www.pythontest.net/elsewhere/#frag") 211 212 def test_custom_headers(self): 213 url = support.TEST_HTTP_URL 214 with socket_helper.transient_internet(url): 215 opener = urllib.request.build_opener() 216 request = urllib.request.Request(url) 217 self.assertFalse(request.header_items()) 218 opener.open(request) 219 self.assertTrue(request.header_items()) 220 self.assertTrue(request.has_header('User-agent')) 221 request.add_header('User-Agent','Test-Agent') 222 opener.open(request) 223 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 224 225 @unittest.skip('XXX: http://www.imdb.com is gone') 226 def test_sites_no_connection_close(self): 227 # Some sites do not send Connection: close header. 228 # Verify that those work properly. (#issue12576) 229 230 URL = 'http://www.imdb.com' # mangles Connection:close 231 232 with socket_helper.transient_internet(URL): 233 try: 234 with urllib.request.urlopen(URL) as res: 235 pass 236 except ValueError: 237 self.fail("urlopen failed for site not sending \ 238 Connection:close") 239 else: 240 self.assertTrue(res) 241 242 req = urllib.request.urlopen(URL) 243 res = req.read() 244 self.assertTrue(res) 245 246 def _test_urls(self, urls, handlers, retry=True): 247 import time 248 import logging 249 debug = logging.getLogger("test_urllib2").debug 250 251 urlopen = urllib.request.build_opener(*handlers).open 252 if retry: 253 urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError) 254 255 for url in urls: 256 with self.subTest(url=url): 257 if isinstance(url, tuple): 258 url, req, expected_err = url 259 else: 260 req = expected_err = None 261 262 with socket_helper.transient_internet(url): 263 try: 264 f = urlopen(url, req, support.INTERNET_TIMEOUT) 265 # urllib.error.URLError is a subclass of OSError 266 except OSError as err: 267 if expected_err: 268 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 269 (expected_err, url, req, type(err), err)) 270 self.assertIsInstance(err, expected_err, msg) 271 else: 272 raise 273 else: 274 try: 275 with time_out, \ 276 socket_peer_reset, \ 277 ioerror_peer_reset: 278 buf = f.read() 279 debug("read %d bytes" % len(buf)) 280 except TimeoutError: 281 print("<timeout: %s>" % url, file=sys.stderr) 282 f.close() 283 time.sleep(0.1) 284 285 def _extra_handlers(self): 286 handlers = [] 287 288 cfh = urllib.request.CacheFTPHandler() 289 self.addCleanup(cfh.clear_cache) 290 cfh.setTimeout(1) 291 handlers.append(cfh) 292 293 return handlers 294 295 296class TimeoutTest(unittest.TestCase): 297 def setUp(self): 298 # clear _opener global variable 299 self.addCleanup(urllib.request.urlcleanup) 300 301 def test_http_basic(self): 302 self.assertIsNone(socket.getdefaulttimeout()) 303 url = support.TEST_HTTP_URL 304 with socket_helper.transient_internet(url, timeout=None): 305 u = _urlopen_with_retry(url) 306 self.addCleanup(u.close) 307 self.assertIsNone(u.fp.raw._sock.gettimeout()) 308 309 def test_http_default_timeout(self): 310 self.assertIsNone(socket.getdefaulttimeout()) 311 url = support.TEST_HTTP_URL 312 with socket_helper.transient_internet(url): 313 socket.setdefaulttimeout(60) 314 try: 315 u = _urlopen_with_retry(url) 316 self.addCleanup(u.close) 317 finally: 318 socket.setdefaulttimeout(None) 319 self.assertEqual(u.fp.raw._sock.gettimeout(), 60) 320 321 def test_http_no_timeout(self): 322 self.assertIsNone(socket.getdefaulttimeout()) 323 url = support.TEST_HTTP_URL 324 with socket_helper.transient_internet(url): 325 socket.setdefaulttimeout(60) 326 try: 327 u = _urlopen_with_retry(url, timeout=None) 328 self.addCleanup(u.close) 329 finally: 330 socket.setdefaulttimeout(None) 331 self.assertIsNone(u.fp.raw._sock.gettimeout()) 332 333 def test_http_timeout(self): 334 url = support.TEST_HTTP_URL 335 with socket_helper.transient_internet(url): 336 u = _urlopen_with_retry(url, timeout=120) 337 self.addCleanup(u.close) 338 self.assertEqual(u.fp.raw._sock.gettimeout(), 120) 339 340 FTP_HOST = 'ftp://www.pythontest.net/' 341 342 @skip_ftp_test_on_travis 343 def test_ftp_basic(self): 344 self.assertIsNone(socket.getdefaulttimeout()) 345 with socket_helper.transient_internet(self.FTP_HOST, timeout=None): 346 u = _urlopen_with_retry(self.FTP_HOST) 347 self.addCleanup(u.close) 348 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 349 350 @skip_ftp_test_on_travis 351 def test_ftp_default_timeout(self): 352 self.assertIsNone(socket.getdefaulttimeout()) 353 with socket_helper.transient_internet(self.FTP_HOST): 354 socket.setdefaulttimeout(60) 355 try: 356 u = _urlopen_with_retry(self.FTP_HOST) 357 self.addCleanup(u.close) 358 finally: 359 socket.setdefaulttimeout(None) 360 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 361 362 @skip_ftp_test_on_travis 363 def test_ftp_no_timeout(self): 364 self.assertIsNone(socket.getdefaulttimeout()) 365 with socket_helper.transient_internet(self.FTP_HOST): 366 socket.setdefaulttimeout(60) 367 try: 368 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 369 self.addCleanup(u.close) 370 finally: 371 socket.setdefaulttimeout(None) 372 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 373 374 @skip_ftp_test_on_travis 375 def test_ftp_timeout(self): 376 with socket_helper.transient_internet(self.FTP_HOST): 377 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 378 self.addCleanup(u.close) 379 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 380 381 382if __name__ == "__main__": 383 unittest.main() 384