1#    Licensed under the Apache License, Version 2.0 (the "License"); you may
2#    not use this file except in compliance with the License. You may obtain
3#    a copy of the License at
4#
5#         http://www.apache.org/licenses/LICENSE-2.0
6#
7#    Unless required by applicable law or agreed to in writing, software
8#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10#    License for the specific language governing permissions and limitations
11#    under the License.
12""" Tests for manage_existing TaskFlow """
13
14import inspect
15import mock
16import taskflow.engines
17
18from cinder import context
19from cinder import test
20from cinder.tests.unit import fake_constants as fakes
21from cinder.tests.unit import fake_volume
22from cinder.tests.unit.volume.flows import fake_volume_api
23from cinder.volume.flows.api import manage_existing
24from cinder.volume.flows import common as flow_common
25from cinder.volume.flows.manager import manage_existing as manager
26
# Signature-introspection helper used by the flow-task checks below: take
# inspect.getfullargspec when the running interpreter provides it, and fall
# back to inspect.getargspec otherwise.
try:
    getargspec = inspect.getfullargspec
except AttributeError:
    getargspec = inspect.getargspec
31
32
class ManageVolumeFlowTestCase(test.TestCase):
    """Tests for the manage_existing API and manager TaskFlow tasks."""

    def setUp(self):
        """Create the admin request context shared by every test."""
        super(ManageVolumeFlowTestCase, self).setUp()
        self.ctxt = context.get_admin_context()
        self.counter = float(0)

    def test_cast_manage_existing(self):
        """Run ManageCastTask.execute against the fake scheduler RPC API."""
        volume = fake_volume.fake_volume_obj(self.ctxt)

        spec = {
            'name': 'name',
            'description': 'description',
            'host': 'host',
            'ref': 'ref',
            'volume_type': 'volume_type',
            'metadata': 'metadata',
            'availability_zone': 'availability_zone',
            'bootable': 'bootable',
            'volume_id': volume.id,
        }

        # Fake objects assert specs
        task = manage_existing.ManageCastTask(
            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
            fake_volume_api.FakeDb())

        # The task is executed with the volume object itself in place of
        # the 'volume_id' entry carried by the expected spec.
        create_what = spec.copy()
        create_what.update({'volume': volume})
        create_what.pop('volume_id')
        task.execute(self.ctxt, **create_what)

    def test_create_db_entry_task_with_multiattach(self):
        """EntryCreateTask reports multiattach for a multiattach type."""
        fake_volume_type = fake_volume.fake_volume_type_obj(
            self.ctxt, extra_specs={'multiattach': '<is> True'})

        spec = {
            'name': 'name',
            'description': 'description',
            'host': 'host',
            'ref': 'ref',
            'volume_type': fake_volume_type,
            'metadata': {},
            'availability_zone': 'availability_zone',
            'bootable': 'bootable',
            'volume_type_id': fake_volume_type.id,
            'cluster_name': 'fake_cluster'
        }
        task = manage_existing.EntryCreateTask(fake_volume_api.FakeDb())

        result = task.execute(self.ctxt, **spec)
        self.assertTrue(result['volume_properties']['multiattach'])

    def _stub_volume_object_get(self):
        """Return a fake volume object in 'creating' status.

        The volume is built with the fake constants for its id, volume
        type id and name, using this test's admin context.
        """
        volume = {
            'id': fakes.VOLUME_ID,
            'volume_type_id': fakes.VOLUME_TYPE_ID,
            'status': 'creating',
            'name': fakes.VOLUME_NAME,
        }
        return fake_volume.fake_volume_obj(self.ctxt, **volume)

    def test_prepare_for_quota_reserveration_task_execute(self):
        """execute() returns the volume, its driver-reported size and spec."""
        mock_db = mock.MagicMock()
        mock_driver = mock.MagicMock()
        mock_manage_existing_ref = mock.MagicMock()
        mock_get_size = self.mock_object(
            mock_driver, 'manage_existing_get_size')
        mock_get_size.return_value = '5'

        volume_ref = self._stub_volume_object_get()
        task = manager.PrepareForQuotaReservationTask(mock_db, mock_driver)

        result = task.execute(self.ctxt, volume_ref, mock_manage_existing_ref)

        self.assertEqual(volume_ref, result['volume_properties'])
        self.assertEqual('5', result['size'])
        self.assertEqual(volume_ref.id, result['volume_spec']['volume_id'])
        mock_get_size.assert_called_once_with(
            volume_ref, mock_manage_existing_ref)

    def test_prepare_for_quota_reservation_task_revert(self):
        """revert() errors the volume out with the 'error_managing' status."""
        mock_db = mock.MagicMock()
        mock_driver = mock.MagicMock()
        mock_result = mock.MagicMock()
        mock_flow_failures = mock.MagicMock()
        mock_error_out = self.mock_object(flow_common, 'error_out')
        volume_ref = self._stub_volume_object_get()
        task = manager.PrepareForQuotaReservationTask(mock_db, mock_driver)

        task.revert(self.ctxt, mock_result, mock_flow_failures, volume_ref)
        mock_error_out.assert_called_once_with(volume_ref,
                                               reason='Volume manage failed.',
                                               status='error_managing')

    def test_get_flow(self):
        """get_flow() creates the named flow and loads it with the store."""
        mock_volume_flow = mock.Mock()
        mock_linear_flow = self.mock_object(manager.linear_flow, 'Flow')
        mock_linear_flow.return_value = mock_volume_flow
        mock_taskflow_engine = self.mock_object(taskflow.engines, 'load')
        expected_store = {
            'context': mock.sentinel.context,
            'volume': mock.sentinel.volume,
            'manage_existing_ref': mock.sentinel.ref,
            'group_snapshot': None,
            'optional_args': {'is_quota_committed': False,
                              'update_size': True}
        }

        manager.get_flow(
            mock.sentinel.context, mock.sentinel.db, mock.sentinel.driver,
            mock.sentinel.host, mock.sentinel.volume, mock.sentinel.ref)

        mock_linear_flow.assert_called_once_with(
            'volume_manage_existing_manager')
        mock_taskflow_engine.assert_called_once_with(
            mock_volume_flow, store=expected_store)

    def test_get_flow_volume_flow_tasks(self):
        """Test that all expected parameter names exist for added tasks."""
        # The flow is inspected at the moment get_flow() hands it to the
        # (mocked) engine loader, via the side effect below.
        mock_taskflow_engine = self.mock_object(taskflow.engines, 'load')
        mock_taskflow_engine.side_effect = self._verify_volume_flow_tasks

        manager.get_flow(
            mock.sentinel.context, mock.sentinel.db, mock.sentinel.driver,
            mock.sentinel.host, mock.sentinel.volume, mock.sentinel.ref)

    def _verify_volume_flow_tasks(self, volume_flow, store=None):
        """Assert every task argument in the flow has a known source.

        Used as the side effect of the mocked ``taskflow.engines.load``.
        Each ``execute`` argument must be ``self``, a name listed in the
        ``default_provides`` of this or an earlier task, or one of the
        expected store keys. Each ``revert`` argument must be ``self``,
        ``result``, ``flow_failures`` or one of the expected store keys.

        :param volume_flow: flow whose nodes are inspected
        :param store: accepted to match the ``load`` call signature; unused
        """
        param_names = [
            'context',
            'volume',
            'manage_existing_ref',
            'group_snapshot',
            'optional_args',
        ]

        provides = {'self'}
        revert_provides = ['self', 'result', 'flow_failures']
        for node in volume_flow.iter_nodes():
            task = node[0]
            # Subsequent tasks may use parameters defined in a previous task's
            # default_provides list. Add these names to the provides set.
            if task.default_provides:
                provides.update(task.default_provides)

            # Check execute() first, then revert(), each against the names
            # that are supplied to it by something other than the store.
            checks = ((task.execute, provides),
                      (task.revert, revert_provides))
            for method, supplied in checks:
                for arg in getargspec(method)[0]:
                    if arg not in supplied:
                        self.assertIn(arg, param_names)
188