1# Licensed under the Apache License, Version 2.0 (the "License"); you may 2# not use this file except in compliance with the License. You may obtain 3# a copy of the License at 4# 5# http://www.apache.org/licenses/LICENSE-2.0 6# 7# Unless required by applicable law or agreed to in writing, software 8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 10# License for the specific language governing permissions and limitations 11# under the License. 12""" Tests for manage_existing TaskFlow """ 13 14import inspect 15import mock 16import taskflow.engines 17 18from cinder import context 19from cinder import test 20from cinder.tests.unit import fake_constants as fakes 21from cinder.tests.unit import fake_volume 22from cinder.tests.unit.volume.flows import fake_volume_api 23from cinder.volume.flows.api import manage_existing 24from cinder.volume.flows import common as flow_common 25from cinder.volume.flows.manager import manage_existing as manager 26 27if hasattr(inspect, 'getfullargspec'): 28 getargspec = inspect.getfullargspec 29else: 30 getargspec = inspect.getargspec 31 32 33class ManageVolumeFlowTestCase(test.TestCase): 34 35 def setUp(self): 36 super(ManageVolumeFlowTestCase, self).setUp() 37 self.ctxt = context.get_admin_context() 38 self.counter = float(0) 39 40 def test_cast_manage_existing(self): 41 volume = fake_volume.fake_volume_obj(self.ctxt) 42 43 spec = { 44 'name': 'name', 45 'description': 'description', 46 'host': 'host', 47 'ref': 'ref', 48 'volume_type': 'volume_type', 49 'metadata': 'metadata', 50 'availability_zone': 'availability_zone', 51 'bootable': 'bootable', 52 'volume_id': volume.id, 53 } 54 55 # Fake objects assert specs 56 task = manage_existing.ManageCastTask( 57 fake_volume_api.FakeSchedulerRpcAPI(spec, self), 58 fake_volume_api.FakeDb()) 59 60 create_what = spec.copy() 61 create_what.update({'volume': volume}) 62 create_what.pop('volume_id') 63 task.execute(self.ctxt, **create_what) 64 65 def test_create_db_entry_task_with_multiattach(self): 66 67 fake_volume_type = fake_volume.fake_volume_type_obj( 68 self.ctxt, extra_specs={'multiattach': '<is> True'}) 69 70 spec = { 71 'name': 'name', 72 'description': 'description', 73 'host': 'host', 74 'ref': 'ref', 75 'volume_type': fake_volume_type, 76 'metadata': {}, 77 'availability_zone': 'availability_zone', 78 'bootable': 'bootable', 79 'volume_type_id': fake_volume_type.id, 80 'cluster_name': 'fake_cluster' 81 } 82 task = manage_existing.EntryCreateTask(fake_volume_api.FakeDb()) 83 84 result = task.execute(self.ctxt, **spec) 85 self.assertTrue(result['volume_properties']['multiattach']) 86 87 @staticmethod 88 def _stub_volume_object_get(self): 89 volume = { 90 'id': fakes.VOLUME_ID, 91 'volume_type_id': fakes.VOLUME_TYPE_ID, 92 'status': 'creating', 93 'name': fakes.VOLUME_NAME, 94 } 95 return fake_volume.fake_volume_obj(self.ctxt, **volume) 96 97 def test_prepare_for_quota_reserveration_task_execute(self): 98 mock_db = mock.MagicMock() 99 mock_driver = mock.MagicMock() 100 mock_manage_existing_ref = mock.MagicMock() 101 mock_get_size = self.mock_object( 102 mock_driver, 'manage_existing_get_size') 103 mock_get_size.return_value = '5' 104 105 volume_ref = self._stub_volume_object_get(self) 106 task = manager.PrepareForQuotaReservationTask(mock_db, mock_driver) 107 108 result = task.execute(self.ctxt, volume_ref, mock_manage_existing_ref) 109 110 self.assertEqual(volume_ref, result['volume_properties']) 111 self.assertEqual('5', result['size']) 112 self.assertEqual(volume_ref.id, result['volume_spec']['volume_id']) 113 mock_get_size.assert_called_once_with( 114 volume_ref, mock_manage_existing_ref) 115 116 def test_prepare_for_quota_reservation_task_revert(self): 117 mock_db = mock.MagicMock() 118 mock_driver = mock.MagicMock() 119 mock_result = mock.MagicMock() 120 mock_flow_failures = mock.MagicMock() 121 mock_error_out = self.mock_object(flow_common, 'error_out') 122 volume_ref = self._stub_volume_object_get(self) 123 task = manager.PrepareForQuotaReservationTask(mock_db, mock_driver) 124 125 task.revert(self.ctxt, mock_result, mock_flow_failures, volume_ref) 126 mock_error_out.assert_called_once_with(volume_ref, 127 reason='Volume manage failed.', 128 status='error_managing') 129 130 def test_get_flow(self): 131 mock_volume_flow = mock.Mock() 132 mock_linear_flow = self.mock_object(manager.linear_flow, 'Flow') 133 mock_linear_flow.return_value = mock_volume_flow 134 mock_taskflow_engine = self.mock_object(taskflow.engines, 'load') 135 expected_store = { 136 'context': mock.sentinel.context, 137 'volume': mock.sentinel.volume, 138 'manage_existing_ref': mock.sentinel.ref, 139 'group_snapshot': None, 140 'optional_args': {'is_quota_committed': False, 141 'update_size': True} 142 } 143 144 manager.get_flow( 145 mock.sentinel.context, mock.sentinel.db, mock.sentinel.driver, 146 mock.sentinel.host, mock.sentinel.volume, mock.sentinel.ref) 147 148 mock_linear_flow.assert_called_once_with( 149 'volume_manage_existing_manager') 150 mock_taskflow_engine.assert_called_once_with( 151 mock_volume_flow, store=expected_store) 152 153 def test_get_flow_volume_flow_tasks(self): 154 """Test that all expected parameter names exist for added tasks.""" 155 mock_taskflow_engine = self.mock_object(taskflow.engines, 'load') 156 mock_taskflow_engine.side_effect = self._verify_volume_flow_tasks 157 158 manager.get_flow( 159 mock.sentinel.context, mock.sentinel.db, mock.sentinel.driver, 160 mock.sentinel.host, mock.sentinel.volume, mock.sentinel.ref) 161 162 def _verify_volume_flow_tasks(self, volume_flow, store=None): 163 param_names = [ 164 'context', 165 'volume', 166 'manage_existing_ref', 167 'group_snapshot', 168 'optional_args', 169 ] 170 171 provides = {'self'} 172 revert_provides = ['self', 'result', 'flow_failures'] 173 for node in volume_flow.iter_nodes(): 174 task = node[0] 175 # Subsequent tasks may use parameters defined in a previous task's 176 # default_provides list. Add these names to the provides set. 177 if task.default_provides: 178 for p in task.default_provides: 179 provides.add(p) 180 181 execute_args = getargspec(task.execute)[0] 182 execute_args = [x for x in execute_args if x not in provides] 183 [self.assertIn(arg, param_names) for arg in execute_args] 184 185 revert_args = getargspec(task.revert)[0] 186 revert_args = [x for x in revert_args if x not in revert_provides] 187 [self.assertIn(arg, param_names) for arg in revert_args] 188