1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17"""
18Volume manager manages creating, attaching, detaching, and persistent storage.
19
20Persistent storage volumes keep their state independent of instances.  You can
21attach to an instance, terminate the instance, spawn a new instance (even
22one from a different image) and re-attach the volume with the same data
23intact.
24
25**Related Flags**
26
27:volume_manager:  The module name of a class derived from
28                  :class:`manager.Manager` (default:
29                  :class:`cinder.volume.manager.Manager`).
30:volume_driver:  Used by :class:`Manager`.  Defaults to
31                 :class:`cinder.volume.drivers.lvm.LVMVolumeDriver`.
32:volume_group:  Name of the group that will contain exported volumes (default:
33                `cinder-volumes`)
34:num_shell_tries:  Number of times to attempt to run commands (default: 3)
35
36"""
37
38
39import requests
40import time
41
42from castellan import key_manager
43from oslo_config import cfg
44from oslo_log import log as logging
45import oslo_messaging as messaging
46from oslo_serialization import jsonutils
47from oslo_service import periodic_task
48from oslo_utils import excutils
49from oslo_utils import importutils
50from oslo_utils import timeutils
51from oslo_utils import units
52from oslo_utils import uuidutils
53profiler = importutils.try_import('osprofiler.profiler')
54import six
55from taskflow import exceptions as tfe
56
57from cinder.common import constants
58from cinder import compute
59from cinder import context
60from cinder import coordination
61from cinder import db
62from cinder import exception
63from cinder import flow_utils
64from cinder.i18n import _
65from cinder.image import cache as image_cache
66from cinder.image import glance
67from cinder.image import image_utils
68from cinder.keymgr import migration as key_migration
69from cinder import manager
70from cinder.message import api as message_api
71from cinder.message import message_field
72from cinder import objects
73from cinder.objects import cgsnapshot
74from cinder.objects import consistencygroup
75from cinder.objects import fields
76from cinder import quota
77from cinder import utils
78from cinder import volume as cinder_volume
79from cinder.volume import configuration as config
80from cinder.volume.flows.manager import create_volume
81from cinder.volume.flows.manager import manage_existing
82from cinder.volume.flows.manager import manage_existing_snapshot
83from cinder.volume import group_types
84from cinder.volume import rpcapi as volume_rpcapi
85from cinder.volume import utils as vol_utils
86from cinder.volume import volume_types
87
88LOG = logging.getLogger(__name__)
89
90QUOTAS = quota.QUOTAS
91CGQUOTAS = quota.CGQUOTAS
92GROUP_QUOTAS = quota.GROUP_QUOTAS
93VALID_REMOVE_VOL_FROM_CG_STATUS = (
94    'available',
95    'in-use',
96    'error',
97    'error_deleting')
98VALID_REMOVE_VOL_FROM_GROUP_STATUS = (
99    'available',
100    'in-use',
101    'error',
102    'error_deleting')
103VALID_ADD_VOL_TO_CG_STATUS = (
104    'available',
105    'in-use')
106VALID_ADD_VOL_TO_GROUP_STATUS = (
107    'available',
108    'in-use')
109VALID_CREATE_CG_SRC_SNAP_STATUS = (fields.SnapshotStatus.AVAILABLE,)
110VALID_CREATE_GROUP_SRC_SNAP_STATUS = (fields.SnapshotStatus.AVAILABLE,)
111VALID_CREATE_CG_SRC_CG_STATUS = ('available',)
112VALID_CREATE_GROUP_SRC_GROUP_STATUS = ('available',)
113VA_LIST = objects.VolumeAttachmentList
114
115volume_manager_opts = [
116    cfg.IntOpt('migration_create_volume_timeout_secs',
117               default=300,
118               help='Timeout for creating the volume to migrate to '
119                    'when performing volume migration (seconds)'),
120    cfg.BoolOpt('volume_service_inithost_offload',
121                default=False,
122                help='Offload pending volume delete during '
123                     'volume service startup'),
124    cfg.StrOpt('zoning_mode',
125               help="FC Zoning mode configured, only 'fabric' is "
126                    "supported now."),
127]
128
129volume_backend_opts = [
130    cfg.StrOpt('volume_driver',
131               default='cinder.volume.drivers.lvm.LVMVolumeDriver',
132               help='Driver to use for volume creation'),
133    cfg.StrOpt('extra_capabilities',
134               default='{}',
135               help='User defined capabilities, a JSON formatted string '
136                    'specifying key/value pairs. The key/value pairs can '
137                    'be used by the CapabilitiesFilter to select between '
138                    'backends when requests specify volume types. For '
139                    'example, specifying a service level or the geographical '
140                    'location of a backend, then creating a volume type to '
141                    'allow the user to select by these different '
142                    'properties.'),
143    cfg.BoolOpt('suppress_requests_ssl_warnings',
144                default=False,
145                help='Suppress requests library SSL certificate warnings.'),
146    cfg.IntOpt('backend_native_threads_pool_size',
147               default=20,
148               min=20,
149               help='Size of the native threads pool for the backend.  '
150                    'Increase for backends that heavily rely on this, like '
151                    'the RBD driver.'),
152]
153
154CONF = cfg.CONF
155CONF.register_opts(volume_manager_opts)
156CONF.register_opts(volume_backend_opts, group=config.SHARED_CONF_GROUP)
157
158MAPPING = {
159    'cinder.volume.drivers.emc.scaleio':
160    'cinder.volume.drivers.dell_emc.scaleio.driver',
161    'cinder.volume.drivers.emc.vnx.driver.EMCVNXDriver':
162    'cinder.volume.drivers.dell_emc.vnx.driver.VNXDriver',
163    'cinder.volume.drivers.emc.xtremio.XtremIOISCSIDriver':
164    'cinder.volume.drivers.dell_emc.xtremio.XtremIOISCSIDriver',
165    'cinder.volume.drivers.emc.xtremio.XtremIOFibreChannelDriver':
166    'cinder.volume.drivers.dell_emc.xtremio.XtremIOFCDriver',
167    'cinder.volume.drivers.datera.DateraDriver':
168    'cinder.volume.drivers.datera.datera_iscsi.DateraDriver',
169    'cinder.volume.drivers.emc.emc_vmax_iscsi.EMCVMAXISCSIDriver':
170    'cinder.volume.drivers.dell_emc.vmax.iscsi.VMAXISCSIDriver',
171    'cinder.volume.drivers.emc.emc_vmax_fc.EMCVMAXFCDriver':
172    'cinder.volume.drivers.dell_emc.vmax.fc.VMAXFCDriver',
173    'cinder.volume.drivers.eqlx.DellEQLSanISCSIDriver':
174    'cinder.volume.drivers.dell_emc.ps.PSSeriesISCSIDriver',
175    'cinder.volume.drivers.dell.dell_storagecenter_iscsi.'
176    'DellStorageCenterISCSIDriver':
177    'cinder.volume.drivers.dell_emc.sc.storagecenter_iscsi.'
178    'SCISCSIDriver',
179    'cinder.volume.drivers.dell.dell_storagecenter_fc.'
180    'DellStorageCenterFCDriver':
181    'cinder.volume.drivers.dell_emc.sc.storagecenter_fc.'
182    'SCFCDriver',
183    'cinder.volume.drivers.windows.windows.WindowsDriver':
184    'cinder.volume.drivers.windows.iscsi.WindowsISCSIDriver',
185}
186
187
188class VolumeManager(manager.CleanableManager,
189                    manager.SchedulerDependentManager):
190    """Manages attachable block storage devices."""
191
192    RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION
193
194    FAILBACK_SENTINEL = 'default'
195
196    target = messaging.Target(version=RPC_API_VERSION)
197
198    # On cloning a volume, we shouldn't copy volume_type, consistencygroup
199    # and volume_attachment, because the db sets that according to [field]_id,
200    # which we do copy. We also skip some other values that are set during
201    # creation of Volume object.
202    _VOLUME_CLONE_SKIP_PROPERTIES = {
203        'id', '_name_id', 'name_id', 'name', 'status',
204        'attach_status', 'migration_status', 'volume_type',
205        'consistencygroup', 'volume_attachment', 'group'}
206
207    def _get_service(self, host=None, binary=constants.VOLUME_BINARY):
208        host = host or self.host
209        ctxt = context.get_admin_context()
210        svc_host = vol_utils.extract_host(host, 'backend')
211        return objects.Service.get_by_args(ctxt, svc_host, binary)
212
213    def __init__(self, volume_driver=None, service_name=None,
214                 *args, **kwargs):
215        """Load the driver from the one specified in args, or from flags."""
216        # update_service_capabilities needs service_name to be volume
217        super(VolumeManager, self).__init__(service_name='volume',
218                                            *args, **kwargs)
219        # NOTE(dulek): service_name=None means we're running in unit tests.
220        service_name = service_name or 'backend_defaults'
221        self.configuration = config.Configuration(volume_backend_opts,
222                                                  config_group=service_name)
223        self._set_tpool_size(
224            self.configuration.backend_native_threads_pool_size)
225        self.stats = {}
226        self.service_uuid = None
227
228        if not volume_driver:
229            # Get from configuration, which will get the default
230            # if its not using the multi backend
231            volume_driver = self.configuration.volume_driver
232        if volume_driver in MAPPING:
233            LOG.warning("Driver path %s is deprecated, update your "
234                        "configuration to the new path.", volume_driver)
235            volume_driver = MAPPING[volume_driver]
236
237        vol_db_empty = self._set_voldb_empty_at_startup_indicator(
238            context.get_admin_context())
239        LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty)
240
241        # We pass the current setting for service.active_backend_id to
242        # the driver on init, in case there was a restart or something
243        curr_active_backend_id = None
244        try:
245            service = self._get_service()
246        except exception.ServiceNotFound:
247            # NOTE(jdg): This is to solve problems with unit tests
248            LOG.info("Service not found for updating "
249                     "active_backend_id, assuming default "
250                     "for driver init.")
251        else:
252            curr_active_backend_id = service.active_backend_id
253            self.service_uuid = service.uuid
254
255        if self.configuration.suppress_requests_ssl_warnings:
256            LOG.warning("Suppressing requests library SSL Warnings")
257            requests.packages.urllib3.disable_warnings(
258                requests.packages.urllib3.exceptions.InsecureRequestWarning)
259            requests.packages.urllib3.disable_warnings(
260                requests.packages.urllib3.exceptions.InsecurePlatformWarning)
261
262        self.key_manager = key_manager.API(CONF)
263        self.driver = importutils.import_object(
264            volume_driver,
265            configuration=self.configuration,
266            db=self.db,
267            host=self.host,
268            cluster_name=self.cluster,
269            is_vol_db_empty=vol_db_empty,
270            active_backend_id=curr_active_backend_id)
271
272        if self.cluster and not self.driver.SUPPORTS_ACTIVE_ACTIVE:
273            msg = _('Active-Active configuration is not currently supported '
274                    'by driver %s.') % volume_driver
275            LOG.error(msg)
276            raise exception.VolumeDriverException(message=msg)
277
278        self.message_api = message_api.API()
279
280        if CONF.profiler.enabled and profiler is not None:
281            self.driver = profiler.trace_cls("driver")(self.driver)
282        try:
283            self.extra_capabilities = jsonutils.loads(
284                self.driver.configuration.extra_capabilities)
285        except AttributeError:
286            self.extra_capabilities = {}
287        except Exception:
288            with excutils.save_and_reraise_exception():
289                LOG.error("Invalid JSON: %s",
290                          self.driver.configuration.extra_capabilities)
291
292        # Check if a per-backend AZ has been specified
293        backend_zone = self.driver.configuration.safe_get(
294            'backend_availability_zone')
295        if backend_zone:
296            self.availability_zone = backend_zone
297
298        if self.driver.configuration.safe_get(
299                'image_volume_cache_enabled'):
300
301            max_cache_size = self.driver.configuration.safe_get(
302                'image_volume_cache_max_size_gb')
303            max_cache_entries = self.driver.configuration.safe_get(
304                'image_volume_cache_max_count')
305
306            self.image_volume_cache = image_cache.ImageVolumeCache(
307                self.db,
308                cinder_volume.API(),
309                max_cache_size,
310                max_cache_entries
311            )
312            LOG.info('Image-volume cache enabled for host %(host)s.',
313                     {'host': self.host})
314        else:
315            LOG.info('Image-volume cache disabled for host %(host)s.',
316                     {'host': self.host})
317            self.image_volume_cache = None
318
319    def _count_allocated_capacity(self, ctxt, volume):
320        pool = vol_utils.extract_host(volume['host'], 'pool')
321        if pool is None:
322            # No pool name encoded in host, so this is a legacy
323            # volume created before pool is introduced, ask
324            # driver to provide pool info if it has such
325            # knowledge and update the DB.
326            try:
327                pool = self.driver.get_pool(volume)
328            except Exception:
329                LOG.exception('Fetch volume pool name failed.',
330                              resource=volume)
331                return
332
333            if pool:
334                new_host = vol_utils.append_host(volume['host'],
335                                                 pool)
336                self.db.volume_update(ctxt, volume['id'],
337                                      {'host': new_host})
338            else:
339                # Otherwise, put them into a special fixed pool with
340                # volume_backend_name being the pool name, if
341                # volume_backend_name is None, use default pool name.
342                # This is only for counting purpose, doesn't update DB.
343                pool = (self.driver.configuration.safe_get(
344                    'volume_backend_name') or vol_utils.extract_host(
345                    volume['host'], 'pool', True))
346        try:
347            pool_stat = self.stats['pools'][pool]
348        except KeyError:
349            # First volume in the pool
350            self.stats['pools'][pool] = dict(
351                allocated_capacity_gb=0)
352            pool_stat = self.stats['pools'][pool]
353        pool_sum = pool_stat['allocated_capacity_gb']
354        pool_sum += volume['size']
355
356        self.stats['pools'][pool]['allocated_capacity_gb'] = pool_sum
357        self.stats['allocated_capacity_gb'] += volume['size']
358
359    def _set_voldb_empty_at_startup_indicator(self, ctxt):
360        """Determine if the Cinder volume DB is empty.
361
362        A check of the volume DB is done to determine whether it is empty or
363        not at this point.
364
365        :param ctxt: our working context
366        """
367        vol_entries = self.db.volume_get_all(ctxt, None, 1, filters=None)
368
369        if len(vol_entries) == 0:
370            LOG.info("Determined volume DB was empty at startup.")
371            return True
372        else:
373            LOG.info("Determined volume DB was not empty at startup.")
374            return False
375
376    def _sync_provider_info(self, ctxt, volumes, snapshots):
377        # NOTE(jdg): For now this just updates provider_id, we can add more
378        # items to the update if they're relevant but we need to be safe in
379        # what we allow and add a list of allowed keys.  Things that make sense
380        # are provider_*, replication_status etc
381
382        updates, snapshot_updates = self.driver.update_provider_info(
383            volumes, snapshots)
384
385        if updates:
386            for volume in volumes:
387                # NOTE(JDG): Make sure returned item is in this hosts volumes
388                update = (
389                    [updt for updt in updates if updt['id'] ==
390                        volume['id']])
391                if update:
392                    update = update[0]
393                    self.db.volume_update(
394                        ctxt,
395                        update['id'],
396                        {'provider_id': update['provider_id']})
397
398        if snapshot_updates:
399            for snap in snapshots:
400                # NOTE(jdg): For now we only update those that have no entry
401                if not snap.get('provider_id', None):
402                    update = (
403                        [updt for updt in snapshot_updates if updt['id'] ==
404                            snap['id']][0])
405                    if update:
406                        self.db.snapshot_update(
407                            ctxt,
408                            update['id'],
409                            {'provider_id': update['provider_id']})
410
411    def _include_resources_in_cluster(self, ctxt):
412
413        LOG.info('Including all resources from host %(host)s in cluster '
414                 '%(cluster)s.',
415                 {'host': self.host, 'cluster': self.cluster})
416        num_vols = objects.VolumeList.include_in_cluster(
417            ctxt, self.cluster, host=self.host)
418        num_cgs = objects.ConsistencyGroupList.include_in_cluster(
419            ctxt, self.cluster, host=self.host)
420        num_gs = objects.GroupList.include_in_cluster(
421            ctxt, self.cluster, host=self.host)
422        num_cache = db.image_volume_cache_include_in_cluster(
423            ctxt, self.cluster, host=self.host)
424        LOG.info('%(num_vols)s volumes, %(num_cgs)s consistency groups, '
425                 '%(num_gs)s generic groups and %(num_cache)s image '
426                 'volume caches from host %(host)s have been included in '
427                 'cluster %(cluster)s.',
428                 {'num_vols': num_vols, 'num_cgs': num_cgs, 'num_gs': num_gs,
429                  'host': self.host, 'cluster': self.cluster,
430                  'num_cache': num_cache})
431
432    def init_host(self, added_to_cluster=None, **kwargs):
433        """Perform any required initialization."""
434        ctxt = context.get_admin_context()
435        if not self.driver.supported:
436            utils.log_unsupported_driver_warning(self.driver)
437
438            if not self.configuration.enable_unsupported_driver:
439                LOG.error("Unsupported drivers are disabled."
440                          " You can re-enable by adding "
441                          "enable_unsupported_driver=True to the "
442                          "driver section in cinder.conf",
443                          resource={'type': 'driver',
444                                    'id': self.__class__.__name__})
445                return
446
447        # If we have just added this host to a cluster we have to include all
448        # our resources in that cluster.
449        if added_to_cluster:
450            self._include_resources_in_cluster(ctxt)
451
452        LOG.info("Starting volume driver %(driver_name)s (%(version)s)",
453                 {'driver_name': self.driver.__class__.__name__,
454                  'version': self.driver.get_version()})
455        try:
456            self.driver.do_setup(ctxt)
457            self.driver.check_for_setup_error()
458        except Exception:
459            LOG.exception("Failed to initialize driver.",
460                          resource={'type': 'driver',
461                                    'id': self.__class__.__name__})
462            # we don't want to continue since we failed
463            # to initialize the driver correctly.
464            return
465
466        # Initialize backend capabilities list
467        self.driver.init_capabilities()
468
469        volumes = self._get_my_volumes(ctxt)
470        snapshots = self._get_my_snapshots(ctxt)
471        self._sync_provider_info(ctxt, volumes, snapshots)
472        # FIXME volume count for exporting is wrong
473
474        self.stats['pools'] = {}
475        self.stats.update({'allocated_capacity_gb': 0})
476
477        try:
478            for volume in volumes:
479                # available volume should also be counted into allocated
480                if volume['status'] in ['in-use', 'available']:
481                    # calculate allocated capacity for driver
482                    self._count_allocated_capacity(ctxt, volume)
483
484                    try:
485                        if volume['status'] in ['in-use']:
486                            self.driver.ensure_export(ctxt, volume)
487                    except Exception:
488                        LOG.exception("Failed to re-export volume, "
489                                      "setting to ERROR.",
490                                      resource=volume)
491                        volume.conditional_update({'status': 'error'},
492                                                  {'status': 'in-use'})
493            # All other cleanups are processed by parent class CleanableManager
494
495        except Exception:
496            LOG.exception("Error during re-export on driver init.",
497                          resource=volume)
498            return
499
500        self.driver.set_throttle()
501
502        # at this point the driver is considered initialized.
503        # NOTE(jdg): Careful though because that doesn't mean
504        # that an entry exists in the service table
505        self.driver.set_initialized()
506
507        # Keep the image tmp file clean when init host.
508        backend_name = vol_utils.extract_host(self.service_topic_queue)
509        image_utils.cleanup_temporary_file(backend_name)
510
511        # Migrate any ConfKeyManager keys based on fixed_key to the currently
512        # configured key manager.
513        self._add_to_threadpool(key_migration.migrate_fixed_key,
514                                volumes=volumes)
515
516        # collect and publish service capabilities
517        self.publish_service_capabilities(ctxt)
518        LOG.info("Driver initialization completed successfully.",
519                 resource={'type': 'driver',
520                           'id': self.driver.__class__.__name__})
521
522        # Make sure to call CleanableManager to do the cleanup
523        super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster,
524                                             **kwargs)
525
526    def init_host_with_rpc(self):
527        LOG.info("Initializing RPC dependent components of volume "
528                 "driver %(driver_name)s (%(version)s)",
529                 {'driver_name': self.driver.__class__.__name__,
530                  'version': self.driver.get_version()})
531
532        try:
533            # Make sure the driver is initialized first
534            utils.log_unsupported_driver_warning(self.driver)
535            utils.require_driver_initialized(self.driver)
536        except exception.DriverNotInitialized:
537            LOG.error("Cannot complete RPC initialization because "
538                      "driver isn't initialized properly.",
539                      resource={'type': 'driver',
540                                'id': self.driver.__class__.__name__})
541            return
542
543        stats = self.driver.get_volume_stats(refresh=True)
544        try:
545            service = self._get_service()
546        except exception.ServiceNotFound:
547            with excutils.save_and_reraise_exception():
548                LOG.error("Service not found for updating replication_status.")
549
550        if service.replication_status != fields.ReplicationStatus.FAILED_OVER:
551            if stats and stats.get('replication_enabled', False):
552                replication_status = fields.ReplicationStatus.ENABLED
553            else:
554                replication_status = fields.ReplicationStatus.DISABLED
555
556            if replication_status != service.replication_status:
557                service.replication_status = replication_status
558                service.save()
559
560        # Update the cluster replication status if necessary
561        cluster = service.cluster
562        if (cluster and
563                cluster.replication_status != service.replication_status):
564            cluster.replication_status = service.replication_status
565            cluster.save()
566
567        LOG.info("Driver post RPC initialization completed successfully.",
568                 resource={'type': 'driver',
569                           'id': self.driver.__class__.__name__})
570
571    def _do_cleanup(self, ctxt, vo_resource):
572        if isinstance(vo_resource, objects.Volume):
573            if vo_resource.status == 'downloading':
574                self.driver.clear_download(ctxt, vo_resource)
575
576            elif vo_resource.status == 'uploading':
577                # Set volume status to available or in-use.
578                self.db.volume_update_status_based_on_attachment(
579                    ctxt, vo_resource.id)
580
581            elif vo_resource.status == 'deleting':
582                if CONF.volume_service_inithost_offload:
583                    # Offload all the pending volume delete operations to the
584                    # threadpool to prevent the main volume service thread
585                    # from being blocked.
586                    self._add_to_threadpool(self.delete_volume, ctxt,
587                                            vo_resource, cascade=True)
588                else:
589                    # By default, delete volumes sequentially
590                    self.delete_volume(ctxt, vo_resource, cascade=True)
591                # We signal that we take care of cleaning the worker ourselves
592                # (with set_workers decorator in delete_volume method) so
593                # do_cleanup method doesn't need to remove it.
594                return True
595
596        # For Volume creating and downloading and for Snapshot downloading
597        # statuses we have to set status to error
598        if vo_resource.status in ('creating', 'downloading'):
599            vo_resource.status = 'error'
600            vo_resource.save()
601
602    def is_working(self):
603        """Return if Manager is ready to accept requests.
604
605        This is to inform Service class that in case of volume driver
606        initialization failure the manager is actually down and not ready to
607        accept any requests.
608        """
609        return self.driver.initialized
610
611    def _set_resource_host(self, resource):
612        """Set the host field on the DB to our own when we are clustered."""
613        if (resource.is_clustered and
614                not vol_utils.hosts_are_equivalent(resource.host, self.host)):
615            pool = vol_utils.extract_host(resource.host, 'pool')
616            resource.host = vol_utils.append_host(self.host, pool)
617            resource.save()
618
619    @objects.Volume.set_workers
620    def create_volume(self, context, volume, request_spec=None,
621                      filter_properties=None, allow_reschedule=True):
622        """Creates the volume."""
623        # Log about unsupported drivers
624        utils.log_unsupported_driver_warning(self.driver)
625
626        # Make sure the host in the DB matches our own when clustered
627        self._set_resource_host(volume)
628
629        # Update our allocated capacity counter early to minimize race
630        # conditions with the scheduler.
631        self._update_allocated_capacity(volume)
632        # We lose the host value if we reschedule, so keep it here
633        original_host = volume.host
634
635        context_elevated = context.elevated()
636        if filter_properties is None:
637            filter_properties = {}
638
639        if request_spec is None:
640            request_spec = objects.RequestSpec()
641
642        try:
643            # NOTE(flaper87): Driver initialization is
644            # verified by the task itself.
645            flow_engine = create_volume.get_flow(
646                context_elevated,
647                self,
648                self.db,
649                self.driver,
650                self.scheduler_rpcapi,
651                self.host,
652                volume,
653                allow_reschedule,
654                context,
655                request_spec,
656                filter_properties,
657                image_volume_cache=self.image_volume_cache,
658            )
659        except Exception:
660            msg = _("Create manager volume flow failed.")
661            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
662            raise exception.CinderException(msg)
663
664        snapshot_id = request_spec.get('snapshot_id')
665        source_volid = request_spec.get('source_volid')
666
667        if snapshot_id is not None:
668            # Make sure the snapshot is not deleted until we are done with it.
669            locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
670        elif source_volid is not None:
671            # Make sure the volume is not deleted until we are done with it.
672            locked_action = "%s-%s" % (source_volid, 'delete_volume')
673        else:
674            locked_action = None
675
676        def _run_flow():
677            # This code executes create volume flow. If something goes wrong,
678            # flow reverts all job that was done and reraises an exception.
679            # Otherwise, all data that was generated by flow becomes available
680            # in flow engine's storage.
681            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
682                flow_engine.run()
683
684        # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
685        # decide if allocated_capacity should be incremented.
686        rescheduled = False
687
688        try:
689            if locked_action is None:
690                _run_flow()
691            else:
692                with coordination.COORDINATOR.get_lock(locked_action):
693                    _run_flow()
694        finally:
695            try:
696                flow_engine.storage.fetch('refreshed')
697            except tfe.NotFound:
698                # If there's no vol_ref, then flow is reverted. Lets check out
699                # if rescheduling occurred.
700                try:
701                    rescheduled = flow_engine.storage.get_revert_result(
702                        create_volume.OnFailureRescheduleTask.make_name(
703                            [create_volume.ACTION]))
704                except tfe.NotFound:
705                    pass
706
707            if rescheduled:
708                # NOTE(geguileo): Volume was rescheduled so we need to update
709                # volume stats because the volume wasn't created here.
710                # Volume.host is None now, so we pass the original host value.
711                self._update_allocated_capacity(volume, decrement=True,
712                                                host=original_host)
713
714        # Shared targets is only relevant for iSCSI connections.
715        # We default to True to be on the safe side.
716        volume.shared_targets = (
717            self.driver.capabilities.get('storage_protocol') == 'iSCSI' and
718            self.driver.capabilities.get('shared_targets', True))
719        # TODO(geguileo): service_uuid won't be enough on Active/Active
720        # deployments. There can be 2 services handling volumes from the same
721        # backend.
722        volume.service_uuid = self.service_uuid
723        volume.save()
724
725        LOG.info("Created volume successfully.", resource=volume)
726        return volume.id
727
728    def _check_is_our_resource(self, resource):
729        if resource.host:
730            res_backend = vol_utils.extract_host(resource.service_topic_queue)
731            backend = vol_utils.extract_host(self.service_topic_queue)
732            if res_backend != backend:
733                msg = (_('Invalid %(resource)s: %(resource)s %(id)s is not '
734                         'local to %(backend)s.') %
735                       {'resource': resource.obj_name, 'id': resource.id,
736                        'backend': backend})
737                raise exception.Invalid(msg)
738
739    @coordination.synchronized('{volume.id}-{f_name}')
740    @objects.Volume.set_workers
741    def delete_volume(self, context, volume, unmanage_only=False,
742                      cascade=False):
743        """Deletes and unexports volume.
744
745        1. Delete a volume(normal case)
746           Delete a volume and update quotas.
747
748        2. Delete a migration volume
749           If deleting the volume in a migration, we want to skip
750           quotas but we need database updates for the volume.
751
752        3. Delete a temp volume for backup
753           If deleting the temp volume for backup, we want to skip
754           quotas but we need database updates for the volume.
755      """
756
757        context = context.elevated()
758
759        try:
760            volume.refresh()
761        except exception.VolumeNotFound:
762            # NOTE(thingee): It could be possible for a volume to
763            # be deleted when resuming deletes from init_host().
764            LOG.debug("Attempted delete of non-existent volume: %s", volume.id)
765            return
766
767        if context.project_id != volume.project_id:
768            project_id = volume.project_id
769        else:
770            project_id = context.project_id
771
772        if volume['attach_status'] == fields.VolumeAttachStatus.ATTACHED:
773            # Volume is still attached, need to detach first
774            raise exception.VolumeAttached(volume_id=volume.id)
775        self._check_is_our_resource(volume)
776
777        if unmanage_only and volume.encryption_key_id is not None:
778            raise exception.Invalid(
779                reason=_("Unmanaging encrypted volumes is not "
780                         "supported."))
781
782        if unmanage_only and cascade:
783            # This could be done, but is ruled out for now just
784            # for simplicity.
785            raise exception.Invalid(
786                reason=_("Unmanage and cascade delete options "
787                         "are mutually exclusive."))
788
789        # To backup a snapshot or a 'in-use' volume, create a temp volume
790        # from the snapshot or in-use volume, and back it up.
791        # Get admin_metadata (needs admin context) to detect temporary volume.
792        is_temp_vol = False
793        with volume.obj_as_admin():
794            if volume.admin_metadata.get('temporary', 'False') == 'True':
795                is_temp_vol = True
796                LOG.info("Trying to delete temp volume: %s", volume.id)
797
798        # The status 'deleting' is not included, because it only applies to
799        # the source volume to be deleted after a migration. No quota
800        # needs to be handled for it.
801        is_migrating = volume.migration_status not in (None, 'error',
802                                                       'success')
803        is_migrating_dest = (is_migrating and
804                             volume.migration_status.startswith(
805                                 'target:'))
806        notification = "delete.start"
807        if unmanage_only:
808            notification = "unmanage.start"
809        if not is_temp_vol:
810            self._notify_about_volume_usage(context, volume, notification)
811        try:
812            # NOTE(flaper87): Verify the driver is enabled
813            # before going forward. The exception will be caught
814            # and the volume status updated.
815            utils.require_driver_initialized(self.driver)
816
817            self.driver.remove_export(context, volume)
818            if unmanage_only:
819                self.driver.unmanage(volume)
820            elif cascade:
821                LOG.debug('Performing cascade delete.')
822                snapshots = objects.SnapshotList.get_all_for_volume(context,
823                                                                    volume.id)
824                for s in snapshots:
825                    if s.status != fields.SnapshotStatus.DELETING:
826                        self._clear_db(context, is_migrating_dest, volume,
827                                       'error_deleting')
828
829                        msg = (_("Snapshot %(id)s was found in state "
830                                 "%(state)s rather than 'deleting' during "
831                                 "cascade delete.") % {'id': s.id,
832                                                       'state': s.status})
833                        raise exception.InvalidSnapshot(reason=msg)
834
835                    self.delete_snapshot(context, s)
836
837                LOG.debug('Snapshots deleted, issuing volume delete')
838                self.driver.delete_volume(volume)
839            else:
840                self.driver.delete_volume(volume)
841        except exception.VolumeIsBusy:
842            LOG.error("Unable to delete busy volume.",
843                      resource=volume)
844            # If this is a destination volume, we have to clear the database
845            # record to avoid user confusion.
846            self._clear_db(context, is_migrating_dest, volume,
847                           'available')
848            return
849        except Exception:
850            with excutils.save_and_reraise_exception():
851                # If this is a destination volume, we have to clear the
852                # database record to avoid user confusion.
853                new_status = 'error_deleting'
854                if unmanage_only is True:
855                    new_status = 'error_unmanaging'
856
857                self._clear_db(context, is_migrating_dest, volume,
858                               new_status)
859
860        # If deleting source/destination volume in a migration or a temp
861        # volume for backup, we should skip quotas.
862        skip_quota = is_migrating or is_temp_vol
863        if not skip_quota:
864            # Get reservations
865            try:
866                reservations = None
867                if volume.status != 'error_managing_deleting':
868                    reserve_opts = {'volumes': -1,
869                                    'gigabytes': -volume.size}
870                    QUOTAS.add_volume_type_opts(context,
871                                                reserve_opts,
872                                                volume.volume_type_id)
873                    reservations = QUOTAS.reserve(context,
874                                                  project_id=project_id,
875                                                  **reserve_opts)
876            except Exception:
877                LOG.exception("Failed to update usages deleting volume.",
878                              resource=volume)
879
880        # Delete glance metadata if it exists
881        self.db.volume_glance_metadata_delete_by_volume(context, volume.id)
882
883        volume.destroy()
884
885        # If deleting source/destination volume in a migration or a temp
886        # volume for backup, we should skip quotas.
887        if not skip_quota:
888            notification = "delete.end"
889            if unmanage_only:
890                notification = "unmanage.end"
891            self._notify_about_volume_usage(context, volume, notification)
892
893            # Commit the reservations
894            if reservations:
895                QUOTAS.commit(context, reservations, project_id=project_id)
896
897            self._update_allocated_capacity(volume, decrement=True)
898            self.publish_service_capabilities(context)
899
900        msg = "Deleted volume successfully."
901        if unmanage_only:
902            msg = "Unmanaged volume successfully."
903        LOG.info(msg, resource=volume)
904
905    def _clear_db(self, context, is_migrating_dest, volume_ref, status):
906        # This method is called when driver.unmanage() or
907        # driver.delete_volume() fails in delete_volume(), so it is already
908        # in the exception handling part.
909        if is_migrating_dest:
910            volume_ref.destroy()
911            LOG.error("Unable to delete the destination volume "
912                      "during volume migration, (NOTE: database "
913                      "record needs to be deleted).", resource=volume_ref)
914        else:
915            volume_ref.status = status
916            volume_ref.save()
917
918    def _revert_to_snapshot_generic(self, ctxt, volume, snapshot):
919        """Generic way to revert volume to a snapshot.
920
921        the framework will use the generic way to implement the revert
922        to snapshot feature:
923        1. create a temporary volume from snapshot
924        2. mount two volumes to host
925        3. copy data from temporary volume to original volume
926        4. detach and destroy temporary volume
927        """
928        temp_vol = None
929
930        try:
931            v_options = {'display_name': '[revert] temporary volume created '
932                                         'from snapshot %s' % snapshot.id}
933            ctxt = context.get_internal_tenant_context() or ctxt
934            temp_vol = self.driver._create_temp_volume_from_snapshot(
935                ctxt, volume, snapshot, volume_options=v_options)
936            self._copy_volume_data(ctxt, temp_vol, volume)
937            self.driver.delete_volume(temp_vol)
938            temp_vol.destroy()
939        except Exception:
940            with excutils.save_and_reraise_exception():
941                LOG.exception(
942                    "Failed to use snapshot %(snapshot)s to create "
943                    "a temporary volume and copy data to volume "
944                    " %(volume)s.",
945                    {'snapshot': snapshot.id,
946                     'volume': volume.id})
947                if temp_vol and temp_vol.status == 'available':
948                    self.driver.delete_volume(temp_vol)
949                    temp_vol.destroy()
950
951    def _revert_to_snapshot(self, context, volume, snapshot):
952        """Use driver or generic method to rollback volume."""
953
954        try:
955            self.driver.revert_to_snapshot(context, volume, snapshot)
956        except (NotImplementedError, AttributeError):
957            LOG.info("Driver's 'revert_to_snapshot' is not found. "
958                     "Try to use copy-snapshot-to-volume method.")
959            self._revert_to_snapshot_generic(context, volume, snapshot)
960
961    def _create_backup_snapshot(self, context, volume):
962        kwargs = {
963            'volume_id': volume.id,
964            'user_id': context.user_id,
965            'project_id': context.project_id,
966            'status': fields.SnapshotStatus.CREATING,
967            'progress': '0%',
968            'volume_size': volume.size,
969            'display_name': '[revert] volume %s backup snapshot' % volume.id,
970            'display_description': 'This is only used for backup when '
971                                   'reverting. If the reverting process '
972                                   'failed, you can restore you data by '
973                                   'creating new volume with this snapshot.',
974            'volume_type_id': volume.volume_type_id,
975            'encryption_key_id': volume.encryption_key_id,
976            'metadata': {}
977        }
978        snapshot = objects.Snapshot(context=context, **kwargs)
979        snapshot.create()
980        self.create_snapshot(context, snapshot)
981        return snapshot
982
983    def revert_to_snapshot(self, context, volume, snapshot):
984        """Revert a volume to a snapshot.
985
986        The process of reverting to snapshot consists of several steps:
987        1.   create a snapshot for backup (in case of data loss)
988        2.1. use driver's specific logic to revert volume
989        2.2. try the generic way to revert volume if driver's method is missing
990        3.   delete the backup snapshot
991        """
992        backup_snapshot = None
993        try:
994            LOG.info("Start to perform revert to snapshot process.")
995
996            self._notify_about_volume_usage(context, volume,
997                                            "revert.start")
998            self._notify_about_snapshot_usage(context, snapshot,
999                                              "revert.start")
1000
1001            # Create a snapshot which can be used to restore the volume
1002            # data by hand if revert process failed.
1003
1004            if self.driver.snapshot_revert_use_temp_snapshot():
1005                backup_snapshot = self._create_backup_snapshot(context,
1006                                                               volume)
1007            self._revert_to_snapshot(context, volume, snapshot)
1008        except Exception as error:
1009            with excutils.save_and_reraise_exception():
1010                self._notify_about_volume_usage(context, volume,
1011                                                "revert.end")
1012                self._notify_about_snapshot_usage(context, snapshot,
1013                                                  "revert.end")
1014                msg = ('Volume %(v_id)s revert to '
1015                       'snapshot %(s_id)s failed with %(error)s.')
1016                msg_args = {'v_id': volume.id,
1017                            's_id': snapshot.id,
1018                            'error': six.text_type(error)}
1019                v_res = volume.update_single_status_where(
1020                    'error',
1021                    'reverting')
1022                if not v_res:
1023                    msg_args = {"id": volume.id,
1024                                "status": 'error'}
1025                    msg += ("Failed to reset volume %(id)s "
1026                            "status to %(status)s.") % msg_args
1027
1028                s_res = snapshot.update_single_status_where(
1029                    fields.SnapshotStatus.AVAILABLE,
1030                    fields.SnapshotStatus.RESTORING)
1031                if not s_res:
1032                    msg_args = {"id": snapshot.id,
1033                                "status":
1034                                    fields.SnapshotStatus.ERROR}
1035                    msg += ("Failed to reset snapshot %(id)s "
1036                            "status to %(status)s." % msg_args)
1037                LOG.exception(msg, msg_args)
1038
1039        v_res = volume.update_single_status_where(
1040            'available', 'reverting')
1041        if not v_res:
1042            msg_args = {"id": volume.id,
1043                        "status": 'available'}
1044            msg = _("Revert finished, but failed to reset "
1045                    "volume %(id)s status to %(status)s, "
1046                    "please manually reset it.") % msg_args
1047            raise exception.BadResetResourceStatus(message=msg)
1048
1049        s_res = snapshot.update_single_status_where(
1050            fields.SnapshotStatus.AVAILABLE,
1051            fields.SnapshotStatus.RESTORING)
1052        if not s_res:
1053            msg_args = {"id": snapshot.id,
1054                        "status":
1055                            fields.SnapshotStatus.AVAILABLE}
1056            msg = _("Revert finished, but failed to reset "
1057                    "snapshot %(id)s status to %(status)s, "
1058                    "please manually reset it.") % msg_args
1059            raise exception.BadResetResourceStatus(message=msg)
1060        if backup_snapshot:
1061            self.delete_snapshot(context,
1062                                 backup_snapshot, handle_quota=False)
1063        msg = ('Volume %(v_id)s reverted to snapshot %(snap_id)s '
1064               'successfully.')
1065        msg_args = {'v_id': volume.id, 'snap_id': snapshot.id}
1066        LOG.info(msg, msg_args)
1067        self._notify_about_volume_usage(context, volume, "revert.end")
1068        self._notify_about_snapshot_usage(context, snapshot, "revert.end")
1069
1070    @objects.Snapshot.set_workers
1071    def create_snapshot(self, context, snapshot):
1072        """Creates and exports the snapshot."""
1073        context = context.elevated()
1074
1075        self._notify_about_snapshot_usage(
1076            context, snapshot, "create.start")
1077
1078        try:
1079            # NOTE(flaper87): Verify the driver is enabled
1080            # before going forward. The exception will be caught
1081            # and the snapshot status updated.
1082            utils.require_driver_initialized(self.driver)
1083
1084            # Pass context so that drivers that want to use it, can,
1085            # but it is not a requirement for all drivers.
1086            snapshot.context = context
1087
1088            model_update = self.driver.create_snapshot(snapshot)
1089            if model_update:
1090                snapshot.update(model_update)
1091                snapshot.save()
1092
1093        except Exception:
1094            with excutils.save_and_reraise_exception():
1095                snapshot.status = fields.SnapshotStatus.ERROR
1096                snapshot.save()
1097
1098        vol_ref = self.db.volume_get(context, snapshot.volume_id)
1099        if vol_ref.bootable:
1100            try:
1101                self.db.volume_glance_metadata_copy_to_snapshot(
1102                    context, snapshot.id, snapshot.volume_id)
1103            except exception.GlanceMetadataNotFound:
1104                # If volume is not created from image, No glance metadata
1105                # would be available for that volume in
1106                # volume glance metadata table
1107                pass
1108            except exception.CinderException as ex:
1109                LOG.exception("Failed updating snapshot"
1110                              " metadata using the provided volumes"
1111                              " %(volume_id)s metadata",
1112                              {'volume_id': snapshot.volume_id},
1113                              resource=snapshot)
1114                snapshot.status = fields.SnapshotStatus.ERROR
1115                snapshot.save()
1116                raise exception.MetadataCopyFailure(reason=six.text_type(ex))
1117
1118        snapshot.status = fields.SnapshotStatus.AVAILABLE
1119        snapshot.progress = '100%'
1120        # Resync with the volume's DB value. This addresses the case where
1121        # the snapshot creation was in flight just prior to when the volume's
1122        # fixed_key encryption key ID was migrated to Barbican.
1123        snapshot.encryption_key_id = vol_ref.encryption_key_id
1124        snapshot.save()
1125
1126        self._notify_about_snapshot_usage(context, snapshot, "create.end")
1127        LOG.info("Create snapshot completed successfully",
1128                 resource=snapshot)
1129        return snapshot.id
1130
1131    @coordination.synchronized('{snapshot.id}-{f_name}')
1132    def delete_snapshot(self, context, snapshot,
1133                        unmanage_only=False, handle_quota=True):
1134        """Deletes and unexports snapshot."""
1135        context = context.elevated()
1136        snapshot._context = context
1137        project_id = snapshot.project_id
1138
1139        self._notify_about_snapshot_usage(
1140            context, snapshot, "delete.start")
1141
1142        try:
1143            # NOTE(flaper87): Verify the driver is enabled
1144            # before going forward. The exception will be caught
1145            # and the snapshot status updated.
1146            utils.require_driver_initialized(self.driver)
1147
1148            # Pass context so that drivers that want to use it, can,
1149            # but it is not a requirement for all drivers.
1150            snapshot.context = context
1151            snapshot.save()
1152
1153            if unmanage_only:
1154                self.driver.unmanage_snapshot(snapshot)
1155            else:
1156                self.driver.delete_snapshot(snapshot)
1157        except exception.SnapshotIsBusy:
1158            LOG.error("Delete snapshot failed, due to snapshot busy.",
1159                      resource=snapshot)
1160            snapshot.status = fields.SnapshotStatus.AVAILABLE
1161            snapshot.save()
1162            return
1163        except Exception:
1164            with excutils.save_and_reraise_exception():
1165                snapshot.status = fields.SnapshotStatus.ERROR_DELETING
1166                snapshot.save()
1167
1168        # Get reservations
1169        reservations = None
1170        try:
1171            if handle_quota:
1172                if CONF.no_snapshot_gb_quota:
1173                    reserve_opts = {'snapshots': -1}
1174                else:
1175                    reserve_opts = {
1176                        'snapshots': -1,
1177                        'gigabytes': -snapshot.volume_size,
1178                    }
1179                volume_ref = self.db.volume_get(context, snapshot.volume_id)
1180                QUOTAS.add_volume_type_opts(context,
1181                                            reserve_opts,
1182                                            volume_ref.get('volume_type_id'))
1183                reservations = QUOTAS.reserve(context,
1184                                              project_id=project_id,
1185                                              **reserve_opts)
1186        except Exception:
1187            reservations = None
1188            LOG.exception("Update snapshot usages failed.",
1189                          resource=snapshot)
1190        self.db.volume_glance_metadata_delete_by_snapshot(context, snapshot.id)
1191        snapshot.destroy()
1192        self._notify_about_snapshot_usage(context, snapshot, "delete.end")
1193
1194        # Commit the reservations
1195        if reservations:
1196            QUOTAS.commit(context, reservations, project_id=project_id)
1197
1198        msg = "Delete snapshot completed successfully."
1199        if unmanage_only:
1200            msg = "Unmanage snapshot completed successfully."
1201        LOG.info(msg, resource=snapshot)
1202
1203    @coordination.synchronized('{volume_id}')
1204    def attach_volume(self, context, volume_id, instance_uuid, host_name,
1205                      mountpoint, mode, volume=None):
1206        """Updates db to show volume is attached."""
1207        # FIXME(lixiaoy1): Remove this in v4.0 of RPC API.
1208        if volume is None:
1209            # For older clients, mimic the old behavior and look
1210            # up the volume by its volume_id.
1211            volume = objects.Volume.get_by_id(context, volume_id)
1212        # Get admin_metadata. This needs admin context.
1213        with volume.obj_as_admin():
1214            volume_metadata = volume.admin_metadata
1215        # check the volume status before attaching
1216        if volume.status == 'attaching':
1217            if (volume_metadata.get('attached_mode') and
1218               volume_metadata.get('attached_mode') != mode):
1219                raise exception.InvalidVolume(
1220                    reason=_("being attached by different mode"))
1221
1222        host_name_sanitized = utils.sanitize_hostname(
1223            host_name) if host_name else None
1224        if instance_uuid:
1225            attachments = (
1226                VA_LIST.get_all_by_instance_uuid(
1227                    context, instance_uuid))
1228        else:
1229            attachments = (
1230                VA_LIST.get_all_by_host(
1231                    context, host_name_sanitized))
1232        if attachments:
1233            # check if volume<->instance mapping is already tracked in DB
1234            for attachment in attachments:
1235                if attachment['volume_id'] == volume_id:
1236                    volume.status = 'in-use'
1237                    volume.save()
1238                    return attachment
1239
1240        if (volume.status == 'in-use' and not volume.multiattach
1241           and not volume.migration_status):
1242            raise exception.InvalidVolume(
1243                reason=_("volume is already attached and multiple attachments "
1244                         "are not enabled"))
1245
1246        self._notify_about_volume_usage(context, volume,
1247                                        "attach.start")
1248
1249        attachment = volume.begin_attach(mode)
1250
1251        if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
1252            attachment.attach_status = (
1253                fields.VolumeAttachStatus.ERROR_ATTACHING)
1254            attachment.save()
1255            raise exception.InvalidUUID(uuid=instance_uuid)
1256
1257        try:
1258            if volume_metadata.get('readonly') == 'True' and mode != 'ro':
1259                raise exception.InvalidVolumeAttachMode(mode=mode,
1260                                                        volume_id=volume.id)
1261            # NOTE(flaper87): Verify the driver is enabled
1262            # before going forward. The exception will be caught
1263            # and the volume status updated.
1264            utils.require_driver_initialized(self.driver)
1265
1266            LOG.info('Attaching volume %(volume_id)s to instance '
1267                     '%(instance)s at mountpoint %(mount)s on host '
1268                     '%(host)s.',
1269                     {'volume_id': volume_id, 'instance': instance_uuid,
1270                      'mount': mountpoint, 'host': host_name_sanitized},
1271                     resource=volume)
1272            self.driver.attach_volume(context,
1273                                      volume,
1274                                      instance_uuid,
1275                                      host_name_sanitized,
1276                                      mountpoint)
1277        except Exception as excep:
1278            with excutils.save_and_reraise_exception():
1279                self.message_api.create(
1280                    context,
1281                    message_field.Action.ATTACH_VOLUME,
1282                    resource_uuid=volume_id,
1283                    exception=excep)
1284                attachment.attach_status = (
1285                    fields.VolumeAttachStatus.ERROR_ATTACHING)
1286                attachment.save()
1287
1288        volume = attachment.finish_attach(
1289            instance_uuid,
1290            host_name_sanitized,
1291            mountpoint,
1292            mode)
1293
1294        self._notify_about_volume_usage(context, volume, "attach.end")
1295        LOG.info("Attach volume completed successfully.",
1296                 resource=volume)
1297        return attachment
1298
1299    @coordination.synchronized('{volume_id}-{f_name}')
1300    def detach_volume(self, context, volume_id, attachment_id=None,
1301                      volume=None):
1302        """Updates db to show volume is detached."""
1303        # TODO(vish): refactor this into a more general "unreserve"
1304        # FIXME(lixiaoy1): Remove this in v4.0 of RPC API.
1305        if volume is None:
1306            # For older clients, mimic the old behavior and look up the volume
1307            # by its volume_id.
1308            volume = objects.Volume.get_by_id(context, volume_id)
1309
1310        if attachment_id:
1311            try:
1312                attachment = objects.VolumeAttachment.get_by_id(context,
1313                                                                attachment_id)
1314            except exception.VolumeAttachmentNotFound:
1315                LOG.info("Volume detach called, but volume not attached.",
1316                         resource=volume)
1317                # We need to make sure the volume status is set to the correct
1318                # status.  It could be in detaching status now, and we don't
1319                # want to leave it there.
1320                volume.finish_detach(attachment_id)
1321                return
1322        else:
1323            # We can try and degrade gracefully here by trying to detach
1324            # a volume without the attachment_id here if the volume only has
1325            # one attachment.  This is for backwards compatibility.
1326            attachments = volume.volume_attachment
1327            if len(attachments) > 1:
1328                # There are more than 1 attachments for this volume
1329                # we have to have an attachment id.
1330                msg = _("Detach volume failed: More than one attachment, "
1331                        "but no attachment_id provided.")
1332                LOG.error(msg, resource=volume)
1333                raise exception.InvalidVolume(reason=msg)
1334            elif len(attachments) == 1:
1335                attachment = attachments[0]
1336            else:
1337                # there aren't any attachments for this volume.
1338                # so set the status to available and move on.
1339                LOG.info("Volume detach called, but volume not attached.",
1340                         resource=volume)
1341                volume.status = 'available'
1342                volume.attach_status = fields.VolumeAttachStatus.DETACHED
1343                volume.save()
1344                return
1345
1346        self._notify_about_volume_usage(context, volume, "detach.start")
1347        try:
1348            # NOTE(flaper87): Verify the driver is enabled
1349            # before going forward. The exception will be caught
1350            # and the volume status updated.
1351            utils.require_driver_initialized(self.driver)
1352
1353            LOG.info('Detaching volume %(volume_id)s from instance '
1354                     '%(instance)s.',
1355                     {'volume_id': volume_id,
1356                      'instance': attachment.get('instance_uuid')},
1357                     resource=volume)
1358            self.driver.detach_volume(context, volume, attachment)
1359        except Exception:
1360            with excutils.save_and_reraise_exception():
1361                self.db.volume_attachment_update(
1362                    context, attachment.get('id'), {
1363                        'attach_status':
1364                            fields.VolumeAttachStatus.ERROR_DETACHING})
1365
1366        # NOTE(jdg): We used to do an ensure export here to
1367        # catch upgrades while volumes were attached (E->F)
1368        # this was necessary to convert in-use volumes from
1369        # int ID's to UUID's.  Don't need this any longer
1370
1371        # We're going to remove the export here
1372        # (delete the iscsi target)
1373        try:
1374            utils.require_driver_initialized(self.driver)
1375            self.driver.remove_export(context.elevated(), volume)
1376        except exception.DriverNotInitialized:
1377            with excutils.save_and_reraise_exception():
1378                LOG.exception("Detach volume failed, due to "
1379                              "uninitialized driver.",
1380                              resource=volume)
1381        except Exception as ex:
1382            LOG.exception("Detach volume failed, due to "
1383                          "remove-export failure.",
1384                          resource=volume)
1385            raise exception.RemoveExportException(volume=volume_id,
1386                                                  reason=six.text_type(ex))
1387
1388        volume.finish_detach(attachment.id)
1389        self._notify_about_volume_usage(context, volume, "detach.end")
1390        LOG.info("Detach volume completed successfully.", resource=volume)
1391
1392    def _create_image_cache_volume_entry(self, ctx, volume_ref,
1393                                         image_id, image_meta):
1394        """Create a new image-volume and cache entry for it.
1395
1396        This assumes that the image has already been downloaded and stored
1397        in the volume described by the volume_ref.
1398        """
1399        cache_entry = self.image_volume_cache.get_entry(ctx,
1400                                                        volume_ref,
1401                                                        image_id,
1402                                                        image_meta)
1403        if cache_entry:
1404            LOG.debug('Cache entry already exists with image ID %'
1405                      '(image_id)s',
1406                      {'image_id': image_id})
1407            return
1408
1409        image_volume = None
1410        try:
1411            if not self.image_volume_cache.ensure_space(ctx, volume_ref):
1412                LOG.warning('Unable to ensure space for image-volume in'
1413                            ' cache. Will skip creating entry for image'
1414                            ' %(image)s on %(service)s.',
1415                            {'image': image_id,
1416                             'service': volume_ref.service_topic_queue})
1417                return
1418
1419            image_volume = self._clone_image_volume(ctx,
1420                                                    volume_ref,
1421                                                    image_meta)
1422            if not image_volume:
1423                LOG.warning('Unable to clone image_volume for image '
1424                            '%(image_id)s will not create cache entry.',
1425                            {'image_id': image_id})
1426                return
1427            self.image_volume_cache.create_cache_entry(
1428                ctx,
1429                image_volume,
1430                image_id,
1431                image_meta
1432            )
1433        except exception.CinderException as e:
1434            LOG.warning('Failed to create new image-volume cache entry.'
1435                        ' Error: %(exception)s', {'exception': e})
1436            if image_volume:
1437                self.delete_volume(ctx, image_volume)
1438
1439    def _clone_image_volume(self, ctx, volume, image_meta):
1440        volume_type_id = volume.get('volume_type_id')
1441        reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
1442        QUOTAS.add_volume_type_opts(ctx, reserve_opts, volume_type_id)
1443        reservations = QUOTAS.reserve(ctx, **reserve_opts)
1444        try:
1445            new_vol_values = {k: volume[k] for k in set(volume.keys()) -
1446                              self._VOLUME_CLONE_SKIP_PROPERTIES}
1447            new_vol_values['volume_type_id'] = volume_type_id
1448            new_vol_values['attach_status'] = (
1449                fields.VolumeAttachStatus.DETACHED)
1450            new_vol_values['status'] = 'creating'
1451            new_vol_values['project_id'] = ctx.project_id
1452            new_vol_values['display_name'] = 'image-%s' % image_meta['id']
1453            new_vol_values['source_volid'] = volume.id
1454
1455            LOG.debug('Creating image volume entry: %s.', new_vol_values)
1456            image_volume = objects.Volume(context=ctx, **new_vol_values)
1457            image_volume.create()
1458        except Exception as ex:
1459            LOG.exception('Create clone_image_volume: %(volume_id)s'
1460                          'for image %(image_id)s, '
1461                          'failed (Exception: %(except)s)',
1462                          {'volume_id': volume.id,
1463                           'image_id': image_meta['id'],
1464                           'except': ex})
1465            QUOTAS.rollback(ctx, reservations)
1466            return
1467
1468        QUOTAS.commit(ctx, reservations,
1469                      project_id=new_vol_values['project_id'])
1470
1471        try:
1472            self.create_volume(ctx, image_volume, allow_reschedule=False)
1473            image_volume.refresh()
1474            if image_volume.status != 'available':
1475                raise exception.InvalidVolume(_('Volume is not available.'))
1476
1477            self.db.volume_admin_metadata_update(ctx.elevated(),
1478                                                 image_volume.id,
1479                                                 {'readonly': 'True'},
1480                                                 False)
1481            return image_volume
1482        except exception.CinderException:
1483            LOG.exception('Failed to clone volume %(volume_id)s for '
1484                          'image %(image_id)s.',
1485                          {'volume_id': volume.id,
1486                           'image_id': image_meta['id']})
1487            try:
1488                self.delete_volume(ctx, image_volume)
1489            except exception.CinderException:
1490                LOG.exception('Could not delete the image volume %(id)s.',
1491                              {'id': volume.id})
1492            return
1493
1494    def _clone_image_volume_and_add_location(self, ctx, volume, image_service,
1495                                             image_meta):
1496        """Create a cloned volume and register its location to the image."""
1497        if (image_meta['disk_format'] != 'raw' or
1498                image_meta['container_format'] != 'bare'):
1499            return False
1500
1501        image_volume_context = ctx
1502        if self.driver.configuration.image_upload_use_internal_tenant:
1503            internal_ctx = context.get_internal_tenant_context()
1504            if internal_ctx:
1505                image_volume_context = internal_ctx
1506
1507        image_volume = self._clone_image_volume(image_volume_context,
1508                                                volume,
1509                                                image_meta)
1510        if not image_volume:
1511            return False
1512
1513        # The image_owner metadata should be set before uri is added to
1514        # the image so glance cinder store can check its owner.
1515        image_volume_meta = {'image_owner': ctx.project_id}
1516        self.db.volume_metadata_update(image_volume_context,
1517                                       image_volume.id,
1518                                       image_volume_meta,
1519                                       False)
1520
1521        uri = 'cinder://%s' % image_volume.id
1522        image_registered = None
1523        try:
1524            image_registered = image_service.add_location(
1525                ctx, image_meta['id'], uri, {})
1526        except (exception.NotAuthorized, exception.Invalid,
1527                exception.NotFound):
1528            LOG.exception('Failed to register image volume location '
1529                          '%(uri)s.', {'uri': uri})
1530
1531        if not image_registered:
1532            LOG.warning('Registration of image volume URI %(uri)s '
1533                        'to image %(image_id)s failed.',
1534                        {'uri': uri, 'image_id': image_meta['id']})
1535            try:
1536                self.delete_volume(image_volume_context, image_volume)
1537            except exception.CinderException:
1538                LOG.exception('Could not delete failed image volume '
1539                              '%(id)s.', {'id': image_volume.id})
1540            return False
1541
1542        image_volume_meta['glance_image_id'] = image_meta['id']
1543        self.db.volume_metadata_update(image_volume_context,
1544                                       image_volume.id,
1545                                       image_volume_meta,
1546                                       False)
1547        return True
1548
1549    def copy_volume_to_image(self, context, volume_id, image_meta):
1550        """Uploads the specified volume to Glance.
1551
1552        image_meta is a dictionary containing the following keys:
1553        'id', 'container_format', 'disk_format'
1554
1555        """
1556        payload = {'volume_id': volume_id, 'image_id': image_meta['id']}
1557        image_service = None
1558        try:
1559            volume = objects.Volume.get_by_id(context, volume_id)
1560
1561            # NOTE(flaper87): Verify the driver is enabled
1562            # before going forward. The exception will be caught
1563            # and the volume status updated.
1564            utils.require_driver_initialized(self.driver)
1565
1566            image_service, image_id = \
1567                glance.get_remote_image_service(context, image_meta['id'])
1568            if (self.driver.configuration.image_upload_use_cinder_backend
1569                    and self._clone_image_volume_and_add_location(
1570                        context, volume, image_service, image_meta)):
1571                LOG.debug("Registered image volume location to glance "
1572                          "image-id: %(image_id)s.",
1573                          {'image_id': image_meta['id']},
1574                          resource=volume)
1575            else:
1576                self.driver.copy_volume_to_image(context, volume,
1577                                                 image_service, image_meta)
1578                LOG.debug("Uploaded volume to glance image-id: %(image_id)s.",
1579                          {'image_id': image_meta['id']},
1580                          resource=volume)
1581        except Exception as error:
1582            LOG.error("Upload volume to image encountered an error "
1583                      "(image-id: %(image_id)s).",
1584                      {'image_id': image_meta['id']},
1585                      resource=volume)
1586            self.message_api.create(
1587                context,
1588                message_field.Action.COPY_VOLUME_TO_IMAGE,
1589                resource_uuid=volume_id,
1590                exception=error,
1591                detail=message_field.Detail.FAILED_TO_UPLOAD_VOLUME)
1592            if image_service is not None:
1593                # Deletes the image if it is in queued or saving state
1594                self._delete_image(context, image_meta['id'], image_service)
1595            with excutils.save_and_reraise_exception():
1596                payload['message'] = six.text_type(error)
1597        finally:
1598            self.db.volume_update_status_based_on_attachment(context,
1599                                                             volume_id)
1600        LOG.info("Copy volume to image completed successfully.",
1601                 resource=volume)
1602
1603    def _delete_image(self, context, image_id, image_service):
1604        """Deletes an image stuck in queued or saving state."""
1605        try:
1606            image_meta = image_service.show(context, image_id)
1607            image_status = image_meta.get('status')
1608            if image_status == 'queued' or image_status == 'saving':
1609                LOG.warning("Deleting image in unexpected status: "
1610                            "%(image_status)s.",
1611                            {'image_status': image_status},
1612                            resource={'type': 'image', 'id': image_id})
1613                image_service.delete(context, image_id)
1614        except Exception:
1615            LOG.warning("Image delete encountered an error.",
1616                        exc_info=True, resource={'type': 'image',
1617                                                 'id': image_id})
1618
1619    def _parse_connection_options(self, context, volume, conn_info):
1620        # Add qos_specs to connection info
1621        typeid = volume.volume_type_id
1622        specs = None
1623        if typeid:
1624            res = volume_types.get_volume_type_qos_specs(typeid)
1625            qos = res['qos_specs']
1626            # only pass qos_specs that is designated to be consumed by
1627            # front-end, or both front-end and back-end.
1628            if qos and qos.get('consumer') in ['front-end', 'both']:
1629                specs = qos.get('specs')
1630
1631            if specs is not None:
1632                # Compute fixed IOPS values for per-GB keys
1633                if 'write_iops_sec_per_gb' in specs:
1634                    specs['write_iops_sec'] = (
1635                        int(specs['write_iops_sec_per_gb']) * int(volume.size))
1636                    specs.pop('write_iops_sec_per_gb')
1637
1638                if 'read_iops_sec_per_gb' in specs:
1639                    specs['read_iops_sec'] = (
1640                        int(specs['read_iops_sec_per_gb']) * int(volume.size))
1641                    specs.pop('read_iops_sec_per_gb')
1642
1643                if 'total_iops_sec_per_gb' in specs:
1644                    specs['total_iops_sec'] = (
1645                        int(specs['total_iops_sec_per_gb']) * int(volume.size))
1646                    specs.pop('total_iops_sec_per_gb')
1647
1648        qos_spec = dict(qos_specs=specs)
1649        conn_info['data'].update(qos_spec)
1650
1651        # Add access_mode to connection info
1652        volume_metadata = volume.admin_metadata
1653        access_mode = volume_metadata.get('attached_mode')
1654        if access_mode is None:
1655            # NOTE(zhiyan): client didn't call 'os-attach' before
1656            access_mode = ('ro'
1657                           if volume_metadata.get('readonly') == 'True'
1658                           else 'rw')
1659        conn_info['data']['access_mode'] = access_mode
1660
1661        # Add encrypted flag to connection_info if not set in the driver.
1662        if conn_info['data'].get('encrypted') is None:
1663            encrypted = bool(volume.encryption_key_id)
1664            conn_info['data']['encrypted'] = encrypted
1665
1666        # Add discard flag to connection_info if not set in the driver and
1667        # configured to be reported.
1668        if conn_info['data'].get('discard') is None:
1669            discard_supported = (self.driver.configuration
1670                                 .safe_get('report_discard_supported'))
1671            if discard_supported:
1672                conn_info['data']['discard'] = True
1673
1674        return conn_info
1675
1676    def initialize_connection(self, context, volume, connector):
1677        """Prepare volume for connection from host represented by connector.
1678
1679        This method calls the driver initialize_connection and returns
1680        it to the caller.  The connector parameter is a dictionary with
1681        information about the host that will connect to the volume in the
1682        following format::
1683
1684          .. code:: json
1685
1686            {
1687                'ip': ip,
1688                'initiator': initiator,
1689            }
1690
1691        ip: the ip address of the connecting machine
1692
1693        initiator: the iscsi initiator name of the connecting machine.
1694        This can be None if the connecting machine does not support iscsi
1695        connections.
1696
1697        driver is responsible for doing any necessary security setup and
1698        returning a connection_info dictionary in the following format::
1699
1700          .. code:: json
1701
1702            {
1703                'driver_volume_type': driver_volume_type,
1704                'data': data,
1705            }
1706
1707        driver_volume_type: a string to identify the type of volume.  This
1708                           can be used by the calling code to determine the
1709                           strategy for connecting to the volume. This could
1710                           be 'iscsi', 'rbd', 'sheepdog', etc.
1711
1712        data: this is the data that the calling code will use to connect
1713              to the volume. Keep in mind that this will be serialized to
1714              json in various places, so it should not contain any non-json
1715              data types.
1716        """
1717        # NOTE(flaper87): Verify the driver is enabled
1718        # before going forward. The exception will be caught
1719        # and the volume status updated.
1720
1721        # TODO(jdg): Add deprecation warning
1722        utils.require_driver_initialized(self.driver)
1723        try:
1724            self.driver.validate_connector(connector)
1725        except exception.InvalidConnectorException as err:
1726            raise exception.InvalidInput(reason=six.text_type(err))
1727        except Exception as err:
1728            err_msg = (_("Validate volume connection failed "
1729                         "(error: %(err)s).") % {'err': six.text_type(err)})
1730            LOG.exception(err_msg, resource=volume)
1731            raise exception.VolumeBackendAPIException(data=err_msg)
1732
1733        try:
1734            model_update = self.driver.create_export(context.elevated(),
1735                                                     volume, connector)
1736        except exception.CinderException as ex:
1737            msg = _("Create export of volume failed (%s)") % ex.msg
1738            LOG.exception(msg, resource=volume)
1739            raise exception.VolumeBackendAPIException(data=msg)
1740
1741        try:
1742            if model_update:
1743                volume.update(model_update)
1744                volume.save()
1745        except Exception as ex:
1746            LOG.exception("Model update failed.", resource=volume)
1747            try:
1748                self.driver.remove_export(context.elevated(), volume)
1749            except Exception:
1750                LOG.exception('Could not remove export after DB model failed.')
1751            raise exception.ExportFailure(reason=six.text_type(ex))
1752
1753        try:
1754            conn_info = self.driver.initialize_connection(volume, connector)
1755        except Exception as err:
1756            err_msg = (_("Driver initialize connection failed "
1757                         "(error: %(err)s).") % {'err': six.text_type(err)})
1758            LOG.exception(err_msg, resource=volume)
1759
1760            self.driver.remove_export(context.elevated(), volume)
1761
1762            raise exception.VolumeBackendAPIException(data=err_msg)
1763
1764        conn_info = self._parse_connection_options(context, volume, conn_info)
1765        LOG.info("Initialize volume connection completed successfully.",
1766                 resource=volume)
1767        return conn_info
1768
1769    def initialize_connection_snapshot(self, ctxt, snapshot_id, connector):
1770        utils.require_driver_initialized(self.driver)
1771        snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
1772        try:
1773            self.driver.validate_connector(connector)
1774        except exception.InvalidConnectorException as err:
1775            raise exception.InvalidInput(reason=six.text_type(err))
1776        except Exception as err:
1777            err_msg = (_("Validate snapshot connection failed "
1778                         "(error: %(err)s).") % {'err': six.text_type(err)})
1779            LOG.exception(err_msg, resource=snapshot)
1780            raise exception.VolumeBackendAPIException(data=err_msg)
1781
1782        model_update = None
1783        try:
1784            LOG.debug("Snapshot %s: creating export.", snapshot.id)
1785            model_update = self.driver.create_export_snapshot(
1786                ctxt.elevated(), snapshot, connector)
1787            if model_update:
1788                snapshot.provider_location = model_update.get(
1789                    'provider_location', None)
1790                snapshot.provider_auth = model_update.get(
1791                    'provider_auth', None)
1792                snapshot.save()
1793        except exception.CinderException as ex:
1794            msg = _("Create export of snapshot failed (%s)") % ex.msg
1795            LOG.exception(msg, resource=snapshot)
1796            raise exception.VolumeBackendAPIException(data=msg)
1797
1798        try:
1799            if model_update:
1800                snapshot.update(model_update)
1801                snapshot.save()
1802        except exception.CinderException as ex:
1803            LOG.exception("Model update failed.", resource=snapshot)
1804            raise exception.ExportFailure(reason=six.text_type(ex))
1805
1806        try:
1807            conn = self.driver.initialize_connection_snapshot(snapshot,
1808                                                              connector)
1809        except Exception as err:
1810            try:
1811                err_msg = (_('Unable to fetch connection information from '
1812                             'backend: %(err)s') %
1813                           {'err': six.text_type(err)})
1814                LOG.error(err_msg)
1815                LOG.debug("Cleaning up failed connect initialization.")
1816                self.driver.remove_export_snapshot(ctxt.elevated(), snapshot)
1817            except Exception as ex:
1818                ex_msg = (_('Error encountered during cleanup '
1819                            'of a failed attach: %(ex)s') %
1820                          {'ex': six.text_type(ex)})
1821                LOG.error(ex_msg)
1822                raise exception.VolumeBackendAPIException(data=ex_msg)
1823            raise exception.VolumeBackendAPIException(data=err_msg)
1824
1825        LOG.info("Initialize snapshot connection completed successfully.",
1826                 resource=snapshot)
1827        return conn
1828
1829    def terminate_connection(self, context, volume_id, connector, force=False):
1830        """Cleanup connection from host represented by connector.
1831
1832        The format of connector is the same as for initialize_connection.
1833        """
1834        # NOTE(flaper87): Verify the driver is enabled
1835        # before going forward. The exception will be caught
1836        # and the volume status updated.
1837        utils.require_driver_initialized(self.driver)
1838
1839        volume_ref = self.db.volume_get(context, volume_id)
1840        try:
1841            self.driver.terminate_connection(volume_ref, connector,
1842                                             force=force)
1843        except Exception as err:
1844            err_msg = (_('Terminate volume connection failed: %(err)s')
1845                       % {'err': six.text_type(err)})
1846            LOG.exception(err_msg, resource=volume_ref)
1847            raise exception.VolumeBackendAPIException(data=err_msg)
1848        LOG.info("Terminate volume connection completed successfully.",
1849                 resource=volume_ref)
1850
1851    def terminate_connection_snapshot(self, ctxt, snapshot_id,
1852                                      connector, force=False):
1853        utils.require_driver_initialized(self.driver)
1854
1855        snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
1856        try:
1857            self.driver.terminate_connection_snapshot(snapshot, connector,
1858                                                      force=force)
1859        except Exception as err:
1860            err_msg = (_('Terminate snapshot connection failed: %(err)s')
1861                       % {'err': six.text_type(err)})
1862            LOG.exception(err_msg, resource=snapshot)
1863            raise exception.VolumeBackendAPIException(data=err_msg)
1864        LOG.info("Terminate snapshot connection completed successfully.",
1865                 resource=snapshot)
1866
1867    def remove_export(self, context, volume_id):
1868        """Removes an export for a volume."""
1869        utils.require_driver_initialized(self.driver)
1870        volume_ref = self.db.volume_get(context, volume_id)
1871        try:
1872            self.driver.remove_export(context, volume_ref)
1873        except Exception:
1874            msg = _("Remove volume export failed.")
1875            LOG.exception(msg, resource=volume_ref)
1876            raise exception.VolumeBackendAPIException(data=msg)
1877
1878        LOG.info("Remove volume export completed successfully.",
1879                 resource=volume_ref)
1880
1881    def remove_export_snapshot(self, ctxt, snapshot_id):
1882        """Removes an export for a snapshot."""
1883        utils.require_driver_initialized(self.driver)
1884        snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
1885        try:
1886            self.driver.remove_export_snapshot(ctxt, snapshot)
1887        except Exception:
1888            msg = _("Remove snapshot export failed.")
1889            LOG.exception(msg, resource=snapshot)
1890            raise exception.VolumeBackendAPIException(data=msg)
1891
1892        LOG.info("Remove snapshot export completed successfully.",
1893                 resource=snapshot)
1894
1895    def accept_transfer(self, context, volume_id, new_user, new_project):
1896        # NOTE(flaper87): Verify the driver is enabled
1897        # before going forward. The exception will be caught
1898        # and the volume status updated.
1899        utils.require_driver_initialized(self.driver)
1900
1901        # NOTE(jdg): need elevated context as we haven't "given" the vol
1902        # yet
1903        volume_ref = self.db.volume_get(context.elevated(), volume_id)
1904
1905        # NOTE(jdg): Some drivers tie provider info (CHAP) to tenant
1906        # for those that do allow them to return updated model info
1907        model_update = self.driver.accept_transfer(context,
1908                                                   volume_ref,
1909                                                   new_user,
1910                                                   new_project)
1911
1912        if model_update:
1913            try:
1914                self.db.volume_update(context.elevated(),
1915                                      volume_id,
1916                                      model_update)
1917            except exception.CinderException:
1918                with excutils.save_and_reraise_exception():
1919                    LOG.exception("Update volume model for "
1920                                  "transfer operation failed.",
1921                                  resource=volume_ref)
1922                    self.db.volume_update(context.elevated(),
1923                                          volume_id,
1924                                          {'status': 'error'})
1925
1926        LOG.info("Transfer volume completed successfully.",
1927                 resource=volume_ref)
1928        return model_update
1929
1930    def _connect_device(self, conn):
1931        use_multipath = self.configuration.use_multipath_for_image_xfer
1932        device_scan_attempts = self.configuration.num_volume_device_scan_tries
1933        protocol = conn['driver_volume_type']
1934        connector = utils.brick_get_connector(
1935            protocol,
1936            use_multipath=use_multipath,
1937            device_scan_attempts=device_scan_attempts,
1938            conn=conn)
1939        vol_handle = connector.connect_volume(conn['data'])
1940
1941        root_access = True
1942
1943        if not connector.check_valid_device(vol_handle['path'], root_access):
1944            if isinstance(vol_handle['path'], six.string_types):
1945                raise exception.DeviceUnavailable(
1946                    path=vol_handle['path'],
1947                    reason=(_("Unable to access the backend storage via the "
1948                              "path %(path)s.") %
1949                            {'path': vol_handle['path']}))
1950            else:
1951                raise exception.DeviceUnavailable(
1952                    path=None,
1953                    reason=(_("Unable to access the backend storage via file "
1954                              "handle.")))
1955
1956        return {'conn': conn, 'device': vol_handle, 'connector': connector}
1957
1958    def _attach_volume(self, ctxt, volume, properties, remote=False,
1959                       attach_encryptor=False):
1960        status = volume['status']
1961
1962        if remote:
1963            rpcapi = volume_rpcapi.VolumeAPI()
1964            try:
1965                conn = rpcapi.initialize_connection(ctxt, volume, properties)
1966            except Exception:
1967                with excutils.save_and_reraise_exception():
1968                    LOG.error("Failed to attach volume %(vol)s.",
1969                              {'vol': volume['id']})
1970                    self.db.volume_update(ctxt, volume['id'],
1971                                          {'status': status})
1972        else:
1973            conn = self.initialize_connection(ctxt, volume, properties)
1974
1975        attach_info = self._connect_device(conn)
1976        try:
1977            if attach_encryptor and (
1978                    volume_types.is_encrypted(ctxt,
1979                                              volume.volume_type_id)):
1980                encryption = self.db.volume_encryption_metadata_get(
1981                    ctxt.elevated(), volume.id)
1982                if encryption:
1983                    utils.brick_attach_volume_encryptor(ctxt,
1984                                                        attach_info,
1985                                                        encryption)
1986        except Exception:
1987            with excutils.save_and_reraise_exception():
1988                LOG.error("Failed to attach volume encryptor"
1989                          " %(vol)s.", {'vol': volume['id']})
1990                self._detach_volume(ctxt, attach_info, volume, properties,
1991                                    force=True)
1992        return attach_info
1993
1994    def _detach_volume(self, ctxt, attach_info, volume, properties,
1995                       force=False, remote=False,
1996                       attach_encryptor=False):
1997        connector = attach_info['connector']
1998        if attach_encryptor and (
1999                volume_types.is_encrypted(ctxt,
2000                                          volume.volume_type_id)):
2001            encryption = self.db.volume_encryption_metadata_get(
2002                ctxt.elevated(), volume.id)
2003            if encryption:
2004                utils.brick_detach_volume_encryptor(attach_info, encryption)
2005        connector.disconnect_volume(attach_info['conn']['data'],
2006                                    attach_info['device'], force=force)
2007
2008        if remote:
2009            rpcapi = volume_rpcapi.VolumeAPI()
2010            rpcapi.terminate_connection(ctxt, volume, properties, force=force)
2011            rpcapi.remove_export(ctxt, volume)
2012        else:
2013            try:
2014                self.terminate_connection(ctxt, volume['id'], properties,
2015                                          force=force)
2016                self.remove_export(ctxt, volume['id'])
2017            except Exception as err:
2018                with excutils.save_and_reraise_exception():
2019                    LOG.error('Unable to terminate volume connection: '
2020                              '%(err)s.', {'err': err})
2021
2022    def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None):
2023        """Copy data from src_vol to dest_vol."""
2024
2025        LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.',
2026                  {'src': src_vol['name'], 'dest': dest_vol['name']})
2027        attach_encryptor = False
2028        # If the encryption method or key is changed, we have to
2029        # copy data through dm-crypt.
2030        if volume_types.volume_types_encryption_changed(
2031                ctxt,
2032                src_vol.volume_type_id,
2033                dest_vol.volume_type_id):
2034            attach_encryptor = True
2035        use_multipath = self.configuration.use_multipath_for_image_xfer
2036        enforce_multipath = self.configuration.enforce_multipath_for_image_xfer
2037        properties = utils.brick_get_connector_properties(use_multipath,
2038                                                          enforce_multipath)
2039
2040        dest_remote = remote in ['dest', 'both']
2041        dest_attach_info = self._attach_volume(
2042            ctxt, dest_vol, properties,
2043            remote=dest_remote,
2044            attach_encryptor=attach_encryptor)
2045
2046        try:
2047            src_remote = remote in ['src', 'both']
2048            src_attach_info = self._attach_volume(
2049                ctxt, src_vol, properties,
2050                remote=src_remote,
2051                attach_encryptor=attach_encryptor)
2052        except Exception:
2053            with excutils.save_and_reraise_exception():
2054                LOG.error("Failed to attach source volume for copy.")
2055                self._detach_volume(ctxt, dest_attach_info, dest_vol,
2056                                    properties, remote=dest_remote,
2057                                    attach_encryptor=attach_encryptor,
2058                                    force=True)
2059
2060        # Check the backend capabilities of migration destination host.
2061        rpcapi = volume_rpcapi.VolumeAPI()
2062        capabilities = rpcapi.get_capabilities(ctxt,
2063                                               dest_vol.service_topic_queue,
2064                                               False)
2065        sparse_copy_volume = bool(capabilities and
2066                                  capabilities.get('sparse_copy_volume',
2067                                                   False))
2068
2069        try:
2070            size_in_mb = int(src_vol['size']) * units.Ki    # vol size is in GB
2071            vol_utils.copy_volume(src_attach_info['device']['path'],
2072                                  dest_attach_info['device']['path'],
2073                                  size_in_mb,
2074                                  self.configuration.volume_dd_blocksize,
2075                                  sparse=sparse_copy_volume)
2076        except Exception:
2077            with excutils.save_and_reraise_exception():
2078                LOG.error("Failed to copy volume %(src)s to %(dest)s.",
2079                          {'src': src_vol['id'], 'dest': dest_vol['id']})
2080        finally:
2081            try:
2082                self._detach_volume(ctxt, dest_attach_info, dest_vol,
2083                                    properties, force=True,
2084                                    remote=dest_remote,
2085                                    attach_encryptor=attach_encryptor)
2086            finally:
2087                self._detach_volume(ctxt, src_attach_info, src_vol,
2088                                    properties, force=True,
2089                                    remote=src_remote,
2090                                    attach_encryptor=attach_encryptor)
2091
2092    def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id):
2093        rpcapi = volume_rpcapi.VolumeAPI()
2094
2095        # Create new volume on remote host
2096        tmp_skip = {'snapshot_id', 'source_volid'}
2097        skip = {'host', 'cluster_name', 'availability_zone'}
2098        skip.update(tmp_skip)
2099        skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES)
2100
2101        new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
2102        if new_type_id:
2103            new_vol_values['volume_type_id'] = new_type_id
2104            if volume_types.volume_types_encryption_changed(
2105                    ctxt, volume.volume_type_id, new_type_id):
2106                encryption_key_id = vol_utils.create_encryption_key(
2107                    ctxt, self.key_manager, new_type_id)
2108                new_vol_values['encryption_key_id'] = encryption_key_id
2109
2110        dst_service = self._get_service(backend['host'])
2111        new_volume = objects.Volume(
2112            context=ctxt,
2113            host=backend['host'],
2114            availability_zone=dst_service.availability_zone,
2115            cluster_name=backend.get('cluster_name'),
2116            status='creating',
2117            attach_status=fields.VolumeAttachStatus.DETACHED,
2118            migration_status='target:%s' % volume['id'],
2119            **new_vol_values
2120        )
2121        new_volume.create()
2122        rpcapi.create_volume(ctxt, new_volume, None, None,
2123                             allow_reschedule=False)
2124
2125        # Wait for new_volume to become ready
2126        starttime = time.time()
2127        deadline = starttime + CONF.migration_create_volume_timeout_secs
2128
2129        new_volume.refresh()
2130        tries = 0
2131        while new_volume.status != 'available':
2132            tries += 1
2133            now = time.time()
2134            if new_volume.status == 'error':
2135                msg = _("failed to create new_volume on destination")
2136                self._clean_temporary_volume(ctxt, volume,
2137                                             new_volume,
2138                                             clean_db_only=True)
2139                raise exception.VolumeMigrationFailed(reason=msg)
2140            elif now > deadline:
2141                msg = _("timeout creating new_volume on destination")
2142                self._clean_temporary_volume(ctxt, volume,
2143                                             new_volume,
2144                                             clean_db_only=True)
2145                raise exception.VolumeMigrationFailed(reason=msg)
2146            else:
2147                time.sleep(tries ** 2)
2148            new_volume.refresh()
2149
2150        # Set skipped value to avoid calling
2151        # function except for _create_raw_volume
2152        tmp_skipped_values = {k: volume[k] for k in tmp_skip if volume.get(k)}
2153        if tmp_skipped_values:
2154            new_volume.update(tmp_skipped_values)
2155            new_volume.save()
2156
2157        # Copy the source volume to the destination volume
2158        try:
2159            attachments = volume.volume_attachment
2160            if not attachments:
2161                # Pre- and post-copy driver-specific actions
2162                self.driver.before_volume_copy(ctxt, volume, new_volume,
2163                                               remote='dest')
2164                self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
2165                self.driver.after_volume_copy(ctxt, volume, new_volume,
2166                                              remote='dest')
2167
2168                # The above call is synchronous so we complete the migration
2169                self.migrate_volume_completion(ctxt, volume, new_volume,
2170                                               error=False)
2171            else:
2172                nova_api = compute.API()
2173                # This is an async call to Nova, which will call the completion
2174                # when it's done
2175                for attachment in attachments:
2176                    instance_uuid = attachment['instance_uuid']
2177                    nova_api.update_server_volume(ctxt, instance_uuid,
2178                                                  volume.id,
2179                                                  new_volume.id)
2180        except Exception:
2181            with excutils.save_and_reraise_exception():
2182                LOG.exception(
2183                    "Failed to copy volume %(vol1)s to %(vol2)s", {
2184                        'vol1': volume.id, 'vol2': new_volume.id})
2185                self._clean_temporary_volume(ctxt, volume,
2186                                             new_volume)
2187
2188    def _clean_temporary_volume(self, ctxt, volume, new_volume,
2189                                clean_db_only=False):
2190        # If we're in the migrating phase, we need to cleanup
2191        # destination volume because source volume is remaining
2192        if volume.migration_status == 'migrating':
2193            try:
2194                if clean_db_only:
2195                    # The temporary volume is not created, only DB data
2196                    # is created
2197                    new_volume.destroy()
2198                else:
2199                    # The temporary volume is already created
2200                    rpcapi = volume_rpcapi.VolumeAPI()
2201                    rpcapi.delete_volume(ctxt, new_volume)
2202            except exception.VolumeNotFound:
2203                LOG.info("Couldn't find the temporary volume "
2204                         "%(vol)s in the database. There is no need "
2205                         "to clean up this volume.",
2206                         {'vol': new_volume.id})
2207        else:
2208            # If we're in the completing phase don't delete the
2209            # destination because we may have already deleted the
2210            # source! But the migration_status in database should
2211            # be cleared to handle volume after migration failure
2212            try:
2213                new_volume.migration_status = None
2214                new_volume.save()
2215            except exception.VolumeNotFound:
2216                LOG.info("Couldn't find destination volume "
2217                         "%(vol)s in the database. The entry might be "
2218                         "successfully deleted during migration "
2219                         "completion phase.",
2220                         {'vol': new_volume.id})
2221
2222                LOG.warning("Failed to migrate volume. The destination "
2223                            "volume %(vol)s is not deleted since the "
2224                            "source volume may have been deleted.",
2225                            {'vol': new_volume.id})
2226
2227    def migrate_volume_completion(self, ctxt, volume, new_volume, error=False):
2228        try:
2229            # NOTE(flaper87): Verify the driver is enabled
2230            # before going forward. The exception will be caught
2231            # and the migration status updated.
2232            utils.require_driver_initialized(self.driver)
2233        except exception.DriverNotInitialized:
2234            with excutils.save_and_reraise_exception():
2235                volume.migration_status = 'error'
2236                volume.save()
2237
2238        # NOTE(jdg):  Things get a little hairy in here and we do a lot of
2239        # things based on volume previous-status and current-status.  At some
2240        # point this should all be reworked but for now we need to maintain
2241        # backward compatibility and NOT change the API so we're going to try
2242        # and make this work best we can
2243
2244        LOG.debug("migrate_volume_completion: completing migration for "
2245                  "volume %(vol1)s (temporary volume %(vol2)s",
2246                  {'vol1': volume.id, 'vol2': new_volume.id})
2247        rpcapi = volume_rpcapi.VolumeAPI()
2248
2249        orig_volume_status = volume.previous_status
2250
2251        if error:
2252            LOG.info("migrate_volume_completion is cleaning up an error "
2253                     "for volume %(vol1)s (temporary volume %(vol2)s",
2254                     {'vol1': volume['id'], 'vol2': new_volume.id})
2255            rpcapi.delete_volume(ctxt, new_volume)
2256            updates = {'migration_status': 'error',
2257                       'status': orig_volume_status}
2258            volume.update(updates)
2259            volume.save()
2260            return volume.id
2261
2262        volume.migration_status = 'completing'
2263        volume.save()
2264
2265        volume_attachments = []
2266
2267        # NOTE(jdg): With new attach flow, we deleted the attachment, so the
2268        # original volume should now be listed as available, we still need to
2269        # do the magic swappy thing of name.id etc but we're done with the
2270        # original attachment record
2271
2272        # In the "old flow" at this point the orig_volume_status will be in-use
2273        # and the current status will be retyping.  This is sort of a
2274        # misleading deal, because Nova has already called terminate
2275        # connection
2276
2277        # New Attach Flow, Nova has gone ahead and deleted the attachemnt, this
2278        # is the source/original volume, we've already migrated the data, we're
2279        # basically done with it at this point.  We don't need to issue the
2280        # detach to toggle the status
2281        if orig_volume_status == 'in-use' and volume.status != 'available':
2282            for attachment in volume.volume_attachment:
2283                # Save the attachments the volume currently have
2284                volume_attachments.append(attachment)
2285                try:
2286                    self.detach_volume(ctxt, volume.id, attachment.id)
2287                except Exception as ex:
2288                    LOG.error("Detach migration source volume "
2289                              "%(volume.id)s from attachment "
2290                              "%(attachment.id)s failed: %(err)s",
2291                              {'err': ex,
2292                               'volume.id': volume.id,
2293                               'attachment.id': attachment.id},
2294                              resource=volume)
2295
2296        # Give driver (new_volume) a chance to update things as needed
2297        # after a successful migration.
2298        # Note this needs to go through rpc to the host of the new volume
2299        # the current host and driver object is for the "existing" volume.
2300        rpcapi.update_migrated_volume(ctxt, volume, new_volume,
2301                                      orig_volume_status)
2302        volume.refresh()
2303        new_volume.refresh()
2304
2305        # Swap src and dest DB records so we can continue using the src id and
2306        # asynchronously delete the destination id
2307        updated_new = volume.finish_volume_migration(new_volume)
2308        updates = {'status': orig_volume_status,
2309                   'previous_status': volume.status,
2310                   'migration_status': 'success'}
2311
2312        # NOTE(jdg):  With new attachment API's nova will delete the
2313        # attachment for the source volume for us before calling the
2314        # migration-completion, now we just need to do the swapping on the
2315        # volume record, but don't jack with the attachments other than
2316        # updating volume_id
2317
2318        # In the old flow at this point the volumes are in attaching and
2319        # deleting status (dest/new is deleting, but we've done our magic
2320        # swappy thing so it's a bit confusing, but it does unwind properly
2321        # when you step through it)
2322
2323        # In the new flow we simlified this and we don't need it, instead of
2324        # doing a bunch of swapping we just do attachment-create/delete on the
2325        # nova side, and then here we just do the ID swaps that are necessary
2326        # to maintain the old beahvior
2327
2328        # Restore the attachments for old flow use-case
2329        if orig_volume_status == 'in-use' and volume.status in ['available',
2330                                                                'reserved',
2331                                                                'attaching']:
2332            for attachment in volume_attachments:
2333                LOG.debug('Re-attaching: %s', attachment)
2334                # This is just a db state toggle, the volume is actually
2335                # already attach and in-use, new attachment flow won't allow
2336                # this
2337                rpcapi.attach_volume(ctxt, volume,
2338                                     attachment.instance_uuid,
2339                                     attachment.attached_host,
2340                                     attachment.mountpoint,
2341                                     attachment.attach_mode or 'rw')
2342                # At this point we now have done almost all of our swapping and
2343                # state-changes.  The target volume is now marked back to
2344                # "in-use" the destination/worker volume is now in deleting
2345                # state and the next steps will finish the deletion steps
2346        volume.update(updates)
2347        volume.save()
2348
2349        # Asynchronous deletion of the source volume in the back-end (now
2350        # pointed by the target volume id)
2351        try:
2352            rpcapi.delete_volume(ctxt, updated_new)
2353        except Exception as ex:
2354            LOG.error('Failed to request async delete of migration source '
2355                      'vol %(vol)s: %(err)s',
2356                      {'vol': volume.id, 'err': ex})
2357
2358        # For the new flow this is really the key part.  We just use the
2359        # attachments to the worker/destination volumes that we created and
2360        # used for the libvirt migration and we'll just swap their volume_id
2361        # entries to coorespond with the volume.id swap we did
2362        for attachment in VA_LIST.get_all_by_volume_id(ctxt, updated_new.id):
2363            attachment.volume_id = volume.id
2364            attachment.save()
2365
2366        # Phewww.. that was easy!  Once we get to a point where the old attach
2367        # flow can go away we really should rewrite all of this.
2368        LOG.info("Complete-Migrate volume completed successfully.",
2369                 resource=volume)
2370        return volume.id
2371
2372    def migrate_volume(self, ctxt, volume, host, force_host_copy=False,
2373                       new_type_id=None):
2374        """Migrate the volume to the specified host (called on source host)."""
2375        try:
2376            # NOTE(flaper87): Verify the driver is enabled
2377            # before going forward. The exception will be caught
2378            # and the migration status updated.
2379            utils.require_driver_initialized(self.driver)
2380        except exception.DriverNotInitialized:
2381            with excutils.save_and_reraise_exception():
2382                volume.migration_status = 'error'
2383                volume.save()
2384
2385        model_update = None
2386        moved = False
2387
2388        status_update = None
2389        if volume.status in ('retyping', 'maintenance'):
2390            status_update = {'status': volume.previous_status}
2391
2392        volume.migration_status = 'migrating'
2393        volume.save()
2394        if not force_host_copy and new_type_id is None:
2395            try:
2396                LOG.debug("Issue driver.migrate_volume.", resource=volume)
2397                moved, model_update = self.driver.migrate_volume(ctxt,
2398                                                                 volume,
2399                                                                 host)
2400                if moved:
2401                    dst_service = self._get_service(host['host'])
2402                    updates = {
2403                        'host': host['host'],
2404                        'cluster_name': host.get('cluster_name'),
2405                        'migration_status': 'success',
2406                        'availability_zone': dst_service.availability_zone,
2407                        'previous_status': volume.status,
2408                    }
2409                    if status_update:
2410                        updates.update(status_update)
2411                    if model_update:
2412                        updates.update(model_update)
2413                    volume.update(updates)
2414                    volume.save()
2415            except Exception:
2416                with excutils.save_and_reraise_exception():
2417                    updates = {'migration_status': 'error'}
2418                    if status_update:
2419                        updates.update(status_update)
2420                    volume.update(updates)
2421                    volume.save()
2422        if not moved:
2423            try:
2424                self._migrate_volume_generic(ctxt, volume, host, new_type_id)
2425            except Exception:
2426                with excutils.save_and_reraise_exception():
2427                    updates = {'migration_status': 'error'}
2428                    if status_update:
2429                        updates.update(status_update)
2430                    volume.update(updates)
2431                    volume.save()
2432        LOG.info("Migrate volume completed successfully.",
2433                 resource=volume)
2434
2435    def _report_driver_status(self, context):
2436        # It's possible during live db migration that the self.service_uuid
2437        # value isn't set (we didn't restart services), so we'll go ahead
2438        # and make this a part of the service periodic
2439        if not self.service_uuid:
2440            # We hack this with a try/except for unit tests temporarily
2441            try:
2442                service = self._get_service()
2443                self.service_uuid = service.uuid
2444            except exception.ServiceNotFound:
2445                LOG.warning("Attempt to update service_uuid "
2446                            "resulted in a Service NotFound "
2447                            "exception, service_uuid field on "
2448                            "volumes will be NULL.")
2449
2450        if not self.driver.initialized:
2451            if self.driver.configuration.config_group is None:
2452                config_group = ''
2453            else:
2454                config_group = ('(config name %s)' %
2455                                self.driver.configuration.config_group)
2456
2457            LOG.warning("Update driver status failed: %(config_group)s "
2458                        "is uninitialized.",
2459                        {'config_group': config_group},
2460                        resource={'type': 'driver',
2461                                  'id': self.driver.__class__.__name__})
2462        else:
2463            volume_stats = self.driver.get_volume_stats(refresh=True)
2464            if self.extra_capabilities:
2465                volume_stats.update(self.extra_capabilities)
2466            if volume_stats:
2467
2468                # NOTE(xyang): If driver reports replication_status to be
2469                # 'error' in volume_stats, get model updates from driver
2470                # and update db
2471                if volume_stats.get('replication_status') == (
2472                        fields.ReplicationStatus.ERROR):
2473                    filters = self._get_cluster_or_host_filters()
2474                    groups = objects.GroupList.get_all_replicated(
2475                        context, filters=filters)
2476                    group_model_updates, volume_model_updates = (
2477                        self.driver.get_replication_error_status(context,
2478                                                                 groups))
2479                    for grp_update in group_model_updates:
2480                        try:
2481                            grp_obj = objects.Group.get_by_id(
2482                                context, grp_update['group_id'])
2483                            grp_obj.update(grp_update)
2484                            grp_obj.save()
2485                        except exception.GroupNotFound:
2486                            # Group may be deleted already. Log a warning
2487                            # and continue.
2488                            LOG.warning("Group %(grp)s not found while "
2489                                        "updating driver status.",
2490                                        {'grp': grp_update['group_id']},
2491                                        resource={
2492                                            'type': 'group',
2493                                            'id': grp_update['group_id']})
2494                    for vol_update in volume_model_updates:
2495                        try:
2496                            vol_obj = objects.Volume.get_by_id(
2497                                context, vol_update['volume_id'])
2498                            vol_obj.update(vol_update)
2499                            vol_obj.save()
2500                        except exception.VolumeNotFound:
2501                            # Volume may be deleted already. Log a warning
2502                            # and continue.
2503                            LOG.warning("Volume %(vol)s not found while "
2504                                        "updating driver status.",
2505                                        {'vol': vol_update['volume_id']},
2506                                        resource={
2507                                            'type': 'volume',
2508                                            'id': vol_update['volume_id']})
2509
2510                # Append volume stats with 'allocated_capacity_gb'
2511                self._append_volume_stats(volume_stats)
2512
2513                # Append filter and goodness function if needed
2514                volume_stats = (
2515                    self._append_filter_goodness_functions(volume_stats))
2516
2517                # queue it to be sent to the Schedulers.
2518                self.update_service_capabilities(volume_stats)
2519
2520    def _append_volume_stats(self, vol_stats):
2521        pools = vol_stats.get('pools', None)
2522        if pools:
2523            if isinstance(pools, list):
2524                for pool in pools:
2525                    pool_name = pool['pool_name']
2526                    try:
2527                        pool_stats = self.stats['pools'][pool_name]
2528                    except KeyError:
2529                        # Pool not found in volume manager
2530                        pool_stats = dict(allocated_capacity_gb=0)
2531
2532                    pool.update(pool_stats)
2533            else:
2534                raise exception.ProgrammingError(
2535                    reason='Pools stats reported by the driver are not '
2536                           'reported in a list')
2537        # For drivers that are not reporting their stats by pool we will use
2538        # the data from the special fixed pool created by
2539        # _count_allocated_capacity.
2540        elif self.stats.get('pools'):
2541            vol_stats.update(next(iter(self.stats['pools'].values())))
2542        # This is a special subcase of the above no pool case that happens when
2543        # we don't have any volumes yet.
2544        else:
2545            vol_stats.update(self.stats)
2546            vol_stats.pop('pools', None)
2547
2548    def _append_filter_goodness_functions(self, volume_stats):
2549        """Returns volume_stats updated as needed."""
2550
2551        # Append filter_function if needed
2552        if 'filter_function' not in volume_stats:
2553            volume_stats['filter_function'] = (
2554                self.driver.get_filter_function())
2555
2556        # Append goodness_function if needed
2557        if 'goodness_function' not in volume_stats:
2558            volume_stats['goodness_function'] = (
2559                self.driver.get_goodness_function())
2560
2561        return volume_stats
2562
2563    @periodic_task.periodic_task
2564    def publish_service_capabilities(self, context):
2565        """Collect driver status and then publish."""
2566        self._report_driver_status(context)
2567        self._publish_service_capabilities(context)
2568
2569    def _notify_about_volume_usage(self,
2570                                   context,
2571                                   volume,
2572                                   event_suffix,
2573                                   extra_usage_info=None):
2574        vol_utils.notify_about_volume_usage(
2575            context, volume, event_suffix,
2576            extra_usage_info=extra_usage_info, host=self.host)
2577
2578    def _notify_about_snapshot_usage(self,
2579                                     context,
2580                                     snapshot,
2581                                     event_suffix,
2582                                     extra_usage_info=None):
2583        vol_utils.notify_about_snapshot_usage(
2584            context, snapshot, event_suffix,
2585            extra_usage_info=extra_usage_info, host=self.host)
2586
2587    def _notify_about_group_usage(self,
2588                                  context,
2589                                  group,
2590                                  event_suffix,
2591                                  volumes=None,
2592                                  extra_usage_info=None):
2593        vol_utils.notify_about_group_usage(
2594            context, group, event_suffix,
2595            extra_usage_info=extra_usage_info, host=self.host)
2596
2597        if not volumes:
2598            volumes = self.db.volume_get_all_by_generic_group(
2599                context, group.id)
2600        if volumes:
2601            for volume in volumes:
2602                vol_utils.notify_about_volume_usage(
2603                    context, volume, event_suffix,
2604                    extra_usage_info=extra_usage_info, host=self.host)
2605
2606    def _notify_about_group_snapshot_usage(self,
2607                                           context,
2608                                           group_snapshot,
2609                                           event_suffix,
2610                                           snapshots=None,
2611                                           extra_usage_info=None):
2612        vol_utils.notify_about_group_snapshot_usage(
2613            context, group_snapshot, event_suffix,
2614            extra_usage_info=extra_usage_info, host=self.host)
2615
2616        if not snapshots:
2617            snapshots = objects.SnapshotList.get_all_for_group_snapshot(
2618                context, group_snapshot.id)
2619        if snapshots:
2620            for snapshot in snapshots:
2621                vol_utils.notify_about_snapshot_usage(
2622                    context, snapshot, event_suffix,
2623                    extra_usage_info=extra_usage_info, host=self.host)
2624
2625    def extend_volume(self, context, volume, new_size, reservations):
2626        try:
2627            # NOTE(flaper87): Verify the driver is enabled
2628            # before going forward. The exception will be caught
2629            # and the volume status updated.
2630            utils.require_driver_initialized(self.driver)
2631        except exception.DriverNotInitialized:
2632            with excutils.save_and_reraise_exception():
2633                volume.status = 'error_extending'
2634                volume.save()
2635
2636        project_id = volume.project_id
2637        size_increase = (int(new_size)) - volume.size
2638        self._notify_about_volume_usage(context, volume, "resize.start")
2639        try:
2640            self.driver.extend_volume(volume, new_size)
2641        except exception.TargetUpdateFailed:
2642            # We just want to log this but continue on with quota commit
2643            LOG.warning('Volume extended but failed to update target.')
2644        except Exception:
2645            LOG.exception("Extend volume failed.",
2646                          resource=volume)
2647            self.message_api.create(
2648                context,
2649                message_field.Action.EXTEND_VOLUME,
2650                resource_uuid=volume.id,
2651                detail=message_field.Detail.DRIVER_FAILED_EXTEND)
2652            try:
2653                self.db.volume_update(context, volume.id,
2654                                      {'status': 'error_extending'})
2655                raise exception.CinderException(_("Volume %s: Error trying "
2656                                                  "to extend volume") %
2657                                                volume.id)
2658            finally:
2659                QUOTAS.rollback(context, reservations, project_id=project_id)
2660                return
2661
2662        QUOTAS.commit(context, reservations, project_id=project_id)
2663
2664        attachments = volume.volume_attachment
2665        if not attachments:
2666            orig_volume_status = 'available'
2667        else:
2668            orig_volume_status = 'in-use'
2669
2670        volume.update({'size': int(new_size), 'status': orig_volume_status})
2671        volume.save()
2672
2673        if orig_volume_status == 'in-use':
2674            nova_api = compute.API()
2675            instance_uuids = [attachment.instance_uuid
2676                              for attachment in attachments]
2677            nova_api.extend_volume(context, instance_uuids, volume.id)
2678
2679        pool = vol_utils.extract_host(volume.host, 'pool')
2680        if pool is None:
2681            # Legacy volume, put them into default pool
2682            pool = self.driver.configuration.safe_get(
2683                'volume_backend_name') or vol_utils.extract_host(
2684                    volume.host, 'pool', True)
2685
2686        try:
2687            self.stats['pools'][pool]['allocated_capacity_gb'] += size_increase
2688        except KeyError:
2689            self.stats['pools'][pool] = dict(
2690                allocated_capacity_gb=size_increase)
2691
2692        self._notify_about_volume_usage(
2693            context, volume, "resize.end",
2694            extra_usage_info={'size': int(new_size)})
2695        LOG.info("Extend volume completed successfully.",
2696                 resource=volume)
2697
2698    def _is_our_backend(self, host, cluster_name):
2699        return ((not cluster_name and
2700                 vol_utils.hosts_are_equivalent(self.driver.host, host)) or
2701                (cluster_name and
2702                 vol_utils.hosts_are_equivalent(self.driver.cluster_name,
2703                                                cluster_name)))
2704
2705    def retype(self, context, volume, new_type_id, host,
2706               migration_policy='never', reservations=None,
2707               old_reservations=None):
2708
2709        def _retype_error(context, volume, old_reservations,
2710                          new_reservations, status_update):
2711            try:
2712                volume.update(status_update)
2713                volume.save()
2714            finally:
2715                QUOTAS.rollback(context, old_reservations)
2716                QUOTAS.rollback(context, new_reservations)
2717
2718        status_update = {'status': volume.previous_status}
2719        if context.project_id != volume.project_id:
2720            project_id = volume.project_id
2721        else:
2722            project_id = context.project_id
2723
2724        try:
2725            # NOTE(flaper87): Verify the driver is enabled
2726            # before going forward. The exception will be caught
2727            # and the volume status updated.
2728            utils.require_driver_initialized(self.driver)
2729        except exception.DriverNotInitialized:
2730            with excutils.save_and_reraise_exception():
2731                # NOTE(flaper87): Other exceptions in this method don't
2732                # set the volume status to error. Should that be done
2733                # here? Setting the volume back to it's original status
2734                # for now.
2735                volume.update(status_update)
2736                volume.save()
2737
2738        # We already got the new reservations
2739        new_reservations = reservations
2740
2741        # If volume types have the same contents, no need to do anything.
2742        # Use the admin contex to be able to access volume extra_specs
2743        retyped = False
2744        diff, all_equal = volume_types.volume_types_diff(
2745            context.elevated(), volume.volume_type_id, new_type_id)
2746        if all_equal:
2747            retyped = True
2748
2749        # Call driver to try and change the type
2750        retype_model_update = None
2751
2752        # NOTE(jdg): Check to see if the destination host or cluster (depending
2753        # if it's the volume is in a clustered backend or not) is the same as
2754        # the current.  If it's not don't call the driver.retype method,
2755        # otherwise drivers that implement retype may report success, but it's
2756        # invalid in the case of a migrate.
2757
2758        # We assume that those that support pools do this internally
2759        # so we strip off the pools designation
2760
2761        if (not retyped and
2762                not diff.get('encryption') and
2763                self._is_our_backend(host['host'], host.get('cluster_name'))):
2764            try:
2765                new_type = volume_types.get_volume_type(context.elevated(),
2766                                                        new_type_id)
2767                with volume.obj_as_admin():
2768                    ret = self.driver.retype(context,
2769                                             volume,
2770                                             new_type,
2771                                             diff,
2772                                             host)
2773                # Check if the driver retype provided a model update or
2774                # just a retype indication
2775                if type(ret) == tuple:
2776                    retyped, retype_model_update = ret
2777                else:
2778                    retyped = ret
2779
2780                if retyped:
2781                    LOG.info("Volume %s: retyped successfully.", volume.id)
2782            except Exception:
2783                retyped = False
2784                LOG.exception("Volume %s: driver error when trying to "
2785                              "retype, falling back to generic "
2786                              "mechanism.", volume.id)
2787
2788        # We could not change the type, so we need to migrate the volume, where
2789        # the destination volume will be of the new type
2790        if not retyped:
2791            if migration_policy == 'never':
2792                _retype_error(context, volume, old_reservations,
2793                              new_reservations, status_update)
2794                msg = _("Retype requires migration but is not allowed.")
2795                raise exception.VolumeMigrationFailed(reason=msg)
2796
2797            snaps = objects.SnapshotList.get_all_for_volume(context,
2798                                                            volume.id)
2799            if snaps:
2800                _retype_error(context, volume, old_reservations,
2801                              new_reservations, status_update)
2802                msg = _("Volume must not have snapshots.")
2803                LOG.error(msg)
2804                raise exception.InvalidVolume(reason=msg)
2805
2806            # Don't allow volume with replicas to be migrated
2807            rep_status = volume.replication_status
2808            if(rep_status is not None and rep_status not in
2809                    [fields.ReplicationStatus.DISABLED,
2810                     fields.ReplicationStatus.NOT_CAPABLE]):
2811                _retype_error(context, volume, old_reservations,
2812                              new_reservations, status_update)
2813                msg = _("Volume must not be replicated.")
2814                LOG.error(msg)
2815                raise exception.InvalidVolume(reason=msg)
2816
2817            volume.migration_status = 'starting'
2818            volume.save()
2819
2820            try:
2821                self.migrate_volume(context, volume, host,
2822                                    new_type_id=new_type_id)
2823            except Exception:
2824                with excutils.save_and_reraise_exception():
2825                    _retype_error(context, volume, old_reservations,
2826                                  new_reservations, status_update)
2827        else:
2828            model_update = {'volume_type_id': new_type_id,
2829                            'host': host['host'],
2830                            'cluster_name': host.get('cluster_name'),
2831                            'status': status_update['status']}
2832            if retype_model_update:
2833                model_update.update(retype_model_update)
2834            self._set_replication_status(diff, model_update)
2835            volume.update(model_update)
2836            volume.save()
2837
2838        if old_reservations:
2839            QUOTAS.commit(context, old_reservations, project_id=project_id)
2840        if new_reservations:
2841            QUOTAS.commit(context, new_reservations, project_id=project_id)
2842        self._notify_about_volume_usage(
2843            context, volume, "retype",
2844            extra_usage_info={'volume_type': new_type_id})
2845        self.publish_service_capabilities(context)
2846        LOG.info("Retype volume completed successfully.",
2847                 resource=volume)
2848
2849    @staticmethod
2850    def _set_replication_status(diff, model_update):
2851        """Update replication_status in model_update if it has changed."""
2852        if not diff or model_update.get('replication_status'):
2853            return
2854
2855        diff_specs = diff.get('extra_specs', {})
2856        replication_diff = diff_specs.get('replication_enabled')
2857
2858        if replication_diff:
2859            is_replicated = vol_utils.is_replicated_str(replication_diff[1])
2860            if is_replicated:
2861                replication_status = fields.ReplicationStatus.ENABLED
2862            else:
2863                replication_status = fields.ReplicationStatus.DISABLED
2864            model_update['replication_status'] = replication_status
2865
2866    def manage_existing(self, ctxt, volume, ref=None):
2867        vol_ref = self._run_manage_existing_flow_engine(
2868            ctxt, volume, ref)
2869
2870        self._update_stats_for_managed(vol_ref)
2871
2872        LOG.info("Manage existing volume completed successfully.",
2873                 resource=vol_ref)
2874        return vol_ref.id
2875
2876    def _update_stats_for_managed(self, volume_reference):
2877        # Update volume stats
2878        pool = vol_utils.extract_host(volume_reference.host, 'pool')
2879        if pool is None:
2880            # Legacy volume, put them into default pool
2881            pool = self.driver.configuration.safe_get(
2882                'volume_backend_name') or vol_utils.extract_host(
2883                    volume_reference.host, 'pool', True)
2884
2885        try:
2886            self.stats['pools'][pool]['allocated_capacity_gb'] \
2887                += volume_reference.size
2888        except KeyError:
2889            self.stats['pools'][pool] = dict(
2890                allocated_capacity_gb=volume_reference.size)
2891
2892    def _run_manage_existing_flow_engine(self, ctxt, volume, ref):
2893        try:
2894            flow_engine = manage_existing.get_flow(
2895                ctxt,
2896                self.db,
2897                self.driver,
2898                self.host,
2899                volume,
2900                ref,
2901            )
2902        except Exception:
2903            msg = _("Failed to create manage_existing flow.")
2904            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
2905            raise exception.CinderException(msg)
2906
2907        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
2908            flow_engine.run()
2909
2910        # Fetch created volume from storage
2911        vol_ref = flow_engine.storage.fetch('volume')
2912
2913        return vol_ref
2914
2915    def _get_cluster_or_host_filters(self):
2916        if self.cluster:
2917            filters = {'cluster_name': self.cluster}
2918        else:
2919            filters = {'host': self.host}
2920        return filters
2921
2922    def _get_my_resources(self, ctxt, ovo_class_list):
2923        filters = self._get_cluster_or_host_filters()
2924        return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters)
2925
2926    def _get_my_volumes(self, ctxt):
2927        return self._get_my_resources(ctxt, objects.VolumeList)
2928
2929    def _get_my_snapshots(self, ctxt):
2930        return self._get_my_resources(ctxt, objects.SnapshotList)
2931
2932    def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
2933                               sort_dirs, want_objects=False):
2934        try:
2935            utils.require_driver_initialized(self.driver)
2936        except exception.DriverNotInitialized:
2937            with excutils.save_and_reraise_exception():
2938                LOG.exception("Listing manageable volumes failed, due "
2939                              "to uninitialized driver.")
2940
2941        cinder_volumes = self._get_my_volumes(ctxt)
2942        try:
2943            driver_entries = self.driver.get_manageable_volumes(
2944                cinder_volumes, marker, limit, offset, sort_keys, sort_dirs)
2945            if want_objects:
2946                driver_entries = (objects.ManageableVolumeList.
2947                                  from_primitives(ctxt, driver_entries))
2948        except Exception:
2949            with excutils.save_and_reraise_exception():
2950                LOG.exception("Listing manageable volumes failed, due "
2951                              "to driver error.")
2952        return driver_entries
2953
2954    def create_group(self, context, group):
2955        """Creates the group."""
2956        context = context.elevated()
2957
2958        # Make sure the host in the DB matches our own when clustered
2959        self._set_resource_host(group)
2960
2961        status = fields.GroupStatus.AVAILABLE
2962        model_update = None
2963
2964        self._notify_about_group_usage(context, group, "create.start")
2965
2966        try:
2967            utils.require_driver_initialized(self.driver)
2968
2969            LOG.info("Group %s: creating", group.name)
2970
2971            try:
2972                model_update = self.driver.create_group(context, group)
2973            except NotImplementedError:
2974                if not group_types.is_default_cgsnapshot_type(
2975                        group.group_type_id):
2976                    model_update = self._create_group_generic(context, group)
2977                else:
2978                    cg, __ = self._convert_group_to_cg(group, [])
2979                    model_update = self.driver.create_consistencygroup(
2980                        context, cg)
2981
2982            if model_update:
2983                if (model_update['status'] ==
2984                        fields.GroupStatus.ERROR):
2985                    msg = (_('Create group failed.'))
2986                    LOG.error(msg,
2987                              resource={'type': 'group',
2988                                        'id': group.id})
2989                    raise exception.VolumeDriverException(message=msg)
2990                else:
2991                    group.update(model_update)
2992                    group.save()
2993        except Exception:
2994            with excutils.save_and_reraise_exception():
2995                group.status = fields.GroupStatus.ERROR
2996                group.save()
2997                LOG.error("Group %s: create failed",
2998                          group.name)
2999
3000        group.status = status
3001        group.created_at = timeutils.utcnow()
3002        group.save()
3003        LOG.info("Group %s: created successfully", group.name)
3004
3005        self._notify_about_group_usage(context, group, "create.end")
3006
3007        LOG.info("Create group completed successfully.",
3008                 resource={'type': 'group',
3009                           'id': group.id})
3010        return group
3011
3012    def create_group_from_src(self, context, group,
3013                              group_snapshot=None, source_group=None):
3014        """Creates the group from source.
3015
3016        The source can be a group snapshot or a source group.
3017        """
3018        source_name = None
3019        snapshots = None
3020        source_vols = None
3021        try:
3022            volumes = objects.VolumeList.get_all_by_generic_group(context,
3023                                                                  group.id)
3024            if group_snapshot:
3025                try:
3026                    # Check if group_snapshot still exists
3027                    group_snapshot.refresh()
3028                except exception.GroupSnapshotNotFound:
3029                    LOG.error("Create group from snapshot-%(snap)s failed: "
3030                              "SnapshotNotFound.",
3031                              {'snap': group_snapshot.id},
3032                              resource={'type': 'group',
3033                                        'id': group.id})
3034                    raise
3035
3036                source_name = _("snapshot-%s") % group_snapshot.id
3037                snapshots = objects.SnapshotList.get_all_for_group_snapshot(
3038                    context, group_snapshot.id)
3039                for snap in snapshots:
3040                    if (snap.status not in
3041                            VALID_CREATE_GROUP_SRC_SNAP_STATUS):
3042                        msg = (_("Cannot create group "
3043                                 "%(group)s because snapshot %(snap)s is "
3044                                 "not in a valid state. Valid states are: "
3045                                 "%(valid)s.") %
3046                               {'group': group.id,
3047                                'snap': snap['id'],
3048                                'valid': VALID_CREATE_GROUP_SRC_SNAP_STATUS})
3049                        raise exception.InvalidGroup(reason=msg)
3050
3051            if source_group:
3052                try:
3053                    source_group.refresh()
3054                except exception.GroupNotFound:
3055                    LOG.error("Create group "
3056                              "from source group-%(group)s failed: "
3057                              "GroupNotFound.",
3058                              {'group': source_group.id},
3059                              resource={'type': 'group',
3060                                        'id': group.id})
3061                    raise
3062
3063                source_name = _("group-%s") % source_group.id
3064                source_vols = objects.VolumeList.get_all_by_generic_group(
3065                    context, source_group.id)
3066                for source_vol in source_vols:
3067                    if (source_vol.status not in
3068                            VALID_CREATE_GROUP_SRC_GROUP_STATUS):
3069                        msg = (_("Cannot create group "
3070                                 "%(group)s because source volume "
3071                                 "%(source_vol)s is not in a valid "
3072                                 "state. Valid states are: "
3073                                 "%(valid)s.") %
3074                               {'group': group.id,
3075                                'source_vol': source_vol.id,
3076                                'valid': VALID_CREATE_GROUP_SRC_GROUP_STATUS})
3077                        raise exception.InvalidGroup(reason=msg)
3078
3079            # Sort source snapshots so that they are in the same order as their
3080            # corresponding target volumes.
3081            sorted_snapshots = None
3082            if group_snapshot and snapshots:
3083                sorted_snapshots = self._sort_snapshots(volumes, snapshots)
3084
3085            # Sort source volumes so that they are in the same order as their
3086            # corresponding target volumes.
3087            sorted_source_vols = None
3088            if source_group and source_vols:
3089                sorted_source_vols = self._sort_source_vols(volumes,
3090                                                            source_vols)
3091
3092            self._notify_about_group_usage(
3093                context, group, "create.start")
3094
3095            utils.require_driver_initialized(self.driver)
3096
3097            try:
3098                model_update, volumes_model_update = (
3099                    self.driver.create_group_from_src(
3100                        context, group, volumes, group_snapshot,
3101                        sorted_snapshots, source_group, sorted_source_vols))
3102            except NotImplementedError:
3103                if not group_types.is_default_cgsnapshot_type(
3104                        group.group_type_id):
3105                    model_update, volumes_model_update = (
3106                        self._create_group_from_src_generic(
3107                            context, group, volumes, group_snapshot,
3108                            sorted_snapshots, source_group,
3109                            sorted_source_vols))
3110                else:
3111                    cg, volumes = self._convert_group_to_cg(
3112                        group, volumes)
3113                    cgsnapshot, sorted_snapshots = (
3114                        self._convert_group_snapshot_to_cgsnapshot(
3115                            group_snapshot, sorted_snapshots, context))
3116                    source_cg, sorted_source_vols = (
3117                        self._convert_group_to_cg(source_group,
3118                                                  sorted_source_vols))
3119                    model_update, volumes_model_update = (
3120                        self.driver.create_consistencygroup_from_src(
3121                            context, cg, volumes, cgsnapshot,
3122                            sorted_snapshots, source_cg, sorted_source_vols))
3123                    self._remove_cgsnapshot_id_from_snapshots(sorted_snapshots)
3124                    self._remove_consistencygroup_id_from_volumes(volumes)
3125                    self._remove_consistencygroup_id_from_volumes(
3126                        sorted_source_vols)
3127
3128            if volumes_model_update:
3129                for update in volumes_model_update:
3130                    self.db.volume_update(context, update['id'], update)
3131
3132            if model_update:
3133                group.update(model_update)
3134                group.save()
3135
3136        except Exception:
3137            with excutils.save_and_reraise_exception():
3138                group.status = fields.GroupStatus.ERROR
3139                group.save()
3140                LOG.error("Create group "
3141                          "from source %(source)s failed.",
3142                          {'source': source_name},
3143                          resource={'type': 'group',
3144                                    'id': group.id})
3145                # Update volume status to 'error' as well.
3146                self._remove_consistencygroup_id_from_volumes(volumes)
3147                for vol in volumes:
3148                    vol.status = 'error'
3149                    vol.save()
3150
3151        now = timeutils.utcnow()
3152        status = 'available'
3153        for vol in volumes:
3154            update = {'status': status, 'created_at': now}
3155            self._update_volume_from_src(context, vol, update, group=group)
3156            self._update_allocated_capacity(vol)
3157
3158        group.status = status
3159        group.created_at = now
3160        group.save()
3161
3162        self._notify_about_group_usage(
3163            context, group, "create.end")
3164        LOG.info("Create group "
3165                 "from source-%(source)s completed successfully.",
3166                 {'source': source_name},
3167                 resource={'type': 'group',
3168                           'id': group.id})
3169        return group
3170
3171    def _create_group_from_src_generic(self, context, group, volumes,
3172                                       group_snapshot=None, snapshots=None,
3173                                       source_group=None, source_vols=None):
3174        """Creates a group from source.
3175
3176        :param context: the context of the caller.
3177        :param group: the Group object to be created.
3178        :param volumes: a list of volume objects in the group.
3179        :param group_snapshot: the GroupSnapshot object as source.
3180        :param snapshots: a list of snapshot objects in group_snapshot.
3181        :param source_group: the Group object as source.
3182        :param source_vols: a list of volume objects in the source_group.
3183        :returns: model_update, volumes_model_update
3184        """
3185        model_update = {'status': 'available'}
3186        volumes_model_update = []
3187        for vol in volumes:
3188            if snapshots:
3189                for snapshot in snapshots:
3190                    if vol.snapshot_id == snapshot.id:
3191                        vol_model_update = {'id': vol.id}
3192                        try:
3193                            driver_update = (
3194                                self.driver.create_volume_from_snapshot(
3195                                    vol, snapshot))
3196                            if driver_update:
3197                                driver_update.pop('id', None)
3198                                vol_model_update.update(driver_update)
3199                            if 'status' not in vol_model_update:
3200                                vol_model_update['status'] = 'available'
3201                        except Exception:
3202                            vol_model_update['status'] = 'error'
3203                            model_update['status'] = 'error'
3204                        volumes_model_update.append(vol_model_update)
3205                        break
3206            elif source_vols:
3207                for source_vol in source_vols:
3208                    if vol.source_volid == source_vol.id:
3209                        vol_model_update = {'id': vol.id}
3210                        try:
3211                            driver_update = self.driver.create_cloned_volume(
3212                                vol, source_vol)
3213                            if driver_update:
3214                                driver_update.pop('id', None)
3215                                vol_model_update.update(driver_update)
3216                            if 'status' not in vol_model_update:
3217                                vol_model_update['status'] = 'available'
3218                        except Exception:
3219                            vol_model_update['status'] = 'error'
3220                            model_update['status'] = 'error'
3221                        volumes_model_update.append(vol_model_update)
3222                        break
3223
3224        return model_update, volumes_model_update
3225
3226    def _sort_snapshots(self, volumes, snapshots):
3227        # Sort source snapshots so that they are in the same order as their
3228        # corresponding target volumes. Each source snapshot in the snapshots
3229        # list should have a corresponding target volume in the volumes list.
3230        if not volumes or not snapshots or len(volumes) != len(snapshots):
3231            msg = _("Input volumes or snapshots are invalid.")
3232            LOG.error(msg)
3233            raise exception.InvalidInput(reason=msg)
3234
3235        sorted_snapshots = []
3236        for vol in volumes:
3237            found_snaps = [snap for snap in snapshots
3238                           if snap['id'] == vol['snapshot_id']]
3239            if not found_snaps:
3240                LOG.error("Source snapshot cannot be found for target "
3241                          "volume %(volume_id)s.",
3242                          {'volume_id': vol['id']})
3243                raise exception.SnapshotNotFound(
3244                    snapshot_id=vol['snapshot_id'])
3245            sorted_snapshots.extend(found_snaps)
3246
3247        return sorted_snapshots
3248
3249    def _sort_source_vols(self, volumes, source_vols):
3250        # Sort source volumes so that they are in the same order as their
3251        # corresponding target volumes. Each source volume in the source_vols
3252        # list should have a corresponding target volume in the volumes list.
3253        if not volumes or not source_vols or len(volumes) != len(source_vols):
3254            msg = _("Input volumes or source volumes are invalid.")
3255            LOG.error(msg)
3256            raise exception.InvalidInput(reason=msg)
3257
3258        sorted_source_vols = []
3259        for vol in volumes:
3260            found_source_vols = [source_vol for source_vol in source_vols
3261                                 if source_vol['id'] == vol['source_volid']]
3262            if not found_source_vols:
3263                LOG.error("Source volumes cannot be found for target "
3264                          "volume %(volume_id)s.",
3265                          {'volume_id': vol['id']})
3266                raise exception.VolumeNotFound(
3267                    volume_id=vol['source_volid'])
3268            sorted_source_vols.extend(found_source_vols)
3269
3270        return sorted_source_vols
3271
3272    def _update_volume_from_src(self, context, vol, update, group=None):
3273        try:
3274            snapshot_id = vol.get('snapshot_id')
3275            source_volid = vol.get('source_volid')
3276            if snapshot_id:
3277                snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
3278                orig_vref = self.db.volume_get(context,
3279                                               snapshot.volume_id)
3280                if orig_vref.bootable:
3281                    update['bootable'] = True
3282                    self.db.volume_glance_metadata_copy_to_volume(
3283                        context, vol['id'], snapshot_id)
3284            if source_volid:
3285                source_vol = objects.Volume.get_by_id(context, source_volid)
3286                if source_vol.bootable:
3287                    update['bootable'] = True
3288                    self.db.volume_glance_metadata_copy_from_volume_to_volume(
3289                        context, source_volid, vol['id'])
3290                if source_vol.multiattach:
3291                    update['multiattach'] = True
3292
3293        except exception.SnapshotNotFound:
3294            LOG.error("Source snapshot %(snapshot_id)s cannot be found.",
3295                      {'snapshot_id': vol['snapshot_id']})
3296            self.db.volume_update(context, vol['id'],
3297                                  {'status': 'error'})
3298            if group:
3299                group.status = fields.GroupStatus.ERROR
3300                group.save()
3301            raise
3302        except exception.VolumeNotFound:
3303            LOG.error("The source volume %(volume_id)s "
3304                      "cannot be found.",
3305                      {'volume_id': snapshot.volume_id})
3306            self.db.volume_update(context, vol['id'],
3307                                  {'status': 'error'})
3308            if group:
3309                group.status = fields.GroupStatus.ERROR
3310                group.save()
3311            raise
3312        except exception.CinderException as ex:
3313            LOG.error("Failed to update %(volume_id)s"
3314                      " metadata using the provided snapshot"
3315                      " %(snapshot_id)s metadata.",
3316                      {'volume_id': vol['id'],
3317                       'snapshot_id': vol['snapshot_id']})
3318            self.db.volume_update(context, vol['id'],
3319                                  {'status': 'error'})
3320            if group:
3321                group.status = fields.GroupStatus.ERROR
3322                group.save()
3323            raise exception.MetadataCopyFailure(reason=six.text_type(ex))
3324
3325        self.db.volume_update(context, vol['id'], update)
3326
3327    def _update_allocated_capacity(self, vol, decrement=False, host=None):
3328        # Update allocated capacity in volume stats
3329        host = host or vol['host']
3330        pool = vol_utils.extract_host(host, 'pool')
3331        if pool is None:
3332            # Legacy volume, put them into default pool
3333            pool = self.driver.configuration.safe_get(
3334                'volume_backend_name') or vol_utils.extract_host(host, 'pool',
3335                                                                 True)
3336
3337        vol_size = -vol['size'] if decrement else vol['size']
3338        try:
3339            self.stats['pools'][pool]['allocated_capacity_gb'] += vol_size
3340        except KeyError:
3341            self.stats['pools'][pool] = dict(
3342                allocated_capacity_gb=max(vol_size, 0))
3343
3344    def delete_group(self, context, group):
3345        """Deletes group and the volumes in the group."""
3346        context = context.elevated()
3347        project_id = group.project_id
3348
3349        if context.project_id != group.project_id:
3350            project_id = group.project_id
3351        else:
3352            project_id = context.project_id
3353
3354        volumes = objects.VolumeList.get_all_by_generic_group(
3355            context, group.id)
3356
3357        for vol_obj in volumes:
3358            if vol_obj.attach_status == "attached":
3359                # Volume is still attached, need to detach first
3360                raise exception.VolumeAttached(volume_id=vol_obj.id)
3361            self._check_is_our_resource(vol_obj)
3362
3363        self._notify_about_group_usage(
3364            context, group, "delete.start")
3365
3366        volumes_model_update = None
3367        model_update = None
3368        try:
3369            utils.require_driver_initialized(self.driver)
3370
3371            try:
3372                model_update, volumes_model_update = (
3373                    self.driver.delete_group(context, group, volumes))
3374            except NotImplementedError:
3375                if not group_types.is_default_cgsnapshot_type(
3376                        group.group_type_id):
3377                    model_update, volumes_model_update = (
3378                        self._delete_group_generic(context, group, volumes))
3379                else:
3380                    cg, volumes = self._convert_group_to_cg(
3381                        group, volumes)
3382                    model_update, volumes_model_update = (
3383                        self.driver.delete_consistencygroup(context, cg,
3384                                                            volumes))
3385                    self._remove_consistencygroup_id_from_volumes(volumes)
3386
3387            if volumes_model_update:
3388                for update in volumes_model_update:
3389                    # If we failed to delete a volume, make sure the
3390                    # status for the group is set to error as well
3391                    if (update['status'] in ['error_deleting', 'error']
3392                            and model_update['status'] not in
3393                            ['error_deleting', 'error']):
3394                        model_update['status'] = update['status']
3395                self.db.volumes_update(context, volumes_model_update)
3396
3397            if model_update:
3398                if model_update['status'] in ['error_deleting', 'error']:
3399                    msg = (_('Delete group failed.'))
3400                    LOG.error(msg,
3401                              resource={'type': 'group',
3402                                        'id': group.id})
3403                    raise exception.VolumeDriverException(message=msg)
3404                else:
3405                    group.update(model_update)
3406                    group.save()
3407
3408        except Exception:
3409            with excutils.save_and_reraise_exception():
3410                group.status = fields.GroupStatus.ERROR
3411                group.save()
3412                # Update volume status to 'error' if driver returns
3413                # None for volumes_model_update.
3414                if not volumes_model_update:
3415                    self._remove_consistencygroup_id_from_volumes(volumes)
3416                    for vol_obj in volumes:
3417                        vol_obj.status = 'error'
3418                        vol_obj.save()
3419
3420        # Get reservations for group
3421        try:
3422            reserve_opts = {'groups': -1}
3423            grpreservations = GROUP_QUOTAS.reserve(context,
3424                                                   project_id=project_id,
3425                                                   **reserve_opts)
3426        except Exception:
3427            grpreservations = None
3428            LOG.exception("Delete group "
3429                          "failed to update usages.",
3430                          resource={'type': 'group',
3431                                    'id': group.id})
3432
3433        for vol in volumes:
3434            # Get reservations for volume
3435            try:
3436                reserve_opts = {'volumes': -1,
3437                                'gigabytes': -vol.size}
3438                QUOTAS.add_volume_type_opts(context,
3439                                            reserve_opts,
3440                                            vol.volume_type_id)
3441                reservations = QUOTAS.reserve(context,
3442                                              project_id=project_id,
3443                                              **reserve_opts)
3444            except Exception:
3445                reservations = None
3446                LOG.exception("Delete group "
3447                              "failed to update usages.",
3448                              resource={'type': 'group',
3449                                        'id': group.id})
3450
3451            # Delete glance metadata if it exists
3452            self.db.volume_glance_metadata_delete_by_volume(context, vol.id)
3453
3454            vol.destroy()
3455
3456            # Commit the reservations
3457            if reservations:
3458                QUOTAS.commit(context, reservations, project_id=project_id)
3459
3460            self.stats['allocated_capacity_gb'] -= vol.size
3461
3462        if grpreservations:
3463            GROUP_QUOTAS.commit(context, grpreservations,
3464                                project_id=project_id)
3465
3466        group.destroy()
3467        self._notify_about_group_usage(
3468            context, group, "delete.end")
3469        self.publish_service_capabilities(context)
3470        LOG.info("Delete group "
3471                 "completed successfully.",
3472                 resource={'type': 'group',
3473                           'id': group.id})
3474
3475    def _convert_group_to_cg(self, group, volumes):
3476        if not group:
3477            return None, None
3478        cg = consistencygroup.ConsistencyGroup()
3479        cg.from_group(group)
3480        for vol in volumes:
3481            vol.consistencygroup_id = vol.group_id
3482            vol.consistencygroup = cg
3483
3484        return cg, volumes
3485
3486    def _remove_consistencygroup_id_from_volumes(self, volumes):
3487        if not volumes:
3488            return
3489        for vol in volumes:
3490            vol.consistencygroup_id = None
3491            vol.consistencygroup = None
3492
3493    def _convert_group_snapshot_to_cgsnapshot(self, group_snapshot, snapshots,
3494                                              ctxt):
3495        if not group_snapshot:
3496            return None, None
3497        cgsnap = cgsnapshot.CGSnapshot()
3498        cgsnap.from_group_snapshot(group_snapshot)
3499
3500        # Populate consistencygroup object
3501        grp = objects.Group.get_by_id(ctxt, group_snapshot.group_id)
3502        cg, __ = self._convert_group_to_cg(grp, [])
3503        cgsnap.consistencygroup = cg
3504
3505        for snap in snapshots:
3506            snap.cgsnapshot_id = snap.group_snapshot_id
3507            snap.cgsnapshot = cgsnap
3508
3509        return cgsnap, snapshots
3510
3511    def _remove_cgsnapshot_id_from_snapshots(self, snapshots):
3512        if not snapshots:
3513            return
3514        for snap in snapshots:
3515            snap.cgsnapshot_id = None
3516            snap.cgsnapshot = None
3517
3518    def _create_group_generic(self, context, group):
3519        """Creates a group."""
3520        # A group entry is already created in db. Just returns a status here.
3521        model_update = {'status': fields.GroupStatus.AVAILABLE,
3522                        'created_at': timeutils.utcnow()}
3523        return model_update
3524
3525    def _delete_group_generic(self, context, group, volumes):
3526        """Deletes a group and volumes in the group."""
3527        model_update = {'status': group.status}
3528        volume_model_updates = []
3529        for volume_ref in volumes:
3530            volume_model_update = {'id': volume_ref.id}
3531            try:
3532                self.driver.remove_export(context, volume_ref)
3533                self.driver.delete_volume(volume_ref)
3534                volume_model_update['status'] = 'deleted'
3535            except exception.VolumeIsBusy:
3536                volume_model_update['status'] = 'available'
3537            except Exception:
3538                volume_model_update['status'] = 'error'
3539                model_update['status'] = fields.GroupStatus.ERROR
3540            volume_model_updates.append(volume_model_update)
3541
3542        return model_update, volume_model_updates
3543
3544    def _update_group_generic(self, context, group,
3545                              add_volumes=None, remove_volumes=None):
3546        """Updates a group."""
3547        # NOTE(xyang): The volume manager adds/removes the volume to/from the
3548        # group in the database. This default implementation does not do
3549        # anything in the backend storage.
3550        return None, None, None
3551
3552    def _collect_volumes_for_group(self, context, group, volumes, add=True):
3553        if add:
3554            valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
3555        else:
3556            valid_status = VALID_REMOVE_VOL_FROM_GROUP_STATUS
3557        volumes_ref = []
3558        if not volumes:
3559            return volumes_ref
3560        for add_vol in volumes.split(','):
3561            try:
3562                add_vol_ref = objects.Volume.get_by_id(context, add_vol)
3563            except exception.VolumeNotFound:
3564                LOG.error("Update group "
3565                          "failed to %(op)s volume-%(volume_id)s: "
3566                          "VolumeNotFound.",
3567                          {'volume_id': add_vol_ref.id,
3568                           'op': 'add' if add else 'remove'},
3569                          resource={'type': 'group',
3570                                    'id': group.id})
3571                raise
3572            if add_vol_ref.status not in valid_status:
3573                msg = (_("Can not %(op)s volume %(volume_id)s to "
3574                         "group %(group_id)s because volume is in an invalid "
3575                         "state: %(status)s. Valid states are: %(valid)s.") %
3576                       {'volume_id': add_vol_ref.id,
3577                        'group_id': group.id,
3578                        'status': add_vol_ref.status,
3579                        'valid': valid_status,
3580                        'op': 'add' if add else 'remove'})
3581                raise exception.InvalidVolume(reason=msg)
3582            if add:
3583                self._check_is_our_resource(add_vol_ref)
3584            volumes_ref.append(add_vol_ref)
3585        return volumes_ref
3586
3587    def update_group(self, context, group,
3588                     add_volumes=None, remove_volumes=None):
3589        """Updates group.
3590
3591        Update group by adding volumes to the group,
3592        or removing volumes from the group.
3593        """
3594
3595        add_volumes_ref = self._collect_volumes_for_group(context,
3596                                                          group,
3597                                                          add_volumes,
3598                                                          add=True)
3599        remove_volumes_ref = self._collect_volumes_for_group(context,
3600                                                             group,
3601                                                             remove_volumes,
3602                                                             add=False)
3603        self._notify_about_group_usage(
3604            context, group, "update.start")
3605
3606        try:
3607            utils.require_driver_initialized(self.driver)
3608
3609            try:
3610                model_update, add_volumes_update, remove_volumes_update = (
3611                    self.driver.update_group(
3612                        context, group,
3613                        add_volumes=add_volumes_ref,
3614                        remove_volumes=remove_volumes_ref))
3615            except NotImplementedError:
3616                if not group_types.is_default_cgsnapshot_type(
3617                        group.group_type_id):
3618                    model_update, add_volumes_update, remove_volumes_update = (
3619                        self._update_group_generic(
3620                            context, group,
3621                            add_volumes=add_volumes_ref,
3622                            remove_volumes=remove_volumes_ref))
3623                else:
3624                    cg, remove_volumes_ref = self._convert_group_to_cg(
3625                        group, remove_volumes_ref)
3626                    model_update, add_volumes_update, remove_volumes_update = (
3627                        self.driver.update_consistencygroup(
3628                            context, cg,
3629                            add_volumes=add_volumes_ref,
3630                            remove_volumes=remove_volumes_ref))
3631                    self._remove_consistencygroup_id_from_volumes(
3632                        remove_volumes_ref)
3633
3634            volumes_to_update = []
3635            if add_volumes_update:
3636                volumes_to_update.extend(add_volumes_update)
3637            if remove_volumes_update:
3638                volumes_to_update.extend(remove_volumes_update)
3639            self.db.volumes_update(context, volumes_to_update)
3640
3641            if model_update:
3642                if model_update['status'] in (
3643                        [fields.GroupStatus.ERROR]):
3644                    msg = (_('Error occurred when updating group '
3645                             '%s.') % group.id)
3646                    LOG.error(msg)
3647                    raise exception.VolumeDriverException(message=msg)
3648                group.update(model_update)
3649                group.save()
3650
3651        except Exception as e:
3652            with excutils.save_and_reraise_exception():
3653                if isinstance(e, exception.VolumeDriverException):
3654                    LOG.error("Error occurred in the volume driver when "
3655                              "updating group %(group_id)s.",
3656                              {'group_id': group.id})
3657                else:
3658                    LOG.error("Failed to update group %(group_id)s.",
3659                              {'group_id': group.id})
3660                group.status = fields.GroupStatus.ERROR
3661                group.save()
3662                for add_vol in add_volumes_ref:
3663                    add_vol.status = 'error'
3664                    add_vol.save()
3665                for rem_vol in remove_volumes_ref:
3666                    if isinstance(e, exception.VolumeDriverException):
3667                        rem_vol.consistencygroup_id = None
3668                        rem_vol.consistencygroup = None
3669                    rem_vol.status = 'error'
3670                    rem_vol.save()
3671
3672        for add_vol in add_volumes_ref:
3673            add_vol.group_id = group.id
3674            add_vol.save()
3675        for rem_vol in remove_volumes_ref:
3676            rem_vol.group_id = None
3677            rem_vol.save()
3678        group.status = fields.GroupStatus.AVAILABLE
3679        group.save()
3680
3681        self._notify_about_group_usage(
3682            context, group, "update.end")
3683        LOG.info("Update group completed successfully.",
3684                 resource={'type': 'group',
3685                           'id': group.id})
3686
3687    def create_group_snapshot(self, context, group_snapshot):
3688        """Creates the group_snapshot."""
3689        caller_context = context
3690        context = context.elevated()
3691
3692        LOG.info("GroupSnapshot %s: creating.", group_snapshot.id)
3693
3694        snapshots = objects.SnapshotList.get_all_for_group_snapshot(
3695            context, group_snapshot.id)
3696
3697        self._notify_about_group_snapshot_usage(
3698            context, group_snapshot, "create.start")
3699
3700        snapshots_model_update = None
3701        model_update = None
3702        try:
3703            utils.require_driver_initialized(self.driver)
3704
3705            LOG.debug("Group snapshot %(grp_snap_id)s: creating.",
3706                      {'grp_snap_id': group_snapshot.id})
3707
3708            # Pass context so that drivers that want to use it, can,
3709            # but it is not a requirement for all drivers.
3710            group_snapshot.context = caller_context
3711            for snapshot in snapshots:
3712                snapshot.context = caller_context
3713
3714            try:
3715                model_update, snapshots_model_update = (
3716                    self.driver.create_group_snapshot(context, group_snapshot,
3717                                                      snapshots))
3718            except NotImplementedError:
3719                if not group_types.is_default_cgsnapshot_type(
3720                        group_snapshot.group_type_id):
3721                    model_update, snapshots_model_update = (
3722                        self._create_group_snapshot_generic(
3723                            context, group_snapshot, snapshots))
3724                else:
3725                    cgsnapshot, snapshots = (
3726                        self._convert_group_snapshot_to_cgsnapshot(
3727                            group_snapshot, snapshots, context))
3728                    model_update, snapshots_model_update = (
3729                        self.driver.create_cgsnapshot(context, cgsnapshot,
3730                                                      snapshots))
3731                    self._remove_cgsnapshot_id_from_snapshots(snapshots)
3732            if snapshots_model_update:
3733                for snap_model in snapshots_model_update:
3734                    # Update db for snapshot.
3735                    # NOTE(xyang): snapshots is a list of snapshot objects.
3736                    # snapshots_model_update should be a list of dicts.
3737                    snap_id = snap_model.pop('id')
3738                    snap_obj = objects.Snapshot.get_by_id(context, snap_id)
3739                    snap_obj.update(snap_model)
3740                    snap_obj.save()
3741                    if (snap_model['status'] in [
3742                        fields.SnapshotStatus.ERROR_DELETING,
3743                        fields.SnapshotStatus.ERROR] and
3744                            model_update['status'] not in
3745                            [fields.GroupSnapshotStatus.ERROR_DELETING,
3746                             fields.GroupSnapshotStatus.ERROR]):
3747                        model_update['status'] = snap_model['status']
3748
3749            if model_update:
3750                if model_update['status'] == fields.GroupSnapshotStatus.ERROR:
3751                    msg = (_('Error occurred when creating group_snapshot '
3752                             '%s.') % group_snapshot.id)
3753                    LOG.error(msg)
3754                    raise exception.VolumeDriverException(message=msg)
3755
3756                group_snapshot.update(model_update)
3757                group_snapshot.save()
3758
3759        except exception.CinderException:
3760            with excutils.save_and_reraise_exception():
3761                group_snapshot.status = fields.GroupSnapshotStatus.ERROR
3762                group_snapshot.save()
3763                # Update snapshot status to 'error' if driver returns
3764                # None for snapshots_model_update.
3765                self._remove_cgsnapshot_id_from_snapshots(snapshots)
3766                if not snapshots_model_update:
3767                    for snapshot in snapshots:
3768                        snapshot.status = fields.SnapshotStatus.ERROR
3769                        snapshot.save()
3770
3771        for snapshot in snapshots:
3772            volume_id = snapshot.volume_id
3773            snapshot_id = snapshot.id
3774            vol_obj = objects.Volume.get_by_id(context, volume_id)
3775            if vol_obj.bootable:
3776                try:
3777                    self.db.volume_glance_metadata_copy_to_snapshot(
3778                        context, snapshot_id, volume_id)
3779                except exception.GlanceMetadataNotFound:
3780                    # If volume is not created from image, No glance metadata
3781                    # would be available for that volume in
3782                    # volume glance metadata table
3783                    pass
3784                except exception.CinderException as ex:
3785                    LOG.error("Failed updating %(snapshot_id)s"
3786                              " metadata using the provided volumes"
3787                              " %(volume_id)s metadata.",
3788                              {'volume_id': volume_id,
3789                               'snapshot_id': snapshot_id})
3790                    snapshot.status = fields.SnapshotStatus.ERROR
3791                    snapshot.save()
3792                    raise exception.MetadataCopyFailure(
3793                        reason=six.text_type(ex))
3794
3795            snapshot.status = fields.SnapshotStatus.AVAILABLE
3796            snapshot.progress = '100%'
3797            snapshot.save()
3798
3799        group_snapshot.status = fields.GroupSnapshotStatus.AVAILABLE
3800        group_snapshot.save()
3801
3802        LOG.info("group_snapshot %s: created successfully",
3803                 group_snapshot.id)
3804        self._notify_about_group_snapshot_usage(
3805            context, group_snapshot, "create.end")
3806        return group_snapshot
3807
3808    def _create_group_snapshot_generic(self, context, group_snapshot,
3809                                       snapshots):
3810        """Creates a group_snapshot."""
3811        model_update = {'status': 'available'}
3812        snapshot_model_updates = []
3813        for snapshot in snapshots:
3814            snapshot_model_update = {'id': snapshot.id}
3815            try:
3816                driver_update = self.driver.create_snapshot(snapshot)
3817                if driver_update:
3818                    driver_update.pop('id', None)
3819                    snapshot_model_update.update(driver_update)
3820                if 'status' not in snapshot_model_update:
3821                    snapshot_model_update['status'] = (
3822                        fields.SnapshotStatus.AVAILABLE)
3823            except Exception:
3824                snapshot_model_update['status'] = (
3825                    fields.SnapshotStatus.ERROR)
3826                model_update['status'] = 'error'
3827            snapshot_model_updates.append(snapshot_model_update)
3828
3829        return model_update, snapshot_model_updates
3830
3831    def _delete_group_snapshot_generic(self, context, group_snapshot,
3832                                       snapshots):
3833        """Deletes a group_snapshot."""
3834        model_update = {'status': group_snapshot.status}
3835        snapshot_model_updates = []
3836        for snapshot in snapshots:
3837            snapshot_model_update = {'id': snapshot.id}
3838            try:
3839                self.driver.delete_snapshot(snapshot)
3840                snapshot_model_update['status'] = (
3841                    fields.SnapshotStatus.DELETED)
3842            except exception.SnapshotIsBusy:
3843                snapshot_model_update['status'] = (
3844                    fields.SnapshotStatus.AVAILABLE)
3845            except Exception:
3846                snapshot_model_update['status'] = (
3847                    fields.SnapshotStatus.ERROR)
3848                model_update['status'] = 'error'
3849            snapshot_model_updates.append(snapshot_model_update)
3850
3851        return model_update, snapshot_model_updates
3852
3853    def delete_group_snapshot(self, context, group_snapshot):
3854        """Deletes group_snapshot."""
3855        caller_context = context
3856        context = context.elevated()
3857        project_id = group_snapshot.project_id
3858
3859        LOG.info("group_snapshot %s: deleting", group_snapshot.id)
3860
3861        snapshots = objects.SnapshotList.get_all_for_group_snapshot(
3862            context, group_snapshot.id)
3863
3864        self._notify_about_group_snapshot_usage(
3865            context, group_snapshot, "delete.start")
3866
3867        snapshots_model_update = None
3868        model_update = None
3869        try:
3870            utils.require_driver_initialized(self.driver)
3871
3872            LOG.debug("group_snapshot %(grp_snap_id)s: deleting",
3873                      {'grp_snap_id': group_snapshot.id})
3874
3875            # Pass context so that drivers that want to use it, can,
3876            # but it is not a requirement for all drivers.
3877            group_snapshot.context = caller_context
3878            for snapshot in snapshots:
3879                snapshot.context = caller_context
3880
3881            try:
3882                model_update, snapshots_model_update = (
3883                    self.driver.delete_group_snapshot(context, group_snapshot,
3884                                                      snapshots))
3885            except NotImplementedError:
3886                if not group_types.is_default_cgsnapshot_type(
3887                        group_snapshot.group_type_id):
3888                    model_update, snapshots_model_update = (
3889                        self._delete_group_snapshot_generic(
3890                            context, group_snapshot, snapshots))
3891                else:
3892                    cgsnapshot, snapshots = (
3893                        self._convert_group_snapshot_to_cgsnapshot(
3894                            group_snapshot, snapshots, context))
3895                    model_update, snapshots_model_update = (
3896                        self.driver.delete_cgsnapshot(context, cgsnapshot,
3897                                                      snapshots))
3898                    self._remove_cgsnapshot_id_from_snapshots(snapshots)
3899
3900            if snapshots_model_update:
3901                for snap_model in snapshots_model_update:
3902                    # NOTE(xyang): snapshots is a list of snapshot objects.
3903                    # snapshots_model_update should be a list of dicts.
3904                    snap = next((item for item in snapshots if
3905                                 item.id == snap_model['id']), None)
3906                    if snap:
3907                        snap_model.pop('id')
3908                        snap.update(snap_model)
3909                        snap.save()
3910
3911                    if (snap_model['status'] in
3912                            [fields.SnapshotStatus.ERROR_DELETING,
3913                             fields.SnapshotStatus.ERROR] and
3914                            model_update['status'] not in
3915                            ['error_deleting', 'error']):
3916                        model_update['status'] = snap_model['status']
3917
3918            if model_update:
3919                if model_update['status'] in ['error_deleting', 'error']:
3920                    msg = (_('Error occurred when deleting group_snapshot '
3921                             '%s.') % group_snapshot.id)
3922                    LOG.error(msg)
3923                    raise exception.VolumeDriverException(message=msg)
3924                else:
3925                    group_snapshot.update(model_update)
3926                    group_snapshot.save()
3927
3928        except exception.CinderException:
3929            with excutils.save_and_reraise_exception():
3930                group_snapshot.status = fields.GroupSnapshotStatus.ERROR
3931                group_snapshot.save()
3932                # Update snapshot status to 'error' if driver returns
3933                # None for snapshots_model_update.
3934                if not snapshots_model_update:
3935                    self._remove_cgsnapshot_id_from_snapshots(snapshots)
3936                    for snapshot in snapshots:
3937                        snapshot.status = fields.SnapshotStatus.ERROR
3938                        snapshot.save()
3939
3940        for snapshot in snapshots:
3941            # Get reservations
3942            try:
3943                reserve_opts = {'snapshots': -1}
3944                volume_ref = objects.Volume.get_by_id(context,
3945                                                      snapshot.volume_id)
3946                QUOTAS.add_volume_type_opts(context,
3947                                            reserve_opts,
3948                                            volume_ref.volume_type_id)
3949                reservations = QUOTAS.reserve(context,
3950                                              project_id=project_id,
3951                                              **reserve_opts)
3952
3953            except Exception:
3954                reservations = None
3955                LOG.exception("Failed to update usages deleting snapshot")
3956
3957            self.db.volume_glance_metadata_delete_by_snapshot(context,
3958                                                              snapshot.id)
3959            snapshot.destroy()
3960
3961            # Commit the reservations
3962            if reservations:
3963                QUOTAS.commit(context, reservations, project_id=project_id)
3964
3965        group_snapshot.destroy()
3966        LOG.info("group_snapshot %s: deleted successfully",
3967                 group_snapshot.id)
3968        self._notify_about_group_snapshot_usage(context, group_snapshot,
3969                                                "delete.end",
3970                                                snapshots)
3971
3972    def update_migrated_volume(self, ctxt, volume, new_volume, volume_status):
3973        """Finalize migration process on backend device."""
3974        model_update = None
3975        model_update_default = {'_name_id': new_volume.name_id,
3976                                'provider_location':
3977                                new_volume.provider_location}
3978        try:
3979            model_update = self.driver.update_migrated_volume(ctxt,
3980                                                              volume,
3981                                                              new_volume,
3982                                                              volume_status)
3983        except NotImplementedError:
3984            # If update_migrated_volume is not implemented for the driver,
3985            # _name_id and provider_location will be set with the values
3986            # from new_volume.
3987            model_update = model_update_default
3988        if model_update:
3989            model_update_default.update(model_update)
3990            # Swap keys that were changed in the source so we keep their values
3991            # in the temporary volume's DB record.
3992            # Need to convert 'metadata' and 'admin_metadata' since
3993            # they are not keys of volume, their corresponding keys are
3994            # 'volume_metadata' and 'volume_admin_metadata'.
3995            model_update_new = dict()
3996            for key in model_update:
3997                if key == 'metadata':
3998                    if volume.get('volume_metadata'):
3999                        model_update_new[key] = {
4000                            metadata['key']: metadata['value']
4001                            for metadata in volume.volume_metadata}
4002                elif key == 'admin_metadata':
4003                    model_update_new[key] = {
4004                        metadata['key']: metadata['value']
4005                        for metadata in volume.volume_admin_metadata}
4006                else:
4007                    model_update_new[key] = volume[key]
4008            with new_volume.obj_as_admin():
4009                new_volume.update(model_update_new)
4010                new_volume.save()
4011        with volume.obj_as_admin():
4012                volume.update(model_update_default)
4013                volume.save()
4014
4015    # Replication V2.1 and a/a method
4016    def failover(self, context, secondary_backend_id=None):
4017        """Failover a backend to a secondary replication target.
4018
4019        Instructs a replication capable/configured backend to failover
4020        to one of it's secondary replication targets. host=None is
4021        an acceetable input, and leaves it to the driver to failover
4022        to the only configured target, or to choose a target on it's
4023        own. All of the hosts volumes will be passed on to the driver
4024        in order for it to determine the replicated volumes on the host,
4025        if needed.
4026
4027        :param context: security context
4028        :param secondary_backend_id: Specifies backend_id to fail over to
4029        """
4030        updates = {}
4031        repl_status = fields.ReplicationStatus
4032
4033        service = self._get_service()
4034
4035        # TODO(geguileo): We should optimize these updates by doing them
4036        # directly on the DB with just 3 queries, one to change the volumes
4037        # another to change all the snapshots, and another to get replicated
4038        # volumes.
4039
4040        # Change non replicated volumes and their snapshots to error if we are
4041        # failing over, leave them as they are for failback
4042        volumes = self._get_my_volumes(context)
4043
4044        replicated_vols = []
4045        for volume in volumes:
4046            if volume.replication_status not in (repl_status.DISABLED,
4047                                                 repl_status.NOT_CAPABLE):
4048                replicated_vols.append(volume)
4049            elif secondary_backend_id != self.FAILBACK_SENTINEL:
4050                volume.previous_status = volume.status
4051                volume.status = 'error'
4052                volume.replication_status = repl_status.NOT_CAPABLE
4053                volume.save()
4054
4055                for snapshot in volume.snapshots:
4056                    snapshot.status = fields.SnapshotStatus.ERROR
4057                    snapshot.save()
4058
4059        volume_update_list = None
4060        group_update_list = None
4061        try:
4062            # For non clustered we can call v2.1 failover_host, but for
4063            # clustered we call a/a failover method.  We know a/a method
4064            # exists because BaseVD class wouldn't have started if it didn't.
4065            failover = getattr(self.driver,
4066                               'failover' if service.is_clustered
4067                               else 'failover_host')
4068            # expected form of volume_update_list:
4069            # [{volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}},
4070            #  {volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}]
4071            # It includes volumes in replication groups and those not in them
4072            # expected form of group_update_list:
4073            # [{group_id: <cinder-grpid>, updates: {'xxxx': xxxx....}},
4074            #  {group_id: <cinder-grpid>, updates: {'xxxx': xxxx....}}]
4075            filters = self._get_cluster_or_host_filters()
4076            groups = objects.GroupList.get_all_replicated(context,
4077                                                          filters=filters)
4078            active_backend_id, volume_update_list, group_update_list = (
4079                failover(context,
4080                         replicated_vols,
4081                         secondary_id=secondary_backend_id,
4082                         groups=groups))
4083            try:
4084                update_data = {u['volume_id']: u['updates']
4085                               for u in volume_update_list}
4086            except KeyError:
4087                msg = "Update list, doesn't include volume_id"
4088                raise exception.ProgrammingError(reason=msg)
4089            try:
4090                update_group_data = {g['group_id']: g['updates']
4091                                     for g in group_update_list}
4092            except KeyError:
4093                msg = "Update list, doesn't include group_id"
4094                raise exception.ProgrammingError(reason=msg)
4095        except Exception as exc:
4096            # NOTE(jdg): Drivers need to be aware if they fail during
4097            # a failover sequence, we're expecting them to cleanup
4098            # and make sure the driver state is such that the original
4099            # backend is still set as primary as per driver memory
4100
4101            # We don't want to log the exception trace invalid replication
4102            # target
4103            if isinstance(exc, exception.InvalidReplicationTarget):
4104                log_method = LOG.error
4105                # Preserve the replication_status: Status should be failed over
4106                # if we were failing back or if we were failing over from one
4107                # secondary to another secondary. In both cases
4108                # active_backend_id will be set.
4109                if service.active_backend_id:
4110                    updates['replication_status'] = repl_status.FAILED_OVER
4111                else:
4112                    updates['replication_status'] = repl_status.ENABLED
4113            else:
4114                log_method = LOG.exception
4115                updates.update(disabled=True,
4116                               replication_status=repl_status.FAILOVER_ERROR)
4117
4118            log_method("Error encountered during failover on host: %(host)s "
4119                       "to %(backend_id)s: %(error)s",
4120                       {'host': self.host, 'backend_id': secondary_backend_id,
4121                        'error': exc})
4122            # We dump the update list for manual recovery
4123            LOG.error('Failed update_list is: %s', volume_update_list)
4124            self.finish_failover(context, service, updates)
4125            return
4126
4127        if secondary_backend_id == "default":
4128            updates['replication_status'] = repl_status.ENABLED
4129            updates['active_backend_id'] = ''
4130            updates['disabled'] = service.frozen
4131            updates['disabled_reason'] = 'frozen' if service.frozen else ''
4132        else:
4133            updates['replication_status'] = repl_status.FAILED_OVER
4134            updates['active_backend_id'] = active_backend_id
4135            updates['disabled'] = True
4136            updates['disabled_reason'] = 'failed-over'
4137
4138        self.finish_failover(context, service, updates)
4139
4140        for volume in replicated_vols:
4141            update = update_data.get(volume.id, {})
4142            if update.get('status', '') == 'error':
4143                update['replication_status'] = repl_status.FAILOVER_ERROR
4144            elif update.get('replication_status') in (None,
4145                                                      repl_status.FAILED_OVER):
4146                update['replication_status'] = updates['replication_status']
4147
4148            if update['replication_status'] == repl_status.FAILOVER_ERROR:
4149                update.setdefault('status', 'error')
4150                # Set all volume snapshots to error
4151                for snapshot in volume.snapshots:
4152                    snapshot.status = fields.SnapshotStatus.ERROR
4153                    snapshot.save()
4154            if 'status' in update:
4155                update['previous_status'] = volume.status
4156            volume.update(update)
4157            volume.save()
4158
4159        for grp in groups:
4160            update = update_group_data.get(grp.id, {})
4161            if update.get('status', '') == 'error':
4162                update['replication_status'] = repl_status.FAILOVER_ERROR
4163            elif update.get('replication_status') in (None,
4164                                                      repl_status.FAILED_OVER):
4165                update['replication_status'] = updates['replication_status']
4166
4167            if update['replication_status'] == repl_status.FAILOVER_ERROR:
4168                update.setdefault('status', 'error')
4169            grp.update(update)
4170            grp.save()
4171
4172        LOG.info("Failed over to replication target successfully.")
4173
4174    # TODO(geguileo): In P - remove this
4175    failover_host = failover
4176
4177    def finish_failover(self, context, service, updates):
4178        """Completion of the failover locally or via RPC."""
4179        # If the service is clustered, broadcast the service changes to all
4180        # volume services, including this one.
4181        if service.is_clustered:
4182            # We have to update the cluster with the same data, and we do it
4183            # before broadcasting the failover_completed RPC call to prevent
4184            # races with services that may be starting..
4185            for key, value in updates.items():
4186                setattr(service.cluster, key, value)
4187            service.cluster.save()
4188            rpcapi = volume_rpcapi.VolumeAPI()
4189            rpcapi.failover_completed(context, service, updates)
4190        else:
4191            service.update(updates)
4192            service.save()
4193
4194    def failover_completed(self, context, updates):
4195        """Finalize failover of this backend.
4196
4197        When a service is clustered and replicated the failover has 2 stages,
4198        one that does the failover of the volumes and another that finalizes
4199        the failover of the services themselves.
4200
4201        This method takes care of the last part and is called from the service
4202        doing the failover of the volumes after finished processing the
4203        volumes.
4204        """
4205        service = self._get_service()
4206        service.update(updates)
4207        try:
4208            self.driver.failover_completed(context, service.active_backend_id)
4209        except Exception:
4210            msg = _('Driver reported error during replication failover '
4211                    'completion.')
4212            LOG.exception(msg)
4213            service.disabled = True
4214            service.disabled_reason = msg
4215            service.replication_status = (
4216                fields.ReplicationStatus.ERROR)
4217        service.save()
4218
4219    def freeze_host(self, context):
4220        """Freeze management plane on this backend.
4221
4222        Basically puts the control/management plane into a
4223        Read Only state.  We should handle this in the scheduler,
4224        however this is provided to let the driver know in case it
4225        needs/wants to do something specific on the backend.
4226
4227        :param context: security context
4228        """
4229        # TODO(jdg): Return from driver? or catch?
4230        # Update status column in service entry
4231        try:
4232            self.driver.freeze_backend(context)
4233        except exception.VolumeDriverException:
4234            # NOTE(jdg): In the case of freeze, we don't really
4235            # need the backend's consent or anything, we'll just
4236            # disable the service, so we can just log this and
4237            # go about our business
4238            LOG.warning('Error encountered on Cinder backend during '
4239                        'freeze operation, service is frozen, however '
4240                        'notification to driver has failed.')
4241
4242        service = self._get_service()
4243        service.disabled = True
4244        service.disabled_reason = "frozen"
4245        service.save()
4246        LOG.info("Set backend status to frozen successfully.")
4247        return True
4248
4249    def thaw_host(self, context):
4250        """UnFreeze management plane on this backend.
4251
4252        Basically puts the control/management plane back into
4253        a normal state.  We should handle this in the scheduler,
4254        however this is provided to let the driver know in case it
4255        needs/wants to do something specific on the backend.
4256
4257        :param context: security context
4258        """
4259
4260        # TODO(jdg): Return from driver? or catch?
4261        # Update status column in service entry
4262        try:
4263            self.driver.thaw_backend(context)
4264        except exception.VolumeDriverException:
4265            # NOTE(jdg): Thaw actually matters, if this call
4266            # to the backend fails, we're stuck and can't re-enable
4267            LOG.error('Error encountered on Cinder backend during '
4268                      'thaw operation, service will remain frozen.')
4269            return False
4270
4271        service = self._get_service()
4272        service.disabled = False
4273        service.disabled_reason = ""
4274        service.save()
4275        LOG.info("Thawed backend successfully.")
4276        return True
4277
4278    def manage_existing_snapshot(self, ctxt, snapshot, ref=None):
4279        LOG.debug('manage_existing_snapshot: managing %s.', ref)
4280        try:
4281            flow_engine = manage_existing_snapshot.get_flow(
4282                ctxt,
4283                self.db,
4284                self.driver,
4285                self.host,
4286                snapshot.id,
4287                ref)
4288        except Exception:
4289            LOG.exception("Failed to create manage_existing flow: "
4290                          "%(object_type)s %(object_id)s.",
4291                          {'object_type': 'snapshot',
4292                           'object_id': snapshot.id})
4293            raise exception.CinderException(
4294                _("Failed to create manage existing flow."))
4295
4296        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
4297            flow_engine.run()
4298        return snapshot.id
4299
4300    def get_manageable_snapshots(self, ctxt, marker, limit, offset,
4301                                 sort_keys, sort_dirs, want_objects=False):
4302        try:
4303            utils.require_driver_initialized(self.driver)
4304        except exception.DriverNotInitialized:
4305            with excutils.save_and_reraise_exception():
4306                LOG.exception("Listing manageable snapshots failed, due "
4307                              "to uninitialized driver.")
4308
4309        cinder_snapshots = self._get_my_snapshots(ctxt)
4310        try:
4311            driver_entries = self.driver.get_manageable_snapshots(
4312                cinder_snapshots, marker, limit, offset, sort_keys, sort_dirs)
4313            if want_objects:
4314                driver_entries = (objects.ManageableSnapshotList.
4315                                  from_primitives(ctxt, driver_entries))
4316        except Exception:
4317            with excutils.save_and_reraise_exception():
4318                LOG.exception("Listing manageable snapshots failed, due "
4319                              "to driver error.")
4320        return driver_entries
4321
4322    def get_capabilities(self, context, discover):
4323        """Get capabilities of backend storage."""
4324        if discover:
4325            self.driver.init_capabilities()
4326        capabilities = self.driver.capabilities
4327        LOG.debug("Obtained capabilities list: %s.", capabilities)
4328        return capabilities
4329
4330    def get_backup_device(self, ctxt, backup, want_objects=False):
4331        (backup_device, is_snapshot) = (
4332            self.driver.get_backup_device(ctxt, backup))
4333        secure_enabled = self.driver.secure_file_operations_enabled()
4334        backup_device_dict = {'backup_device': backup_device,
4335                              'secure_enabled': secure_enabled,
4336                              'is_snapshot': is_snapshot, }
4337        # TODO(sborkows): from_primitive method will be removed in O, so there
4338        # is a need to clean here then.
4339        return (objects.BackupDeviceInfo.from_primitive(backup_device_dict,
4340                                                        ctxt)
4341                if want_objects else backup_device_dict)
4342
4343    def secure_file_operations_enabled(self, ctxt, volume):
4344        secure_enabled = self.driver.secure_file_operations_enabled()
4345        return secure_enabled
4346
4347    def _connection_create(self, ctxt, volume, attachment, connector):
4348        try:
4349            self.driver.validate_connector(connector)
4350        except exception.InvalidConnectorException as err:
4351            raise exception.InvalidInput(reason=six.text_type(err))
4352        except Exception as err:
4353            err_msg = (_("Validate volume connection failed "
4354                         "(error: %(err)s).") % {'err': six.text_type(err)})
4355            LOG.error(err_msg, resource=volume)
4356            raise exception.VolumeBackendAPIException(data=err_msg)
4357
4358        try:
4359            model_update = self.driver.create_export(ctxt.elevated(),
4360                                                     volume, connector)
4361        except exception.CinderException as ex:
4362            err_msg = (_("Create export for volume failed (%s).") % ex.msg)
4363            LOG.exception(err_msg, resource=volume)
4364            raise exception.VolumeBackendAPIException(data=err_msg)
4365
4366        try:
4367            if model_update:
4368                volume.update(model_update)
4369                volume.save()
4370        except exception.CinderException as ex:
4371            LOG.exception("Model update failed.", resource=volume)
4372            raise exception.ExportFailure(reason=six.text_type(ex))
4373
4374        try:
4375            conn_info = self.driver.initialize_connection(volume, connector)
4376        except Exception as err:
4377            err_msg = (_("Driver initialize connection failed "
4378                         "(error: %(err)s).") % {'err': six.text_type(err)})
4379            LOG.exception(err_msg, resource=volume)
4380            self.driver.remove_export(ctxt.elevated(), volume)
4381            raise exception.VolumeBackendAPIException(data=err_msg)
4382        conn_info = self._parse_connection_options(ctxt, volume, conn_info)
4383
4384        # NOTE(jdg): Get rid of the nested dict (data key)
4385        conn_data = conn_info.pop('data', {})
4386        connection_info = conn_data.copy()
4387        connection_info.update(conn_info)
4388        values = {'volume_id': volume.id,
4389                  'attach_status': 'attaching',
4390                  'connector': jsonutils.dumps(connector)}
4391
4392        # TODO(mriedem): Use VolumeAttachment.save() here.
4393        self.db.volume_attachment_update(ctxt, attachment.id, values)
4394
4395        connection_info['attachment_id'] = attachment.id
4396        return connection_info
4397
4398    def attachment_update(self,
4399                          context,
4400                          vref,
4401                          connector,
4402                          attachment_id):
4403        """Update/Finalize an attachment.
4404
4405        This call updates a valid attachment record to associate with a volume
4406        and provide the caller with the proper connection info.  Note that
4407        this call requires an `attachment_ref`.  It's expected that prior to
4408        this call that the volume and an attachment UUID has been reserved.
4409
4410        param: vref: Volume object to create attachment for
4411        param: connector: Connector object to use for attachment creation
4412        param: attachment_ref: ID of the attachment record to update
4413        """
4414
4415        mode = connector.get('mode', 'rw')
4416        self._notify_about_volume_usage(context, vref, 'attach.start')
4417        attachment_ref = objects.VolumeAttachment.get_by_id(context,
4418                                                            attachment_id)
4419        connection_info = self._connection_create(context,
4420                                                  vref,
4421                                                  attachment_ref,
4422                                                  connector)
4423        # FIXME(jdg): get rid of this admin_meta option here, the only thing
4424        # it does is enforce that a volume is R/O, that should be done via a
4425        # type and not *more* metadata
4426        volume_metadata = self.db.volume_admin_metadata_update(
4427            context.elevated(),
4428            attachment_ref.volume_id,
4429            {'attached_mode': mode}, False)
4430
4431        # The prior seting of mode in the attachment_ref overrides any
4432        # settings within the connector when dealing with read only
4433        # attachment options
4434        if mode != 'ro' and attachment_ref.attach_mode == 'ro':
4435            connector['mode'] = 'ro'
4436            mode = 'ro'
4437
4438        try:
4439            if volume_metadata.get('readonly') == 'True' and mode != 'ro':
4440                raise exception.InvalidVolumeAttachMode(mode=mode,
4441                                                        volume_id=vref.id)
4442            utils.require_driver_initialized(self.driver)
4443            self.driver.attach_volume(context,
4444                                      vref,
4445                                      attachment_ref.instance_uuid,
4446                                      connector.get('host', ''),
4447                                      connector.get('mountpoint', 'na'))
4448        except Exception as err:
4449            self.message_api.create(
4450                context, message_field.Action.UPDATE_ATTACHMENT,
4451                resource_uuid=vref.id,
4452                exception=err)
4453            with excutils.save_and_reraise_exception():
4454                self.db.volume_attachment_update(
4455                    context, attachment_ref.id,
4456                    {'attach_status':
4457                     fields.VolumeAttachStatus.ERROR_ATTACHING})
4458
4459        self.db.volume_attached(context.elevated(),
4460                                attachment_ref.id,
4461                                attachment_ref.instance_uuid,
4462                                connector.get('host', ''),
4463                                connector.get('mountpoint', 'na'),
4464                                mode,
4465                                False)
4466        vref.refresh()
4467        attachment_ref.refresh()
4468        self._notify_about_volume_usage(context, vref, "attach.end")
4469        LOG.info("attachment_update completed successfully.",
4470                 resource=vref)
4471        return connection_info
4472
4473    def _connection_terminate(self, context, volume,
4474                              attachment, force=False):
4475        """Remove a volume connection, but leave attachment.
4476
4477        Exits early if the attachment does not have a connector and returns
4478        None to indicate shared connections are irrelevant.
4479        """
4480        utils.require_driver_initialized(self.driver)
4481        connector = attachment.connector
4482        if not connector and not force:
4483            # It's possible to attach a volume to a shelved offloaded server
4484            # in nova, and a shelved offloaded server is not on a compute host,
4485            # which means the attachment was made without a host connector,
4486            # so if we don't have a connector we can't terminate a connection
4487            # that was never actually made to the storage backend, so just
4488            # log a message and exit.
4489            LOG.debug('No connector for attachment %s; skipping storage '
4490                      'backend terminate_connection call.', attachment.id)
4491            # None indicates we don't know and don't care.
4492            return None
4493        try:
4494            shared_connections = self.driver.terminate_connection(volume,
4495                                                                  connector,
4496                                                                  force=force)
4497            if not isinstance(shared_connections, bool):
4498                shared_connections = False
4499
4500        except Exception as err:
4501            err_msg = (_('Terminate volume connection failed: %(err)s')
4502                       % {'err': six.text_type(err)})
4503            LOG.exception(err_msg, resource=volume)
4504            raise exception.VolumeBackendAPIException(data=err_msg)
4505        LOG.info("Terminate volume connection completed successfully.",
4506                 resource=volume)
4507        # NOTE(jdg): Return True/False if there are other outstanding
4508        # attachments that share this connection.  If True should signify
4509        # caller to preserve the actual host connection (work should be
4510        # done in the brick connector as it has the knowledge of what's
4511        # going on here.
4512        return shared_connections
4513
4514    def attachment_delete(self, context, attachment_id, vref):
4515        """Delete/Detach the specified attachment.
4516
4517        Notifies the backend device that we're detaching the specified
4518        attachment instance.
4519
4520        param: vref: Volume object associated with the attachment
4521        param: attachment: Attachment reference object to remove
4522
4523        NOTE if the attachment reference is None, we remove all existing
4524        attachments for the specified volume object.
4525        """
4526        attachment_ref = objects.VolumeAttachment.get_by_id(context,
4527                                                            attachment_id)
4528        if not attachment_ref:
4529            for attachment in VA_LIST.get_all_by_volume_id(context, vref.id):
4530                self._do_attachment_delete(context, vref, attachment)
4531        else:
4532            self._do_attachment_delete(context, vref, attachment_ref)
4533
4534    def _do_attachment_delete(self, context, vref, attachment):
4535        utils.require_driver_initialized(self.driver)
4536        self._notify_about_volume_usage(context, vref, "detach.start")
4537        has_shared_connection = self._connection_terminate(context,
4538                                                           vref,
4539                                                           attachment)
4540        try:
4541            LOG.debug('Deleting attachment %(attachment_id)s.',
4542                      {'attachment_id': attachment.id},
4543                      resource=vref)
4544            self.driver.detach_volume(context, vref, attachment)
4545            if has_shared_connection is not None and not has_shared_connection:
4546                self.driver.remove_export(context.elevated(), vref)
4547        except Exception:
4548            # FIXME(jdg): Obviously our volume object is going to need some
4549            # changes to deal with multi-attach and figuring out how to
4550            # represent a single failed attach out of multiple attachments
4551
4552            # TODO(jdg): object method here
4553            self.db.volume_attachment_update(
4554                context, attachment.get('id'),
4555                {'attach_status': 'error_detaching'})
4556        else:
4557            self.db.volume_detached(context.elevated(), vref.id,
4558                                    attachment.get('id'))
4559            self.db.volume_admin_metadata_delete(context.elevated(),
4560                                                 vref.id,
4561                                                 'attached_mode')
4562        self._notify_about_volume_usage(context, vref, "detach.end")
4563
4564    # Replication group API (Tiramisu)
4565    def enable_replication(self, ctxt, group):
4566        """Enable replication."""
4567        group.refresh()
4568        if group.replication_status != fields.ReplicationStatus.ENABLING:
4569            msg = _("Replication status in group %s is not "
4570                    "enabling. Cannot enable replication.") % group.id
4571            LOG.error(msg)
4572            raise exception.InvalidGroup(reason=msg)
4573
4574        volumes = group.volumes
4575        for vol in volumes:
4576            vol.refresh()
4577            if vol.replication_status != fields.ReplicationStatus.ENABLING:
4578                msg = _("Replication status in volume %s is not "
4579                        "enabling. Cannot enable replication.") % vol.id
4580                LOG.error(msg)
4581                raise exception.InvalidVolume(reason=msg)
4582
4583        self._notify_about_group_usage(
4584            ctxt, group, "enable_replication.start")
4585
4586        volumes_model_update = None
4587        model_update = None
4588        try:
4589            utils.require_driver_initialized(self.driver)
4590
4591            model_update, volumes_model_update = (
4592                self.driver.enable_replication(ctxt, group, volumes))
4593
4594            if volumes_model_update:
4595                for update in volumes_model_update:
4596                    vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
4597                    vol_obj.update(update)
4598                    vol_obj.save()
4599                    # If we failed to enable a volume, make sure the status
4600                    # for the group is set to error as well
4601                    if (update.get('replication_status') ==
4602                            fields.ReplicationStatus.ERROR and
4603                            model_update.get('replication_status') !=
4604                            fields.ReplicationStatus.ERROR):
4605                        model_update['replication_status'] = update.get(
4606                            'replication_status')
4607
4608            if model_update:
4609                if (model_update.get('replication_status') ==
4610                        fields.ReplicationStatus.ERROR):
4611                    msg = _('Enable replication failed.')
4612                    LOG.error(msg,
4613                              resource={'type': 'group',
4614                                        'id': group.id})
4615                    raise exception.VolumeDriverException(message=msg)
4616                else:
4617                    group.update(model_update)
4618                    group.save()
4619
4620        except exception.CinderException as ex:
4621            group.status = fields.GroupStatus.ERROR
4622            group.replication_status = fields.ReplicationStatus.ERROR
4623            group.save()
4624            # Update volume status to 'error' if driver returns
4625            # None for volumes_model_update.
4626            if not volumes_model_update:
4627                for vol in volumes:
4628                    vol.status = 'error'
4629                    vol.replication_status = fields.ReplicationStatus.ERROR
4630                    vol.save()
4631            err_msg = _("Enable replication group failed: "
4632                        "%s.") % six.text_type(ex)
4633            raise exception.ReplicationGroupError(reason=err_msg,
4634                                                  group_id=group.id)
4635
4636        for vol in volumes:
4637            vol.replication_status = fields.ReplicationStatus.ENABLED
4638            vol.save()
4639        group.replication_status = fields.ReplicationStatus.ENABLED
4640        group.save()
4641
4642        self._notify_about_group_usage(
4643            ctxt, group, "enable_replication.end", volumes)
4644        LOG.info("Enable replication completed successfully.",
4645                 resource={'type': 'group',
4646                           'id': group.id})
4647
4648    # Replication group API (Tiramisu)
4649    def disable_replication(self, ctxt, group):
4650        """Disable replication."""
4651        group.refresh()
4652        if group.replication_status != fields.ReplicationStatus.DISABLING:
4653            msg = _("Replication status in group %s is not "
4654                    "disabling. Cannot disable replication.") % group.id
4655            LOG.error(msg)
4656            raise exception.InvalidGroup(reason=msg)
4657
4658        volumes = group.volumes
4659        for vol in volumes:
4660            vol.refresh()
4661            if (vol.replication_status !=
4662                    fields.ReplicationStatus.DISABLING):
4663                msg = _("Replication status in volume %s is not "
4664                        "disabling. Cannot disable replication.") % vol.id
4665                LOG.error(msg)
4666                raise exception.InvalidVolume(reason=msg)
4667
4668        self._notify_about_group_usage(
4669            ctxt, group, "disable_replication.start")
4670
4671        volumes_model_update = None
4672        model_update = None
4673        try:
4674            utils.require_driver_initialized(self.driver)
4675
4676            model_update, volumes_model_update = (
4677                self.driver.disable_replication(ctxt, group, volumes))
4678
4679            if volumes_model_update:
4680                for update in volumes_model_update:
4681                    vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
4682                    vol_obj.update(update)
4683                    vol_obj.save()
4684                    # If we failed to enable a volume, make sure the status
4685                    # for the group is set to error as well
4686                    if (update.get('replication_status') ==
4687                            fields.ReplicationStatus.ERROR and
4688                            model_update.get('replication_status') !=
4689                            fields.ReplicationStatus.ERROR):
4690                        model_update['replication_status'] = update.get(
4691                            'replication_status')
4692
4693            if model_update:
4694                if (model_update.get('replication_status') ==
4695                        fields.ReplicationStatus.ERROR):
4696                    msg = _('Disable replication failed.')
4697                    LOG.error(msg,
4698                              resource={'type': 'group',
4699                                        'id': group.id})
4700                    raise exception.VolumeDriverException(message=msg)
4701                else:
4702                    group.update(model_update)
4703                    group.save()
4704
4705        except exception.CinderException as ex:
4706            group.status = fields.GroupStatus.ERROR
4707            group.replication_status = fields.ReplicationStatus.ERROR
4708            group.save()
4709            # Update volume status to 'error' if driver returns
4710            # None for volumes_model_update.
4711            if not volumes_model_update:
4712                for vol in volumes:
4713                    vol.status = 'error'
4714                    vol.replication_status = fields.ReplicationStatus.ERROR
4715                    vol.save()
4716            err_msg = _("Disable replication group failed: "
4717                        "%s.") % six.text_type(ex)
4718            raise exception.ReplicationGroupError(reason=err_msg,
4719                                                  group_id=group.id)
4720
4721        for vol in volumes:
4722            vol.replication_status = fields.ReplicationStatus.DISABLED
4723            vol.save()
4724        group.replication_status = fields.ReplicationStatus.DISABLED
4725        group.save()
4726
4727        self._notify_about_group_usage(
4728            ctxt, group, "disable_replication.end", volumes)
4729        LOG.info("Disable replication completed successfully.",
4730                 resource={'type': 'group',
4731                           'id': group.id})
4732
4733    # Replication group API (Tiramisu)
4734    def failover_replication(self, ctxt, group, allow_attached_volume=False,
4735                             secondary_backend_id=None):
4736        """Failover replication."""
4737        group.refresh()
4738        if group.replication_status != fields.ReplicationStatus.FAILING_OVER:
4739            msg = _("Replication status in group %s is not "
4740                    "failing-over. Cannot failover replication.") % group.id
4741            LOG.error(msg)
4742            raise exception.InvalidGroup(reason=msg)
4743
4744        volumes = group.volumes
4745        for vol in volumes:
4746            vol.refresh()
4747            if vol.status == 'in-use' and not allow_attached_volume:
4748                msg = _("Volume %s is attached but allow_attached_volume flag "
4749                        "is False. Cannot failover replication.") % vol.id
4750                LOG.error(msg)
4751                raise exception.InvalidVolume(reason=msg)
4752            if (vol.replication_status !=
4753                    fields.ReplicationStatus.FAILING_OVER):
4754                msg = _("Replication status in volume %s is not "
4755                        "failing-over. Cannot failover replication.") % vol.id
4756                LOG.error(msg)
4757                raise exception.InvalidVolume(reason=msg)
4758
4759        self._notify_about_group_usage(
4760            ctxt, group, "failover_replication.start")
4761
4762        volumes_model_update = None
4763        model_update = None
4764        try:
4765            utils.require_driver_initialized(self.driver)
4766
4767            model_update, volumes_model_update = (
4768                self.driver.failover_replication(
4769                    ctxt, group, volumes, secondary_backend_id))
4770
4771            if volumes_model_update:
4772                for update in volumes_model_update:
4773                    vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
4774                    vol_obj.update(update)
4775                    vol_obj.save()
4776                    # If we failed to enable a volume, make sure the status
4777                    # for the group is set to error as well
4778                    if (update.get('replication_status') ==
4779                            fields.ReplicationStatus.ERROR and
4780                            model_update.get('replication_status') !=
4781                            fields.ReplicationStatus.ERROR):
4782                        model_update['replication_status'] = update.get(
4783                            'replication_status')
4784
4785            if model_update:
4786                if (model_update.get('replication_status') ==
4787                        fields.ReplicationStatus.ERROR):
4788                    msg = _('Failover replication failed.')
4789                    LOG.error(msg,
4790                              resource={'type': 'group',
4791                                        'id': group.id})
4792                    raise exception.VolumeDriverException(message=msg)
4793                else:
4794                    group.update(model_update)
4795                    group.save()
4796
4797        except exception.CinderException as ex:
4798            group.status = fields.GroupStatus.ERROR
4799            group.replication_status = fields.ReplicationStatus.ERROR
4800            group.save()
4801            # Update volume status to 'error' if driver returns
4802            # None for volumes_model_update.
4803            if not volumes_model_update:
4804                for vol in volumes:
4805                    vol.status = 'error'
4806                    vol.replication_status = fields.ReplicationStatus.ERROR
4807                    vol.save()
4808            err_msg = _("Failover replication group failed: "
4809                        "%s.") % six.text_type(ex)
4810            raise exception.ReplicationGroupError(reason=err_msg,
4811                                                  group_id=group.id)
4812
4813        for vol in volumes:
4814            if secondary_backend_id == "default":
4815                vol.replication_status = fields.ReplicationStatus.ENABLED
4816            else:
4817                vol.replication_status = (
4818                    fields.ReplicationStatus.FAILED_OVER)
4819            vol.save()
4820        if secondary_backend_id == "default":
4821            group.replication_status = fields.ReplicationStatus.ENABLED
4822        else:
4823            group.replication_status = fields.ReplicationStatus.FAILED_OVER
4824        group.save()
4825
4826        self._notify_about_group_usage(
4827            ctxt, group, "failover_replication.end", volumes)
4828        LOG.info("Failover replication completed successfully.",
4829                 resource={'type': 'group',
4830                           'id': group.id})
4831
4832    def list_replication_targets(self, ctxt, group):
4833        """Provide a means to obtain replication targets for a group.
4834
4835        This method is used to find the replication_device config
4836        info. 'backend_id' is a required key in 'replication_device'.
4837
4838        Response Example for admin:
4839
4840        .. code:: json
4841
4842          {
4843              'replication_targets': [
4844                  {
4845                      'backend_id': 'vendor-id-1',
4846                      'unique_key': 'val1',
4847                      ......
4848                  },
4849                  {
4850                      'backend_id': 'vendor-id-2',
4851                      'unique_key': 'val2',
4852                      ......
4853                  }
4854               ]
4855          }
4856
4857        Response example for non-admin:
4858
4859        .. code json
4860
4861          {
4862              'replication_targets': [
4863                  {
4864                      'backend_id': 'vendor-id-1'
4865                  },
4866                  {
4867                      'backend_id': 'vendor-id-2'
4868                  }
4869               ]
4870          }
4871
4872        """
4873
4874        replication_targets = []
4875        try:
4876            group.refresh()
4877            if self.configuration.replication_device:
4878                if ctxt.is_admin:
4879                    for rep_dev in self.configuration.replication_device:
4880                        keys = rep_dev.keys()
4881                        dev = {}
4882                        for k in keys:
4883                            dev[k] = rep_dev[k]
4884                        replication_targets.append(dev)
4885                else:
4886                    for rep_dev in self.configuration.replication_device:
4887                        dev = rep_dev.get('backend_id')
4888                        if dev:
4889                            replication_targets.append({'backend_id': dev})
4890
4891        except exception.GroupNotFound:
4892            err_msg = (_("Get replication targets failed. Group %s not "
4893                         "found.") % group.id)
4894            LOG.exception(err_msg)
4895            raise exception.VolumeBackendAPIException(data=err_msg)
4896
4897        return {'replication_targets': replication_targets}
4898