Lines Matching refs:self

354     def __init__(self, volid, volsize=1):  argument
355 super(DummyVolume, self).__init__()
356 self.id = volid
357 self.size = volsize
358 self.status = None
359 self.migration_status = None
360 self.volume_id = None
361 self.volume_type_id = None
362 self.attach_status = None
363 self.provider_location = None
369 def setUp(self): argument
370 super(VolumeIDConvertTest, self).setUp()
371 self.mock_object(self, '_create_ismview_dir')
372 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
373 self.do_setup(None)
374 self.vol = DummyVolume(constants.VOLUME_ID)
378 def test_volumeid_should_change_62scale(self, volid, ldname): argument
379 self.vol.id = volid
380 actual = self._convert_id2name(self.vol)
381 self.assertEqual(ldname, actual,
387 def test_snap_volumeid_should_change_62scale_andpostfix(self, argument
390 self.vol.id = volid
391 actual = self._convert_id2snapname(self.vol)
392 self.assertEqual(ldname, actual,
398 def test_ddrsnap_volumeid_should_change_62scale_and_m(self, argument
401 self.vol.id = volid
402 actual = self._convert_id2migratename(self.vol)
403 self.assertEqual(ldname, actual,
410 def setUp(self): argument
411 super(NominatePoolLDTest, self).setUp()
412 self.mock_object(self, '_create_ismview_dir')
413 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
414 self.mock_object(self._cli, '_execute',
416 self.mock_object(self._cli, 'view_all', return_value=xml_out)
417 self.do_setup(None)
418 self.xml = xml_out
419 self._properties['cli_fip'] = '10.0.0.1'
420 self._properties['pool_pools'] = {0, 1}
421 self._properties['pool_backup_pools'] = {2, 3}
422 (self.pools,
423 self.lds,
424 self.ldsets,
425 self.used_ldns,
426 self.hostports,
427 self.max_ld_count) = self.configs(self.xml)
434 self.test_pools = []
437 self.test_pools = [pool_data]
438 self.vol = DummyVolume(constants.VOLUME_ID, 10)
440 def test_getxml(self): argument
441 self.assertIsNotNone(self.xml, "iSMview xml should not be None")
443 def test_selectldn_for_normalvolume(self): argument
444 ldn = self._select_ldnumber(self.used_ldns, self.max_ld_count)
445 self.assertEqual(2, ldn, "selected ldn should be XXX")
447 def test_selectpool_for_normalvolume(self): argument
448 pool = self._select_leastused_poolnumber(self.vol,
449 self.pools,
450 self.xml)
451 self.assertEqual(1, pool, "selected pool should be 1")
453 self.vol.size = 999999999999
454 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
456 pool = self._select_leastused_poolnumber(self.vol,
457 self.pools,
458 self.xml)
460 def test_return_poolnumber(self): argument
461 self.assertEqual(1, self._return_poolnumber(self.test_pools))
463 def test_selectpool_for_migratevolume(self): argument
464 self.vol.id = "46045673-41e7-44a7-9333-02f07feab04b"
466 dummyhost['capabilities'] = self._update_volume_status()
467 pool = self._select_migrate_poolnumber(self.vol,
468 self.pools,
469 self.xml,
471 self.assertEqual(1, pool, "selected pool should be 1")
472 self.vol.id = "1febb976-86d0-42ed-9bc0-4aa3e158f27d"
473 pool = self._select_migrate_poolnumber(self.vol,
474 self.pools,
475 self.xml,
477 self.assertEqual(-1, pool, "selected pool is the same pool(return -1)")
478 self.vol.size = 999999999999
479 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
481 pool = self._select_migrate_poolnumber(self.vol,
482 self.pools,
483 self.xml,
486 def test_selectpool_for_snapvolume(self): argument
487 savePool1 = self.pools[1]['free']
488 self.pools[1]['free'] = 0
489 pool = self._select_dsv_poolnumber(self.vol, self.pools)
490 self.assertEqual(2, pool, "selected pool should be 2")
492 self.pools[1]['free'] = savePool1
494 if len(self.pools[0]['ld_list']) is 1024:
495 savePool2 = self.pools[2]['free']
496 savePool3 = self.pools[3]['free']
497 self.pools[2]['free'] = 0
498 self.pools[3]['free'] = 0
499 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
501 pool = self._select_dsv_poolnumber(self.vol, self.pools)
502 self.pools[2]['free'] = savePool2
503 self.pools[3]['free'] = savePool3
505 self.vol.size = 999999999999
506 pool = self._select_dsv_poolnumber(self.vol, self.pools)
507 self.assertEqual(2, pool, "selected pool should be 2")
510 def test_selectpool_for_ddrvolume(self): argument
511 pool = self._select_ddr_poolnumber(self.vol,
512 self.pools,
513 self.xml,
515 self.assertEqual(2, pool, "selected pool should be 2")
518 savePool2 = self.pools[2]['free']
519 savePool3 = self.pools[3]['free']
520 self.pools[2]['free'] = 0
521 self.pools[3]['free'] = 0
522 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
524 pool = self._select_ddr_poolnumber(self.vol,
525 self.pools,
526 self.xml,
528 self.pools[2]['free'] = savePool2
529 self.pools[3]['free'] = savePool3
531 self.vol.size = 999999999999
532 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
534 pool = self._select_ddr_poolnumber(self.vol,
535 self.pools,
536 self.xml,
539 def test_selectpool_for_volddrvolume(self): argument
540 pool = self._select_volddr_poolnumber(self.vol,
541 self.pools,
542 self.xml,
544 self.assertEqual(1, pool, "selected pool should be 1")
547 savePool0 = self.pools[0]['free']
548 savePool1 = self.pools[1]['free']
549 self.pools[0]['free'] = 0
550 self.pools[1]['free'] = 0
551 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
553 pool = self._select_volddr_poolnumber(self.vol,
554 self.pools,
555 self.xml,
557 self.pools[0]['free'] = savePool0
558 self.pools[1]['free'] = savePool1
560 self.vol.size = 999999999999
561 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
563 pool = self._select_volddr_poolnumber(self.vol,
564 self.pools,
565 self.xml,
571 def setUp(self): argument
572 super(GetInformationTest, self).setUp()
573 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
574 self.do_setup(None)
576 def test_get_ldset(self): argument
577 self.xml = xml_out
578 (self.pools,
579 self.lds,
580 self.ldsets,
581 self.used_ldns,
582 self.hostports,
583 self.max_ld_count) = self.configs(self.xml)
584 self._properties['ldset_name'] = ''
585 ldset = self.get_ldset(self.ldsets)
586 self.assertIsNone(ldset)
587 self._properties['ldset_name'] = 'LX:OpenStack1'
588 ldset = self.get_ldset(self.ldsets)
589 self.assertEqual('LX:OpenStack1', ldset['ldsetname'])
590 self._properties['ldset_name'] = 'LX:OpenStackX'
591 with self.assertRaisesRegexp(exception.NotFound,
595 self.get_ldset(self.ldsets)
600 def setUp(self): argument
601 super(VolumeCreateTest, self).setUp()
602 self.mock_object(self, '_create_ismview_dir')
603 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
604 self.mock_object(self._cli, '_execute',
606 self.mock_object(self._cli, 'view_all', return_value=xml_out)
607 self.do_setup(None)
608 self.xml = xml_out
609 self.vol = DummyVolume("46045673-41e7-44a7-9333-02f07feab04b", 1)
611 def test_validate_migrate_volume(self): argument
612 self.vol.status = 'available'
613 self._validate_migrate_volume(self.vol, self.xml)
615 self.vol.status = 'creating'
616 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
620 self._validate_migrate_volume(self.vol, self.xml)
622 self.vol.id = "AAAAAAAA"
623 self.vol.status = 'available'
624 with self.assertRaisesRegexp(exception.NotFound,
627 self._validate_migrate_volume(self.vol, self.xml)
629 def test_extend_volume(self): argument
630 self.vol.status = 'available'
631 self.extend_volume(self.vol, 10)
633 self.vol.id = "00046058-d38e-7f60-67b7-59ed65e54225" # RV
634 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
637 self.extend_volume(self.vol, 10)
642 def setUp(self): argument
643 super(BindLDTest, self).setUp()
644 self.mock_object(self, '_create_ismview_dir')
645 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
646 self.mock_object(self._cli, '_execute',
648 self.mock_object(self._cli, 'view_all', return_value=xml_out)
649 self.do_setup(None)
650 self.mock_object(self, '_bind_ld', return_value=(0, 0, 0))
651 self.vol = DummyVolume(constants.VOLUME_ID, 1)
653 def test_bindld_CreateVolume(self): argument
654 self.vol.migration_status = "success"
655 self.create_volume(self.vol)
656 self._bind_ld.assert_called_once_with(
657 self.vol, self.vol.size, None,
658 self._convert_id2name,
659 self._select_leastused_poolnumber)
661 def test_bindld_CreateCloneVolume(self): argument
662 self.vol.migration_status = "success"
663 self.src = DummyVolume("46045673-41e7-44a7-9333-02f07feab04b", 1)
664 self.mock_object(self._cli, 'query_BV_SV_status',
666 self.mock_object(self._cli, 'query_MV_RV_name',
668 self.mock_object(self._cli, 'backup_restore')
669 self.create_cloned_volume(self.vol, self.src)
670 self._bind_ld.assert_called_once_with(
671 self.vol, self.vol.size, None,
672 self._convert_id2name,
673 self._select_leastused_poolnumber)
675 def test_bindld_CreateCloneWaitingInterval(self): argument
676 self.assertEqual(10, cli.get_sleep_time_for_clone(0))
677 self.assertEqual(12, cli.get_sleep_time_for_clone(2))
678 self.assertEqual(60, cli.get_sleep_time_for_clone(19))
680 def test_delete_volume(self): argument
681 self.vol.id = "46045673-41e7-44a7-9333-02f07feab04b"
682 detached = self._detach_from_all(self.vol)
683 self.assertTrue(detached)
684 self.vol.id = constants.VOLUME_ID
685 detached = self._detach_from_all(self.vol)
686 self.assertFalse(detached)
687 self.vol.id = constants.VOLUME2_ID
688 with mock.patch.object(self, '_detach_from_all') as detach_mock:
689 self.delete_volume(self.vol)
690 detach_mock.assert_called_once_with(self.vol)
695 def setUp(self): argument
696 super(BindLDTest_Snap, self).setUp()
697 self.mock_object(self, '_create_ismview_dir')
698 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
699 self.mock_object(self._cli, '_execute',
701 self.mock_object(self._cli, 'view_all', return_value=xml_out)
702 self.do_setup(None)
703 self.mock_object(self, '_bind_ld', return_value=(0, 0, 0))
704 self.mock_object(self, '_create_snapshot')
705 self.vol = DummyVolume(constants.VOLUME_ID)
706 self.snap = DummyVolume(constants.SNAPSHOT_ID)
708 def test_bindld_CreateSnapshot(self): argument
709 self.snap.volume_id = constants.VOLUME_ID
710 self.create_snapshot(self.snap)
711 self._create_snapshot.assert_called_once_with(
712 self.snap, self._properties['diskarray_name'])
714 def test_bindld_CreateFromSnapshot(self): argument
715 self.vol.migration_status = "success"
716 self.snap.id = "63410c76-2f12-4473-873d-74a63dfcd3e2"
717 self.snap.volume_id = "1febb976-86d0-42ed-9bc0-4aa3e158f27d"
718 self.mock_object(self._cli, 'query_BV_SV_status',
720 self.mock_object(self._cli, 'backup_restore')
721 self.create_volume_from_snapshot(self.vol, self.snap)
722 self._bind_ld.assert_called_once_with(
723 self.vol, 1, None,
724 self._convert_id2name,
725 self._select_volddr_poolnumber, 1)
730 def setUp(self): argument
731 super(ExportTest, self).setUp()
732 self.mock_object(self, '_create_ismview_dir')
733 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
734 self.mock_object(self._cli, '_execute',
736 self.mock_object(self._cli, 'view_all', return_value=xml_out)
737 self.do_setup(None)
738 self.vol = DummyVolume("46045673-41e7-44a7-9333-02f07feab04b", 10)
740 def test_iscsi_portal(self): argument
742 self.iscsi_do_export(None, self.vol, connector,
743 self._properties['diskarray_name'])
745 def test_fc_do_export(self): argument
747 self.fc_do_export(None, self.vol, connector)
749 def test_iscsi_initialize_connection(self): argument
751 self.vol.provider_location = loc
754 info = self._iscsi_initialize_connection(self.vol, connector)
755 self.assertEqual('iscsi', info['driver_volume_type'])
756 self.assertEqual('iqn.2010-10.org.openstack:volume-00000001',
758 self.assertEqual('127.0.0.1:3260', info['data']['target_portal'])
759 self.assertEqual(88, info['data']['target_lun'])
760 self.assertEqual('iqn.2010-10.org.openstack:volume-00000001',
762 self.assertEqual('127.0.0.1:3260', info['data']['target_portals'][0])
763 self.assertEqual(88, info['data']['target_luns'][0])
765 def test_iscsi_terminate_connection(self): argument
768 ret = self._iscsi_terminate_connection(self.vol, connector)
769 self.assertIsNone(ret)
771 def test_iscsi_terminate_connection_negative(self): argument
774 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
777 self.mock_object(self._cli, 'delldsetld',
779 self._iscsi_terminate_connection(self.vol, connector)
781 def test_fc_initialize_connection(self): argument
783 info = self._fc_initialize_connection(self.vol, connector)
784 self.assertEqual('fibre_channel', info['driver_volume_type'])
785 self.assertEqual('2100000991020012', info['data']['target_wwn'][0])
786 self.assertEqual('2200000991020012', info['data']['target_wwn'][1])
787 self.assertEqual('2900000991020012', info['data']['target_wwn'][2])
788 self.assertEqual('2A00000991020012', info['data']['target_wwn'][3])
789 self.assertEqual(
792 self.assertEqual(
795 self.assertEqual(
798 self.assertEqual(
801 self.assertEqual(
804 self.assertEqual(
807 self.assertEqual(
810 self.assertEqual(
813 with self.assertRaisesRegexp(exception.VolumeBackendAPIException,
816 self.mock_object(self._cli, 'delldsetld',
818 self._fc_terminate_connection(self.vol, connector)
820 def test_fc_terminate_connection(self): argument
822 info = self._fc_terminate_connection(self.vol, connector)
823 self.assertEqual('fibre_channel', info['driver_volume_type'])
824 self.assertEqual('2100000991020012', info['data']['target_wwn'][0])
825 self.assertEqual('2200000991020012', info['data']['target_wwn'][1])
826 self.assertEqual('2900000991020012', info['data']['target_wwn'][2])
827 self.assertEqual('2A00000991020012', info['data']['target_wwn'][3])
828 self.assertEqual(
831 self.assertEqual(
834 self.assertEqual(
837 self.assertEqual(
840 self.assertEqual(
843 self.assertEqual(
846 self.assertEqual(
849 self.assertEqual(
852 info = self._fc_terminate_connection(self.vol, None)
853 self.assertEqual('fibre_channel', info['driver_volume_type'])
854 self.assertEqual({}, info['data'])
856 def test_iscsi_portal_with_controller_node_name(self): argument
857 self.vol.status = 'downloading'
859 self._properties['ldset_controller_node_name'] = 'LX:OpenStack1'
860 self._properties['portal_number'] = 2
861 location = self.iscsi_do_export(None, self.vol, connector,
862 self._properties['diskarray_name'])
863 self.assertEqual('192.168.1.90:3260;192.168.1.91:3260;'
868 def test_fc_do_export_with_controller_node_name(self): argument
869 self.vol.status = 'downloading'
871 self._properties['ldset_controller_node_name'] = 'LX:OpenStack0'
872 location = self.fc_do_export(None, self.vol, connector)
873 self.assertIsNone(location)
879 def setUp(self): argument
880 super(DeleteDSVVolume_test, self).setUp()
881 self.mock_object(self, '_create_ismview_dir')
882 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
883 self.mock_object(self._cli, '_execute',
885 self.mock_object(self._cli, 'view_all', return_value=xml_out)
886 self.do_setup(None)
887 self.vol = DummyVolume(constants.SNAPSHOT_ID)
888 self.vol.volume_id = constants.VOLUME_ID
890 def test_delete_snapshot(self): argument
891 self.mock_object(self._cli, 'query_BV_SV_status',
893 ret = self.delete_snapshot(self.vol)
894 self.assertIsNone(ret)
900 def setUp(self): argument
901 super(NonDisruptiveBackup_test, self).setUp()
902 self.mock_object(self, '_create_ismview_dir')
903 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
904 self.mock_object(self._cli, '_execute',
906 self.mock_object(self._cli, 'view_all', return_value=xml_out)
907 self.do_setup(None)
908 self.vol = DummyVolume('46045673-41e7-44a7-9333-02f07feab04b')
909 self.xml = xml_out
910 (self.pools,
911 self.lds,
912 self.ldsets,
913 self.used_ldns,
914 self.hostports,
915 self.max_ld_count) = self.configs(self.xml)
917 def test_validate_ld_exist(self): argument
918 ldname = self._validate_ld_exist(
919 self.lds, self.vol.id, self._properties['ld_name_format'])
920 self.assertEqual('LX:287RbQoP7VdwR1WsPC2fZT', ldname)
921 self.vol.id = "00000000-0000-0000-0000-6b6d96553b4b"
922 with self.assertRaisesRegexp(exception.NotFound,
925 self._validate_ld_exist(
926 self.lds, self.vol.id, self._properties['ld_name_format'])
928 def test_validate_iscsildset_exist(self): argument
930 ldset = self._validate_iscsildset_exist(self.ldsets, connector)
931 self.assertEqual('LX:OpenStack0', ldset['ldsetname'])
942 self.mock_object(
943 self, 'configs',
945 ldset = self._validate_iscsildset_exist(self.ldsets, connector)
946 self.assertEqual('LX:redhatd1d8e8f23', ldset['ldsetname'])
947 self.assertEqual('iqn.1994-05.com.redhat:d1d8e8f232XX',
950 def test_validate_fcldset_exist(self): argument
952 ldset = self._validate_fcldset_exist(self.ldsets, connector)
953 self.assertEqual('LX:OpenStack1', ldset['ldsetname'])
962 self.mock_object(
963 self, 'configs',
965 ldset = self._validate_fcldset_exist(self.ldsets, connector)
966 self.assertEqual('LX:10000090FAA0786X', ldset['ldsetname'])
967 self.assertEqual('1000-0090-FAA0-786X', ldset['wwpn'][0])
968 self.assertEqual('1000-0090-FAA0-786Y', ldset['wwpn'][1])
970 def test_enumerate_iscsi_portals(self): argument
972 ldset = self._validate_iscsildset_exist(self.ldsets, connector)
973 self.assertEqual('LX:OpenStack0', ldset['ldsetname'])
974 self._properties['portal_number'] = 2
975 portal = self._enumerate_iscsi_portals(self.hostports, ldset)
976 self.assertEqual('192.168.1.90:3260', portal[0])
977 self.assertEqual('192.168.1.91:3260', portal[1])
978 self.assertEqual('192.168.2.92:3260', portal[2])
979 self.assertEqual('192.168.2.93:3260', portal[3])
981 def test_initialize_connection_snapshot(self): argument
984 self.vol.provider_location = loc
985 ret = self.iscsi_initialize_connection_snapshot(self.vol, connector)
986 self.assertIsNotNone(ret)
987 self.assertEqual('iscsi', ret['driver_volume_type'])
990 ret = self.fc_initialize_connection_snapshot(self.vol, connector)
991 self.assertIsNotNone(ret)
992 self.assertEqual('fibre_channel', ret['driver_volume_type'])
994 def test_terminate_connection_snapshot(self): argument
996 self.iscsi_terminate_connection_snapshot(self.vol, connector)
999 ret = self.fc_terminate_connection_snapshot(self.vol, connector)
1000 self.assertEqual('fibre_channel', ret['driver_volume_type'])
1002 def test_remove_export_snapshot(self): argument
1003 self.remove_export_snapshot(None, self.vol)
1005 def test_backup_use_temp_snapshot(self): argument
1006 ret = self.backup_use_temp_snapshot()
1007 self.assertTrue(ret)
1012 def setUp(self): argument
1013 super(VolumeStats_test, self).setUp()
1014 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
1015 self.do_setup(None)
1016 self._properties['cli_fip'] = '10.0.0.1'
1017 self._properties['pool_pools'] = {0, 1}
1018 self._properties['pool_backup_pools'] = {2, 3}
1020 def test_update_volume_status(self): argument
1021 self.mock_object(volume_common.MStorageVolumeCommon, 'parse_xml',
1023 stats = self._update_volume_status()
1024 self.assertEqual('dummy', stats.get('volume_backend_name'))
1025 self.assertEqual('NEC', stats.get('vendor_name'))
1026 self.assertEqual(self.VERSION, stats.get('driver_version'))
1027 self.assertEqual('10.0.0.1', stats.get('location_info').split(':')[0])
1028 self.assertEqual('0,1', stats.get('location_info').split(':')[1])
1033 def setUp(self): argument
1034 super(GetFreeLun_test, self).setUp()
1035 self.do_setup(None)
1037 def test_get_free_lun_iscsi_multi(self): argument
1041 target_lun = self._get_free_lun(ldset)
1042 self.assertIsNone(target_lun)
1044 def test_get_free_lun_iscsi_lun0(self): argument
1048 target_lun = self._get_free_lun(ldset)
1049 self.assertEqual(0, target_lun)
1051 def test_get_free_lun_iscsi_lun2(self): argument
1062 target_lun = self._get_free_lun(ldset)
1063 self.assertEqual(2, target_lun)
1065 def test_get_free_lun_fc_lun1(self): argument
1071 target_lun = self._get_free_lun(ldset)
1072 self.assertEqual(1, target_lun)
1077 def setUp(self): argument
1078 super(Migrate_test, self).setUp()
1079 self.mock_object(self, '_create_ismview_dir')
1080 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
1081 self.mock_object(self._cli, '_execute',
1083 self.mock_object(self._cli, 'view_all', return_value=xml_out)
1084 self.do_setup(None)
1085 self.newvol = DummyVolume(constants.VOLUME_ID)
1086 self.sourcevol = DummyVolume(constants.VOLUME2_ID)
1088 def test_update_migrate_volume(self): argument
1089 update_data = self.update_migrated_volume(None, self.sourcevol,
1090 self.newvol, 'available')
1091 self.assertIsNone(update_data['_name_id'])
1092 self.assertIsNone(update_data['provider_location'])
1097 def setUp(self): argument
1098 super(ManageUnmanage_test, self).setUp()
1099 self.mock_object(self, '_create_ismview_dir')
1100 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
1101 self.mock_object(self._cli, 'view_all', return_value=xml_out)
1102 self.do_setup(None)
1103 self._properties['pool_pools'] = {0}
1104 self._properties['pool_backup_pools'] = {1}
1105 self.newvol = DummyVolume(constants.VOLUME_ID)
1107 def test_is_manageable_volume(self): argument
1115 self.assertTrue(self._is_manageable_volume(ld_ok_iv))
1116 self.assertTrue(self._is_manageable_volume(ld_ok_bv))
1117 self.assertFalse(self._is_manageable_volume(ld_ng_pool))
1118 self.assertFalse(self._is_manageable_volume(ld_ng_rpl1))
1119 self.assertFalse(self._is_manageable_volume(ld_ng_rpl2))
1120 self.assertFalse(self._is_manageable_volume(ld_ng_rpl3))
1121 self.assertFalse(self._is_manageable_volume(ld_ng_purp))
1123 def test_get_manageable_volumes(self): argument
1125 volumes = self.get_manageable_volumes(current_volumes, None,
1127 self.assertEqual('LX:287RbQoP7VdwR1WsPC2fZT',
1130 volumes = self.get_manageable_volumes(current_volumes, None,
1132 self.assertEqual(' :2000000991020012000A',
1134 self.assertEqual(10, len(volumes))
1139 volumes = self.get_manageable_volumes(current_volumes, None,
1141 self.assertFalse(volumes[2]['safe_to_manage'])
1142 self.assertFalse(volumes[3]['safe_to_manage'])
1143 self.assertTrue(volumes[4]['safe_to_manage'])
1145 def test_manage_existing(self): argument
1146 self.mock_object(self._cli, 'changeldname')
1148 volumes = self.get_manageable_volumes(current_volumes, None,
1150 self.manage_existing(self.newvol, volumes[4]['reference'])
1151 self._cli.changeldname.assert_called_once_with(
1153 with self.assertRaisesRegex(exception.ManageExistingInvalidReference,
1155 self.manage_existing(self.newvol, volumes[3]['reference'])
1157 with self.assertRaisesRegex(exception.ManageExistingVolumeTypeMismatch,
1159 self.manage_existing(self.newvol, volume)
1161 def test_manage_existing_get_size(self): argument
1163 volumes = self.get_manageable_volumes(current_volumes, None,
1165 size_in_gb = self.manage_existing_get_size(self.newvol,
1167 self.assertEqual(10, size_in_gb)
1172 def setUp(self): argument
1173 super(ManageUnmanage_Snap_test, self).setUp()
1174 self.mock_object(self, '_create_ismview_dir')
1175 self._set_config(conf.Configuration(None), 'dummy', 'dummy')
1176 self.mock_object(self._cli, 'view_all', return_value=xml_out)
1177 self.do_setup(None)
1178 self._properties['pool_pools'] = {0}
1179 self._properties['pool_backup_pools'] = {1}
1180 self.newsnap = DummyVolume('46045673-41e7-44a7-9333-02f07feab04b')
1181 self.newsnap.volume_id = "1febb976-86d0-42ed-9bc0-4aa3e158f27d"
1183 def test_is_manageable_snapshot(self): argument
1191 self.assertTrue(self._is_manageable_snapshot(ld_ok_sv1))
1192 self.assertTrue(self._is_manageable_snapshot(ld_ok_sv2))
1193 self.assertFalse(self._is_manageable_snapshot(ld_ng_pool))
1194 self.assertFalse(self._is_manageable_snapshot(ld_ng_rpl1))
1195 self.assertFalse(self._is_manageable_snapshot(ld_ng_rpl2))
1196 self.assertFalse(self._is_manageable_snapshot(ld_ng_rpl3))
1197 self.assertFalse(self._is_manageable_snapshot(ld_ng_rpl4))
1199 def test_get_manageable_snapshots(self): argument
1200 self.mock_object(self._cli, 'get_bvname',
1203 volumes = self.get_manageable_snapshots(current_snapshots, None,
1205 self.assertEqual('LX:4T7JpyqI3UuPlKeT9D3VQF',
1208 def test_manage_existing_snapshot(self): argument
1209 self.mock_object(self._cli, 'changeldname')
1210 self.mock_object(self._cli, 'get_bvname',
1213 snaps = self.get_manageable_snapshots(current_snapshots, None,
1215 self.manage_existing_snapshot(self.newsnap, snaps[0]['reference'])
1216 self._cli.changeldname.assert_called_once_with(
1221 self.newsnap.volume_id = "AAAAAAAA"
1222 with self.assertRaisesRegex(exception.ManageExistingInvalidReference,
1224 self.manage_existing_snapshot(self.newsnap, snaps[0]['reference'])
1226 self._cli.get_bvname.return_value = "2000000991020012000C"
1227 self.newsnap.volume_id = "00046058-d38e-7f60-67b7-59ed6422520c"
1229 with self.assertRaisesRegex(exception.ManageExistingVolumeTypeMismatch,
1231 self.manage_existing_snapshot(self.newsnap, snap)
1233 def test_manage_existing_snapshot_get_size(self): argument
1234 self.mock_object(self._cli, 'get_bvname',
1237 snaps = self.get_manageable_snapshots(current_snapshots, None,
1239 size_in_gb = self.manage_existing_snapshot_get_size(
1240 self.newsnap,
1242 self.assertEqual(6, size_in_gb)