# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Volume manager manages creating, attaching, and detaching volumes, as well
as their persistent storage.

Persistent storage volumes keep their state independent of instances. You can
attach a volume to an instance, terminate the instance, spawn a new instance
(even one from a different image) and re-attach the volume with the same data
intact.

**Related Flags**

:volume_manager:  The module name of a class derived from
                  :class:`manager.Manager` (default:
                  :class:`cinder.volume.manager.Manager`).
:volume_driver:  Used by :class:`Manager`. Defaults to
                 :class:`cinder.volume.drivers.lvm.LVMVolumeDriver`.
:volume_group:  Name of the group that will contain exported volumes
                (default: `cinder-volumes`)
:num_shell_tries:  Number of times to attempt to run commands (default: 3)

"""


import requests
import time

from castellan import key_manager
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import units
from oslo_utils import uuidutils
profiler = importutils.try_import('osprofiler.profiler')
import six
from taskflow import exceptions as tfe

from cinder.common import constants
from cinder import compute
from cinder import context
from cinder import coordination
from cinder import db
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _
from cinder.image import cache as image_cache
from cinder.image import glance
from cinder.image import image_utils
from cinder.keymgr import migration as key_migration
from cinder import manager
from cinder.message import api as message_api
from cinder.message import message_field
from cinder import objects
from cinder.objects import cgsnapshot
from cinder.objects import consistencygroup
from cinder.objects import fields
from cinder import quota
from cinder import utils
from cinder import volume as cinder_volume
from cinder.volume import configuration as config
from cinder.volume.flows.manager import create_volume
from cinder.volume.flows.manager import manage_existing
from cinder.volume.flows.manager import manage_existing_snapshot
from cinder.volume import group_types
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as vol_utils
from cinder.volume import volume_types

LOG = logging.getLogger(__name__)

QUOTAS = quota.QUOTAS
CGQUOTAS = quota.CGQUOTAS
GROUP_QUOTAS = quota.GROUP_QUOTAS
VALID_REMOVE_VOL_FROM_CG_STATUS = (
    'available',
    'in-use',
    'error',
    'error_deleting')
VALID_REMOVE_VOL_FROM_GROUP_STATUS = (
    'available',
    'in-use',
    'error',
    'error_deleting')
VALID_ADD_VOL_TO_CG_STATUS = (
    'available',
    'in-use')
VALID_ADD_VOL_TO_GROUP_STATUS = (
    'available',
    'in-use')
VALID_CREATE_CG_SRC_SNAP_STATUS = (fields.SnapshotStatus.AVAILABLE,)
VALID_CREATE_GROUP_SRC_SNAP_STATUS = (fields.SnapshotStatus.AVAILABLE,)
VALID_CREATE_CG_SRC_CG_STATUS = ('available',)
VALID_CREATE_GROUP_SRC_GROUP_STATUS = ('available',)
VA_LIST = objects.VolumeAttachmentList

volume_manager_opts = [
    cfg.IntOpt('migration_create_volume_timeout_secs',
               default=300,
               help='Timeout for creating the volume to migrate to '
                    'when performing volume migration (seconds)'),
    cfg.BoolOpt('volume_service_inithost_offload',
                default=False,
                help='Offload pending volume delete during '
                     'volume service startup'),
    cfg.StrOpt('zoning_mode',
               help="FC Zoning mode configured; only 'fabric' is "
                    "supported now."),
]

volume_backend_opts = [
    cfg.StrOpt('volume_driver',
               default='cinder.volume.drivers.lvm.LVMVolumeDriver',
               help='Driver to use for volume creation'),
    cfg.StrOpt('extra_capabilities',
               default='{}',
               help='User defined capabilities, a JSON formatted string '
                    'specifying key/value pairs. The key/value pairs can '
                    'be used by the CapabilitiesFilter to select between '
                    'backends when requests specify volume types. For '
                    'example, specifying a service level or the geographical '
                    'location of a backend, then creating a volume type to '
                    'allow the user to select by these different '
                    'properties.'),
    cfg.BoolOpt('suppress_requests_ssl_warnings',
                default=False,
                help='Suppress requests library SSL certificate warnings.'),
    cfg.IntOpt('backend_native_threads_pool_size',
               default=20,
               min=20,
               help='Size of the native threads pool for the backend. '
                    'Increase for backends that heavily rely on this, like '
                    'the RBD driver.'),
]

CONF = cfg.CONF
CONF.register_opts(volume_manager_opts)
CONF.register_opts(volume_backend_opts, group=config.SHARED_CONF_GROUP)
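
# A minimal sketch of how the backend options above are typically wired up
# in cinder.conf. The section and backend names below are illustrative
# examples, not defaults shipped by this module:
#
#     [DEFAULT]
#     enabled_backends = lvm-1
#
#     [backend_defaults]
#     suppress_requests_ssl_warnings = true
#
#     [lvm-1]
#     volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver
#     volume_backend_name = LVM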

MAPPING = {
    'cinder.volume.drivers.emc.scaleio':
    'cinder.volume.drivers.dell_emc.scaleio.driver',
    'cinder.volume.drivers.emc.vnx.driver.EMCVNXDriver':
    'cinder.volume.drivers.dell_emc.vnx.driver.VNXDriver',
    'cinder.volume.drivers.emc.xtremio.XtremIOISCSIDriver':
    'cinder.volume.drivers.dell_emc.xtremio.XtremIOISCSIDriver',
    'cinder.volume.drivers.emc.xtremio.XtremIOFibreChannelDriver':
    'cinder.volume.drivers.dell_emc.xtremio.XtremIOFCDriver',
    'cinder.volume.drivers.datera.DateraDriver':
    'cinder.volume.drivers.datera.datera_iscsi.DateraDriver',
    'cinder.volume.drivers.emc.emc_vmax_iscsi.EMCVMAXISCSIDriver':
    'cinder.volume.drivers.dell_emc.vmax.iscsi.VMAXISCSIDriver',
    'cinder.volume.drivers.emc.emc_vmax_fc.EMCVMAXFCDriver':
    'cinder.volume.drivers.dell_emc.vmax.fc.VMAXFCDriver',
    'cinder.volume.drivers.eqlx.DellEQLSanISCSIDriver':
    'cinder.volume.drivers.dell_emc.ps.PSSeriesISCSIDriver',
    'cinder.volume.drivers.dell.dell_storagecenter_iscsi.'
    'DellStorageCenterISCSIDriver':
    'cinder.volume.drivers.dell_emc.sc.storagecenter_iscsi.'
    'SCISCSIDriver',
    'cinder.volume.drivers.dell.dell_storagecenter_fc.'
    'DellStorageCenterFCDriver':
    'cinder.volume.drivers.dell_emc.sc.storagecenter_fc.'
    'SCFCDriver',
    'cinder.volume.drivers.windows.windows.WindowsDriver':
    'cinder.volume.drivers.windows.iscsi.WindowsISCSIDriver',
}
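
# MAPPING is consulted once, at VolumeManager.__init__ time: a deprecated
# volume_driver path found as a key is translated to its new location after
# a deprecation warning is logged. For example (paths taken from the table
# above), a cinder.conf still containing
#
#     volume_driver = cinder.volume.drivers.emc.vnx.driver.EMCVNXDriver
#
# is loaded as cinder.volume.drivers.dell_emc.vnx.driver.VNXDriver.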


class VolumeManager(manager.CleanableManager,
                    manager.SchedulerDependentManager):
    """Manages attachable block storage devices."""

    RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION

    FAILBACK_SENTINEL = 'default'

    target = messaging.Target(version=RPC_API_VERSION)

    # On cloning a volume, we shouldn't copy volume_type, consistencygroup
    # and volume_attachment, because the db sets those according to
    # [field]_id, which we do copy. We also skip some other values that are
    # set during creation of the Volume object.
    _VOLUME_CLONE_SKIP_PROPERTIES = {
        'id', '_name_id', 'name_id', 'name', 'status',
        'attach_status', 'migration_status', 'volume_type',
        'consistencygroup', 'volume_attachment', 'group'}

    def _get_service(self, host=None, binary=constants.VOLUME_BINARY):
        host = host or self.host
        ctxt = context.get_admin_context()
        svc_host = vol_utils.extract_host(host, 'backend')
        return objects.Service.get_by_args(ctxt, svc_host, binary)

    def __init__(self, volume_driver=None, service_name=None,
                 *args, **kwargs):
        """Load the driver from the one specified in args, or from flags."""
        # update_service_capabilities needs service_name to be volume
        super(VolumeManager, self).__init__(service_name='volume',
                                            *args, **kwargs)
        # NOTE(dulek): service_name=None means we're running in unit tests.
        service_name = service_name or 'backend_defaults'
        self.configuration = config.Configuration(volume_backend_opts,
                                                  config_group=service_name)
        self._set_tpool_size(
            self.configuration.backend_native_threads_pool_size)
        self.stats = {}
        self.service_uuid = None

        if not volume_driver:
            # Get from configuration, which will get the default
            # if it's not using multi-backend.
            volume_driver = self.configuration.volume_driver
        if volume_driver in MAPPING:
            LOG.warning("Driver path %s is deprecated, update your "
                        "configuration to the new path.", volume_driver)
            volume_driver = MAPPING[volume_driver]

        vol_db_empty = self._set_voldb_empty_at_startup_indicator(
            context.get_admin_context())
        LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty)

        # We pass the current setting for service.active_backend_id to
        # the driver on init, in case there was a restart or something
        curr_active_backend_id = None
        try:
            service = self._get_service()
        except exception.ServiceNotFound:
            # NOTE(jdg): This is to solve problems with unit tests
            LOG.info("Service not found for updating "
                     "active_backend_id, assuming default "
                     "for driver init.")
        else:
            curr_active_backend_id = service.active_backend_id
            self.service_uuid = service.uuid

        if self.configuration.suppress_requests_ssl_warnings:
            LOG.warning("Suppressing requests library SSL Warnings")
            requests.packages.urllib3.disable_warnings(
                requests.packages.urllib3.exceptions.InsecureRequestWarning)
            requests.packages.urllib3.disable_warnings(
                requests.packages.urllib3.exceptions.InsecurePlatformWarning)

        self.key_manager = key_manager.API(CONF)
        self.driver = importutils.import_object(
            volume_driver,
            configuration=self.configuration,
            db=self.db,
            host=self.host,
            cluster_name=self.cluster,
            is_vol_db_empty=vol_db_empty,
            active_backend_id=curr_active_backend_id)

        if self.cluster and not self.driver.SUPPORTS_ACTIVE_ACTIVE:
            msg = _('Active-Active configuration is not currently supported '
                    'by driver %s.') % volume_driver
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)

        self.message_api = message_api.API()

        if CONF.profiler.enabled and profiler is not None:
            self.driver = profiler.trace_cls("driver")(self.driver)
        try:
            self.extra_capabilities = jsonutils.loads(
                self.driver.configuration.extra_capabilities)
        except AttributeError:
            self.extra_capabilities = {}
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.error("Invalid JSON: %s",
                          self.driver.configuration.extra_capabilities)

        # Check if a per-backend AZ has been specified
        backend_zone = self.driver.configuration.safe_get(
            'backend_availability_zone')
        if backend_zone:
            self.availability_zone = backend_zone

        if self.driver.configuration.safe_get(
                'image_volume_cache_enabled'):

            max_cache_size = self.driver.configuration.safe_get(
                'image_volume_cache_max_size_gb')
            max_cache_entries = self.driver.configuration.safe_get(
                'image_volume_cache_max_count')

            self.image_volume_cache = image_cache.ImageVolumeCache(
                self.db,
                cinder_volume.API(),
                max_cache_size,
                max_cache_entries
            )
            LOG.info('Image-volume cache enabled for host %(host)s.',
                     {'host': self.host})
        else:
            LOG.info('Image-volume cache disabled for host %(host)s.',
                     {'host': self.host})
            self.image_volume_cache = None

    def _count_allocated_capacity(self, ctxt, volume):
        pool = vol_utils.extract_host(volume['host'], 'pool')
        if pool is None:
            # No pool name encoded in host, so this is a legacy volume
            # created before pools were introduced. Ask the driver to
            # provide pool info if it has such knowledge and update the DB.
            try:
                pool = self.driver.get_pool(volume)
            except Exception:
                LOG.exception('Fetch volume pool name failed.',
                              resource=volume)
                return

            if pool:
                new_host = vol_utils.append_host(volume['host'],
                                                 pool)
                self.db.volume_update(ctxt, volume['id'],
                                      {'host': new_host})
            else:
                # Otherwise, put them into a special fixed pool with
                # volume_backend_name being the pool name; if
                # volume_backend_name is None, use the default pool name.
                # This is only for counting purposes and doesn't update
                # the DB.
                pool = (self.driver.configuration.safe_get(
                    'volume_backend_name') or vol_utils.extract_host(
                        volume['host'], 'pool', True))

        try:
            pool_stat = self.stats['pools'][pool]
        except KeyError:
            # First volume in the pool
            self.stats['pools'][pool] = dict(
                allocated_capacity_gb=0)
            pool_stat = self.stats['pools'][pool]
        pool_sum = pool_stat['allocated_capacity_gb']
        pool_sum += volume['size']

        self.stats['pools'][pool]['allocated_capacity_gb'] = pool_sum
        self.stats['allocated_capacity_gb'] += volume['size']
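
    # Sketch of the 'host@backend#pool' notation handled above, with
    # illustrative values only:
    #
    #     vol_utils.extract_host('srv@lvm#pool1', 'pool')  # -> 'pool1'
    #     vol_utils.append_host('srv@lvm', 'pool1')  # -> 'srv@lvm#pool1'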

    def _set_voldb_empty_at_startup_indicator(self, ctxt):
        """Determine if the Cinder volume DB is empty.

        A check of the volume DB is done to determine whether it is empty or
        not at this point.

        :param ctxt: our working context
        """
        vol_entries = self.db.volume_get_all(ctxt, None, 1, filters=None)

        if len(vol_entries) == 0:
            LOG.info("Determined volume DB was empty at startup.")
            return True
        else:
            LOG.info("Determined volume DB was not empty at startup.")
            return False

    def _sync_provider_info(self, ctxt, volumes, snapshots):
        # NOTE(jdg): For now this just updates provider_id; we can add more
        # items to the update if they're relevant, but we need to be safe in
        # what we allow and add a list of allowed keys. Things that make
        # sense are provider_*, replication_status etc.

        updates, snapshot_updates = self.driver.update_provider_info(
            volumes, snapshots)

        if updates:
            for volume in volumes:
                # NOTE(jdg): Make sure the returned item is in this host's
                # volumes.
                update = (
                    [updt for updt in updates if updt['id'] ==
                        volume['id']])
                if update:
                    update = update[0]
                    self.db.volume_update(
                        ctxt,
                        update['id'],
                        {'provider_id': update['provider_id']})

        if snapshot_updates:
            for snap in snapshots:
                # NOTE(jdg): For now we only update those that have no entry
                if not snap.get('provider_id', None):
                    update = (
                        [updt for updt in snapshot_updates if updt['id'] ==
                            snap['id']][0])
                    if update:
                        self.db.snapshot_update(
                            ctxt,
                            update['id'],
                            {'provider_id': update['provider_id']})
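
    # Shape of the update_provider_info() payload consumed above, with
    # illustrative values; only 'provider_id' is currently honored:
    #
    #     updates          = [{'id': '<volume-id>', 'provider_id': 'p-1'}]
    #     snapshot_updates = [{'id': '<snapshot-id>', 'provider_id': 'p-2'}]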

    def _include_resources_in_cluster(self, ctxt):

        LOG.info('Including all resources from host %(host)s in cluster '
                 '%(cluster)s.',
                 {'host': self.host, 'cluster': self.cluster})
        num_vols = objects.VolumeList.include_in_cluster(
            ctxt, self.cluster, host=self.host)
        num_cgs = objects.ConsistencyGroupList.include_in_cluster(
            ctxt, self.cluster, host=self.host)
        num_gs = objects.GroupList.include_in_cluster(
            ctxt, self.cluster, host=self.host)
        num_cache = db.image_volume_cache_include_in_cluster(
            ctxt, self.cluster, host=self.host)
        LOG.info('%(num_vols)s volumes, %(num_cgs)s consistency groups, '
                 '%(num_gs)s generic groups and %(num_cache)s image '
                 'volume caches from host %(host)s have been included in '
                 'cluster %(cluster)s.',
                 {'num_vols': num_vols, 'num_cgs': num_cgs, 'num_gs': num_gs,
                  'host': self.host, 'cluster': self.cluster,
                  'num_cache': num_cache})

    def init_host(self, added_to_cluster=None, **kwargs):
        """Perform any required initialization."""
        ctxt = context.get_admin_context()
        if not self.driver.supported:
            utils.log_unsupported_driver_warning(self.driver)

            if not self.configuration.enable_unsupported_driver:
                LOG.error("Unsupported drivers are disabled."
                          " You can re-enable by adding "
                          "enable_unsupported_driver=True to the "
                          "driver section in cinder.conf",
                          resource={'type': 'driver',
                                    'id': self.__class__.__name__})
                return

        # If we have just added this host to a cluster we have to include
        # all our resources in that cluster.
        if added_to_cluster:
            self._include_resources_in_cluster(ctxt)

        LOG.info("Starting volume driver %(driver_name)s (%(version)s)",
                 {'driver_name': self.driver.__class__.__name__,
                  'version': self.driver.get_version()})
        try:
            self.driver.do_setup(ctxt)
            self.driver.check_for_setup_error()
        except Exception:
            LOG.exception("Failed to initialize driver.",
                          resource={'type': 'driver',
                                    'id': self.__class__.__name__})
            # we don't want to continue since we failed
            # to initialize the driver correctly.
            return

        # Initialize backend capabilities list
        self.driver.init_capabilities()

        volumes = self._get_my_volumes(ctxt)
        snapshots = self._get_my_snapshots(ctxt)
        self._sync_provider_info(ctxt, volumes, snapshots)
        # FIXME volume count for exporting is wrong

        self.stats['pools'] = {}
        self.stats.update({'allocated_capacity_gb': 0})

        try:
            for volume in volumes:
                # available volumes should also be counted as allocated
                if volume['status'] in ['in-use', 'available']:
                    # calculate allocated capacity for driver
                    self._count_allocated_capacity(ctxt, volume)

                    try:
                        if volume['status'] in ['in-use']:
                            self.driver.ensure_export(ctxt, volume)
                    except Exception:
                        LOG.exception("Failed to re-export volume, "
                                      "setting to ERROR.",
                                      resource=volume)
                        volume.conditional_update({'status': 'error'},
                                                  {'status': 'in-use'})
            # All other cleanups are processed by parent class
            # CleanableManager
        except Exception:
            LOG.exception("Error during re-export on driver init.",
                          resource=volume)
            return

        self.driver.set_throttle()

        # at this point the driver is considered initialized.
        # NOTE(jdg): Careful though because that doesn't mean
        # that an entry exists in the service table
        self.driver.set_initialized()

        # Keep the image tmp file clean when init host.
        backend_name = vol_utils.extract_host(self.service_topic_queue)
        image_utils.cleanup_temporary_file(backend_name)

        # Migrate any ConfKeyManager keys based on fixed_key to the currently
        # configured key manager.
        self._add_to_threadpool(key_migration.migrate_fixed_key,
                                volumes=volumes)

        # collect and publish service capabilities
        self.publish_service_capabilities(ctxt)
        LOG.info("Driver initialization completed successfully.",
                 resource={'type': 'driver',
                           'id': self.driver.__class__.__name__})

        # Make sure to call CleanableManager to do the cleanup
        super(VolumeManager, self).init_host(
            added_to_cluster=added_to_cluster, **kwargs)

    def init_host_with_rpc(self):
        LOG.info("Initializing RPC dependent components of volume "
                 "driver %(driver_name)s (%(version)s)",
                 {'driver_name': self.driver.__class__.__name__,
                  'version': self.driver.get_version()})

        try:
            # Make sure the driver is initialized first
            utils.log_unsupported_driver_warning(self.driver)
            utils.require_driver_initialized(self.driver)
        except exception.DriverNotInitialized:
            LOG.error("Cannot complete RPC initialization because "
                      "driver isn't initialized properly.",
                      resource={'type': 'driver',
                                'id': self.driver.__class__.__name__})
            return

        stats = self.driver.get_volume_stats(refresh=True)
        try:
            service = self._get_service()
        except exception.ServiceNotFound:
            with excutils.save_and_reraise_exception():
                LOG.error("Service not found for updating "
                          "replication_status.")

        if service.replication_status != (
                fields.ReplicationStatus.FAILED_OVER):
            if stats and stats.get('replication_enabled', False):
                replication_status = fields.ReplicationStatus.ENABLED
            else:
                replication_status = fields.ReplicationStatus.DISABLED

            if replication_status != service.replication_status:
                service.replication_status = replication_status
                service.save()

        # Update the cluster replication status if necessary
        cluster = service.cluster
        if (cluster and
                cluster.replication_status != service.replication_status):
            cluster.replication_status = service.replication_status
            cluster.save()

        LOG.info("Driver post RPC initialization completed successfully.",
                 resource={'type': 'driver',
                           'id': self.driver.__class__.__name__})

    def _do_cleanup(self, ctxt, vo_resource):
        if isinstance(vo_resource, objects.Volume):
            if vo_resource.status == 'downloading':
                self.driver.clear_download(ctxt, vo_resource)

            elif vo_resource.status == 'uploading':
                # Set volume status to available or in-use.
                self.db.volume_update_status_based_on_attachment(
                    ctxt, vo_resource.id)

            elif vo_resource.status == 'deleting':
                if CONF.volume_service_inithost_offload:
                    # Offload all the pending volume delete operations to
                    # the threadpool to prevent the main volume service
                    # thread from being blocked.
                    self._add_to_threadpool(self.delete_volume, ctxt,
                                            vo_resource, cascade=True)
                else:
                    # By default, delete volumes sequentially
                    self.delete_volume(ctxt, vo_resource, cascade=True)
                # We signal that we take care of cleaning the worker
                # ourselves (with the set_workers decorator in the
                # delete_volume method) so the do_cleanup method doesn't
                # need to remove it.
                return True

        # For Volume creating and downloading and for Snapshot downloading
        # statuses we have to set status to error
        if vo_resource.status in ('creating', 'downloading'):
            vo_resource.status = 'error'
            vo_resource.save()

    def is_working(self):
        """Return if Manager is ready to accept requests.

        This is to inform the Service class that in case of volume driver
        initialization failure the manager is actually down and not ready
        to accept any requests.
        """
        return self.driver.initialized

    def _set_resource_host(self, resource):
        """Set the host field on the DB to our own when we are clustered."""
        if (resource.is_clustered and
                not vol_utils.hosts_are_equivalent(resource.host,
                                                   self.host)):
            pool = vol_utils.extract_host(resource.host, 'pool')
            resource.host = vol_utils.append_host(self.host, pool)
            resource.save()

    @objects.Volume.set_workers
    def create_volume(self, context, volume, request_spec=None,
                      filter_properties=None, allow_reschedule=True):
        """Creates the volume."""
        # Log about unsupported drivers
        utils.log_unsupported_driver_warning(self.driver)

        # Make sure the host in the DB matches our own when clustered
        self._set_resource_host(volume)

        # Update our allocated capacity counter early to minimize race
        # conditions with the scheduler.
        self._update_allocated_capacity(volume)
        # We lose the host value if we reschedule, so keep it here
        original_host = volume.host

        context_elevated = context.elevated()
        if filter_properties is None:
            filter_properties = {}

        if request_spec is None:
            request_spec = objects.RequestSpec()

        try:
            # NOTE(flaper87): Driver initialization is
            # verified by the task itself.
            flow_engine = create_volume.get_flow(
                context_elevated,
                self,
                self.db,
                self.driver,
                self.scheduler_rpcapi,
                self.host,
                volume,
                allow_reschedule,
                context,
                request_spec,
                filter_properties,
                image_volume_cache=self.image_volume_cache,
            )
        except Exception:
            msg = _("Create manager volume flow failed.")
            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
            raise exception.CinderException(msg)

        snapshot_id = request_spec.get('snapshot_id')
        source_volid = request_spec.get('source_volid')

        if snapshot_id is not None:
            # Make sure the snapshot is not deleted until we are done
            # with it.
            locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
        elif source_volid is not None:
            # Make sure the volume is not deleted until we are done with it.
            locked_action = "%s-%s" % (source_volid, 'delete_volume')
        else:
            locked_action = None

        def _run_flow():
            # This code executes the create volume flow. If something goes
            # wrong, the flow reverts all the work that was done and
            # reraises an exception. Otherwise, all data generated by the
            # flow becomes available in the flow engine's storage.
            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
                flow_engine.run()

        # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
        # decide if allocated_capacity should be incremented.
        rescheduled = False

        try:
            if locked_action is None:
                _run_flow()
            else:
                with coordination.COORDINATOR.get_lock(locked_action):
                    _run_flow()
        finally:
            try:
                flow_engine.storage.fetch('refreshed')
            except tfe.NotFound:
                # If there's no vol_ref, then the flow was reverted. Let's
                # check whether rescheduling occurred.
                try:
                    rescheduled = flow_engine.storage.get_revert_result(
                        create_volume.OnFailureRescheduleTask.make_name(
                            [create_volume.ACTION]))
                except tfe.NotFound:
                    pass

        if rescheduled:
            # NOTE(geguileo): Volume was rescheduled so we need to update
            # volume stats because the volume wasn't created here.
            # Volume.host is None now, so we pass the original host value.
            self._update_allocated_capacity(volume, decrement=True,
                                            host=original_host)

        # Shared targets is only relevant for iSCSI connections.
        # We default to True to be on the safe side.
        volume.shared_targets = (
            self.driver.capabilities.get('storage_protocol') == 'iSCSI' and
            self.driver.capabilities.get('shared_targets', True))
        # TODO(geguileo): service_uuid won't be enough on Active/Active
        # deployments. There can be 2 services handling volumes from the
        # same backend.
        volume.service_uuid = self.service_uuid
        volume.save()

        LOG.info("Created volume successfully.", resource=volume)
        return volume.id
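
    # NOTE: the lock names built in create_volume() above intentionally
    # match the '{id}-{f_name}' pattern used by @coordination.synchronized
    # on delete_volume() and delete_snapshot() below, e.g. (illustrative id)
    # 'd5f6...-delete_snapshot', so a source volume or snapshot cannot be
    # deleted while it is being cloned.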

    def _check_is_our_resource(self, resource):
        if resource.host:
            res_backend = vol_utils.extract_host(
                resource.service_topic_queue)
            backend = vol_utils.extract_host(self.service_topic_queue)
            if res_backend != backend:
                msg = (_('Invalid %(resource)s: %(resource)s %(id)s is not '
                         'local to %(backend)s.') %
                       {'resource': resource.obj_name, 'id': resource.id,
                        'backend': backend})
                raise exception.Invalid(msg)

    @coordination.synchronized('{volume.id}-{f_name}')
    @objects.Volume.set_workers
    def delete_volume(self, context, volume, unmanage_only=False,
                      cascade=False):
        """Deletes and unexports volume.

        1. Delete a volume (normal case).
           Delete a volume and update quotas.

        2. Delete a migration volume.
           If deleting the volume in a migration, we want to skip
           quotas but we need database updates for the volume.

        3. Delete a temp volume for backup.
           If deleting the temp volume for backup, we want to skip
           quotas but we need database updates for the volume.
        """

        context = context.elevated()

        try:
            volume.refresh()
        except exception.VolumeNotFound:
            # NOTE(thingee): It could be possible for a volume to
            # be deleted when resuming deletes from init_host().
            LOG.debug("Attempted delete of non-existent volume: %s",
                      volume.id)
            return

        if context.project_id != volume.project_id:
            project_id = volume.project_id
        else:
            project_id = context.project_id

        if volume['attach_status'] == fields.VolumeAttachStatus.ATTACHED:
            # Volume is still attached, need to detach first
            raise exception.VolumeAttached(volume_id=volume.id)
        self._check_is_our_resource(volume)

        if unmanage_only and volume.encryption_key_id is not None:
            raise exception.Invalid(
                reason=_("Unmanaging encrypted volumes is not "
                         "supported."))

        if unmanage_only and cascade:
            # This could be done, but is ruled out for now just
            # for simplicity.
            raise exception.Invalid(
                reason=_("Unmanage and cascade delete options "
                         "are mutually exclusive."))

        # To back up a snapshot or an 'in-use' volume, create a temp
        # volume from the snapshot or in-use volume, and back it up.
        # Get admin_metadata (needs admin context) to detect a temporary
        # volume.
        is_temp_vol = False
        with volume.obj_as_admin():
            if volume.admin_metadata.get('temporary', 'False') == 'True':
                is_temp_vol = True
                LOG.info("Trying to delete temp volume: %s", volume.id)

        # The status 'deleting' is not included, because it only applies to
        # the source volume to be deleted after a migration. No quota
        # needs to be handled for it.
        is_migrating = volume.migration_status not in (None, 'error',
                                                       'success')
        is_migrating_dest = (is_migrating and
                             volume.migration_status.startswith(
                                 'target:'))
        notification = "delete.start"
        if unmanage_only:
            notification = "unmanage.start"
        if not is_temp_vol:
            self._notify_about_volume_usage(context, volume, notification)
        try:
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the volume status updated.
            utils.require_driver_initialized(self.driver)

            self.driver.remove_export(context, volume)
            if unmanage_only:
                self.driver.unmanage(volume)
            elif cascade:
                LOG.debug('Performing cascade delete.')
                snapshots = objects.SnapshotList.get_all_for_volume(
                    context, volume.id)
                for s in snapshots:
                    if s.status != fields.SnapshotStatus.DELETING:
                        self._clear_db(context, is_migrating_dest, volume,
                                       'error_deleting')

                        msg = (_("Snapshot %(id)s was found in state "
                                 "%(state)s rather than 'deleting' during "
                                 "cascade delete.") % {'id': s.id,
                                                       'state': s.status})
                        raise exception.InvalidSnapshot(reason=msg)

                    self.delete_snapshot(context, s)

                LOG.debug('Snapshots deleted, issuing volume delete')
                self.driver.delete_volume(volume)
            else:
                self.driver.delete_volume(volume)
        except exception.VolumeIsBusy:
            LOG.error("Unable to delete busy volume.",
                      resource=volume)
            # If this is a destination volume, we have to clear the
            # database record to avoid user confusion.
            self._clear_db(context, is_migrating_dest, volume,
                           'available')
            return
        except Exception:
            with excutils.save_and_reraise_exception():
                # If this is a destination volume, we have to clear the
                # database record to avoid user confusion.
                new_status = 'error_deleting'
                if unmanage_only is True:
                    new_status = 'error_unmanaging'

                self._clear_db(context, is_migrating_dest, volume,
                               new_status)

        # If deleting source/destination volume in a migration or a temp
        # volume for backup, we should skip quotas.
        skip_quota = is_migrating or is_temp_vol
        if not skip_quota:
            # Get reservations
            try:
                reservations = None
                if volume.status != 'error_managing_deleting':
                    reserve_opts = {'volumes': -1,
                                    'gigabytes': -volume.size}
                    QUOTAS.add_volume_type_opts(context,
                                                reserve_opts,
                                                volume.volume_type_id)
                    reservations = QUOTAS.reserve(context,
                                                  project_id=project_id,
                                                  **reserve_opts)
            except Exception:
                LOG.exception("Failed to update usages deleting volume.",
                              resource=volume)

        # Delete glance metadata if it exists
        self.db.volume_glance_metadata_delete_by_volume(context, volume.id)

        volume.destroy()

        # If deleting source/destination volume in a migration or a temp
        # volume for backup, we should skip quotas.
        if not skip_quota:
            notification = "delete.end"
            if unmanage_only:
                notification = "unmanage.end"
            self._notify_about_volume_usage(context, volume, notification)

            # Commit the reservations
            if reservations:
                QUOTAS.commit(context, reservations, project_id=project_id)

        self._update_allocated_capacity(volume, decrement=True)
        self.publish_service_capabilities(context)

        msg = "Deleted volume successfully."
        if unmanage_only:
            msg = "Unmanaged volume successfully."
        LOG.info(msg, resource=volume)

    def _clear_db(self, context, is_migrating_dest, volume_ref, status):
        # This method is called when driver.unmanage() or
        # driver.delete_volume() fails in delete_volume(), so it is already
        # in the exception handling part.
        if is_migrating_dest:
            volume_ref.destroy()
            LOG.error("Unable to delete the destination volume "
                      "during volume migration, (NOTE: database "
                      "record needs to be deleted).", resource=volume_ref)
        else:
            volume_ref.status = status
            volume_ref.save()

    def _revert_to_snapshot_generic(self, ctxt, volume, snapshot):
        """Generic way to revert a volume to a snapshot.

        The framework uses the generic way to implement the revert to
        snapshot feature:
        1. create a temporary volume from the snapshot
        2. mount the two volumes on the host
        3. copy data from the temporary volume to the original volume
        4. detach and destroy the temporary volume
        """
        temp_vol = None

        try:
            v_options = {'display_name': '[revert] temporary volume created '
                                         'from snapshot %s' % snapshot.id}
            ctxt = context.get_internal_tenant_context() or ctxt
            temp_vol = self.driver._create_temp_volume_from_snapshot(
                ctxt, volume, snapshot, volume_options=v_options)
            self._copy_volume_data(ctxt, temp_vol, volume)
            self.driver.delete_volume(temp_vol)
            temp_vol.destroy()
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(
                    "Failed to use snapshot %(snapshot)s to create "
                    "a temporary volume and copy data to volume "
                    "%(volume)s.",
                    {'snapshot': snapshot.id,
                     'volume': volume.id})
                if temp_vol and temp_vol.status == 'available':
                    self.driver.delete_volume(temp_vol)
                    temp_vol.destroy()

    def _revert_to_snapshot(self, context, volume, snapshot):
        """Use the driver's or the generic method to roll back a volume."""

        try:
            self.driver.revert_to_snapshot(context, volume, snapshot)
        except (NotImplementedError, AttributeError):
            LOG.info("Driver's 'revert_to_snapshot' is not found. "
                     "Trying to use the copy-snapshot-to-volume method.")
            self._revert_to_snapshot_generic(context, volume, snapshot)

    def _create_backup_snapshot(self, context, volume):
        kwargs = {
            'volume_id': volume.id,
            'user_id': context.user_id,
            'project_id': context.project_id,
            'status': fields.SnapshotStatus.CREATING,
            'progress': '0%',
            'volume_size': volume.size,
            'display_name': '[revert] volume %s backup snapshot' % volume.id,
            'display_description': 'This is only used for backup when '
                                   'reverting. If the reverting process '
                                   'failed, you can restore your data by '
                                   'creating new volume with this snapshot.',
            'volume_type_id': volume.volume_type_id,
            'encryption_key_id': volume.encryption_key_id,
            'metadata': {}
        }
        snapshot = objects.Snapshot(context=context, **kwargs)
        snapshot.create()
        self.create_snapshot(context, snapshot)
        return snapshot

    def revert_to_snapshot(self, context, volume, snapshot):
        """Revert a volume to a snapshot.

        The process of reverting to a snapshot consists of several steps:
        1. create a snapshot for backup (in case of data loss)
        2.1. use the driver's specific logic to revert the volume
        2.2. try the generic way to revert the volume if the driver's
             method is missing
        3. delete the backup snapshot
        """
        backup_snapshot = None
        try:
            LOG.info("Start to perform revert to snapshot process.")

            self._notify_about_volume_usage(context, volume,
                                            "revert.start")
            self._notify_about_snapshot_usage(context, snapshot,
                                              "revert.start")

            # Create a snapshot which can be used to restore the volume
            # data by hand if the revert process fails.
            if self.driver.snapshot_revert_use_temp_snapshot():
                backup_snapshot = self._create_backup_snapshot(context,
                                                               volume)
            self._revert_to_snapshot(context, volume, snapshot)
        except Exception as error:
            with excutils.save_and_reraise_exception():
                self._notify_about_volume_usage(context, volume,
                                                "revert.end")
                self._notify_about_snapshot_usage(context, snapshot,
                                                  "revert.end")
                msg = ('Volume %(v_id)s revert to '
                       'snapshot %(s_id)s failed with %(error)s.')
                msg_args = {'v_id': volume.id,
                            's_id': snapshot.id,
                            'error': six.text_type(error)}
                v_res = volume.update_single_status_where(
                    'error',
                    'reverting')
                if not v_res:
                    msg_args = {"id": volume.id,
                                "status": 'error'}
                    msg += ("Failed to reset volume %(id)s "
                            "status to %(status)s.") % msg_args

                s_res = snapshot.update_single_status_where(
                    fields.SnapshotStatus.AVAILABLE,
                    fields.SnapshotStatus.RESTORING)
                if not s_res:
                    msg_args = {"id": snapshot.id,
                                "status":
                                    fields.SnapshotStatus.ERROR}
                    msg += ("Failed to reset snapshot %(id)s "
                            "status to %(status)s." % msg_args)
                LOG.exception(msg, msg_args)

        v_res = volume.update_single_status_where(
            'available', 'reverting')
        if not v_res:
            msg_args = {"id": volume.id,
                        "status": 'available'}
            msg = _("Revert finished, but failed to reset "
                    "volume %(id)s status to %(status)s, "
                    "please manually reset it.") % msg_args
            raise exception.BadResetResourceStatus(message=msg)

        s_res = snapshot.update_single_status_where(
            fields.SnapshotStatus.AVAILABLE,
            fields.SnapshotStatus.RESTORING)
        if not s_res:
            msg_args = {"id": snapshot.id,
                        "status":
                            fields.SnapshotStatus.AVAILABLE}
            msg = _("Revert finished, but failed to reset "
                    "snapshot %(id)s status to %(status)s, "
                    "please manually reset it.") % msg_args
            raise exception.BadResetResourceStatus(message=msg)
        if backup_snapshot:
            self.delete_snapshot(context,
                                 backup_snapshot, handle_quota=False)
        msg = ('Volume %(v_id)s reverted to snapshot %(snap_id)s '
               'successfully.')
        msg_args = {'v_id': volume.id, 'snap_id': snapshot.id}
        LOG.info(msg, msg_args)
        self._notify_about_volume_usage(context, volume, "revert.end")
        self._notify_about_snapshot_usage(context, snapshot, "revert.end")

    @objects.Snapshot.set_workers
    def create_snapshot(self, context, snapshot):
        """Creates and exports the snapshot."""
        context = context.elevated()

        self._notify_about_snapshot_usage(
            context, snapshot, "create.start")

        try:
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the snapshot status updated.
            utils.require_driver_initialized(self.driver)

            # Pass context so that drivers that want to use it, can,
            # but it is not a requirement for all drivers.
            snapshot.context = context

            model_update = self.driver.create_snapshot(snapshot)
            if model_update:
                snapshot.update(model_update)
                snapshot.save()

        except Exception:
            with excutils.save_and_reraise_exception():
                snapshot.status = fields.SnapshotStatus.ERROR
                snapshot.save()

        vol_ref = self.db.volume_get(context, snapshot.volume_id)
        if vol_ref.bootable:
            try:
                self.db.volume_glance_metadata_copy_to_snapshot(
                    context, snapshot.id, snapshot.volume_id)
            except exception.GlanceMetadataNotFound:
                # If the volume is not created from an image, no glance
                # metadata would be available for that volume in the
                # volume glance metadata table.
                pass
            except exception.CinderException as ex:
                LOG.exception("Failed updating snapshot"
                              " metadata using the provided volume's"
                              " %(volume_id)s metadata",
                              {'volume_id': snapshot.volume_id},
                              resource=snapshot)
                snapshot.status = fields.SnapshotStatus.ERROR
                snapshot.save()
                raise exception.MetadataCopyFailure(
                    reason=six.text_type(ex))

        snapshot.status = fields.SnapshotStatus.AVAILABLE
        snapshot.progress = '100%'
        # Resync with the volume's DB value. This addresses the case where
        # the snapshot creation was in flight just prior to when the
        # volume's fixed_key encryption key ID was migrated to Barbican.
        snapshot.encryption_key_id = vol_ref.encryption_key_id
        snapshot.save()

        self._notify_about_snapshot_usage(context, snapshot, "create.end")
        LOG.info("Create snapshot completed successfully",
                 resource=snapshot)
        return snapshot.id

    @coordination.synchronized('{snapshot.id}-{f_name}')
    def delete_snapshot(self, context, snapshot,
                        unmanage_only=False, handle_quota=True):
        """Deletes and unexports snapshot."""
        context = context.elevated()
        snapshot._context = context
        project_id = snapshot.project_id

        self._notify_about_snapshot_usage(
            context, snapshot, "delete.start")

        try:
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the snapshot status updated.
            utils.require_driver_initialized(self.driver)

            # Pass context so that drivers that want to use it, can,
            # but it is not a requirement for all drivers.
            snapshot.context = context
            snapshot.save()

            if unmanage_only:
                self.driver.unmanage_snapshot(snapshot)
            else:
                self.driver.delete_snapshot(snapshot)
        except exception.SnapshotIsBusy:
            LOG.error("Delete snapshot failed, due to snapshot busy.",
                      resource=snapshot)
            snapshot.status = fields.SnapshotStatus.AVAILABLE
            snapshot.save()
            return
        except Exception:
            with excutils.save_and_reraise_exception():
                snapshot.status = fields.SnapshotStatus.ERROR_DELETING
                snapshot.save()

        # Get reservations
        reservations = None
        try:
            if handle_quota:
                if CONF.no_snapshot_gb_quota:
                    reserve_opts = {'snapshots': -1}
                else:
                    reserve_opts = {
                        'snapshots': -1,
                        'gigabytes': -snapshot.volume_size,
                    }
                volume_ref = self.db.volume_get(context, snapshot.volume_id)
                QUOTAS.add_volume_type_opts(
                    context,
                    reserve_opts,
                    volume_ref.get('volume_type_id'))
                reservations = QUOTAS.reserve(context,
                                              project_id=project_id,
                                              **reserve_opts)
        except Exception:
            reservations = None
            LOG.exception("Update snapshot usages failed.",
                          resource=snapshot)
        self.db.volume_glance_metadata_delete_by_snapshot(context,
                                                          snapshot.id)
        snapshot.destroy()
        self._notify_about_snapshot_usage(context, snapshot, "delete.end")

        # Commit the reservations
        if reservations:
            QUOTAS.commit(context, reservations, project_id=project_id)

        msg = "Delete snapshot completed successfully."
        if unmanage_only:
            msg = "Unmanage snapshot completed successfully."
        LOG.info(msg, resource=snapshot)

    @coordination.synchronized('{volume_id}')
    def attach_volume(self, context, volume_id, instance_uuid, host_name,
                      mountpoint, mode, volume=None):
        """Updates db to show volume is attached."""
        # FIXME(lixiaoy1): Remove this in v4.0 of RPC API.
        if volume is None:
            # For older clients, mimic the old behavior and look
            # up the volume by its volume_id.
            volume = objects.Volume.get_by_id(context, volume_id)
        # Get admin_metadata. This needs admin context.
        with volume.obj_as_admin():
            volume_metadata = volume.admin_metadata
        # check the volume status before attaching
        if volume.status == 'attaching':
            if (volume_metadata.get('attached_mode') and
                    volume_metadata.get('attached_mode') != mode):
                raise exception.InvalidVolume(
                    reason=_("being attached by different mode"))

        host_name_sanitized = utils.sanitize_hostname(
            host_name) if host_name else None
        if instance_uuid:
            attachments = (
                VA_LIST.get_all_by_instance_uuid(
                    context, instance_uuid))
        else:
            attachments = (
                VA_LIST.get_all_by_host(
                    context, host_name_sanitized))
        if attachments:
            # check if volume<->instance mapping is already tracked in DB
            for attachment in attachments:
                if attachment['volume_id'] == volume_id:
                    volume.status = 'in-use'
                    volume.save()
                    return attachment

        if (volume.status == 'in-use' and not volume.multiattach
                and not volume.migration_status):
            raise exception.InvalidVolume(
                reason=_("volume is already attached and multiple "
                         "attachments are not enabled"))

        self._notify_about_volume_usage(context, volume,
                                        "attach.start")

        attachment = volume.begin_attach(mode)

        if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
            attachment.attach_status = (
                fields.VolumeAttachStatus.ERROR_ATTACHING)
            attachment.save()
            raise exception.InvalidUUID(uuid=instance_uuid)

        try:
            if volume_metadata.get('readonly') == 'True' and mode != 'ro':
                raise exception.InvalidVolumeAttachMode(mode=mode,
                                                        volume_id=volume.id)
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the volume status updated.
            utils.require_driver_initialized(self.driver)

            LOG.info('Attaching volume %(volume_id)s to instance '
                     '%(instance)s at mountpoint %(mount)s on host '
                     '%(host)s.',
                     {'volume_id': volume_id, 'instance': instance_uuid,
                      'mount': mountpoint, 'host': host_name_sanitized},
                     resource=volume)
            self.driver.attach_volume(context,
                                      volume,
                                      instance_uuid,
                                      host_name_sanitized,
                                      mountpoint)
        except Exception as excep:
            with excutils.save_and_reraise_exception():
                self.message_api.create(
                    context,
                    message_field.Action.ATTACH_VOLUME,
                    resource_uuid=volume_id,
                    exception=excep)
                attachment.attach_status = (
                    fields.VolumeAttachStatus.ERROR_ATTACHING)
                attachment.save()

        volume = attachment.finish_attach(
            instance_uuid,
            host_name_sanitized,
            mountpoint,
            mode)

        self._notify_about_volume_usage(context, volume, "attach.end")
        LOG.info("Attach volume completed successfully.",
                 resource=volume)
        return attachment
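
    # Rough attachment state walk implemented above (a sketch, not an
    # exhaustive list of transitions):
    #
    #     volume.begin_attach(mode)       # attachment enters 'attaching'
    #     driver.attach_volume(...)       # backend-side export/connect work
    #     attachment.finish_attach(...)   # attachment 'attached',
    #                                     # volume 'in-use'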

    @coordination.synchronized('{volume_id}-{f_name}')
    def detach_volume(self, context, volume_id, attachment_id=None,
                      volume=None):
        """Updates db to show volume is detached."""
        # TODO(vish): refactor this into a more general "unreserve"
        # FIXME(lixiaoy1): Remove this in v4.0 of RPC API.
        if volume is None:
            # For older clients, mimic the old behavior and look up the
            # volume by its volume_id.
            volume = objects.Volume.get_by_id(context, volume_id)

        if attachment_id:
            try:
                attachment = objects.VolumeAttachment.get_by_id(
                    context, attachment_id)
            except exception.VolumeAttachmentNotFound:
                LOG.info("Volume detach called, but volume not attached.",
                         resource=volume)
                # We need to make sure the volume status is set to the
                # correct status. It could be in detaching status now, and
                # we don't want to leave it there.
                volume.finish_detach(attachment_id)
                return
        else:
            # We can try and degrade gracefully here by trying to detach
            # a volume without the attachment_id if the volume only has
            # one attachment. This is for backwards compatibility.
            attachments = volume.volume_attachment
            if len(attachments) > 1:
                # There is more than one attachment for this volume, so we
                # have to have an attachment_id.
                msg = _("Detach volume failed: More than one attachment, "
                        "but no attachment_id provided.")
                LOG.error(msg, resource=volume)
                raise exception.InvalidVolume(reason=msg)
            elif len(attachments) == 1:
                attachment = attachments[0]
            else:
                # There aren't any attachments for this volume, so set the
                # status to available and move on.
                LOG.info("Volume detach called, but volume not attached.",
                         resource=volume)
                volume.status = 'available'
                volume.attach_status = fields.VolumeAttachStatus.DETACHED
                volume.save()
                return

        self._notify_about_volume_usage(context, volume, "detach.start")
        try:
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the volume status updated.
            utils.require_driver_initialized(self.driver)

            LOG.info('Detaching volume %(volume_id)s from instance '
                     '%(instance)s.',
                     {'volume_id': volume_id,
                      'instance': attachment.get('instance_uuid')},
                     resource=volume)
            self.driver.detach_volume(context, volume, attachment)
        except Exception:
            with excutils.save_and_reraise_exception():
                self.db.volume_attachment_update(
                    context, attachment.get('id'), {
                        'attach_status':
                            fields.VolumeAttachStatus.ERROR_DETACHING})

        # NOTE(jdg): We used to do an ensure export here to
        # catch upgrades while volumes were attached (E->F)
        # this was necessary to convert in-use volumes from
        # int ID's to UUID's. Don't need this any longer

        # We're going to remove the export here
        # (delete the iscsi target)
        try:
            utils.require_driver_initialized(self.driver)
            self.driver.remove_export(context.elevated(), volume)
        except exception.DriverNotInitialized:
            with excutils.save_and_reraise_exception():
                LOG.exception("Detach volume failed, due to "
                              "uninitialized driver.",
                              resource=volume)
        except Exception as ex:
            LOG.exception("Detach volume failed, due to "
                          "remove-export failure.",
                          resource=volume)
            raise exception.RemoveExportException(
                volume=volume_id, reason=six.text_type(ex))

        volume.finish_detach(attachment.id)
        self._notify_about_volume_usage(context, volume, "detach.end")
        LOG.info("Detach volume completed successfully.", resource=volume)

    def _create_image_cache_volume_entry(self, ctx, volume_ref,
                                         image_id, image_meta):
        """Create a new image-volume and cache entry for it.

        This assumes that the image has already been downloaded and stored
        in the volume described by the volume_ref.
        """
        cache_entry = self.image_volume_cache.get_entry(ctx,
                                                        volume_ref,
                                                        image_id,
                                                        image_meta)
        if cache_entry:
            LOG.debug('Cache entry already exists with image ID '
                      '%(image_id)s',
                      {'image_id': image_id})
            return

        image_volume = None
        try:
            if not self.image_volume_cache.ensure_space(ctx, volume_ref):
                LOG.warning('Unable to ensure space for image-volume in'
                            ' cache. Will skip creating entry for image'
                            ' %(image)s on %(service)s.',
                            {'image': image_id,
                             'service': volume_ref.service_topic_queue})
                return

            image_volume = self._clone_image_volume(ctx,
                                                    volume_ref,
                                                    image_meta)
            if not image_volume:
                LOG.warning('Unable to clone image_volume for image '
                            '%(image_id)s; will not create cache entry.',
                            {'image_id': image_id})
                return
            self.image_volume_cache.create_cache_entry(
                ctx,
                image_volume,
                image_id,
                image_meta
            )
        except exception.CinderException as e:
            LOG.warning('Failed to create new image-volume cache entry.'
                        ' Error: %(exception)s', {'exception': e})
            if image_volume:
                self.delete_volume(ctx, image_volume)

    def _clone_image_volume(self, ctx, volume, image_meta):
        volume_type_id = volume.get('volume_type_id')
        reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
        QUOTAS.add_volume_type_opts(ctx, reserve_opts, volume_type_id)
        reservations = QUOTAS.reserve(ctx, **reserve_opts)
        try:
            new_vol_values = {k: volume[k] for k in set(volume.keys()) -
                              self._VOLUME_CLONE_SKIP_PROPERTIES}
            new_vol_values['volume_type_id'] = volume_type_id
            new_vol_values['attach_status'] = (
                fields.VolumeAttachStatus.DETACHED)
            new_vol_values['status'] = 'creating'
            new_vol_values['project_id'] = ctx.project_id
            new_vol_values['display_name'] = 'image-%s' % image_meta['id']
            new_vol_values['source_volid'] = volume.id

            LOG.debug('Creating image volume entry: %s.', new_vol_values)
            image_volume = objects.Volume(context=ctx, **new_vol_values)
            image_volume.create()
        except Exception as ex:
            LOG.exception('Create clone_image_volume: %(volume_id)s '
                          'for image %(image_id)s '
                          'failed (Exception: %(except)s)',
                          {'volume_id': volume.id,
                           'image_id': image_meta['id'],
                           'except': ex})
            QUOTAS.rollback(ctx, reservations)
            return

        QUOTAS.commit(ctx, reservations,
                      project_id=new_vol_values['project_id'])

        try:
            self.create_volume(ctx, image_volume, allow_reschedule=False)
            image_volume.refresh()
            if image_volume.status != 'available':
                raise exception.InvalidVolume(_('Volume is not available.'))

            self.db.volume_admin_metadata_update(ctx.elevated(),
                                                 image_volume.id,
                                                 {'readonly': 'True'},
                                                 False)
            return image_volume
        except exception.CinderException:
            LOG.exception('Failed to clone volume %(volume_id)s for '
                          'image %(image_id)s.',
                          {'volume_id': volume.id,
                           'image_id': image_meta['id']})
            try:
                self.delete_volume(ctx, image_volume)
            except exception.CinderException:
                LOG.exception('Could not delete the image volume %(id)s.',
                              {'id': volume.id})
            return
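
    # Quota pattern used by _clone_image_volume() above, shown as a sketch:
    # QUOTAS.reserve() before the DB record is created, QUOTAS.rollback()
    # if object creation fails, QUOTAS.commit() once it exists; the
    # subsequent create_volume() call then runs with allow_reschedule=False
    # so the clone stays on this backend rather than being rescheduled
    # elsewhere.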

    def _clone_image_volume_and_add_location(self, ctx, volume,
                                             image_service, image_meta):
        """Create a cloned volume and register its location to the image."""
        if (image_meta['disk_format'] != 'raw' or
                image_meta['container_format'] != 'bare'):
            return False

        image_volume_context = ctx
        if self.driver.configuration.image_upload_use_internal_tenant:
            internal_ctx = context.get_internal_tenant_context()
            if internal_ctx:
                image_volume_context = internal_ctx

        image_volume = self._clone_image_volume(image_volume_context,
                                                volume,
                                                image_meta)
        if not image_volume:
            return False

        # The image_owner metadata should be set before the URI is added
        # to the image, so the glance cinder store can check its owner.
        image_volume_meta = {'image_owner': ctx.project_id}
        self.db.volume_metadata_update(image_volume_context,
                                       image_volume.id,
                                       image_volume_meta,
                                       False)

        uri = 'cinder://%s' % image_volume.id
        image_registered = None
        try:
            image_registered = image_service.add_location(
                ctx, image_meta['id'], uri, {})
        except (exception.NotAuthorized, exception.Invalid,
                exception.NotFound):
            LOG.exception('Failed to register image volume location '
                          '%(uri)s.', {'uri': uri})

        if not image_registered:
            LOG.warning('Registration of image volume URI %(uri)s '
                        'to image %(image_id)s failed.',
                        {'uri': uri, 'image_id': image_meta['id']})
            try:
                self.delete_volume(image_volume_context, image_volume)
            except exception.CinderException:
                LOG.exception('Could not delete failed image volume '
                              '%(id)s.', {'id': image_volume.id})
            return False

        image_volume_meta['glance_image_id'] = image_meta['id']
        self.db.volume_metadata_update(image_volume_context,
                                       image_volume.id,
                                       image_volume_meta,
                                       False)
        return True
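
    # The location registered above uses the Glance cinder store URI
    # scheme, e.g. (illustrative id) 'cinder://8a1b...-42c7', which lets
    # Glance serve the image directly from the backing volume.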
1564 utils.require_driver_initialized(self.driver) 1565 1566 image_service, image_id = \ 1567 glance.get_remote_image_service(context, image_meta['id']) 1568 if (self.driver.configuration.image_upload_use_cinder_backend 1569 and self._clone_image_volume_and_add_location( 1570 context, volume, image_service, image_meta)): 1571 LOG.debug("Registered image volume location to glance " 1572 "image-id: %(image_id)s.", 1573 {'image_id': image_meta['id']}, 1574 resource=volume) 1575 else: 1576 self.driver.copy_volume_to_image(context, volume, 1577 image_service, image_meta) 1578 LOG.debug("Uploaded volume to glance image-id: %(image_id)s.", 1579 {'image_id': image_meta['id']}, 1580 resource=volume) 1581 except Exception as error: 1582 LOG.error("Upload volume to image encountered an error " 1583 "(image-id: %(image_id)s).", 1584 {'image_id': image_meta['id']}, 1585 resource=volume) 1586 self.message_api.create( 1587 context, 1588 message_field.Action.COPY_VOLUME_TO_IMAGE, 1589 resource_uuid=volume_id, 1590 exception=error, 1591 detail=message_field.Detail.FAILED_TO_UPLOAD_VOLUME) 1592 if image_service is not None: 1593 # Deletes the image if it is in queued or saving state 1594 self._delete_image(context, image_meta['id'], image_service) 1595 with excutils.save_and_reraise_exception(): 1596 payload['message'] = six.text_type(error) 1597 finally: 1598 self.db.volume_update_status_based_on_attachment(context, 1599 volume_id) 1600 LOG.info("Copy volume to image completed successfully.", 1601 resource=volume) 1602 1603 def _delete_image(self, context, image_id, image_service): 1604 """Deletes an image stuck in queued or saving state.""" 1605 try: 1606 image_meta = image_service.show(context, image_id) 1607 image_status = image_meta.get('status') 1608 if image_status == 'queued' or image_status == 'saving': 1609 LOG.warning("Deleting image in unexpected status: " 1610 "%(image_status)s.", 1611 {'image_status': image_status}, 1612 resource={'type': 'image', 'id': image_id}) 1613 image_service.delete(context, image_id) 1614 except Exception: 1615 LOG.warning("Image delete encountered an error.", 1616 exc_info=True, resource={'type': 'image', 1617 'id': image_id}) 1618 1619 def _parse_connection_options(self, context, volume, conn_info): 1620 # Add qos_specs to connection info 1621 typeid = volume.volume_type_id 1622 specs = None 1623 if typeid: 1624 res = volume_types.get_volume_type_qos_specs(typeid) 1625 qos = res['qos_specs'] 1626 # only pass qos_specs that is designated to be consumed by 1627 # front-end, or both front-end and back-end. 
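            # As a sketch (illustrative values, not a real volume type),
            # the qos lookup above might yield:
            #     {'consumer': 'front-end',
            #      'specs': {'read_iops_sec_per_gb': '5',
            #                'total_bytes_sec': '10485760'}}
            # Only the 'specs' dict is forwarded; per-GB keys are scaled by
            # the volume size below (e.g. 5 IOPS/GB * 100 GB = 500 IOPS).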
            if qos and qos.get('consumer') in ['front-end', 'both']:
                specs = qos.get('specs')

        if specs is not None:
            # Compute fixed IOPS values for per-GB keys
            if 'write_iops_sec_per_gb' in specs:
                specs['write_iops_sec'] = (
                    int(specs['write_iops_sec_per_gb']) * int(volume.size))
                specs.pop('write_iops_sec_per_gb')

            if 'read_iops_sec_per_gb' in specs:
                specs['read_iops_sec'] = (
                    int(specs['read_iops_sec_per_gb']) * int(volume.size))
                specs.pop('read_iops_sec_per_gb')

            if 'total_iops_sec_per_gb' in specs:
                specs['total_iops_sec'] = (
                    int(specs['total_iops_sec_per_gb']) * int(volume.size))
                specs.pop('total_iops_sec_per_gb')

        qos_spec = dict(qos_specs=specs)
        conn_info['data'].update(qos_spec)

        # Add access_mode to connection info
        volume_metadata = volume.admin_metadata
        access_mode = volume_metadata.get('attached_mode')
        if access_mode is None:
            # NOTE(zhiyan): client didn't call 'os-attach' before
            access_mode = ('ro'
                           if volume_metadata.get('readonly') == 'True'
                           else 'rw')
        conn_info['data']['access_mode'] = access_mode

        # Add encrypted flag to connection_info if not set in the driver.
        if conn_info['data'].get('encrypted') is None:
            encrypted = bool(volume.encryption_key_id)
            conn_info['data']['encrypted'] = encrypted

        # Add discard flag to connection_info if not set in the driver and
        # configured to be reported.
        if conn_info['data'].get('discard') is None:
            discard_supported = (self.driver.configuration
                                 .safe_get('report_discard_supported'))
            if discard_supported:
                conn_info['data']['discard'] = True

        return conn_info

    def initialize_connection(self, context, volume, connector):
        """Prepare volume for connection from host represented by connector.

        This method calls the driver initialize_connection and returns
        it to the caller. The connector parameter is a dictionary with
        information about the host that will connect to the volume in the
        following format::

            {
                'ip': ip,
                'initiator': initiator,
            }

        ip: the ip address of the connecting machine

        initiator: the iscsi initiator name of the connecting machine.
        This can be None if the connecting machine does not support iscsi
        connections.

        The driver is responsible for doing any necessary security setup and
        returning a connection_info dictionary in the following format::

            {
                'driver_volume_type': driver_volume_type,
                'data': data,
            }

        driver_volume_type: a string to identify the type of volume. This
                            can be used by the calling code to determine the
                            strategy for connecting to the volume. This could
                            be 'iscsi', 'rbd', 'sheepdog', etc.

        data: this is the data that the calling code will use to connect
              to the volume. Keep in mind that this will be serialized to
              json in various places, so it should not contain any non-json
              data types.
        """
        # NOTE(flaper87): Verify the driver is enabled
        # before going forward. The exception will be caught
        # and the volume status updated.
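        # An illustrative connector for an iSCSI host (assumed values), in
        # the shape checked by driver.validate_connector() below:
        #     {'ip': '192.168.1.20',
        #      'host': 'compute-node-1',
        #      'initiator': 'iqn.1993-08.org.debian:01:abc123'}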
1720 1721 # TODO(jdg): Add deprecation warning 1722 utils.require_driver_initialized(self.driver) 1723 try: 1724 self.driver.validate_connector(connector) 1725 except exception.InvalidConnectorException as err: 1726 raise exception.InvalidInput(reason=six.text_type(err)) 1727 except Exception as err: 1728 err_msg = (_("Validate volume connection failed " 1729 "(error: %(err)s).") % {'err': six.text_type(err)}) 1730 LOG.exception(err_msg, resource=volume) 1731 raise exception.VolumeBackendAPIException(data=err_msg) 1732 1733 try: 1734 model_update = self.driver.create_export(context.elevated(), 1735 volume, connector) 1736 except exception.CinderException as ex: 1737 msg = _("Create export of volume failed (%s)") % ex.msg 1738 LOG.exception(msg, resource=volume) 1739 raise exception.VolumeBackendAPIException(data=msg) 1740 1741 try: 1742 if model_update: 1743 volume.update(model_update) 1744 volume.save() 1745 except Exception as ex: 1746 LOG.exception("Model update failed.", resource=volume) 1747 try: 1748 self.driver.remove_export(context.elevated(), volume) 1749 except Exception: 1750 LOG.exception('Could not remove export after DB model failed.') 1751 raise exception.ExportFailure(reason=six.text_type(ex)) 1752 1753 try: 1754 conn_info = self.driver.initialize_connection(volume, connector) 1755 except Exception as err: 1756 err_msg = (_("Driver initialize connection failed " 1757 "(error: %(err)s).") % {'err': six.text_type(err)}) 1758 LOG.exception(err_msg, resource=volume) 1759 1760 self.driver.remove_export(context.elevated(), volume) 1761 1762 raise exception.VolumeBackendAPIException(data=err_msg) 1763 1764 conn_info = self._parse_connection_options(context, volume, conn_info) 1765 LOG.info("Initialize volume connection completed successfully.", 1766 resource=volume) 1767 return conn_info 1768 1769 def initialize_connection_snapshot(self, ctxt, snapshot_id, connector): 1770 utils.require_driver_initialized(self.driver) 1771 snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) 1772 try: 1773 self.driver.validate_connector(connector) 1774 except exception.InvalidConnectorException as err: 1775 raise exception.InvalidInput(reason=six.text_type(err)) 1776 except Exception as err: 1777 err_msg = (_("Validate snapshot connection failed " 1778 "(error: %(err)s).") % {'err': six.text_type(err)}) 1779 LOG.exception(err_msg, resource=snapshot) 1780 raise exception.VolumeBackendAPIException(data=err_msg) 1781 1782 model_update = None 1783 try: 1784 LOG.debug("Snapshot %s: creating export.", snapshot.id) 1785 model_update = self.driver.create_export_snapshot( 1786 ctxt.elevated(), snapshot, connector) 1787 if model_update: 1788 snapshot.provider_location = model_update.get( 1789 'provider_location', None) 1790 snapshot.provider_auth = model_update.get( 1791 'provider_auth', None) 1792 snapshot.save() 1793 except exception.CinderException as ex: 1794 msg = _("Create export of snapshot failed (%s)") % ex.msg 1795 LOG.exception(msg, resource=snapshot) 1796 raise exception.VolumeBackendAPIException(data=msg) 1797 1798 try: 1799 if model_update: 1800 snapshot.update(model_update) 1801 snapshot.save() 1802 except exception.CinderException as ex: 1803 LOG.exception("Model update failed.", resource=snapshot) 1804 raise exception.ExportFailure(reason=six.text_type(ex)) 1805 1806 try: 1807 conn = self.driver.initialize_connection_snapshot(snapshot, 1808 connector) 1809 except Exception as err: 1810 try: 1811 err_msg = (_('Unable to fetch connection information from ' 1812 'backend: %(err)s') % 1813 
{'err': six.text_type(err)}) 1814 LOG.error(err_msg) 1815 LOG.debug("Cleaning up failed connect initialization.") 1816 self.driver.remove_export_snapshot(ctxt.elevated(), snapshot) 1817 except Exception as ex: 1818 ex_msg = (_('Error encountered during cleanup ' 1819 'of a failed attach: %(ex)s') % 1820 {'ex': six.text_type(ex)}) 1821 LOG.error(ex_msg) 1822 raise exception.VolumeBackendAPIException(data=ex_msg) 1823 raise exception.VolumeBackendAPIException(data=err_msg) 1824 1825 LOG.info("Initialize snapshot connection completed successfully.", 1826 resource=snapshot) 1827 return conn 1828 1829 def terminate_connection(self, context, volume_id, connector, force=False): 1830 """Cleanup connection from host represented by connector. 1831 1832 The format of connector is the same as for initialize_connection. 1833 """ 1834 # NOTE(flaper87): Verify the driver is enabled 1835 # before going forward. The exception will be caught 1836 # and the volume status updated. 1837 utils.require_driver_initialized(self.driver) 1838 1839 volume_ref = self.db.volume_get(context, volume_id) 1840 try: 1841 self.driver.terminate_connection(volume_ref, connector, 1842 force=force) 1843 except Exception as err: 1844 err_msg = (_('Terminate volume connection failed: %(err)s') 1845 % {'err': six.text_type(err)}) 1846 LOG.exception(err_msg, resource=volume_ref) 1847 raise exception.VolumeBackendAPIException(data=err_msg) 1848 LOG.info("Terminate volume connection completed successfully.", 1849 resource=volume_ref) 1850 1851 def terminate_connection_snapshot(self, ctxt, snapshot_id, 1852 connector, force=False): 1853 utils.require_driver_initialized(self.driver) 1854 1855 snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) 1856 try: 1857 self.driver.terminate_connection_snapshot(snapshot, connector, 1858 force=force) 1859 except Exception as err: 1860 err_msg = (_('Terminate snapshot connection failed: %(err)s') 1861 % {'err': six.text_type(err)}) 1862 LOG.exception(err_msg, resource=snapshot) 1863 raise exception.VolumeBackendAPIException(data=err_msg) 1864 LOG.info("Terminate snapshot connection completed successfully.", 1865 resource=snapshot) 1866 1867 def remove_export(self, context, volume_id): 1868 """Removes an export for a volume.""" 1869 utils.require_driver_initialized(self.driver) 1870 volume_ref = self.db.volume_get(context, volume_id) 1871 try: 1872 self.driver.remove_export(context, volume_ref) 1873 except Exception: 1874 msg = _("Remove volume export failed.") 1875 LOG.exception(msg, resource=volume_ref) 1876 raise exception.VolumeBackendAPIException(data=msg) 1877 1878 LOG.info("Remove volume export completed successfully.", 1879 resource=volume_ref) 1880 1881 def remove_export_snapshot(self, ctxt, snapshot_id): 1882 """Removes an export for a snapshot.""" 1883 utils.require_driver_initialized(self.driver) 1884 snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) 1885 try: 1886 self.driver.remove_export_snapshot(ctxt, snapshot) 1887 except Exception: 1888 msg = _("Remove snapshot export failed.") 1889 LOG.exception(msg, resource=snapshot) 1890 raise exception.VolumeBackendAPIException(data=msg) 1891 1892 LOG.info("Remove snapshot export completed successfully.", 1893 resource=snapshot) 1894 1895 def accept_transfer(self, context, volume_id, new_user, new_project): 1896 # NOTE(flaper87): Verify the driver is enabled 1897 # before going forward. The exception will be caught 1898 # and the volume status updated. 
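        # Sketch (assumed keys, driver-dependent): a backend that ties CHAP
        # credentials to the tenant might return a model update such as
        #     {'provider_auth': 'CHAP new-user new-secret'}
        # which is persisted below so the new owner can still attach.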
1899 utils.require_driver_initialized(self.driver) 1900 1901 # NOTE(jdg): need elevated context as we haven't "given" the vol 1902 # yet 1903 volume_ref = self.db.volume_get(context.elevated(), volume_id) 1904 1905 # NOTE(jdg): Some drivers tie provider info (CHAP) to tenant 1906 # for those that do allow them to return updated model info 1907 model_update = self.driver.accept_transfer(context, 1908 volume_ref, 1909 new_user, 1910 new_project) 1911 1912 if model_update: 1913 try: 1914 self.db.volume_update(context.elevated(), 1915 volume_id, 1916 model_update) 1917 except exception.CinderException: 1918 with excutils.save_and_reraise_exception(): 1919 LOG.exception("Update volume model for " 1920 "transfer operation failed.", 1921 resource=volume_ref) 1922 self.db.volume_update(context.elevated(), 1923 volume_id, 1924 {'status': 'error'}) 1925 1926 LOG.info("Transfer volume completed successfully.", 1927 resource=volume_ref) 1928 return model_update 1929 1930 def _connect_device(self, conn): 1931 use_multipath = self.configuration.use_multipath_for_image_xfer 1932 device_scan_attempts = self.configuration.num_volume_device_scan_tries 1933 protocol = conn['driver_volume_type'] 1934 connector = utils.brick_get_connector( 1935 protocol, 1936 use_multipath=use_multipath, 1937 device_scan_attempts=device_scan_attempts, 1938 conn=conn) 1939 vol_handle = connector.connect_volume(conn['data']) 1940 1941 root_access = True 1942 1943 if not connector.check_valid_device(vol_handle['path'], root_access): 1944 if isinstance(vol_handle['path'], six.string_types): 1945 raise exception.DeviceUnavailable( 1946 path=vol_handle['path'], 1947 reason=(_("Unable to access the backend storage via the " 1948 "path %(path)s.") % 1949 {'path': vol_handle['path']})) 1950 else: 1951 raise exception.DeviceUnavailable( 1952 path=None, 1953 reason=(_("Unable to access the backend storage via file " 1954 "handle."))) 1955 1956 return {'conn': conn, 'device': vol_handle, 'connector': connector} 1957 1958 def _attach_volume(self, ctxt, volume, properties, remote=False, 1959 attach_encryptor=False): 1960 status = volume['status'] 1961 1962 if remote: 1963 rpcapi = volume_rpcapi.VolumeAPI() 1964 try: 1965 conn = rpcapi.initialize_connection(ctxt, volume, properties) 1966 except Exception: 1967 with excutils.save_and_reraise_exception(): 1968 LOG.error("Failed to attach volume %(vol)s.", 1969 {'vol': volume['id']}) 1970 self.db.volume_update(ctxt, volume['id'], 1971 {'status': status}) 1972 else: 1973 conn = self.initialize_connection(ctxt, volume, properties) 1974 1975 attach_info = self._connect_device(conn) 1976 try: 1977 if attach_encryptor and ( 1978 volume_types.is_encrypted(ctxt, 1979 volume.volume_type_id)): 1980 encryption = self.db.volume_encryption_metadata_get( 1981 ctxt.elevated(), volume.id) 1982 if encryption: 1983 utils.brick_attach_volume_encryptor(ctxt, 1984 attach_info, 1985 encryption) 1986 except Exception: 1987 with excutils.save_and_reraise_exception(): 1988 LOG.error("Failed to attach volume encryptor" 1989 " %(vol)s.", {'vol': volume['id']}) 1990 self._detach_volume(ctxt, attach_info, volume, properties, 1991 force=True) 1992 return attach_info 1993 1994 def _detach_volume(self, ctxt, attach_info, volume, properties, 1995 force=False, remote=False, 1996 attach_encryptor=False): 1997 connector = attach_info['connector'] 1998 if attach_encryptor and ( 1999 volume_types.is_encrypted(ctxt, 2000 volume.volume_type_id)): 2001 encryption = self.db.volume_encryption_metadata_get( 2002 ctxt.elevated(), 
volume.id) 2003 if encryption: 2004 utils.brick_detach_volume_encryptor(attach_info, encryption) 2005 connector.disconnect_volume(attach_info['conn']['data'], 2006 attach_info['device'], force=force) 2007 2008 if remote: 2009 rpcapi = volume_rpcapi.VolumeAPI() 2010 rpcapi.terminate_connection(ctxt, volume, properties, force=force) 2011 rpcapi.remove_export(ctxt, volume) 2012 else: 2013 try: 2014 self.terminate_connection(ctxt, volume['id'], properties, 2015 force=force) 2016 self.remove_export(ctxt, volume['id']) 2017 except Exception as err: 2018 with excutils.save_and_reraise_exception(): 2019 LOG.error('Unable to terminate volume connection: ' 2020 '%(err)s.', {'err': err}) 2021 2022 def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None): 2023 """Copy data from src_vol to dest_vol.""" 2024 2025 LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.', 2026 {'src': src_vol['name'], 'dest': dest_vol['name']}) 2027 attach_encryptor = False 2028 # If the encryption method or key is changed, we have to 2029 # copy data through dm-crypt. 2030 if volume_types.volume_types_encryption_changed( 2031 ctxt, 2032 src_vol.volume_type_id, 2033 dest_vol.volume_type_id): 2034 attach_encryptor = True 2035 use_multipath = self.configuration.use_multipath_for_image_xfer 2036 enforce_multipath = self.configuration.enforce_multipath_for_image_xfer 2037 properties = utils.brick_get_connector_properties(use_multipath, 2038 enforce_multipath) 2039 2040 dest_remote = remote in ['dest', 'both'] 2041 dest_attach_info = self._attach_volume( 2042 ctxt, dest_vol, properties, 2043 remote=dest_remote, 2044 attach_encryptor=attach_encryptor) 2045 2046 try: 2047 src_remote = remote in ['src', 'both'] 2048 src_attach_info = self._attach_volume( 2049 ctxt, src_vol, properties, 2050 remote=src_remote, 2051 attach_encryptor=attach_encryptor) 2052 except Exception: 2053 with excutils.save_and_reraise_exception(): 2054 LOG.error("Failed to attach source volume for copy.") 2055 self._detach_volume(ctxt, dest_attach_info, dest_vol, 2056 properties, remote=dest_remote, 2057 attach_encryptor=attach_encryptor, 2058 force=True) 2059 2060 # Check the backend capabilities of migration destination host. 
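        # (Illustrative) the reply is the destination backend's stats dict;
        # e.g. {'sparse_copy_volume': True, ...} lets the copy below skip
        # zero-filled blocks instead of writing them out.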
2061 rpcapi = volume_rpcapi.VolumeAPI() 2062 capabilities = rpcapi.get_capabilities(ctxt, 2063 dest_vol.service_topic_queue, 2064 False) 2065 sparse_copy_volume = bool(capabilities and 2066 capabilities.get('sparse_copy_volume', 2067 False)) 2068 2069 try: 2070 size_in_mb = int(src_vol['size']) * units.Ki # vol size is in GB 2071 vol_utils.copy_volume(src_attach_info['device']['path'], 2072 dest_attach_info['device']['path'], 2073 size_in_mb, 2074 self.configuration.volume_dd_blocksize, 2075 sparse=sparse_copy_volume) 2076 except Exception: 2077 with excutils.save_and_reraise_exception(): 2078 LOG.error("Failed to copy volume %(src)s to %(dest)s.", 2079 {'src': src_vol['id'], 'dest': dest_vol['id']}) 2080 finally: 2081 try: 2082 self._detach_volume(ctxt, dest_attach_info, dest_vol, 2083 properties, force=True, 2084 remote=dest_remote, 2085 attach_encryptor=attach_encryptor) 2086 finally: 2087 self._detach_volume(ctxt, src_attach_info, src_vol, 2088 properties, force=True, 2089 remote=src_remote, 2090 attach_encryptor=attach_encryptor) 2091 2092 def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id): 2093 rpcapi = volume_rpcapi.VolumeAPI() 2094 2095 # Create new volume on remote host 2096 tmp_skip = {'snapshot_id', 'source_volid'} 2097 skip = {'host', 'cluster_name', 'availability_zone'} 2098 skip.update(tmp_skip) 2099 skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES) 2100 2101 new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} 2102 if new_type_id: 2103 new_vol_values['volume_type_id'] = new_type_id 2104 if volume_types.volume_types_encryption_changed( 2105 ctxt, volume.volume_type_id, new_type_id): 2106 encryption_key_id = vol_utils.create_encryption_key( 2107 ctxt, self.key_manager, new_type_id) 2108 new_vol_values['encryption_key_id'] = encryption_key_id 2109 2110 dst_service = self._get_service(backend['host']) 2111 new_volume = objects.Volume( 2112 context=ctxt, 2113 host=backend['host'], 2114 availability_zone=dst_service.availability_zone, 2115 cluster_name=backend.get('cluster_name'), 2116 status='creating', 2117 attach_status=fields.VolumeAttachStatus.DETACHED, 2118 migration_status='target:%s' % volume['id'], 2119 **new_vol_values 2120 ) 2121 new_volume.create() 2122 rpcapi.create_volume(ctxt, new_volume, None, None, 2123 allow_reschedule=False) 2124 2125 # Wait for new_volume to become ready 2126 starttime = time.time() 2127 deadline = starttime + CONF.migration_create_volume_timeout_secs 2128 2129 new_volume.refresh() 2130 tries = 0 2131 while new_volume.status != 'available': 2132 tries += 1 2133 now = time.time() 2134 if new_volume.status == 'error': 2135 msg = _("failed to create new_volume on destination") 2136 self._clean_temporary_volume(ctxt, volume, 2137 new_volume, 2138 clean_db_only=True) 2139 raise exception.VolumeMigrationFailed(reason=msg) 2140 elif now > deadline: 2141 msg = _("timeout creating new_volume on destination") 2142 self._clean_temporary_volume(ctxt, volume, 2143 new_volume, 2144 clean_db_only=True) 2145 raise exception.VolumeMigrationFailed(reason=msg) 2146 else: 2147 time.sleep(tries ** 2) 2148 new_volume.refresh() 2149 2150 # Set skipped value to avoid calling 2151 # function except for _create_raw_volume 2152 tmp_skipped_values = {k: volume[k] for k in tmp_skip if volume.get(k)} 2153 if tmp_skipped_values: 2154 new_volume.update(tmp_skipped_values) 2155 new_volume.save() 2156 2157 # Copy the source volume to the destination volume 2158 try: 2159 attachments = volume.volume_attachment 2160 if not attachments: 2161 # Pre- 
and post-copy driver-specific actions 2162 self.driver.before_volume_copy(ctxt, volume, new_volume, 2163 remote='dest') 2164 self._copy_volume_data(ctxt, volume, new_volume, remote='dest') 2165 self.driver.after_volume_copy(ctxt, volume, new_volume, 2166 remote='dest') 2167 2168 # The above call is synchronous so we complete the migration 2169 self.migrate_volume_completion(ctxt, volume, new_volume, 2170 error=False) 2171 else: 2172 nova_api = compute.API() 2173 # This is an async call to Nova, which will call the completion 2174 # when it's done 2175 for attachment in attachments: 2176 instance_uuid = attachment['instance_uuid'] 2177 nova_api.update_server_volume(ctxt, instance_uuid, 2178 volume.id, 2179 new_volume.id) 2180 except Exception: 2181 with excutils.save_and_reraise_exception(): 2182 LOG.exception( 2183 "Failed to copy volume %(vol1)s to %(vol2)s", { 2184 'vol1': volume.id, 'vol2': new_volume.id}) 2185 self._clean_temporary_volume(ctxt, volume, 2186 new_volume) 2187 2188 def _clean_temporary_volume(self, ctxt, volume, new_volume, 2189 clean_db_only=False): 2190 # If we're in the migrating phase, we need to cleanup 2191 # destination volume because source volume is remaining 2192 if volume.migration_status == 'migrating': 2193 try: 2194 if clean_db_only: 2195 # The temporary volume is not created, only DB data 2196 # is created 2197 new_volume.destroy() 2198 else: 2199 # The temporary volume is already created 2200 rpcapi = volume_rpcapi.VolumeAPI() 2201 rpcapi.delete_volume(ctxt, new_volume) 2202 except exception.VolumeNotFound: 2203 LOG.info("Couldn't find the temporary volume " 2204 "%(vol)s in the database. There is no need " 2205 "to clean up this volume.", 2206 {'vol': new_volume.id}) 2207 else: 2208 # If we're in the completing phase don't delete the 2209 # destination because we may have already deleted the 2210 # source! But the migration_status in database should 2211 # be cleared to handle volume after migration failure 2212 try: 2213 new_volume.migration_status = None 2214 new_volume.save() 2215 except exception.VolumeNotFound: 2216 LOG.info("Couldn't find destination volume " 2217 "%(vol)s in the database. The entry might be " 2218 "successfully deleted during migration " 2219 "completion phase.", 2220 {'vol': new_volume.id}) 2221 2222 LOG.warning("Failed to migrate volume. The destination " 2223 "volume %(vol)s is not deleted since the " 2224 "source volume may have been deleted.", 2225 {'vol': new_volume.id}) 2226 2227 def migrate_volume_completion(self, ctxt, volume, new_volume, error=False): 2228 try: 2229 # NOTE(flaper87): Verify the driver is enabled 2230 # before going forward. The exception will be caught 2231 # and the migration status updated. 2232 utils.require_driver_initialized(self.driver) 2233 except exception.DriverNotInitialized: 2234 with excutils.save_and_reraise_exception(): 2235 volume.migration_status = 'error' 2236 volume.save() 2237 2238 # NOTE(jdg): Things get a little hairy in here and we do a lot of 2239 # things based on volume previous-status and current-status. 
At some 2240 # point this should all be reworked but for now we need to maintain 2241 # backward compatibility and NOT change the API so we're going to try 2242 # and make this work best we can 2243 2244 LOG.debug("migrate_volume_completion: completing migration for " 2245 "volume %(vol1)s (temporary volume %(vol2)s", 2246 {'vol1': volume.id, 'vol2': new_volume.id}) 2247 rpcapi = volume_rpcapi.VolumeAPI() 2248 2249 orig_volume_status = volume.previous_status 2250 2251 if error: 2252 LOG.info("migrate_volume_completion is cleaning up an error " 2253 "for volume %(vol1)s (temporary volume %(vol2)s", 2254 {'vol1': volume['id'], 'vol2': new_volume.id}) 2255 rpcapi.delete_volume(ctxt, new_volume) 2256 updates = {'migration_status': 'error', 2257 'status': orig_volume_status} 2258 volume.update(updates) 2259 volume.save() 2260 return volume.id 2261 2262 volume.migration_status = 'completing' 2263 volume.save() 2264 2265 volume_attachments = [] 2266 2267 # NOTE(jdg): With new attach flow, we deleted the attachment, so the 2268 # original volume should now be listed as available, we still need to 2269 # do the magic swappy thing of name.id etc but we're done with the 2270 # original attachment record 2271 2272 # In the "old flow" at this point the orig_volume_status will be in-use 2273 # and the current status will be retyping. This is sort of a 2274 # misleading deal, because Nova has already called terminate 2275 # connection 2276 2277 # New Attach Flow, Nova has gone ahead and deleted the attachemnt, this 2278 # is the source/original volume, we've already migrated the data, we're 2279 # basically done with it at this point. We don't need to issue the 2280 # detach to toggle the status 2281 if orig_volume_status == 'in-use' and volume.status != 'available': 2282 for attachment in volume.volume_attachment: 2283 # Save the attachments the volume currently have 2284 volume_attachments.append(attachment) 2285 try: 2286 self.detach_volume(ctxt, volume.id, attachment.id) 2287 except Exception as ex: 2288 LOG.error("Detach migration source volume " 2289 "%(volume.id)s from attachment " 2290 "%(attachment.id)s failed: %(err)s", 2291 {'err': ex, 2292 'volume.id': volume.id, 2293 'attachment.id': attachment.id}, 2294 resource=volume) 2295 2296 # Give driver (new_volume) a chance to update things as needed 2297 # after a successful migration. 2298 # Note this needs to go through rpc to the host of the new volume 2299 # the current host and driver object is for the "existing" volume. 
        rpcapi.update_migrated_volume(ctxt, volume, new_volume,
                                      orig_volume_status)
        volume.refresh()
        new_volume.refresh()

        # Swap src and dest DB records so we can continue using the src id
        # and asynchronously delete the destination id
        updated_new = volume.finish_volume_migration(new_volume)
        updates = {'status': orig_volume_status,
                   'previous_status': volume.status,
                   'migration_status': 'success'}

        # NOTE(jdg): With the new attachment APIs, Nova will delete the
        # attachment for the source volume for us before calling the
        # migration-completion; now we just need to do the swapping on the
        # volume record, but don't jack with the attachments other than
        # updating volume_id.

        # In the old flow at this point the volumes are in attaching and
        # deleting status (dest/new is deleting, but we've done our magic
        # swappy thing so it's a bit confusing, but it does unwind properly
        # when you step through it).

        # In the new flow we simplified this and we don't need it; instead
        # of doing a bunch of swapping we just do attachment-create/delete
        # on the Nova side, and then here we just do the ID swaps that are
        # necessary to maintain the old behavior.

        # Restore the attachments for the old-flow use case
        if orig_volume_status == 'in-use' and volume.status in ['available',
                                                                'reserved',
                                                                'attaching']:
            for attachment in volume_attachments:
                LOG.debug('Re-attaching: %s', attachment)
                # This is just a db state toggle; the volume is actually
                # already attached and in-use, and the new attachment flow
                # won't allow this.
                rpcapi.attach_volume(ctxt, volume,
                                     attachment.instance_uuid,
                                     attachment.attached_host,
                                     attachment.mountpoint,
                                     attachment.attach_mode or 'rw')
        # At this point we have now done almost all of our swapping and
        # state-changes. The target volume is now marked back to "in-use",
        # the destination/worker volume is now in deleting state, and the
        # next steps will finish the deletion steps.
        volume.update(updates)
        volume.save()

        # Asynchronous deletion of the source volume in the back-end (now
        # pointed at by the target volume id)
        try:
            rpcapi.delete_volume(ctxt, updated_new)
        except Exception as ex:
            LOG.error('Failed to request async delete of migration source '
                      'vol %(vol)s: %(err)s',
                      {'vol': volume.id, 'err': ex})

        # For the new flow this is really the key part. We just use the
        # attachments to the worker/destination volumes that we created and
        # used for the libvirt migration, and we'll just swap their
        # volume_id entries to correspond with the volume.id swap we did.
        for attachment in VA_LIST.get_all_by_volume_id(ctxt, updated_new.id):
            attachment.volume_id = volume.id
            attachment.save()

        # Phewww.. that was easy! Once we get to a point where the old
        # attach flow can go away we really should rewrite all of this.
        LOG.info("Complete-Migrate volume completed successfully.",
                 resource=volume)
        return volume.id

    def migrate_volume(self, ctxt, volume, host, force_host_copy=False,
                       new_type_id=None):
        """Migrate the volume to the specified host (called on source host)."""
        try:
            # NOTE(flaper87): Verify the driver is enabled
            # before going forward. The exception will be caught
            # and the migration status updated.
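            # An illustrative 'host' argument (assumed values):
            #     {'host': 'stack@lvmdriver-1#pool0', 'cluster_name': None}
            # i.e. a backend/pool reference, not a bare hostname.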
2379 utils.require_driver_initialized(self.driver) 2380 except exception.DriverNotInitialized: 2381 with excutils.save_and_reraise_exception(): 2382 volume.migration_status = 'error' 2383 volume.save() 2384 2385 model_update = None 2386 moved = False 2387 2388 status_update = None 2389 if volume.status in ('retyping', 'maintenance'): 2390 status_update = {'status': volume.previous_status} 2391 2392 volume.migration_status = 'migrating' 2393 volume.save() 2394 if not force_host_copy and new_type_id is None: 2395 try: 2396 LOG.debug("Issue driver.migrate_volume.", resource=volume) 2397 moved, model_update = self.driver.migrate_volume(ctxt, 2398 volume, 2399 host) 2400 if moved: 2401 dst_service = self._get_service(host['host']) 2402 updates = { 2403 'host': host['host'], 2404 'cluster_name': host.get('cluster_name'), 2405 'migration_status': 'success', 2406 'availability_zone': dst_service.availability_zone, 2407 'previous_status': volume.status, 2408 } 2409 if status_update: 2410 updates.update(status_update) 2411 if model_update: 2412 updates.update(model_update) 2413 volume.update(updates) 2414 volume.save() 2415 except Exception: 2416 with excutils.save_and_reraise_exception(): 2417 updates = {'migration_status': 'error'} 2418 if status_update: 2419 updates.update(status_update) 2420 volume.update(updates) 2421 volume.save() 2422 if not moved: 2423 try: 2424 self._migrate_volume_generic(ctxt, volume, host, new_type_id) 2425 except Exception: 2426 with excutils.save_and_reraise_exception(): 2427 updates = {'migration_status': 'error'} 2428 if status_update: 2429 updates.update(status_update) 2430 volume.update(updates) 2431 volume.save() 2432 LOG.info("Migrate volume completed successfully.", 2433 resource=volume) 2434 2435 def _report_driver_status(self, context): 2436 # It's possible during live db migration that the self.service_uuid 2437 # value isn't set (we didn't restart services), so we'll go ahead 2438 # and make this a part of the service periodic 2439 if not self.service_uuid: 2440 # We hack this with a try/except for unit tests temporarily 2441 try: 2442 service = self._get_service() 2443 self.service_uuid = service.uuid 2444 except exception.ServiceNotFound: 2445 LOG.warning("Attempt to update service_uuid " 2446 "resulted in a Service NotFound " 2447 "exception, service_uuid field on " 2448 "volumes will be NULL.") 2449 2450 if not self.driver.initialized: 2451 if self.driver.configuration.config_group is None: 2452 config_group = '' 2453 else: 2454 config_group = ('(config name %s)' % 2455 self.driver.configuration.config_group) 2456 2457 LOG.warning("Update driver status failed: %(config_group)s " 2458 "is uninitialized.", 2459 {'config_group': config_group}, 2460 resource={'type': 'driver', 2461 'id': self.driver.__class__.__name__}) 2462 else: 2463 volume_stats = self.driver.get_volume_stats(refresh=True) 2464 if self.extra_capabilities: 2465 volume_stats.update(self.extra_capabilities) 2466 if volume_stats: 2467 2468 # NOTE(xyang): If driver reports replication_status to be 2469 # 'error' in volume_stats, get model updates from driver 2470 # and update db 2471 if volume_stats.get('replication_status') == ( 2472 fields.ReplicationStatus.ERROR): 2473 filters = self._get_cluster_or_host_filters() 2474 groups = objects.GroupList.get_all_replicated( 2475 context, filters=filters) 2476 group_model_updates, volume_model_updates = ( 2477 self.driver.get_replication_error_status(context, 2478 groups)) 2479 for grp_update in group_model_updates: 2480 try: 2481 grp_obj = 
objects.Group.get_by_id( 2482 context, grp_update['group_id']) 2483 grp_obj.update(grp_update) 2484 grp_obj.save() 2485 except exception.GroupNotFound: 2486 # Group may be deleted already. Log a warning 2487 # and continue. 2488 LOG.warning("Group %(grp)s not found while " 2489 "updating driver status.", 2490 {'grp': grp_update['group_id']}, 2491 resource={ 2492 'type': 'group', 2493 'id': grp_update['group_id']}) 2494 for vol_update in volume_model_updates: 2495 try: 2496 vol_obj = objects.Volume.get_by_id( 2497 context, vol_update['volume_id']) 2498 vol_obj.update(vol_update) 2499 vol_obj.save() 2500 except exception.VolumeNotFound: 2501 # Volume may be deleted already. Log a warning 2502 # and continue. 2503 LOG.warning("Volume %(vol)s not found while " 2504 "updating driver status.", 2505 {'vol': vol_update['volume_id']}, 2506 resource={ 2507 'type': 'volume', 2508 'id': vol_update['volume_id']}) 2509 2510 # Append volume stats with 'allocated_capacity_gb' 2511 self._append_volume_stats(volume_stats) 2512 2513 # Append filter and goodness function if needed 2514 volume_stats = ( 2515 self._append_filter_goodness_functions(volume_stats)) 2516 2517 # queue it to be sent to the Schedulers. 2518 self.update_service_capabilities(volume_stats) 2519 2520 def _append_volume_stats(self, vol_stats): 2521 pools = vol_stats.get('pools', None) 2522 if pools: 2523 if isinstance(pools, list): 2524 for pool in pools: 2525 pool_name = pool['pool_name'] 2526 try: 2527 pool_stats = self.stats['pools'][pool_name] 2528 except KeyError: 2529 # Pool not found in volume manager 2530 pool_stats = dict(allocated_capacity_gb=0) 2531 2532 pool.update(pool_stats) 2533 else: 2534 raise exception.ProgrammingError( 2535 reason='Pools stats reported by the driver are not ' 2536 'reported in a list') 2537 # For drivers that are not reporting their stats by pool we will use 2538 # the data from the special fixed pool created by 2539 # _count_allocated_capacity. 2540 elif self.stats.get('pools'): 2541 vol_stats.update(next(iter(self.stats['pools'].values()))) 2542 # This is a special subcase of the above no pool case that happens when 2543 # we don't have any volumes yet. 
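        # (Illustrative) with no volumes created yet, self.stats is just,
        # e.g., {'allocated_capacity_gb': 0, 'pools': {}}, so its scalar
        # counters are copied across and the empty 'pools' key is dropped.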
2544 else: 2545 vol_stats.update(self.stats) 2546 vol_stats.pop('pools', None) 2547 2548 def _append_filter_goodness_functions(self, volume_stats): 2549 """Returns volume_stats updated as needed.""" 2550 2551 # Append filter_function if needed 2552 if 'filter_function' not in volume_stats: 2553 volume_stats['filter_function'] = ( 2554 self.driver.get_filter_function()) 2555 2556 # Append goodness_function if needed 2557 if 'goodness_function' not in volume_stats: 2558 volume_stats['goodness_function'] = ( 2559 self.driver.get_goodness_function()) 2560 2561 return volume_stats 2562 2563 @periodic_task.periodic_task 2564 def publish_service_capabilities(self, context): 2565 """Collect driver status and then publish.""" 2566 self._report_driver_status(context) 2567 self._publish_service_capabilities(context) 2568 2569 def _notify_about_volume_usage(self, 2570 context, 2571 volume, 2572 event_suffix, 2573 extra_usage_info=None): 2574 vol_utils.notify_about_volume_usage( 2575 context, volume, event_suffix, 2576 extra_usage_info=extra_usage_info, host=self.host) 2577 2578 def _notify_about_snapshot_usage(self, 2579 context, 2580 snapshot, 2581 event_suffix, 2582 extra_usage_info=None): 2583 vol_utils.notify_about_snapshot_usage( 2584 context, snapshot, event_suffix, 2585 extra_usage_info=extra_usage_info, host=self.host) 2586 2587 def _notify_about_group_usage(self, 2588 context, 2589 group, 2590 event_suffix, 2591 volumes=None, 2592 extra_usage_info=None): 2593 vol_utils.notify_about_group_usage( 2594 context, group, event_suffix, 2595 extra_usage_info=extra_usage_info, host=self.host) 2596 2597 if not volumes: 2598 volumes = self.db.volume_get_all_by_generic_group( 2599 context, group.id) 2600 if volumes: 2601 for volume in volumes: 2602 vol_utils.notify_about_volume_usage( 2603 context, volume, event_suffix, 2604 extra_usage_info=extra_usage_info, host=self.host) 2605 2606 def _notify_about_group_snapshot_usage(self, 2607 context, 2608 group_snapshot, 2609 event_suffix, 2610 snapshots=None, 2611 extra_usage_info=None): 2612 vol_utils.notify_about_group_snapshot_usage( 2613 context, group_snapshot, event_suffix, 2614 extra_usage_info=extra_usage_info, host=self.host) 2615 2616 if not snapshots: 2617 snapshots = objects.SnapshotList.get_all_for_group_snapshot( 2618 context, group_snapshot.id) 2619 if snapshots: 2620 for snapshot in snapshots: 2621 vol_utils.notify_about_snapshot_usage( 2622 context, snapshot, event_suffix, 2623 extra_usage_info=extra_usage_info, host=self.host) 2624 2625 def extend_volume(self, context, volume, new_size, reservations): 2626 try: 2627 # NOTE(flaper87): Verify the driver is enabled 2628 # before going forward. The exception will be caught 2629 # and the volume status updated. 
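            # Worked example (illustrative sizes): extending a 10 GB volume
            # with new_size=15 gives size_increase = 5 GB, which is added to
            # the pool's allocated_capacity_gb once the driver call succeeds.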
2630 utils.require_driver_initialized(self.driver) 2631 except exception.DriverNotInitialized: 2632 with excutils.save_and_reraise_exception(): 2633 volume.status = 'error_extending' 2634 volume.save() 2635 2636 project_id = volume.project_id 2637 size_increase = (int(new_size)) - volume.size 2638 self._notify_about_volume_usage(context, volume, "resize.start") 2639 try: 2640 self.driver.extend_volume(volume, new_size) 2641 except exception.TargetUpdateFailed: 2642 # We just want to log this but continue on with quota commit 2643 LOG.warning('Volume extended but failed to update target.') 2644 except Exception: 2645 LOG.exception("Extend volume failed.", 2646 resource=volume) 2647 self.message_api.create( 2648 context, 2649 message_field.Action.EXTEND_VOLUME, 2650 resource_uuid=volume.id, 2651 detail=message_field.Detail.DRIVER_FAILED_EXTEND) 2652 try: 2653 self.db.volume_update(context, volume.id, 2654 {'status': 'error_extending'}) 2655 raise exception.CinderException(_("Volume %s: Error trying " 2656 "to extend volume") % 2657 volume.id) 2658 finally: 2659 QUOTAS.rollback(context, reservations, project_id=project_id) 2660 return 2661 2662 QUOTAS.commit(context, reservations, project_id=project_id) 2663 2664 attachments = volume.volume_attachment 2665 if not attachments: 2666 orig_volume_status = 'available' 2667 else: 2668 orig_volume_status = 'in-use' 2669 2670 volume.update({'size': int(new_size), 'status': orig_volume_status}) 2671 volume.save() 2672 2673 if orig_volume_status == 'in-use': 2674 nova_api = compute.API() 2675 instance_uuids = [attachment.instance_uuid 2676 for attachment in attachments] 2677 nova_api.extend_volume(context, instance_uuids, volume.id) 2678 2679 pool = vol_utils.extract_host(volume.host, 'pool') 2680 if pool is None: 2681 # Legacy volume, put them into default pool 2682 pool = self.driver.configuration.safe_get( 2683 'volume_backend_name') or vol_utils.extract_host( 2684 volume.host, 'pool', True) 2685 2686 try: 2687 self.stats['pools'][pool]['allocated_capacity_gb'] += size_increase 2688 except KeyError: 2689 self.stats['pools'][pool] = dict( 2690 allocated_capacity_gb=size_increase) 2691 2692 self._notify_about_volume_usage( 2693 context, volume, "resize.end", 2694 extra_usage_info={'size': int(new_size)}) 2695 LOG.info("Extend volume completed successfully.", 2696 resource=volume) 2697 2698 def _is_our_backend(self, host, cluster_name): 2699 return ((not cluster_name and 2700 vol_utils.hosts_are_equivalent(self.driver.host, host)) or 2701 (cluster_name and 2702 vol_utils.hosts_are_equivalent(self.driver.cluster_name, 2703 cluster_name))) 2704 2705 def retype(self, context, volume, new_type_id, host, 2706 migration_policy='never', reservations=None, 2707 old_reservations=None): 2708 2709 def _retype_error(context, volume, old_reservations, 2710 new_reservations, status_update): 2711 try: 2712 volume.update(status_update) 2713 volume.save() 2714 finally: 2715 QUOTAS.rollback(context, old_reservations) 2716 QUOTAS.rollback(context, new_reservations) 2717 2718 status_update = {'status': volume.previous_status} 2719 if context.project_id != volume.project_id: 2720 project_id = volume.project_id 2721 else: 2722 project_id = context.project_id 2723 2724 try: 2725 # NOTE(flaper87): Verify the driver is enabled 2726 # before going forward. The exception will be caught 2727 # and the volume status updated. 
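            # An illustrative diff from volume_types_diff() (assumed specs):
            #     {'extra_specs': {'replication_enabled':
            #                          (None, '<is> True')},
            #      'qos_specs': {}, 'encryption': {}}
            # i.e. spec name -> (old value, new value) pairs per category.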
            utils.require_driver_initialized(self.driver)
        except exception.DriverNotInitialized:
            with excutils.save_and_reraise_exception():
                # NOTE(flaper87): Other exceptions in this method don't
                # set the volume status to error. Should that be done
                # here? Setting the volume back to its original status
                # for now.
                volume.update(status_update)
                volume.save()

        # We already got the new reservations
        new_reservations = reservations

        # If the volume types have the same contents, no need to do anything.
        # Use the admin context to be able to access volume extra_specs.
        retyped = False
        diff, all_equal = volume_types.volume_types_diff(
            context.elevated(), volume.volume_type_id, new_type_id)
        if all_equal:
            retyped = True

        # Call the driver to try and change the type
        retype_model_update = None

        # NOTE(jdg): Check to see if the destination host or cluster
        # (depending on whether the volume is in a clustered backend or not)
        # is the same as the current one. If it's not, don't call the
        # driver.retype method; otherwise drivers that implement retype may
        # report success, but it's invalid in the case of a migrate.

        # We assume that those that support pools do this internally
        # so we strip off the pools designation.

        if (not retyped and
                not diff.get('encryption') and
                self._is_our_backend(host['host'], host.get('cluster_name'))):
            try:
                new_type = volume_types.get_volume_type(context.elevated(),
                                                        new_type_id)
                with volume.obj_as_admin():
                    ret = self.driver.retype(context,
                                             volume,
                                             new_type,
                                             diff,
                                             host)
                # Check if the driver retype provided a model update or
                # just a retype indication
                if isinstance(ret, tuple):
                    retyped, retype_model_update = ret
                else:
                    retyped = ret

                if retyped:
                    LOG.info("Volume %s: retyped successfully.", volume.id)
            except Exception:
                retyped = False
                LOG.exception("Volume %s: driver error when trying to "
                              "retype, falling back to generic "
                              "mechanism.", volume.id)

        # We could not change the type, so we need to migrate the volume,
        # where the destination volume will be of the new type
        if not retyped:
            if migration_policy == 'never':
                _retype_error(context, volume, old_reservations,
                              new_reservations, status_update)
                msg = _("Retype requires migration but is not allowed.")
                raise exception.VolumeMigrationFailed(reason=msg)

            snaps = objects.SnapshotList.get_all_for_volume(context,
                                                            volume.id)
            if snaps:
                _retype_error(context, volume, old_reservations,
                              new_reservations, status_update)
                msg = _("Volume must not have snapshots.")
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            # Don't allow a volume with replicas to be migrated
            rep_status = volume.replication_status
            if (rep_status is not None and rep_status not in
                    [fields.ReplicationStatus.DISABLED,
                     fields.ReplicationStatus.NOT_CAPABLE]):
                _retype_error(context, volume, old_reservations,
                              new_reservations, status_update)
                msg = _("Volume must not be replicated.")
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            volume.migration_status = 'starting'
            volume.save()

            try:
                self.migrate_volume(context, volume, host,
                                    new_type_id=new_type_id)
            except Exception:
                with excutils.save_and_reraise_exception():
                    _retype_error(context,
volume, old_reservations, 2826 new_reservations, status_update) 2827 else: 2828 model_update = {'volume_type_id': new_type_id, 2829 'host': host['host'], 2830 'cluster_name': host.get('cluster_name'), 2831 'status': status_update['status']} 2832 if retype_model_update: 2833 model_update.update(retype_model_update) 2834 self._set_replication_status(diff, model_update) 2835 volume.update(model_update) 2836 volume.save() 2837 2838 if old_reservations: 2839 QUOTAS.commit(context, old_reservations, project_id=project_id) 2840 if new_reservations: 2841 QUOTAS.commit(context, new_reservations, project_id=project_id) 2842 self._notify_about_volume_usage( 2843 context, volume, "retype", 2844 extra_usage_info={'volume_type': new_type_id}) 2845 self.publish_service_capabilities(context) 2846 LOG.info("Retype volume completed successfully.", 2847 resource=volume) 2848 2849 @staticmethod 2850 def _set_replication_status(diff, model_update): 2851 """Update replication_status in model_update if it has changed.""" 2852 if not diff or model_update.get('replication_status'): 2853 return 2854 2855 diff_specs = diff.get('extra_specs', {}) 2856 replication_diff = diff_specs.get('replication_enabled') 2857 2858 if replication_diff: 2859 is_replicated = vol_utils.is_replicated_str(replication_diff[1]) 2860 if is_replicated: 2861 replication_status = fields.ReplicationStatus.ENABLED 2862 else: 2863 replication_status = fields.ReplicationStatus.DISABLED 2864 model_update['replication_status'] = replication_status 2865 2866 def manage_existing(self, ctxt, volume, ref=None): 2867 vol_ref = self._run_manage_existing_flow_engine( 2868 ctxt, volume, ref) 2869 2870 self._update_stats_for_managed(vol_ref) 2871 2872 LOG.info("Manage existing volume completed successfully.", 2873 resource=vol_ref) 2874 return vol_ref.id 2875 2876 def _update_stats_for_managed(self, volume_reference): 2877 # Update volume stats 2878 pool = vol_utils.extract_host(volume_reference.host, 'pool') 2879 if pool is None: 2880 # Legacy volume, put them into default pool 2881 pool = self.driver.configuration.safe_get( 2882 'volume_backend_name') or vol_utils.extract_host( 2883 volume_reference.host, 'pool', True) 2884 2885 try: 2886 self.stats['pools'][pool]['allocated_capacity_gb'] \ 2887 += volume_reference.size 2888 except KeyError: 2889 self.stats['pools'][pool] = dict( 2890 allocated_capacity_gb=volume_reference.size) 2891 2892 def _run_manage_existing_flow_engine(self, ctxt, volume, ref): 2893 try: 2894 flow_engine = manage_existing.get_flow( 2895 ctxt, 2896 self.db, 2897 self.driver, 2898 self.host, 2899 volume, 2900 ref, 2901 ) 2902 except Exception: 2903 msg = _("Failed to create manage_existing flow.") 2904 LOG.exception(msg, resource={'type': 'volume', 'id': volume.id}) 2905 raise exception.CinderException(msg) 2906 2907 with flow_utils.DynamicLogListener(flow_engine, logger=LOG): 2908 flow_engine.run() 2909 2910 # Fetch created volume from storage 2911 vol_ref = flow_engine.storage.fetch('volume') 2912 2913 return vol_ref 2914 2915 def _get_cluster_or_host_filters(self): 2916 if self.cluster: 2917 filters = {'cluster_name': self.cluster} 2918 else: 2919 filters = {'host': self.host} 2920 return filters 2921 2922 def _get_my_resources(self, ctxt, ovo_class_list): 2923 filters = self._get_cluster_or_host_filters() 2924 return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters) 2925 2926 def _get_my_volumes(self, ctxt): 2927 return self._get_my_resources(ctxt, objects.VolumeList) 2928 2929 def _get_my_snapshots(self, ctxt): 2930 
return self._get_my_resources(ctxt, objects.SnapshotList) 2931 2932 def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys, 2933 sort_dirs, want_objects=False): 2934 try: 2935 utils.require_driver_initialized(self.driver) 2936 except exception.DriverNotInitialized: 2937 with excutils.save_and_reraise_exception(): 2938 LOG.exception("Listing manageable volumes failed, due " 2939 "to uninitialized driver.") 2940 2941 cinder_volumes = self._get_my_volumes(ctxt) 2942 try: 2943 driver_entries = self.driver.get_manageable_volumes( 2944 cinder_volumes, marker, limit, offset, sort_keys, sort_dirs) 2945 if want_objects: 2946 driver_entries = (objects.ManageableVolumeList. 2947 from_primitives(ctxt, driver_entries)) 2948 except Exception: 2949 with excutils.save_and_reraise_exception(): 2950 LOG.exception("Listing manageable volumes failed, due " 2951 "to driver error.") 2952 return driver_entries 2953 2954 def create_group(self, context, group): 2955 """Creates the group.""" 2956 context = context.elevated() 2957 2958 # Make sure the host in the DB matches our own when clustered 2959 self._set_resource_host(group) 2960 2961 status = fields.GroupStatus.AVAILABLE 2962 model_update = None 2963 2964 self._notify_about_group_usage(context, group, "create.start") 2965 2966 try: 2967 utils.require_driver_initialized(self.driver) 2968 2969 LOG.info("Group %s: creating", group.name) 2970 2971 try: 2972 model_update = self.driver.create_group(context, group) 2973 except NotImplementedError: 2974 if not group_types.is_default_cgsnapshot_type( 2975 group.group_type_id): 2976 model_update = self._create_group_generic(context, group) 2977 else: 2978 cg, __ = self._convert_group_to_cg(group, []) 2979 model_update = self.driver.create_consistencygroup( 2980 context, cg) 2981 2982 if model_update: 2983 if (model_update['status'] == 2984 fields.GroupStatus.ERROR): 2985 msg = (_('Create group failed.')) 2986 LOG.error(msg, 2987 resource={'type': 'group', 2988 'id': group.id}) 2989 raise exception.VolumeDriverException(message=msg) 2990 else: 2991 group.update(model_update) 2992 group.save() 2993 except Exception: 2994 with excutils.save_and_reraise_exception(): 2995 group.status = fields.GroupStatus.ERROR 2996 group.save() 2997 LOG.error("Group %s: create failed", 2998 group.name) 2999 3000 group.status = status 3001 group.created_at = timeutils.utcnow() 3002 group.save() 3003 LOG.info("Group %s: created successfully", group.name) 3004 3005 self._notify_about_group_usage(context, group, "create.end") 3006 3007 LOG.info("Create group completed successfully.", 3008 resource={'type': 'group', 3009 'id': group.id}) 3010 return group 3011 3012 def create_group_from_src(self, context, group, 3013 group_snapshot=None, source_group=None): 3014 """Creates the group from source. 3015 3016 The source can be a group snapshot or a source group. 
3017 """ 3018 source_name = None 3019 snapshots = None 3020 source_vols = None 3021 try: 3022 volumes = objects.VolumeList.get_all_by_generic_group(context, 3023 group.id) 3024 if group_snapshot: 3025 try: 3026 # Check if group_snapshot still exists 3027 group_snapshot.refresh() 3028 except exception.GroupSnapshotNotFound: 3029 LOG.error("Create group from snapshot-%(snap)s failed: " 3030 "SnapshotNotFound.", 3031 {'snap': group_snapshot.id}, 3032 resource={'type': 'group', 3033 'id': group.id}) 3034 raise 3035 3036 source_name = _("snapshot-%s") % group_snapshot.id 3037 snapshots = objects.SnapshotList.get_all_for_group_snapshot( 3038 context, group_snapshot.id) 3039 for snap in snapshots: 3040 if (snap.status not in 3041 VALID_CREATE_GROUP_SRC_SNAP_STATUS): 3042 msg = (_("Cannot create group " 3043 "%(group)s because snapshot %(snap)s is " 3044 "not in a valid state. Valid states are: " 3045 "%(valid)s.") % 3046 {'group': group.id, 3047 'snap': snap['id'], 3048 'valid': VALID_CREATE_GROUP_SRC_SNAP_STATUS}) 3049 raise exception.InvalidGroup(reason=msg) 3050 3051 if source_group: 3052 try: 3053 source_group.refresh() 3054 except exception.GroupNotFound: 3055 LOG.error("Create group " 3056 "from source group-%(group)s failed: " 3057 "GroupNotFound.", 3058 {'group': source_group.id}, 3059 resource={'type': 'group', 3060 'id': group.id}) 3061 raise 3062 3063 source_name = _("group-%s") % source_group.id 3064 source_vols = objects.VolumeList.get_all_by_generic_group( 3065 context, source_group.id) 3066 for source_vol in source_vols: 3067 if (source_vol.status not in 3068 VALID_CREATE_GROUP_SRC_GROUP_STATUS): 3069 msg = (_("Cannot create group " 3070 "%(group)s because source volume " 3071 "%(source_vol)s is not in a valid " 3072 "state. Valid states are: " 3073 "%(valid)s.") % 3074 {'group': group.id, 3075 'source_vol': source_vol.id, 3076 'valid': VALID_CREATE_GROUP_SRC_GROUP_STATUS}) 3077 raise exception.InvalidGroup(reason=msg) 3078 3079 # Sort source snapshots so that they are in the same order as their 3080 # corresponding target volumes. 3081 sorted_snapshots = None 3082 if group_snapshot and snapshots: 3083 sorted_snapshots = self._sort_snapshots(volumes, snapshots) 3084 3085 # Sort source volumes so that they are in the same order as their 3086 # corresponding target volumes. 
3087 sorted_source_vols = None 3088 if source_group and source_vols: 3089 sorted_source_vols = self._sort_source_vols(volumes, 3090 source_vols) 3091 3092 self._notify_about_group_usage( 3093 context, group, "create.start") 3094 3095 utils.require_driver_initialized(self.driver) 3096 3097 try: 3098 model_update, volumes_model_update = ( 3099 self.driver.create_group_from_src( 3100 context, group, volumes, group_snapshot, 3101 sorted_snapshots, source_group, sorted_source_vols)) 3102 except NotImplementedError: 3103 if not group_types.is_default_cgsnapshot_type( 3104 group.group_type_id): 3105 model_update, volumes_model_update = ( 3106 self._create_group_from_src_generic( 3107 context, group, volumes, group_snapshot, 3108 sorted_snapshots, source_group, 3109 sorted_source_vols)) 3110 else: 3111 cg, volumes = self._convert_group_to_cg( 3112 group, volumes) 3113 cgsnapshot, sorted_snapshots = ( 3114 self._convert_group_snapshot_to_cgsnapshot( 3115 group_snapshot, sorted_snapshots, context)) 3116 source_cg, sorted_source_vols = ( 3117 self._convert_group_to_cg(source_group, 3118 sorted_source_vols)) 3119 model_update, volumes_model_update = ( 3120 self.driver.create_consistencygroup_from_src( 3121 context, cg, volumes, cgsnapshot, 3122 sorted_snapshots, source_cg, sorted_source_vols)) 3123 self._remove_cgsnapshot_id_from_snapshots(sorted_snapshots) 3124 self._remove_consistencygroup_id_from_volumes(volumes) 3125 self._remove_consistencygroup_id_from_volumes( 3126 sorted_source_vols) 3127 3128 if volumes_model_update: 3129 for update in volumes_model_update: 3130 self.db.volume_update(context, update['id'], update) 3131 3132 if model_update: 3133 group.update(model_update) 3134 group.save() 3135 3136 except Exception: 3137 with excutils.save_and_reraise_exception(): 3138 group.status = fields.GroupStatus.ERROR 3139 group.save() 3140 LOG.error("Create group " 3141 "from source %(source)s failed.", 3142 {'source': source_name}, 3143 resource={'type': 'group', 3144 'id': group.id}) 3145 # Update volume status to 'error' as well. 3146 self._remove_consistencygroup_id_from_volumes(volumes) 3147 for vol in volumes: 3148 vol.status = 'error' 3149 vol.save() 3150 3151 now = timeutils.utcnow() 3152 status = 'available' 3153 for vol in volumes: 3154 update = {'status': status, 'created_at': now} 3155 self._update_volume_from_src(context, vol, update, group=group) 3156 self._update_allocated_capacity(vol) 3157 3158 group.status = status 3159 group.created_at = now 3160 group.save() 3161 3162 self._notify_about_group_usage( 3163 context, group, "create.end") 3164 LOG.info("Create group " 3165 "from source-%(source)s completed successfully.", 3166 {'source': source_name}, 3167 resource={'type': 'group', 3168 'id': group.id}) 3169 return group 3170 3171 def _create_group_from_src_generic(self, context, group, volumes, 3172 group_snapshot=None, snapshots=None, 3173 source_group=None, source_vols=None): 3174 """Creates a group from source. 3175 3176 :param context: the context of the caller. 3177 :param group: the Group object to be created. 3178 :param volumes: a list of volume objects in the group. 3179 :param group_snapshot: the GroupSnapshot object as source. 3180 :param snapshots: a list of snapshot objects in group_snapshot. 3181 :param source_group: the Group object as source. 3182 :param source_vols: a list of volume objects in the source_group. 
        :returns: model_update, volumes_model_update
        """
        model_update = {'status': 'available'}
        volumes_model_update = []
        for vol in volumes:
            if snapshots:
                for snapshot in snapshots:
                    if vol.snapshot_id == snapshot.id:
                        vol_model_update = {'id': vol.id}
                        try:
                            driver_update = (
                                self.driver.create_volume_from_snapshot(
                                    vol, snapshot))
                            if driver_update:
                                driver_update.pop('id', None)
                                vol_model_update.update(driver_update)
                            if 'status' not in vol_model_update:
                                vol_model_update['status'] = 'available'
                        except Exception:
                            vol_model_update['status'] = 'error'
                            model_update['status'] = 'error'
                        volumes_model_update.append(vol_model_update)
                        break
            elif source_vols:
                for source_vol in source_vols:
                    if vol.source_volid == source_vol.id:
                        vol_model_update = {'id': vol.id}
                        try:
                            driver_update = self.driver.create_cloned_volume(
                                vol, source_vol)
                            if driver_update:
                                driver_update.pop('id', None)
                                vol_model_update.update(driver_update)
                            if 'status' not in vol_model_update:
                                vol_model_update['status'] = 'available'
                        except Exception:
                            vol_model_update['status'] = 'error'
                            model_update['status'] = 'error'
                        volumes_model_update.append(vol_model_update)
                        break

        return model_update, volumes_model_update

    def _sort_snapshots(self, volumes, snapshots):
        # Sort source snapshots so that they are in the same order as their
        # corresponding target volumes. Each source snapshot in the snapshots
        # list should have a corresponding target volume in the volumes list.
        if not volumes or not snapshots or len(volumes) != len(snapshots):
            msg = _("Input volumes or snapshots are invalid.")
            LOG.error(msg)
            raise exception.InvalidInput(reason=msg)

        sorted_snapshots = []
        for vol in volumes:
            found_snaps = [snap for snap in snapshots
                           if snap['id'] == vol['snapshot_id']]
            if not found_snaps:
                LOG.error("Source snapshot cannot be found for target "
                          "volume %(volume_id)s.",
                          {'volume_id': vol['id']})
                raise exception.SnapshotNotFound(
                    snapshot_id=vol['snapshot_id'])
            sorted_snapshots.extend(found_snaps)

        return sorted_snapshots

    def _sort_source_vols(self, volumes, source_vols):
        # Sort source volumes so that they are in the same order as their
        # corresponding target volumes. Each source volume in the source_vols
        # list should have a corresponding target volume in the volumes list.
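        # Illustrative example (hypothetical objects, not executed): with
        #     volumes     = [v1, v2]    # v1.source_volid == b.id
        #                               # v2.source_volid == a.id
        #     source_vols = [a, b]
        # the result is [b, a], mirroring the order of `volumes`.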
        if not volumes or not source_vols or len(volumes) != len(source_vols):
            msg = _("Input volumes or source volumes are invalid.")
            LOG.error(msg)
            raise exception.InvalidInput(reason=msg)

        sorted_source_vols = []
        for vol in volumes:
            found_source_vols = [source_vol for source_vol in source_vols
                                 if source_vol['id'] == vol['source_volid']]
            if not found_source_vols:
                LOG.error("Source volumes cannot be found for target "
                          "volume %(volume_id)s.",
                          {'volume_id': vol['id']})
                raise exception.VolumeNotFound(
                    volume_id=vol['source_volid'])
            sorted_source_vols.extend(found_source_vols)

        return sorted_source_vols

    def _update_volume_from_src(self, context, vol, update, group=None):
        try:
            snapshot_id = vol.get('snapshot_id')
            source_volid = vol.get('source_volid')
            if snapshot_id:
                snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
                orig_vref = self.db.volume_get(context,
                                               snapshot.volume_id)
                if orig_vref.bootable:
                    update['bootable'] = True
                    self.db.volume_glance_metadata_copy_to_volume(
                        context, vol['id'], snapshot_id)
            if source_volid:
                source_vol = objects.Volume.get_by_id(context, source_volid)
                if source_vol.bootable:
                    update['bootable'] = True
                    self.db.volume_glance_metadata_copy_from_volume_to_volume(
                        context, source_volid, vol['id'])
                if source_vol.multiattach:
                    update['multiattach'] = True

        except exception.SnapshotNotFound:
            LOG.error("Source snapshot %(snapshot_id)s cannot be found.",
                      {'snapshot_id': vol['snapshot_id']})
            self.db.volume_update(context, vol['id'],
                                  {'status': 'error'})
            if group:
                group.status = fields.GroupStatus.ERROR
                group.save()
            raise
        except exception.VolumeNotFound:
            LOG.error("The source volume %(volume_id)s "
                      "cannot be found.",
                      {'volume_id': source_volid})
            self.db.volume_update(context, vol['id'],
                                  {'status': 'error'})
            if group:
                group.status = fields.GroupStatus.ERROR
                group.save()
            raise
        except exception.CinderException as ex:
            LOG.error("Failed to update %(volume_id)s"
                      " metadata using the provided snapshot"
                      " %(snapshot_id)s metadata.",
                      {'volume_id': vol['id'],
                       'snapshot_id': vol['snapshot_id']})
            self.db.volume_update(context, vol['id'],
                                  {'status': 'error'})
            if group:
                group.status = fields.GroupStatus.ERROR
                group.save()
            raise exception.MetadataCopyFailure(reason=six.text_type(ex))

        self.db.volume_update(context, vol['id'], update)

    def _update_allocated_capacity(self, vol, decrement=False, host=None):
        # Update allocated capacity in volume stats
        host = host or vol['host']
        pool = vol_utils.extract_host(host, 'pool')
        if pool is None:
            # Legacy volume, put them into default pool
            pool = self.driver.configuration.safe_get(
                'volume_backend_name') or vol_utils.extract_host(host, 'pool',
                                                                 True)

        vol_size = -vol['size'] if decrement else vol['size']
        try:
            self.stats['pools'][pool]['allocated_capacity_gb'] += vol_size
        except KeyError:
            self.stats['pools'][pool] = dict(
                allocated_capacity_gb=max(vol_size, 0))

    def delete_group(self, context, group):
        """Deletes group and the volumes in the group."""
        context = context.elevated()
        project_id = group.project_id

        volumes = objects.VolumeList.get_all_by_generic_group(
            context, group.id)

        for vol_obj in volumes:
            if vol_obj.attach_status == "attached":
                # Volume is still attached, need to detach first
                raise exception.VolumeAttached(volume_id=vol_obj.id)
            self._check_is_our_resource(vol_obj)

        self._notify_about_group_usage(
            context, group, "delete.start")

        volumes_model_update = None
        model_update = None
        try:
            utils.require_driver_initialized(self.driver)

            try:
                model_update, volumes_model_update = (
                    self.driver.delete_group(context, group, volumes))
            except NotImplementedError:
                if not group_types.is_default_cgsnapshot_type(
                        group.group_type_id):
                    model_update, volumes_model_update = (
                        self._delete_group_generic(context, group, volumes))
                else:
                    cg, volumes = self._convert_group_to_cg(
                        group, volumes)
                    model_update, volumes_model_update = (
                        self.driver.delete_consistencygroup(context, cg,
                                                            volumes))
                    self._remove_consistencygroup_id_from_volumes(volumes)

            if volumes_model_update:
                for update in volumes_model_update:
                    # If we failed to delete a volume, make sure the
                    # status for the group is set to error as well
                    if (update['status'] in ['error_deleting', 'error']
                            and model_update['status'] not in
                            ['error_deleting', 'error']):
                        model_update['status'] = update['status']
                self.db.volumes_update(context, volumes_model_update)

            if model_update:
                if model_update['status'] in ['error_deleting', 'error']:
                    msg = (_('Delete group failed.'))
                    LOG.error(msg,
                              resource={'type': 'group',
                                        'id': group.id})
                    raise exception.VolumeDriverException(message=msg)
                else:
                    group.update(model_update)
                    group.save()

        except Exception:
            with excutils.save_and_reraise_exception():
                group.status = fields.GroupStatus.ERROR
                group.save()
                # Update volume status to 'error' if driver returns
                # None for volumes_model_update.
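                # Shape reference for the driver return value (illustrative,
                # matching _delete_group_generic below):
                #     volumes_model_update = [
                #         {'id': vol.id, 'status': 'deleted'}, ...]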
                if not volumes_model_update:
                    self._remove_consistencygroup_id_from_volumes(volumes)
                    for vol_obj in volumes:
                        vol_obj.status = 'error'
                        vol_obj.save()

        # Get reservations for group
        try:
            reserve_opts = {'groups': -1}
            grpreservations = GROUP_QUOTAS.reserve(context,
                                                   project_id=project_id,
                                                   **reserve_opts)
        except Exception:
            grpreservations = None
            LOG.exception("Delete group "
                          "failed to update usages.",
                          resource={'type': 'group',
                                    'id': group.id})

        for vol in volumes:
            # Get reservations for volume
            try:
                reserve_opts = {'volumes': -1,
                                'gigabytes': -vol.size}
                QUOTAS.add_volume_type_opts(context,
                                            reserve_opts,
                                            vol.volume_type_id)
                reservations = QUOTAS.reserve(context,
                                              project_id=project_id,
                                              **reserve_opts)
            except Exception:
                reservations = None
                LOG.exception("Delete group "
                              "failed to update usages.",
                              resource={'type': 'group',
                                        'id': group.id})

            # Delete glance metadata if it exists
            self.db.volume_glance_metadata_delete_by_volume(context, vol.id)

            vol.destroy()

            # Commit the reservations
            if reservations:
                QUOTAS.commit(context, reservations, project_id=project_id)

            self.stats['allocated_capacity_gb'] -= vol.size

        if grpreservations:
            GROUP_QUOTAS.commit(context, grpreservations,
                                project_id=project_id)

        group.destroy()
        self._notify_about_group_usage(
            context, group, "delete.end")
        self.publish_service_capabilities(context)
        LOG.info("Delete group "
                 "completed successfully.",
                 resource={'type': 'group',
                           'id': group.id})

    def _convert_group_to_cg(self, group, volumes):
        if not group:
            return None, None
        cg = consistencygroup.ConsistencyGroup()
        cg.from_group(group)
        for vol in volumes:
            vol.consistencygroup_id = vol.group_id
            vol.consistencygroup = cg

        return cg, volumes

    def _remove_consistencygroup_id_from_volumes(self, volumes):
        if not volumes:
            return
        for vol in volumes:
            vol.consistencygroup_id = None
            vol.consistencygroup = None

    def _convert_group_snapshot_to_cgsnapshot(self, group_snapshot, snapshots,
                                              ctxt):
        if not group_snapshot:
            return None, None
        cgsnap = cgsnapshot.CGSnapshot()
        cgsnap.from_group_snapshot(group_snapshot)

        # Populate consistencygroup object
        grp = objects.Group.get_by_id(ctxt, group_snapshot.group_id)
        cg, __ = self._convert_group_to_cg(grp, [])
        cgsnap.consistencygroup = cg

        for snap in snapshots:
            snap.cgsnapshot_id = snap.group_snapshot_id
            snap.cgsnapshot = cgsnap

        return cgsnap, snapshots

    def _remove_cgsnapshot_id_from_snapshots(self, snapshots):
        if not snapshots:
            return
        for snap in snapshots:
            snap.cgsnapshot_id = None
            snap.cgsnapshot = None

    def _create_group_generic(self, context, group):
        """Creates a group."""
        # A group entry is already created in db. Just returns a status here.
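        # Generic groups have no backend-side artifact to create, so
        # "creation" reduces to reporting a usable status (and timestamp)
        # back to the caller, which persists it on the Group object.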
        model_update = {'status': fields.GroupStatus.AVAILABLE,
                        'created_at': timeutils.utcnow()}
        return model_update

    def _delete_group_generic(self, context, group, volumes):
        """Deletes a group and volumes in the group."""
        model_update = {'status': group.status}
        volume_model_updates = []
        for volume_ref in volumes:
            volume_model_update = {'id': volume_ref.id}
            try:
                self.driver.remove_export(context, volume_ref)
                self.driver.delete_volume(volume_ref)
                volume_model_update['status'] = 'deleted'
            except exception.VolumeIsBusy:
                volume_model_update['status'] = 'available'
            except Exception:
                volume_model_update['status'] = 'error'
                model_update['status'] = fields.GroupStatus.ERROR
            volume_model_updates.append(volume_model_update)

        return model_update, volume_model_updates

    def _update_group_generic(self, context, group,
                              add_volumes=None, remove_volumes=None):
        """Updates a group."""
        # NOTE(xyang): The volume manager adds/removes the volume to/from the
        # group in the database. This default implementation does not do
        # anything in the backend storage.
        return None, None, None

    def _collect_volumes_for_group(self, context, group, volumes, add=True):
        if add:
            valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
        else:
            valid_status = VALID_REMOVE_VOL_FROM_GROUP_STATUS
        volumes_ref = []
        if not volumes:
            return volumes_ref
        for add_vol in volumes.split(','):
            try:
                add_vol_ref = objects.Volume.get_by_id(context, add_vol)
            except exception.VolumeNotFound:
                LOG.error("Update group "
                          "failed to %(op)s volume-%(volume_id)s: "
                          "VolumeNotFound.",
                          {'volume_id': add_vol,
                           'op': 'add' if add else 'remove'},
                          resource={'type': 'group',
                                    'id': group.id})
                raise
            if add_vol_ref.status not in valid_status:
                msg = (_("Cannot %(op)s volume %(volume_id)s to "
                         "group %(group_id)s because volume is in an invalid "
                         "state: %(status)s. Valid states are: %(valid)s.") %
                       {'volume_id': add_vol_ref.id,
                        'group_id': group.id,
                        'status': add_vol_ref.status,
                        'valid': valid_status,
                        'op': 'add' if add else 'remove'})
                raise exception.InvalidVolume(reason=msg)
            if add:
                self._check_is_our_resource(add_vol_ref)
            volumes_ref.append(add_vol_ref)
        return volumes_ref

    def update_group(self, context, group,
                     add_volumes=None, remove_volumes=None):
        """Updates group.

        Update group by adding volumes to the group,
        or removing volumes from the group.
        """

        add_volumes_ref = self._collect_volumes_for_group(context,
                                                          group,
                                                          add_volumes,
                                                          add=True)
        remove_volumes_ref = self._collect_volumes_for_group(context,
                                                             group,
                                                             remove_volumes,
                                                             add=False)
        self._notify_about_group_usage(
            context, group, "update.start")

        try:
            utils.require_driver_initialized(self.driver)

            try:
                model_update, add_volumes_update, remove_volumes_update = (
                    self.driver.update_group(
                        context, group,
                        add_volumes=add_volumes_ref,
                        remove_volumes=remove_volumes_ref))
            except NotImplementedError:
                if not group_types.is_default_cgsnapshot_type(
                        group.group_type_id):
                    model_update, add_volumes_update, remove_volumes_update = (
                        self._update_group_generic(
                            context, group,
                            add_volumes=add_volumes_ref,
                            remove_volumes=remove_volumes_ref))
                else:
                    cg, remove_volumes_ref = self._convert_group_to_cg(
                        group, remove_volumes_ref)
                    model_update, add_volumes_update, remove_volumes_update = (
                        self.driver.update_consistencygroup(
                            context, cg,
                            add_volumes=add_volumes_ref,
                            remove_volumes=remove_volumes_ref))
                    self._remove_consistencygroup_id_from_volumes(
                        remove_volumes_ref)

            volumes_to_update = []
            if add_volumes_update:
                volumes_to_update.extend(add_volumes_update)
            if remove_volumes_update:
                volumes_to_update.extend(remove_volumes_update)
            self.db.volumes_update(context, volumes_to_update)

            if model_update:
                if model_update['status'] in [fields.GroupStatus.ERROR]:
                    msg = (_('Error occurred when updating group '
                             '%s.') % group.id)
                    LOG.error(msg)
                    raise exception.VolumeDriverException(message=msg)
                group.update(model_update)
                group.save()

        except Exception as e:
            with excutils.save_and_reraise_exception():
                if isinstance(e, exception.VolumeDriverException):
                    LOG.error("Error occurred in the volume driver when "
                              "updating group %(group_id)s.",
                              {'group_id': group.id})
                else:
                    LOG.error("Failed to update group %(group_id)s.",
                              {'group_id': group.id})
                group.status = fields.GroupStatus.ERROR
                group.save()
                for add_vol in add_volumes_ref:
                    add_vol.status = 'error'
                    add_vol.save()
                for rem_vol in remove_volumes_ref:
                    if isinstance(e, exception.VolumeDriverException):
                        rem_vol.consistencygroup_id = None
                        rem_vol.consistencygroup = None
                    rem_vol.status = 'error'
                    rem_vol.save()

        for add_vol in add_volumes_ref:
            add_vol.group_id = group.id
            add_vol.save()
        for rem_vol in remove_volumes_ref:
            rem_vol.group_id = None
            rem_vol.save()
        group.status = fields.GroupStatus.AVAILABLE
        group.save()

        self._notify_about_group_usage(
            context, group, "update.end")
        LOG.info("Update group completed successfully.",
                 resource={'type': 'group',
                           'id': group.id})

    def create_group_snapshot(self, context, group_snapshot):
        """Creates the group_snapshot."""
        caller_context = context
        context = context.elevated()

        LOG.info("GroupSnapshot %s: creating.", group_snapshot.id)

        snapshots = objects.SnapshotList.get_all_for_group_snapshot(
            context, group_snapshot.id)

        self._notify_about_group_snapshot_usage(
            context, group_snapshot, "create.start")

        snapshots_model_update = None
        model_update = None
        try:
            utils.require_driver_initialized(self.driver)
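            # require_driver_initialized() raises DriverNotInitialized (a
            # CinderException subclass) when the backend failed to start,
            # so a dead driver falls through to the except block below and
            # marks the group_snapshot as error instead of crashing the
            # RPC worker.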

            LOG.debug("Group snapshot %(grp_snap_id)s: creating.",
                      {'grp_snap_id': group_snapshot.id})

            # Pass context so that drivers that want to use it, can,
            # but it is not a requirement for all drivers.
            group_snapshot.context = caller_context
            for snapshot in snapshots:
                snapshot.context = caller_context

            try:
                model_update, snapshots_model_update = (
                    self.driver.create_group_snapshot(context, group_snapshot,
                                                      snapshots))
            except NotImplementedError:
                if not group_types.is_default_cgsnapshot_type(
                        group_snapshot.group_type_id):
                    model_update, snapshots_model_update = (
                        self._create_group_snapshot_generic(
                            context, group_snapshot, snapshots))
                else:
                    cgsnapshot, snapshots = (
                        self._convert_group_snapshot_to_cgsnapshot(
                            group_snapshot, snapshots, context))
                    model_update, snapshots_model_update = (
                        self.driver.create_cgsnapshot(context, cgsnapshot,
                                                      snapshots))
                    self._remove_cgsnapshot_id_from_snapshots(snapshots)
            if snapshots_model_update:
                for snap_model in snapshots_model_update:
                    # Update db for snapshot.
                    # NOTE(xyang): snapshots is a list of snapshot objects.
                    # snapshots_model_update should be a list of dicts.
                    snap_id = snap_model.pop('id')
                    snap_obj = objects.Snapshot.get_by_id(context, snap_id)
                    snap_obj.update(snap_model)
                    snap_obj.save()
                    if (snap_model['status'] in [
                        fields.SnapshotStatus.ERROR_DELETING,
                        fields.SnapshotStatus.ERROR] and
                            model_update['status'] not in
                            [fields.GroupSnapshotStatus.ERROR_DELETING,
                             fields.GroupSnapshotStatus.ERROR]):
                        model_update['status'] = snap_model['status']

            if model_update:
                if model_update['status'] == fields.GroupSnapshotStatus.ERROR:
                    msg = (_('Error occurred when creating group_snapshot '
                             '%s.') % group_snapshot.id)
                    LOG.error(msg)
                    raise exception.VolumeDriverException(message=msg)

                group_snapshot.update(model_update)
                group_snapshot.save()

        except exception.CinderException:
            with excutils.save_and_reraise_exception():
                group_snapshot.status = fields.GroupSnapshotStatus.ERROR
                group_snapshot.save()
                # Update snapshot status to 'error' if driver returns
                # None for snapshots_model_update.
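                # Shape reference (illustrative, mirroring
                # _create_group_snapshot_generic below):
                #     snapshots_model_update = [
                #         {'id': snap.id, 'status': 'available'}, ...]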
                self._remove_cgsnapshot_id_from_snapshots(snapshots)
                if not snapshots_model_update:
                    for snapshot in snapshots:
                        snapshot.status = fields.SnapshotStatus.ERROR
                        snapshot.save()

        for snapshot in snapshots:
            volume_id = snapshot.volume_id
            snapshot_id = snapshot.id
            vol_obj = objects.Volume.get_by_id(context, volume_id)
            if vol_obj.bootable:
                try:
                    self.db.volume_glance_metadata_copy_to_snapshot(
                        context, snapshot_id, volume_id)
                except exception.GlanceMetadataNotFound:
                    # If the volume was not created from an image, no glance
                    # metadata is available for it in the volume glance
                    # metadata table.
                    pass
                except exception.CinderException as ex:
                    LOG.error("Failed updating %(snapshot_id)s"
                              " metadata using the provided volumes"
                              " %(volume_id)s metadata.",
                              {'volume_id': volume_id,
                               'snapshot_id': snapshot_id})
                    snapshot.status = fields.SnapshotStatus.ERROR
                    snapshot.save()
                    raise exception.MetadataCopyFailure(
                        reason=six.text_type(ex))

            snapshot.status = fields.SnapshotStatus.AVAILABLE
            snapshot.progress = '100%'
            snapshot.save()

        group_snapshot.status = fields.GroupSnapshotStatus.AVAILABLE
        group_snapshot.save()

        LOG.info("group_snapshot %s: created successfully",
                 group_snapshot.id)
        self._notify_about_group_snapshot_usage(
            context, group_snapshot, "create.end")
        return group_snapshot

    def _create_group_snapshot_generic(self, context, group_snapshot,
                                       snapshots):
        """Creates a group_snapshot."""
        model_update = {'status': 'available'}
        snapshot_model_updates = []
        for snapshot in snapshots:
            snapshot_model_update = {'id': snapshot.id}
            try:
                driver_update = self.driver.create_snapshot(snapshot)
                if driver_update:
                    driver_update.pop('id', None)
                    snapshot_model_update.update(driver_update)
                if 'status' not in snapshot_model_update:
                    snapshot_model_update['status'] = (
                        fields.SnapshotStatus.AVAILABLE)
            except Exception:
                snapshot_model_update['status'] = (
                    fields.SnapshotStatus.ERROR)
                model_update['status'] = 'error'
            snapshot_model_updates.append(snapshot_model_update)

        return model_update, snapshot_model_updates

    def _delete_group_snapshot_generic(self, context, group_snapshot,
                                       snapshots):
        """Deletes a group_snapshot."""
        model_update = {'status': group_snapshot.status}
        snapshot_model_updates = []
        for snapshot in snapshots:
            snapshot_model_update = {'id': snapshot.id}
            try:
                self.driver.delete_snapshot(snapshot)
                snapshot_model_update['status'] = (
                    fields.SnapshotStatus.DELETED)
            except exception.SnapshotIsBusy:
                snapshot_model_update['status'] = (
                    fields.SnapshotStatus.AVAILABLE)
            except Exception:
                snapshot_model_update['status'] = (
                    fields.SnapshotStatus.ERROR)
                model_update['status'] = 'error'
            snapshot_model_updates.append(snapshot_model_update)

        return model_update, snapshot_model_updates

    def delete_group_snapshot(self, context, group_snapshot):
        """Deletes group_snapshot."""
        caller_context = context
        context = context.elevated()
        project_id = group_snapshot.project_id

        LOG.info("group_snapshot %s: deleting", group_snapshot.id)

        snapshots = objects.SnapshotList.get_all_for_group_snapshot(
            context, group_snapshot.id)

        self._notify_about_group_snapshot_usage(
            context, group_snapshot, "delete.start")

        snapshots_model_update = None
        model_update = None
        try:
            utils.require_driver_initialized(self.driver)

            LOG.debug("group_snapshot %(grp_snap_id)s: deleting",
                      {'grp_snap_id': group_snapshot.id})

            # Pass context so that drivers that want to use it, can,
            # but it is not a requirement for all drivers.
            group_snapshot.context = caller_context
            for snapshot in snapshots:
                snapshot.context = caller_context

            try:
                model_update, snapshots_model_update = (
                    self.driver.delete_group_snapshot(context, group_snapshot,
                                                      snapshots))
            except NotImplementedError:
                if not group_types.is_default_cgsnapshot_type(
                        group_snapshot.group_type_id):
                    model_update, snapshots_model_update = (
                        self._delete_group_snapshot_generic(
                            context, group_snapshot, snapshots))
                else:
                    cgsnapshot, snapshots = (
                        self._convert_group_snapshot_to_cgsnapshot(
                            group_snapshot, snapshots, context))
                    model_update, snapshots_model_update = (
                        self.driver.delete_cgsnapshot(context, cgsnapshot,
                                                      snapshots))
                    self._remove_cgsnapshot_id_from_snapshots(snapshots)

            if snapshots_model_update:
                for snap_model in snapshots_model_update:
                    # NOTE(xyang): snapshots is a list of snapshot objects.
                    # snapshots_model_update should be a list of dicts.
                    snap = next((item for item in snapshots if
                                 item.id == snap_model['id']), None)
                    if snap:
                        snap_model.pop('id')
                        snap.update(snap_model)
                        snap.save()

                        if (snap_model['status'] in
                                [fields.SnapshotStatus.ERROR_DELETING,
                                 fields.SnapshotStatus.ERROR] and
                                model_update['status'] not in
                                ['error_deleting', 'error']):
                            model_update['status'] = snap_model['status']

            if model_update:
                if model_update['status'] in ['error_deleting', 'error']:
                    msg = (_('Error occurred when deleting group_snapshot '
                             '%s.') % group_snapshot.id)
                    LOG.error(msg)
                    raise exception.VolumeDriverException(message=msg)
                else:
                    group_snapshot.update(model_update)
                    group_snapshot.save()

        except exception.CinderException:
            with excutils.save_and_reraise_exception():
                group_snapshot.status = fields.GroupSnapshotStatus.ERROR
                group_snapshot.save()
                # Update snapshot status to 'error' if driver returns
                # None for snapshots_model_update.
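                # save_and_reraise_exception() re-raises the original
                # exception once this block exits, so everything below is
                # best-effort cleanup rather than error recovery.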
                if not snapshots_model_update:
                    self._remove_cgsnapshot_id_from_snapshots(snapshots)
                    for snapshot in snapshots:
                        snapshot.status = fields.SnapshotStatus.ERROR
                        snapshot.save()

        for snapshot in snapshots:
            # Get reservations
            try:
                reserve_opts = {'snapshots': -1}
                volume_ref = objects.Volume.get_by_id(context,
                                                      snapshot.volume_id)
                QUOTAS.add_volume_type_opts(context,
                                            reserve_opts,
                                            volume_ref.volume_type_id)
                reservations = QUOTAS.reserve(context,
                                              project_id=project_id,
                                              **reserve_opts)

            except Exception:
                reservations = None
                LOG.exception("Failed to update usages deleting snapshot")

            self.db.volume_glance_metadata_delete_by_snapshot(context,
                                                              snapshot.id)
            snapshot.destroy()

            # Commit the reservations
            if reservations:
                QUOTAS.commit(context, reservations, project_id=project_id)

        group_snapshot.destroy()
        LOG.info("group_snapshot %s: deleted successfully",
                 group_snapshot.id)
        self._notify_about_group_snapshot_usage(context, group_snapshot,
                                                "delete.end",
                                                snapshots)

    def update_migrated_volume(self, ctxt, volume, new_volume, volume_status):
        """Finalize migration process on backend device."""
        model_update = None
        model_update_default = {'_name_id': new_volume.name_id,
                                'provider_location':
                                new_volume.provider_location}
        try:
            model_update = self.driver.update_migrated_volume(ctxt,
                                                              volume,
                                                              new_volume,
                                                              volume_status)
        except NotImplementedError:
            # If update_migrated_volume is not implemented for the driver,
            # _name_id and provider_location will be set with the values
            # from new_volume.
            model_update = model_update_default
        if model_update:
            model_update_default.update(model_update)
            # Swap keys that were changed in the source so we keep their
            # values in the temporary volume's DB record.
            # Need to convert 'metadata' and 'admin_metadata' since
            # they are not keys of volume; their corresponding keys are
            # 'volume_metadata' and 'volume_admin_metadata'.
            model_update_new = dict()
            for key in model_update:
                if key == 'metadata':
                    if volume.get('volume_metadata'):
                        model_update_new[key] = {
                            metadata['key']: metadata['value']
                            for metadata in volume.volume_metadata}
                elif key == 'admin_metadata':
                    model_update_new[key] = {
                        metadata['key']: metadata['value']
                        for metadata in volume.volume_admin_metadata}
                else:
                    model_update_new[key] = volume[key]
            with new_volume.obj_as_admin():
                new_volume.update(model_update_new)
                new_volume.save()
        with volume.obj_as_admin():
            volume.update(model_update_default)
            volume.save()

    # Replication V2.1 and a/a method
    def failover(self, context, secondary_backend_id=None):
        """Failover a backend to a secondary replication target.

        Instructs a replication-capable/configured backend to fail over
        to one of its secondary replication targets. host=None is an
        acceptable input, and leaves it to the driver to fail over to
        the only configured target, or to choose a target on its own.
        All of the host's volumes will be passed on to the driver in
        order for it to determine the replicated volumes on the host,
        if needed.

        :param context: security context
        :param secondary_backend_id: Specifies backend_id to fail over to
        """
        updates = {}
        repl_status = fields.ReplicationStatus

        service = self._get_service()

        # TODO(geguileo): We should optimize these updates by doing them
        # directly on the DB with just 3 queries, one to change the volumes
        # another to change all the snapshots, and another to get replicated
        # volumes.

        # Change non-replicated volumes and their snapshots to error if we
        # are failing over; leave them as they are for failback.
        volumes = self._get_my_volumes(context)

        replicated_vols = []
        for volume in volumes:
            if volume.replication_status not in (repl_status.DISABLED,
                                                 repl_status.NOT_CAPABLE):
                replicated_vols.append(volume)
            elif secondary_backend_id != self.FAILBACK_SENTINEL:
                volume.previous_status = volume.status
                volume.status = 'error'
                volume.replication_status = repl_status.NOT_CAPABLE
                volume.save()

                for snapshot in volume.snapshots:
                    snapshot.status = fields.SnapshotStatus.ERROR
                    snapshot.save()

        volume_update_list = None
        group_update_list = None
        try:
            # For non clustered we can call v2.1 failover_host, but for
            # clustered we call a/a failover method. We know a/a method
            # exists because BaseVD class wouldn't have started if it didn't.
            failover = getattr(self.driver,
                               'failover' if service.is_clustered
                               else 'failover_host')
            # expected form of volume_update_list:
            # [{volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}},
            #  {volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}]
            # It includes volumes in replication groups and those not in them
            # expected form of group_update_list:
            # [{group_id: <cinder-grpid>, updates: {'xxxx': xxxx....}},
            #  {group_id: <cinder-grpid>, updates: {'xxxx': xxxx....}}]
            filters = self._get_cluster_or_host_filters()
            groups = objects.GroupList.get_all_replicated(context,
                                                          filters=filters)
            active_backend_id, volume_update_list, group_update_list = (
                failover(context,
                         replicated_vols,
                         secondary_id=secondary_backend_id,
                         groups=groups))
            try:
                update_data = {u['volume_id']: u['updates']
                               for u in volume_update_list}
            except KeyError:
                msg = "Update list doesn't include volume_id"
                raise exception.ProgrammingError(reason=msg)
            try:
                update_group_data = {g['group_id']: g['updates']
                                     for g in group_update_list}
            except KeyError:
                msg = "Update list doesn't include group_id"
                raise exception.ProgrammingError(reason=msg)
        except Exception as exc:
            # NOTE(jdg): Drivers need to be aware if they fail during
            # a failover sequence, we're expecting them to cleanup
            # and make sure the driver state is such that the original
            # backend is still set as primary as per driver memory

            # We don't want to log the exception trace for an invalid
            # replication target
            if isinstance(exc, exception.InvalidReplicationTarget):
                log_method = LOG.error
                # Preserve the replication_status: Status should be failed
                # over if we were failing back or if we were failing over
                # from one secondary to another secondary. In both cases
                # active_backend_id will be set.
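                # Illustrative cases: a failed failback from 'site-b'
                # leaves active_backend_id == 'site-b', so we stay
                # FAILED_OVER; a failed first failover leaves
                # active_backend_id unset, so the service stays ENABLED
                # on the primary.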
                if service.active_backend_id:
                    updates['replication_status'] = repl_status.FAILED_OVER
                else:
                    updates['replication_status'] = repl_status.ENABLED
            else:
                log_method = LOG.exception
                updates.update(disabled=True,
                               replication_status=repl_status.FAILOVER_ERROR)

            log_method("Error encountered during failover on host: %(host)s "
                       "to %(backend_id)s: %(error)s",
                       {'host': self.host, 'backend_id': secondary_backend_id,
                        'error': exc})
            # We dump the update list for manual recovery
            LOG.error('Failed update_list is: %s', volume_update_list)
            self.finish_failover(context, service, updates)
            return

        if secondary_backend_id == "default":
            updates['replication_status'] = repl_status.ENABLED
            updates['active_backend_id'] = ''
            updates['disabled'] = service.frozen
            updates['disabled_reason'] = 'frozen' if service.frozen else ''
        else:
            updates['replication_status'] = repl_status.FAILED_OVER
            updates['active_backend_id'] = active_backend_id
            updates['disabled'] = True
            updates['disabled_reason'] = 'failed-over'

        self.finish_failover(context, service, updates)

        for volume in replicated_vols:
            update = update_data.get(volume.id, {})
            if update.get('status', '') == 'error':
                update['replication_status'] = repl_status.FAILOVER_ERROR
            elif update.get('replication_status') in (None,
                                                      repl_status.FAILED_OVER):
                update['replication_status'] = updates['replication_status']

            if update['replication_status'] == repl_status.FAILOVER_ERROR:
                update.setdefault('status', 'error')
                # Set all volume snapshots to error
                for snapshot in volume.snapshots:
                    snapshot.status = fields.SnapshotStatus.ERROR
                    snapshot.save()
            if 'status' in update:
                update['previous_status'] = volume.status
            volume.update(update)
            volume.save()

        for grp in groups:
            update = update_group_data.get(grp.id, {})
            if update.get('status', '') == 'error':
                update['replication_status'] = repl_status.FAILOVER_ERROR
            elif update.get('replication_status') in (None,
                                                      repl_status.FAILED_OVER):
                update['replication_status'] = updates['replication_status']

            if update['replication_status'] == repl_status.FAILOVER_ERROR:
                update.setdefault('status', 'error')
            grp.update(update)
            grp.save()

        LOG.info("Failed over to replication target successfully.")

    # TODO(geguileo): In P - remove this
    failover_host = failover

    def finish_failover(self, context, service, updates):
        """Completion of the failover locally or via RPC."""
        # If the service is clustered, broadcast the service changes to all
        # volume services, including this one.
        if service.is_clustered:
            # We have to update the cluster with the same data, and we do it
            # before broadcasting the failover_completed RPC call to prevent
            # races with services that may be starting.
            for key, value in updates.items():
                setattr(service.cluster, key, value)
            service.cluster.save()
            rpcapi = volume_rpcapi.VolumeAPI()
            rpcapi.failover_completed(context, service, updates)
        else:
            service.update(updates)
            service.save()

    def failover_completed(self, context, updates):
        """Finalize failover of this backend.

        When a service is clustered and replicated the failover has two
        stages, one that does the failover of the volumes and another that
        finalizes the failover of the services themselves.

        This method takes care of the last part and is called from the
        service doing the failover of the volumes after it has finished
        processing the volumes.
        """
        service = self._get_service()
        service.update(updates)
        try:
            self.driver.failover_completed(context, service.active_backend_id)
        except Exception:
            msg = _('Driver reported error during replication failover '
                    'completion.')
            LOG.exception(msg)
            service.disabled = True
            service.disabled_reason = msg
            service.replication_status = (
                fields.ReplicationStatus.ERROR)
        service.save()

    def freeze_host(self, context):
        """Freeze management plane on this backend.

        Basically puts the control/management plane into a
        Read Only state. We should handle this in the scheduler,
        however this is provided to let the driver know in case it
        needs/wants to do something specific on the backend.

        :param context: security context
        """
        # TODO(jdg): Return from driver? or catch?
        # Update status column in service entry
        try:
            self.driver.freeze_backend(context)
        except exception.VolumeDriverException:
            # NOTE(jdg): In the case of freeze, we don't really
            # need the backend's consent or anything, we'll just
            # disable the service, so we can just log this and
            # go about our business
            LOG.warning('Error encountered on Cinder backend during '
                        'freeze operation, service is frozen, however '
                        'notification to driver has failed.')

        service = self._get_service()
        service.disabled = True
        service.disabled_reason = "frozen"
        service.save()
        LOG.info("Set backend status to frozen successfully.")
        return True

    def thaw_host(self, context):
        """UnFreeze management plane on this backend.

        Basically puts the control/management plane back into
        a normal state. We should handle this in the scheduler,
        however this is provided to let the driver know in case it
        needs/wants to do something specific on the backend.

        :param context: security context
        """

        # TODO(jdg): Return from driver? or catch?
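        # freeze_host() above always disables the service, even when the
        # driver call fails; thaw_host is stricter and returns False
        # (staying disabled) unless the driver confirms the thaw.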
        # Update status column in service entry
        try:
            self.driver.thaw_backend(context)
        except exception.VolumeDriverException:
            # NOTE(jdg): Thaw actually matters, if this call
            # to the backend fails, we're stuck and can't re-enable
            LOG.error('Error encountered on Cinder backend during '
                      'thaw operation, service will remain frozen.')
            return False

        service = self._get_service()
        service.disabled = False
        service.disabled_reason = ""
        service.save()
        LOG.info("Thawed backend successfully.")
        return True

    def manage_existing_snapshot(self, ctxt, snapshot, ref=None):
        LOG.debug('manage_existing_snapshot: managing %s.', ref)
        try:
            flow_engine = manage_existing_snapshot.get_flow(
                ctxt,
                self.db,
                self.driver,
                self.host,
                snapshot.id,
                ref)
        except Exception:
            LOG.exception("Failed to create manage_existing flow: "
                          "%(object_type)s %(object_id)s.",
                          {'object_type': 'snapshot',
                           'object_id': snapshot.id})
            raise exception.CinderException(
                _("Failed to create manage existing flow."))

        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
            flow_engine.run()
        return snapshot.id

    def get_manageable_snapshots(self, ctxt, marker, limit, offset,
                                 sort_keys, sort_dirs, want_objects=False):
        try:
            utils.require_driver_initialized(self.driver)
        except exception.DriverNotInitialized:
            with excutils.save_and_reraise_exception():
                LOG.exception("Listing manageable snapshots failed, due "
                              "to uninitialized driver.")

        cinder_snapshots = self._get_my_snapshots(ctxt)
        try:
            driver_entries = self.driver.get_manageable_snapshots(
                cinder_snapshots, marker, limit, offset, sort_keys, sort_dirs)
            if want_objects:
                driver_entries = (objects.ManageableSnapshotList.
                                  from_primitives(ctxt, driver_entries))
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception("Listing manageable snapshots failed, due "
                              "to driver error.")
        return driver_entries

    def get_capabilities(self, context, discover):
        """Get capabilities of backend storage."""
        if discover:
            self.driver.init_capabilities()
        capabilities = self.driver.capabilities
        LOG.debug("Obtained capabilities list: %s.", capabilities)
        return capabilities

    def get_backup_device(self, ctxt, backup, want_objects=False):
        (backup_device, is_snapshot) = (
            self.driver.get_backup_device(ctxt, backup))
        secure_enabled = self.driver.secure_file_operations_enabled()
        backup_device_dict = {'backup_device': backup_device,
                              'secure_enabled': secure_enabled,
                              'is_snapshot': is_snapshot}
        # TODO(sborkows): from_primitive method will be removed in O, so there
        # is a need to clean here then.
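        # Example payload handed back to the caller (field values are
        # illustrative):
        #     {'backup_device': <Volume or Snapshot object>,
        #      'secure_enabled': False,
        #      'is_snapshot': True}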
        return (objects.BackupDeviceInfo.from_primitive(backup_device_dict,
                                                        ctxt)
                if want_objects else backup_device_dict)

    def secure_file_operations_enabled(self, ctxt, volume):
        secure_enabled = self.driver.secure_file_operations_enabled()
        return secure_enabled

    def _connection_create(self, ctxt, volume, attachment, connector):
        try:
            self.driver.validate_connector(connector)
        except exception.InvalidConnectorException as err:
            raise exception.InvalidInput(reason=six.text_type(err))
        except Exception as err:
            err_msg = (_("Validate volume connection failed "
                         "(error: %(err)s).") % {'err': six.text_type(err)})
            LOG.error(err_msg, resource=volume)
            raise exception.VolumeBackendAPIException(data=err_msg)

        try:
            model_update = self.driver.create_export(ctxt.elevated(),
                                                     volume, connector)
        except exception.CinderException as ex:
            err_msg = (_("Create export for volume failed (%s).") % ex.msg)
            LOG.exception(err_msg, resource=volume)
            raise exception.VolumeBackendAPIException(data=err_msg)

        try:
            if model_update:
                volume.update(model_update)
                volume.save()
        except exception.CinderException as ex:
            LOG.exception("Model update failed.", resource=volume)
            raise exception.ExportFailure(reason=six.text_type(ex))

        try:
            conn_info = self.driver.initialize_connection(volume, connector)
        except Exception as err:
            err_msg = (_("Driver initialize connection failed "
                         "(error: %(err)s).") % {'err': six.text_type(err)})
            LOG.exception(err_msg, resource=volume)
            self.driver.remove_export(ctxt.elevated(), volume)
            raise exception.VolumeBackendAPIException(data=err_msg)
        conn_info = self._parse_connection_options(ctxt, volume, conn_info)

        # NOTE(jdg): Get rid of the nested dict (data key)
        conn_data = conn_info.pop('data', {})
        connection_info = conn_data.copy()
        connection_info.update(conn_info)
        values = {'volume_id': volume.id,
                  'attach_status': 'attaching',
                  'connector': jsonutils.dumps(connector)}

        # TODO(mriedem): Use VolumeAttachment.save() here.
        self.db.volume_attachment_update(ctxt, attachment.id, values)

        connection_info['attachment_id'] = attachment.id
        return connection_info

    def attachment_update(self,
                          context,
                          vref,
                          connector,
                          attachment_id):
        """Update/Finalize an attachment.

        This call updates a valid attachment record to associate with a
        volume and provides the caller with the proper connection info.
        Note that this call requires an `attachment_id`. It's expected
        that prior to this call the volume and an attachment UUID have
        been reserved.

        :param vref: Volume object to create attachment for
        :param connector: Connector object to use for attachment creation
        :param attachment_id: ID of the attachment record to update
        """

        mode = connector.get('mode', 'rw')
        self._notify_about_volume_usage(context, vref, 'attach.start')
        attachment_ref = objects.VolumeAttachment.get_by_id(context,
                                                            attachment_id)
        connection_info = self._connection_create(context,
                                                  vref,
                                                  attachment_ref,
                                                  connector)
        # FIXME(jdg): get rid of this admin_meta option here, the only thing
        # it does is enforce that a volume is R/O, that should be done via a
        # type and not *more* metadata
        volume_metadata = self.db.volume_admin_metadata_update(
            context.elevated(),
            attachment_ref.volume_id,
            {'attached_mode': mode}, False)

        # The prior setting of mode in the attachment_ref overrides any
        # settings within the connector when dealing with read-only
        # attachment options.
        if mode != 'ro' and attachment_ref.attach_mode == 'ro':
            connector['mode'] = 'ro'
            mode = 'ro'

        try:
            if volume_metadata.get('readonly') == 'True' and mode != 'ro':
                raise exception.InvalidVolumeAttachMode(mode=mode,
                                                        volume_id=vref.id)
            utils.require_driver_initialized(self.driver)
            self.driver.attach_volume(context,
                                      vref,
                                      attachment_ref.instance_uuid,
                                      connector.get('host', ''),
                                      connector.get('mountpoint', 'na'))
        except Exception as err:
            self.message_api.create(
                context, message_field.Action.UPDATE_ATTACHMENT,
                resource_uuid=vref.id,
                exception=err)
            with excutils.save_and_reraise_exception():
                self.db.volume_attachment_update(
                    context, attachment_ref.id,
                    {'attach_status':
                     fields.VolumeAttachStatus.ERROR_ATTACHING})

        self.db.volume_attached(context.elevated(),
                                attachment_ref.id,
                                attachment_ref.instance_uuid,
                                connector.get('host', ''),
                                connector.get('mountpoint', 'na'),
                                mode,
                                False)
        vref.refresh()
        attachment_ref.refresh()
        self._notify_about_volume_usage(context, vref, "attach.end")
        LOG.info("attachment_update completed successfully.",
                 resource=vref)
        return connection_info

    def _connection_terminate(self, context, volume,
                              attachment, force=False):
        """Remove a volume connection, but leave attachment.

        Exits early if the attachment does not have a connector and returns
        None to indicate shared connections are irrelevant.
        """
        utils.require_driver_initialized(self.driver)
        connector = attachment.connector
        if not connector and not force:
            # It's possible to attach a volume to a shelved offloaded server
            # in nova, and a shelved offloaded server is not on a compute
            # host, which means the attachment was made without a host
            # connector. If we don't have a connector we can't terminate a
            # connection that was never actually made to the storage
            # backend, so just log a message and exit.
            LOG.debug('No connector for attachment %s; skipping storage '
                      'backend terminate_connection call.', attachment.id)
            # None indicates we don't know and don't care.
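            # Tri-state contract consumed by _do_attachment_delete():
            #     True  -> other attachments share this connection; keep it
            #     False -> last user; the caller may remove the export
            #     None  -> no connection was ever made; nothing to do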
4492 return None 4493 try: 4494 shared_connections = self.driver.terminate_connection(volume, 4495 connector, 4496 force=force) 4497 if not isinstance(shared_connections, bool): 4498 shared_connections = False 4499 4500 except Exception as err: 4501 err_msg = (_('Terminate volume connection failed: %(err)s') 4502 % {'err': six.text_type(err)}) 4503 LOG.exception(err_msg, resource=volume) 4504 raise exception.VolumeBackendAPIException(data=err_msg) 4505 LOG.info("Terminate volume connection completed successfully.", 4506 resource=volume) 4507 # NOTE(jdg): Return True/False if there are other outstanding 4508 # attachments that share this connection. If True should signify 4509 # caller to preserve the actual host connection (work should be 4510 # done in the brick connector as it has the knowledge of what's 4511 # going on here. 4512 return shared_connections 4513 4514 def attachment_delete(self, context, attachment_id, vref): 4515 """Delete/Detach the specified attachment. 4516 4517 Notifies the backend device that we're detaching the specified 4518 attachment instance. 4519 4520 param: vref: Volume object associated with the attachment 4521 param: attachment: Attachment reference object to remove 4522 4523 NOTE if the attachment reference is None, we remove all existing 4524 attachments for the specified volume object. 4525 """ 4526 attachment_ref = objects.VolumeAttachment.get_by_id(context, 4527 attachment_id) 4528 if not attachment_ref: 4529 for attachment in VA_LIST.get_all_by_volume_id(context, vref.id): 4530 self._do_attachment_delete(context, vref, attachment) 4531 else: 4532 self._do_attachment_delete(context, vref, attachment_ref) 4533 4534 def _do_attachment_delete(self, context, vref, attachment): 4535 utils.require_driver_initialized(self.driver) 4536 self._notify_about_volume_usage(context, vref, "detach.start") 4537 has_shared_connection = self._connection_terminate(context, 4538 vref, 4539 attachment) 4540 try: 4541 LOG.debug('Deleting attachment %(attachment_id)s.', 4542 {'attachment_id': attachment.id}, 4543 resource=vref) 4544 self.driver.detach_volume(context, vref, attachment) 4545 if has_shared_connection is not None and not has_shared_connection: 4546 self.driver.remove_export(context.elevated(), vref) 4547 except Exception: 4548 # FIXME(jdg): Obviously our volume object is going to need some 4549 # changes to deal with multi-attach and figuring out how to 4550 # represent a single failed attach out of multiple attachments 4551 4552 # TODO(jdg): object method here 4553 self.db.volume_attachment_update( 4554 context, attachment.get('id'), 4555 {'attach_status': 'error_detaching'}) 4556 else: 4557 self.db.volume_detached(context.elevated(), vref.id, 4558 attachment.get('id')) 4559 self.db.volume_admin_metadata_delete(context.elevated(), 4560 vref.id, 4561 'attached_mode') 4562 self._notify_about_volume_usage(context, vref, "detach.end") 4563 4564 # Replication group API (Tiramisu) 4565 def enable_replication(self, ctxt, group): 4566 """Enable replication.""" 4567 group.refresh() 4568 if group.replication_status != fields.ReplicationStatus.ENABLING: 4569 msg = _("Replication status in group %s is not " 4570 "enabling. Cannot enable replication.") % group.id 4571 LOG.error(msg) 4572 raise exception.InvalidGroup(reason=msg) 4573 4574 volumes = group.volumes 4575 for vol in volumes: 4576 vol.refresh() 4577 if vol.replication_status != fields.ReplicationStatus.ENABLING: 4578 msg = _("Replication status in volume %s is not " 4579 "enabling. 
Cannot enable replication.") % vol.id 4580 LOG.error(msg) 4581 raise exception.InvalidVolume(reason=msg) 4582 4583 self._notify_about_group_usage( 4584 ctxt, group, "enable_replication.start") 4585 4586 volumes_model_update = None 4587 model_update = None 4588 try: 4589 utils.require_driver_initialized(self.driver) 4590 4591 model_update, volumes_model_update = ( 4592 self.driver.enable_replication(ctxt, group, volumes)) 4593 4594 if volumes_model_update: 4595 for update in volumes_model_update: 4596 vol_obj = objects.Volume.get_by_id(ctxt, update['id']) 4597 vol_obj.update(update) 4598 vol_obj.save() 4599 # If we failed to enable a volume, make sure the status 4600 # for the group is set to error as well 4601 if (update.get('replication_status') == 4602 fields.ReplicationStatus.ERROR and 4603 model_update.get('replication_status') != 4604 fields.ReplicationStatus.ERROR): 4605 model_update['replication_status'] = update.get( 4606 'replication_status') 4607 4608 if model_update: 4609 if (model_update.get('replication_status') == 4610 fields.ReplicationStatus.ERROR): 4611 msg = _('Enable replication failed.') 4612 LOG.error(msg, 4613 resource={'type': 'group', 4614 'id': group.id}) 4615 raise exception.VolumeDriverException(message=msg) 4616 else: 4617 group.update(model_update) 4618 group.save() 4619 4620 except exception.CinderException as ex: 4621 group.status = fields.GroupStatus.ERROR 4622 group.replication_status = fields.ReplicationStatus.ERROR 4623 group.save() 4624 # Update volume status to 'error' if driver returns 4625 # None for volumes_model_update. 4626 if not volumes_model_update: 4627 for vol in volumes: 4628 vol.status = 'error' 4629 vol.replication_status = fields.ReplicationStatus.ERROR 4630 vol.save() 4631 err_msg = _("Enable replication group failed: " 4632 "%s.") % six.text_type(ex) 4633 raise exception.ReplicationGroupError(reason=err_msg, 4634 group_id=group.id) 4635 4636 for vol in volumes: 4637 vol.replication_status = fields.ReplicationStatus.ENABLED 4638 vol.save() 4639 group.replication_status = fields.ReplicationStatus.ENABLED 4640 group.save() 4641 4642 self._notify_about_group_usage( 4643 ctxt, group, "enable_replication.end", volumes) 4644 LOG.info("Enable replication completed successfully.", 4645 resource={'type': 'group', 4646 'id': group.id}) 4647 4648 # Replication group API (Tiramisu) 4649 def disable_replication(self, ctxt, group): 4650 """Disable replication.""" 4651 group.refresh() 4652 if group.replication_status != fields.ReplicationStatus.DISABLING: 4653 msg = _("Replication status in group %s is not " 4654 "disabling. Cannot disable replication.") % group.id 4655 LOG.error(msg) 4656 raise exception.InvalidGroup(reason=msg) 4657 4658 volumes = group.volumes 4659 for vol in volumes: 4660 vol.refresh() 4661 if (vol.replication_status != 4662 fields.ReplicationStatus.DISABLING): 4663 msg = _("Replication status in volume %s is not " 4664 "disabling. 
Cannot disable replication.") % vol.id 4665 LOG.error(msg) 4666 raise exception.InvalidVolume(reason=msg) 4667 4668 self._notify_about_group_usage( 4669 ctxt, group, "disable_replication.start") 4670 4671 volumes_model_update = None 4672 model_update = None 4673 try: 4674 utils.require_driver_initialized(self.driver) 4675 4676 model_update, volumes_model_update = ( 4677 self.driver.disable_replication(ctxt, group, volumes)) 4678 4679 if volumes_model_update: 4680 for update in volumes_model_update: 4681 vol_obj = objects.Volume.get_by_id(ctxt, update['id']) 4682 vol_obj.update(update) 4683 vol_obj.save() 4684 # If we failed to enable a volume, make sure the status 4685 # for the group is set to error as well 4686 if (update.get('replication_status') == 4687 fields.ReplicationStatus.ERROR and 4688 model_update.get('replication_status') != 4689 fields.ReplicationStatus.ERROR): 4690 model_update['replication_status'] = update.get( 4691 'replication_status') 4692 4693 if model_update: 4694 if (model_update.get('replication_status') == 4695 fields.ReplicationStatus.ERROR): 4696 msg = _('Disable replication failed.') 4697 LOG.error(msg, 4698 resource={'type': 'group', 4699 'id': group.id}) 4700 raise exception.VolumeDriverException(message=msg) 4701 else: 4702 group.update(model_update) 4703 group.save() 4704 4705 except exception.CinderException as ex: 4706 group.status = fields.GroupStatus.ERROR 4707 group.replication_status = fields.ReplicationStatus.ERROR 4708 group.save() 4709 # Update volume status to 'error' if driver returns 4710 # None for volumes_model_update. 4711 if not volumes_model_update: 4712 for vol in volumes: 4713 vol.status = 'error' 4714 vol.replication_status = fields.ReplicationStatus.ERROR 4715 vol.save() 4716 err_msg = _("Disable replication group failed: " 4717 "%s.") % six.text_type(ex) 4718 raise exception.ReplicationGroupError(reason=err_msg, 4719 group_id=group.id) 4720 4721 for vol in volumes: 4722 vol.replication_status = fields.ReplicationStatus.DISABLED 4723 vol.save() 4724 group.replication_status = fields.ReplicationStatus.DISABLED 4725 group.save() 4726 4727 self._notify_about_group_usage( 4728 ctxt, group, "disable_replication.end", volumes) 4729 LOG.info("Disable replication completed successfully.", 4730 resource={'type': 'group', 4731 'id': group.id}) 4732 4733 # Replication group API (Tiramisu) 4734 def failover_replication(self, ctxt, group, allow_attached_volume=False, 4735 secondary_backend_id=None): 4736 """Failover replication.""" 4737 group.refresh() 4738 if group.replication_status != fields.ReplicationStatus.FAILING_OVER: 4739 msg = _("Replication status in group %s is not " 4740 "failing-over. Cannot failover replication.") % group.id 4741 LOG.error(msg) 4742 raise exception.InvalidGroup(reason=msg) 4743 4744 volumes = group.volumes 4745 for vol in volumes: 4746 vol.refresh() 4747 if vol.status == 'in-use' and not allow_attached_volume: 4748 msg = _("Volume %s is attached but allow_attached_volume flag " 4749 "is False. Cannot failover replication.") % vol.id 4750 LOG.error(msg) 4751 raise exception.InvalidVolume(reason=msg) 4752 if (vol.replication_status != 4753 fields.ReplicationStatus.FAILING_OVER): 4754 msg = _("Replication status in volume %s is not " 4755 "failing-over. 

    # Replication group API (Tiramisu)
    def failover_replication(self, ctxt, group, allow_attached_volume=False,
                             secondary_backend_id=None):
        """Failover replication."""
        group.refresh()
        if group.replication_status != fields.ReplicationStatus.FAILING_OVER:
            msg = _("Replication status in group %s is not "
                    "failing-over. Cannot failover replication.") % group.id
            LOG.error(msg)
            raise exception.InvalidGroup(reason=msg)

        volumes = group.volumes
        for vol in volumes:
            vol.refresh()
            if vol.status == 'in-use' and not allow_attached_volume:
                msg = _("Volume %s is attached but allow_attached_volume "
                        "flag is False. Cannot failover "
                        "replication.") % vol.id
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)
            if (vol.replication_status !=
                    fields.ReplicationStatus.FAILING_OVER):
                msg = _("Replication status in volume %s is not "
                        "failing-over. Cannot failover replication.") % vol.id
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

        self._notify_about_group_usage(
            ctxt, group, "failover_replication.start")

        volumes_model_update = None
        model_update = None
        try:
            utils.require_driver_initialized(self.driver)

            model_update, volumes_model_update = (
                self.driver.failover_replication(
                    ctxt, group, volumes, secondary_backend_id))

            if volumes_model_update:
                for update in volumes_model_update:
                    vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
                    vol_obj.update(update)
                    vol_obj.save()
                    # If we failed to fail over a volume, make sure the
                    # status for the group is set to error as well
                    if (update.get('replication_status') ==
                            fields.ReplicationStatus.ERROR and
                            model_update.get('replication_status') !=
                            fields.ReplicationStatus.ERROR):
                        model_update['replication_status'] = update.get(
                            'replication_status')

            if model_update:
                if (model_update.get('replication_status') ==
                        fields.ReplicationStatus.ERROR):
                    msg = _('Failover replication failed.')
                    LOG.error(msg,
                              resource={'type': 'group',
                                        'id': group.id})
                    raise exception.VolumeDriverException(message=msg)
                else:
                    group.update(model_update)
                    group.save()

        except exception.CinderException as ex:
            group.status = fields.GroupStatus.ERROR
            group.replication_status = fields.ReplicationStatus.ERROR
            group.save()
            # Update volume status to 'error' if driver returns
            # None for volumes_model_update.
            if not volumes_model_update:
                for vol in volumes:
                    vol.status = 'error'
                    vol.replication_status = fields.ReplicationStatus.ERROR
                    vol.save()
            err_msg = _("Failover replication group failed: "
                        "%s.") % six.text_type(ex)
            raise exception.ReplicationGroupError(reason=err_msg,
                                                  group_id=group.id)

        for vol in volumes:
            if secondary_backend_id == "default":
                vol.replication_status = fields.ReplicationStatus.ENABLED
            else:
                vol.replication_status = (
                    fields.ReplicationStatus.FAILED_OVER)
            vol.save()
        if secondary_backend_id == "default":
            group.replication_status = fields.ReplicationStatus.ENABLED
        else:
            group.replication_status = fields.ReplicationStatus.FAILED_OVER
        group.save()

        self._notify_about_group_usage(
            ctxt, group, "failover_replication.end", volumes)
        LOG.info("Failover replication completed successfully.",
                 resource={'type': 'group',
                           'id': group.id})
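
    # Note: a secondary_backend_id of "default" is the conventional request
    # to fail back to the primary backend, which is why the assignments
    # above return the group and its volumes to ENABLED rather than
    # FAILED_OVER in that case.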

    def list_replication_targets(self, ctxt, group):
        """Provide a means to obtain replication targets for a group.

        This method is used to find the replication_device config
        info. 'backend_id' is a required key in 'replication_device'.

        Response example for admin:

        .. code:: json

            {
                'replication_targets': [
                    {
                        'backend_id': 'vendor-id-1',
                        'unique_key': 'val1',
                        ......
                    },
                    {
                        'backend_id': 'vendor-id-2',
                        'unique_key': 'val2',
                        ......
                    }
                ]
            }

        Response example for non-admin:

        .. code:: json

            {
                'replication_targets': [
                    {
                        'backend_id': 'vendor-id-1'
                    },
                    {
                        'backend_id': 'vendor-id-2'
                    }
                ]
            }

        """

        replication_targets = []
        try:
            group.refresh()
            if self.configuration.replication_device:
                if ctxt.is_admin:
                    # Admins get every key from each configured
                    # replication_device entry.
                    for rep_dev in self.configuration.replication_device:
                        replication_targets.append(dict(rep_dev))
                else:
                    # Non-admins only get the backend_id of each entry.
                    for rep_dev in self.configuration.replication_device:
                        dev = rep_dev.get('backend_id')
                        if dev:
                            replication_targets.append({'backend_id': dev})

        except exception.GroupNotFound:
            err_msg = (_("Get replication targets failed. Group %s not "
                         "found.") % group.id)
            LOG.exception(err_msg)
            raise exception.VolumeBackendAPIException(data=err_msg)

        return {'replication_targets': replication_targets}
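
    # Illustrative cinder.conf snippet (hypothetical section name and
    # values, reusing the keys from the docstring example) showing the
    # replication_device entries this method reads back; 'backend_id' is
    # the only required key, everything else is driver-specific:
    #
    #   [lvmdriver-1]
    #   replication_device = backend_id:vendor-id-1,unique_key:val1
    #   replication_device = backend_id:vendor-id-2,unique_key:val2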