1#!/usr/bin/env python3
2
3if __name__ == '__main__':
4    import pytest
5    import sys
6    sys.exit(pytest.main([__file__] + sys.argv[1:]))
7
8import subprocess
9import os
10import sys
11import pytest
12import stat
13import shutil
14import filecmp
15import errno
16from contextlib import contextmanager
17from tempfile import NamedTemporaryFile
18from util import (wait_for_mount, umount, cleanup, base_cmdline,
19                  basename, fuse_test_marker, safe_sleep)
20from os.path import join as pjoin
21
22TEST_FILE = __file__
23
24pytestmark = fuse_test_marker()
25
26with open(TEST_FILE, 'rb') as fh:
27    TEST_DATA = fh.read()
28
29def name_generator(__ctr=[0]):
30    __ctr[0] += 1
31    return 'testfile_%d' % __ctr[0]
32
33@pytest.mark.parametrize("debug", (False, True))
34@pytest.mark.parametrize("cache_timeout", (0,1))
35@pytest.mark.parametrize("sync_rd", (True, False))
36@pytest.mark.parametrize("multiconn", (True,False))
37def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd):
38
39    # Avoid false positives from debug messages
40    #if debug:
41    #    capfd.register_output(r'^   unique: [0-9]+, error: -[0-9]+ .+$',
42    #                          count=0)
43
44    # Avoid false positives from storing key for localhost
45    capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0)
46
47    # Test if we can ssh into localhost without password
48    try:
49        res = subprocess.call(['ssh', '-o', 'KbdInteractiveAuthentication=no',
50                               '-o', 'ChallengeResponseAuthentication=no',
51                               '-o', 'PasswordAuthentication=no',
52                               'localhost', '--', 'true'], stdin=subprocess.DEVNULL,
53                              timeout=10)
54    except subprocess.TimeoutExpired:
55        res = 1
56    if res != 0:
57        pytest.fail('Unable to ssh into localhost without password prompt.')
58
59    mnt_dir = str(tmpdir.mkdir('mnt'))
60    src_dir = str(tmpdir.mkdir('src'))
61
62    cmdline = base_cmdline + [ pjoin(basename, 'sshfs'),
63                               '-f', 'localhost:' + src_dir, mnt_dir ]
64    if debug:
65        cmdline += [ '-o', 'sshfs_debug' ]
66
67    if sync_rd:
68        cmdline += [ '-o', 'sync_readdir' ]
69
70    # SSHFS Cache
71    if cache_timeout == 0:
72        cmdline += [ '-o', 'dir_cache=no' ]
73    else:
74        cmdline += [ '-o', 'dcache_timeout=%d' % cache_timeout,
75                     '-o', 'dir_cache=yes' ]
76
77    # FUSE Cache
78    cmdline += [ '-o', 'entry_timeout=0',
79                 '-o', 'attr_timeout=0' ]
80
81    if multiconn:
82        cmdline += [ '-o', 'max_conns=3' ]
83
84    new_env = dict(os.environ) # copy, don't modify
85
86    # Abort on warnings from glib
87    new_env['G_DEBUG'] = 'fatal-warnings'
88
89    mount_process = subprocess.Popen(cmdline, env=new_env)
90    try:
91        wait_for_mount(mount_process, mnt_dir)
92
93        tst_statvfs(mnt_dir)
94        tst_readdir(src_dir, mnt_dir)
95        tst_open_read(src_dir, mnt_dir)
96        tst_open_write(src_dir, mnt_dir)
97        tst_append(src_dir, mnt_dir)
98        tst_seek(src_dir, mnt_dir)
99        tst_create(mnt_dir)
100        tst_passthrough(src_dir, mnt_dir, cache_timeout)
101        tst_mkdir(mnt_dir)
102        tst_rmdir(src_dir, mnt_dir, cache_timeout)
103        tst_unlink(src_dir, mnt_dir, cache_timeout)
104        tst_symlink(mnt_dir)
105        if os.getuid() == 0:
106            tst_chown(mnt_dir)
107
108        # SSHFS only supports one second resolution when setting
109        # file timestamps.
110        tst_utimens(mnt_dir, tol=1)
111        tst_utimens_now(mnt_dir)
112
113        tst_link(mnt_dir, cache_timeout)
114        tst_truncate_path(mnt_dir)
115        tst_truncate_fd(mnt_dir)
116        tst_open_unlink(mnt_dir)
117    except:
118        cleanup(mount_process, mnt_dir)
119        raise
120    else:
121        umount(mount_process, mnt_dir)
122
123@contextmanager
124def os_open(name, flags):
125    fd = os.open(name, flags)
126    try:
127        yield fd
128    finally:
129        os.close(fd)
130
131def os_create(name):
132    os.close(os.open(name, os.O_CREAT | os.O_RDWR))
133
134def tst_unlink(src_dir, mnt_dir, cache_timeout):
135    name = name_generator()
136    fullname = mnt_dir + "/" + name
137    with open(pjoin(src_dir, name), 'wb') as fh:
138        fh.write(b'hello')
139    if cache_timeout:
140        safe_sleep(cache_timeout+1)
141    assert name in os.listdir(mnt_dir)
142    os.unlink(fullname)
143    with pytest.raises(OSError) as exc_info:
144        os.stat(fullname)
145    assert exc_info.value.errno == errno.ENOENT
146    assert name not in os.listdir(mnt_dir)
147    assert name not in os.listdir(src_dir)
148
149def tst_mkdir(mnt_dir):
150    dirname = name_generator()
151    fullname = mnt_dir + "/" + dirname
152    os.mkdir(fullname)
153    fstat = os.stat(fullname)
154    assert stat.S_ISDIR(fstat.st_mode)
155    assert os.listdir(fullname) ==  []
156    assert fstat.st_nlink in (1,2)
157    assert dirname in os.listdir(mnt_dir)
158
159def tst_rmdir(src_dir, mnt_dir, cache_timeout):
160    name = name_generator()
161    fullname = mnt_dir + "/" + name
162    os.mkdir(pjoin(src_dir, name))
163    if cache_timeout:
164        safe_sleep(cache_timeout+1)
165    assert name in os.listdir(mnt_dir)
166    os.rmdir(fullname)
167    with pytest.raises(OSError) as exc_info:
168        os.stat(fullname)
169    assert exc_info.value.errno == errno.ENOENT
170    assert name not in os.listdir(mnt_dir)
171    assert name not in os.listdir(src_dir)
172
173def tst_symlink(mnt_dir):
174    linkname = name_generator()
175    fullname = mnt_dir + "/" + linkname
176    os.symlink("/imaginary/dest", fullname)
177    fstat = os.lstat(fullname)
178    assert stat.S_ISLNK(fstat.st_mode)
179    assert os.readlink(fullname) == "/imaginary/dest"
180    assert fstat.st_nlink == 1
181    assert linkname in os.listdir(mnt_dir)
182
183def tst_create(mnt_dir):
184    name = name_generator()
185    fullname = pjoin(mnt_dir, name)
186    with pytest.raises(OSError) as exc_info:
187        os.stat(fullname)
188    assert exc_info.value.errno == errno.ENOENT
189    assert name not in os.listdir(mnt_dir)
190
191    fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
192    os.close(fd)
193
194    assert name in os.listdir(mnt_dir)
195    fstat = os.lstat(fullname)
196    assert stat.S_ISREG(fstat.st_mode)
197    assert fstat.st_nlink == 1
198    assert fstat.st_size == 0
199
200def tst_chown(mnt_dir):
201    filename = pjoin(mnt_dir, name_generator())
202    os.mkdir(filename)
203    fstat = os.lstat(filename)
204    uid = fstat.st_uid
205    gid = fstat.st_gid
206
207    uid_new = uid + 1
208    os.chown(filename, uid_new, -1)
209    fstat = os.lstat(filename)
210    assert fstat.st_uid == uid_new
211    assert fstat.st_gid == gid
212
213    gid_new = gid + 1
214    os.chown(filename, -1, gid_new)
215    fstat = os.lstat(filename)
216    assert fstat.st_uid == uid_new
217    assert fstat.st_gid == gid_new
218
219def tst_open_read(src_dir, mnt_dir):
220    name = name_generator()
221    with open(pjoin(src_dir, name), 'wb') as fh_out, \
222         open(TEST_FILE, 'rb') as fh_in:
223        shutil.copyfileobj(fh_in, fh_out)
224
225    assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
226
227def tst_open_write(src_dir, mnt_dir):
228    name = name_generator()
229    fd = os.open(pjoin(src_dir, name),
230                 os.O_CREAT | os.O_RDWR)
231    os.close(fd)
232    fullname = pjoin(mnt_dir, name)
233    with open(fullname, 'wb') as fh_out, \
234         open(TEST_FILE, 'rb') as fh_in:
235        shutil.copyfileobj(fh_in, fh_out)
236
237    assert filecmp.cmp(fullname, TEST_FILE, False)
238
239def tst_append(src_dir, mnt_dir):
240    name = name_generator()
241    os_create(pjoin(src_dir, name))
242    fullname = pjoin(mnt_dir, name)
243    with os_open(fullname, os.O_WRONLY) as fd:
244        os.write(fd, b'foo\n')
245    with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
246        os.write(fd, b'bar\n')
247
248    with open(fullname, 'rb') as fh:
249        assert fh.read() == b'foo\nbar\n'
250
251def tst_seek(src_dir, mnt_dir):
252    name = name_generator()
253    os_create(pjoin(src_dir, name))
254    fullname = pjoin(mnt_dir, name)
255    with os_open(fullname, os.O_WRONLY) as fd:
256        os.lseek(fd, 1, os.SEEK_SET)
257        os.write(fd, b'foobar\n')
258    with os_open(fullname, os.O_WRONLY) as fd:
259        os.lseek(fd, 4, os.SEEK_SET)
260        os.write(fd, b'com')
261
262    with open(fullname, 'rb') as fh:
263        assert fh.read() == b'\0foocom\n'
264
265def tst_open_unlink(mnt_dir):
266    name = pjoin(mnt_dir, name_generator())
267    data1 = b'foo'
268    data2 = b'bar'
269    fullname = pjoin(mnt_dir, name)
270    with open(fullname, 'wb+', buffering=0) as fh:
271        fh.write(data1)
272        os.unlink(fullname)
273        with pytest.raises(OSError) as exc_info:
274            os.stat(fullname)
275            assert exc_info.value.errno == errno.ENOENT
276        assert name not in os.listdir(mnt_dir)
277        fh.write(data2)
278        fh.seek(0)
279        assert fh.read() == data1+data2
280
281def tst_statvfs(mnt_dir):
282    os.statvfs(mnt_dir)
283
284def tst_link(mnt_dir, cache_timeout):
285    name1 = pjoin(mnt_dir, name_generator())
286    name2 = pjoin(mnt_dir, name_generator())
287    shutil.copyfile(TEST_FILE, name1)
288    assert filecmp.cmp(name1, TEST_FILE, False)
289
290    fstat1 = os.lstat(name1)
291    assert fstat1.st_nlink == 1
292
293    os.link(name1, name2)
294
295    # The link operation changes st_ctime, and if we're unlucky
296    # the kernel will keep the old value cached for name1, and
297    # retrieve the new value for name2 (at least, this is the only
298    # way I can explain the test failure). To avoid this problem,
299    # we need to wait until the cached value has expired.
300    if cache_timeout:
301        safe_sleep(cache_timeout)
302
303    fstat1 = os.lstat(name1)
304    fstat2 = os.lstat(name2)
305    for attr in ('st_mode', 'st_dev', 'st_uid', 'st_gid',
306                 'st_size', 'st_atime', 'st_mtime', 'st_ctime'):
307        assert getattr(fstat1, attr) == getattr(fstat2, attr)
308    assert os.path.basename(name2) in os.listdir(mnt_dir)
309    assert filecmp.cmp(name1, name2, False)
310
311    os.unlink(name2)
312
313    assert os.path.basename(name2) not in os.listdir(mnt_dir)
314    with pytest.raises(FileNotFoundError):
315        os.lstat(name2)
316
317    os.unlink(name1)
318
319def tst_readdir(src_dir, mnt_dir):
320    newdir = name_generator()
321    src_newdir = pjoin(src_dir, newdir)
322    mnt_newdir = pjoin(mnt_dir, newdir)
323    file_ = src_newdir + "/" + name_generator()
324    subdir = src_newdir + "/" + name_generator()
325    subfile = subdir + "/" + name_generator()
326
327    os.mkdir(src_newdir)
328    shutil.copyfile(TEST_FILE, file_)
329    os.mkdir(subdir)
330    shutil.copyfile(TEST_FILE, subfile)
331
332    listdir_is = os.listdir(mnt_newdir)
333    listdir_is.sort()
334    listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
335    listdir_should.sort()
336    assert listdir_is == listdir_should
337
338    os.unlink(file_)
339    os.unlink(subfile)
340    os.rmdir(subdir)
341    os.rmdir(src_newdir)
342
343def tst_truncate_path(mnt_dir):
344    assert len(TEST_DATA) > 1024
345
346    filename = pjoin(mnt_dir, name_generator())
347    with open(filename, 'wb') as fh:
348        fh.write(TEST_DATA)
349
350    fstat = os.stat(filename)
351    size = fstat.st_size
352    assert size == len(TEST_DATA)
353
354    # Add zeros at the end
355    os.truncate(filename, size + 1024)
356    assert os.stat(filename).st_size == size + 1024
357    with open(filename, 'rb') as fh:
358        assert fh.read(size) == TEST_DATA
359        assert fh.read(1025) == b'\0' * 1024
360
361    # Truncate data
362    os.truncate(filename, size - 1024)
363    assert os.stat(filename).st_size == size - 1024
364    with open(filename, 'rb') as fh:
365        assert fh.read(size) == TEST_DATA[:size-1024]
366
367    os.unlink(filename)
368
369def tst_truncate_fd(mnt_dir):
370    assert len(TEST_DATA) > 1024
371    with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
372        fd = fh.fileno()
373        fh.write(TEST_DATA)
374        fstat = os.fstat(fd)
375        size = fstat.st_size
376        assert size == len(TEST_DATA)
377
378        # Add zeros at the end
379        os.ftruncate(fd, size + 1024)
380        assert os.fstat(fd).st_size == size + 1024
381        fh.seek(0)
382        assert fh.read(size) == TEST_DATA
383        assert fh.read(1025) == b'\0' * 1024
384
385        # Truncate data
386        os.ftruncate(fd, size - 1024)
387        assert os.fstat(fd).st_size == size - 1024
388        fh.seek(0)
389        assert fh.read(size) == TEST_DATA[:size-1024]
390
391def tst_utimens(mnt_dir, tol=0):
392    filename = pjoin(mnt_dir, name_generator())
393    os.mkdir(filename)
394    fstat = os.lstat(filename)
395
396    atime = fstat.st_atime + 42.28
397    mtime = fstat.st_mtime - 42.23
398    if sys.version_info < (3,3):
399        os.utime(filename, (atime, mtime))
400    else:
401        atime_ns = fstat.st_atime_ns + int(42.28*1e9)
402        mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
403        os.utime(filename, None, ns=(atime_ns, mtime_ns))
404
405    fstat = os.lstat(filename)
406
407    assert abs(fstat.st_atime - atime) < tol
408    assert abs(fstat.st_mtime - mtime) < tol
409    if sys.version_info >= (3,3):
410        assert abs(fstat.st_atime_ns - atime_ns) < tol*1e9
411        assert abs(fstat.st_mtime_ns - mtime_ns) < tol*1e9
412
413def tst_utimens_now(mnt_dir):
414    fullname = pjoin(mnt_dir, name_generator())
415
416    fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
417    os.close(fd)
418    os.utime(fullname, None)
419
420    fstat = os.lstat(fullname)
421    # We should get now-timestamps
422    assert fstat.st_atime != 0
423    assert fstat.st_mtime != 0
424
425def tst_passthrough(src_dir, mnt_dir, cache_timeout):
426    name = name_generator()
427    src_name = pjoin(src_dir, name)
428    mnt_name = pjoin(src_dir, name)
429    assert name not in os.listdir(src_dir)
430    assert name not in os.listdir(mnt_dir)
431    with open(src_name, 'w') as fh:
432        fh.write('Hello, world')
433    assert name in os.listdir(src_dir)
434    if cache_timeout:
435        safe_sleep(cache_timeout+1)
436    assert name in os.listdir(mnt_dir)
437    assert os.stat(src_name) == os.stat(mnt_name)
438
439    name = name_generator()
440    src_name = pjoin(src_dir, name)
441    mnt_name = pjoin(src_dir, name)
442    assert name not in os.listdir(src_dir)
443    assert name not in os.listdir(mnt_dir)
444    with open(mnt_name, 'w') as fh:
445        fh.write('Hello, world')
446    assert name in os.listdir(src_dir)
447    if cache_timeout:
448        safe_sleep(cache_timeout+1)
449    assert name in os.listdir(mnt_dir)
450    assert os.stat(src_name) == os.stat(mnt_name)
451