1import unittest
2from test import support
3
4import contextlib
5import socket
6import urllib.parse
7import urllib.request
8import os
9import email.message
10import time
11
12
# Declare, at import time and before any test class is defined, that this
# module needs the 'network' test resource.
# NOTE(review): presumably this denies/skips the whole module when the
# resource is not enabled -- confirm against the test.support documentation.
support.requires('network')
14
15
16class URLTimeoutTest(unittest.TestCase):
17    # XXX this test doesn't seem to test anything useful.
18
19    TIMEOUT = 30.0
20
21    def setUp(self):
22        socket.setdefaulttimeout(self.TIMEOUT)
23
24    def tearDown(self):
25        socket.setdefaulttimeout(None)
26
27    def testURLread(self):
28        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
29        with support.transient_internet(domain):
30            f = urllib.request.urlopen(support.TEST_HTTP_URL)
31            f.read()
32
33
34class urlopenNetworkTests(unittest.TestCase):
35    """Tests urllib.request.urlopen using the network.
36
37    These tests are not exhaustive.  Assuming that testing using files does a
38    good job overall of some of the basic interface features.  There are no
39    tests exercising the optional 'data' and 'proxies' arguments.  No tests
40    for transparent redirection have been written.
41
42    setUp is not used for always constructing a connection to
43    http://www.pythontest.net/ since there a few tests that don't use that address
44    and making a connection is expensive enough to warrant minimizing unneeded
45    connections.
46
47    """
48
49    url = 'http://www.pythontest.net/'
50
51    @contextlib.contextmanager
52    def urlopen(self, *args, **kwargs):
53        resource = args[0]
54        with support.transient_internet(resource):
55            r = urllib.request.urlopen(*args, **kwargs)
56            try:
57                yield r
58            finally:
59                r.close()
60
61    def test_basic(self):
62        # Simple test expected to pass.
63        with self.urlopen(self.url) as open_url:
64            for attr in ("read", "readline", "readlines", "fileno", "close",
65                         "info", "geturl"):
66                self.assertTrue(hasattr(open_url, attr), "object returned from "
67                                "urlopen lacks the %s attribute" % attr)
68            self.assertTrue(open_url.read(), "calling 'read' failed")
69
70    def test_readlines(self):
71        # Test both readline and readlines.
72        with self.urlopen(self.url) as open_url:
73            self.assertIsInstance(open_url.readline(), bytes,
74                                  "readline did not return a string")
75            self.assertIsInstance(open_url.readlines(), list,
76                                  "readlines did not return a list")
77
78    def test_info(self):
79        # Test 'info'.
80        with self.urlopen(self.url) as open_url:
81            info_obj = open_url.info()
82            self.assertIsInstance(info_obj, email.message.Message,
83                                  "object returned by 'info' is not an "
84                                  "instance of email.message.Message")
85            self.assertEqual(info_obj.get_content_subtype(), "html")
86
87    def test_geturl(self):
88        # Make sure same URL as opened is returned by geturl.
89        with self.urlopen(self.url) as open_url:
90            gotten_url = open_url.geturl()
91            self.assertEqual(gotten_url, self.url)
92
93    def test_getcode(self):
94        # test getcode() with the fancy opener to get 404 error codes
95        URL = self.url + "XXXinvalidXXX"
96        with support.transient_internet(URL):
97            with self.assertWarns(DeprecationWarning):
98                open_url = urllib.request.FancyURLopener().open(URL)
99            try:
100                code = open_url.getcode()
101            finally:
102                open_url.close()
103            self.assertEqual(code, 404)
104
105    def test_bad_address(self):
106        # Make sure proper exception is raised when connecting to a bogus
107        # address.
108
109        # Given that both VeriSign and various ISPs have in
110        # the past or are presently hijacking various invalid
111        # domain name requests in an attempt to boost traffic
112        # to their own sites, finding a domain name to use
113        # for this test is difficult.  RFC2606 leads one to
114        # believe that '.invalid' should work, but experience
115        # seemed to indicate otherwise.  Single character
116        # TLDs are likely to remain invalid, so this seems to
117        # be the best choice. The trailing '.' prevents a
118        # related problem: The normal DNS resolver appends
119        # the domain names from the search path if there is
120        # no '.' the end and, and if one of those domains
121        # implements a '*' rule a result is returned.
122        # However, none of this will prevent the test from
123        # failing if the ISP hijacks all invalid domain
124        # requests.  The real solution would be to be able to
125        # parameterize the framework with a mock resolver.
126        bogus_domain = "sadflkjsasf.i.nvali.d."
127        try:
128            socket.gethostbyname(bogus_domain)
129        except OSError:
130            # socket.gaierror is too narrow, since getaddrinfo() may also
131            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
132            # i.e. Python's TimeoutError.
133            pass
134        else:
135            # This happens with some overzealous DNS providers such as OpenDNS
136            self.skipTest("%r should not resolve for test to work" % bogus_domain)
137        failure_explanation = ('opening an invalid URL did not raise OSError; '
138                               'can be caused by a broken DNS server '
139                               '(e.g. returns 404 or hijacks page)')
140        with self.assertRaises(OSError, msg=failure_explanation):
141            urllib.request.urlopen("http://{}/".format(bogus_domain))
142
143
144class urlretrieveNetworkTests(unittest.TestCase):
145    """Tests urllib.request.urlretrieve using the network."""
146
147    @contextlib.contextmanager
148    def urlretrieve(self, *args, **kwargs):
149        resource = args[0]
150        with support.transient_internet(resource):
151            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
152            try:
153                yield file_location, info
154            finally:
155                support.unlink(file_location)
156
157    def test_basic(self):
158        # Test basic functionality.
159        with self.urlretrieve(self.logo) as (file_location, info):
160            self.assertTrue(os.path.exists(file_location), "file location returned by"
161                            " urlretrieve is not a valid path")
162            with open(file_location, 'rb') as f:
163                self.assertTrue(f.read(), "reading from the file location returned"
164                                " by urlretrieve failed")
165
166    def test_specified_path(self):
167        # Make sure that specifying the location of the file to write to works.
168        with self.urlretrieve(self.logo,
169                              support.TESTFN) as (file_location, info):
170            self.assertEqual(file_location, support.TESTFN)
171            self.assertTrue(os.path.exists(file_location))
172            with open(file_location, 'rb') as f:
173                self.assertTrue(f.read(), "reading from temporary file failed")
174
175    def test_header(self):
176        # Make sure header returned as 2nd value from urlretrieve is good.
177        with self.urlretrieve(self.logo) as (file_location, info):
178            self.assertIsInstance(info, email.message.Message,
179                                  "info is not an instance of email.message.Message")
180
181    logo = "http://www.pythontest.net/"
182
183    def test_data_header(self):
184        with self.urlretrieve(self.logo) as (file_location, fileheaders):
185            datevalue = fileheaders.get('Date')
186            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
187            try:
188                time.strptime(datevalue, dateformat)
189            except ValueError:
190                self.fail('Date value not in %r format' % dateformat)
191
192    def test_reporthook(self):
193        records = []
194
195        def recording_reporthook(blocks, block_size, total_size):
196            records.append((blocks, block_size, total_size))
197
198        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
199                file_location, fileheaders):
200            expected_size = int(fileheaders['Content-Length'])
201
202        records_repr = repr(records)  # For use in error messages.
203        self.assertGreater(len(records), 1, msg="There should always be two "
204                           "calls; the first one before the transfer starts.")
205        self.assertEqual(records[0][0], 0)
206        self.assertGreater(records[0][1], 0,
207                           msg="block size can't be 0 in %s" % records_repr)
208        self.assertEqual(records[0][2], expected_size)
209        self.assertEqual(records[-1][2], expected_size)
210
211        block_sizes = {block_size for _, block_size, _ in records}
212        self.assertEqual({records[0][1]}, block_sizes,
213                         msg="block sizes in %s must be equal" % records_repr)
214        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
215                                msg="number of blocks * block size must be"
216                                " >= total size in %s" % records_repr)
217
218
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
221