1import unittest 2from test import test_support 3from test.test_urllib2 import sanepathname2url 4 5import socket 6import urllib2 7import os 8import sys 9 10TIMEOUT = 60 # seconds 11 12 13def _retry_thrice(func, exc, *args, **kwargs): 14 for i in range(3): 15 try: 16 return func(*args, **kwargs) 17 except exc, last_exc: 18 continue 19 except: 20 raise 21 raise last_exc 22 23def _wrap_with_retry_thrice(func, exc): 24 def wrapped(*args, **kwargs): 25 return _retry_thrice(func, exc, *args, **kwargs) 26 return wrapped 27 28# bpo-35411: FTP tests of test_urllib2net randomly fail 29# with "425 Security: Bad IP connecting" on Travis CI 30skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ, 31 'bpo-35411: skip FTP test ' 32 'on Travis CI') 33 34 35# Connecting to remote hosts is flaky. Make it more robust by retrying 36# the connection several times. 37_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError) 38 39 40class AuthTests(unittest.TestCase): 41 """Tests urllib2 authentication features.""" 42 43## Disabled at the moment since there is no page under python.org which 44## could be used to HTTP authentication. 45# 46# def test_basic_auth(self): 47# import httplib 48# 49# test_url = "http://www.python.org/test/test_urllib2/basic_auth" 50# test_hostport = "www.python.org" 51# test_realm = 'Test Realm' 52# test_user = 'test.test_urllib2net' 53# test_password = 'blah' 54# 55# # failure 56# try: 57# _urlopen_with_retry(test_url) 58# except urllib2.HTTPError, exc: 59# self.assertEqual(exc.code, 401) 60# else: 61# self.fail("urlopen() should have failed with 401") 62# 63# # success 64# auth_handler = urllib2.HTTPBasicAuthHandler() 65# auth_handler.add_password(test_realm, test_hostport, 66# test_user, test_password) 67# opener = urllib2.build_opener(auth_handler) 68# f = opener.open('http://localhost/') 69# response = _urlopen_with_retry("http://www.python.org/") 70# 71# # The 'userinfo' URL component is deprecated by RFC 3986 for security 72# # reasons, let's not implement it! (it's already implemented for proxy 73# # specification strings (that is, URLs or authorities specifying a 74# # proxy), so we must keep that) 75# self.assertRaises(httplib.InvalidURL, 76# urllib2.urlopen, "http://evil:thing@example.com") 77 78 79class CloseSocketTest(unittest.TestCase): 80 81 def test_close(self): 82 import httplib 83 84 # calling .close() on urllib2's response objects should close the 85 # underlying socket 86 87 # delve deep into response to fetch socket._socketobject 88 response = _urlopen_with_retry(test_support.TEST_HTTP_URL) 89 abused_fileobject = response.fp 90 self.assertIs(abused_fileobject.__class__, socket._fileobject) 91 httpresponse = abused_fileobject._sock 92 self.assertIs(httpresponse.__class__, httplib.HTTPResponse) 93 fileobject = httpresponse.fp 94 self.assertIs(fileobject.__class__, socket._fileobject) 95 96 self.assertTrue(not fileobject.closed) 97 response.close() 98 self.assertTrue(fileobject.closed) 99 100class OtherNetworkTests(unittest.TestCase): 101 def setUp(self): 102 if 0: # for debugging 103 import logging 104 logger = logging.getLogger("test_urllib2net") 105 logger.addHandler(logging.StreamHandler()) 106 107 # XXX The rest of these tests aren't very good -- they don't check much. 108 # They do sometimes catch some major disasters, though. 109 110 @skip_ftp_test_on_travis 111 def test_ftp(self): 112 urls = [ 113 'ftp://www.pythontest.net/README', 114 ('ftp://www.pythontest.net/non-existent-file', 115 None, urllib2.URLError), 116 ] 117 self._test_urls(urls, self._extra_handlers()) 118 119 def test_file(self): 120 TESTFN = test_support.TESTFN 121 f = open(TESTFN, 'w') 122 try: 123 f.write('hi there\n') 124 f.close() 125 urls = [ 126 'file:'+sanepathname2url(os.path.abspath(TESTFN)), 127 ('file:///nonsensename/etc/passwd', None, urllib2.URLError), 128 ] 129 self._test_urls(urls, self._extra_handlers(), retry=True) 130 finally: 131 os.remove(TESTFN) 132 133 self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file') 134 135 # XXX Following test depends on machine configurations that are internal 136 # to CNRI. Need to set up a public server with the right authentication 137 # configuration for test purposes. 138 139## def test_cnri(self): 140## if socket.gethostname() == 'bitdiddle': 141## localhost = 'bitdiddle.cnri.reston.va.us' 142## elif socket.gethostname() == 'bitdiddle.concentric.net': 143## localhost = 'localhost' 144## else: 145## localhost = None 146## if localhost is not None: 147## urls = [ 148## 'file://%s/etc/passwd' % localhost, 149## 'http://%s/simple/' % localhost, 150## 'http://%s/digest/' % localhost, 151## 'http://%s/not/found.h' % localhost, 152## ] 153 154## bauth = HTTPBasicAuthHandler() 155## bauth.add_password('basic_test_realm', localhost, 'jhylton', 156## 'password') 157## dauth = HTTPDigestAuthHandler() 158## dauth.add_password('digest_test_realm', localhost, 'jhylton', 159## 'password') 160 161## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 162 163 def test_urlwithfrag(self): 164 urlwith_frag = "http://www.pythontest.net/index.html#frag" 165 with test_support.transient_internet(urlwith_frag): 166 req = urllib2.Request(urlwith_frag) 167 res = urllib2.urlopen(req) 168 self.assertEqual(res.geturl(), 169 "http://www.pythontest.net/index.html#frag") 170 171 def test_fileno(self): 172 req = urllib2.Request(test_support.TEST_HTTP_URL) 173 opener = urllib2.build_opener() 174 res = opener.open(req) 175 try: 176 res.fileno() 177 except AttributeError: 178 self.fail("HTTPResponse object should return a valid fileno") 179 finally: 180 res.close() 181 182 def test_custom_headers(self): 183 url = test_support.TEST_HTTP_URL 184 with test_support.transient_internet(url): 185 opener = urllib2.build_opener() 186 request = urllib2.Request(url) 187 self.assertFalse(request.header_items()) 188 opener.open(request) 189 self.assertTrue(request.header_items()) 190 self.assertTrue(request.has_header('User-agent')) 191 request.add_header('User-Agent','Test-Agent') 192 opener.open(request) 193 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 194 195 @unittest.skip('XXX: http://www.imdb.com is gone') 196 def test_sites_no_connection_close(self): 197 # Some sites do not send Connection: close header. 198 # Verify that those work properly. (#issue12576) 199 200 URL = 'http://www.imdb.com' # No Connection:close 201 with test_support.transient_internet(URL): 202 req = urllib2.urlopen(URL) 203 res = req.read() 204 self.assertTrue(res) 205 206 def _test_urls(self, urls, handlers, retry=True): 207 import time 208 import logging 209 debug = logging.getLogger("test_urllib2").debug 210 211 urlopen = urllib2.build_opener(*handlers).open 212 if retry: 213 urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError) 214 215 for url in urls: 216 if isinstance(url, tuple): 217 url, req, expected_err = url 218 else: 219 req = expected_err = None 220 with test_support.transient_internet(url): 221 debug(url) 222 try: 223 f = urlopen(url, req, TIMEOUT) 224 except EnvironmentError as err: 225 debug(err) 226 if expected_err: 227 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 228 (expected_err, url, req, type(err), err)) 229 self.assertIsInstance(err, expected_err, msg) 230 except urllib2.URLError as err: 231 if isinstance(err[0], socket.timeout): 232 print >>sys.stderr, "<timeout: %s>" % url 233 continue 234 else: 235 raise 236 else: 237 try: 238 with test_support.transient_internet(url): 239 buf = f.read() 240 debug("read %d bytes" % len(buf)) 241 except socket.timeout: 242 print >>sys.stderr, "<timeout: %s>" % url 243 f.close() 244 debug("******** next url coming up...") 245 time.sleep(0.1) 246 247 def _extra_handlers(self): 248 handlers = [] 249 250 cfh = urllib2.CacheFTPHandler() 251 self.addCleanup(cfh.clear_cache) 252 cfh.setTimeout(1) 253 handlers.append(cfh) 254 255 return handlers 256 257 258class TimeoutTest(unittest.TestCase): 259 def test_http_basic(self): 260 self.assertIsNone(socket.getdefaulttimeout()) 261 url = test_support.TEST_HTTP_URL 262 with test_support.transient_internet(url, timeout=None): 263 u = _urlopen_with_retry(url) 264 self.assertIsNone(u.fp._sock.fp._sock.gettimeout()) 265 266 def test_http_default_timeout(self): 267 self.assertIsNone(socket.getdefaulttimeout()) 268 url = test_support.TEST_HTTP_URL 269 with test_support.transient_internet(url): 270 socket.setdefaulttimeout(60) 271 try: 272 u = _urlopen_with_retry(url) 273 finally: 274 socket.setdefaulttimeout(None) 275 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60) 276 277 def test_http_no_timeout(self): 278 self.assertIsNone(socket.getdefaulttimeout()) 279 url = test_support.TEST_HTTP_URL 280 with test_support.transient_internet(url): 281 socket.setdefaulttimeout(60) 282 try: 283 u = _urlopen_with_retry(url, timeout=None) 284 finally: 285 socket.setdefaulttimeout(None) 286 self.assertIsNone(u.fp._sock.fp._sock.gettimeout()) 287 288 def test_http_timeout(self): 289 url = test_support.TEST_HTTP_URL 290 with test_support.transient_internet(url): 291 u = _urlopen_with_retry(url, timeout=120) 292 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120) 293 294 FTP_HOST = 'ftp://www.pythontest.net/' 295 296 @skip_ftp_test_on_travis 297 def test_ftp_basic(self): 298 self.assertIsNone(socket.getdefaulttimeout()) 299 with test_support.transient_internet(self.FTP_HOST, timeout=None): 300 u = _urlopen_with_retry(self.FTP_HOST) 301 self.assertIsNone(u.fp.fp._sock.gettimeout()) 302 303 @skip_ftp_test_on_travis 304 def test_ftp_default_timeout(self): 305 self.assertIsNone(socket.getdefaulttimeout()) 306 with test_support.transient_internet(self.FTP_HOST): 307 socket.setdefaulttimeout(60) 308 try: 309 u = _urlopen_with_retry(self.FTP_HOST) 310 finally: 311 socket.setdefaulttimeout(None) 312 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 313 314 @skip_ftp_test_on_travis 315 def test_ftp_no_timeout(self): 316 self.assertIsNone(socket.getdefaulttimeout(),) 317 with test_support.transient_internet(self.FTP_HOST): 318 socket.setdefaulttimeout(60) 319 try: 320 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 321 finally: 322 socket.setdefaulttimeout(None) 323 self.assertIsNone(u.fp.fp._sock.gettimeout()) 324 325 @skip_ftp_test_on_travis 326 def test_ftp_timeout(self): 327 with test_support.transient_internet(self.FTP_HOST): 328 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 329 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 330 331 332def test_main(): 333 test_support.requires("network") 334 test_support.run_unittest(AuthTests, 335 OtherNetworkTests, 336 CloseSocketTest, 337 TimeoutTest, 338 ) 339 340if __name__ == "__main__": 341 test_main() 342