1import pytest
2import subprocess
3from focker.volume import command_volume_create, \
4    command_volume_prune, \
5    command_volume_list, \
6    command_volume_tag, \
7    command_volume_untag, \
8    command_volume_remove, \
9    command_volume_set, \
10    command_volume_get, \
11    command_volume_protect, \
12    command_volume_unprotect
13from focker.zfs import zfs_find, \
14    zfs_mountpoint, \
15    zfs_exists, \
16    zfs_parse_output, \
17    zfs_destroy, \
18    zfs_prune, \
19    zfs_run
20import os
21import focker.volume
22import focker.zfs
23
24
def test_command_volume_create():
    """Create a volume through ``command_volume_create`` and check its mountpoint.

    The volume is looked up by tag afterwards and its mountpoint must exist
    on disk. The volume is removed again at the end of the test.
    """
    tag = 'test-command-volume-create'
    # Start clean: force-remove whatever currently carries this tag.
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])

    # A bare function object is used as an attribute bag standing in for the
    # parsed command-line arguments.
    cli_args = lambda: 0
    cli_args.tags = [tag]
    command_volume_create(cli_args)

    dataset, _ = zfs_find(tag, focker_type='volume')
    mount_dir = zfs_mountpoint(dataset)
    assert os.path.exists(mount_dir)

    subprocess.check_output(['focker', 'volume', 'remove', tag])
33
34
def test_command_volume_prune():
    """Check that ``command_volume_prune`` destroys a volume left without tags.

    A volume is created with a single tag, the tag is removed, and after
    pruning neither the dataset nor its mountpoint may exist.
    """
    tag = 'test-command-volume-prune'
    focker_volume = ['focker', 'volume']
    subprocess.check_output(focker_volume + ['remove', '--force', tag])
    subprocess.check_output(focker_volume + ['create', '-t', tag])

    # Record the dataset name and mountpoint while the tag still resolves.
    dataset, _ = zfs_find(tag, focker_type='volume')
    mount_dir = zfs_mountpoint(dataset)

    subprocess.check_output(focker_volume + ['untag', tag])

    # The prune command is handed an attribute-less stand-in for its arguments.
    cli_args = lambda: 0
    command_volume_prune(cli_args)

    assert not zfs_exists(dataset)
    assert not os.path.exists(mount_dir)
45
46
def test_command_volume_list(monkeypatch):
    """Check the rows and headers ``command_volume_list`` passes to ``tabulate``.

    ``focker.volume.tabulate`` is replaced with a recorder so the table data
    can be inspected instead of printed. Exactly one row must carry the three
    tags of the volume created here, together with its SHA256 and mountpoint.
    """
    base = 'test-command-volume-list'
    # Already in sorted order, which the row comparison below relies on.
    expected_tags = [base, base + '-1', base + '-2']

    subprocess.check_output(['focker', 'volume', 'remove', '--force', base])
    subprocess.check_output(['focker', 'volume', 'create', '-t'] + expected_tags)
    dataset, digest = zfs_find(base, focker_type='volume')
    mount_dir = zfs_mountpoint(dataset)

    cli_args = lambda: 0
    cli_args.full_sha256 = True

    captured = {'rows': None, 'headers': None}

    def fake_tabulate(*positional, **keywords):
        # Record what would have been rendered as a table.
        captured['rows'] = positional[0]
        captured['headers'] = keywords['headers']

    monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
    command_volume_list(cli_args)

    rows = captured['rows']
    assert rows is not None
    assert captured['headers'] == ['Tags', 'Size', 'SHA256', 'Mountpoint']
    assert len(rows) >= 3

    # The first column holds space-separated tags; compare order-insensitively.
    matching = [row for row in rows
                if sorted(row[0].split(' ')) == expected_tags]
    assert len(matching) == 1
    row = matching[0]
    assert row[2] == digest
    assert row[3] == mount_dir

    subprocess.check_output(['focker', 'volume', 'remove', base])
72
73
def test_command_volume_tag():
    """Check that ``command_volume_tag`` attaches extra tags to a volume.

    The volume is referenced by its SHA256. Every new tag must resolve to the
    same dataset name and SHA256 as the original tag. After the volume is
    removed, none of the tags may resolve any more.
    """
    base = 'test-command-volume-tag'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', base])
    subprocess.check_output(['focker', 'volume', 'create', '-t', base])
    dataset, digest = zfs_find(base, focker_type='volume')

    cli_args = lambda: 0
    cli_args.reference = digest
    cli_args.tags = ['test-a', 'test-b', 'test-c']
    command_volume_tag(cli_args)

    for extra_tag in cli_args.tags:
        found_dataset, found_digest = zfs_find(extra_tag, focker_type='volume')
        assert found_dataset == dataset
        assert found_digest == digest

    subprocess.check_output(['focker', 'volume', 'remove', base])

    # Once the volume is gone, the added tags and then the original tag
    # must all fail to resolve.
    for stale_tag in cli_args.tags + [base]:
        with pytest.raises(ValueError):
            zfs_find(stale_tag, focker_type='volume')
92
93
def test_command_volume_untag():
    """Check that ``command_volume_untag`` removes only the requested tags.

    A volume is created with three tags and two of them are removed. The
    ``focker:tags`` property must then hold only the remaining tag, and
    neither removed tag may resolve through ``zfs_find`` any more.
    """
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-untag'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-untag', 'test-command-volume-untag-1', 'test-command-volume-untag-2'])
    name, _ = zfs_find('test-command-volume-untag', focker_type='volume')
    # A bare function object is used as an attribute bag standing in for the
    # parsed command-line arguments.
    args = lambda: 0
    args.tags = ['test-command-volume-untag-1', 'test-command-volume-untag-2']
    command_volume_untag(args)
    # The third column of the single output row is the comma-separated tag list.
    tags = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
    tags = tags[0][2].split(',')
    assert tags == ['test-command-volume-untag']
    # Look up exactly the tags that were removed. The second lookup used to
    # query 'test-command-image-untag-2', a tag that never existed, so it
    # raised ValueError regardless of whether untagging worked.
    for removed_tag in args.tags:
        with pytest.raises(ValueError):
            zfs_find(removed_tag, focker_type='volume')
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-untag'])
109
110
def test_command_volume_remove():
    """Check that ``command_volume_remove`` deletes a volume given by tag.

    After removal the volume must not resolve by tag or by SHA256, and both
    its mountpoint and its dataset must be gone.
    """
    tag = 'test-command-volume-remove'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])
    subprocess.check_output(['focker', 'volume', 'create', '-t', tag])

    # Capture identifiers up front; they cannot be looked up after removal.
    dataset, digest = zfs_find(tag, focker_type='volume')
    mount_dir = zfs_mountpoint(dataset)

    cli_args = lambda: 0
    cli_args.references = [tag]
    cli_args.force = False
    command_volume_remove(cli_args)

    for reference in (tag, digest):
        with pytest.raises(ValueError):
            zfs_find(reference, focker_type='volume')
    assert not os.path.exists(mount_dir)
    assert not zfs_exists(dataset)
126
127
def test_command_volume_set():
    """Check that ``command_volume_set`` applies properties to a volume.

    ``rdonly=on`` and ``quota=1G`` are set through the command and read back
    with ``zfs get``. The ``rdonly`` query is expected to be reported under
    the property name ``readonly``.
    """
    tag = 'test-command-volume-set'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])
    subprocess.check_output(['focker', 'volume', 'create', '-t', tag])
    dataset, _ = zfs_find(tag, focker_type='volume')

    cli_args = lambda: 0
    cli_args.reference = tag
    cli_args.properties = ['rdonly=on', 'quota=1G']
    command_volume_set(cli_args)

    rows = zfs_parse_output(['zfs', 'get', '-H', 'rdonly,quota', dataset])
    assert len(rows) == 2
    # Columns checked per row: dataset name, property name, value.
    for row in rows:
        assert row[0] == dataset
    for row, prop_name in zip(rows, ('readonly', 'quota')):
        assert row[1] == prop_name
    for row, prop_value in zip(rows, ('on', '1G')):
        assert row[2] == prop_value

    subprocess.check_output(['focker', 'volume', 'remove', tag])
145
146
147
def test_command_volume_get(monkeypatch):
    """Check the table ``command_volume_get`` passes to ``tabulate``.

    Properties are set through the CLI first. ``focker.volume.tabulate`` is
    replaced with a recorder, and the recorded rows and headers must list the
    requested properties with the values that were set.
    """
    tag = 'test-command-volume-get'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])
    subprocess.check_output(['focker', 'volume', 'create', '-t', tag])
    subprocess.check_output(['focker', 'volume', 'set', tag, 'rdonly=on', 'quota=1G'])
    # The result is unused; the lookup is kept so the test still stops here
    # if the volume cannot be found.
    zfs_find(tag, focker_type='volume')

    cli_args = lambda: 0
    cli_args.reference = tag
    cli_args.properties = ['rdonly', 'quota']

    captured = {'rows': None, 'headers': None}

    def fake_tabulate(*positional, **keywords):
        # Record what would have been rendered as a table.
        captured['rows'] = positional[0]
        captured['headers'] = keywords['headers']

    monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
    command_volume_get(cli_args)

    assert captured['rows'] is not None
    assert captured['headers'] is not None
    assert captured['rows'] == [['rdonly', 'on'], ['quota', '1G']]
    assert captured['headers'] == ['Property', 'Value']

    subprocess.check_output(['focker', 'volume', 'remove', tag])
171
172
def test_command_volume_protect(monkeypatch):
    """Check that ``command_volume_protect`` shields a volume from destruction.

    After protecting, ``focker:protect`` must read ``on`` and ``zfs_destroy``
    must raise ``RuntimeError``. The volume is then untagged and pruned while
    calls to ``focker.zfs.zfs_run`` are counted. Removing it through the CLI
    must fail, and it is finally destroyed directly with ``zfs destroy``.
    """
    tag = 'test-command-volume-protect'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])
    subprocess.check_output(['focker', 'volume', 'create', '-t', tag])

    cli_args = lambda: 0
    cli_args.references = [tag]
    command_volume_protect(cli_args)

    dataset, digest = zfs_find(tag, focker_type='volume')
    mount_dir = zfs_mountpoint(dataset)

    protect_rows = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', dataset])
    assert len(protect_rows) == 1
    assert protect_rows[0][2] == 'on'
    with pytest.raises(RuntimeError):
        zfs_destroy(dataset)

    # With its only tag removed, the tags property must read '-'.
    subprocess.check_output(['focker', 'volume', 'untag', tag])
    tag_rows = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', dataset])
    assert len(tag_rows) == 1
    assert tag_rows[0][2] == '-'

    run_calls = []

    def counting_run(*positional, **keywords):
        # Count each call, then delegate to the real zfs_run imported above.
        run_calls.append(1)
        return zfs_run(*positional, **keywords)

    monkeypatch.setattr(focker.zfs, 'zfs_run', counting_run)
    zfs_prune(focker_type='volume')
    # NOTE(review): this only rules out exactly one zfs_run call, as in the
    # original `not n_called == 1`. Confirm whether "no calls at all" was meant.
    assert len(run_calls) != 1

    with pytest.raises(subprocess.CalledProcessError):
        subprocess.check_output(['focker', 'volume', 'remove', digest])

    # Clean up by destroying the dataset directly with zfs.
    subprocess.check_output(['zfs', 'destroy', '-r', '-f', dataset])
    assert not zfs_exists(dataset)
    assert not os.path.exists(mount_dir)
203
204
def test_command_volume_unprotect():
    """Check that ``command_volume_unprotect`` clears the protect property.

    The volume is protected through the CLI, so ``focker:protect`` reads
    ``on``. After ``command_volume_unprotect`` the property must read ``-``.
    """
    tag = 'test-command-volume-unprotect'
    subprocess.check_output(['focker', 'volume', 'remove', '--force', tag])
    subprocess.check_output(['focker', 'volume', 'create', '-t', tag])
    subprocess.check_output(['focker', 'volume', 'protect', tag])
    dataset, _ = zfs_find(tag, focker_type='volume')

    def read_protect_rows():
        # Query the property fresh each time it is needed.
        return zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', dataset])

    before = read_protect_rows()
    assert len(before) == 1
    assert before[0][2] == 'on'

    cli_args = lambda: 0
    cli_args.references = [tag]
    command_volume_unprotect(cli_args)

    after = read_protect_rows()
    assert len(after) == 1
    assert after[0][2] == '-'

    subprocess.check_output(['focker', 'volume', 'remove', tag])
220