1# Copyright 2010 United States Government as represented by the 2# Administrator of the National Aeronautics and Space Administration. 3# All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16 17"""Handles all requests relating to volumes.""" 18 19import ast 20import collections 21import datetime 22 23from castellan import key_manager 24from oslo_config import cfg 25from oslo_log import log as logging 26from oslo_utils import excutils 27from oslo_utils import strutils 28from oslo_utils import timeutils 29from oslo_utils import versionutils 30import six 31 32from cinder.api import common 33from cinder.common import constants 34from cinder import context 35from cinder import coordination 36from cinder import db 37from cinder.db import base 38from cinder import exception 39from cinder import flow_utils 40from cinder.i18n import _ 41from cinder.image import cache as image_cache 42from cinder.image import glance 43from cinder.message import api as message_api 44from cinder.message import message_field 45from cinder import objects 46from cinder.objects import base as objects_base 47from cinder.objects import fields 48from cinder.objects import volume_type 49from cinder.policies import attachments as attachment_policy 50from cinder.policies import services as svr_policy 51from cinder.policies import snapshot_metadata as s_meta_policy 52from cinder.policies import snapshots as snapshot_policy 53from cinder.policies import volume_actions as vol_action_policy 54from cinder.policies import volume_metadata as vol_meta_policy 55from cinder.policies import volume_transfer as vol_transfer_policy 56from cinder.policies import volumes as vol_policy 57from cinder import quota 58from cinder import quota_utils 59from cinder.scheduler import rpcapi as scheduler_rpcapi 60from cinder import utils 61from cinder.volume.flows.api import create_volume 62from cinder.volume.flows.api import manage_existing 63from cinder.volume import rpcapi as volume_rpcapi 64from cinder.volume import utils as volume_utils 65from cinder.volume import volume_types 66 67allow_force_upload_opt = cfg.BoolOpt('enable_force_upload', 68 default=False, 69 help='Enables the Force option on ' 70 'upload_to_image. This enables ' 71 'running upload_volume on in-use ' 72 'volumes for backends that ' 73 'support it.') 74volume_host_opt = cfg.BoolOpt('snapshot_same_host', 75 default=True, 76 help='Create volume from snapshot at the host ' 77 'where snapshot resides') 78volume_same_az_opt = cfg.BoolOpt('cloned_volume_same_az', 79 default=True, 80 help='Ensure that the new volumes are the ' 81 'same AZ as snapshot or source volume') 82az_cache_time_opt = cfg.IntOpt('az_cache_duration', 83 default=3600, 84 help='Cache volume availability zones in ' 85 'memory for the provided duration in ' 86 'seconds') 87 88CONF = cfg.CONF 89CONF.register_opt(allow_force_upload_opt) 90CONF.register_opt(volume_host_opt) 91CONF.register_opt(volume_same_az_opt) 92CONF.register_opt(az_cache_time_opt) 93 94CONF.import_opt('glance_core_properties', 'cinder.image.glance') 95 96LOG = logging.getLogger(__name__) 97QUOTAS = quota.QUOTAS 98AO_LIST = objects.VolumeAttachmentList 99 100 101class API(base.Base): 102 """API for interacting with the volume manager.""" 103 104 AVAILABLE_MIGRATION_STATUS = (None, 'deleting', 'error', 'success') 105 106 def __init__(self, db_driver=None, image_service=None): 107 self.image_service = (image_service or 108 glance.get_default_image_service()) 109 self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() 110 self.volume_rpcapi = volume_rpcapi.VolumeAPI() 111 self.availability_zones = [] 112 self.availability_zones_last_fetched = None 113 self.key_manager = key_manager.API(CONF) 114 self.message = message_api.API() 115 super(API, self).__init__(db_driver) 116 117 def list_availability_zones(self, enable_cache=False, refresh_cache=False): 118 """Describe the known availability zones 119 120 :param enable_cache: Enable az cache 121 :param refresh_cache: Refresh cache immediately 122 :return: tuple of dicts, each with a 'name' and 'available' key 123 """ 124 if enable_cache: 125 if self.availability_zones_last_fetched is None: 126 refresh_cache = True 127 else: 128 cache_age = timeutils.delta_seconds( 129 self.availability_zones_last_fetched, 130 timeutils.utcnow()) 131 if cache_age >= CONF.az_cache_duration: 132 refresh_cache = True 133 if refresh_cache or not enable_cache: 134 topic = constants.VOLUME_TOPIC 135 ctxt = context.get_admin_context() 136 services = objects.ServiceList.get_all_by_topic(ctxt, topic) 137 az_data = [(s.availability_zone, s.disabled) 138 for s in services] 139 disabled_map = {} 140 for (az_name, disabled) in az_data: 141 tracked_disabled = disabled_map.get(az_name, True) 142 disabled_map[az_name] = tracked_disabled and disabled 143 azs = [{'name': name, 'available': not disabled} 144 for (name, disabled) in disabled_map.items()] 145 if refresh_cache: 146 now = timeutils.utcnow() 147 self.availability_zones = azs 148 self.availability_zones_last_fetched = now 149 LOG.debug("Availability zone cache updated, next update will" 150 " occur around %s.", now + datetime.timedelta( 151 seconds=CONF.az_cache_duration)) 152 else: 153 azs = self.availability_zones 154 LOG.info("Availability Zones retrieved successfully.") 155 return tuple(azs) 156 157 def _retype_is_possible(self, context, 158 source_type, target_type): 159 elevated = context.elevated() 160 # If encryptions are different, it is not allowed 161 # to create volume from source volume or snapshot. 162 if volume_types.volume_types_encryption_changed( 163 elevated, 164 source_type.id if source_type else None, 165 target_type.id if target_type else None): 166 return False 167 services = objects.ServiceList.get_all_by_topic( 168 elevated, 169 constants.VOLUME_TOPIC, 170 disabled=True) 171 if len(services.objects) == 1: 172 return True 173 174 source_extra_specs = {} 175 if source_type: 176 with source_type.obj_as_admin(): 177 source_extra_specs = source_type.extra_specs 178 target_extra_specs = {} 179 if target_type: 180 with target_type.obj_as_admin(): 181 target_extra_specs = target_type.extra_specs 182 if (volume_utils.matching_backend_name( 183 source_extra_specs, target_extra_specs)): 184 return True 185 return False 186 187 def _is_volume_migrating(self, volume): 188 # The migration status 'none' means no migration has ever been done 189 # before. The migration status 'error' means the previous migration 190 # failed. The migration status 'success' means the previous migration 191 # succeeded. The migration status 'deleting' means the source volume 192 # fails to delete after a migration. 193 # All of the statuses above means the volume is not in the process 194 # of a migration. 195 return (volume['migration_status'] not in 196 self.AVAILABLE_MIGRATION_STATUS) 197 198 def _is_multiattach(self, volume_type): 199 specs = getattr(volume_type, 'extra_specs', {}) 200 return specs.get('multiattach', 'False') == '<is> True' 201 202 def _is_encrypted(self, volume_type): 203 specs = volume_type.get('extra_specs', {}) 204 if 'encryption' not in specs: 205 return False 206 return specs.get('encryption', {}) is not {} 207 208 def create(self, context, size, name, description, snapshot=None, 209 image_id=None, volume_type=None, metadata=None, 210 availability_zone=None, source_volume=None, 211 scheduler_hints=None, 212 source_replica=None, consistencygroup=None, 213 cgsnapshot=None, multiattach=False, source_cg=None, 214 group=None, group_snapshot=None, source_group=None, 215 backup=None): 216 217 if image_id: 218 context.authorize(vol_policy.CREATE_FROM_IMAGE_POLICY) 219 else: 220 context.authorize(vol_policy.CREATE_POLICY) 221 222 # Check up front for legacy replication parameters to quick fail 223 if source_replica: 224 msg = _("Creating a volume from a replica source was part of the " 225 "replication v1 implementation which is no longer " 226 "available.") 227 raise exception.InvalidInput(reason=msg) 228 229 # NOTE(jdg): we can have a create without size if we're 230 # doing a create from snap or volume. Currently 231 # the taskflow api will handle this and pull in the 232 # size from the source. 233 234 # NOTE(jdg): cinderclient sends in a string representation 235 # of the size value. BUT there is a possibility that somebody 236 # could call the API directly so the is_int_like check 237 # handles both cases (string representation of true float or int). 238 if size and (not strutils.is_int_like(size) or int(size) <= 0): 239 msg = _('Invalid volume size provided for create request: %s ' 240 '(size argument must be an integer (or string ' 241 'representation of an integer) and greater ' 242 'than zero).') % size 243 raise exception.InvalidInput(reason=msg) 244 245 if consistencygroup and (not cgsnapshot and not source_cg): 246 if not volume_type: 247 msg = _("volume_type must be provided when creating " 248 "a volume in a consistency group.") 249 raise exception.InvalidInput(reason=msg) 250 cg_voltypeids = consistencygroup.volume_type_id 251 if volume_type.id not in cg_voltypeids: 252 msg = _("Invalid volume_type provided: %s (requested " 253 "type must be supported by this consistency " 254 "group).") % volume_type 255 raise exception.InvalidInput(reason=msg) 256 257 if group and (not group_snapshot and not source_group): 258 if not volume_type: 259 msg = _("volume_type must be provided when creating " 260 "a volume in a group.") 261 raise exception.InvalidInput(reason=msg) 262 vol_type_ids = [v_type.id for v_type in group.volume_types] 263 if volume_type.id not in vol_type_ids: 264 msg = _("Invalid volume_type provided: %s (requested " 265 "type must be supported by this " 266 "group).") % volume_type 267 raise exception.InvalidInput(reason=msg) 268 269 if source_volume and volume_type: 270 if volume_type.id != source_volume.volume_type_id: 271 if not self._retype_is_possible( 272 context, 273 source_volume.volume_type, 274 volume_type): 275 msg = _("Invalid volume_type provided: %s (requested type " 276 "is not compatible; either match source volume, " 277 "or omit type argument).") % volume_type.id 278 raise exception.InvalidInput(reason=msg) 279 280 if snapshot and volume_type: 281 if volume_type.id != snapshot.volume_type_id: 282 if not self._retype_is_possible(context, 283 snapshot.volume.volume_type, 284 volume_type): 285 msg = _("Invalid volume_type provided: %s (requested " 286 "type is not compatible; recommend omitting " 287 "the type argument).") % volume_type.id 288 raise exception.InvalidInput(reason=msg) 289 290 # Determine the valid availability zones that the volume could be 291 # created in (a task in the flow will/can use this information to 292 # ensure that the availability zone requested is valid). 293 raw_zones = self.list_availability_zones(enable_cache=True) 294 availability_zones = set([az['name'] for az in raw_zones]) 295 if CONF.storage_availability_zone: 296 availability_zones.add(CONF.storage_availability_zone) 297 298 utils.check_metadata_properties(metadata) 299 300 create_what = { 301 'context': context, 302 'raw_size': size, 303 'name': name, 304 'description': description, 305 'snapshot': snapshot, 306 'image_id': image_id, 307 'raw_volume_type': volume_type, 308 'metadata': metadata or {}, 309 'raw_availability_zone': availability_zone, 310 'source_volume': source_volume, 311 'scheduler_hints': scheduler_hints, 312 'key_manager': self.key_manager, 313 'optional_args': {'is_quota_committed': False}, 314 'consistencygroup': consistencygroup, 315 'cgsnapshot': cgsnapshot, 316 'raw_multiattach': multiattach, 317 'group': group, 318 'group_snapshot': group_snapshot, 319 'source_group': source_group, 320 'backup': backup, 321 } 322 try: 323 sched_rpcapi = (self.scheduler_rpcapi if ( 324 not cgsnapshot and not source_cg and 325 not group_snapshot and not source_group) 326 else None) 327 volume_rpcapi = (self.volume_rpcapi if ( 328 not cgsnapshot and not source_cg and 329 not group_snapshot and not source_group) 330 else None) 331 flow_engine = create_volume.get_flow(self.db, 332 self.image_service, 333 availability_zones, 334 create_what, 335 sched_rpcapi, 336 volume_rpcapi) 337 except Exception: 338 msg = _('Failed to create api volume flow.') 339 LOG.exception(msg) 340 raise exception.CinderException(msg) 341 342 # Attaching this listener will capture all of the notifications that 343 # taskflow sends out and redirect them to a more useful log for 344 # cinders debugging (or error reporting) usage. 345 with flow_utils.DynamicLogListener(flow_engine, logger=LOG): 346 try: 347 flow_engine.run() 348 vref = flow_engine.storage.fetch('volume') 349 # NOTE(tommylikehu): If the target az is not hit, 350 # refresh the az cache immediately. 351 if flow_engine.storage.fetch('refresh_az'): 352 self.list_availability_zones(enable_cache=True, 353 refresh_cache=True) 354 # Refresh the object here, otherwise things ain't right 355 vref = objects.Volume.get_by_id( 356 context, vref['id']) 357 vref.save() 358 LOG.info("Create volume request issued successfully.", 359 resource=vref) 360 return vref 361 except exception.InvalidAvailabilityZone: 362 with excutils.save_and_reraise_exception(): 363 self.list_availability_zones(enable_cache=True, 364 refresh_cache=True) 365 366 def revert_to_snapshot(self, context, volume, snapshot): 367 """revert a volume to a snapshot""" 368 context.authorize(vol_action_policy.REVERT_POLICY, 369 target_obj=volume) 370 v_res = volume.update_single_status_where( 371 'reverting', 'available') 372 if not v_res: 373 msg = _("Can't revert volume %s to its latest snapshot. " 374 "Volume's status must be 'available'.") % volume.id 375 raise exception.InvalidVolume(reason=msg) 376 s_res = snapshot.update_single_status_where( 377 fields.SnapshotStatus.RESTORING, 378 fields.SnapshotStatus.AVAILABLE) 379 if not s_res: 380 msg = _("Can't revert volume %s to its latest snapshot. " 381 "Snapshot's status must be 'available'.") % snapshot.id 382 raise exception.InvalidSnapshot(reason=msg) 383 384 self.volume_rpcapi.revert_to_snapshot(context, volume, snapshot) 385 386 def delete(self, context, volume, 387 force=False, 388 unmanage_only=False, 389 cascade=False): 390 context.authorize(vol_policy.DELETE_POLICY, target_obj=volume) 391 if context.is_admin and context.project_id != volume.project_id: 392 project_id = volume.project_id 393 else: 394 project_id = context.project_id 395 396 if not volume.host: 397 volume_utils.notify_about_volume_usage(context, 398 volume, "delete.start") 399 # NOTE(vish): scheduling failed, so delete it 400 # Note(zhiteng): update volume quota reservation 401 try: 402 reservations = None 403 if volume.status != 'error_managing': 404 LOG.debug("Decrease volume quotas only if status is not " 405 "error_managing.") 406 reserve_opts = {'volumes': -1, 'gigabytes': -volume.size} 407 QUOTAS.add_volume_type_opts(context, 408 reserve_opts, 409 volume.volume_type_id) 410 reservations = QUOTAS.reserve(context, 411 project_id=project_id, 412 **reserve_opts) 413 except Exception: 414 LOG.exception("Failed to update quota while " 415 "deleting volume.") 416 volume.destroy() 417 418 if reservations: 419 QUOTAS.commit(context, reservations, project_id=project_id) 420 421 volume_utils.notify_about_volume_usage(context, 422 volume, "delete.end") 423 LOG.info("Delete volume request issued successfully.", 424 resource={'type': 'volume', 425 'id': volume.id}) 426 return 427 428 if not unmanage_only: 429 volume.assert_not_frozen() 430 431 if unmanage_only and volume.encryption_key_id is not None: 432 msg = _("Unmanaging encrypted volumes is not supported.") 433 e = exception.Invalid(reason=msg) 434 self.message.create( 435 context, 436 message_field.Action.UNMANAGE_VOLUME, 437 resource_uuid=volume.id, 438 detail=message_field.Detail.UNMANAGE_ENC_NOT_SUPPORTED, 439 exception=e) 440 raise e 441 442 # Build required conditions for conditional update 443 expected = { 444 'attach_status': db.Not(fields.VolumeAttachStatus.ATTACHED), 445 'migration_status': self.AVAILABLE_MIGRATION_STATUS, 446 'consistencygroup_id': None, 447 'group_id': None} 448 449 # If not force deleting we have status conditions 450 if not force: 451 expected['status'] = ('available', 'error', 'error_restoring', 452 'error_extending', 'error_managing') 453 454 if cascade: 455 if force: 456 # Ignore status checks, but ensure snapshots are not part 457 # of a cgsnapshot. 458 filters = [~db.volume_has_snapshots_in_a_cgsnapshot_filter()] 459 else: 460 # Allow deletion if all snapshots are in an expected state 461 filters = [~db.volume_has_undeletable_snapshots_filter()] 462 # Check if the volume has snapshots which are existing in 463 # other project now. 464 if not context.is_admin: 465 filters.append(~db.volume_has_other_project_snp_filter()) 466 else: 467 # Don't allow deletion of volume with snapshots 468 filters = [~db.volume_has_snapshots_filter()] 469 values = {'status': 'deleting', 'terminated_at': timeutils.utcnow()} 470 if unmanage_only is True: 471 values['status'] = 'unmanaging' 472 if volume.status == 'error_managing': 473 values['status'] = 'error_managing_deleting' 474 475 result = volume.conditional_update(values, expected, filters) 476 477 if not result: 478 status = utils.build_or_str(expected.get('status'), 479 _('status must be %s and')) 480 msg = _('Volume %s must not be migrating, attached, belong to a ' 481 'group, have snapshots or be disassociated from ' 482 'snapshots after volume transfer.') % status 483 LOG.info(msg) 484 raise exception.InvalidVolume(reason=msg) 485 486 if cascade: 487 values = {'status': 'deleting'} 488 expected = {'cgsnapshot_id': None, 489 'group_snapshot_id': None} 490 if not force: 491 expected['status'] = ('available', 'error', 'deleting') 492 493 snapshots = objects.snapshot.SnapshotList.get_all_for_volume( 494 context, volume.id) 495 for s in snapshots: 496 result = s.conditional_update(values, expected, filters) 497 498 if not result: 499 volume.update({'status': 'error_deleting'}) 500 volume.save() 501 502 msg = _('Failed to update snapshot.') 503 raise exception.InvalidVolume(reason=msg) 504 505 cache = image_cache.ImageVolumeCache(self.db, self) 506 entry = cache.get_by_image_volume(context, volume.id) 507 if entry: 508 cache.evict(context, entry) 509 510 # If the volume is encrypted, delete its encryption key from the key 511 # manager. This operation makes volume deletion an irreversible process 512 # because the volume cannot be decrypted without its key. 513 encryption_key_id = volume.get('encryption_key_id', None) 514 if encryption_key_id is not None: 515 try: 516 volume_utils.delete_encryption_key(context, 517 self.key_manager, 518 encryption_key_id) 519 except Exception as e: 520 volume.update({'status': 'error_deleting'}) 521 volume.save() 522 if hasattr(e, 'msg'): 523 msg = _("Unable to delete encryption key for " 524 "volume: %s") % (e.msg) 525 else: 526 msg = _("Unable to delete encryption key for volume.") 527 LOG.error(msg) 528 raise exception.InvalidVolume(reason=msg) 529 530 self.volume_rpcapi.delete_volume(context, 531 volume, 532 unmanage_only, 533 cascade) 534 LOG.info("Delete volume request issued successfully.", 535 resource=volume) 536 537 def update(self, context, volume, fields): 538 context.authorize(vol_policy.UPDATE_POLICY, target_obj=volume) 539 # TODO(karthikp): Making sure volume is always oslo-versioned 540 # If not we convert it at the start of update method. This check 541 # needs to be removed once we have moved to ovo. 542 if not isinstance(volume, objects_base.CinderObject): 543 vol_obj = objects.Volume() 544 volume = objects.Volume._from_db_object(context, vol_obj, volume) 545 546 if volume.status == 'maintenance': 547 LOG.info("Unable to update volume, " 548 "because it is in maintenance.", resource=volume) 549 msg = _("The volume cannot be updated during maintenance.") 550 raise exception.InvalidVolume(reason=msg) 551 552 utils.check_metadata_properties(fields.get('metadata', None)) 553 554 volume.update(fields) 555 volume.save() 556 LOG.info("Volume updated successfully.", resource=volume) 557 558 def get(self, context, volume_id, viewable_admin_meta=False): 559 volume = objects.Volume.get_by_id(context, volume_id) 560 561 try: 562 context.authorize(vol_policy.GET_POLICY, target_obj=volume) 563 except exception.PolicyNotAuthorized: 564 # raise VolumeNotFound to avoid providing info about 565 # the existence of an unauthorized volume id 566 raise exception.VolumeNotFound(volume_id=volume_id) 567 568 if viewable_admin_meta: 569 ctxt = context.elevated() 570 admin_metadata = self.db.volume_admin_metadata_get(ctxt, 571 volume_id) 572 volume.admin_metadata = admin_metadata 573 volume.obj_reset_changes() 574 575 LOG.info("Volume info retrieved successfully.", resource=volume) 576 return volume 577 578 def calculate_resource_count(self, context, resource_type, filters): 579 filters = filters if filters else {} 580 allTenants = utils.get_bool_param('all_tenants', filters) 581 if context.is_admin and allTenants: 582 del filters['all_tenants'] 583 else: 584 filters['project_id'] = context.project_id 585 return db.calculate_resource_count(context, resource_type, filters) 586 587 def get_all(self, context, marker=None, limit=None, sort_keys=None, 588 sort_dirs=None, filters=None, viewable_admin_meta=False, 589 offset=None): 590 context.authorize(vol_policy.GET_ALL_POLICY) 591 592 if filters is None: 593 filters = {} 594 595 allTenants = utils.get_bool_param('all_tenants', filters) 596 597 try: 598 if limit is not None: 599 limit = int(limit) 600 if limit < 0: 601 msg = _('limit param must be positive') 602 raise exception.InvalidInput(reason=msg) 603 except ValueError: 604 msg = _('limit param must be an integer') 605 raise exception.InvalidInput(reason=msg) 606 607 # Non-admin shouldn't see temporary target of a volume migration, add 608 # unique filter data to reflect that only volumes with a NULL 609 # 'migration_status' or a 'migration_status' that does not start with 610 # 'target:' should be returned (processed in db/sqlalchemy/api.py) 611 if not context.is_admin: 612 filters['no_migration_targets'] = True 613 614 if filters: 615 LOG.debug("Searching by: %s.", six.text_type(filters)) 616 617 if context.is_admin and allTenants: 618 # Need to remove all_tenants to pass the filtering below. 619 del filters['all_tenants'] 620 volumes = objects.VolumeList.get_all(context, marker, limit, 621 sort_keys=sort_keys, 622 sort_dirs=sort_dirs, 623 filters=filters, 624 offset=offset) 625 else: 626 if viewable_admin_meta: 627 context = context.elevated() 628 volumes = objects.VolumeList.get_all_by_project( 629 context, context.project_id, marker, limit, 630 sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters, 631 offset=offset) 632 633 LOG.info("Get all volumes completed successfully.") 634 return volumes 635 636 def get_volume_summary(self, context, filters=None): 637 context.authorize(vol_policy.GET_ALL_POLICY) 638 639 if filters is None: 640 filters = {} 641 642 all_tenants = utils.get_bool_param('all_tenants', filters) 643 filters.pop('all_tenants', None) 644 project_only = not (all_tenants and context.is_admin) 645 volumes = objects.VolumeList.get_volume_summary(context, project_only) 646 647 LOG.info("Get summary completed successfully.") 648 return volumes 649 650 def get_snapshot(self, context, snapshot_id): 651 snapshot = objects.Snapshot.get_by_id(context, snapshot_id) 652 context.authorize(snapshot_policy.GET_POLICY, target_obj=snapshot) 653 654 # FIXME(jdg): The objects don't have the db name entries 655 # so build the resource tag manually for now. 656 LOG.info("Snapshot retrieved successfully.", 657 resource={'type': 'snapshot', 658 'id': snapshot.id}) 659 return snapshot 660 661 def get_volume(self, context, volume_id): 662 volume = objects.Volume.get_by_id(context, volume_id) 663 context.authorize(vol_policy.GET_POLICY, target_obj=volume) 664 LOG.info("Volume retrieved successfully.", resource=volume) 665 return volume 666 667 def get_all_snapshots(self, context, search_opts=None, marker=None, 668 limit=None, sort_keys=None, sort_dirs=None, 669 offset=None): 670 context.authorize(snapshot_policy.GET_ALL_POLICY) 671 672 search_opts = search_opts or {} 673 674 if context.is_admin and 'all_tenants' in search_opts: 675 # Need to remove all_tenants to pass the filtering below. 676 del search_opts['all_tenants'] 677 snapshots = objects.SnapshotList.get_all( 678 context, search_opts, marker, limit, sort_keys, sort_dirs, 679 offset) 680 else: 681 snapshots = objects.SnapshotList.get_all_by_project( 682 context, context.project_id, search_opts, marker, limit, 683 sort_keys, sort_dirs, offset) 684 685 LOG.info("Get all snapshots completed successfully.") 686 return snapshots 687 688 def reserve_volume(self, context, volume): 689 context.authorize(vol_action_policy.RESERVE_POLICY, target_obj=volume) 690 expected = {'multiattach': volume.multiattach, 691 'status': (('available', 'in-use') if volume.multiattach 692 else 'available')} 693 694 result = volume.conditional_update({'status': 'attaching'}, expected) 695 696 if not result: 697 expected_status = utils.build_or_str(expected['status']) 698 msg = _('Volume status must be %(expected)s to reserve, but the ' 699 'status is %(current)s.') % {'expected': expected_status, 700 'current': volume.status} 701 LOG.error(msg) 702 raise exception.InvalidVolume(reason=msg) 703 704 LOG.info("Reserve volume completed successfully.", 705 resource=volume) 706 707 def unreserve_volume(self, context, volume): 708 context.authorize(vol_action_policy.UNRESERVE_POLICY, 709 target_obj=volume) 710 expected = {'status': 'attaching'} 711 # Status change depends on whether it has attachments (in-use) or not 712 # (available) 713 value = {'status': db.Case([(db.volume_has_attachments_filter(), 714 'in-use')], 715 else_='available')} 716 result = volume.conditional_update(value, expected) 717 if not result: 718 LOG.debug("Attempted to unreserve volume that was not " 719 "reserved, nothing to do.", 720 resource=volume) 721 return 722 723 LOG.info("Unreserve volume completed successfully.", 724 resource=volume) 725 726 def begin_detaching(self, context, volume): 727 context.authorize(vol_action_policy.BEGIN_DETACHING_POLICY, 728 target_obj=volume) 729 # If we are in the middle of a volume migration, we don't want the 730 # user to see that the volume is 'detaching'. Having 731 # 'migration_status' set will have the same effect internally. 732 expected = {'status': 'in-use', 733 'attach_status': fields.VolumeAttachStatus.ATTACHED, 734 'migration_status': self.AVAILABLE_MIGRATION_STATUS} 735 736 result = volume.conditional_update({'status': 'detaching'}, expected) 737 738 if not (result or self._is_volume_migrating(volume)): 739 msg = _("Unable to detach volume. Volume status must be 'in-use' " 740 "and attach_status must be 'attached' to detach.") 741 LOG.error(msg) 742 raise exception.InvalidVolume(reason=msg) 743 744 LOG.info("Begin detaching volume completed successfully.", 745 resource=volume) 746 747 def roll_detaching(self, context, volume): 748 context.authorize(vol_action_policy.ROLL_DETACHING_POLICY, 749 target_obj=volume) 750 volume.conditional_update({'status': 'in-use'}, 751 {'status': 'detaching'}) 752 LOG.info("Roll detaching of volume completed successfully.", 753 resource=volume) 754 755 def attach(self, context, volume, instance_uuid, host_name, 756 mountpoint, mode): 757 context.authorize(vol_action_policy.ATTACH_POLICY, 758 target_obj=volume) 759 if volume.status == 'maintenance': 760 LOG.info('Unable to attach volume, ' 761 'because it is in maintenance.', resource=volume) 762 msg = _("The volume cannot be attached in maintenance mode.") 763 raise exception.InvalidVolume(reason=msg) 764 765 # We add readonly metadata if it doesn't already exist 766 readonly = self.update_volume_admin_metadata(context.elevated(), 767 volume, 768 {'readonly': 'False'}, 769 update=False)['readonly'] 770 if readonly == 'True' and mode != 'ro': 771 raise exception.InvalidVolumeAttachMode(mode=mode, 772 volume_id=volume.id) 773 774 attach_results = self.volume_rpcapi.attach_volume(context, 775 volume, 776 instance_uuid, 777 host_name, 778 mountpoint, 779 mode) 780 LOG.info("Attach volume completed successfully.", 781 resource=volume) 782 return attach_results 783 784 def detach(self, context, volume, attachment_id): 785 context.authorize(vol_action_policy.DETACH_POLICY, 786 target_obj=volume) 787 if volume['status'] == 'maintenance': 788 LOG.info('Unable to detach volume, ' 789 'because it is in maintenance.', resource=volume) 790 msg = _("The volume cannot be detached in maintenance mode.") 791 raise exception.InvalidVolume(reason=msg) 792 detach_results = self.volume_rpcapi.detach_volume(context, volume, 793 attachment_id) 794 LOG.info("Detach volume completed successfully.", 795 resource=volume) 796 return detach_results 797 798 def initialize_connection(self, context, volume, connector): 799 context.authorize(vol_action_policy.INITIALIZE_POLICY, 800 target_obj=volume) 801 if volume.status == 'maintenance': 802 LOG.info('Unable to initialize the connection for ' 803 'volume, because it is in ' 804 'maintenance.', resource=volume) 805 msg = _("The volume connection cannot be initialized in " 806 "maintenance mode.") 807 raise exception.InvalidVolume(reason=msg) 808 init_results = self.volume_rpcapi.initialize_connection(context, 809 volume, 810 connector) 811 LOG.info("Initialize volume connection completed successfully.", 812 resource=volume) 813 return init_results 814 815 def terminate_connection(self, context, volume, connector, force=False): 816 context.authorize(vol_action_policy.TERMINATE_POLICY, 817 target_obj=volume) 818 self.volume_rpcapi.terminate_connection(context, 819 volume, 820 connector, 821 force) 822 LOG.info("Terminate volume connection completed successfully.", 823 resource=volume) 824 self.unreserve_volume(context, volume) 825 826 def accept_transfer(self, context, volume, new_user, new_project): 827 context.authorize(vol_transfer_policy.ACCEPT_POLICY, 828 target_obj=volume) 829 if volume['status'] == 'maintenance': 830 LOG.info('Unable to accept transfer for volume, ' 831 'because it is in maintenance.', resource=volume) 832 msg = _("The volume cannot accept transfer in maintenance mode.") 833 raise exception.InvalidVolume(reason=msg) 834 results = self.volume_rpcapi.accept_transfer(context, 835 volume, 836 new_user, 837 new_project) 838 LOG.info("Transfer volume completed successfully.", 839 resource=volume) 840 return results 841 842 def _create_snapshot(self, context, 843 volume, name, description, 844 force=False, metadata=None, 845 cgsnapshot_id=None, 846 group_snapshot_id=None): 847 volume.assert_not_frozen() 848 snapshot = self.create_snapshot_in_db( 849 context, volume, name, 850 description, force, metadata, cgsnapshot_id, 851 True, group_snapshot_id) 852 # NOTE(tommylikehu): We only wrap the 'size' attribute here 853 # because only the volume's host is passed and only capacity is 854 # validated in the scheduler now. 855 kwargs = {'snapshot_id': snapshot.id, 856 'volume_properties': objects.VolumeProperties( 857 size=volume.size)} 858 self.scheduler_rpcapi.create_snapshot(context, volume, snapshot, 859 volume.service_topic_queue, 860 objects.RequestSpec(**kwargs)) 861 return snapshot 862 863 def create_snapshot_in_db(self, context, 864 volume, name, description, 865 force, metadata, 866 cgsnapshot_id, 867 commit_quota=True, 868 group_snapshot_id=None): 869 context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume) 870 871 utils.check_metadata_properties(metadata) 872 if not volume.host: 873 msg = _("The snapshot cannot be created because volume has " 874 "not been scheduled to any host.") 875 raise exception.InvalidVolume(reason=msg) 876 877 if volume['status'] == 'maintenance': 878 LOG.info('Unable to create the snapshot for volume, ' 879 'because it is in maintenance.', resource=volume) 880 msg = _("The snapshot cannot be created when the volume is in " 881 "maintenance mode.") 882 raise exception.InvalidVolume(reason=msg) 883 if self._is_volume_migrating(volume): 884 # Volume is migrating, wait until done 885 msg = _("Snapshot cannot be created while volume is migrating.") 886 raise exception.InvalidVolume(reason=msg) 887 888 if volume['status'].startswith('replica_'): 889 # Can't snapshot secondary replica 890 msg = _("Snapshot of secondary replica is not allowed.") 891 raise exception.InvalidVolume(reason=msg) 892 893 valid_status = ["available", "in-use"] if force else ["available"] 894 895 if volume['status'] not in valid_status: 896 msg = _("Volume %(vol_id)s status must be %(status)s, " 897 "but current status is: " 898 "%(vol_status)s.") % {'vol_id': volume['id'], 899 'status': ', '.join(valid_status), 900 'vol_status': volume['status']} 901 raise exception.InvalidVolume(reason=msg) 902 903 if commit_quota: 904 try: 905 if CONF.no_snapshot_gb_quota: 906 reserve_opts = {'snapshots': 1} 907 else: 908 reserve_opts = {'snapshots': 1, 909 'gigabytes': volume['size']} 910 QUOTAS.add_volume_type_opts(context, 911 reserve_opts, 912 volume.get('volume_type_id')) 913 reservations = QUOTAS.reserve(context, **reserve_opts) 914 except exception.OverQuota as e: 915 quota_utils.process_reserve_over_quota( 916 context, e, 917 resource='snapshots', 918 size=volume.size) 919 920 snapshot = None 921 try: 922 kwargs = { 923 'volume_id': volume['id'], 924 'cgsnapshot_id': cgsnapshot_id, 925 'group_snapshot_id': group_snapshot_id, 926 'user_id': context.user_id, 927 'project_id': context.project_id, 928 'status': fields.SnapshotStatus.CREATING, 929 'progress': '0%', 930 'volume_size': volume['size'], 931 'display_name': name, 932 'display_description': description, 933 'volume_type_id': volume['volume_type_id'], 934 'encryption_key_id': volume['encryption_key_id'], 935 'metadata': metadata or {} 936 } 937 snapshot = objects.Snapshot(context=context, **kwargs) 938 snapshot.create() 939 volume.refresh() 940 941 if volume['status'] not in valid_status: 942 msg = _("Volume %(vol_id)s status must be %(status)s , " 943 "but current status is: " 944 "%(vol_status)s.") % {'vol_id': volume['id'], 945 'status': 946 ', '.join(valid_status), 947 'vol_status': 948 volume['status']} 949 raise exception.InvalidVolume(reason=msg) 950 if commit_quota: 951 QUOTAS.commit(context, reservations) 952 except Exception: 953 with excutils.save_and_reraise_exception(): 954 try: 955 if snapshot.obj_attr_is_set('id'): 956 snapshot.destroy() 957 finally: 958 if commit_quota: 959 QUOTAS.rollback(context, reservations) 960 961 return snapshot 962 963 def create_snapshots_in_db(self, context, 964 volume_list, 965 name, description, 966 cgsnapshot_id, 967 group_snapshot_id=None): 968 snapshot_list = [] 969 for volume in volume_list: 970 self._create_snapshot_in_db_validate(context, volume) 971 972 reservations = self._create_snapshots_in_db_reserve( 973 context, volume_list) 974 975 options_list = [] 976 for volume in volume_list: 977 options = self._create_snapshot_in_db_options( 978 context, volume, name, description, cgsnapshot_id, 979 group_snapshot_id) 980 options_list.append(options) 981 982 try: 983 for options in options_list: 984 snapshot = objects.Snapshot(context=context, **options) 985 snapshot.create() 986 snapshot_list.append(snapshot) 987 988 QUOTAS.commit(context, reservations) 989 except Exception: 990 with excutils.save_and_reraise_exception(): 991 try: 992 for snap in snapshot_list: 993 snap.destroy() 994 finally: 995 QUOTAS.rollback(context, reservations) 996 997 return snapshot_list 998 999 def _create_snapshot_in_db_validate(self, context, volume): 1000 context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume) 1001 1002 if volume['status'] == 'maintenance': 1003 LOG.info('Unable to create the snapshot for volume, ' 1004 'because it is in maintenance.', resource=volume) 1005 msg = _("The snapshot cannot be created when the volume is in " 1006 "maintenance mode.") 1007 raise exception.InvalidVolume(reason=msg) 1008 if self._is_volume_migrating(volume): 1009 # Volume is migrating, wait until done 1010 msg = _("Snapshot cannot be created while volume is migrating.") 1011 raise exception.InvalidVolume(reason=msg) 1012 if volume['status'] == 'error': 1013 msg = _("The snapshot cannot be created when the volume is " 1014 "in error status.") 1015 LOG.error(msg) 1016 raise exception.InvalidVolume(reason=msg) 1017 1018 def _create_snapshots_in_db_reserve(self, context, volume_list): 1019 reserve_opts_list = [] 1020 total_reserve_opts = {} 1021 try: 1022 for volume in volume_list: 1023 if CONF.no_snapshot_gb_quota: 1024 reserve_opts = {'snapshots': 1} 1025 else: 1026 reserve_opts = {'snapshots': 1, 1027 'gigabytes': volume['size']} 1028 QUOTAS.add_volume_type_opts(context, 1029 reserve_opts, 1030 volume.get('volume_type_id')) 1031 reserve_opts_list.append(reserve_opts) 1032 1033 for reserve_opts in reserve_opts_list: 1034 for (key, value) in reserve_opts.items(): 1035 if key not in total_reserve_opts.keys(): 1036 total_reserve_opts[key] = value 1037 else: 1038 total_reserve_opts[key] = \ 1039 total_reserve_opts[key] + value 1040 reservations = QUOTAS.reserve(context, **total_reserve_opts) 1041 except exception.OverQuota as e: 1042 quota_utils.process_reserve_over_quota( 1043 context, 1044 e, 1045 resource='snapshots', 1046 size=total_reserve_opts.get('gigabytes', volume.size)) 1047 1048 return reservations 1049 1050 def _create_snapshot_in_db_options(self, context, volume, 1051 name, description, 1052 cgsnapshot_id, 1053 group_snapshot_id=None): 1054 options = {'volume_id': volume['id'], 1055 'cgsnapshot_id': cgsnapshot_id, 1056 'group_snapshot_id': group_snapshot_id, 1057 'user_id': context.user_id, 1058 'project_id': context.project_id, 1059 'status': fields.SnapshotStatus.CREATING, 1060 'progress': '0%', 1061 'volume_size': volume['size'], 1062 'display_name': name, 1063 'display_description': description, 1064 'volume_type_id': volume['volume_type_id'], 1065 'encryption_key_id': volume['encryption_key_id']} 1066 return options 1067 1068 def create_snapshot(self, context, 1069 volume, name, description, 1070 metadata=None, cgsnapshot_id=None, 1071 group_snapshot_id=None): 1072 result = self._create_snapshot(context, volume, name, description, 1073 False, metadata, cgsnapshot_id, 1074 group_snapshot_id) 1075 LOG.info("Snapshot create request issued successfully.", 1076 resource=result) 1077 return result 1078 1079 def create_snapshot_force(self, context, 1080 volume, name, 1081 description, metadata=None): 1082 result = self._create_snapshot(context, volume, name, description, 1083 True, metadata) 1084 LOG.info("Snapshot force create request issued successfully.", 1085 resource=result) 1086 return result 1087 1088 def delete_snapshot(self, context, snapshot, force=False, 1089 unmanage_only=False): 1090 context.authorize(snapshot_policy.DELETE_POLICY, 1091 target_obj=snapshot) 1092 if not unmanage_only: 1093 snapshot.assert_not_frozen() 1094 1095 # Build required conditions for conditional update 1096 expected = {'cgsnapshot_id': None, 1097 'group_snapshot_id': None} 1098 # If not force deleting we have status conditions 1099 if not force: 1100 expected['status'] = (fields.SnapshotStatus.AVAILABLE, 1101 fields.SnapshotStatus.ERROR) 1102 1103 values = {'status': fields.SnapshotStatus.DELETING} 1104 if unmanage_only is True: 1105 values['status'] = fields.SnapshotStatus.UNMANAGING 1106 result = snapshot.conditional_update(values, expected) 1107 if not result: 1108 status = utils.build_or_str(expected.get('status'), 1109 _('status must be %s and')) 1110 msg = (_('Snapshot %s must not be part of a group.') % 1111 status) 1112 LOG.error(msg) 1113 raise exception.InvalidSnapshot(reason=msg) 1114 1115 self.volume_rpcapi.delete_snapshot(context, snapshot, unmanage_only) 1116 LOG.info("Snapshot delete request issued successfully.", 1117 resource=snapshot) 1118 1119 def update_snapshot(self, context, snapshot, fields): 1120 context.authorize(snapshot_policy.UPDATE_POLICY, 1121 target_obj=snapshot) 1122 snapshot.update(fields) 1123 snapshot.save() 1124 1125 def get_volume_metadata(self, context, volume): 1126 """Get all metadata associated with a volume.""" 1127 context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume) 1128 rv = self.db.volume_metadata_get(context, volume['id']) 1129 LOG.info("Get volume metadata completed successfully.", 1130 resource=volume) 1131 return dict(rv) 1132 1133 def create_volume_metadata(self, context, volume, metadata): 1134 """Creates volume metadata.""" 1135 context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume) 1136 db_meta = self._update_volume_metadata(context, volume, metadata) 1137 1138 LOG.info("Create volume metadata completed successfully.", 1139 resource=volume) 1140 return db_meta 1141 1142 def delete_volume_metadata(self, context, volume, 1143 key, meta_type=common.METADATA_TYPES.user): 1144 """Delete the given metadata item from a volume.""" 1145 context.authorize(vol_meta_policy.DELETE_POLICY, target_obj=volume) 1146 if volume.status in ('maintenance', 'uploading'): 1147 msg = _('Deleting volume metadata is not allowed for volumes in ' 1148 '%s status.') % volume.status 1149 LOG.info(msg, resource=volume) 1150 raise exception.InvalidVolume(reason=msg) 1151 self.db.volume_metadata_delete(context, volume.id, key, meta_type) 1152 LOG.info("Delete volume metadata completed successfully.", 1153 resource=volume) 1154 1155 def _update_volume_metadata(self, context, volume, metadata, delete=False, 1156 meta_type=common.METADATA_TYPES.user): 1157 if volume['status'] in ('maintenance', 'uploading'): 1158 msg = _('Updating volume metadata is not allowed for volumes in ' 1159 '%s status.') % volume['status'] 1160 LOG.info(msg, resource=volume) 1161 raise exception.InvalidVolume(reason=msg) 1162 utils.check_metadata_properties(metadata) 1163 return self.db.volume_metadata_update(context, volume['id'], 1164 metadata, delete, meta_type) 1165 1166 def update_volume_metadata(self, context, volume, metadata, delete=False, 1167 meta_type=common.METADATA_TYPES.user): 1168 """Updates volume metadata. 1169 1170 If delete is True, metadata items that are not specified in the 1171 `metadata` argument will be deleted. 1172 1173 """ 1174 context.authorize(vol_meta_policy.UPDATE_POLICY, target_obj=volume) 1175 db_meta = self._update_volume_metadata(context, volume, metadata, 1176 delete, meta_type) 1177 1178 # TODO(jdg): Implement an RPC call for drivers that may use this info 1179 1180 LOG.info("Update volume metadata completed successfully.", 1181 resource=volume) 1182 return db_meta 1183 1184 def get_volume_admin_metadata(self, context, volume): 1185 """Get all administration metadata associated with a volume.""" 1186 rv = self.db.volume_admin_metadata_get(context, volume['id']) 1187 LOG.info("Get volume admin metadata completed successfully.", 1188 resource=volume) 1189 return dict(rv) 1190 1191 def update_volume_admin_metadata(self, context, volume, metadata, 1192 delete=False, add=True, update=True): 1193 """Updates or creates volume administration metadata. 1194 1195 If delete is True, metadata items that are not specified in the 1196 `metadata` argument will be deleted. 1197 1198 """ 1199 context.authorize(vol_meta_policy.UPDATE_ADMIN_METADATA_POLICY, 1200 target_obj=volume) 1201 utils.check_metadata_properties(metadata) 1202 db_meta = self.db.volume_admin_metadata_update(context, volume.id, 1203 metadata, delete, add, 1204 update) 1205 1206 # TODO(jdg): Implement an RPC call for drivers that may use this info 1207 1208 LOG.info("Update volume admin metadata completed successfully.", 1209 resource=volume) 1210 return db_meta 1211 1212 def get_snapshot_metadata(self, context, snapshot): 1213 """Get all metadata associated with a snapshot.""" 1214 context.authorize(s_meta_policy.GET_POLICY, 1215 target_obj=snapshot) 1216 LOG.info("Get snapshot metadata completed successfully.", 1217 resource=snapshot) 1218 return snapshot.metadata 1219 1220 def delete_snapshot_metadata(self, context, snapshot, key): 1221 """Delete the given metadata item from a snapshot.""" 1222 context.authorize(s_meta_policy.DELETE_POLICY, 1223 target_obj=snapshot) 1224 snapshot.delete_metadata_key(context, key) 1225 LOG.info("Delete snapshot metadata completed successfully.", 1226 resource=snapshot) 1227 1228 def update_snapshot_metadata(self, context, 1229 snapshot, metadata, 1230 delete=False): 1231 """Updates or creates snapshot metadata. 1232 1233 If delete is True, metadata items that are not specified in the 1234 `metadata` argument will be deleted. 1235 1236 """ 1237 context.authorize(s_meta_policy.UPDATE_POLICY, 1238 target_obj=snapshot) 1239 if delete: 1240 _metadata = metadata 1241 else: 1242 orig_meta = snapshot.metadata 1243 _metadata = orig_meta.copy() 1244 _metadata.update(metadata) 1245 1246 utils.check_metadata_properties(_metadata) 1247 1248 snapshot.metadata = _metadata 1249 snapshot.save() 1250 1251 # TODO(jdg): Implement an RPC call for drivers that may use this info 1252 1253 LOG.info("Update snapshot metadata completed successfully.", 1254 resource=snapshot) 1255 return snapshot.metadata 1256 1257 def get_snapshot_metadata_value(self, snapshot, key): 1258 LOG.info("Get snapshot metadata value not implemented.", 1259 resource=snapshot) 1260 # FIXME(jdg): Huh? Pass? 1261 pass 1262 1263 def get_volumes_image_metadata(self, context): 1264 context.authorize(vol_meta_policy.GET_POLICY) 1265 db_data = self.db.volume_glance_metadata_get_all(context) 1266 results = collections.defaultdict(dict) 1267 for meta_entry in db_data: 1268 results[meta_entry['volume_id']].update({meta_entry['key']: 1269 meta_entry['value']}) 1270 return results 1271 1272 def get_volume_image_metadata(self, context, volume): 1273 context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume) 1274 db_data = self.db.volume_glance_metadata_get(context, volume['id']) 1275 LOG.info("Get volume image-metadata completed successfully.", 1276 resource=volume) 1277 return {meta_entry.key: meta_entry.value for meta_entry in db_data} 1278 1279 def get_list_volumes_image_metadata(self, context, volume_id_list): 1280 db_data = self.db.volume_glance_metadata_list_get(context, 1281 volume_id_list) 1282 results = collections.defaultdict(dict) 1283 for meta_entry in db_data: 1284 results[meta_entry['volume_id']].update({meta_entry['key']: 1285 meta_entry['value']}) 1286 return results 1287 1288 def copy_volume_to_image(self, context, volume, metadata, force): 1289 """Create a new image from the specified volume.""" 1290 if not CONF.enable_force_upload and force: 1291 LOG.info("Force upload to image is disabled, " 1292 "Force option will be ignored.", 1293 resource={'type': 'volume', 'id': volume['id']}) 1294 force = False 1295 1296 # Build required conditions for conditional update 1297 expected = {'status': ('available', 'in-use') if force 1298 else 'available'} 1299 values = {'status': 'uploading', 1300 'previous_status': volume.model.status} 1301 1302 result = volume.conditional_update(values, expected) 1303 if not result: 1304 msg = (_('Volume %(vol_id)s status must be %(statuses)s') % 1305 {'vol_id': volume.id, 1306 'statuses': utils.build_or_str(expected['status'])}) 1307 raise exception.InvalidVolume(reason=msg) 1308 1309 try: 1310 glance_core_props = CONF.glance_core_properties 1311 if glance_core_props: 1312 try: 1313 vol_img_metadata = self.get_volume_image_metadata( 1314 context, volume) 1315 custom_property_set = ( 1316 set(vol_img_metadata).difference(glance_core_props)) 1317 if custom_property_set: 1318 metadata['properties'] = { 1319 custom_prop: vol_img_metadata[custom_prop] 1320 for custom_prop in custom_property_set} 1321 except exception.GlanceMetadataNotFound: 1322 # If volume is not created from image, No glance metadata 1323 # would be available for that volume in 1324 # volume glance metadata table 1325 pass 1326 1327 recv_metadata = self.image_service.create( 1328 context, self.image_service._translate_to_glance(metadata)) 1329 except Exception: 1330 # NOTE(geguileo): To mimic behavior before conditional_update we 1331 # will rollback status if image create fails 1332 with excutils.save_and_reraise_exception(): 1333 volume.conditional_update( 1334 {'status': volume.model.previous_status, 1335 'previous_status': None}, 1336 {'status': 'uploading'}) 1337 1338 self.volume_rpcapi.copy_volume_to_image(context, 1339 volume, 1340 recv_metadata) 1341 1342 response = {"id": volume['id'], 1343 "updated_at": volume['updated_at'], 1344 "status": 'uploading', 1345 "display_description": volume['display_description'], 1346 "size": volume['size'], 1347 "volume_type": volume['volume_type'], 1348 "image_id": recv_metadata['id'], 1349 "container_format": recv_metadata['container_format'], 1350 "disk_format": recv_metadata['disk_format'], 1351 "image_name": recv_metadata.get('name', None)} 1352 if 'protected' in recv_metadata: 1353 response['protected'] = recv_metadata.get('protected') 1354 if 'is_public' in recv_metadata: 1355 response['is_public'] = recv_metadata.get('is_public') 1356 elif 'visibility' in recv_metadata: 1357 response['visibility'] = recv_metadata.get('visibility') 1358 LOG.info("Copy volume to image completed successfully.", 1359 resource=volume) 1360 return response 1361 1362 def _extend(self, context, volume, new_size, attached=False): 1363 value = {'status': 'extending', 1364 'previous_status': volume.status} 1365 if attached: 1366 expected = {'status': 'in-use'} 1367 else: 1368 expected = {'status': 'available'} 1369 orig_status = {'status': volume.status} 1370 1371 def _roll_back_status(): 1372 status = orig_status['status'] 1373 msg = _('Could not return volume %(id)s to %(status)s.') 1374 try: 1375 if not volume.conditional_update(orig_status, value): 1376 LOG.error(msg, {'id': volume.id, 'status': status}) 1377 except Exception: 1378 LOG.exception(msg, {'id': volume.id, 'status': status}) 1379 1380 size_increase = (int(new_size)) - volume.size 1381 if size_increase <= 0: 1382 msg = (_("New size for extend must be greater " 1383 "than current size. (current: %(size)s, " 1384 "extended: %(new_size)s).") % {'new_size': new_size, 1385 'size': volume.size}) 1386 raise exception.InvalidInput(reason=msg) 1387 1388 result = volume.conditional_update(value, expected) 1389 if not result: 1390 msg = (_("Volume %(vol_id)s status must be '%(expected)s' " 1391 "to extend, currently %(status)s.") 1392 % {'vol_id': volume.id, 1393 'status': volume.status, 1394 'expected': six.text_type(expected)}) 1395 raise exception.InvalidVolume(reason=msg) 1396 1397 rollback = True 1398 try: 1399 values = {'per_volume_gigabytes': new_size} 1400 QUOTAS.limit_check(context, project_id=context.project_id, 1401 **values) 1402 rollback = False 1403 except exception.OverQuota as e: 1404 quotas = e.kwargs['quotas'] 1405 raise exception.VolumeSizeExceedsLimit( 1406 size=new_size, limit=quotas['per_volume_gigabytes']) 1407 finally: 1408 # NOTE(geguileo): To mimic behavior before conditional_update we 1409 # will rollback status on quota reservation failure regardless of 1410 # the exception that caused the failure. 1411 if rollback: 1412 _roll_back_status() 1413 1414 try: 1415 reservations = None 1416 reserve_opts = {'gigabytes': size_increase} 1417 QUOTAS.add_volume_type_opts(context, reserve_opts, 1418 volume.volume_type_id) 1419 reservations = QUOTAS.reserve(context, 1420 project_id=volume.project_id, 1421 **reserve_opts) 1422 except exception.OverQuota as exc: 1423 gigabytes = exc.kwargs['usages']['gigabytes'] 1424 gb_quotas = exc.kwargs['quotas']['gigabytes'] 1425 1426 consumed = gigabytes['reserved'] + gigabytes['in_use'] 1427 LOG.error("Quota exceeded for %(s_pid)s, tried to extend volume " 1428 "by %(s_size)sG, (%(d_consumed)dG of %(d_quota)dG " 1429 "already consumed).", 1430 {'s_pid': context.project_id, 1431 's_size': size_increase, 1432 'd_consumed': consumed, 1433 'd_quota': gb_quotas}) 1434 raise exception.VolumeSizeExceedsAvailableQuota( 1435 requested=size_increase, consumed=consumed, quota=gb_quotas) 1436 finally: 1437 # NOTE(geguileo): To mimic behavior before conditional_update we 1438 # will rollback status on quota reservation failure regardless of 1439 # the exception that caused the failure. 1440 if reservations is None: 1441 _roll_back_status() 1442 1443 volume_type = {} 1444 if volume.volume_type_id: 1445 volume_type = volume_types.get_volume_type(context.elevated(), 1446 volume.volume_type_id) 1447 1448 request_spec = { 1449 'volume_properties': volume, 1450 'volume_type': volume_type, 1451 'volume_id': volume.id 1452 } 1453 1454 self.scheduler_rpcapi.extend_volume(context, volume, new_size, 1455 reservations, request_spec) 1456 1457 LOG.info("Extend volume request issued successfully.", 1458 resource=volume) 1459 1460 def extend(self, context, volume, new_size): 1461 context.authorize(vol_action_policy.EXTEND_POLICY, 1462 target_obj=volume) 1463 self._extend(context, volume, new_size, attached=False) 1464 1465 # NOTE(tommylikehu): New method is added here so that administrator 1466 # can enable/disable this ability by editing the policy file if the 1467 # cloud environment doesn't allow this operation. 1468 def extend_attached_volume(self, context, volume, new_size): 1469 context.authorize(vol_action_policy.EXTEND_ATTACHED_POLICY, 1470 target_obj=volume) 1471 self._extend(context, volume, new_size, attached=True) 1472 1473 def migrate_volume(self, context, volume, host, cluster_name, force_copy, 1474 lock_volume): 1475 """Migrate the volume to the specified host or cluster.""" 1476 elevated = context.elevated() 1477 context.authorize(vol_action_policy.MIGRATE_POLICY, 1478 target_obj=volume) 1479 1480 # If we received a request to migrate to a host 1481 # Look for the service - must be up and enabled 1482 svc_host = host and volume_utils.extract_host(host, 'backend') 1483 svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, 1484 'backend') 1485 # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get 1486 # a service from the DB we are getting either one specific service from 1487 # a host or any service from a cluster that is up, which means that the 1488 # cluster itself is also up. 1489 try: 1490 svc = objects.Service.get_by_id(elevated, None, is_up=True, 1491 topic=constants.VOLUME_TOPIC, 1492 host=svc_host, disabled=False, 1493 cluster_name=svc_cluster, 1494 backend_match_level='pool') 1495 except exception.ServiceNotFound: 1496 msg = _("No available service named '%s'") % (cluster_name or host) 1497 LOG.error(msg) 1498 raise exception.InvalidHost(reason=msg) 1499 # Even if we were requested to do a migration to a host, if the host is 1500 # in a cluster we will do a cluster migration. 1501 cluster_name = svc.cluster_name 1502 1503 # Build required conditions for conditional update 1504 expected = {'status': ('available', 'in-use'), 1505 'migration_status': self.AVAILABLE_MIGRATION_STATUS, 1506 'replication_status': ( 1507 None, 1508 fields.ReplicationStatus.DISABLED, 1509 fields.ReplicationStatus.NOT_CAPABLE), 1510 'consistencygroup_id': (None, ''), 1511 'group_id': (None, '')} 1512 1513 # We want to make sure that the migration is to another host or 1514 # another cluster. 1515 if cluster_name: 1516 expected['cluster_name'] = db.Not(cluster_name) 1517 else: 1518 expected['host'] = db.Not(host) 1519 1520 filters = [~db.volume_has_snapshots_filter()] 1521 1522 updates = {'migration_status': 'starting', 1523 'previous_status': volume.model.status} 1524 1525 # When the migration of an available volume starts, both the status 1526 # and the migration status of the volume will be changed. 1527 # If the admin sets lock_volume flag to True, the volume 1528 # status is changed to 'maintenance', telling users 1529 # that this volume is in maintenance mode, and no action is allowed 1530 # on this volume, e.g. attach, detach, retype, migrate, etc. 1531 if lock_volume: 1532 updates['status'] = db.Case( 1533 [(volume.model.status == 'available', 'maintenance')], 1534 else_=volume.model.status) 1535 1536 result = volume.conditional_update(updates, expected, filters) 1537 1538 if not result: 1539 msg = _('Volume %s status must be available or in-use, must not ' 1540 'be migrating, have snapshots, be replicated, be part of ' 1541 'a group and destination host/cluster must be different ' 1542 'than the current one') % volume.id 1543 LOG.error(msg) 1544 raise exception.InvalidVolume(reason=msg) 1545 1546 # Call the scheduler to ensure that the host exists and that it can 1547 # accept the volume 1548 volume_type = {} 1549 if volume.volume_type_id: 1550 volume_type = volume_types.get_volume_type(context.elevated(), 1551 volume.volume_type_id) 1552 request_spec = {'volume_properties': volume, 1553 'volume_type': volume_type, 1554 'volume_id': volume.id} 1555 self.scheduler_rpcapi.migrate_volume(context, 1556 volume, 1557 cluster_name or host, 1558 force_copy, 1559 request_spec) 1560 LOG.info("Migrate volume request issued successfully.", 1561 resource=volume) 1562 1563 def migrate_volume_completion(self, context, volume, new_volume, error): 1564 context.authorize(vol_action_policy.MIGRATE_COMPLETE_POLICY, 1565 target_obj=volume) 1566 if not (volume.migration_status or new_volume.migration_status): 1567 # When we're not migrating and haven't hit any errors, we issue 1568 # volume attach and detach requests so the volumes don't end in 1569 # 'attaching' and 'detaching' state 1570 if not error: 1571 attachments = volume.volume_attachment 1572 for attachment in attachments: 1573 self.detach(context, volume, attachment.id) 1574 1575 self.attach(context, new_volume, 1576 attachment.instance_uuid, 1577 attachment.attached_host, 1578 attachment.mountpoint, 1579 'rw') 1580 1581 return new_volume.id 1582 1583 if not volume.migration_status: 1584 msg = _('Source volume not mid-migration.') 1585 raise exception.InvalidVolume(reason=msg) 1586 1587 if not new_volume.migration_status: 1588 msg = _('Destination volume not mid-migration.') 1589 raise exception.InvalidVolume(reason=msg) 1590 1591 expected_status = 'target:%s' % volume.id 1592 if not new_volume.migration_status == expected_status: 1593 msg = (_('Destination has migration_status %(stat)s, expected ' 1594 '%(exp)s.') % {'stat': new_volume.migration_status, 1595 'exp': expected_status}) 1596 raise exception.InvalidVolume(reason=msg) 1597 1598 LOG.info("Migrate volume completion issued successfully.", 1599 resource=volume) 1600 return self.volume_rpcapi.migrate_volume_completion(context, volume, 1601 new_volume, error) 1602 1603 def update_readonly_flag(self, context, volume, flag): 1604 context.authorize(vol_action_policy.UPDATE_READONLY_POLICY, 1605 target_obj=volume) 1606 if volume['status'] != 'available': 1607 msg = _('Volume %(vol_id)s status must be available ' 1608 'to update readonly flag, but current status is: ' 1609 '%(vol_status)s.') % {'vol_id': volume['id'], 1610 'vol_status': volume['status']} 1611 raise exception.InvalidVolume(reason=msg) 1612 self.update_volume_admin_metadata(context.elevated(), volume, 1613 {'readonly': six.text_type(flag)}) 1614 LOG.info("Update readonly setting on volume " 1615 "completed successfully.", 1616 resource=volume) 1617 1618 def retype(self, context, volume, new_type, migration_policy=None): 1619 """Attempt to modify the type associated with an existing volume.""" 1620 context.authorize(vol_action_policy.RETYPE_POLICY, target_obj=volume) 1621 if migration_policy and migration_policy not in ('on-demand', 'never'): 1622 msg = _('migration_policy must be \'on-demand\' or \'never\', ' 1623 'passed: %s') % new_type 1624 LOG.error(msg) 1625 raise exception.InvalidInput(reason=msg) 1626 1627 # Support specifying volume type by ID or name 1628 try: 1629 new_type = ( 1630 volume_type.VolumeType.get_by_name_or_id(context.elevated(), 1631 new_type)) 1632 except exception.InvalidVolumeType: 1633 msg = _('Invalid volume_type passed: %s.') % new_type 1634 LOG.error(msg) 1635 raise exception.InvalidInput(reason=msg) 1636 1637 new_type_id = new_type['id'] 1638 1639 # NOTE(jdg): We check here if multiattach is involved in either side 1640 # of the retype, we can't change multiattach on an in-use volume 1641 # because there's things the hypervisor needs when attaching, so 1642 # we just disallow retype of in-use volumes in this case. You still 1643 # have to get through scheduling if all the conditions are met, we 1644 # should consider an up front capabilities check to give fast feedback 1645 # rather than "No hosts found" and error status 1646 src_is_multiattach = volume.multiattach 1647 tgt_is_multiattach = False 1648 1649 if new_type: 1650 tgt_is_multiattach = self._is_multiattach(new_type) 1651 1652 if src_is_multiattach != tgt_is_multiattach: 1653 if volume.status != "available": 1654 msg = _('Invalid volume_type passed, retypes affecting ' 1655 'multiattach are only allowed on available volumes, ' 1656 'the specified volume however currently has a status ' 1657 'of: %s.') % volume.status 1658 LOG.info(msg) 1659 raise exception.InvalidInput(reason=msg) 1660 1661 # If they are retyping to a multiattach capable, make sure they 1662 # are allowed to do so. 1663 if tgt_is_multiattach: 1664 context.authorize(vol_policy.MULTIATTACH_POLICY, 1665 target_obj=volume) 1666 1667 if tgt_is_multiattach and self._is_encrypted(new_type): 1668 msg = ('Retype requested both encryption and multi-attach, ' 1669 'which is not supported.') 1670 raise exception.InvalidInput(reason=msg) 1671 1672 # We're checking here in so that we can report any quota issues as 1673 # early as possible, but won't commit until we change the type. We 1674 # pass the reservations onward in case we need to roll back. 1675 reservations = quota_utils.get_volume_type_reservation( 1676 context, volume, new_type_id, reserve_vol_type_only=True) 1677 1678 # Get old reservations 1679 try: 1680 reserve_opts = {'volumes': -1, 'gigabytes': -volume.size} 1681 QUOTAS.add_volume_type_opts(context, 1682 reserve_opts, 1683 volume.volume_type_id) 1684 # NOTE(wanghao): We don't need to reserve volumes and gigabytes 1685 # quota for retyping operation since they didn't changed, just 1686 # reserve volume_type and type gigabytes is fine. 1687 reserve_opts.pop('volumes') 1688 reserve_opts.pop('gigabytes') 1689 old_reservations = QUOTAS.reserve(context, 1690 project_id=volume.project_id, 1691 **reserve_opts) 1692 except Exception: 1693 volume.status = volume.previous_status 1694 volume.save() 1695 msg = _("Failed to update quota usage while retyping volume.") 1696 LOG.exception(msg, resource=volume) 1697 raise exception.CinderException(msg) 1698 1699 # Build required conditions for conditional update 1700 expected = {'status': ('available', 'in-use'), 1701 'migration_status': self.AVAILABLE_MIGRATION_STATUS, 1702 'consistencygroup_id': (None, ''), 1703 'group_id': (None, ''), 1704 'volume_type_id': db.Not(new_type_id)} 1705 1706 # We don't support changing QoS at the front-end yet for in-use volumes 1707 # TODO(avishay): Call Nova to change QoS setting (libvirt has support 1708 # - virDomainSetBlockIoTune() - Nova does not have support yet). 1709 filters = [db.volume_qos_allows_retype(new_type_id)] 1710 1711 updates = {'status': 'retyping', 1712 'previous_status': objects.Volume.model.status} 1713 1714 if not volume.conditional_update(updates, expected, filters): 1715 msg = _('Retype needs volume to be in available or in-use state, ' 1716 'not be part of an active migration or a consistency ' 1717 'group, requested type has to be different that the ' 1718 'one from the volume, and for in-use volumes front-end ' 1719 'qos specs cannot change.') 1720 LOG.error(msg) 1721 QUOTAS.rollback(context, reservations + old_reservations, 1722 project_id=volume.project_id) 1723 raise exception.InvalidVolume(reason=msg) 1724 1725 request_spec = {'volume_properties': volume, 1726 'volume_id': volume.id, 1727 'volume_type': new_type, 1728 'migration_policy': migration_policy, 1729 'quota_reservations': reservations, 1730 'old_reservations': old_reservations} 1731 1732 self.scheduler_rpcapi.retype(context, volume, 1733 request_spec=request_spec, 1734 filter_properties={}) 1735 volume.multiattach = tgt_is_multiattach 1736 volume.save() 1737 LOG.info("Retype volume request issued successfully.", 1738 resource=volume) 1739 1740 def _get_service_by_host_cluster(self, context, host, cluster_name, 1741 resource='volume'): 1742 elevated = context.elevated() 1743 1744 svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, 1745 'backend') 1746 svc_host = host and volume_utils.extract_host(host, 'backend') 1747 1748 # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get 1749 # a service from the DB we are getting either one specific service from 1750 # a host or any service that is up from a cluster, which means that the 1751 # cluster itself is also up. 1752 try: 1753 service = objects.Service.get_by_id(elevated, None, host=svc_host, 1754 binary=constants.VOLUME_BINARY, 1755 cluster_name=svc_cluster) 1756 except exception.ServiceNotFound: 1757 with excutils.save_and_reraise_exception(): 1758 LOG.error('Unable to find service: %(service)s for ' 1759 'given host: %(host)s and cluster %(cluster)s.', 1760 {'service': constants.VOLUME_BINARY, 'host': host, 1761 'cluster': cluster_name}) 1762 1763 if service.disabled and (not service.cluster_name or 1764 service.cluster.disabled): 1765 LOG.error('Unable to manage existing %s on a disabled ' 1766 'service.', resource) 1767 raise exception.ServiceUnavailable() 1768 1769 if not service.is_up: 1770 LOG.error('Unable to manage existing %s on a service that is ' 1771 'down.', resource) 1772 raise exception.ServiceUnavailable() 1773 1774 return service 1775 1776 def manage_existing(self, context, host, cluster_name, ref, name=None, 1777 description=None, volume_type=None, metadata=None, 1778 availability_zone=None, bootable=False): 1779 1780 if 'source-name' in ref: 1781 vol_id = volume_utils.extract_id_from_volume_name( 1782 ref['source-name']) 1783 if vol_id and volume_utils.check_already_managed_volume(vol_id): 1784 raise exception.InvalidVolume( 1785 _("Unable to manage existing volume." 1786 " The volume is already managed")) 1787 1788 if volume_type and 'extra_specs' not in volume_type: 1789 extra_specs = volume_types.get_volume_type_extra_specs( 1790 volume_type['id']) 1791 volume_type['extra_specs'] = extra_specs 1792 1793 service = self._get_service_by_host_cluster(context, host, 1794 cluster_name) 1795 1796 if availability_zone is None: 1797 availability_zone = service.availability_zone 1798 1799 if not cluster_name and bool(volume_utils.extract_host(host, 'pool')): 1800 manage_host = host 1801 else: 1802 manage_host = service.host 1803 1804 manage_what = { 1805 'context': context, 1806 'name': name, 1807 'description': description, 1808 'host': manage_host, 1809 'cluster_name': service.cluster_name, 1810 'ref': ref, 1811 'volume_type': volume_type, 1812 'metadata': metadata, 1813 'availability_zone': availability_zone, 1814 'bootable': bootable, 1815 'size': 0, 1816 'group_snapshot': None, 1817 'optional_args': {'is_quota_committed': False}, 1818 'volume_type_id': None if not volume_type else volume_type['id'], 1819 } 1820 1821 try: 1822 flow_engine = manage_existing.get_flow(self.scheduler_rpcapi, 1823 self.db, 1824 manage_what) 1825 except Exception: 1826 msg = _('Failed to manage api volume flow.') 1827 LOG.exception(msg) 1828 raise exception.CinderException(msg) 1829 1830 # Attaching this listener will capture all of the notifications that 1831 # taskflow sends out and redirect them to a more useful log for 1832 # cinder's debugging (or error reporting) usage. 1833 with flow_utils.DynamicLogListener(flow_engine, logger=LOG): 1834 flow_engine.run() 1835 vol_ref = flow_engine.storage.fetch('volume') 1836 LOG.info("Manage volume request issued successfully.", 1837 resource=vol_ref) 1838 return vol_ref 1839 1840 def get_manageable_volumes(self, context, host, cluster_name, marker=None, 1841 limit=None, offset=None, sort_keys=None, 1842 sort_dirs=None): 1843 svc = self._get_service_by_host_cluster(context, host, cluster_name) 1844 return self.volume_rpcapi.get_manageable_volumes(context, svc, 1845 marker, limit, 1846 offset, sort_keys, 1847 sort_dirs) 1848 1849 def manage_existing_snapshot(self, context, ref, volume, 1850 name=None, description=None, 1851 metadata=None): 1852 service = self._get_service_by_host_cluster(context, volume.host, 1853 volume.cluster_name, 1854 'snapshot') 1855 1856 snapshot_object = self.create_snapshot_in_db(context, volume, name, 1857 description, True, 1858 metadata, None, 1859 commit_quota=True) 1860 self.volume_rpcapi.manage_existing_snapshot( 1861 context, snapshot_object, ref, service.service_topic_queue) 1862 return snapshot_object 1863 1864 def get_manageable_snapshots(self, context, host, cluster_name, 1865 marker=None, limit=None, offset=None, 1866 sort_keys=None, sort_dirs=None): 1867 svc = self._get_service_by_host_cluster(context, host, cluster_name, 1868 'snapshot') 1869 return self.volume_rpcapi.get_manageable_snapshots(context, svc, 1870 marker, limit, 1871 offset, sort_keys, 1872 sort_dirs) 1873 1874 def _get_cluster_and_services_for_replication(self, ctxt, host, 1875 cluster_name): 1876 services = objects.ServiceList.get_all( 1877 ctxt, filters={'host': host, 'cluster_name': cluster_name, 1878 'binary': constants.VOLUME_BINARY}) 1879 1880 if not services: 1881 if host: 1882 msg = _("No service found with host=%s") % host 1883 else: 1884 msg = _("No service found with cluster=%s") % cluster_name 1885 1886 raise exception.ServiceNotFound(msg) 1887 1888 cluster = services[0].cluster 1889 # Check that the host or cluster we received only results in 1 host or 1890 # hosts from the same cluster. 1891 if cluster_name: 1892 check_attribute = 'cluster_name' 1893 expected = cluster.name 1894 else: 1895 check_attribute = 'host' 1896 expected = services[0].host 1897 if any(getattr(s, check_attribute) != expected for s in services): 1898 msg = _('Services from different clusters found.') 1899 raise exception.InvalidParameterValue(msg) 1900 1901 # If we received host parameter but host belongs to a cluster we have 1902 # to change all the services in the cluster, not just one host 1903 if host and cluster: 1904 services = cluster.services 1905 1906 return cluster, services 1907 1908 def _replication_db_change(self, ctxt, field, expected_value, new_value, 1909 host, cluster_name, check_up=False): 1910 def _error_msg(service): 1911 expected = utils.build_or_str(six.text_type(expected_value)) 1912 up_msg = 'and must be up ' if check_up else '' 1913 msg = (_('%(field)s in %(service)s must be %(expected)s ' 1914 '%(up_msg)sto failover.') 1915 % {'field': field, 'service': service, 1916 'expected': expected, 'up_msg': up_msg}) 1917 LOG.error(msg) 1918 return msg 1919 1920 cluster, services = self._get_cluster_and_services_for_replication( 1921 ctxt, host, cluster_name) 1922 1923 expect = {field: expected_value} 1924 change = {field: new_value} 1925 1926 if cluster: 1927 old_value = getattr(cluster, field) 1928 if ((check_up and not cluster.is_up) 1929 or not cluster.conditional_update(change, expect)): 1930 msg = _error_msg(cluster.name) 1931 raise exception.InvalidInput(reason=msg) 1932 1933 changed = [] 1934 not_changed = [] 1935 for service in services: 1936 if ((not check_up or service.is_up) 1937 and service.conditional_update(change, expect)): 1938 changed.append(service) 1939 else: 1940 not_changed.append(service) 1941 1942 # If there were some services that couldn't be changed we should at 1943 # least log the error. 1944 if not_changed: 1945 msg = _error_msg([s.host for s in not_changed]) 1946 # If we couldn't change any of the services 1947 if not changed: 1948 # Undo the cluster change 1949 if cluster: 1950 setattr(cluster, field, old_value) 1951 cluster.save() 1952 raise exception.InvalidInput( 1953 reason=_('No service could be changed: %s') % msg) 1954 LOG.warning('Some services could not be changed: %s', msg) 1955 1956 return cluster, services 1957 1958 def failover(self, ctxt, host, cluster_name, secondary_id=None): 1959 ctxt.authorize(svr_policy.FAILOVER_POLICY) 1960 ctxt = ctxt if ctxt.is_admin else ctxt.elevated() 1961 1962 # TODO(geguileo): In P - Remove this version check 1963 rpc_version = self.volume_rpcapi.determine_rpc_version_cap() 1964 rpc_version = versionutils.convert_version_to_tuple(rpc_version) 1965 if cluster_name and rpc_version < (3, 5): 1966 msg = _('replication operations with cluster field') 1967 raise exception.UnavailableDuringUpgrade(action=msg) 1968 1969 rep_fields = fields.ReplicationStatus 1970 expected_values = [rep_fields.ENABLED, rep_fields.FAILED_OVER] 1971 new_value = rep_fields.FAILING_OVER 1972 1973 cluster, services = self._replication_db_change( 1974 ctxt, 'replication_status', expected_values, new_value, host, 1975 cluster_name, check_up=True) 1976 1977 self.volume_rpcapi.failover(ctxt, services[0], secondary_id) 1978 1979 def freeze_host(self, ctxt, host, cluster_name): 1980 ctxt.authorize(svr_policy.FREEZE_POLICY) 1981 ctxt = ctxt if ctxt.is_admin else ctxt.elevated() 1982 1983 expected = False 1984 new_value = True 1985 cluster, services = self._replication_db_change( 1986 ctxt, 'frozen', expected, new_value, host, cluster_name, 1987 check_up=False) 1988 1989 # Should we set service status to disabled to keep 1990 # scheduler calls from being sent? Just use existing 1991 # `cinder service-disable reason=freeze` 1992 self.volume_rpcapi.freeze_host(ctxt, services[0]) 1993 1994 def thaw_host(self, ctxt, host, cluster_name): 1995 ctxt.authorize(svr_policy.THAW_POLICY) 1996 ctxt = ctxt if ctxt.is_admin else ctxt.elevated() 1997 1998 expected = True 1999 new_value = False 2000 cluster, services = self._replication_db_change( 2001 ctxt, 'frozen', expected, new_value, host, cluster_name, 2002 check_up=False) 2003 2004 if not self.volume_rpcapi.thaw_host(ctxt, services[0]): 2005 return "Backend reported error during thaw_host operation." 2006 2007 def check_volume_filters(self, filters, strict=False): 2008 """Sets the user filter value to accepted format""" 2009 booleans = self.db.get_booleans_for_table('volume') 2010 2011 # To translate any true/false equivalent to True/False 2012 # which is only acceptable format in database queries. 2013 2014 for key, val in filters.items(): 2015 try: 2016 if key in booleans: 2017 filters[key] = self._check_boolean_filter_value( 2018 key, val, strict) 2019 elif key == 'display_name': 2020 # Use the raw value of display name as is for the filter 2021 # without passing it through ast.literal_eval(). If the 2022 # display name is a properly quoted string (e.g. '"foo"') 2023 # then literal_eval() strips the quotes (i.e. 'foo'), so 2024 # the filter becomes different from the user input. 2025 continue 2026 else: 2027 filters[key] = ast.literal_eval(val) 2028 except (ValueError, SyntaxError): 2029 LOG.debug('Could not evaluate value %s, assuming string', val) 2030 2031 def _check_boolean_filter_value(self, key, val, strict=False): 2032 """Boolean filter values in Volume GET. 2033 2034 Before VOLUME_LIST_BOOTABLE, all values other than 'False', 'false', 2035 'FALSE' were trated as True for specific boolean filter parameters in 2036 Volume GET request. 2037 2038 But VOLUME_LIST_BOOTABLE onwards, only true/True/0/1/False/false 2039 parameters are supported. 2040 All other input values to specific boolean filter parameter will 2041 lead to raising exception. 2042 2043 This changes API behavior. So, micro version introduced for 2044 VOLUME_LIST_BOOTABLE onwards. 2045 """ 2046 if strict: 2047 # for updated behavior, from VOLUME_LIST_BOOTABLE onwards. 2048 # To translate any true/false/t/f/0/1 to True/False 2049 # which is only acceptable format in database queries. 2050 try: 2051 return strutils.bool_from_string(val, strict=True) 2052 except ValueError: 2053 msg = _('\'%(key)s = %(value)s\'') % {'key': key, 2054 'value': val} 2055 raise exception.InvalidInput(reason=msg) 2056 else: 2057 # For existing behavior(before version VOLUME_LIST_BOOTABLE) 2058 accepted_true = ['True', 'true', 'TRUE'] 2059 accepted_false = ['False', 'false', 'FALSE'] 2060 2061 if val in accepted_false: 2062 return False 2063 elif val in accepted_true: 2064 return True 2065 else: 2066 return bool(val) 2067 2068 def _attachment_reserve(self, ctxt, vref, instance_uuid=None): 2069 # NOTE(jdg): Reserved is a special case, we're avoiding allowing 2070 # creation of other new reserves/attachments while in this state 2071 # so we avoid contention issues with shared connections 2072 2073 # Multiattach of bootable volumes is a special case with it's own 2074 # policy, check that here right off the bat 2075 if (vref.get('multiattach', False) and 2076 vref.status == 'in-use' and 2077 vref.bootable): 2078 ctxt.authorize( 2079 attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY, 2080 target_obj=vref) 2081 2082 # FIXME(JDG): We want to be able to do things here like reserve a 2083 # volume for Nova to do BFV WHILE the volume may be in the process of 2084 # downloading image, we add downloading here; that's easy enough but 2085 # we've got a race between with the attaching/detaching that we do 2086 # locally on the Cinder node. Just come up with an easy way to 2087 # determine if we're attaching to the Cinder host for some work or if 2088 # we're being used by the outside world. 2089 expected = {'multiattach': vref.multiattach, 2090 'status': (('available', 'in-use', 'downloading') 2091 if vref.multiattach 2092 else ('available', 'downloading'))} 2093 2094 result = vref.conditional_update({'status': 'reserved'}, expected) 2095 2096 if not result: 2097 override = False 2098 if instance_uuid: 2099 # Refresh the volume reference in case multiple instances were 2100 # being concurrently attached to the same non-multiattach 2101 # volume. 2102 vref = objects.Volume.get_by_id(ctxt, vref.id) 2103 for attachment in vref.volume_attachment: 2104 # If we're attaching the same volume to the same instance, 2105 # we could be migrating the instance to another host in 2106 # which case we want to allow the reservation. 2107 # (LP BUG: 1694530) 2108 if attachment.instance_uuid == instance_uuid: 2109 override = True 2110 break 2111 2112 if not override: 2113 msg = (_('Volume %(vol_id)s status must be %(statuses)s') % 2114 {'vol_id': vref.id, 2115 'statuses': utils.build_or_str(expected['status'])}) 2116 raise exception.InvalidVolume(reason=msg) 2117 2118 values = {'volume_id': vref.id, 2119 'volume_host': vref.host, 2120 'attach_status': 'reserved', 2121 'instance_uuid': instance_uuid} 2122 db_ref = self.db.volume_attach(ctxt.elevated(), values) 2123 return objects.VolumeAttachment.get_by_id(ctxt, db_ref['id']) 2124 2125 def attachment_create(self, 2126 ctxt, 2127 volume_ref, 2128 instance_uuid, 2129 connector=None): 2130 """Create an attachment record for the specified volume.""" 2131 ctxt.authorize(attachment_policy.CREATE_POLICY, target_obj=volume_ref) 2132 connection_info = {} 2133 if "error" in volume_ref.status: 2134 msg = ('Volume attachments can not be created if the volume ' 2135 'is in an error state. ' 2136 'The Volume %(volume_id)s currently has a status of: ' 2137 '%(volume_status)s ') % { 2138 'volume_id': volume_ref.id, 2139 'volume_status': volume_ref.status} 2140 LOG.error(msg) 2141 raise exception.InvalidVolume(reason=msg) 2142 attachment_ref = self._attachment_reserve(ctxt, 2143 volume_ref, 2144 instance_uuid) 2145 if connector: 2146 connection_info = ( 2147 self.volume_rpcapi.attachment_update(ctxt, 2148 volume_ref, 2149 connector, 2150 attachment_ref.id)) 2151 attachment_ref.connection_info = connection_info 2152 if self.db.volume_admin_metadata_get( 2153 ctxt.elevated(), 2154 volume_ref['id']).get('readonly', False): 2155 attachment_ref.attach_mode = 'ro' 2156 attachment_ref.save() 2157 return attachment_ref 2158 2159 @coordination.synchronized( 2160 '{f_name}-{attachment_ref.volume_id}-{connector[host]}') 2161 def attachment_update(self, ctxt, attachment_ref, connector): 2162 """Update an existing attachment record.""" 2163 # Valid items to update (connector includes mode and mountpoint): 2164 # 1. connector (required) 2165 # a. mode (if None use value from attachment_ref) 2166 # b. mountpoint (if None use value from attachment_ref) 2167 # c. instance_uuid(if None use value from attachment_ref) 2168 2169 # This method has a synchronized() lock on the volume id 2170 # because we have to prevent race conditions around checking 2171 # for duplicate attachment requests to the same host. 2172 2173 # We fetch the volume object and pass it to the rpc call because we 2174 # need to direct this to the correct host/backend 2175 2176 ctxt.authorize(attachment_policy.UPDATE_POLICY, 2177 target_obj=attachment_ref) 2178 volume_ref = objects.Volume.get_by_id(ctxt, attachment_ref.volume_id) 2179 if "error" in volume_ref.status: 2180 msg = ('Volume attachments can not be updated if the volume ' 2181 'is in an error state. The Volume %(volume_id)s ' 2182 'currently has a status of: %(volume_status)s ') % { 2183 'volume_id': volume_ref.id, 2184 'volume_status': volume_ref.status} 2185 LOG.error(msg) 2186 raise exception.InvalidVolume(reason=msg) 2187 2188 if (len(volume_ref.volume_attachment) > 1 and 2189 not (volume_ref.multiattach or 2190 self._is_multiattach(volume_ref.volume_type))): 2191 # Check whether all connection hosts are unique 2192 # Multiple attachments to different hosts is permitted to 2193 # support Nova instance migration. 2194 2195 # This particular check also does not prevent multiple attachments 2196 # for a multiattach volume to the same instance. 2197 2198 connection_hosts = set(a.connector['host'] 2199 for a in volume_ref.volume_attachment 2200 if a.connection_info) 2201 2202 if len(connection_hosts) > 0: 2203 # We raced, and have more than one connection 2204 2205 msg = _('duplicate connectors detected on volume ' 2206 '%(vol)s') % {'vol': volume_ref.id} 2207 2208 raise exception.InvalidVolume(reason=msg) 2209 2210 connection_info = ( 2211 self.volume_rpcapi.attachment_update(ctxt, 2212 volume_ref, 2213 connector, 2214 attachment_ref.id)) 2215 attachment_ref.connection_info = connection_info 2216 attachment_ref.save() 2217 return attachment_ref 2218 2219 def attachment_delete(self, ctxt, attachment): 2220 ctxt.authorize(attachment_policy.DELETE_POLICY, 2221 target_obj=attachment) 2222 volume = objects.Volume.get_by_id(ctxt, attachment.volume_id) 2223 if attachment.attach_status == 'reserved': 2224 self.db.volume_detached(ctxt.elevated(), attachment.volume_id, 2225 attachment.get('id')) 2226 self.db.volume_admin_metadata_delete(ctxt.elevated(), 2227 attachment.volume_id, 2228 'attached_mode') 2229 volume_utils.notify_about_volume_usage(ctxt, volume, "detach.end") 2230 else: 2231 self.volume_rpcapi.attachment_delete(ctxt, 2232 attachment.id, 2233 volume) 2234 status_updates = {'status': 'available', 2235 'attach_status': 'detached'} 2236 remaining_attachments = AO_LIST.get_all_by_volume_id(ctxt, volume.id) 2237 LOG.debug("Remaining volume attachments: %s", remaining_attachments, 2238 resource=volume) 2239 2240 # NOTE(jdg) Try and figure out the > state we have left and set that 2241 # attached > attaching > > detaching > reserved 2242 pending_status_list = [] 2243 for attachment in remaining_attachments: 2244 pending_status_list.append(attachment.attach_status) 2245 LOG.debug("Adding status of: %s to pending status list " 2246 "for volume.", attachment.attach_status, 2247 resource=volume) 2248 2249 LOG.debug("Pending status list for volume during " 2250 "attachment-delete: %s", 2251 pending_status_list, resource=volume) 2252 if 'attached' in pending_status_list: 2253 status_updates['status'] = 'in-use' 2254 status_updates['attach_status'] = 'attached' 2255 elif 'attaching' in pending_status_list: 2256 status_updates['status'] = 'attaching' 2257 status_updates['attach_status'] = 'attaching' 2258 elif 'detaching' in pending_status_list: 2259 status_updates['status'] = 'detaching' 2260 status_updates['attach_status'] = 'detaching' 2261 elif 'reserved' in pending_status_list: 2262 status_updates['status'] = 'reserved' 2263 status_updates['attach_status'] = 'reserved' 2264 2265 volume.status = status_updates['status'] 2266 volume.attach_status = status_updates['attach_status'] 2267 volume.save() 2268 return remaining_attachments 2269 2270 2271class HostAPI(base.Base): 2272 """Sub-set of the Volume Manager API for managing host operations.""" 2273 2274 def set_host_enabled(self, context, host, enabled): 2275 """Sets the specified host's ability to accept new volumes.""" 2276 raise NotImplementedError() 2277 2278 def get_host_uptime(self, context, host): 2279 """Returns the result of calling "uptime" on the target host.""" 2280 raise NotImplementedError() 2281 2282 def host_power_action(self, context, host, action): 2283 raise NotImplementedError() 2284 2285 def set_host_maintenance(self, context, host, mode): 2286 """Start/Stop host maintenance window. 2287 2288 On start, it triggers volume evacuation. 2289 """ 2290 raise NotImplementedError() 2291