1# Copyright 2012, Intel, Inc.
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15
16from cinder.common import constants
17from cinder import objects
18from cinder import quota
19from cinder import rpc
20from cinder.volume import utils
21
22
# Module-level alias of ``cinder.quota.QUOTAS``.
# NOTE(review): nothing in the visible part of this module reads it;
# presumably kept for code that imports it from here -- confirm before
# removing.
QUOTAS = quota.QUOTAS
24
25
26class VolumeAPI(rpc.RPCAPI):
27    """Client side of the volume rpc API.
28
29    API version history:
30
31    .. code-block:: none
32
33        1.0 - Initial version.
34        1.1 - Adds clone volume option to create_volume.
35        1.2 - Add publish_service_capabilities() method.
36        1.3 - Pass all image metadata (not just ID) in copy_volume_to_image.
37        1.4 - Add request_spec, filter_properties and
38              allow_reschedule arguments to create_volume().
39        1.5 - Add accept_transfer.
40        1.6 - Add extend_volume.
41        1.7 - Adds host_name parameter to attach_volume()
42              to allow attaching to host rather than instance.
43        1.8 - Add migrate_volume, rename_volume.
44        1.9 - Add new_user and new_project to accept_transfer.
45        1.10 - Add migrate_volume_completion, remove rename_volume.
46        1.11 - Adds mode parameter to attach_volume()
47               to support volume read-only attaching.
48        1.12 - Adds retype.
49        1.13 - Adds create_export.
50        1.14 - Adds reservation parameter to extend_volume().
51        1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
52        1.16 - Removes create_export.
53        1.17 - Add replica option to create_volume, promote_replica and
54               sync_replica.
55        1.18 - Adds create_consistencygroup, delete_consistencygroup,
56               create_cgsnapshot, and delete_cgsnapshot. Also adds
57               the consistencygroup_id parameter in create_volume.
58        1.19 - Adds update_migrated_volume
59        1.20 - Adds support for sending objects over RPC in create_snapshot()
60               and delete_snapshot()
61        1.21 - Adds update_consistencygroup.
62        1.22 - Adds create_consistencygroup_from_src.
63        1.23 - Adds attachment_id to detach_volume.
64        1.24 - Removed duplicated parameters: snapshot_id, image_id,
65               source_volid, source_replicaid, consistencygroup_id and
               cgsnapshot_id from create_volume. All of them are already
67               passed either in request_spec or available in the DB.
68        1.25 - Add source_cg to create_consistencygroup_from_src.
69        1.26 - Adds support for sending objects over RPC in
70               create_consistencygroup(), create_consistencygroup_from_src(),
71               update_consistencygroup() and delete_consistencygroup().
72        1.27 - Adds support for replication V2
73        1.28 - Adds manage_existing_snapshot
74        1.29 - Adds get_capabilities.
75        1.30 - Adds remove_export
76        1.31 - Updated: create_consistencygroup_from_src(), create_cgsnapshot()
77               and delete_cgsnapshot() to cast method only with necessary
78               args. Forwarding CGSnapshot object instead of CGSnapshot_id.
79        1.32 - Adds support for sending objects over RPC in create_volume().
80        1.33 - Adds support for sending objects over RPC in delete_volume().
81        1.34 - Adds support for sending objects over RPC in retype().
82        1.35 - Adds support for sending objects over RPC in extend_volume().
83        1.36 - Adds support for sending objects over RPC in migrate_volume(),
84               migrate_volume_completion(), and update_migrated_volume().
85        1.37 - Adds old_reservations parameter to retype to support quota
86               checks in the API.
87        1.38 - Scaling backup service, add get_backup_device() and
88               secure_file_operations_enabled()
89        1.39 - Update replication methods to reflect new backend rep strategy
90        1.40 - Add cascade option to delete_volume().
91
92        ... Mitaka supports messaging version 1.40. Any changes to existing
93        methods in 1.x after that point should be done so that they can handle
94        the version_cap being set to 1.40.
95
96        2.0  - Remove 1.x compatibility
97        2.1  - Add get_manageable_volumes() and get_manageable_snapshots().
98        2.2  - Adds support for sending objects over RPC in manage_existing().
99        2.3  - Adds support for sending objects over RPC in
100               initialize_connection().
101        2.4  - Sends request_spec as object in create_volume().
102        2.5  - Adds create_group, delete_group, and update_group
103        2.6  - Adds create_group_snapshot, delete_group_snapshot, and
104               create_group_from_src().
105
106        ... Newton supports messaging version 2.6. Any changes to existing
107        methods in 2.x after that point should be done so that they can handle
108        the version_cap being set to 2.6.
109
110        3.0  - Drop 2.x compatibility
111        3.1  - Remove promote_replica and reenable_replication. This is
112               non-backward compatible, but the user-facing API was removed
113               back in Mitaka when introducing cheesecake replication.
114        3.2  - Adds support for sending objects over RPC in
115               get_backup_device().
116        3.3  - Adds support for sending objects over RPC in attach_volume().
117        3.4  - Adds support for sending objects over RPC in detach_volume().
118        3.5  - Adds support for cluster in retype and migrate_volume
119        3.6  - Switch to use oslo.messaging topics to indicate backends instead
120               of @backend suffixes in server names.
121        3.7  - Adds do_cleanup method to do volume cleanups from other nodes
122               that we were doing in init_host.
123        3.8  - Make failover_host cluster aware and add failover_completed.
124        3.9  - Adds new attach/detach methods
125        3.10 - Returning objects instead of raw dictionaries in
126               get_manageable_volumes & get_manageable_snapshots
127        3.11 - Removes create_consistencygroup, delete_consistencygroup,
128               create_cgsnapshot, delete_cgsnapshot, update_consistencygroup,
129               and create_consistencygroup_from_src.
130        3.12 - Adds set_log_levels and get_log_levels
131        3.13 - Add initialize_connection_snapshot,
132               terminate_connection_snapshot, and remove_export_snapshot.
133        3.14 - Adds enable_replication, disable_replication,
134               failover_replication, and list_replication_targets.
135        3.15 - Add revert_to_snapshot method
136    """
137
    # Newest RPC version this client speaks; equals the last entry of the
    # version history in the class docstring.
    RPC_API_VERSION = '3.15'
    # NOTE(review): presumably the version rpc.RPCAPI falls back to when a
    # method does not request one explicitly -- confirm in cinder.rpc.
    RPC_DEFAULT_VERSION = '3.0'
    # Base messaging topic; _get_cctxt() appends ".<host>" to it when
    # version 3.6 can be sent.
    TOPIC = constants.VOLUME_TOPIC
    # NOTE(review): not read by any method visible in this class;
    # presumably consumed by rpc.RPCAPI -- confirm in cinder.rpc.
    BINARY = constants.VOLUME_BINARY
142
143    def _get_cctxt(self, host=None, version=None, **kwargs):
144        if host:
145            server = utils.extract_host(host)
146
147            # TODO(dulek): If we're pinned before 3.6, we should send stuff the
148            # old way - addressing server=host@backend, topic=cinder-volume.
149            # Otherwise we're addressing server=host,
150            # topic=cinder-volume.host@backend. This conditional can go away
151            # when we stop supporting 3.x.
152            if self.client.can_send_version('3.6'):
153                kwargs['topic'] = '%(topic)s.%(host)s' % {'topic': self.TOPIC,
154                                                          'host': server}
155                server = utils.extract_host(server, 'host')
156            kwargs['server'] = server
157
158        return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
159
160    def create_volume(self, ctxt, volume, request_spec, filter_properties,
161                      allow_reschedule=True):
162        cctxt = self._get_cctxt(volume.service_topic_queue)
163        cctxt.cast(ctxt, 'create_volume',
164                   request_spec=request_spec,
165                   filter_properties=filter_properties,
166                   allow_reschedule=allow_reschedule,
167                   volume=volume)
168
169    @rpc.assert_min_rpc_version('3.15')
170    def revert_to_snapshot(self, ctxt, volume, snapshot):
171        version = self._compat_ver('3.15')
172        cctxt = self._get_cctxt(volume.host, version)
173        cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume,
174                   snapshot=snapshot)
175
176    def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
177        volume.create_worker()
178        cctxt = self._get_cctxt(volume.service_topic_queue)
179        msg_args = {
180            'volume': volume, 'unmanage_only': unmanage_only,
181            'cascade': cascade,
182        }
183
184        cctxt.cast(ctxt, 'delete_volume', **msg_args)
185
186    def create_snapshot(self, ctxt, volume, snapshot):
187        snapshot.create_worker()
188        cctxt = self._get_cctxt(volume.service_topic_queue)
189        cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
190
191    def delete_snapshot(self, ctxt, snapshot, unmanage_only=False):
192        cctxt = self._get_cctxt(snapshot.service_topic_queue)
193        cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
194                   unmanage_only=unmanage_only)
195
196    def attach_volume(self, ctxt, volume, instance_uuid, host_name,
197                      mountpoint, mode):
198        msg_args = {'volume_id': volume.id,
199                    'instance_uuid': instance_uuid,
200                    'host_name': host_name,
201                    'mountpoint': mountpoint,
202                    'mode': mode,
203                    'volume': volume}
204        cctxt = self._get_cctxt(volume.service_topic_queue, ('3.3', '3.0'))
205        if not cctxt.can_send_version('3.3'):
206            msg_args.pop('volume')
207        return cctxt.call(ctxt, 'attach_volume', **msg_args)
208
209    def detach_volume(self, ctxt, volume, attachment_id):
210        msg_args = {'volume_id': volume.id,
211                    'attachment_id': attachment_id,
212                    'volume': volume}
213        cctxt = self._get_cctxt(volume.service_topic_queue, ('3.4', '3.0'))
214        if not self.client.can_send_version('3.4'):
215            msg_args.pop('volume')
216        return cctxt.call(ctxt, 'detach_volume', **msg_args)
217
218    def copy_volume_to_image(self, ctxt, volume, image_meta):
219        cctxt = self._get_cctxt(volume.service_topic_queue)
220        cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
221                   image_meta=image_meta)
222
223    def initialize_connection(self, ctxt, volume, connector):
224        cctxt = self._get_cctxt(volume.service_topic_queue)
225        return cctxt.call(ctxt, 'initialize_connection', connector=connector,
226                          volume=volume)
227
228    def terminate_connection(self, ctxt, volume, connector, force=False):
229        cctxt = self._get_cctxt(volume.service_topic_queue)
230        return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
231                          connector=connector, force=force)
232
233    def remove_export(self, ctxt, volume):
234        cctxt = self._get_cctxt(volume.service_topic_queue)
235        cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
236
237    def publish_service_capabilities(self, ctxt):
238        cctxt = self._get_cctxt(fanout=True)
239        cctxt.cast(ctxt, 'publish_service_capabilities')
240
241    def accept_transfer(self, ctxt, volume, new_user, new_project):
242        cctxt = self._get_cctxt(volume.service_topic_queue)
243        return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
244                          new_user=new_user, new_project=new_project)
245
246    def extend_volume(self, ctxt, volume, new_size, reservations):
247        cctxt = self._get_cctxt(volume.service_topic_queue)
248        cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size,
249                   reservations=reservations)
250
251    def migrate_volume(self, ctxt, volume, dest_backend, force_host_copy):
252        backend_p = {'host': dest_backend.host,
253                     'cluster_name': dest_backend.cluster_name,
254                     'capabilities': dest_backend.capabilities}
255
256        version = '3.5'
257        if not self.client.can_send_version(version):
258            version = '3.0'
259            del backend_p['cluster_name']
260
261        cctxt = self._get_cctxt(volume.service_topic_queue, version)
262        cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p,
263                   force_host_copy=force_host_copy)
264
265    def migrate_volume_completion(self, ctxt, volume, new_volume, error):
266        cctxt = self._get_cctxt(volume.service_topic_queue)
267        return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume,
268                          new_volume=new_volume, error=error,)
269
270    def retype(self, ctxt, volume, new_type_id, dest_backend,
271               migration_policy='never', reservations=None,
272               old_reservations=None):
273        backend_p = {'host': dest_backend.host,
274                     'cluster_name': dest_backend.cluster_name,
275                     'capabilities': dest_backend.capabilities}
276        version = '3.5'
277        if not self.client.can_send_version(version):
278            version = '3.0'
279            del backend_p['cluster_name']
280
281        cctxt = self._get_cctxt(volume.service_topic_queue, version)
282        cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id,
283                   host=backend_p, migration_policy=migration_policy,
284                   reservations=reservations,
285                   old_reservations=old_reservations)
286
287    def manage_existing(self, ctxt, volume, ref):
288        cctxt = self._get_cctxt(volume.service_topic_queue)
289        cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume)
290
291    def update_migrated_volume(self, ctxt, volume, new_volume,
292                               original_volume_status):
293        cctxt = self._get_cctxt(new_volume['host'])
294        cctxt.call(ctxt, 'update_migrated_volume',
295                   volume=volume,
296                   new_volume=new_volume,
297                   volume_status=original_volume_status)
298
299    def freeze_host(self, ctxt, service):
300        """Set backend host to frozen."""
301        cctxt = self._get_cctxt(service.service_topic_queue)
302        return cctxt.call(ctxt, 'freeze_host')
303
304    def thaw_host(self, ctxt, service):
305        """Clear the frozen setting on a backend host."""
306        cctxt = self._get_cctxt(service.service_topic_queue)
307        return cctxt.call(ctxt, 'thaw_host')
308
309    def failover(self, ctxt, service, secondary_backend_id=None):
310        """Failover host to the specified backend_id (secondary). """
311        version = '3.8'
312        method = 'failover'
313        if not self.client.can_send_version(version):
314            version = '3.0'
315            method = 'failover_host'
316        cctxt = self._get_cctxt(service.service_topic_queue, version)
317        cctxt.cast(ctxt, method, secondary_backend_id=secondary_backend_id)
318
319    def failover_completed(self, ctxt, service, updates):
320        """Complete failover on all services of the cluster."""
321        cctxt = self._get_cctxt(service.service_topic_queue, '3.8',
322                                fanout=True)
323        cctxt.cast(ctxt, 'failover_completed', updates=updates)
324
325    def manage_existing_snapshot(self, ctxt, snapshot, ref, backend):
326        cctxt = self._get_cctxt(backend)
327        cctxt.cast(ctxt, 'manage_existing_snapshot',
328                   snapshot=snapshot,
329                   ref=ref)
330
331    def get_capabilities(self, ctxt, backend_id, discover):
332        cctxt = self._get_cctxt(backend_id)
333        return cctxt.call(ctxt, 'get_capabilities', discover=discover)
334
335    def get_backup_device(self, ctxt, backup, volume):
336        cctxt = self._get_cctxt(volume.service_topic_queue, ('3.2', '3.0'))
337        if cctxt.can_send_version('3.2'):
338            backup_obj = cctxt.call(ctxt, 'get_backup_device', backup=backup,
339                                    want_objects=True)
340        else:
341            backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup)
342            backup_obj = objects.BackupDeviceInfo.from_primitive(backup_dict,
343                                                                 ctxt)
344        return backup_obj
345
346    def secure_file_operations_enabled(self, ctxt, volume):
347        cctxt = self._get_cctxt(volume.service_topic_queue)
348        return cctxt.call(ctxt, 'secure_file_operations_enabled',
349                          volume=volume)
350
351    def get_manageable_volumes(self, ctxt, service, marker, limit, offset,
352                               sort_keys, sort_dirs):
353        version = ('3.10', '3.0')
354        cctxt = self._get_cctxt(service.service_topic_queue, version=version)
355
356        msg_args = {'marker': marker,
357                    'limit': limit,
358                    'offset': offset,
359                    'sort_keys': sort_keys,
360                    'sort_dirs': sort_dirs,
361                    }
362
363        if cctxt.can_send_version('3.10'):
364            msg_args['want_objects'] = True
365
366        return cctxt.call(ctxt, 'get_manageable_volumes', **msg_args)
367
368    def get_manageable_snapshots(self, ctxt, service, marker, limit, offset,
369                                 sort_keys, sort_dirs):
370        version = ('3.10', '3.0')
371        cctxt = self._get_cctxt(service.service_topic_queue, version=version)
372
373        msg_args = {'marker': marker,
374                    'limit': limit,
375                    'offset': offset,
376                    'sort_keys': sort_keys,
377                    'sort_dirs': sort_dirs,
378                    }
379
380        if cctxt.can_send_version('3.10'):
381            msg_args['want_objects'] = True
382
383        return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args)
384
385    def create_group(self, ctxt, group):
386        cctxt = self._get_cctxt(group.service_topic_queue)
387        cctxt.cast(ctxt, 'create_group', group=group)
388
389    def delete_group(self, ctxt, group):
390        cctxt = self._get_cctxt(group.service_topic_queue)
391        cctxt.cast(ctxt, 'delete_group', group=group)
392
393    def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None):
394        cctxt = self._get_cctxt(group.service_topic_queue)
395        cctxt.cast(ctxt, 'update_group', group=group, add_volumes=add_volumes,
396                   remove_volumes=remove_volumes)
397
398    def create_group_from_src(self, ctxt, group, group_snapshot=None,
399                              source_group=None):
400        cctxt = self._get_cctxt(group.service_topic_queue)
401        cctxt.cast(ctxt, 'create_group_from_src', group=group,
402                   group_snapshot=group_snapshot, source_group=source_group)
403
404    def create_group_snapshot(self, ctxt, group_snapshot):
405        cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
406        cctxt.cast(ctxt, 'create_group_snapshot',
407                   group_snapshot=group_snapshot)
408
409    def delete_group_snapshot(self, ctxt, group_snapshot):
410        cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
411        cctxt.cast(ctxt, 'delete_group_snapshot',
412                   group_snapshot=group_snapshot)
413
414    @rpc.assert_min_rpc_version('3.13')
415    def initialize_connection_snapshot(self, ctxt, snapshot, connector):
416        cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
417        return cctxt.call(ctxt, 'initialize_connection_snapshot',
418                          snapshot_id=snapshot.id,
419                          connector=connector)
420
421    @rpc.assert_min_rpc_version('3.13')
422    def terminate_connection_snapshot(self, ctxt, snapshot, connector,
423                                      force=False):
424        cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
425        return cctxt.call(ctxt, 'terminate_connection_snapshot',
426                          snapshot_id=snapshot.id,
427                          connector=connector, force=force)
428
429    @rpc.assert_min_rpc_version('3.13')
430    def remove_export_snapshot(self, ctxt, snapshot):
431        cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
432        cctxt.cast(ctxt, 'remove_export_snapshot', snapshot_id=snapshot.id)
433
434    @rpc.assert_min_rpc_version('3.9')
435    def attachment_update(self, ctxt, vref, connector, attachment_id):
436        version = self._compat_ver('3.9')
437        cctxt = self._get_cctxt(vref.host, version=version)
438        return cctxt.call(ctxt,
439                          'attachment_update',
440                          vref=vref,
441                          connector=connector,
442                          attachment_id=attachment_id)
443
444    @rpc.assert_min_rpc_version('3.9')
445    def attachment_delete(self, ctxt, attachment_id, vref):
446        version = self._compat_ver('3.9')
447        cctxt = self._get_cctxt(vref.host, version=version)
448        return cctxt.call(ctxt,
449                          'attachment_delete',
450                          attachment_id=attachment_id,
451                          vref=vref)
452
453    @rpc.assert_min_rpc_version('3.7')
454    def do_cleanup(self, ctxt, cleanup_request):
455        """Perform this service/cluster resource cleanup as requested."""
456        destination = cleanup_request.service_topic_queue
457        cctxt = self._get_cctxt(destination, '3.7')
458        # NOTE(geguileo): This call goes to do_cleanup code in
459        # cinder.manager.CleanableManager unless in the future we overwrite it
460        # in cinder.volume.manager
461        cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request)
462
463    @rpc.assert_min_rpc_version('3.12')
464    def set_log_levels(self, context, service, log_request):
465        cctxt = self._get_cctxt(host=service.host, version='3.12')
466        cctxt.cast(context, 'set_log_levels', log_request=log_request)
467
468    @rpc.assert_min_rpc_version('3.12')
469    def get_log_levels(self, context, service, log_request):
470        cctxt = self._get_cctxt(host=service.host, version='3.12')
471        return cctxt.call(context, 'get_log_levels', log_request=log_request)
472
473    @rpc.assert_min_rpc_version('3.14')
474    def enable_replication(self, ctxt, group):
475        cctxt = self._get_cctxt(group.host, version='3.14')
476        cctxt.cast(ctxt, 'enable_replication',
477                   group=group)
478
479    @rpc.assert_min_rpc_version('3.14')
480    def disable_replication(self, ctxt, group):
481        cctxt = self._get_cctxt(group.host, version='3.14')
482        cctxt.cast(ctxt, 'disable_replication',
483                   group=group)
484
485    @rpc.assert_min_rpc_version('3.14')
486    def failover_replication(self, ctxt, group, allow_attached_volume=False,
487                             secondary_backend_id=None):
488        cctxt = self._get_cctxt(group.host, version='3.14')
489        cctxt.cast(ctxt, 'failover_replication',
490                   group=group, allow_attached_volume=allow_attached_volume,
491                   secondary_backend_id=secondary_backend_id)
492
493    @rpc.assert_min_rpc_version('3.14')
494    def list_replication_targets(self, ctxt, group):
495        cctxt = self._get_cctxt(group.host, version='3.14')
496        return cctxt.call(ctxt, 'list_replication_targets',
497                          group=group)
498