1# 2# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# not use this file except in compliance with the License. You may obtain 4# a copy of the License at 5# 6# http://www.apache.org/licenses/LICENSE-2.0 7# 8# Unless required by applicable law or agreed to in writing, software 9# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 10# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 11# License for the specific language governing permissions and limitations 12# under the License. 13# 14 15from unittest import mock 16from unittest.mock import call 17 18from osc_lib.cli import format_columns 19from osc_lib import exceptions 20from osc_lib import utils 21 22from openstackclient.tests.unit import utils as tests_utils 23from openstackclient.tests.unit.volume.v1 import fakes as volume_fakes 24from openstackclient.volume.v1 import volume_type 25 26 27class TestType(volume_fakes.TestVolumev1): 28 29 def setUp(self): 30 super(TestType, self).setUp() 31 32 self.types_mock = self.app.client_manager.volume.volume_types 33 self.types_mock.reset_mock() 34 35 self.encryption_types_mock = ( 36 self.app.client_manager.volume.volume_encryption_types) 37 self.encryption_types_mock.reset_mock() 38 39 40class TestTypeCreate(TestType): 41 42 columns = ( 43 'description', 44 'id', 45 'is_public', 46 'name', 47 ) 48 49 def setUp(self): 50 super(TestTypeCreate, self).setUp() 51 52 self.new_volume_type = volume_fakes.FakeType.create_one_type( 53 methods={'set_keys': {'myprop': 'myvalue'}} 54 ) 55 self.data = ( 56 self.new_volume_type.description, 57 self.new_volume_type.id, 58 True, 59 self.new_volume_type.name, 60 ) 61 62 self.types_mock.create.return_value = self.new_volume_type 63 # Get the command object to test 64 self.cmd = volume_type.CreateVolumeType(self.app, None) 65 66 def test_type_create(self): 67 arglist = [ 68 self.new_volume_type.name, 69 ] 70 verifylist = [ 71 ("name", self.new_volume_type.name), 72 ] 73 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 74 75 columns, data = self.cmd.take_action(parsed_args) 76 self.types_mock.create.assert_called_with( 77 self.new_volume_type.name, 78 ) 79 80 self.assertEqual(self.columns, columns) 81 self.assertItemEqual(self.data, data) 82 83 def test_type_create_with_encryption(self): 84 encryption_info = { 85 'provider': 'LuksEncryptor', 86 'cipher': 'aes-xts-plain64', 87 'key_size': '128', 88 'control_location': 'front-end', 89 } 90 encryption_type = volume_fakes.FakeType.create_one_encryption_type( 91 attrs=encryption_info 92 ) 93 self.new_volume_type = volume_fakes.FakeType.create_one_type( 94 attrs={'encryption': encryption_info}) 95 self.types_mock.create.return_value = self.new_volume_type 96 self.encryption_types_mock.create.return_value = encryption_type 97 encryption_columns = ( 98 'description', 99 'encryption', 100 'id', 101 'is_public', 102 'name', 103 ) 104 encryption_data = ( 105 self.new_volume_type.description, 106 format_columns.DictColumn(encryption_info), 107 self.new_volume_type.id, 108 True, 109 self.new_volume_type.name, 110 ) 111 arglist = [ 112 '--encryption-provider', 'LuksEncryptor', 113 '--encryption-cipher', 'aes-xts-plain64', 114 '--encryption-key-size', '128', 115 '--encryption-control-location', 'front-end', 116 self.new_volume_type.name, 117 ] 118 verifylist = [ 119 ('encryption_provider', 'LuksEncryptor'), 120 ('encryption_cipher', 'aes-xts-plain64'), 121 ('encryption_key_size', 128), 122 ('encryption_control_location', 'front-end'), 123 ('name', self.new_volume_type.name), 124 ] 125 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 126 127 columns, data = self.cmd.take_action(parsed_args) 128 self.types_mock.create.assert_called_with( 129 self.new_volume_type.name, 130 ) 131 body = { 132 'provider': 'LuksEncryptor', 133 'cipher': 'aes-xts-plain64', 134 'key_size': 128, 135 'control_location': 'front-end', 136 } 137 self.encryption_types_mock.create.assert_called_with( 138 self.new_volume_type, 139 body, 140 ) 141 self.assertEqual(encryption_columns, columns) 142 self.assertItemEqual(encryption_data, data) 143 144 145class TestTypeDelete(TestType): 146 147 volume_types = volume_fakes.FakeType.create_types(count=2) 148 149 def setUp(self): 150 super(TestTypeDelete, self).setUp() 151 152 self.types_mock.get = volume_fakes.FakeType.get_types( 153 self.volume_types) 154 self.types_mock.delete.return_value = None 155 156 # Get the command object to mock 157 self.cmd = volume_type.DeleteVolumeType(self.app, None) 158 159 def test_type_delete(self): 160 arglist = [ 161 self.volume_types[0].id 162 ] 163 verifylist = [ 164 ("volume_types", [self.volume_types[0].id]) 165 ] 166 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 167 168 result = self.cmd.take_action(parsed_args) 169 170 self.types_mock.delete.assert_called_with(self.volume_types[0]) 171 self.assertIsNone(result) 172 173 def test_delete_multiple_types(self): 174 arglist = [] 175 for t in self.volume_types: 176 arglist.append(t.id) 177 verifylist = [ 178 ('volume_types', arglist), 179 ] 180 181 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 182 result = self.cmd.take_action(parsed_args) 183 184 calls = [] 185 for t in self.volume_types: 186 calls.append(call(t)) 187 self.types_mock.delete.assert_has_calls(calls) 188 self.assertIsNone(result) 189 190 def test_delete_multiple_types_with_exception(self): 191 arglist = [ 192 self.volume_types[0].id, 193 'unexist_type', 194 ] 195 verifylist = [ 196 ('volume_types', arglist), 197 ] 198 199 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 200 201 find_mock_result = [self.volume_types[0], exceptions.CommandError] 202 with mock.patch.object(utils, 'find_resource', 203 side_effect=find_mock_result) as find_mock: 204 try: 205 self.cmd.take_action(parsed_args) 206 self.fail('CommandError should be raised.') 207 except exceptions.CommandError as e: 208 self.assertEqual('1 of 2 volume types failed to delete.', 209 str(e)) 210 211 find_mock.assert_any_call( 212 self.types_mock, self.volume_types[0].id) 213 find_mock.assert_any_call(self.types_mock, 'unexist_type') 214 215 self.assertEqual(2, find_mock.call_count) 216 self.types_mock.delete.assert_called_once_with( 217 self.volume_types[0] 218 ) 219 220 221class TestTypeList(TestType): 222 223 volume_types = volume_fakes.FakeType.create_types() 224 225 columns = [ 226 "ID", 227 "Name", 228 "Is Public", 229 ] 230 columns_long = [ 231 "ID", 232 "Name", 233 "Is Public", 234 "Properties" 235 ] 236 237 data = [] 238 for t in volume_types: 239 data.append(( 240 t.id, 241 t.name, 242 t.is_public, 243 )) 244 data_long = [] 245 for t in volume_types: 246 data_long.append(( 247 t.id, 248 t.name, 249 t.is_public, 250 format_columns.DictColumn(t.extra_specs), 251 )) 252 253 def setUp(self): 254 super(TestTypeList, self).setUp() 255 256 self.types_mock.list.return_value = self.volume_types 257 self.encryption_types_mock.create.return_value = None 258 self.encryption_types_mock.update.return_value = None 259 # get the command to test 260 self.cmd = volume_type.ListVolumeType(self.app, None) 261 262 def test_type_list_without_options(self): 263 arglist = [] 264 verifylist = [ 265 ("long", False), 266 ("encryption_type", False), 267 ] 268 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 269 270 columns, data = self.cmd.take_action(parsed_args) 271 self.types_mock.list.assert_called_once_with() 272 self.assertEqual(self.columns, columns) 273 self.assertListItemEqual(self.data, list(data)) 274 275 def test_type_list_with_options(self): 276 arglist = [ 277 "--long", 278 ] 279 verifylist = [ 280 ("long", True), 281 ] 282 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 283 284 columns, data = self.cmd.take_action(parsed_args) 285 self.types_mock.list.assert_called_once_with() 286 self.assertEqual(self.columns_long, columns) 287 self.assertListItemEqual(self.data_long, list(data)) 288 289 def test_type_list_with_encryption(self): 290 encryption_type = volume_fakes.FakeType.create_one_encryption_type( 291 attrs={'volume_type_id': self.volume_types[0].id}) 292 encryption_info = { 293 'provider': 'LuksEncryptor', 294 'cipher': None, 295 'key_size': None, 296 'control_location': 'front-end', 297 } 298 encryption_columns = self.columns + [ 299 "Encryption", 300 ] 301 encryption_data = [] 302 encryption_data.append(( 303 self.volume_types[0].id, 304 self.volume_types[0].name, 305 self.volume_types[0].is_public, 306 volume_type.EncryptionInfoColumn( 307 self.volume_types[0].id, 308 {self.volume_types[0].id: encryption_info}), 309 )) 310 encryption_data.append(( 311 self.volume_types[1].id, 312 self.volume_types[1].name, 313 self.volume_types[1].is_public, 314 volume_type.EncryptionInfoColumn( 315 self.volume_types[1].id, {}), 316 )) 317 318 self.encryption_types_mock.list.return_value = [encryption_type] 319 arglist = [ 320 "--encryption-type", 321 ] 322 verifylist = [ 323 ("encryption_type", True), 324 ] 325 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 326 327 columns, data = self.cmd.take_action(parsed_args) 328 self.encryption_types_mock.list.assert_called_once_with() 329 self.types_mock.list.assert_called_once_with() 330 self.assertEqual(encryption_columns, columns) 331 self.assertListItemEqual(encryption_data, list(data)) 332 333 334class TestTypeSet(TestType): 335 336 volume_type = volume_fakes.FakeType.create_one_type( 337 methods={'set_keys': None}) 338 339 def setUp(self): 340 super(TestTypeSet, self).setUp() 341 342 self.types_mock.get.return_value = self.volume_type 343 344 # Get the command object to test 345 self.cmd = volume_type.SetVolumeType(self.app, None) 346 347 def test_type_set_nothing(self): 348 arglist = [ 349 self.volume_type.id, 350 ] 351 verifylist = [ 352 ('volume_type', self.volume_type.id), 353 ] 354 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 355 356 result = self.cmd.take_action(parsed_args) 357 358 self.assertIsNone(result) 359 360 def test_type_set_property(self): 361 arglist = [ 362 '--property', 'myprop=myvalue', 363 self.volume_type.id, 364 ] 365 verifylist = [ 366 ('property', {'myprop': 'myvalue'}), 367 ('volume_type', self.volume_type.id), 368 ] 369 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 370 371 result = self.cmd.take_action(parsed_args) 372 self.volume_type.set_keys.assert_called_once_with( 373 {'myprop': 'myvalue'}) 374 self.assertIsNone(result) 375 376 def test_type_set_new_encryption(self): 377 arglist = [ 378 '--encryption-provider', 'LuksEncryptor', 379 '--encryption-cipher', 'aes-xts-plain64', 380 '--encryption-key-size', '128', 381 '--encryption-control-location', 'front-end', 382 self.volume_type.id, 383 ] 384 verifylist = [ 385 ('encryption_provider', 'LuksEncryptor'), 386 ('encryption_cipher', 'aes-xts-plain64'), 387 ('encryption_key_size', 128), 388 ('encryption_control_location', 'front-end'), 389 ('volume_type', self.volume_type.id), 390 ] 391 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 392 393 result = self.cmd.take_action(parsed_args) 394 body = { 395 'provider': 'LuksEncryptor', 396 'cipher': 'aes-xts-plain64', 397 'key_size': 128, 398 'control_location': 'front-end', 399 } 400 self.encryption_types_mock.create.assert_called_with( 401 self.volume_type, 402 body, 403 ) 404 self.assertIsNone(result) 405 406 def test_type_set_new_encryption_without_provider(self): 407 arglist = [ 408 '--encryption-cipher', 'aes-xts-plain64', 409 '--encryption-key-size', '128', 410 '--encryption-control-location', 'front-end', 411 self.volume_type.id, 412 ] 413 verifylist = [ 414 ('encryption_cipher', 'aes-xts-plain64'), 415 ('encryption_key_size', 128), 416 ('encryption_control_location', 'front-end'), 417 ('volume_type', self.volume_type.id), 418 ] 419 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 420 try: 421 self.cmd.take_action(parsed_args) 422 self.fail('CommandError should be raised.') 423 except exceptions.CommandError as e: 424 self.assertEqual("Command Failed: One or more of" 425 " the operations failed", 426 str(e)) 427 self.encryption_types_mock.create.assert_not_called() 428 self.encryption_types_mock.update.assert_not_called() 429 430 431class TestTypeShow(TestType): 432 433 columns = ( 434 'description', 435 'id', 436 'is_public', 437 'name', 438 'properties', 439 ) 440 441 def setUp(self): 442 super(TestTypeShow, self).setUp() 443 444 self.volume_type = volume_fakes.FakeType.create_one_type() 445 self.data = ( 446 self.volume_type.description, 447 self.volume_type.id, 448 True, 449 self.volume_type.name, 450 format_columns.DictColumn(self.volume_type.extra_specs) 451 ) 452 453 self.types_mock.get.return_value = self.volume_type 454 455 # Get the command object to test 456 self.cmd = volume_type.ShowVolumeType(self.app, None) 457 458 def test_type_show(self): 459 arglist = [ 460 self.volume_type.id 461 ] 462 verifylist = [ 463 ("volume_type", self.volume_type.id), 464 ("encryption_type", False), 465 ] 466 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 467 468 columns, data = self.cmd.take_action(parsed_args) 469 self.types_mock.get.assert_called_with(self.volume_type.id) 470 471 self.assertEqual(self.columns, columns) 472 self.assertItemEqual(self.data, data) 473 474 def test_type_show_with_encryption(self): 475 encryption_type = volume_fakes.FakeType.create_one_encryption_type() 476 encryption_info = { 477 'provider': 'LuksEncryptor', 478 'cipher': None, 479 'key_size': None, 480 'control_location': 'front-end', 481 } 482 self.volume_type = volume_fakes.FakeType.create_one_type( 483 attrs={'encryption': encryption_info}) 484 self.types_mock.get.return_value = self.volume_type 485 self.encryption_types_mock.get.return_value = encryption_type 486 encryption_columns = ( 487 'description', 488 'encryption', 489 'id', 490 'is_public', 491 'name', 492 'properties', 493 ) 494 encryption_data = ( 495 self.volume_type.description, 496 format_columns.DictColumn(encryption_info), 497 self.volume_type.id, 498 True, 499 self.volume_type.name, 500 format_columns.DictColumn(self.volume_type.extra_specs) 501 ) 502 arglist = [ 503 '--encryption-type', 504 self.volume_type.id 505 ] 506 verifylist = [ 507 ('encryption_type', True), 508 ("volume_type", self.volume_type.id) 509 ] 510 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 511 512 columns, data = self.cmd.take_action(parsed_args) 513 self.types_mock.get.assert_called_with(self.volume_type.id) 514 self.encryption_types_mock.get.assert_called_with(self.volume_type.id) 515 self.assertEqual(encryption_columns, columns) 516 self.assertItemEqual(encryption_data, data) 517 518 519class TestTypeUnset(TestType): 520 521 volume_type = volume_fakes.FakeType.create_one_type( 522 methods={'unset_keys': None}) 523 524 def setUp(self): 525 super(TestTypeUnset, self).setUp() 526 527 self.types_mock.get.return_value = self.volume_type 528 529 # Get the command object to test 530 self.cmd = volume_type.UnsetVolumeType(self.app, None) 531 532 def test_type_unset_property(self): 533 arglist = [ 534 '--property', 'property', 535 '--property', 'multi_property', 536 self.volume_type.id, 537 ] 538 verifylist = [ 539 ('encryption_type', False), 540 ('property', ['property', 'multi_property']), 541 ('volume_type', self.volume_type.id), 542 ] 543 544 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 545 546 result = self.cmd.take_action(parsed_args) 547 self.volume_type.unset_keys.assert_called_once_with( 548 ['property', 'multi_property']) 549 self.encryption_types_mock.delete.assert_not_called() 550 self.assertIsNone(result) 551 552 def test_type_unset_failed_with_missing_volume_type_argument(self): 553 arglist = [ 554 '--property', 'property', 555 '--property', 'multi_property', 556 ] 557 verifylist = [ 558 ('property', ['property', 'multi_property']), 559 ] 560 561 self.assertRaises(tests_utils.ParserException, 562 self.check_parser, 563 self.cmd, 564 arglist, 565 verifylist) 566 567 def test_type_unset_nothing(self): 568 arglist = [ 569 self.volume_type.id, 570 ] 571 verifylist = [ 572 ('volume_type', self.volume_type.id), 573 ] 574 575 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 576 577 result = self.cmd.take_action(parsed_args) 578 self.assertIsNone(result) 579 580 def test_type_unset_encryption_type(self): 581 arglist = [ 582 '--encryption-type', 583 self.volume_type.id, 584 ] 585 verifylist = [ 586 ('encryption_type', True), 587 ('volume_type', self.volume_type.id), 588 ] 589 parsed_args = self.check_parser(self.cmd, arglist, verifylist) 590 591 result = self.cmd.take_action(parsed_args) 592 self.encryption_types_mock.delete.assert_called_with(self.volume_type) 593 self.assertIsNone(result) 594 595 596class TestColumns(TestType): 597 598 def test_encryption_info_column_with_info(self): 599 fake_volume_type = volume_fakes.FakeType.create_one_type() 600 type_id = fake_volume_type.id 601 602 encryption_info = { 603 'provider': 'LuksEncryptor', 604 'cipher': None, 605 'key_size': None, 606 'control_location': 'front-end', 607 } 608 col = volume_type.EncryptionInfoColumn(type_id, 609 {type_id: encryption_info}) 610 self.assertEqual(utils.format_dict(encryption_info), 611 col.human_readable()) 612 self.assertEqual(encryption_info, col.machine_readable()) 613 614 def test_encryption_info_column_without_info(self): 615 fake_volume_type = volume_fakes.FakeType.create_one_type() 616 type_id = fake_volume_type.id 617 618 col = volume_type.EncryptionInfoColumn(type_id, {}) 619 self.assertEqual('-', col.human_readable()) 620 self.assertIsNone(col.machine_readable()) 621