"""Integration tests for the ``focker volume`` sub-commands.

Each test drives the ``focker`` command-line tool (through ``subprocess``)
and/or the ``command_volume_*`` functions from ``focker.volume`` directly,
then inspects the outcome with the ``focker.zfs`` helpers and the ``zfs``
binary.

NOTE(review): presumably these need a working focker installation and
sufficient privileges to create/destroy ZFS datasets -- confirm before
running on a shared machine.
"""

import argparse
import os
import subprocess

import pytest

import focker.volume
import focker.zfs
from focker.volume import (
    command_volume_create,
    command_volume_prune,
    command_volume_list,
    command_volume_tag,
    command_volume_untag,
    command_volume_remove,
    command_volume_set,
    command_volume_get,
    command_volume_protect,
    command_volume_unprotect,
)
from focker.zfs import (
    zfs_find,
    zfs_mountpoint,
    zfs_exists,
    zfs_parse_output,
    zfs_destroy,
    zfs_prune,
    zfs_run,
)


def _capture_tabulate(monkeypatch):
    """Replace ``focker.volume.tabulate`` with a recorder.

    Returns a dict whose ``'rows'`` and ``'headers'`` entries stay ``None``
    until the patched ``tabulate`` is called, after which they hold the
    first positional argument and the ``headers`` keyword argument.
    """
    captured = {'rows': None, 'headers': None}

    def fake_tabulate(*args, **kwargs):
        captured['rows'] = args[0]
        captured['headers'] = kwargs['headers']

    monkeypatch.setattr(focker.volume, 'tabulate', fake_tabulate)
    return captured


def test_command_volume_create():
    """A created volume can be found by its tag and has a mountpoint on disk."""
    # Clear any leftover from a previous (possibly failed) run.
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-create'])
    args = argparse.Namespace(tags=['test-command-volume-create'])
    command_volume_create(args)
    name, _ = zfs_find('test-command-volume-create', focker_type='volume')
    assert os.path.exists(zfs_mountpoint(name))
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-create'])


def test_command_volume_prune():
    """Pruning removes a volume that no longer carries any tag."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-prune'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-prune'])
    name, _ = zfs_find('test-command-volume-prune', focker_type='volume')
    mountpoint = zfs_mountpoint(name)
    subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-prune'])
    args = argparse.Namespace()
    command_volume_prune(args)
    assert not zfs_exists(name)
    assert not os.path.exists(mountpoint)


def test_command_volume_list(monkeypatch):
    """Listing passes tags, full SHA256 and mountpoint of the volume to tabulate."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-list'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2'])
    name, sha256 = zfs_find('test-command-volume-list', focker_type='volume')
    mountpoint = zfs_mountpoint(name)
    args = argparse.Namespace(full_sha256=True)
    captured = _capture_tabulate(monkeypatch)
    command_volume_list(args)
    rows = captured['rows']
    assert rows is not None
    assert captured['headers'] == ['Tags', 'Size', 'SHA256', 'Mountpoint']
    assert len(rows) >= 3
    expected_tags = ['test-command-volume-list', 'test-command-volume-list-1', 'test-command-volume-list-2']
    # Tag order inside the first column is not relied upon, hence the sort.
    match = [row for row in rows if sorted(row[0].split(' ')) == expected_tags]
    assert len(match) == 1
    match = match[0]
    assert match[2] == sha256
    assert match[3] == mountpoint
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-list'])


def test_command_volume_tag():
    """Extra tags resolve to the same volume and vanish when it is removed."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-tag'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-tag'])
    name_1, sha256_1 = zfs_find('test-command-volume-tag', focker_type='volume')
    args = argparse.Namespace(reference=sha256_1, tags=['test-a', 'test-b', 'test-c'])
    command_volume_tag(args)
    for t in args.tags:
        name_2, sha256_2 = zfs_find(t, focker_type='volume')
        assert name_2 == name_1
        assert sha256_2 == sha256_1
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-tag'])
    # After removal neither the added tags nor the original one may resolve.
    for t in args.tags:
        with pytest.raises(ValueError):
            zfs_find(t, focker_type='volume')
    with pytest.raises(ValueError):
        zfs_find('test-command-volume-tag', focker_type='volume')


def test_command_volume_untag():
    """Untagging drops only the requested tags and keeps the remaining one."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-untag'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-untag', 'test-command-volume-untag-1', 'test-command-volume-untag-2'])
    name, _ = zfs_find('test-command-volume-untag', focker_type='volume')
    args = argparse.Namespace(tags=['test-command-volume-untag-1', 'test-command-volume-untag-2'])
    command_volume_untag(args)
    tags = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
    tags = tags[0][2].split(',')
    assert tags == ['test-command-volume-untag']
    with pytest.raises(ValueError):
        zfs_find('test-command-volume-untag-1', focker_type='volume')
    # Fixed: this used to look up 'test-command-image-untag-2', a tag that
    # never existed, so the check passed without verifying anything.
    with pytest.raises(ValueError):
        zfs_find('test-command-volume-untag-2', focker_type='volume')
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-untag'])


def test_command_volume_remove():
    """Removing a volume by tag deletes the dataset, its mountpoint and lookups."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-remove'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-remove'])
    name, sha256 = zfs_find('test-command-volume-remove', focker_type='volume')
    mountpoint = zfs_mountpoint(name)
    args = argparse.Namespace(references=['test-command-volume-remove'], force=False)
    command_volume_remove(args)
    with pytest.raises(ValueError):
        zfs_find('test-command-volume-remove', focker_type='volume')
    with pytest.raises(ValueError):
        zfs_find(sha256, focker_type='volume')
    assert not os.path.exists(mountpoint)
    assert not zfs_exists(name)


def test_command_volume_set():
    """Properties given as ``key=value`` are applied to the volume's dataset."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-set'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-set'])
    name, _ = zfs_find('test-command-volume-set', focker_type='volume')
    args = argparse.Namespace(
        reference='test-command-volume-set',
        properties=['rdonly=on', 'quota=1G'],
    )
    command_volume_set(args)
    props = zfs_parse_output(['zfs', 'get', '-H', 'rdonly,quota', name])
    assert len(props) == 2
    for prop in props:
        assert prop[0] == name
    # 'rdonly' is requested, but the property is reported back as 'readonly'.
    assert props[0][1] == 'readonly'
    assert props[1][1] == 'quota'
    assert props[0][2] == 'on'
    assert props[1][2] == '1G'
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-set'])


def test_command_volume_get(monkeypatch):
    """Requested properties are handed to tabulate as ``[name, value]`` rows."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-get'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-get'])
    subprocess.check_output(['focker', 'volume', 'set', 'test-command-volume-get', 'rdonly=on', 'quota=1G'])
    # Result is unused; the call raises if the volume cannot be found.
    zfs_find('test-command-volume-get', focker_type='volume')
    args = argparse.Namespace(
        reference='test-command-volume-get',
        properties=['rdonly', 'quota'],
    )
    captured = _capture_tabulate(monkeypatch)
    command_volume_get(args)
    assert captured['rows'] is not None
    assert captured['headers'] is not None
    assert captured['rows'] == [['rdonly', 'on'], ['quota', '1G']]
    assert captured['headers'] == ['Property', 'Value']
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-get'])


def test_command_volume_protect(monkeypatch):
    """A protected volume resists destroy, prune and ``focker volume remove``."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-protect'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-protect'])
    args = argparse.Namespace(references=['test-command-volume-protect'])
    command_volume_protect(args)
    name, sha256 = zfs_find('test-command-volume-protect', focker_type='volume')
    mountpoint = zfs_mountpoint(name)
    lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
    assert len(lst) == 1
    assert lst[0][2] == 'on'
    with pytest.raises(RuntimeError):
        zfs_destroy(name)
    # Drop the tag so that the volume becomes a candidate for pruning.
    subprocess.check_output(['focker', 'volume', 'untag', 'test-command-volume-protect'])
    lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:tags', name])
    assert len(lst) == 1
    assert lst[0][2] == '-'
    n_called = 0

    def fake_run(*args, **kwargs):
        # Count invocations, then delegate to the real zfs_run imported above.
        nonlocal n_called
        n_called += 1
        return zfs_run(*args, **kwargs)

    monkeypatch.setattr(focker.zfs, 'zfs_run', fake_run)
    zfs_prune(focker_type='volume')
    # NOTE(review): kept equivalent to the original `not n_called == 1`;
    # this only rules out exactly one zfs_run call -- confirm whether a
    # stricter check (e.g. no call at all) was intended.
    assert n_called != 1
    with pytest.raises(subprocess.CalledProcessError):
        subprocess.check_output(['focker', 'volume', 'remove', sha256])
    # Clean up with zfs directly, since the focker-level removal is refused.
    subprocess.check_output(['zfs', 'destroy', '-r', '-f', name])
    assert not zfs_exists(name)
    assert not os.path.exists(mountpoint)


def test_command_volume_unprotect():
    """Unprotecting resets ``focker:protect`` from ``on`` back to ``-``."""
    subprocess.check_output(['focker', 'volume', 'remove', '--force', 'test-command-volume-unprotect'])
    subprocess.check_output(['focker', 'volume', 'create', '-t', 'test-command-volume-unprotect'])
    subprocess.check_output(['focker', 'volume', 'protect', 'test-command-volume-unprotect'])
    name, _ = zfs_find('test-command-volume-unprotect', focker_type='volume')
    lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
    assert len(lst) == 1
    assert lst[0][2] == 'on'
    args = argparse.Namespace(references=['test-command-volume-unprotect'])
    command_volume_unprotect(args)
    lst = zfs_parse_output(['zfs', 'get', '-H', 'focker:protect', name])
    assert len(lst) == 1
    assert lst[0][2] == '-'
    subprocess.check_output(['focker', 'volume', 'remove', 'test-command-volume-unprotect'])