1"""Regression tests for urllib""" 2 3import collections 4import urllib 5import httplib 6import io 7import unittest 8import os 9import sys 10import mimetools 11import tempfile 12 13from test import test_support 14from base64 import b64encode 15 16 17def hexescape(char): 18 """Escape char as RFC 2396 specifies""" 19 hex_repr = hex(ord(char))[2:].upper() 20 if len(hex_repr) == 1: 21 hex_repr = "0%s" % hex_repr 22 return "%" + hex_repr 23 24 25def fakehttp(fakedata): 26 class FakeSocket(io.BytesIO): 27 28 def sendall(self, data): 29 FakeHTTPConnection.buf = data 30 31 def makefile(self, *args, **kwds): 32 return self 33 34 def read(self, amt=None): 35 if self.closed: 36 return b"" 37 return io.BytesIO.read(self, amt) 38 39 def readline(self, length=None): 40 if self.closed: 41 return b"" 42 return io.BytesIO.readline(self, length) 43 44 class FakeHTTPConnection(httplib.HTTPConnection): 45 46 # buffer to store data for verification in urlopen tests. 47 buf = "" 48 49 def connect(self): 50 self.sock = FakeSocket(self.fakedata) 51 self.__class__.fakesock = self.sock 52 FakeHTTPConnection.fakedata = fakedata 53 54 return FakeHTTPConnection 55 56 57class FakeHTTPMixin(object): 58 def fakehttp(self, fakedata): 59 assert httplib.HTTP._connection_class == httplib.HTTPConnection 60 61 httplib.HTTP._connection_class = fakehttp(fakedata) 62 63 def unfakehttp(self): 64 httplib.HTTP._connection_class = httplib.HTTPConnection 65 66 67class urlopen_FileTests(unittest.TestCase): 68 """Test urlopen() opening a temporary file. 69 70 Try to test as much functionality as possible so as to cut down on reliance 71 on connecting to the Net for testing. 72 73 """ 74 75 def setUp(self): 76 """Setup of a temp file to use for testing""" 77 self.text = "test_urllib: %s\n" % self.__class__.__name__ 78 FILE = file(test_support.TESTFN, 'wb') 79 try: 80 FILE.write(self.text) 81 finally: 82 FILE.close() 83 self.pathname = test_support.TESTFN 84 self.returned_obj = urllib.urlopen("file:%s" % self.pathname) 85 86 def tearDown(self): 87 """Shut down the open object""" 88 self.returned_obj.close() 89 os.remove(test_support.TESTFN) 90 91 def test_interface(self): 92 # Make sure object returned by urlopen() has the specified methods 93 for attr in ("read", "readline", "readlines", "fileno", 94 "close", "info", "geturl", "getcode", "__iter__"): 95 self.assertTrue(hasattr(self.returned_obj, attr), 96 "object returned by urlopen() lacks %s attribute" % 97 attr) 98 99 def test_read(self): 100 self.assertEqual(self.text, self.returned_obj.read()) 101 102 def test_readline(self): 103 self.assertEqual(self.text, self.returned_obj.readline()) 104 self.assertEqual('', self.returned_obj.readline(), 105 "calling readline() after exhausting the file did not" 106 " return an empty string") 107 108 def test_readlines(self): 109 lines_list = self.returned_obj.readlines() 110 self.assertEqual(len(lines_list), 1, 111 "readlines() returned the wrong number of lines") 112 self.assertEqual(lines_list[0], self.text, 113 "readlines() returned improper text") 114 115 def test_fileno(self): 116 file_num = self.returned_obj.fileno() 117 self.assertIsInstance(file_num, int, "fileno() did not return an int") 118 self.assertEqual(os.read(file_num, len(self.text)), self.text, 119 "Reading on the file descriptor returned by fileno() " 120 "did not return the expected text") 121 122 def test_close(self): 123 # Test close() by calling it hear and then having it be called again 124 # by the tearDown() method for the test 125 self.returned_obj.close() 126 127 def test_info(self): 128 self.assertIsInstance(self.returned_obj.info(), mimetools.Message) 129 130 def test_geturl(self): 131 self.assertEqual(self.returned_obj.geturl(), self.pathname) 132 133 def test_getcode(self): 134 self.assertEqual(self.returned_obj.getcode(), None) 135 136 def test_iter(self): 137 # Test iterator 138 # Don't need to count number of iterations since test would fail the 139 # instant it returned anything beyond the first line from the 140 # comparison 141 for line in self.returned_obj.__iter__(): 142 self.assertEqual(line, self.text) 143 144 def test_relativelocalfile(self): 145 self.assertRaises(ValueError,urllib.urlopen,'./' + self.pathname) 146 147class ProxyTests(unittest.TestCase): 148 149 def setUp(self): 150 # Records changes to env vars 151 self.env = test_support.EnvironmentVarGuard() 152 # Delete all proxy related env vars 153 for k in os.environ.keys(): 154 if 'proxy' in k.lower(): 155 self.env.unset(k) 156 157 def tearDown(self): 158 # Restore all proxy related env vars 159 self.env.__exit__() 160 del self.env 161 162 def test_getproxies_environment_keep_no_proxies(self): 163 self.env.set('NO_PROXY', 'localhost') 164 proxies = urllib.getproxies_environment() 165 # getproxies_environment use lowered case truncated (no '_proxy') keys 166 self.assertEqual('localhost', proxies['no']) 167 # List of no_proxies with space. 168 self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com:1234') 169 self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com')) 170 self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com:8888')) 171 self.assertTrue(urllib.proxy_bypass_environment('newdomain.com:1234')) 172 173 def test_proxy_cgi_ignore(self): 174 try: 175 self.env.set('HTTP_PROXY', 'http://somewhere:3128') 176 proxies = urllib.getproxies_environment() 177 self.assertEqual('http://somewhere:3128', proxies['http']) 178 self.env.set('REQUEST_METHOD', 'GET') 179 proxies = urllib.getproxies_environment() 180 self.assertNotIn('http', proxies) 181 finally: 182 self.env.unset('REQUEST_METHOD') 183 self.env.unset('HTTP_PROXY') 184 185 def test_proxy_bypass_environment_host_match(self): 186 bypass = urllib.proxy_bypass_environment 187 self.env.set('NO_PROXY', 188 'localhost, anotherdomain.com, newdomain.com:1234, .d.o.t') 189 self.assertTrue(bypass('localhost')) 190 self.assertTrue(bypass('LocalHost')) # MixedCase 191 self.assertTrue(bypass('LOCALHOST')) # UPPERCASE 192 self.assertTrue(bypass('newdomain.com:1234')) 193 self.assertTrue(bypass('foo.d.o.t')) # issue 29142 194 self.assertTrue(bypass('anotherdomain.com:8888')) 195 self.assertTrue(bypass('www.newdomain.com:1234')) 196 self.assertFalse(bypass('prelocalhost')) 197 self.assertFalse(bypass('newdomain.com')) # no port 198 self.assertFalse(bypass('newdomain.com:1235')) # wrong port 199 200class ProxyTests_withOrderedEnv(unittest.TestCase): 201 202 def setUp(self): 203 # We need to test conditions, where variable order _is_ significant 204 self._saved_env = os.environ 205 # Monkey patch os.environ, start with empty fake environment 206 os.environ = collections.OrderedDict() 207 208 def tearDown(self): 209 os.environ = self._saved_env 210 211 def test_getproxies_environment_prefer_lowercase(self): 212 # Test lowercase preference with removal 213 os.environ['no_proxy'] = '' 214 os.environ['No_Proxy'] = 'localhost' 215 self.assertFalse(urllib.proxy_bypass_environment('localhost')) 216 self.assertFalse(urllib.proxy_bypass_environment('arbitrary')) 217 os.environ['http_proxy'] = '' 218 os.environ['HTTP_PROXY'] = 'http://somewhere:3128' 219 proxies = urllib.getproxies_environment() 220 self.assertEqual({}, proxies) 221 # Test lowercase preference of proxy bypass and correct matching including ports 222 os.environ['no_proxy'] = 'localhost, noproxy.com, my.proxy:1234' 223 os.environ['No_Proxy'] = 'xyz.com' 224 self.assertTrue(urllib.proxy_bypass_environment('localhost')) 225 self.assertTrue(urllib.proxy_bypass_environment('noproxy.com:5678')) 226 self.assertTrue(urllib.proxy_bypass_environment('my.proxy:1234')) 227 self.assertFalse(urllib.proxy_bypass_environment('my.proxy')) 228 self.assertFalse(urllib.proxy_bypass_environment('arbitrary')) 229 # Test lowercase preference with replacement 230 os.environ['http_proxy'] = 'http://somewhere:3128' 231 os.environ['Http_Proxy'] = 'http://somewhereelse:3128' 232 proxies = urllib.getproxies_environment() 233 self.assertEqual('http://somewhere:3128', proxies['http']) 234 235 236class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin): 237 """Test urlopen() opening a fake http connection.""" 238 239 def test_read(self): 240 self.fakehttp('Hello!') 241 try: 242 fp = urllib.urlopen("http://python.org/") 243 self.assertEqual(fp.readline(), 'Hello!') 244 self.assertEqual(fp.readline(), '') 245 self.assertEqual(fp.geturl(), 'http://python.org/') 246 self.assertEqual(fp.getcode(), 200) 247 finally: 248 self.unfakehttp() 249 250 def test_url_fragment(self): 251 # Issue #11703: geturl() omits fragments in the original URL. 252 url = 'http://docs.python.org/library/urllib.html#OK' 253 self.fakehttp('Hello!') 254 try: 255 fp = urllib.urlopen(url) 256 self.assertEqual(fp.geturl(), url) 257 finally: 258 self.unfakehttp() 259 260 def test_url_with_control_char_rejected(self): 261 for char_no in range(0, 0x21) + range(0x7f, 0x100): 262 char = chr(char_no) 263 schemeless_url = "//localhost:7777/test%s/" % char 264 self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") 265 try: 266 # urllib quotes the URL so there is no injection. 267 resp = urllib.urlopen("http:" + schemeless_url) 268 self.assertNotIn(char, resp.geturl()) 269 finally: 270 self.unfakehttp() 271 272 def test_url_with_newline_header_injection_rejected(self): 273 self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") 274 host = "localhost:7777?a=1 HTTP/1.1\r\nX-injected: header\r\nTEST: 123" 275 schemeless_url = "//" + host + ":8080/test/?test=a" 276 try: 277 # urllib quotes the URL so there is no injection. 278 resp = urllib.urlopen("http:" + schemeless_url) 279 self.assertNotIn(' ', resp.geturl()) 280 self.assertNotIn('\r', resp.geturl()) 281 self.assertNotIn('\n', resp.geturl()) 282 finally: 283 self.unfakehttp() 284 285 def test_read_bogus(self): 286 # urlopen() should raise IOError for many error codes. 287 self.fakehttp('''HTTP/1.1 401 Authentication Required 288Date: Wed, 02 Jan 2008 03:03:54 GMT 289Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e 290Connection: close 291Content-Type: text/html; charset=iso-8859-1 292''') 293 try: 294 self.assertRaises(IOError, urllib.urlopen, "http://python.org/") 295 finally: 296 self.unfakehttp() 297 298 def test_invalid_redirect(self): 299 # urlopen() should raise IOError for many error codes. 300 self.fakehttp("""HTTP/1.1 302 Found 301Date: Wed, 02 Jan 2008 03:03:54 GMT 302Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e 303Location: file:README 304Connection: close 305Content-Type: text/html; charset=iso-8859-1 306""") 307 try: 308 msg = "Redirection to url 'file:" 309 with self.assertRaisesRegexp(IOError, msg): 310 urllib.urlopen("http://python.org/") 311 finally: 312 self.unfakehttp() 313 314 def test_redirect_limit_independent(self): 315 # Ticket #12923: make sure independent requests each use their 316 # own retry limit. 317 for i in range(urllib.FancyURLopener().maxtries): 318 self.fakehttp(b'''HTTP/1.1 302 Found 319Location: file://guidocomputer.athome.com:/python/license 320Connection: close 321''') 322 try: 323 self.assertRaises(IOError, urllib.urlopen, 324 "http://something") 325 finally: 326 self.unfakehttp() 327 328 def test_empty_socket(self): 329 # urlopen() raises IOError if the underlying socket does not send any 330 # data. (#1680230) 331 self.fakehttp('') 332 try: 333 self.assertRaises(IOError, urllib.urlopen, 'http://something') 334 finally: 335 self.unfakehttp() 336 337 def test_missing_localfile(self): 338 self.assertRaises(IOError, urllib.urlopen, 339 'file://localhost/a/missing/file.py') 340 fd, tmp_file = tempfile.mkstemp() 341 tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/') 342 self.assertTrue(os.path.exists(tmp_file)) 343 try: 344 fp = urllib.urlopen(tmp_fileurl) 345 fp.close() 346 finally: 347 os.close(fd) 348 os.unlink(tmp_file) 349 350 self.assertFalse(os.path.exists(tmp_file)) 351 self.assertRaises(IOError, urllib.urlopen, tmp_fileurl) 352 353 def test_ftp_nonexisting(self): 354 self.assertRaises(IOError, urllib.urlopen, 355 'ftp://localhost/not/existing/file.py') 356 357 358 def test_userpass_inurl(self): 359 self.fakehttp('Hello!') 360 try: 361 fakehttp_wrapper = httplib.HTTP._connection_class 362 fp = urllib.urlopen("http://user:pass@python.org/") 363 authorization = ("Authorization: Basic %s\r\n" % 364 b64encode('user:pass')) 365 # The authorization header must be in place 366 self.assertIn(authorization, fakehttp_wrapper.buf) 367 self.assertEqual(fp.readline(), "Hello!") 368 self.assertEqual(fp.readline(), "") 369 self.assertEqual(fp.geturl(), 'http://user:pass@python.org/') 370 self.assertEqual(fp.getcode(), 200) 371 finally: 372 self.unfakehttp() 373 374 def test_userpass_with_spaces_inurl(self): 375 self.fakehttp('Hello!') 376 try: 377 url = "http://a b:c d@python.org/" 378 fakehttp_wrapper = httplib.HTTP._connection_class 379 authorization = ("Authorization: Basic %s\r\n" % 380 b64encode('a b:c d')) 381 fp = urllib.urlopen(url) 382 # The authorization header must be in place 383 self.assertIn(authorization, fakehttp_wrapper.buf) 384 self.assertEqual(fp.readline(), "Hello!") 385 self.assertEqual(fp.readline(), "") 386 # the spaces are quoted in URL so no match 387 self.assertNotEqual(fp.geturl(), url) 388 self.assertEqual(fp.getcode(), 200) 389 finally: 390 self.unfakehttp() 391 392 393class urlretrieve_FileTests(unittest.TestCase): 394 """Test urllib.urlretrieve() on local files""" 395 396 def setUp(self): 397 # Create a list of temporary files. Each item in the list is a file 398 # name (absolute path or relative to the current working directory). 399 # All files in this list will be deleted in the tearDown method. Note, 400 # this only helps to makes sure temporary files get deleted, but it 401 # does nothing about trying to close files that may still be open. It 402 # is the responsibility of the developer to properly close files even 403 # when exceptional conditions occur. 404 self.tempFiles = [] 405 406 # Create a temporary file. 407 self.registerFileForCleanUp(test_support.TESTFN) 408 self.text = 'testing urllib.urlretrieve' 409 try: 410 FILE = file(test_support.TESTFN, 'wb') 411 FILE.write(self.text) 412 FILE.close() 413 finally: 414 try: FILE.close() 415 except: pass 416 417 def tearDown(self): 418 # Delete the temporary files. 419 for each in self.tempFiles: 420 try: os.remove(each) 421 except: pass 422 423 def constructLocalFileUrl(self, filePath): 424 return "file://%s" % urllib.pathname2url(os.path.abspath(filePath)) 425 426 def createNewTempFile(self, data=""): 427 """Creates a new temporary file containing the specified data, 428 registers the file for deletion during the test fixture tear down, and 429 returns the absolute path of the file.""" 430 431 newFd, newFilePath = tempfile.mkstemp() 432 try: 433 self.registerFileForCleanUp(newFilePath) 434 newFile = os.fdopen(newFd, "wb") 435 newFile.write(data) 436 newFile.close() 437 finally: 438 try: newFile.close() 439 except: pass 440 return newFilePath 441 442 def registerFileForCleanUp(self, fileName): 443 self.tempFiles.append(fileName) 444 445 def test_basic(self): 446 # Make sure that a local file just gets its own location returned and 447 # a headers value is returned. 448 result = urllib.urlretrieve("file:%s" % test_support.TESTFN) 449 self.assertEqual(result[0], test_support.TESTFN) 450 self.assertIsInstance(result[1], mimetools.Message, 451 "did not get a mimetools.Message instance as " 452 "second returned value") 453 454 def test_copy(self): 455 # Test that setting the filename argument works. 456 second_temp = "%s.2" % test_support.TESTFN 457 self.registerFileForCleanUp(second_temp) 458 result = urllib.urlretrieve(self.constructLocalFileUrl( 459 test_support.TESTFN), second_temp) 460 self.assertEqual(second_temp, result[0]) 461 self.assertTrue(os.path.exists(second_temp), "copy of the file was not " 462 "made") 463 FILE = file(second_temp, 'rb') 464 try: 465 text = FILE.read() 466 FILE.close() 467 finally: 468 try: FILE.close() 469 except: pass 470 self.assertEqual(self.text, text) 471 472 def test_reporthook(self): 473 # Make sure that the reporthook works. 474 def hooktester(count, block_size, total_size, count_holder=[0]): 475 self.assertIsInstance(count, int) 476 self.assertIsInstance(block_size, int) 477 self.assertIsInstance(total_size, int) 478 self.assertEqual(count, count_holder[0]) 479 count_holder[0] = count_holder[0] + 1 480 second_temp = "%s.2" % test_support.TESTFN 481 self.registerFileForCleanUp(second_temp) 482 urllib.urlretrieve(self.constructLocalFileUrl(test_support.TESTFN), 483 second_temp, hooktester) 484 485 def test_reporthook_0_bytes(self): 486 # Test on zero length file. Should call reporthook only 1 time. 487 report = [] 488 def hooktester(count, block_size, total_size, _report=report): 489 _report.append((count, block_size, total_size)) 490 srcFileName = self.createNewTempFile() 491 urllib.urlretrieve(self.constructLocalFileUrl(srcFileName), 492 test_support.TESTFN, hooktester) 493 self.assertEqual(len(report), 1) 494 self.assertEqual(report[0][2], 0) 495 496 def test_reporthook_5_bytes(self): 497 # Test on 5 byte file. Should call reporthook only 2 times (once when 498 # the "network connection" is established and once when the block is 499 # read). Since the block size is 8192 bytes, only one block read is 500 # required to read the entire file. 501 report = [] 502 def hooktester(count, block_size, total_size, _report=report): 503 _report.append((count, block_size, total_size)) 504 srcFileName = self.createNewTempFile("x" * 5) 505 urllib.urlretrieve(self.constructLocalFileUrl(srcFileName), 506 test_support.TESTFN, hooktester) 507 self.assertEqual(len(report), 2) 508 self.assertEqual(report[0][1], 8192) 509 self.assertEqual(report[0][2], 5) 510 511 def test_reporthook_8193_bytes(self): 512 # Test on 8193 byte file. Should call reporthook only 3 times (once 513 # when the "network connection" is established, once for the next 8192 514 # bytes, and once for the last byte). 515 report = [] 516 def hooktester(count, block_size, total_size, _report=report): 517 _report.append((count, block_size, total_size)) 518 srcFileName = self.createNewTempFile("x" * 8193) 519 urllib.urlretrieve(self.constructLocalFileUrl(srcFileName), 520 test_support.TESTFN, hooktester) 521 self.assertEqual(len(report), 3) 522 self.assertEqual(report[0][1], 8192) 523 self.assertEqual(report[0][2], 8193) 524 525 526class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin): 527 """Test urllib.urlretrieve() using fake http connections""" 528 529 def test_short_content_raises_ContentTooShortError(self): 530 self.fakehttp('''HTTP/1.1 200 OK 531Date: Wed, 02 Jan 2008 03:03:54 GMT 532Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e 533Connection: close 534Content-Length: 100 535Content-Type: text/html; charset=iso-8859-1 536 537FF 538''') 539 540 def _reporthook(par1, par2, par3): 541 pass 542 543 try: 544 self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve, 545 'http://example.com', reporthook=_reporthook) 546 finally: 547 self.unfakehttp() 548 549 def test_short_content_raises_ContentTooShortError_without_reporthook(self): 550 self.fakehttp('''HTTP/1.1 200 OK 551Date: Wed, 02 Jan 2008 03:03:54 GMT 552Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e 553Connection: close 554Content-Length: 100 555Content-Type: text/html; charset=iso-8859-1 556 557FF 558''') 559 try: 560 self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve, 'http://example.com/') 561 finally: 562 self.unfakehttp() 563 564class QuotingTests(unittest.TestCase): 565 """Tests for urllib.quote() and urllib.quote_plus() 566 567 According to RFC 2396 ("Uniform Resource Identifiers), to escape a 568 character you write it as '%' + <2 character US-ASCII hex value>. The Python 569 code of ``'%' + hex(ord(<character>))[2:]`` escapes a character properly. 570 Case does not matter on the hex letters. 571 572 The various character sets specified are: 573 574 Reserved characters : ";/?:@&=+$," 575 Have special meaning in URIs and must be escaped if not being used for 576 their special meaning 577 Data characters : letters, digits, and "-_.!~*'()" 578 Unreserved and do not need to be escaped; can be, though, if desired 579 Control characters : 0x00 - 0x1F, 0x7F 580 Have no use in URIs so must be escaped 581 space : 0x20 582 Must be escaped 583 Delimiters : '<>#%"' 584 Must be escaped 585 Unwise : "{}|\^[]`" 586 Must be escaped 587 588 """ 589 590 def test_never_quote(self): 591 # Make sure quote() does not quote letters, digits, and "_,.-" 592 do_not_quote = '' .join(["ABCDEFGHIJKLMNOPQRSTUVWXYZ", 593 "abcdefghijklmnopqrstuvwxyz", 594 "0123456789", 595 "_.-"]) 596 result = urllib.quote(do_not_quote) 597 self.assertEqual(do_not_quote, result, 598 "using quote(): %s != %s" % (do_not_quote, result)) 599 result = urllib.quote_plus(do_not_quote) 600 self.assertEqual(do_not_quote, result, 601 "using quote_plus(): %s != %s" % (do_not_quote, result)) 602 603 def test_default_safe(self): 604 # Test '/' is default value for 'safe' parameter 605 self.assertEqual(urllib.quote.func_defaults[0], '/') 606 607 def test_safe(self): 608 # Test setting 'safe' parameter does what it should do 609 quote_by_default = "<>" 610 result = urllib.quote(quote_by_default, safe=quote_by_default) 611 self.assertEqual(quote_by_default, result, 612 "using quote(): %s != %s" % (quote_by_default, result)) 613 result = urllib.quote_plus(quote_by_default, safe=quote_by_default) 614 self.assertEqual(quote_by_default, result, 615 "using quote_plus(): %s != %s" % 616 (quote_by_default, result)) 617 618 def test_default_quoting(self): 619 # Make sure all characters that should be quoted are by default sans 620 # space (separate test for that). 621 should_quote = [chr(num) for num in range(32)] # For 0x00 - 0x1F 622 should_quote.append('<>#%"{}|\^[]`') 623 should_quote.append(chr(127)) # For 0x7F 624 should_quote = ''.join(should_quote) 625 for char in should_quote: 626 result = urllib.quote(char) 627 self.assertEqual(hexescape(char), result, 628 "using quote(): %s should be escaped to %s, not %s" % 629 (char, hexescape(char), result)) 630 result = urllib.quote_plus(char) 631 self.assertEqual(hexescape(char), result, 632 "using quote_plus(): " 633 "%s should be escapes to %s, not %s" % 634 (char, hexescape(char), result)) 635 del should_quote 636 partial_quote = "ab[]cd" 637 expected = "ab%5B%5Dcd" 638 result = urllib.quote(partial_quote) 639 self.assertEqual(expected, result, 640 "using quote(): %s != %s" % (expected, result)) 641 result = urllib.quote_plus(partial_quote) 642 self.assertEqual(expected, result, 643 "using quote_plus(): %s != %s" % (expected, result)) 644 self.assertRaises(TypeError, urllib.quote, None) 645 646 def test_quoting_space(self): 647 # Make sure quote() and quote_plus() handle spaces as specified in 648 # their unique way 649 result = urllib.quote(' ') 650 self.assertEqual(result, hexescape(' '), 651 "using quote(): %s != %s" % (result, hexescape(' '))) 652 result = urllib.quote_plus(' ') 653 self.assertEqual(result, '+', 654 "using quote_plus(): %s != +" % result) 655 given = "a b cd e f" 656 expect = given.replace(' ', hexescape(' ')) 657 result = urllib.quote(given) 658 self.assertEqual(expect, result, 659 "using quote(): %s != %s" % (expect, result)) 660 expect = given.replace(' ', '+') 661 result = urllib.quote_plus(given) 662 self.assertEqual(expect, result, 663 "using quote_plus(): %s != %s" % (expect, result)) 664 665 def test_quoting_plus(self): 666 self.assertEqual(urllib.quote_plus('alpha+beta gamma'), 667 'alpha%2Bbeta+gamma') 668 self.assertEqual(urllib.quote_plus('alpha+beta gamma', '+'), 669 'alpha+beta+gamma') 670 671class UnquotingTests(unittest.TestCase): 672 """Tests for unquote() and unquote_plus() 673 674 See the doc string for quoting_Tests for details on quoting and such. 675 676 """ 677 678 def test_unquoting(self): 679 # Make sure unquoting of all ASCII values works 680 escape_list = [] 681 for num in range(128): 682 given = hexescape(chr(num)) 683 expect = chr(num) 684 result = urllib.unquote(given) 685 self.assertEqual(expect, result, 686 "using unquote(): %s != %s" % (expect, result)) 687 result = urllib.unquote_plus(given) 688 self.assertEqual(expect, result, 689 "using unquote_plus(): %s != %s" % 690 (expect, result)) 691 escape_list.append(given) 692 escape_string = ''.join(escape_list) 693 del escape_list 694 result = urllib.unquote(escape_string) 695 self.assertEqual(result.count('%'), 1, 696 "using quote(): not all characters escaped; %s" % 697 result) 698 result = urllib.unquote(escape_string) 699 self.assertEqual(result.count('%'), 1, 700 "using unquote(): not all characters escaped: " 701 "%s" % result) 702 703 def test_unquoting_badpercent(self): 704 # Test unquoting on bad percent-escapes 705 given = '%xab' 706 expect = given 707 result = urllib.unquote(given) 708 self.assertEqual(expect, result, "using unquote(): %r != %r" 709 % (expect, result)) 710 given = '%x' 711 expect = given 712 result = urllib.unquote(given) 713 self.assertEqual(expect, result, "using unquote(): %r != %r" 714 % (expect, result)) 715 given = '%' 716 expect = given 717 result = urllib.unquote(given) 718 self.assertEqual(expect, result, "using unquote(): %r != %r" 719 % (expect, result)) 720 721 def test_unquoting_mixed_case(self): 722 # Test unquoting on mixed-case hex digits in the percent-escapes 723 given = '%Ab%eA' 724 expect = '\xab\xea' 725 result = urllib.unquote(given) 726 self.assertEqual(expect, result, "using unquote(): %r != %r" 727 % (expect, result)) 728 729 def test_unquoting_parts(self): 730 # Make sure unquoting works when have non-quoted characters 731 # interspersed 732 given = 'ab%sd' % hexescape('c') 733 expect = "abcd" 734 result = urllib.unquote(given) 735 self.assertEqual(expect, result, 736 "using quote(): %s != %s" % (expect, result)) 737 result = urllib.unquote_plus(given) 738 self.assertEqual(expect, result, 739 "using unquote_plus(): %s != %s" % (expect, result)) 740 741 def test_unquoting_plus(self): 742 # Test difference between unquote() and unquote_plus() 743 given = "are+there+spaces..." 744 expect = given 745 result = urllib.unquote(given) 746 self.assertEqual(expect, result, 747 "using unquote(): %s != %s" % (expect, result)) 748 expect = given.replace('+', ' ') 749 result = urllib.unquote_plus(given) 750 self.assertEqual(expect, result, 751 "using unquote_plus(): %s != %s" % (expect, result)) 752 753 def test_unquote_with_unicode(self): 754 r = urllib.unquote(u'br%C3%BCckner_sapporo_20050930.doc') 755 self.assertEqual(r, u'br\xc3\xbcckner_sapporo_20050930.doc') 756 757class urlencode_Tests(unittest.TestCase): 758 """Tests for urlencode()""" 759 760 def help_inputtype(self, given, test_type): 761 """Helper method for testing different input types. 762 763 'given' must lead to only the pairs: 764 * 1st, 1 765 * 2nd, 2 766 * 3rd, 3 767 768 Test cannot assume anything about order. Docs make no guarantee and 769 have possible dictionary input. 770 771 """ 772 expect_somewhere = ["1st=1", "2nd=2", "3rd=3"] 773 result = urllib.urlencode(given) 774 for expected in expect_somewhere: 775 self.assertIn(expected, result, 776 "testing %s: %s not found in %s" % 777 (test_type, expected, result)) 778 self.assertEqual(result.count('&'), 2, 779 "testing %s: expected 2 '&'s; got %s" % 780 (test_type, result.count('&'))) 781 amp_location = result.index('&') 782 on_amp_left = result[amp_location - 1] 783 on_amp_right = result[amp_location + 1] 784 self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(), 785 "testing %s: '&' not located in proper place in %s" % 786 (test_type, result)) 787 self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps 788 "testing %s: " 789 "unexpected number of characters: %s != %s" % 790 (test_type, len(result), (5 * 3) + 2)) 791 792 def test_using_mapping(self): 793 # Test passing in a mapping object as an argument. 794 self.help_inputtype({"1st":'1', "2nd":'2', "3rd":'3'}, 795 "using dict as input type") 796 797 def test_using_sequence(self): 798 # Test passing in a sequence of two-item sequences as an argument. 799 self.help_inputtype([('1st', '1'), ('2nd', '2'), ('3rd', '3')], 800 "using sequence of two-item tuples as input") 801 802 def test_quoting(self): 803 # Make sure keys and values are quoted using quote_plus() 804 given = {"&":"="} 805 expect = "%s=%s" % (hexescape('&'), hexescape('=')) 806 result = urllib.urlencode(given) 807 self.assertEqual(expect, result) 808 given = {"key name":"A bunch of pluses"} 809 expect = "key+name=A+bunch+of+pluses" 810 result = urllib.urlencode(given) 811 self.assertEqual(expect, result) 812 813 def test_doseq(self): 814 # Test that passing True for 'doseq' parameter works correctly 815 given = {'sequence':['1', '2', '3']} 816 expect = "sequence=%s" % urllib.quote_plus(str(['1', '2', '3'])) 817 result = urllib.urlencode(given) 818 self.assertEqual(expect, result) 819 result = urllib.urlencode(given, True) 820 for value in given["sequence"]: 821 expect = "sequence=%s" % value 822 self.assertIn(expect, result) 823 self.assertEqual(result.count('&'), 2, 824 "Expected 2 '&'s, got %s" % result.count('&')) 825 826class Pathname_Tests(unittest.TestCase): 827 """Test pathname2url() and url2pathname()""" 828 829 def test_basic(self): 830 # Make sure simple tests pass 831 expected_path = os.path.join("parts", "of", "a", "path") 832 expected_url = "parts/of/a/path" 833 result = urllib.pathname2url(expected_path) 834 self.assertEqual(expected_url, result, 835 "pathname2url() failed; %s != %s" % 836 (result, expected_url)) 837 result = urllib.url2pathname(expected_url) 838 self.assertEqual(expected_path, result, 839 "url2pathame() failed; %s != %s" % 840 (result, expected_path)) 841 842 def test_quoting(self): 843 # Test automatic quoting and unquoting works for pathnam2url() and 844 # url2pathname() respectively 845 given = os.path.join("needs", "quot=ing", "here") 846 expect = "needs/%s/here" % urllib.quote("quot=ing") 847 result = urllib.pathname2url(given) 848 self.assertEqual(expect, result, 849 "pathname2url() failed; %s != %s" % 850 (expect, result)) 851 expect = given 852 result = urllib.url2pathname(result) 853 self.assertEqual(expect, result, 854 "url2pathname() failed; %s != %s" % 855 (expect, result)) 856 given = os.path.join("make sure", "using_quote") 857 expect = "%s/using_quote" % urllib.quote("make sure") 858 result = urllib.pathname2url(given) 859 self.assertEqual(expect, result, 860 "pathname2url() failed; %s != %s" % 861 (expect, result)) 862 given = "make+sure/using_unquote" 863 expect = os.path.join("make+sure", "using_unquote") 864 result = urllib.url2pathname(given) 865 self.assertEqual(expect, result, 866 "url2pathname() failed; %s != %s" % 867 (expect, result)) 868 869 @unittest.skipUnless(sys.platform == 'win32', 870 'test specific to the nturl2path library') 871 def test_ntpath(self): 872 given = ('/C:/', '///C:/', '/C|//') 873 expect = 'C:\\' 874 for url in given: 875 result = urllib.url2pathname(url) 876 self.assertEqual(expect, result, 877 'nturl2path.url2pathname() failed; %s != %s' % 878 (expect, result)) 879 given = '///C|/path' 880 expect = 'C:\\path' 881 result = urllib.url2pathname(given) 882 self.assertEqual(expect, result, 883 'nturl2path.url2pathname() failed; %s != %s' % 884 (expect, result)) 885 886class Utility_Tests(unittest.TestCase): 887 """Testcase to test the various utility functions in the urllib.""" 888 # In Python 3 this test class is moved to test_urlparse. 889 890 def test_splittype(self): 891 splittype = urllib.splittype 892 self.assertEqual(splittype('type:opaquestring'), ('type', 'opaquestring')) 893 self.assertEqual(splittype('opaquestring'), (None, 'opaquestring')) 894 self.assertEqual(splittype(':opaquestring'), (None, ':opaquestring')) 895 self.assertEqual(splittype('type:'), ('type', '')) 896 self.assertEqual(splittype('type:opaque:string'), ('type', 'opaque:string')) 897 898 def test_splithost(self): 899 splithost = urllib.splithost 900 self.assertEqual(splithost('//www.example.org:80/foo/bar/baz.html'), 901 ('www.example.org:80', '/foo/bar/baz.html')) 902 self.assertEqual(splithost('//www.example.org:80'), 903 ('www.example.org:80', '')) 904 self.assertEqual(splithost('/foo/bar/baz.html'), 905 (None, '/foo/bar/baz.html')) 906 907 # bpo-30500: # starts a fragment. 908 self.assertEqual(splithost('//127.0.0.1#@host.com'), 909 ('127.0.0.1', '/#@host.com')) 910 self.assertEqual(splithost('//127.0.0.1#@host.com:80'), 911 ('127.0.0.1', '/#@host.com:80')) 912 self.assertEqual(splithost('//127.0.0.1:80#@host.com'), 913 ('127.0.0.1:80', '/#@host.com')) 914 915 # Empty host is returned as empty string. 916 self.assertEqual(splithost("///file"), 917 ('', '/file')) 918 919 # Trailing semicolon, question mark and hash symbol are kept. 920 self.assertEqual(splithost("//example.net/file;"), 921 ('example.net', '/file;')) 922 self.assertEqual(splithost("//example.net/file?"), 923 ('example.net', '/file?')) 924 self.assertEqual(splithost("//example.net/file#"), 925 ('example.net', '/file#')) 926 927 def test_splituser(self): 928 splituser = urllib.splituser 929 self.assertEqual(splituser('User:Pass@www.python.org:080'), 930 ('User:Pass', 'www.python.org:080')) 931 self.assertEqual(splituser('@www.python.org:080'), 932 ('', 'www.python.org:080')) 933 self.assertEqual(splituser('www.python.org:080'), 934 (None, 'www.python.org:080')) 935 self.assertEqual(splituser('User:Pass@'), 936 ('User:Pass', '')) 937 self.assertEqual(splituser('User@example.com:Pass@www.python.org:080'), 938 ('User@example.com:Pass', 'www.python.org:080')) 939 940 def test_splitpasswd(self): 941 # Some of the password examples are not sensible, but it is added to 942 # confirming to RFC2617 and addressing issue4675. 943 splitpasswd = urllib.splitpasswd 944 self.assertEqual(splitpasswd('user:ab'), ('user', 'ab')) 945 self.assertEqual(splitpasswd('user:a\nb'), ('user', 'a\nb')) 946 self.assertEqual(splitpasswd('user:a\tb'), ('user', 'a\tb')) 947 self.assertEqual(splitpasswd('user:a\rb'), ('user', 'a\rb')) 948 self.assertEqual(splitpasswd('user:a\fb'), ('user', 'a\fb')) 949 self.assertEqual(splitpasswd('user:a\vb'), ('user', 'a\vb')) 950 self.assertEqual(splitpasswd('user:a:b'), ('user', 'a:b')) 951 self.assertEqual(splitpasswd('user:a b'), ('user', 'a b')) 952 self.assertEqual(splitpasswd('user 2:ab'), ('user 2', 'ab')) 953 self.assertEqual(splitpasswd('user+1:a+b'), ('user+1', 'a+b')) 954 self.assertEqual(splitpasswd('user:'), ('user', '')) 955 self.assertEqual(splitpasswd('user'), ('user', None)) 956 self.assertEqual(splitpasswd(':ab'), ('', 'ab')) 957 958 def test_splitport(self): 959 splitport = urllib.splitport 960 self.assertEqual(splitport('parrot:88'), ('parrot', '88')) 961 self.assertEqual(splitport('parrot'), ('parrot', None)) 962 self.assertEqual(splitport('parrot:'), ('parrot', None)) 963 self.assertEqual(splitport('127.0.0.1'), ('127.0.0.1', None)) 964 self.assertEqual(splitport('parrot:cheese'), ('parrot:cheese', None)) 965 self.assertEqual(splitport('[::1]:88'), ('[::1]', '88')) 966 self.assertEqual(splitport('[::1]'), ('[::1]', None)) 967 self.assertEqual(splitport(':88'), ('', '88')) 968 969 def test_splitnport(self): 970 splitnport = urllib.splitnport 971 self.assertEqual(splitnport('parrot:88'), ('parrot', 88)) 972 self.assertEqual(splitnport('parrot'), ('parrot', -1)) 973 self.assertEqual(splitnport('parrot', 55), ('parrot', 55)) 974 self.assertEqual(splitnport('parrot:'), ('parrot', -1)) 975 self.assertEqual(splitnport('parrot:', 55), ('parrot', 55)) 976 self.assertEqual(splitnport('127.0.0.1'), ('127.0.0.1', -1)) 977 self.assertEqual(splitnport('127.0.0.1', 55), ('127.0.0.1', 55)) 978 self.assertEqual(splitnport('parrot:cheese'), ('parrot', None)) 979 self.assertEqual(splitnport('parrot:cheese', 55), ('parrot', None)) 980 981 def test_splitquery(self): 982 # Normal cases are exercised by other tests; ensure that we also 983 # catch cases with no port specified (testcase ensuring coverage) 984 splitquery = urllib.splitquery 985 self.assertEqual(splitquery('http://python.org/fake?foo=bar'), 986 ('http://python.org/fake', 'foo=bar')) 987 self.assertEqual(splitquery('http://python.org/fake?foo=bar?'), 988 ('http://python.org/fake?foo=bar', '')) 989 self.assertEqual(splitquery('http://python.org/fake'), 990 ('http://python.org/fake', None)) 991 self.assertEqual(splitquery('?foo=bar'), ('', 'foo=bar')) 992 993 def test_splittag(self): 994 splittag = urllib.splittag 995 self.assertEqual(splittag('http://example.com?foo=bar#baz'), 996 ('http://example.com?foo=bar', 'baz')) 997 self.assertEqual(splittag('http://example.com?foo=bar#'), 998 ('http://example.com?foo=bar', '')) 999 self.assertEqual(splittag('#baz'), ('', 'baz')) 1000 self.assertEqual(splittag('http://example.com?foo=bar'), 1001 ('http://example.com?foo=bar', None)) 1002 self.assertEqual(splittag('http://example.com?foo=bar#baz#boo'), 1003 ('http://example.com?foo=bar#baz', 'boo')) 1004 1005 def test_splitattr(self): 1006 splitattr = urllib.splitattr 1007 self.assertEqual(splitattr('/path;attr1=value1;attr2=value2'), 1008 ('/path', ['attr1=value1', 'attr2=value2'])) 1009 self.assertEqual(splitattr('/path;'), ('/path', [''])) 1010 self.assertEqual(splitattr(';attr1=value1;attr2=value2'), 1011 ('', ['attr1=value1', 'attr2=value2'])) 1012 self.assertEqual(splitattr('/path'), ('/path', [])) 1013 1014 def test_splitvalue(self): 1015 # Normal cases are exercised by other tests; test pathological cases 1016 # with no key/value pairs. (testcase ensuring coverage) 1017 splitvalue = urllib.splitvalue 1018 self.assertEqual(splitvalue('foo=bar'), ('foo', 'bar')) 1019 self.assertEqual(splitvalue('foo='), ('foo', '')) 1020 self.assertEqual(splitvalue('=bar'), ('', 'bar')) 1021 self.assertEqual(splitvalue('foobar'), ('foobar', None)) 1022 self.assertEqual(splitvalue('foo=bar=baz'), ('foo', 'bar=baz')) 1023 1024 def test_toBytes(self): 1025 result = urllib.toBytes(u'http://www.python.org') 1026 self.assertEqual(result, 'http://www.python.org') 1027 self.assertRaises(UnicodeError, urllib.toBytes, 1028 test_support.u(r'http://www.python.org/medi\u00e6val')) 1029 1030 def test_unwrap(self): 1031 url = urllib.unwrap('<URL:type://host/path>') 1032 self.assertEqual(url, 'type://host/path') 1033 1034 1035class URLopener_Tests(unittest.TestCase): 1036 """Testcase to test the open method of URLopener class.""" 1037 1038 def test_quoted_open(self): 1039 class DummyURLopener(urllib.URLopener): 1040 def open_spam(self, url): 1041 return url 1042 1043 self.assertEqual(DummyURLopener().open( 1044 'spam://example/ /'),'//example/%20/') 1045 1046 # test the safe characters are not quoted by urlopen 1047 self.assertEqual(DummyURLopener().open( 1048 "spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"), 1049 "//c:|windows%/:=&?~#+!$,;'@()*[]|/path/") 1050 1051 def test_local_file_open(self): 1052 # bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme 1053 class DummyURLopener(urllib.URLopener): 1054 def open_local_file(self, url): 1055 return url 1056 for url in ('local_file://example', 'local-file://example'): 1057 self.assertRaises(IOError, urllib.urlopen, url) 1058 self.assertRaises(IOError, urllib.URLopener().open, url) 1059 self.assertRaises(IOError, urllib.URLopener().retrieve, url) 1060 self.assertRaises(IOError, DummyURLopener().open, url) 1061 self.assertRaises(IOError, DummyURLopener().retrieve, url) 1062 1063# Just commented them out. 1064# Can't really tell why keep failing in windows and sparc. 1065# Everywhere else they work ok, but on those machines, sometimes 1066# fail in one of the tests, sometimes in other. I have a linux, and 1067# the tests go ok. 1068# If anybody has one of the problematic environments, please help! 1069# . Facundo 1070# 1071# def server(evt): 1072# import socket, time 1073# serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1074# serv.settimeout(3) 1075# serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1076# serv.bind(("", 9093)) 1077# serv.listen(5) 1078# try: 1079# conn, addr = serv.accept() 1080# conn.send("1 Hola mundo\n") 1081# cantdata = 0 1082# while cantdata < 13: 1083# data = conn.recv(13-cantdata) 1084# cantdata += len(data) 1085# time.sleep(.3) 1086# conn.send("2 No more lines\n") 1087# conn.close() 1088# except socket.timeout: 1089# pass 1090# finally: 1091# serv.close() 1092# evt.set() 1093# 1094# class FTPWrapperTests(unittest.TestCase): 1095# 1096# def setUp(self): 1097# import ftplib, time, threading 1098# ftplib.FTP.port = 9093 1099# self.evt = threading.Event() 1100# threading.Thread(target=server, args=(self.evt,)).start() 1101# time.sleep(.1) 1102# 1103# def tearDown(self): 1104# self.evt.wait() 1105# 1106# def testBasic(self): 1107# # connects 1108# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) 1109# ftp.close() 1110# 1111# def testTimeoutNone(self): 1112# # global default timeout is ignored 1113# import socket 1114# self.assertIsNone(socket.getdefaulttimeout()) 1115# socket.setdefaulttimeout(30) 1116# try: 1117# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) 1118# finally: 1119# socket.setdefaulttimeout(None) 1120# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) 1121# ftp.close() 1122# 1123# def testTimeoutDefault(self): 1124# # global default timeout is used 1125# import socket 1126# self.assertIsNone(socket.getdefaulttimeout()) 1127# socket.setdefaulttimeout(30) 1128# try: 1129# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) 1130# finally: 1131# socket.setdefaulttimeout(None) 1132# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) 1133# ftp.close() 1134# 1135# def testTimeoutValue(self): 1136# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [], 1137# timeout=30) 1138# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) 1139# ftp.close() 1140 1141 1142 1143def test_main(): 1144 import warnings 1145 with warnings.catch_warnings(): 1146 warnings.filterwarnings('ignore', ".*urllib\.urlopen.*Python 3.0", 1147 DeprecationWarning) 1148 test_support.run_unittest( 1149 urlopen_FileTests, 1150 urlopen_HttpTests, 1151 urlretrieve_FileTests, 1152 urlretrieve_HttpTests, 1153 ProxyTests, 1154 QuotingTests, 1155 UnquotingTests, 1156 urlencode_Tests, 1157 Pathname_Tests, 1158 Utility_Tests, 1159 URLopener_Tests, 1160 ProxyTests, 1161 ProxyTests_withOrderedEnv, 1162 #FTPWrapperTests, 1163 ) 1164 1165 1166 1167if __name__ == '__main__': 1168 test_main() 1169