1"""Tests for distutils.command.upload."""
2import os
3import unittest
4import unittest.mock as mock
5from urllib.request import HTTPError
6
7from test.support import run_unittest
8
9from distutils.command import upload as upload_mod
10from distutils.command.upload import upload
11from distutils.core import Distribution
12from distutils.errors import DistutilsError
13from distutils.log import ERROR, INFO
14
15from distutils.tests.test_config import PYPIRC, BasePyPIRCCommandTestCase
16
# .pypirc fixture declaring two index servers: ``server1`` has a user name
# and a very long password, ``server2`` additionally sets a realm and a
# repository URL.  The upload tests below write it to ``self.rc``.
PYPIRC_LONG_PASSWORD = """\
[distutils]

index-servers =
    server1
    server2

[server1]
username:me
password:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

[server2]
username:meagain
password: secret
realm:acme
repository:http://another.pypi/
"""
34
35
# .pypirc fixture with a single index server that only defines a user
# name -- there is no ``password`` entry.  Used by test_saved_password.
PYPIRC_NOPASSWORD = """\
[distutils]

index-servers =
    server1

[server1]
username:me
"""
45
class FakeOpen:
    """Canned response object returned by the patched ``urlopen``.

    ``url`` is whatever was passed to ``urlopen``; when it is not a plain
    string it is also kept as ``req`` so tests can inspect the request.
    ``msg`` and ``code`` fall back to ``'OK'`` and ``200`` when falsy.
    """

    def __init__(self, url, msg=None, code=None):
        self.url = url
        # Only non-string arguments are remembered as the request.
        self.req = None if isinstance(url, str) else url
        self.msg = msg if msg else 'OK'
        self.code = code if code else 200

    def getheader(self, name, default=None):
        """Look up a response header by case-insensitive name."""
        canned_headers = {'content-type': 'text/plain; charset=utf-8'}
        return canned_headers.get(name.lower(), default)

    def read(self):
        """Return the fixed response body."""
        return b'xyzzy'

    def getcode(self):
        """Return the status code given at construction time."""
        return self.code
67
68
class uploadTestCase(BasePyPIRCCommandTestCase):
    """Tests for the ``upload`` command with ``urlopen`` stubbed out."""

    def setUp(self):
        """Swap ``upload_mod.urlopen`` for a recorder returning FakeOpen."""
        super().setUp()
        self.old_open = upload_mod.urlopen
        upload_mod.urlopen = self._urlopen
        # State consulted/updated by _urlopen.
        self.last_open = None
        self.next_msg = None
        self.next_code = None

    def tearDown(self):
        """Put the real ``urlopen`` back."""
        upload_mod.urlopen = self.old_open
        super().tearDown()

    def _urlopen(self, url):
        """Record the call in ``self.last_open`` and return the fake."""
        response = FakeOpen(url, msg=self.next_msg, code=self.next_code)
        self.last_open = response
        return response

    def test_finalize_options(self):

        # new format
        self.write_file(self.rc, PYPIRC)
        cmd = upload(Distribution())
        cmd.finalize_options()
        expectations = {
            'username': 'me',
            'password': 'secret',
            'realm': 'pypi',
            'repository': 'https://upload.pypi.org/legacy/',
        }
        for attr, waited in expectations.items():
            self.assertEqual(getattr(cmd, attr), waited)

    def test_saved_password(self):
        # file with no password
        self.write_file(self.rc, PYPIRC_NOPASSWORD)

        # make sure it passes
        dist = Distribution()
        first_cmd = upload(dist)
        first_cmd.finalize_options()
        self.assertEqual(first_cmd.password, None)

        # make sure we get it as well, if another command
        # initialized it at the dist level
        dist.password = 'xxx'
        second_cmd = upload(dist)
        second_cmd.finalize_options()
        self.assertEqual(second_cmd.password, 'xxx')

    def test_upload(self):
        path = os.path.join(self.mkdtemp(), 'xxx')
        self.write_file(path)
        # (command, pyversion, filename)
        dist_files = [('xxx', '2.6', path)]
        self.write_file(self.rc, PYPIRC_LONG_PASSWORD)

        # lets run it
        _, dist = self.create_dist(dist_files=dist_files)
        cmd = upload(dist)
        cmd.show_response = 1
        cmd.ensure_finalized()
        cmd.run()

        # what did we send ?
        request = self.last_open.req
        headers = dict(request.headers)
        self.assertGreaterEqual(int(headers['Content-length']), 2162)
        content_type = headers['Content-type']
        self.assertTrue(content_type.startswith('multipart/form-data'))
        self.assertEqual(request.get_method(), 'POST')
        self.assertEqual(request.get_full_url(),
                         'https://upload.pypi.org/legacy/')
        data = request.data
        for field in (b'xxx', b'protocol_version', b'sha256_digest'):
            self.assertIn(field, data)
        sha256_hex = (
            b'cd2eb0837c9b4c962c22d2ff8b5441b7b45805887f051d39bf133b583baf'
            b'6860'
        )
        self.assertIn(sha256_hex, data)
        # The remaining digests are optional; only verify those present.
        if b'md5_digest' in data:
            self.assertIn(b'f561aaf6ef0bf14d4208bb46a4ccb3ad', data)
        if b'blake2_256_digest' in data:
            blake2_hex = (
                b'b6f289a27d4fe90da63c503bfe0a9b761a8f76bb86148565065f040be'
                b'6d1c3044cf7ded78ef800509bccb4b648e507d88dc6383d67642aadcc'
                b'ce443f1534330a'
            )
            self.assertIn(blake2_hex, data)

        # The PyPI response body was echoed
        rule = 75 * '-'
        self.assertEqual(self.get_logs(INFO)[-1],
                         rule + '\nxyzzy\n' + rule)

    # bpo-32304: archives whose last byte was b'\r' were corrupted due to
    # normalization intended for Mac OS 9.
    def test_upload_correct_cr(self):
        # content that ends with \r should not be modified.
        path = os.path.join(self.mkdtemp(), 'xxx')
        self.write_file(path, content='yy\r')
        # (command, pyversion, filename)
        dist_files = [('xxx', '2.6', path)]
        self.write_file(self.rc, PYPIRC_LONG_PASSWORD)

        # other fields that ended with \r used to be modified, now are
        # preserved.
        _, dist = self.create_dist(
            dist_files=dist_files,
            description='long description\r',
        )
        cmd = upload(dist)
        cmd.show_response = 1
        cmd.ensure_finalized()
        cmd.run()

        request = self.last_open.req
        sent_length = int(dict(request.headers)['Content-length'])
        self.assertGreaterEqual(sent_length, 2172)
        self.assertIn(b'long description\r', request.data)

    def test_upload_fails(self):
        # Make the fake server answer 404 and replay the normal upload.
        self.next_msg = "Not Found"
        self.next_code = 404
        with self.assertRaises(DistutilsError):
            self.test_upload()

    def test_wrong_exception_order(self):
        path = os.path.join(self.mkdtemp(), 'xxx')
        self.write_file(path)
        dist_files = [('xxx', '2.6', path)]  # command, pyversion, filename
        self.write_file(self.rc, PYPIRC_LONG_PASSWORD)

        _, dist = self.create_dist(dist_files=dist_files)
        # (exception raised by urlopen, expected log text, exception
        # expected to come out of cmd.run())
        tests = [
            (OSError('oserror'), 'oserror', OSError),
            (HTTPError('url', 400, 'httperror', {}, None),
             'Upload failed (400): httperror', DistutilsError),
        ]
        for exception, expected, raised_exception in tests:
            with self.subTest(exception=type(exception).__name__):
                failing_urlopen = mock.Mock(side_effect=exception)
                with mock.patch('distutils.command.upload.urlopen',
                                new=failing_urlopen):
                    with self.assertRaises(raised_exception):
                        cmd = upload(dist)
                        cmd.ensure_finalized()
                        cmd.run()
                    error_logs = self.get_logs(ERROR)
                    self.assertIn(expected, error_logs[-1])
                    self.clear_logs()
217
218
def test_suite():
    """Return a test suite holding every test method of uploadTestCase.

    Built with ``TestLoader.loadTestsFromTestCase`` rather than
    ``unittest.makeSuite``, which is deprecated since Python 3.11 and
    removed in 3.13.
    """
    return unittest.TestLoader().loadTestsFromTestCase(uploadTestCase)
221
# Allow running this test module directly as a script.
if __name__ == "__main__":
    run_unittest(test_suite())
224