1import unittest
2from test import support
3from test.support import os_helper
4from test.support import socket_helper
5
6import contextlib
7import socket
8import urllib.parse
9import urllib.request
10import os
11import email.message
12import time
13
14
# Every test in this module opens connections to real servers, so the whole
# module is gated on the 'network' test resource.
support.requires('network')
16
17
class URLTimeoutTest(unittest.TestCase):
    """Fetch a URL while a process-wide default socket timeout is in effect."""
    # XXX this test doesn't seem to test anything useful.

    def setUp(self):
        # Remember the timeout that was active before this test so that
        # tearDown can put it back instead of clobbering it with None.
        self._saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(support.INTERNET_TIMEOUT)

    def tearDown(self):
        socket.setdefaulttimeout(self._saved_timeout)

    def testURLread(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with socket_helper.transient_internet(domain):
            # Use the response as a context manager so the underlying
            # socket is closed even if read() raises.
            with urllib.request.urlopen(support.TEST_HTTP_URL) as f:
                f.read()
35
36
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use
    that address and making a connection is expensive enough to warrant
    minimizing unneeded connections.

    """

    url = 'http://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Open a URL inside transient_internet() and close it on exit.

        All arguments are forwarded unchanged to urllib.request.urlopen().
        The target may be given positionally or as the ``url`` keyword; it
        is also handed to transient_internet() as the resource name.
        """
        resource = args[0] if args else kwargs.get('url')
        with socket_helper.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return bytes")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with socket_helper.transient_internet(URL):
            # Only the open() call is expected to emit the warning, so the
            # assertWarns block is kept as narrow as possible.
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            # If the open unexpectedly succeeds, close the response right
            # away so the resulting failure does not also leak a socket.
            urllib.request.urlopen("http://{}/".format(bogus_domain)).close()
149
150
151class urlretrieveNetworkTests(unittest.TestCase):
152    """Tests urllib.request.urlretrieve using the network."""
153
154    def setUp(self):
155        # remove temporary files created by urlretrieve()
156        self.addCleanup(urllib.request.urlcleanup)
157
158    @contextlib.contextmanager
159    def urlretrieve(self, *args, **kwargs):
160        resource = args[0]
161        with socket_helper.transient_internet(resource):
162            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
163            try:
164                yield file_location, info
165            finally:
166                os_helper.unlink(file_location)
167
168    def test_basic(self):
169        # Test basic functionality.
170        with self.urlretrieve(self.logo) as (file_location, info):
171            self.assertTrue(os.path.exists(file_location), "file location returned by"
172                            " urlretrieve is not a valid path")
173            with open(file_location, 'rb') as f:
174                self.assertTrue(f.read(), "reading from the file location returned"
175                                " by urlretrieve failed")
176
177    def test_specified_path(self):
178        # Make sure that specifying the location of the file to write to works.
179        with self.urlretrieve(self.logo,
180                              os_helper.TESTFN) as (file_location, info):
181            self.assertEqual(file_location, os_helper.TESTFN)
182            self.assertTrue(os.path.exists(file_location))
183            with open(file_location, 'rb') as f:
184                self.assertTrue(f.read(), "reading from temporary file failed")
185
186    def test_header(self):
187        # Make sure header returned as 2nd value from urlretrieve is good.
188        with self.urlretrieve(self.logo) as (file_location, info):
189            self.assertIsInstance(info, email.message.Message,
190                                  "info is not an instance of email.message.Message")
191
192    logo = "http://www.pythontest.net/"
193
194    def test_data_header(self):
195        with self.urlretrieve(self.logo) as (file_location, fileheaders):
196            datevalue = fileheaders.get('Date')
197            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
198            try:
199                time.strptime(datevalue, dateformat)
200            except ValueError:
201                self.fail('Date value not in %r format' % dateformat)
202
203    def test_reporthook(self):
204        records = []
205
206        def recording_reporthook(blocks, block_size, total_size):
207            records.append((blocks, block_size, total_size))
208
209        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
210                file_location, fileheaders):
211            expected_size = int(fileheaders['Content-Length'])
212
213        records_repr = repr(records)  # For use in error messages.
214        self.assertGreater(len(records), 1, msg="There should always be two "
215                           "calls; the first one before the transfer starts.")
216        self.assertEqual(records[0][0], 0)
217        self.assertGreater(records[0][1], 0,
218                           msg="block size can't be 0 in %s" % records_repr)
219        self.assertEqual(records[0][2], expected_size)
220        self.assertEqual(records[-1][2], expected_size)
221
222        block_sizes = {block_size for _, block_size, _ in records}
223        self.assertEqual({records[0][1]}, block_sizes,
224                         msg="block sizes in %s must be equal" % records_repr)
225        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
226                                msg="number of blocks * block size must be"
227                                " >= total size in %s" % records_repr)
228
229
# Allow this test module to be run directly as a script.
if __name__ == "__main__":
    unittest.main()
232