1
2from __future__ import absolute_import, print_function
3from binascii import unhexlify
4from errno import ELOOP, ENOTDIR
5from os import symlink
6from stat import S_IFDIR
7from sys import stderr
8from time import localtime, strftime
9
10from wvtest import *
11
12from bup import git, path, vfs
13from bup.compat import environ
14from bup.io import path_msg
15from bup.metadata import Metadata
16from bup.repo import LocalRepo, RemoteRepo
17from bup.test.vfs import tree_dict
18from buptest import ex, exo, no_lingering_errors, test_tempdir
19
20bup_path = path.exe()
21
22## The clear_cache() calls below are to make sure that the test starts
23## from a known state since at the moment the cache entry for a given
24## item (like a commit) can change.  For example, its meta value might
25## be promoted from a mode to a Metadata instance once the tree it
26## refers to is traversed.
27
28def prep_and_test_repo(name, create_repo, test_repo):
29    with no_lingering_errors():
30        with test_tempdir(b'bup-t' + name) as tmpdir:
31            bup_dir = tmpdir + b'/bup'
32            environ[b'GIT_DIR'] = bup_dir
33            environ[b'BUP_DIR'] = bup_dir
34            ex((bup_path, b'init'))
35            git.repodir = bup_dir
36            with create_repo(bup_dir) as repo:
37                test_repo(repo, tmpdir)
38
39# Currently, we just test through the repos since LocalRepo resolve is
40# just a straight redirection to vfs.resolve.
41
42def test_resolve(repo, tmpdir):
43        data_path = tmpdir + b'/src'
44        resolve = repo.resolve
45        save_time = 100000
46        save_time_str = strftime('%Y-%m-%d-%H%M%S', localtime(save_time)).encode('ascii')
47        os.mkdir(data_path)
48        os.mkdir(data_path + b'/dir')
49        with open(data_path + b'/file', 'wb+') as tmpfile:
50            tmpfile.write(b'canary\n')
51        symlink(b'file', data_path + b'/file-symlink')
52        symlink(b'dir', data_path + b'/dir-symlink')
53        symlink(b'not-there', data_path + b'/bad-symlink')
54        ex((bup_path, b'index', b'-v', data_path))
55        ex((bup_path, b'save', b'-d', b'%d' % save_time, b'-tvvn', b'test',
56            b'--strip', data_path))
57        ex((bup_path, b'tag', b'test-tag', b'test'))
58
59        tip_hash = exo((b'git', b'show-ref', b'refs/heads/test'))[0]
60        tip_oidx = tip_hash.strip().split()[0]
61        tip_oid = unhexlify(tip_oidx)
62        tip_tree_oidx = exo((b'git', b'log', b'--pretty=%T', b'-n1',
63                             tip_oidx))[0].strip()
64        tip_tree_oid = unhexlify(tip_tree_oidx)
65        tip_tree = tree_dict(repo, tip_tree_oid)
66        test_revlist_w_meta = vfs.RevList(meta=tip_tree[b'.'].meta,
67                                          oid=tip_oid)
68        expected_latest_item = vfs.Commit(meta=S_IFDIR | 0o755,
69                                          oid=tip_tree_oid,
70                                          coid=tip_oid)
71        expected_latest_item_w_meta = vfs.Commit(meta=tip_tree[b'.'].meta,
72                                                 oid=tip_tree_oid,
73                                                 coid=tip_oid)
74        expected_latest_link = vfs.FakeLink(meta=vfs.default_symlink_mode,
75                                            target=save_time_str)
76        expected_test_tag_item = expected_latest_item
77
78        wvstart('resolve: /')
79        vfs.clear_cache()
80        res = resolve(b'/')
81        wvpasseq(1, len(res))
82        wvpasseq(((b'', vfs._root),), res)
83        ignore, root_item = res[0]
84        root_content = frozenset(vfs.contents(repo, root_item))
85        wvpasseq(frozenset([(b'.', root_item),
86                            (b'.tag', vfs._tags),
87                            (b'test', test_revlist_w_meta)]),
88                 root_content)
89        for path in (b'//', b'/.', b'/./', b'/..', b'/../',
90                     b'/test/latest/dir/../../..',
91                     b'/test/latest/dir/../../../',
92                     b'/test/latest/dir/../../../.',
93                     b'/test/latest/dir/../../..//',
94                     b'/test//latest/dir/../../..',
95                     b'/test/./latest/dir/../../..',
96                     b'/test/././latest/dir/../../..',
97                     b'/test/.//./latest/dir/../../..',
98                     b'/test//.//.//latest/dir/../../..'
99                     b'/test//./latest/dir/../../..'):
100            wvstart('resolve: ' + path_msg(path))
101            vfs.clear_cache()
102            res = resolve(path)
103            wvpasseq(((b'', vfs._root),), res)
104
105        wvstart('resolve: /.tag')
106        vfs.clear_cache()
107        res = resolve(b'/.tag')
108        wvpasseq(2, len(res))
109        wvpasseq(((b'', vfs._root), (b'.tag', vfs._tags)),
110                 res)
111        ignore, tag_item = res[1]
112        tag_content = frozenset(vfs.contents(repo, tag_item))
113        wvpasseq(frozenset([(b'.', tag_item),
114                            (b'test-tag', expected_test_tag_item)]),
115                 tag_content)
116
117        wvstart('resolve: /test')
118        vfs.clear_cache()
119        res = resolve(b'/test')
120        wvpasseq(2, len(res))
121        wvpasseq(((b'', vfs._root), (b'test', test_revlist_w_meta)), res)
122        ignore, test_item = res[1]
123        test_content = frozenset(vfs.contents(repo, test_item))
124        # latest has metadata here due to caching
125        wvpasseq(frozenset([(b'.', test_revlist_w_meta),
126                            (save_time_str, expected_latest_item_w_meta),
127                            (b'latest', expected_latest_link)]),
128                 test_content)
129
130        wvstart('resolve: /test/latest')
131        vfs.clear_cache()
132        res = resolve(b'/test/latest')
133        wvpasseq(3, len(res))
134        expected_latest_item_w_meta = vfs.Commit(meta=tip_tree[b'.'].meta,
135                                                 oid=tip_tree_oid,
136                                                 coid=tip_oid)
137        expected = ((b'', vfs._root),
138                    (b'test', test_revlist_w_meta),
139                    (save_time_str, expected_latest_item_w_meta))
140        wvpasseq(expected, res)
141        ignore, latest_item = res[2]
142        latest_content = frozenset(vfs.contents(repo, latest_item))
143        expected = frozenset((x.name, vfs.Item(oid=x.oid, meta=x.meta))
144                             for x in (tip_tree[name]
145                                       for name in (b'.',
146                                                    b'bad-symlink',
147                                                    b'dir',
148                                                    b'dir-symlink',
149                                                    b'file',
150                                                    b'file-symlink')))
151        wvpasseq(expected, latest_content)
152
153        wvstart('resolve: /test/latest/file')
154        vfs.clear_cache()
155        res = resolve(b'/test/latest/file')
156        wvpasseq(4, len(res))
157        expected_file_item_w_meta = vfs.Item(meta=tip_tree[b'file'].meta,
158                                             oid=tip_tree[b'file'].oid)
159        expected = ((b'', vfs._root),
160                    (b'test', test_revlist_w_meta),
161                    (save_time_str, expected_latest_item_w_meta),
162                    (b'file', expected_file_item_w_meta))
163        wvpasseq(expected, res)
164
165        wvstart('resolve: /test/latest/bad-symlink')
166        vfs.clear_cache()
167        res = resolve(b'/test/latest/bad-symlink')
168        wvpasseq(4, len(res))
169        expected = ((b'', vfs._root),
170                    (b'test', test_revlist_w_meta),
171                    (save_time_str, expected_latest_item_w_meta),
172                    (b'not-there', None))
173        wvpasseq(expected, res)
174
175        wvstart('resolve nofollow: /test/latest/bad-symlink')
176        vfs.clear_cache()
177        res = resolve(b'/test/latest/bad-symlink', follow=False)
178        wvpasseq(4, len(res))
179        bad_symlink_value = tip_tree[b'bad-symlink']
180        expected_bad_symlink_item_w_meta = vfs.Item(meta=bad_symlink_value.meta,
181                                                    oid=bad_symlink_value.oid)
182        expected = ((b'', vfs._root),
183                    (b'test', test_revlist_w_meta),
184                    (save_time_str, expected_latest_item_w_meta),
185                    (b'bad-symlink', expected_bad_symlink_item_w_meta))
186        wvpasseq(expected, res)
187
188        wvstart('resolve: /test/latest/file-symlink')
189        vfs.clear_cache()
190        res = resolve(b'/test/latest/file-symlink')
191        wvpasseq(4, len(res))
192        expected = ((b'', vfs._root),
193                    (b'test', test_revlist_w_meta),
194                    (save_time_str, expected_latest_item_w_meta),
195                    (b'file', expected_file_item_w_meta))
196        wvpasseq(expected, res)
197
198        wvstart('resolve nofollow: /test/latest/file-symlink')
199        vfs.clear_cache()
200        res = resolve(b'/test/latest/file-symlink', follow=False)
201        wvpasseq(4, len(res))
202        file_symlink_value = tip_tree[b'file-symlink']
203        expected_file_symlink_item_w_meta = vfs.Item(meta=file_symlink_value.meta,
204                                                     oid=file_symlink_value.oid)
205        expected = ((b'', vfs._root),
206                    (b'test', test_revlist_w_meta),
207                    (save_time_str, expected_latest_item_w_meta),
208                    (b'file-symlink', expected_file_symlink_item_w_meta))
209        wvpasseq(expected, res)
210
211        wvstart('resolve: /test/latest/missing')
212        vfs.clear_cache()
213        res = resolve(b'/test/latest/missing')
214        wvpasseq(4, len(res))
215        name, item = res[-1]
216        wvpasseq(b'missing', name)
217        wvpass(item is None)
218
219        for path in (b'/test/latest/file/',
220                     b'/test/latest/file/.',
221                     b'/test/latest/file/..',
222                     b'/test/latest/file/../',
223                     b'/test/latest/file/../.',
224                     b'/test/latest/file/../..',
225                     b'/test/latest/file/foo'):
226            wvstart('resolve: ' + path_msg(path))
227            vfs.clear_cache()
228            try:
229                resolve(path)
230            except vfs.IOError as res_ex:
231                wvpasseq(ENOTDIR, res_ex.errno)
232                wvpasseq([b'', b'test', save_time_str, b'file'],
233                         [name for name, item in res_ex.terminus])
234
235        for path in (b'/test/latest/file-symlink/',
236                     b'/test/latest/file-symlink/.',
237                     b'/test/latest/file-symlink/..',
238                     b'/test/latest/file-symlink/../',
239                     b'/test/latest/file-symlink/../.',
240                     b'/test/latest/file-symlink/../..'):
241            wvstart('resolve nofollow: ' + path_msg(path))
242            vfs.clear_cache()
243            try:
244                resolve(path, follow=False)
245            except vfs.IOError as res_ex:
246                wvpasseq(ENOTDIR, res_ex.errno)
247                wvpasseq([b'', b'test', save_time_str, b'file'],
248                         [name for name, item in res_ex.terminus])
249
250        wvstart('resolve: non-directory parent')
251        vfs.clear_cache()
252        file_res = resolve(b'/test/latest/file')
253        try:
254            resolve(b'foo', parent=file_res)
255        except vfs.IOError as res_ex:
256            wvpasseq(ENOTDIR, res_ex.errno)
257            wvpasseq(None, res_ex.terminus)
258
259        wvstart('resolve nofollow: /test/latest/dir-symlink')
260        vfs.clear_cache()
261        res = resolve(b'/test/latest/dir-symlink', follow=False)
262        wvpasseq(4, len(res))
263        dir_symlink_value = tip_tree[b'dir-symlink']
264        expected_dir_symlink_item_w_meta = vfs.Item(meta=dir_symlink_value.meta,
265                                                     oid=dir_symlink_value.oid)
266        expected = ((b'', vfs._root),
267                    (b'test', test_revlist_w_meta),
268                    (save_time_str, expected_latest_item_w_meta),
269                    (b'dir-symlink', expected_dir_symlink_item_w_meta))
270        wvpasseq(expected, res)
271
272        dir_value = tip_tree[b'dir']
273        expected_dir_item = vfs.Item(oid=dir_value.oid,
274                                     meta=tree_dict(repo, dir_value.oid)[b'.'].meta)
275        expected = ((b'', vfs._root),
276                    (b'test', test_revlist_w_meta),
277                    (save_time_str, expected_latest_item_w_meta),
278                    (b'dir', expected_dir_item))
279        def lresolve(*args, **keys):
280            return resolve(*args, **dict(keys, follow=False))
281        for resname, resolver in (('resolve', resolve),
282                                  ('resolve nofollow', lresolve)):
283            for path in (b'/test/latest/dir-symlink/',
284                         b'/test/latest/dir-symlink/.'):
285                wvstart(resname + ': ' + path_msg(path))
286                vfs.clear_cache()
287                res = resolver(path)
288                wvpasseq(4, len(res))
289                wvpasseq(expected, res)
290        wvstart('resolve: /test/latest/dir-symlink')
291        vfs.clear_cache()
292        res = resolve(path)
293        wvpasseq(4, len(res))
294        wvpasseq(expected, res)
295
296@wvtest
297def test_local_resolve():
298    prep_and_test_repo(b'local-vfs-resolve',
299                       lambda x: LocalRepo(repo_dir=x), test_resolve)
300
301@wvtest
302def test_remote_resolve():
303    prep_and_test_repo(b'remote-vfs-resolve',
304                       lambda x: RemoteRepo(x), test_resolve)
305
306def test_resolve_loop(repo, tmpdir):
307    data_path = tmpdir + b'/src'
308    os.mkdir(data_path)
309    symlink(b'loop', data_path + b'/loop')
310    ex((bup_path, b'init'))
311    ex((bup_path, b'index', b'-v', data_path))
312    save_utc = 100000
313    ex((bup_path, b'save', b'-d', b'%d' % save_utc, b'-tvvn', b'test', b'--strip',
314        data_path))
315    save_name = strftime('%Y-%m-%d-%H%M%S', localtime(save_utc)).encode('ascii')
316    try:
317        wvpasseq('this call should never return',
318                 repo.resolve(b'/test/%s/loop' % save_name))
319    except vfs.IOError as res_ex:
320        wvpasseq(ELOOP, res_ex.errno)
321        wvpasseq([b'', b'test', save_name, b'loop'],
322                 [name for name, item in res_ex.terminus])
323
324@wvtest
325def test_local_resolve_loop():
326    prep_and_test_repo(b'local-vfs-resolve-loop',
327                       lambda x: LocalRepo(x), test_resolve_loop)
328
329@wvtest
330def test_remote_resolve_loop():
331    prep_and_test_repo(b'remote-vfs-resolve-loop',
332                       lambda x: RemoteRepo(x), test_resolve_loop)
333
334# FIXME: add tests for the want_meta=False cases.
335