1# -*- encoding: utf8 -*-
2"""Tests for distutils.command.upload."""
3import os
4import unittest
5from test.test_support import run_unittest
6
7from distutils.command import upload as upload_mod
8from distutils.command.upload import upload
9from distutils.core import Distribution
10from distutils.errors import DistutilsError
11
12from distutils.tests.test_config import PYPIRC, PyPIRCCommandTestCase
13
# Config file content declaring two index servers, where server1 has a
# very long password.  test_upload and test_upload_correct_cr write it to
# self.rc; test_upload then checks that the Authorization header that was
# sent contains no newline.
PYPIRC_LONG_PASSWORD = """\
[distutils]

index-servers =
    server1
    server2

[server1]
username:me
password:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

[server2]
username:meagain
password: secret
realm:acme
repository:http://another.pypi/
"""
31
32
# Config file content with a single server section that gives a username
# but no password.  test_saved_password writes it to self.rc.
PYPIRC_NOPASSWORD = """\
[distutils]

index-servers =
    server1

[server1]
username:me
"""
42
class FakeOpen(object):
    """Canned response object handed back by the fake ``urlopen``.

    ``url`` is whatever the caller passed in.  When it is not a plain
    ``str`` it is additionally exposed as ``req`` so that tests can inspect
    it; otherwise ``req`` is ``None``.  ``msg`` and ``code`` fall back to
    ``'OK'`` and ``200`` whenever a falsy value is supplied.
    """

    def __init__(self, url, msg=None, code=None):
        self.url = url
        # Only non-string arguments are remembered as the request object.
        self.req = None if isinstance(url, str) else url
        self.msg = msg if msg else 'OK'
        self.code = code if code else 200

    def getcode(self):
        """Return the status code this fake response reports."""
        return self.code
56
57
class uploadTestCase(PyPIRCCommandTestCase):
    """Tests for the ``upload`` command using a replaced ``urlopen``.

    For the duration of each test ``upload_mod.urlopen`` is swapped for
    ``_urlopen``, which records its argument in a ``FakeOpen`` instead of
    calling the real function.  The most recent fake response is kept in
    ``self.last_open``; ``self.next_msg`` and ``self.next_code`` choose
    what the next fake response reports.
    """

    def setUp(self):
        """Reset the recorded state and install the fake ``urlopen``."""
        super(uploadTestCase, self).setUp()
        # Message/status for the next fake response; None lets FakeOpen
        # apply its own defaults.
        self.next_code = None
        self.next_msg = None
        self.last_open = None
        # Remember the real function so tearDown can put it back.
        self.old_open = upload_mod.urlopen
        upload_mod.urlopen = self._urlopen

    def tearDown(self):
        """Put the original ``urlopen`` back, then run the base cleanup."""
        upload_mod.urlopen = self.old_open
        super(uploadTestCase, self).tearDown()

    def _urlopen(self, url):
        """Build a ``FakeOpen`` for *url*, remember it and return it."""
        response = FakeOpen(url, msg=self.next_msg, code=self.next_code)
        self.last_open = response
        return response

    def test_finalize_options(self):
        # Config file in the new format: finalize_options() has to fill
        # in every option listed below.
        self.write_file(self.rc, PYPIRC)
        cmd = upload(Distribution())
        cmd.finalize_options()

        expected = (
            ('username', 'me'),
            ('password', 'secret'),
            ('realm', 'pypi'),
            ('repository', 'https://upload.pypi.org/legacy/'),
        )
        for name, value in expected:
            self.assertEqual(getattr(cmd, name), value)

    def test_saved_password(self):
        # The config file names a user but gives no password.
        self.write_file(self.rc, PYPIRC_NOPASSWORD)

        # Finalizing has to work anyway and leave the password unset.
        dist = Distribution()
        first_cmd = upload(dist)
        first_cmd.finalize_options()
        self.assertEqual(first_cmd.password, None)

        # A password stored on the distribution (as another command
        # would do) has to be picked up by a new upload command.
        dist.password = 'xxx'
        second_cmd = upload(dist)
        second_cmd.finalize_options()
        self.assertEqual(second_cmd.password, 'xxx')

    def test_upload(self):
        # Write the file to upload and the config file.
        path = os.path.join(self.mkdtemp(), 'xxx')
        self.write_file(path)
        dist_files = [('xxx', '2.6', path)]
        self.write_file(self.rc, PYPIRC_LONG_PASSWORD)

        # Run the command.
        _, dist = self.create_dist(dist_files=dist_files, author=u'dédé')
        cmd = upload(dist)
        cmd.ensure_finalized()
        cmd.run()

        # Inspect the request object that reached the fake urlopen.
        request = self.last_open.req
        self.assertIn('dédé', request.data)
        headers = dict(request.headers)
        self.assertEqual(headers['Content-length'], '2159')
        self.assertTrue(
            headers['Content-type'].startswith('multipart/form-data'))
        self.assertEqual(request.get_method(), 'POST')
        self.assertEqual(request.get_full_url(),
                         'https://upload.pypi.org/legacy/')
        self.assertIn('xxx', request.data)
        # The Authorization header value must not contain a line break.
        auth = request.headers['Authorization']
        self.assertNotIn('\n', auth)

    # bpo-32304: archives whose last byte was b'\r' were corrupted due to
    # normalization intended for Mac OS 9.
    def test_upload_correct_cr(self):
        # File content ending in \r must be sent unmodified.
        path = os.path.join(self.mkdtemp(), 'xxx')
        self.write_file(path, content='yy\r')
        dist_files = [('xxx', '2.6', path)]
        self.write_file(self.rc, PYPIRC_LONG_PASSWORD)

        # Other fields ending in \r used to be altered as well; they are
        # preserved now.
        _, dist = self.create_dist(
            dist_files=dist_files,
            description='long description\r',
        )
        cmd = upload(dist)
        cmd.ensure_finalized()
        cmd.run()

        request = self.last_open.req
        headers = dict(request.headers)
        self.assertEqual(headers['Content-length'], '2170')
        self.assertIn(b'long description\r', request.data)
        self.assertNotIn(b'long description\r\n', request.data)

    def test_upload_fails(self):
        # With the fake response reporting 404, running the upload must
        # raise DistutilsError.
        self.next_code = 404
        self.next_msg = "Not Found"
        with self.assertRaises(DistutilsError):
            self.test_upload()
161
def test_suite():
    """Return a suite containing the test methods of ``uploadTestCase``."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(uploadTestCase)
164
# When executed as a script, run this module's suite through run_unittest.
if __name__ == "__main__":
    run_unittest(test_suite())
167