1# -*- coding: utf-8 -*-
2"""Tests for distutils.archive_util."""
3__revision__ = "$Id$"
4
5import unittest
6import os
7import sys
8import tarfile
9from os.path import splitdrive
10import warnings
11
12from distutils.archive_util import (check_archive_formats, make_tarball,
13                                    make_zipfile, make_archive,
14                                    ARCHIVE_FORMATS)
15from distutils.spawn import find_executable, spawn
16from distutils.tests import support
17from test.test_support import check_warnings, run_unittest
18
# grp and pwd are optional: UID_GID_SUPPORT records whether both imported,
# and the owner/group tests below consult it.
try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False

# Zip support is assumed when the zipfile module imports; otherwise
# ZIP_SUPPORT holds whatever find_executable('zip') returns, which the
# skipUnless(ZIP_SUPPORT, ...) decorator below uses as a truth value.
try:
    import zipfile
    ZIP_SUPPORT = True
except ImportError:
    ZIP_SUPPORT = find_executable('zip')

# tests decorated with skipUnless(zlib, ...) are skipped when zlib is not
# available
try:
    import zlib
except ImportError:
    zlib = None
37
def can_fs_encode(filename):
    """
    Tell whether *filename* can be saved in the file system.

    The answer is True outright when os.path.supports_unicode_filenames is
    set; otherwise it is True only if the name can be encoded with
    sys.getfilesystemencoding() without a UnicodeEncodeError.
    """
    if not os.path.supports_unicode_filenames:
        try:
            filename.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            return False
    return True
49
50
51class ArchiveUtilTestCase(support.TempdirManager,
52                          support.LoggingSilencer,
53                          unittest.TestCase):
54
    @unittest.skipUnless(zlib, "requires zlib")
    def test_make_tarball(self):
        # Baseline case: run the shared _make_tarball() scenario with a
        # non-unicode 'archive' target name.
        self._make_tarball('archive')
58
59    def _make_tarball(self, target_name):
60        # creating something to tar
61        tmpdir = self.mkdtemp()
62        self.write_file([tmpdir, 'file1'], 'xxx')
63        self.write_file([tmpdir, 'file2'], 'xxx')
64        os.mkdir(os.path.join(tmpdir, 'sub'))
65        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
66
67        tmpdir2 = self.mkdtemp()
68        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
69                            "source and target should be on same drive")
70
71        base_name = os.path.join(tmpdir2, target_name)
72
73        # working with relative paths to avoid tar warnings
74        old_dir = os.getcwd()
75        os.chdir(tmpdir)
76        try:
77            make_tarball(splitdrive(base_name)[1], '.')
78        finally:
79            os.chdir(old_dir)
80
81        # check if the compressed tarball was created
82        tarball = base_name + '.tar.gz'
83        self.assertTrue(os.path.exists(tarball))
84
85        # trying an uncompressed one
86        base_name = os.path.join(tmpdir2, target_name)
87        old_dir = os.getcwd()
88        os.chdir(tmpdir)
89        try:
90            make_tarball(splitdrive(base_name)[1], '.', compress=None)
91        finally:
92            os.chdir(old_dir)
93        tarball = base_name + '.tar'
94        self.assertTrue(os.path.exists(tarball))
95
96    def _tarinfo(self, path):
97        tar = tarfile.open(path)
98        try:
99            names = tar.getnames()
100            names.sort()
101            return names
102        finally:
103            tar.close()
104
105    def _create_files(self):
106        # creating something to tar
107        tmpdir = self.mkdtemp()
108        dist = os.path.join(tmpdir, 'dist')
109        os.mkdir(dist)
110        self.write_file([dist, 'file1'], 'xxx')
111        self.write_file([dist, 'file2'], 'xxx')
112        os.mkdir(os.path.join(dist, 'sub'))
113        self.write_file([dist, 'sub', 'file3'], 'xxx')
114        os.mkdir(os.path.join(dist, 'sub2'))
115        tmpdir2 = self.mkdtemp()
116        base_name = os.path.join(tmpdir2, 'archive')
117        return tmpdir, tmpdir2, base_name
118
119    @unittest.skipUnless(zlib, "Requires zlib")
120    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
121                         'Need the tar command to run')
122    def test_tarfile_vs_tar(self):
123        tmpdir, tmpdir2, base_name =  self._create_files()
124        old_dir = os.getcwd()
125        os.chdir(tmpdir)
126        try:
127            make_tarball(base_name, 'dist')
128        finally:
129            os.chdir(old_dir)
130
131        # check if the compressed tarball was created
132        tarball = base_name + '.tar.gz'
133        self.assertTrue(os.path.exists(tarball))
134
135        # now create another tarball using `tar`
136        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
137        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
138        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
139        old_dir = os.getcwd()
140        os.chdir(tmpdir)
141        try:
142            spawn(tar_cmd)
143            spawn(gzip_cmd)
144        finally:
145            os.chdir(old_dir)
146
147        self.assertTrue(os.path.exists(tarball2))
148        # let's compare both tarballs
149        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
150
151        # trying an uncompressed one
152        base_name = os.path.join(tmpdir2, 'archive')
153        old_dir = os.getcwd()
154        os.chdir(tmpdir)
155        try:
156            make_tarball(base_name, 'dist', compress=None)
157        finally:
158            os.chdir(old_dir)
159        tarball = base_name + '.tar'
160        self.assertTrue(os.path.exists(tarball))
161
162        # now for a dry_run
163        base_name = os.path.join(tmpdir2, 'archive')
164        old_dir = os.getcwd()
165        os.chdir(tmpdir)
166        try:
167            make_tarball(base_name, 'dist', compress=None, dry_run=True)
168        finally:
169            os.chdir(old_dir)
170        tarball = base_name + '.tar'
171        self.assertTrue(os.path.exists(tarball))
172
173    @unittest.skipUnless(find_executable('compress'),
174                         'The compress program is required')
175    def test_compress_deprecated(self):
176        tmpdir, tmpdir2, base_name =  self._create_files()
177
178        # using compress and testing the PendingDeprecationWarning
179        old_dir = os.getcwd()
180        os.chdir(tmpdir)
181        try:
182            with check_warnings() as w:
183                warnings.simplefilter("always")
184                make_tarball(base_name, 'dist', compress='compress')
185        finally:
186            os.chdir(old_dir)
187        tarball = base_name + '.tar.Z'
188        self.assertTrue(os.path.exists(tarball))
189        self.assertEqual(len(w.warnings), 1)
190
191        # same test with dry_run
192        os.remove(tarball)
193        old_dir = os.getcwd()
194        os.chdir(tmpdir)
195        try:
196            with check_warnings() as w:
197                warnings.simplefilter("always")
198                make_tarball(base_name, 'dist', compress='compress',
199                             dry_run=True)
200        finally:
201            os.chdir(old_dir)
202        self.assertFalse(os.path.exists(tarball))
203        self.assertEqual(len(w.warnings), 1)
204
205    @unittest.skipUnless(zlib, "Requires zlib")
206    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
207    def test_make_zipfile(self):
208        # creating something to tar
209        tmpdir = self.mkdtemp()
210        self.write_file([tmpdir, 'file1'], 'xxx')
211        self.write_file([tmpdir, 'file2'], 'xxx')
212
213        tmpdir2 = self.mkdtemp()
214        base_name = os.path.join(tmpdir2, 'archive')
215        make_zipfile(base_name, tmpdir)
216
217        # check if the compressed tarball was created
218        tarball = base_name + '.zip'
219
220    def test_check_archive_formats(self):
221        self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
222                         'xxx')
223        self.assertEqual(check_archive_formats(['gztar', 'zip']), None)
224
225    def test_make_archive(self):
226        tmpdir = self.mkdtemp()
227        base_name = os.path.join(tmpdir, 'archive')
228        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
229
230    @unittest.skipUnless(zlib, "Requires zlib")
231    def test_make_archive_owner_group(self):
232        # testing make_archive with owner and group, with various combinations
233        # this works even if there's not gid/uid support
234        if UID_GID_SUPPORT:
235            group = grp.getgrgid(0)[0]
236            owner = pwd.getpwuid(0)[0]
237        else:
238            group = owner = 'root'
239
240        base_dir, root_dir, base_name =  self._create_files()
241        base_name = os.path.join(self.mkdtemp() , 'archive')
242        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
243                           group=group)
244        self.assertTrue(os.path.exists(res))
245
246        res = make_archive(base_name, 'zip', root_dir, base_dir)
247        self.assertTrue(os.path.exists(res))
248
249        res = make_archive(base_name, 'tar', root_dir, base_dir,
250                           owner=owner, group=group)
251        self.assertTrue(os.path.exists(res))
252
253        res = make_archive(base_name, 'tar', root_dir, base_dir,
254                           owner='kjhkjhkjg', group='oihohoh')
255        self.assertTrue(os.path.exists(res))
256
257    @unittest.skipUnless(zlib, "Requires zlib")
258    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
259    def test_tarfile_root_owner(self):
260        tmpdir, tmpdir2, base_name =  self._create_files()
261        old_dir = os.getcwd()
262        os.chdir(tmpdir)
263        group = grp.getgrgid(0)[0]
264        owner = pwd.getpwuid(0)[0]
265        try:
266            archive_name = make_tarball(base_name, 'dist', compress=None,
267                                        owner=owner, group=group)
268        finally:
269            os.chdir(old_dir)
270
271        # check if the compressed tarball was created
272        self.assertTrue(os.path.exists(archive_name))
273
274        # now checks the rights
275        archive = tarfile.open(archive_name)
276        try:
277            for member in archive.getmembers():
278                self.assertEqual(member.uid, 0)
279                self.assertEqual(member.gid, 0)
280        finally:
281            archive.close()
282
283    def test_make_archive_cwd(self):
284        current_dir = os.getcwd()
285        def _breaks(*args, **kw):
286            raise RuntimeError()
287        ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
288        try:
289            try:
290                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
291            except:
292                pass
293            self.assertEqual(os.getcwd(), current_dir)
294        finally:
295            del ARCHIVE_FORMATS['xxx']
296
    @unittest.skipUnless(zlib, "requires zlib")
    def test_make_tarball_unicode(self):
        """
        Mirror test_make_tarball, except filename is unicode.

        The name is the unicode literal u'archive', so it contains ASCII
        characters only.
        """
        self._make_tarball(u'archive')
303
    @unittest.skipUnless(zlib, "requires zlib")
    @unittest.skipUnless(can_fs_encode(u'årchiv'),
        'File system cannot handle this filename')
    def test_make_tarball_unicode_latin1(self):
        """
        Mirror test_make_tarball, except filename is unicode and contains
        a non-ASCII latin character.

        Skipped when can_fs_encode() reports that the file system cannot
        store the name.
        """
        self._make_tarball(u'årchiv') # note this isn't a real word
313
    @unittest.skipUnless(zlib, "requires zlib")
    @unittest.skipUnless(can_fs_encode(u'のアーカイブ'),
        'File system cannot handle this filename')
    def test_make_tarball_unicode_extended(self):
        """
        Mirror test_make_tarball, except filename is unicode and contains
        characters outside the latin charset.

        Skipped when can_fs_encode() reports that the file system cannot
        store the name.
        """
        self._make_tarball(u'のアーカイブ') # japanese for archive
323
def test_suite():
    """Return a suite with the test methods of ArchiveUtilTestCase."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(ArchiveUtilTestCase)
326
# When the file is run as a script, hand the suite built by test_suite()
# to run_unittest().
if __name__ == "__main__":
    run_unittest(test_suite())
329