1from contextlib import contextmanager
2import filecmp
3import functools
4import os
5import posix
6import stat
7import sys
8import sysconfig
9import tempfile
10import time
11import uuid
12import unittest
13
14from ..xattr import get_all
15from ..platform import get_flags
16from ..helpers import umount
17from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
18from .. import platform
19
20# Note: this is used by borg.selftest, do not use or import py.test functionality here.
21
22try:
23    import llfuse
24    # Does this version of llfuse support ns precision?
25    have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
26except ImportError:
27    have_fuse_mtime_ns = False
28
29try:
30    from pytest import raises
31except:  # noqa
32    raises = None
33
34has_lchflags = hasattr(os, 'lchflags') or sys.platform.startswith('linux')
35try:
36    with tempfile.NamedTemporaryFile() as file:
37        platform.set_flags(file.name, stat.UF_NODUMP)
38except OSError:
39    has_lchflags = False
40
41try:
42    import llfuse
43    has_llfuse = True or llfuse  # avoids "unused import"
44except ImportError:
45    has_llfuse = False
46
47# The mtime get/set precision varies on different OS and Python versions
48if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
49    st_mtime_ns_round = 0
50elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
51    st_mtime_ns_round = -6
52else:
53    st_mtime_ns_round = -9
54
55if sys.platform.startswith('netbsd'):
56    st_mtime_ns_round = -4  # only >1 microsecond resolution here?
57
58
59@contextmanager
60def unopened_tempfile():
61    with tempfile.TemporaryDirectory() as tempdir:
62        yield os.path.join(tempdir, "file")
63
64
65@functools.lru_cache()
66def are_symlinks_supported():
67    with unopened_tempfile() as filepath:
68        try:
69            os.symlink('somewhere', filepath)
70            if os.stat(filepath, follow_symlinks=False) and os.readlink(filepath) == 'somewhere':
71                return True
72        except OSError:
73            pass
74    return False
75
76
77@functools.lru_cache()
78def are_hardlinks_supported():
79    if not hasattr(os, 'link'):
80        # some pythons do not have os.link
81        return False
82
83    with unopened_tempfile() as file1path, unopened_tempfile() as file2path:
84        open(file1path, 'w').close()
85        try:
86            os.link(file1path, file2path)
87            stat1 = os.stat(file1path)
88            stat2 = os.stat(file2path)
89            if stat1.st_nlink == stat2.st_nlink == 2 and stat1.st_ino == stat2.st_ino:
90                return True
91        except OSError:
92            pass
93    return False
94
95
96@functools.lru_cache()
97def are_fifos_supported():
98    with unopened_tempfile() as filepath:
99        try:
100            os.mkfifo(filepath)
101            return True
102        except OSError:
103            return False
104
105
106@functools.lru_cache()
107def is_utime_fully_supported():
108    with unopened_tempfile() as filepath:
109        # Some filesystems (such as SSHFS) don't support utime on symlinks
110        if are_symlinks_supported():
111            os.symlink('something', filepath)
112        else:
113            open(filepath, 'w').close()
114        try:
115            os.utime(filepath, (1000, 2000), follow_symlinks=False)
116            new_stats = os.stat(filepath, follow_symlinks=False)
117            if new_stats.st_atime == 1000 and new_stats.st_mtime == 2000:
118                return True
119        except OSError:
120            pass
121        return False
122
123
124@functools.lru_cache()
125def is_birthtime_fully_supported():
126    if not hasattr(os.stat_result, 'st_birthtime'):
127        return False
128    with unopened_tempfile() as filepath:
129        # Some filesystems (such as SSHFS) don't support utime on symlinks
130        if are_symlinks_supported():
131            os.symlink('something', filepath)
132        else:
133            open(filepath, 'w').close()
134        try:
135            birthtime, mtime, atime = 946598400, 946684800, 946771200
136            os.utime(filepath, (atime, birthtime), follow_symlinks=False)
137            os.utime(filepath, (atime, mtime), follow_symlinks=False)
138            new_stats = os.stat(filepath, follow_symlinks=False)
139            if new_stats.st_birthtime == birthtime and new_stats.st_mtime == mtime and new_stats.st_atime == atime:
140                return True
141        except OSError:
142            pass
143        return False
144
145
146def no_selinux(x):
147    # selinux fails our FUSE tests, thus ignore selinux xattrs
148    SELINUX_KEY = 'security.selinux'
149    if isinstance(x, dict):
150        return {k: v for k, v in x.items() if k != SELINUX_KEY}
151    if isinstance(x, list):
152        return [k for k in x if k != SELINUX_KEY]
153
154
155class BaseTestCase(unittest.TestCase):
156    """
157    """
158    assert_in = unittest.TestCase.assertIn
159    assert_not_in = unittest.TestCase.assertNotIn
160    assert_equal = unittest.TestCase.assertEqual
161    assert_not_equal = unittest.TestCase.assertNotEqual
162    assert_true = unittest.TestCase.assertTrue
163
164    if raises:
165        assert_raises = staticmethod(raises)
166    else:
167        assert_raises = unittest.TestCase.assertRaises
168
169    @contextmanager
170    def assert_creates_file(self, path):
171        self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
172        yield
173        self.assert_true(os.path.exists(path), '{} should exist'.format(path))
174
175    def assert_dirs_equal(self, dir1, dir2, **kwargs):
176        diff = filecmp.dircmp(dir1, dir2)
177        self._assert_dirs_equal_cmp(diff, **kwargs)
178
179    def _assert_dirs_equal_cmp(self, diff, ignore_bsdflags=False, ignore_xattrs=False, ignore_ns=False):
180        self.assert_equal(diff.left_only, [])
181        self.assert_equal(diff.right_only, [])
182        self.assert_equal(diff.diff_files, [])
183        self.assert_equal(diff.funny_files, [])
184        for filename in diff.common:
185            path1 = os.path.join(diff.left, filename)
186            path2 = os.path.join(diff.right, filename)
187            s1 = os.stat(path1, follow_symlinks=False)
188            s2 = os.stat(path2, follow_symlinks=False)
189            # Assume path2 is on FUSE if st_dev is different
190            fuse = s1.st_dev != s2.st_dev
191            attrs = ['st_uid', 'st_gid', 'st_rdev']
192            if not fuse or not os.path.isdir(path1):
193                # dir nlink is always 1 on our FUSE filesystem
194                attrs.append('st_nlink')
195            d1 = [filename] + [getattr(s1, a) for a in attrs]
196            d2 = [filename] + [getattr(s2, a) for a in attrs]
197            d1.insert(1, oct(s1.st_mode))
198            d2.insert(1, oct(s2.st_mode))
199            if not ignore_bsdflags:
200                d1.append(get_flags(path1, s1))
201                d2.append(get_flags(path2, s2))
202            # ignore st_rdev if file is not a block/char device, fixes #203
203            if not stat.S_ISCHR(s1.st_mode) and not stat.S_ISBLK(s1.st_mode):
204                d1[4] = None
205            if not stat.S_ISCHR(s2.st_mode) and not stat.S_ISBLK(s2.st_mode):
206                d2[4] = None
207            # If utime isn't fully supported, borg can't set mtime.
208            # Therefore, we shouldn't test it in that case.
209            if is_utime_fully_supported():
210                # Older versions of llfuse do not support ns precision properly
211                if ignore_ns:
212                    d1.append(int(s1.st_mtime_ns / 1e9))
213                    d2.append(int(s2.st_mtime_ns / 1e9))
214                elif fuse and not have_fuse_mtime_ns:
215                    d1.append(round(s1.st_mtime_ns, -4))
216                    d2.append(round(s2.st_mtime_ns, -4))
217                else:
218                    d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
219                    d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
220            if not ignore_xattrs:
221                d1.append(no_selinux(get_all(path1, follow_symlinks=False)))
222                d2.append(no_selinux(get_all(path2, follow_symlinks=False)))
223            self.assert_equal(d1, d2)
224        for sub_diff in diff.subdirs.values():
225            self._assert_dirs_equal_cmp(sub_diff, ignore_bsdflags=ignore_bsdflags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns)
226
227    @contextmanager
228    def fuse_mount(self, location, mountpoint=None, *options, **kwargs):
229        if mountpoint is None:
230            mountpoint = tempfile.mkdtemp()
231        else:
232            os.mkdir(mountpoint)
233        if 'fork' not in kwargs:
234            # For a successful mount, `fork = True` is required for
235            # the borg mount daemon to work properly or the tests
236            # will just freeze. Therefore, if argument `fork` is not
237            # specified, the default value is `True`, regardless of
238            # `FORK_DEFAULT`. However, leaving the possibility to run
239            # the command with `fork = False` is still necessary for
240            # testing for mount failures, for example attempting to
241            # mount a read-only repo.
242            kwargs['fork'] = True
243        self.cmd('mount', location, mountpoint, *options, **kwargs)
244        if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR:
245            # If argument `exit_code = EXIT_ERROR`, then this call
246            # is testing the behavior of an unsuccessful mount and
247            # we must not continue, as there is no mount to work
248            # with. The test itself has already failed or succeeded
249            # with the call to `self.cmd`, above.
250            yield
251            return
252        self.wait_for_mountstate(mountpoint, mounted=True)
253        yield
254        umount(mountpoint)
255        self.wait_for_mountstate(mountpoint, mounted=False)
256        os.rmdir(mountpoint)
257        # Give the daemon some time to exit
258        time.sleep(0.2)
259
260    def wait_for_mountstate(self, mountpoint, *, mounted, timeout=5):
261        """Wait until a path meets specified mount point status"""
262        timeout += time.time()
263        while timeout > time.time():
264            if os.path.ismount(mountpoint) == mounted:
265                return
266            time.sleep(0.1)
267        message = 'Waiting for %s of %s' % ('mount' if mounted else 'umount', mountpoint)
268        raise TimeoutError(message)
269
270    @contextmanager
271    def read_only(self, path):
272        """Some paths need to be made read-only for testing
273
274        If the tests are executed inside a fakeroot environment, the
275        changes from chmod won't affect the real permissions of that
276        folder. This issue is circumvented by temporarily disabling
277        fakeroot with `LD_PRELOAD=`.
278
279        Using chmod to remove write permissions is not enough if the
280        tests are running with root privileges. Instead, the folder is
281        rendered immutable with chattr or chflags, respectively.
282        """
283        if sys.platform.startswith('linux'):
284            cmd_immutable = 'chattr +i "%s"' % path
285            cmd_mutable = 'chattr -i "%s"' % path
286        elif sys.platform.startswith(('darwin', 'freebsd', 'netbsd', 'openbsd')):
287            cmd_immutable = 'chflags uchg "%s"' % path
288            cmd_mutable = 'chflags nouchg "%s"' % path
289        elif sys.platform.startswith('sunos'):  # openindiana
290            cmd_immutable = 'chmod S+vimmutable "%s"' % path
291            cmd_mutable = 'chmod S-vimmutable "%s"' % path
292        else:
293            message = 'Testing read-only repos is not supported on platform %s' % sys.platform
294            self.skipTest(message)
295        try:
296            os.system('LD_PRELOAD= chmod -R ugo-w "%s"' % path)
297            os.system(cmd_immutable)
298            yield
299        finally:
300            # Restore permissions to ensure clean-up doesn't fail
301            os.system(cmd_mutable)
302            os.system('LD_PRELOAD= chmod -R ugo+w "%s"' % path)
303
304
305class changedir:
306    def __init__(self, dir):
307        self.dir = dir
308
309    def __enter__(self):
310        self.old = os.getcwd()
311        os.chdir(self.dir)
312
313    def __exit__(self, *args, **kw):
314        os.chdir(self.old)
315
316
317class environment_variable:
318    def __init__(self, **values):
319        self.values = values
320        self.old_values = {}
321
322    def __enter__(self):
323        for k, v in self.values.items():
324            self.old_values[k] = os.environ.get(k)
325            if v is None:
326                os.environ.pop(k, None)
327            else:
328                os.environ[k] = v
329
330    def __exit__(self, *args, **kw):
331        for k, v in self.old_values.items():
332            if v is None:
333                os.environ.pop(k, None)
334            else:
335                os.environ[k] = v
336
337
338class FakeInputs:
339    """Simulate multiple user inputs, can be used as input() replacement"""
340    def __init__(self, inputs):
341        self.inputs = inputs
342
343    def __call__(self, prompt=None):
344        if prompt is not None:
345            print(prompt, end='')
346        try:
347            return self.inputs.pop(0)
348        except IndexError:
349            raise EOFError from None
350