1from contextlib import contextmanager 2import filecmp 3import functools 4import os 5import posix 6import stat 7import sys 8import sysconfig 9import tempfile 10import time 11import uuid 12import unittest 13 14from ..xattr import get_all 15from ..platform import get_flags 16from ..helpers import umount 17from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR 18from .. import platform 19 20# Note: this is used by borg.selftest, do not use or import py.test functionality here. 21 22try: 23 import llfuse 24 # Does this version of llfuse support ns precision? 25 have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns') 26except ImportError: 27 have_fuse_mtime_ns = False 28 29try: 30 from pytest import raises 31except: # noqa 32 raises = None 33 34has_lchflags = hasattr(os, 'lchflags') or sys.platform.startswith('linux') 35try: 36 with tempfile.NamedTemporaryFile() as file: 37 platform.set_flags(file.name, stat.UF_NODUMP) 38except OSError: 39 has_lchflags = False 40 41try: 42 import llfuse 43 has_llfuse = True or llfuse # avoids "unused import" 44except ImportError: 45 has_llfuse = False 46 47# The mtime get/set precision varies on different OS and Python versions 48if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []): 49 st_mtime_ns_round = 0 50elif 'HAVE_UTIMES' in sysconfig.get_config_vars(): 51 st_mtime_ns_round = -6 52else: 53 st_mtime_ns_round = -9 54 55if sys.platform.startswith('netbsd'): 56 st_mtime_ns_round = -4 # only >1 microsecond resolution here? 57 58 59@contextmanager 60def unopened_tempfile(): 61 with tempfile.TemporaryDirectory() as tempdir: 62 yield os.path.join(tempdir, "file") 63 64 65@functools.lru_cache() 66def are_symlinks_supported(): 67 with unopened_tempfile() as filepath: 68 try: 69 os.symlink('somewhere', filepath) 70 if os.stat(filepath, follow_symlinks=False) and os.readlink(filepath) == 'somewhere': 71 return True 72 except OSError: 73 pass 74 return False 75 76 77@functools.lru_cache() 78def are_hardlinks_supported(): 79 if not hasattr(os, 'link'): 80 # some pythons do not have os.link 81 return False 82 83 with unopened_tempfile() as file1path, unopened_tempfile() as file2path: 84 open(file1path, 'w').close() 85 try: 86 os.link(file1path, file2path) 87 stat1 = os.stat(file1path) 88 stat2 = os.stat(file2path) 89 if stat1.st_nlink == stat2.st_nlink == 2 and stat1.st_ino == stat2.st_ino: 90 return True 91 except OSError: 92 pass 93 return False 94 95 96@functools.lru_cache() 97def are_fifos_supported(): 98 with unopened_tempfile() as filepath: 99 try: 100 os.mkfifo(filepath) 101 return True 102 except OSError: 103 return False 104 105 106@functools.lru_cache() 107def is_utime_fully_supported(): 108 with unopened_tempfile() as filepath: 109 # Some filesystems (such as SSHFS) don't support utime on symlinks 110 if are_symlinks_supported(): 111 os.symlink('something', filepath) 112 else: 113 open(filepath, 'w').close() 114 try: 115 os.utime(filepath, (1000, 2000), follow_symlinks=False) 116 new_stats = os.stat(filepath, follow_symlinks=False) 117 if new_stats.st_atime == 1000 and new_stats.st_mtime == 2000: 118 return True 119 except OSError: 120 pass 121 return False 122 123 124@functools.lru_cache() 125def is_birthtime_fully_supported(): 126 if not hasattr(os.stat_result, 'st_birthtime'): 127 return False 128 with unopened_tempfile() as filepath: 129 # Some filesystems (such as SSHFS) don't support utime on symlinks 130 if are_symlinks_supported(): 131 os.symlink('something', filepath) 132 else: 133 open(filepath, 'w').close() 134 try: 135 birthtime, mtime, atime = 946598400, 946684800, 946771200 136 os.utime(filepath, (atime, birthtime), follow_symlinks=False) 137 os.utime(filepath, (atime, mtime), follow_symlinks=False) 138 new_stats = os.stat(filepath, follow_symlinks=False) 139 if new_stats.st_birthtime == birthtime and new_stats.st_mtime == mtime and new_stats.st_atime == atime: 140 return True 141 except OSError: 142 pass 143 return False 144 145 146def no_selinux(x): 147 # selinux fails our FUSE tests, thus ignore selinux xattrs 148 SELINUX_KEY = 'security.selinux' 149 if isinstance(x, dict): 150 return {k: v for k, v in x.items() if k != SELINUX_KEY} 151 if isinstance(x, list): 152 return [k for k in x if k != SELINUX_KEY] 153 154 155class BaseTestCase(unittest.TestCase): 156 """ 157 """ 158 assert_in = unittest.TestCase.assertIn 159 assert_not_in = unittest.TestCase.assertNotIn 160 assert_equal = unittest.TestCase.assertEqual 161 assert_not_equal = unittest.TestCase.assertNotEqual 162 assert_true = unittest.TestCase.assertTrue 163 164 if raises: 165 assert_raises = staticmethod(raises) 166 else: 167 assert_raises = unittest.TestCase.assertRaises 168 169 @contextmanager 170 def assert_creates_file(self, path): 171 self.assert_true(not os.path.exists(path), '{} should not exist'.format(path)) 172 yield 173 self.assert_true(os.path.exists(path), '{} should exist'.format(path)) 174 175 def assert_dirs_equal(self, dir1, dir2, **kwargs): 176 diff = filecmp.dircmp(dir1, dir2) 177 self._assert_dirs_equal_cmp(diff, **kwargs) 178 179 def _assert_dirs_equal_cmp(self, diff, ignore_bsdflags=False, ignore_xattrs=False, ignore_ns=False): 180 self.assert_equal(diff.left_only, []) 181 self.assert_equal(diff.right_only, []) 182 self.assert_equal(diff.diff_files, []) 183 self.assert_equal(diff.funny_files, []) 184 for filename in diff.common: 185 path1 = os.path.join(diff.left, filename) 186 path2 = os.path.join(diff.right, filename) 187 s1 = os.stat(path1, follow_symlinks=False) 188 s2 = os.stat(path2, follow_symlinks=False) 189 # Assume path2 is on FUSE if st_dev is different 190 fuse = s1.st_dev != s2.st_dev 191 attrs = ['st_uid', 'st_gid', 'st_rdev'] 192 if not fuse or not os.path.isdir(path1): 193 # dir nlink is always 1 on our FUSE filesystem 194 attrs.append('st_nlink') 195 d1 = [filename] + [getattr(s1, a) for a in attrs] 196 d2 = [filename] + [getattr(s2, a) for a in attrs] 197 d1.insert(1, oct(s1.st_mode)) 198 d2.insert(1, oct(s2.st_mode)) 199 if not ignore_bsdflags: 200 d1.append(get_flags(path1, s1)) 201 d2.append(get_flags(path2, s2)) 202 # ignore st_rdev if file is not a block/char device, fixes #203 203 if not stat.S_ISCHR(s1.st_mode) and not stat.S_ISBLK(s1.st_mode): 204 d1[4] = None 205 if not stat.S_ISCHR(s2.st_mode) and not stat.S_ISBLK(s2.st_mode): 206 d2[4] = None 207 # If utime isn't fully supported, borg can't set mtime. 208 # Therefore, we shouldn't test it in that case. 209 if is_utime_fully_supported(): 210 # Older versions of llfuse do not support ns precision properly 211 if ignore_ns: 212 d1.append(int(s1.st_mtime_ns / 1e9)) 213 d2.append(int(s2.st_mtime_ns / 1e9)) 214 elif fuse and not have_fuse_mtime_ns: 215 d1.append(round(s1.st_mtime_ns, -4)) 216 d2.append(round(s2.st_mtime_ns, -4)) 217 else: 218 d1.append(round(s1.st_mtime_ns, st_mtime_ns_round)) 219 d2.append(round(s2.st_mtime_ns, st_mtime_ns_round)) 220 if not ignore_xattrs: 221 d1.append(no_selinux(get_all(path1, follow_symlinks=False))) 222 d2.append(no_selinux(get_all(path2, follow_symlinks=False))) 223 self.assert_equal(d1, d2) 224 for sub_diff in diff.subdirs.values(): 225 self._assert_dirs_equal_cmp(sub_diff, ignore_bsdflags=ignore_bsdflags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns) 226 227 @contextmanager 228 def fuse_mount(self, location, mountpoint=None, *options, **kwargs): 229 if mountpoint is None: 230 mountpoint = tempfile.mkdtemp() 231 else: 232 os.mkdir(mountpoint) 233 if 'fork' not in kwargs: 234 # For a successful mount, `fork = True` is required for 235 # the borg mount daemon to work properly or the tests 236 # will just freeze. Therefore, if argument `fork` is not 237 # specified, the default value is `True`, regardless of 238 # `FORK_DEFAULT`. However, leaving the possibility to run 239 # the command with `fork = False` is still necessary for 240 # testing for mount failures, for example attempting to 241 # mount a read-only repo. 242 kwargs['fork'] = True 243 self.cmd('mount', location, mountpoint, *options, **kwargs) 244 if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR: 245 # If argument `exit_code = EXIT_ERROR`, then this call 246 # is testing the behavior of an unsuccessful mount and 247 # we must not continue, as there is no mount to work 248 # with. The test itself has already failed or succeeded 249 # with the call to `self.cmd`, above. 250 yield 251 return 252 self.wait_for_mountstate(mountpoint, mounted=True) 253 yield 254 umount(mountpoint) 255 self.wait_for_mountstate(mountpoint, mounted=False) 256 os.rmdir(mountpoint) 257 # Give the daemon some time to exit 258 time.sleep(0.2) 259 260 def wait_for_mountstate(self, mountpoint, *, mounted, timeout=5): 261 """Wait until a path meets specified mount point status""" 262 timeout += time.time() 263 while timeout > time.time(): 264 if os.path.ismount(mountpoint) == mounted: 265 return 266 time.sleep(0.1) 267 message = 'Waiting for %s of %s' % ('mount' if mounted else 'umount', mountpoint) 268 raise TimeoutError(message) 269 270 @contextmanager 271 def read_only(self, path): 272 """Some paths need to be made read-only for testing 273 274 If the tests are executed inside a fakeroot environment, the 275 changes from chmod won't affect the real permissions of that 276 folder. This issue is circumvented by temporarily disabling 277 fakeroot with `LD_PRELOAD=`. 278 279 Using chmod to remove write permissions is not enough if the 280 tests are running with root privileges. Instead, the folder is 281 rendered immutable with chattr or chflags, respectively. 282 """ 283 if sys.platform.startswith('linux'): 284 cmd_immutable = 'chattr +i "%s"' % path 285 cmd_mutable = 'chattr -i "%s"' % path 286 elif sys.platform.startswith(('darwin', 'freebsd', 'netbsd', 'openbsd')): 287 cmd_immutable = 'chflags uchg "%s"' % path 288 cmd_mutable = 'chflags nouchg "%s"' % path 289 elif sys.platform.startswith('sunos'): # openindiana 290 cmd_immutable = 'chmod S+vimmutable "%s"' % path 291 cmd_mutable = 'chmod S-vimmutable "%s"' % path 292 else: 293 message = 'Testing read-only repos is not supported on platform %s' % sys.platform 294 self.skipTest(message) 295 try: 296 os.system('LD_PRELOAD= chmod -R ugo-w "%s"' % path) 297 os.system(cmd_immutable) 298 yield 299 finally: 300 # Restore permissions to ensure clean-up doesn't fail 301 os.system(cmd_mutable) 302 os.system('LD_PRELOAD= chmod -R ugo+w "%s"' % path) 303 304 305class changedir: 306 def __init__(self, dir): 307 self.dir = dir 308 309 def __enter__(self): 310 self.old = os.getcwd() 311 os.chdir(self.dir) 312 313 def __exit__(self, *args, **kw): 314 os.chdir(self.old) 315 316 317class environment_variable: 318 def __init__(self, **values): 319 self.values = values 320 self.old_values = {} 321 322 def __enter__(self): 323 for k, v in self.values.items(): 324 self.old_values[k] = os.environ.get(k) 325 if v is None: 326 os.environ.pop(k, None) 327 else: 328 os.environ[k] = v 329 330 def __exit__(self, *args, **kw): 331 for k, v in self.old_values.items(): 332 if v is None: 333 os.environ.pop(k, None) 334 else: 335 os.environ[k] = v 336 337 338class FakeInputs: 339 """Simulate multiple user inputs, can be used as input() replacement""" 340 def __init__(self, inputs): 341 self.inputs = inputs 342 343 def __call__(self, prompt=None): 344 if prompt is not None: 345 print(prompt, end='') 346 try: 347 return self.inputs.pop(0) 348 except IndexError: 349 raise EOFError from None 350