1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17"""Handles all requests relating to volumes."""
18
19import ast
20import collections
21import datetime
22
23from castellan import key_manager
24from oslo_config import cfg
25from oslo_log import log as logging
26from oslo_utils import excutils
27from oslo_utils import strutils
28from oslo_utils import timeutils
29from oslo_utils import versionutils
30import six
31
32from cinder.api import common
33from cinder.common import constants
34from cinder import context
35from cinder import coordination
36from cinder import db
37from cinder.db import base
38from cinder import exception
39from cinder import flow_utils
40from cinder.i18n import _
41from cinder.image import cache as image_cache
42from cinder.image import glance
43from cinder.message import api as message_api
44from cinder.message import message_field
45from cinder import objects
46from cinder.objects import base as objects_base
47from cinder.objects import fields
48from cinder.objects import volume_type
49from cinder.policies import attachments as attachment_policy
50from cinder.policies import services as svr_policy
51from cinder.policies import snapshot_metadata as s_meta_policy
52from cinder.policies import snapshots as snapshot_policy
53from cinder.policies import volume_actions as vol_action_policy
54from cinder.policies import volume_metadata as vol_meta_policy
55from cinder.policies import volume_transfer as vol_transfer_policy
56from cinder.policies import volumes as vol_policy
57from cinder import quota
58from cinder import quota_utils
59from cinder.scheduler import rpcapi as scheduler_rpcapi
60from cinder import utils
61from cinder.volume.flows.api import create_volume
62from cinder.volume.flows.api import manage_existing
63from cinder.volume import rpcapi as volume_rpcapi
64from cinder.volume import utils as volume_utils
65from cinder.volume import volume_types
66
67allow_force_upload_opt = cfg.BoolOpt('enable_force_upload',
68                                     default=False,
69                                     help='Enables the Force option on '
70                                          'upload_to_image. This enables '
71                                          'running upload_volume on in-use '
72                                          'volumes for backends that '
73                                          'support it.')
74volume_host_opt = cfg.BoolOpt('snapshot_same_host',
75                              default=True,
76                              help='Create volume from snapshot at the host '
77                                   'where snapshot resides')
78volume_same_az_opt = cfg.BoolOpt('cloned_volume_same_az',
79                                 default=True,
80                                 help='Ensure that the new volumes are the '
81                                      'same AZ as snapshot or source volume')
82az_cache_time_opt = cfg.IntOpt('az_cache_duration',
83                               default=3600,
84                               help='Cache volume availability zones in '
85                                    'memory for the provided duration in '
86                                    'seconds')
87
88CONF = cfg.CONF
89CONF.register_opt(allow_force_upload_opt)
90CONF.register_opt(volume_host_opt)
91CONF.register_opt(volume_same_az_opt)
92CONF.register_opt(az_cache_time_opt)
93
94CONF.import_opt('glance_core_properties', 'cinder.image.glance')
95
96LOG = logging.getLogger(__name__)
97QUOTAS = quota.QUOTAS
98AO_LIST = objects.VolumeAttachmentList
99
100
101class API(base.Base):
102    """API for interacting with the volume manager."""
103
104    AVAILABLE_MIGRATION_STATUS = (None, 'deleting', 'error', 'success')
105
106    def __init__(self, db_driver=None, image_service=None):
107        self.image_service = (image_service or
108                              glance.get_default_image_service())
109        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
110        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
111        self.availability_zones = []
112        self.availability_zones_last_fetched = None
113        self.key_manager = key_manager.API(CONF)
114        self.message = message_api.API()
115        super(API, self).__init__(db_driver)
116
117    def list_availability_zones(self, enable_cache=False, refresh_cache=False):
118        """Describe the known availability zones
119
120        :param enable_cache: Enable az cache
121        :param refresh_cache: Refresh cache immediately
122        :return: tuple of dicts, each with a 'name' and 'available' key
123        """
124        if enable_cache:
125            if self.availability_zones_last_fetched is None:
126                refresh_cache = True
127            else:
128                cache_age = timeutils.delta_seconds(
129                    self.availability_zones_last_fetched,
130                    timeutils.utcnow())
131                if cache_age >= CONF.az_cache_duration:
132                    refresh_cache = True
133        if refresh_cache or not enable_cache:
134            topic = constants.VOLUME_TOPIC
135            ctxt = context.get_admin_context()
136            services = objects.ServiceList.get_all_by_topic(ctxt, topic)
137            az_data = [(s.availability_zone, s.disabled)
138                       for s in services]
139            disabled_map = {}
140            for (az_name, disabled) in az_data:
141                tracked_disabled = disabled_map.get(az_name, True)
142                disabled_map[az_name] = tracked_disabled and disabled
143            azs = [{'name': name, 'available': not disabled}
144                   for (name, disabled) in disabled_map.items()]
145            if refresh_cache:
146                now = timeutils.utcnow()
147                self.availability_zones = azs
148                self.availability_zones_last_fetched = now
149                LOG.debug("Availability zone cache updated, next update will"
150                          " occur around %s.", now + datetime.timedelta(
151                              seconds=CONF.az_cache_duration))
152        else:
153            azs = self.availability_zones
154        LOG.info("Availability Zones retrieved successfully.")
155        return tuple(azs)
156
157    def _retype_is_possible(self, context,
158                            source_type, target_type):
159        elevated = context.elevated()
160        # If encryptions are different, it is not allowed
161        # to create volume from source volume or snapshot.
162        if volume_types.volume_types_encryption_changed(
163                elevated,
164                source_type.id if source_type else None,
165                target_type.id if target_type else None):
166            return False
167        services = objects.ServiceList.get_all_by_topic(
168            elevated,
169            constants.VOLUME_TOPIC,
170            disabled=True)
171        if len(services.objects) == 1:
172            return True
173
174        source_extra_specs = {}
175        if source_type:
176            with source_type.obj_as_admin():
177                source_extra_specs = source_type.extra_specs
178        target_extra_specs = {}
179        if target_type:
180            with target_type.obj_as_admin():
181                target_extra_specs = target_type.extra_specs
182        if (volume_utils.matching_backend_name(
183                source_extra_specs, target_extra_specs)):
184            return True
185        return False
186
187    def _is_volume_migrating(self, volume):
188        # The migration status 'none' means no migration has ever been done
189        # before. The migration status 'error' means the previous migration
190        # failed. The migration status 'success' means the previous migration
191        # succeeded. The migration status 'deleting' means the source volume
192        # fails to delete after a migration.
193        # All of the statuses above means the volume is not in the process
194        # of a migration.
195        return (volume['migration_status'] not in
196                self.AVAILABLE_MIGRATION_STATUS)
197
198    def _is_multiattach(self, volume_type):
199        specs = getattr(volume_type, 'extra_specs', {})
200        return specs.get('multiattach', 'False') == '<is> True'
201
202    def _is_encrypted(self, volume_type):
203        specs = volume_type.get('extra_specs', {})
204        if 'encryption' not in specs:
205            return False
206        return specs.get('encryption', {}) is not {}
207
208    def create(self, context, size, name, description, snapshot=None,
209               image_id=None, volume_type=None, metadata=None,
210               availability_zone=None, source_volume=None,
211               scheduler_hints=None,
212               source_replica=None, consistencygroup=None,
213               cgsnapshot=None, multiattach=False, source_cg=None,
214               group=None, group_snapshot=None, source_group=None,
215               backup=None):
216
217        if image_id:
218            context.authorize(vol_policy.CREATE_FROM_IMAGE_POLICY)
219        else:
220            context.authorize(vol_policy.CREATE_POLICY)
221
222        # Check up front for legacy replication parameters to quick fail
223        if source_replica:
224            msg = _("Creating a volume from a replica source was part of the "
225                    "replication v1 implementation which is no longer "
226                    "available.")
227            raise exception.InvalidInput(reason=msg)
228
229        # NOTE(jdg): we can have a create without size if we're
230        # doing a create from snap or volume.  Currently
231        # the taskflow api will handle this and pull in the
232        # size from the source.
233
234        # NOTE(jdg): cinderclient sends in a string representation
235        # of the size value.  BUT there is a possibility that somebody
236        # could call the API directly so the is_int_like check
237        # handles both cases (string representation of true float or int).
238        if size and (not strutils.is_int_like(size) or int(size) <= 0):
239            msg = _('Invalid volume size provided for create request: %s '
240                    '(size argument must be an integer (or string '
241                    'representation of an integer) and greater '
242                    'than zero).') % size
243            raise exception.InvalidInput(reason=msg)
244
245        if consistencygroup and (not cgsnapshot and not source_cg):
246            if not volume_type:
247                msg = _("volume_type must be provided when creating "
248                        "a volume in a consistency group.")
249                raise exception.InvalidInput(reason=msg)
250            cg_voltypeids = consistencygroup.volume_type_id
251            if volume_type.id not in cg_voltypeids:
252                msg = _("Invalid volume_type provided: %s (requested "
253                        "type must be supported by this consistency "
254                        "group).") % volume_type
255                raise exception.InvalidInput(reason=msg)
256
257        if group and (not group_snapshot and not source_group):
258            if not volume_type:
259                msg = _("volume_type must be provided when creating "
260                        "a volume in a group.")
261                raise exception.InvalidInput(reason=msg)
262            vol_type_ids = [v_type.id for v_type in group.volume_types]
263            if volume_type.id not in vol_type_ids:
264                msg = _("Invalid volume_type provided: %s (requested "
265                        "type must be supported by this "
266                        "group).") % volume_type
267                raise exception.InvalidInput(reason=msg)
268
269        if source_volume and volume_type:
270            if volume_type.id != source_volume.volume_type_id:
271                if not self._retype_is_possible(
272                        context,
273                        source_volume.volume_type,
274                        volume_type):
275                    msg = _("Invalid volume_type provided: %s (requested type "
276                            "is not compatible; either match source volume, "
277                            "or omit type argument).") % volume_type.id
278                    raise exception.InvalidInput(reason=msg)
279
280        if snapshot and volume_type:
281            if volume_type.id != snapshot.volume_type_id:
282                if not self._retype_is_possible(context,
283                                                snapshot.volume.volume_type,
284                                                volume_type):
285                    msg = _("Invalid volume_type provided: %s (requested "
286                            "type is not compatible; recommend omitting "
287                            "the type argument).") % volume_type.id
288                    raise exception.InvalidInput(reason=msg)
289
290        # Determine the valid availability zones that the volume could be
291        # created in (a task in the flow will/can use this information to
292        # ensure that the availability zone requested is valid).
293        raw_zones = self.list_availability_zones(enable_cache=True)
294        availability_zones = set([az['name'] for az in raw_zones])
295        if CONF.storage_availability_zone:
296            availability_zones.add(CONF.storage_availability_zone)
297
298        utils.check_metadata_properties(metadata)
299
300        create_what = {
301            'context': context,
302            'raw_size': size,
303            'name': name,
304            'description': description,
305            'snapshot': snapshot,
306            'image_id': image_id,
307            'raw_volume_type': volume_type,
308            'metadata': metadata or {},
309            'raw_availability_zone': availability_zone,
310            'source_volume': source_volume,
311            'scheduler_hints': scheduler_hints,
312            'key_manager': self.key_manager,
313            'optional_args': {'is_quota_committed': False},
314            'consistencygroup': consistencygroup,
315            'cgsnapshot': cgsnapshot,
316            'raw_multiattach': multiattach,
317            'group': group,
318            'group_snapshot': group_snapshot,
319            'source_group': source_group,
320            'backup': backup,
321        }
322        try:
323            sched_rpcapi = (self.scheduler_rpcapi if (
324                            not cgsnapshot and not source_cg and
325                            not group_snapshot and not source_group)
326                            else None)
327            volume_rpcapi = (self.volume_rpcapi if (
328                             not cgsnapshot and not source_cg and
329                             not group_snapshot and not source_group)
330                             else None)
331            flow_engine = create_volume.get_flow(self.db,
332                                                 self.image_service,
333                                                 availability_zones,
334                                                 create_what,
335                                                 sched_rpcapi,
336                                                 volume_rpcapi)
337        except Exception:
338            msg = _('Failed to create api volume flow.')
339            LOG.exception(msg)
340            raise exception.CinderException(msg)
341
342        # Attaching this listener will capture all of the notifications that
343        # taskflow sends out and redirect them to a more useful log for
344        # cinders debugging (or error reporting) usage.
345        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
346            try:
347                flow_engine.run()
348                vref = flow_engine.storage.fetch('volume')
349                # NOTE(tommylikehu): If the target az is not hit,
350                # refresh the az cache immediately.
351                if flow_engine.storage.fetch('refresh_az'):
352                    self.list_availability_zones(enable_cache=True,
353                                                 refresh_cache=True)
354                # Refresh the object here, otherwise things ain't right
355                vref = objects.Volume.get_by_id(
356                    context, vref['id'])
357                vref.save()
358                LOG.info("Create volume request issued successfully.",
359                         resource=vref)
360                return vref
361            except exception.InvalidAvailabilityZone:
362                with excutils.save_and_reraise_exception():
363                    self.list_availability_zones(enable_cache=True,
364                                                 refresh_cache=True)
365
366    def revert_to_snapshot(self, context, volume, snapshot):
367        """revert a volume to a snapshot"""
368        context.authorize(vol_action_policy.REVERT_POLICY,
369                          target_obj=volume)
370        v_res = volume.update_single_status_where(
371            'reverting', 'available')
372        if not v_res:
373            msg = _("Can't revert volume %s to its latest snapshot. "
374                    "Volume's status must be 'available'.") % volume.id
375            raise exception.InvalidVolume(reason=msg)
376        s_res = snapshot.update_single_status_where(
377            fields.SnapshotStatus.RESTORING,
378            fields.SnapshotStatus.AVAILABLE)
379        if not s_res:
380            msg = _("Can't revert volume %s to its latest snapshot. "
381                    "Snapshot's status must be 'available'.") % snapshot.id
382            raise exception.InvalidSnapshot(reason=msg)
383
384        self.volume_rpcapi.revert_to_snapshot(context, volume, snapshot)
385
386    def delete(self, context, volume,
387               force=False,
388               unmanage_only=False,
389               cascade=False):
390        context.authorize(vol_policy.DELETE_POLICY, target_obj=volume)
391        if context.is_admin and context.project_id != volume.project_id:
392            project_id = volume.project_id
393        else:
394            project_id = context.project_id
395
396        if not volume.host:
397            volume_utils.notify_about_volume_usage(context,
398                                                   volume, "delete.start")
399            # NOTE(vish): scheduling failed, so delete it
400            # Note(zhiteng): update volume quota reservation
401            try:
402                reservations = None
403                if volume.status != 'error_managing':
404                    LOG.debug("Decrease volume quotas only if status is not "
405                              "error_managing.")
406                    reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
407                    QUOTAS.add_volume_type_opts(context,
408                                                reserve_opts,
409                                                volume.volume_type_id)
410                    reservations = QUOTAS.reserve(context,
411                                                  project_id=project_id,
412                                                  **reserve_opts)
413            except Exception:
414                LOG.exception("Failed to update quota while "
415                              "deleting volume.")
416            volume.destroy()
417
418            if reservations:
419                QUOTAS.commit(context, reservations, project_id=project_id)
420
421            volume_utils.notify_about_volume_usage(context,
422                                                   volume, "delete.end")
423            LOG.info("Delete volume request issued successfully.",
424                     resource={'type': 'volume',
425                               'id': volume.id})
426            return
427
428        if not unmanage_only:
429            volume.assert_not_frozen()
430
431        if unmanage_only and volume.encryption_key_id is not None:
432            msg = _("Unmanaging encrypted volumes is not supported.")
433            e = exception.Invalid(reason=msg)
434            self.message.create(
435                context,
436                message_field.Action.UNMANAGE_VOLUME,
437                resource_uuid=volume.id,
438                detail=message_field.Detail.UNMANAGE_ENC_NOT_SUPPORTED,
439                exception=e)
440            raise e
441
442        # Build required conditions for conditional update
443        expected = {
444            'attach_status': db.Not(fields.VolumeAttachStatus.ATTACHED),
445            'migration_status': self.AVAILABLE_MIGRATION_STATUS,
446            'consistencygroup_id': None,
447            'group_id': None}
448
449        # If not force deleting we have status conditions
450        if not force:
451            expected['status'] = ('available', 'error', 'error_restoring',
452                                  'error_extending', 'error_managing')
453
454        if cascade:
455            if force:
456                # Ignore status checks, but ensure snapshots are not part
457                # of a cgsnapshot.
458                filters = [~db.volume_has_snapshots_in_a_cgsnapshot_filter()]
459            else:
460                # Allow deletion if all snapshots are in an expected state
461                filters = [~db.volume_has_undeletable_snapshots_filter()]
462                # Check if the volume has snapshots which are existing in
463                # other project now.
464                if not context.is_admin:
465                    filters.append(~db.volume_has_other_project_snp_filter())
466        else:
467            # Don't allow deletion of volume with snapshots
468            filters = [~db.volume_has_snapshots_filter()]
469        values = {'status': 'deleting', 'terminated_at': timeutils.utcnow()}
470        if unmanage_only is True:
471            values['status'] = 'unmanaging'
472        if volume.status == 'error_managing':
473            values['status'] = 'error_managing_deleting'
474
475        result = volume.conditional_update(values, expected, filters)
476
477        if not result:
478            status = utils.build_or_str(expected.get('status'),
479                                        _('status must be %s and'))
480            msg = _('Volume %s must not be migrating, attached, belong to a '
481                    'group, have snapshots or be disassociated from '
482                    'snapshots after volume transfer.') % status
483            LOG.info(msg)
484            raise exception.InvalidVolume(reason=msg)
485
486        if cascade:
487            values = {'status': 'deleting'}
488            expected = {'cgsnapshot_id': None,
489                        'group_snapshot_id': None}
490            if not force:
491                expected['status'] = ('available', 'error', 'deleting')
492
493            snapshots = objects.snapshot.SnapshotList.get_all_for_volume(
494                context, volume.id)
495            for s in snapshots:
496                result = s.conditional_update(values, expected, filters)
497
498                if not result:
499                    volume.update({'status': 'error_deleting'})
500                    volume.save()
501
502                    msg = _('Failed to update snapshot.')
503                    raise exception.InvalidVolume(reason=msg)
504
505        cache = image_cache.ImageVolumeCache(self.db, self)
506        entry = cache.get_by_image_volume(context, volume.id)
507        if entry:
508            cache.evict(context, entry)
509
510        # If the volume is encrypted, delete its encryption key from the key
511        # manager. This operation makes volume deletion an irreversible process
512        # because the volume cannot be decrypted without its key.
513        encryption_key_id = volume.get('encryption_key_id', None)
514        if encryption_key_id is not None:
515            try:
516                volume_utils.delete_encryption_key(context,
517                                                   self.key_manager,
518                                                   encryption_key_id)
519            except Exception as e:
520                volume.update({'status': 'error_deleting'})
521                volume.save()
522                if hasattr(e, 'msg'):
523                    msg = _("Unable to delete encryption key for "
524                            "volume: %s") % (e.msg)
525                else:
526                    msg = _("Unable to delete encryption key for volume.")
527                LOG.error(msg)
528                raise exception.InvalidVolume(reason=msg)
529
530        self.volume_rpcapi.delete_volume(context,
531                                         volume,
532                                         unmanage_only,
533                                         cascade)
534        LOG.info("Delete volume request issued successfully.",
535                 resource=volume)
536
537    def update(self, context, volume, fields):
538        context.authorize(vol_policy.UPDATE_POLICY, target_obj=volume)
539        # TODO(karthikp): Making sure volume is always oslo-versioned
540        # If not we convert it at the start of update method. This check
541        # needs to be removed once we have moved to ovo.
542        if not isinstance(volume, objects_base.CinderObject):
543            vol_obj = objects.Volume()
544            volume = objects.Volume._from_db_object(context, vol_obj, volume)
545
546        if volume.status == 'maintenance':
547            LOG.info("Unable to update volume, "
548                     "because it is in maintenance.", resource=volume)
549            msg = _("The volume cannot be updated during maintenance.")
550            raise exception.InvalidVolume(reason=msg)
551
552        utils.check_metadata_properties(fields.get('metadata', None))
553
554        volume.update(fields)
555        volume.save()
556        LOG.info("Volume updated successfully.", resource=volume)
557
558    def get(self, context, volume_id, viewable_admin_meta=False):
559        volume = objects.Volume.get_by_id(context, volume_id)
560
561        try:
562            context.authorize(vol_policy.GET_POLICY, target_obj=volume)
563        except exception.PolicyNotAuthorized:
564            # raise VolumeNotFound to avoid providing info about
565            # the existence of an unauthorized volume id
566            raise exception.VolumeNotFound(volume_id=volume_id)
567
568        if viewable_admin_meta:
569            ctxt = context.elevated()
570            admin_metadata = self.db.volume_admin_metadata_get(ctxt,
571                                                               volume_id)
572            volume.admin_metadata = admin_metadata
573            volume.obj_reset_changes()
574
575        LOG.info("Volume info retrieved successfully.", resource=volume)
576        return volume
577
578    def calculate_resource_count(self, context, resource_type, filters):
579        filters = filters if filters else {}
580        allTenants = utils.get_bool_param('all_tenants', filters)
581        if context.is_admin and allTenants:
582            del filters['all_tenants']
583        else:
584            filters['project_id'] = context.project_id
585        return db.calculate_resource_count(context, resource_type, filters)
586
587    def get_all(self, context, marker=None, limit=None, sort_keys=None,
588                sort_dirs=None, filters=None, viewable_admin_meta=False,
589                offset=None):
590        context.authorize(vol_policy.GET_ALL_POLICY)
591
592        if filters is None:
593            filters = {}
594
595        allTenants = utils.get_bool_param('all_tenants', filters)
596
597        try:
598            if limit is not None:
599                limit = int(limit)
600                if limit < 0:
601                    msg = _('limit param must be positive')
602                    raise exception.InvalidInput(reason=msg)
603        except ValueError:
604            msg = _('limit param must be an integer')
605            raise exception.InvalidInput(reason=msg)
606
607        # Non-admin shouldn't see temporary target of a volume migration, add
608        # unique filter data to reflect that only volumes with a NULL
609        # 'migration_status' or a 'migration_status' that does not start with
610        # 'target:' should be returned (processed in db/sqlalchemy/api.py)
611        if not context.is_admin:
612            filters['no_migration_targets'] = True
613
614        if filters:
615            LOG.debug("Searching by: %s.", six.text_type(filters))
616
617        if context.is_admin and allTenants:
618            # Need to remove all_tenants to pass the filtering below.
619            del filters['all_tenants']
620            volumes = objects.VolumeList.get_all(context, marker, limit,
621                                                 sort_keys=sort_keys,
622                                                 sort_dirs=sort_dirs,
623                                                 filters=filters,
624                                                 offset=offset)
625        else:
626            if viewable_admin_meta:
627                context = context.elevated()
628            volumes = objects.VolumeList.get_all_by_project(
629                context, context.project_id, marker, limit,
630                sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters,
631                offset=offset)
632
633        LOG.info("Get all volumes completed successfully.")
634        return volumes
635
636    def get_volume_summary(self, context, filters=None):
637        context.authorize(vol_policy.GET_ALL_POLICY)
638
639        if filters is None:
640            filters = {}
641
642        all_tenants = utils.get_bool_param('all_tenants', filters)
643        filters.pop('all_tenants', None)
644        project_only = not (all_tenants and context.is_admin)
645        volumes = objects.VolumeList.get_volume_summary(context, project_only)
646
647        LOG.info("Get summary completed successfully.")
648        return volumes
649
650    def get_snapshot(self, context, snapshot_id):
651        snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
652        context.authorize(snapshot_policy.GET_POLICY, target_obj=snapshot)
653
654        # FIXME(jdg): The objects don't have the db name entries
655        # so build the resource tag manually for now.
656        LOG.info("Snapshot retrieved successfully.",
657                 resource={'type': 'snapshot',
658                           'id': snapshot.id})
659        return snapshot
660
661    def get_volume(self, context, volume_id):
662        volume = objects.Volume.get_by_id(context, volume_id)
663        context.authorize(vol_policy.GET_POLICY, target_obj=volume)
664        LOG.info("Volume retrieved successfully.", resource=volume)
665        return volume
666
667    def get_all_snapshots(self, context, search_opts=None, marker=None,
668                          limit=None, sort_keys=None, sort_dirs=None,
669                          offset=None):
670        context.authorize(snapshot_policy.GET_ALL_POLICY)
671
672        search_opts = search_opts or {}
673
674        if context.is_admin and 'all_tenants' in search_opts:
675            # Need to remove all_tenants to pass the filtering below.
676            del search_opts['all_tenants']
677            snapshots = objects.SnapshotList.get_all(
678                context, search_opts, marker, limit, sort_keys, sort_dirs,
679                offset)
680        else:
681            snapshots = objects.SnapshotList.get_all_by_project(
682                context, context.project_id, search_opts, marker, limit,
683                sort_keys, sort_dirs, offset)
684
685        LOG.info("Get all snapshots completed successfully.")
686        return snapshots
687
688    def reserve_volume(self, context, volume):
689        context.authorize(vol_action_policy.RESERVE_POLICY, target_obj=volume)
690        expected = {'multiattach': volume.multiattach,
691                    'status': (('available', 'in-use') if volume.multiattach
692                               else 'available')}
693
694        result = volume.conditional_update({'status': 'attaching'}, expected)
695
696        if not result:
697            expected_status = utils.build_or_str(expected['status'])
698            msg = _('Volume status must be %(expected)s to reserve, but the '
699                    'status is %(current)s.') % {'expected': expected_status,
700                                                 'current': volume.status}
701            LOG.error(msg)
702            raise exception.InvalidVolume(reason=msg)
703
704        LOG.info("Reserve volume completed successfully.",
705                 resource=volume)
706
707    def unreserve_volume(self, context, volume):
708        context.authorize(vol_action_policy.UNRESERVE_POLICY,
709                          target_obj=volume)
710        expected = {'status': 'attaching'}
711        # Status change depends on whether it has attachments (in-use) or not
712        # (available)
713        value = {'status': db.Case([(db.volume_has_attachments_filter(),
714                                     'in-use')],
715                                   else_='available')}
716        result = volume.conditional_update(value, expected)
717        if not result:
718            LOG.debug("Attempted to unreserve volume that was not "
719                      "reserved, nothing to do.",
720                      resource=volume)
721            return
722
723        LOG.info("Unreserve volume completed successfully.",
724                 resource=volume)
725
726    def begin_detaching(self, context, volume):
727        context.authorize(vol_action_policy.BEGIN_DETACHING_POLICY,
728                          target_obj=volume)
729        # If we are in the middle of a volume migration, we don't want the
730        # user to see that the volume is 'detaching'. Having
731        # 'migration_status' set will have the same effect internally.
732        expected = {'status': 'in-use',
733                    'attach_status': fields.VolumeAttachStatus.ATTACHED,
734                    'migration_status': self.AVAILABLE_MIGRATION_STATUS}
735
736        result = volume.conditional_update({'status': 'detaching'}, expected)
737
738        if not (result or self._is_volume_migrating(volume)):
739            msg = _("Unable to detach volume. Volume status must be 'in-use' "
740                    "and attach_status must be 'attached' to detach.")
741            LOG.error(msg)
742            raise exception.InvalidVolume(reason=msg)
743
744        LOG.info("Begin detaching volume completed successfully.",
745                 resource=volume)
746
747    def roll_detaching(self, context, volume):
748        context.authorize(vol_action_policy.ROLL_DETACHING_POLICY,
749                          target_obj=volume)
750        volume.conditional_update({'status': 'in-use'},
751                                  {'status': 'detaching'})
752        LOG.info("Roll detaching of volume completed successfully.",
753                 resource=volume)
754
755    def attach(self, context, volume, instance_uuid, host_name,
756               mountpoint, mode):
757        context.authorize(vol_action_policy.ATTACH_POLICY,
758                          target_obj=volume)
759        if volume.status == 'maintenance':
760            LOG.info('Unable to attach volume, '
761                     'because it is in maintenance.', resource=volume)
762            msg = _("The volume cannot be attached in maintenance mode.")
763            raise exception.InvalidVolume(reason=msg)
764
765        # We add readonly metadata if it doesn't already exist
766        readonly = self.update_volume_admin_metadata(context.elevated(),
767                                                     volume,
768                                                     {'readonly': 'False'},
769                                                     update=False)['readonly']
770        if readonly == 'True' and mode != 'ro':
771            raise exception.InvalidVolumeAttachMode(mode=mode,
772                                                    volume_id=volume.id)
773
774        attach_results = self.volume_rpcapi.attach_volume(context,
775                                                          volume,
776                                                          instance_uuid,
777                                                          host_name,
778                                                          mountpoint,
779                                                          mode)
780        LOG.info("Attach volume completed successfully.",
781                 resource=volume)
782        return attach_results
783
784    def detach(self, context, volume, attachment_id):
785        context.authorize(vol_action_policy.DETACH_POLICY,
786                          target_obj=volume)
787        if volume['status'] == 'maintenance':
788            LOG.info('Unable to detach volume, '
789                     'because it is in maintenance.', resource=volume)
790            msg = _("The volume cannot be detached in maintenance mode.")
791            raise exception.InvalidVolume(reason=msg)
792        detach_results = self.volume_rpcapi.detach_volume(context, volume,
793                                                          attachment_id)
794        LOG.info("Detach volume completed successfully.",
795                 resource=volume)
796        return detach_results
797
798    def initialize_connection(self, context, volume, connector):
799        context.authorize(vol_action_policy.INITIALIZE_POLICY,
800                          target_obj=volume)
801        if volume.status == 'maintenance':
802            LOG.info('Unable to initialize the connection for '
803                     'volume, because it is in '
804                     'maintenance.', resource=volume)
805            msg = _("The volume connection cannot be initialized in "
806                    "maintenance mode.")
807            raise exception.InvalidVolume(reason=msg)
808        init_results = self.volume_rpcapi.initialize_connection(context,
809                                                                volume,
810                                                                connector)
811        LOG.info("Initialize volume connection completed successfully.",
812                 resource=volume)
813        return init_results
814
815    def terminate_connection(self, context, volume, connector, force=False):
816        context.authorize(vol_action_policy.TERMINATE_POLICY,
817                          target_obj=volume)
818        self.volume_rpcapi.terminate_connection(context,
819                                                volume,
820                                                connector,
821                                                force)
822        LOG.info("Terminate volume connection completed successfully.",
823                 resource=volume)
824        self.unreserve_volume(context, volume)
825
826    def accept_transfer(self, context, volume, new_user, new_project):
827        context.authorize(vol_transfer_policy.ACCEPT_POLICY,
828                          target_obj=volume)
829        if volume['status'] == 'maintenance':
830            LOG.info('Unable to accept transfer for volume, '
831                     'because it is in maintenance.', resource=volume)
832            msg = _("The volume cannot accept transfer in maintenance mode.")
833            raise exception.InvalidVolume(reason=msg)
834        results = self.volume_rpcapi.accept_transfer(context,
835                                                     volume,
836                                                     new_user,
837                                                     new_project)
838        LOG.info("Transfer volume completed successfully.",
839                 resource=volume)
840        return results
841
842    def _create_snapshot(self, context,
843                         volume, name, description,
844                         force=False, metadata=None,
845                         cgsnapshot_id=None,
846                         group_snapshot_id=None):
847        volume.assert_not_frozen()
848        snapshot = self.create_snapshot_in_db(
849            context, volume, name,
850            description, force, metadata, cgsnapshot_id,
851            True, group_snapshot_id)
852        # NOTE(tommylikehu): We only wrap the 'size' attribute here
853        # because only the volume's host is passed and only capacity is
854        # validated in the scheduler now.
855        kwargs = {'snapshot_id': snapshot.id,
856                  'volume_properties': objects.VolumeProperties(
857                      size=volume.size)}
858        self.scheduler_rpcapi.create_snapshot(context, volume, snapshot,
859                                              volume.service_topic_queue,
860                                              objects.RequestSpec(**kwargs))
861        return snapshot
862
863    def create_snapshot_in_db(self, context,
864                              volume, name, description,
865                              force, metadata,
866                              cgsnapshot_id,
867                              commit_quota=True,
868                              group_snapshot_id=None):
869        context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume)
870
871        utils.check_metadata_properties(metadata)
872        if not volume.host:
873            msg = _("The snapshot cannot be created because volume has "
874                    "not been scheduled to any host.")
875            raise exception.InvalidVolume(reason=msg)
876
877        if volume['status'] == 'maintenance':
878            LOG.info('Unable to create the snapshot for volume, '
879                     'because it is in maintenance.', resource=volume)
880            msg = _("The snapshot cannot be created when the volume is in "
881                    "maintenance mode.")
882            raise exception.InvalidVolume(reason=msg)
883        if self._is_volume_migrating(volume):
884            # Volume is migrating, wait until done
885            msg = _("Snapshot cannot be created while volume is migrating.")
886            raise exception.InvalidVolume(reason=msg)
887
888        if volume['status'].startswith('replica_'):
889            # Can't snapshot secondary replica
890            msg = _("Snapshot of secondary replica is not allowed.")
891            raise exception.InvalidVolume(reason=msg)
892
893        valid_status = ["available", "in-use"] if force else ["available"]
894
895        if volume['status'] not in valid_status:
896            msg = _("Volume %(vol_id)s status must be %(status)s, "
897                    "but current status is: "
898                    "%(vol_status)s.") % {'vol_id': volume['id'],
899                                          'status': ', '.join(valid_status),
900                                          'vol_status': volume['status']}
901            raise exception.InvalidVolume(reason=msg)
902
903        if commit_quota:
904            try:
905                if CONF.no_snapshot_gb_quota:
906                    reserve_opts = {'snapshots': 1}
907                else:
908                    reserve_opts = {'snapshots': 1,
909                                    'gigabytes': volume['size']}
910                QUOTAS.add_volume_type_opts(context,
911                                            reserve_opts,
912                                            volume.get('volume_type_id'))
913                reservations = QUOTAS.reserve(context, **reserve_opts)
914            except exception.OverQuota as e:
915                quota_utils.process_reserve_over_quota(
916                    context, e,
917                    resource='snapshots',
918                    size=volume.size)
919
920        snapshot = None
921        try:
922            kwargs = {
923                'volume_id': volume['id'],
924                'cgsnapshot_id': cgsnapshot_id,
925                'group_snapshot_id': group_snapshot_id,
926                'user_id': context.user_id,
927                'project_id': context.project_id,
928                'status': fields.SnapshotStatus.CREATING,
929                'progress': '0%',
930                'volume_size': volume['size'],
931                'display_name': name,
932                'display_description': description,
933                'volume_type_id': volume['volume_type_id'],
934                'encryption_key_id': volume['encryption_key_id'],
935                'metadata': metadata or {}
936            }
937            snapshot = objects.Snapshot(context=context, **kwargs)
938            snapshot.create()
939            volume.refresh()
940
941            if volume['status'] not in valid_status:
942                msg = _("Volume %(vol_id)s status must be %(status)s , "
943                        "but current status is: "
944                        "%(vol_status)s.") % {'vol_id': volume['id'],
945                                              'status':
946                                                  ', '.join(valid_status),
947                                              'vol_status':
948                                                  volume['status']}
949                raise exception.InvalidVolume(reason=msg)
950            if commit_quota:
951                QUOTAS.commit(context, reservations)
952        except Exception:
953            with excutils.save_and_reraise_exception():
954                try:
955                    if snapshot.obj_attr_is_set('id'):
956                        snapshot.destroy()
957                finally:
958                    if commit_quota:
959                        QUOTAS.rollback(context, reservations)
960
961        return snapshot
962
963    def create_snapshots_in_db(self, context,
964                               volume_list,
965                               name, description,
966                               cgsnapshot_id,
967                               group_snapshot_id=None):
968        snapshot_list = []
969        for volume in volume_list:
970            self._create_snapshot_in_db_validate(context, volume)
971
972        reservations = self._create_snapshots_in_db_reserve(
973            context, volume_list)
974
975        options_list = []
976        for volume in volume_list:
977            options = self._create_snapshot_in_db_options(
978                context, volume, name, description, cgsnapshot_id,
979                group_snapshot_id)
980            options_list.append(options)
981
982        try:
983            for options in options_list:
984                snapshot = objects.Snapshot(context=context, **options)
985                snapshot.create()
986                snapshot_list.append(snapshot)
987
988            QUOTAS.commit(context, reservations)
989        except Exception:
990            with excutils.save_and_reraise_exception():
991                try:
992                    for snap in snapshot_list:
993                        snap.destroy()
994                finally:
995                    QUOTAS.rollback(context, reservations)
996
997        return snapshot_list
998
999    def _create_snapshot_in_db_validate(self, context, volume):
1000        context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume)
1001
1002        if volume['status'] == 'maintenance':
1003            LOG.info('Unable to create the snapshot for volume, '
1004                     'because it is in maintenance.', resource=volume)
1005            msg = _("The snapshot cannot be created when the volume is in "
1006                    "maintenance mode.")
1007            raise exception.InvalidVolume(reason=msg)
1008        if self._is_volume_migrating(volume):
1009            # Volume is migrating, wait until done
1010            msg = _("Snapshot cannot be created while volume is migrating.")
1011            raise exception.InvalidVolume(reason=msg)
1012        if volume['status'] == 'error':
1013            msg = _("The snapshot cannot be created when the volume is "
1014                    "in error status.")
1015            LOG.error(msg)
1016            raise exception.InvalidVolume(reason=msg)
1017
1018    def _create_snapshots_in_db_reserve(self, context, volume_list):
1019        reserve_opts_list = []
1020        total_reserve_opts = {}
1021        try:
1022            for volume in volume_list:
1023                if CONF.no_snapshot_gb_quota:
1024                    reserve_opts = {'snapshots': 1}
1025                else:
1026                    reserve_opts = {'snapshots': 1,
1027                                    'gigabytes': volume['size']}
1028                QUOTAS.add_volume_type_opts(context,
1029                                            reserve_opts,
1030                                            volume.get('volume_type_id'))
1031                reserve_opts_list.append(reserve_opts)
1032
1033            for reserve_opts in reserve_opts_list:
1034                for (key, value) in reserve_opts.items():
1035                    if key not in total_reserve_opts.keys():
1036                        total_reserve_opts[key] = value
1037                    else:
1038                        total_reserve_opts[key] = \
1039                            total_reserve_opts[key] + value
1040            reservations = QUOTAS.reserve(context, **total_reserve_opts)
1041        except exception.OverQuota as e:
1042            quota_utils.process_reserve_over_quota(
1043                context,
1044                e,
1045                resource='snapshots',
1046                size=total_reserve_opts.get('gigabytes', volume.size))
1047
1048        return reservations
1049
1050    def _create_snapshot_in_db_options(self, context, volume,
1051                                       name, description,
1052                                       cgsnapshot_id,
1053                                       group_snapshot_id=None):
1054        options = {'volume_id': volume['id'],
1055                   'cgsnapshot_id': cgsnapshot_id,
1056                   'group_snapshot_id': group_snapshot_id,
1057                   'user_id': context.user_id,
1058                   'project_id': context.project_id,
1059                   'status': fields.SnapshotStatus.CREATING,
1060                   'progress': '0%',
1061                   'volume_size': volume['size'],
1062                   'display_name': name,
1063                   'display_description': description,
1064                   'volume_type_id': volume['volume_type_id'],
1065                   'encryption_key_id': volume['encryption_key_id']}
1066        return options
1067
1068    def create_snapshot(self, context,
1069                        volume, name, description,
1070                        metadata=None, cgsnapshot_id=None,
1071                        group_snapshot_id=None):
1072        result = self._create_snapshot(context, volume, name, description,
1073                                       False, metadata, cgsnapshot_id,
1074                                       group_snapshot_id)
1075        LOG.info("Snapshot create request issued successfully.",
1076                 resource=result)
1077        return result
1078
1079    def create_snapshot_force(self, context,
1080                              volume, name,
1081                              description, metadata=None):
1082        result = self._create_snapshot(context, volume, name, description,
1083                                       True, metadata)
1084        LOG.info("Snapshot force create request issued successfully.",
1085                 resource=result)
1086        return result
1087
1088    def delete_snapshot(self, context, snapshot, force=False,
1089                        unmanage_only=False):
1090        context.authorize(snapshot_policy.DELETE_POLICY,
1091                          target_obj=snapshot)
1092        if not unmanage_only:
1093            snapshot.assert_not_frozen()
1094
1095        # Build required conditions for conditional update
1096        expected = {'cgsnapshot_id': None,
1097                    'group_snapshot_id': None}
1098        # If not force deleting we have status conditions
1099        if not force:
1100            expected['status'] = (fields.SnapshotStatus.AVAILABLE,
1101                                  fields.SnapshotStatus.ERROR)
1102
1103        values = {'status': fields.SnapshotStatus.DELETING}
1104        if unmanage_only is True:
1105            values['status'] = fields.SnapshotStatus.UNMANAGING
1106        result = snapshot.conditional_update(values, expected)
1107        if not result:
1108            status = utils.build_or_str(expected.get('status'),
1109                                        _('status must be %s and'))
1110            msg = (_('Snapshot %s must not be part of a group.') %
1111                   status)
1112            LOG.error(msg)
1113            raise exception.InvalidSnapshot(reason=msg)
1114
1115        self.volume_rpcapi.delete_snapshot(context, snapshot, unmanage_only)
1116        LOG.info("Snapshot delete request issued successfully.",
1117                 resource=snapshot)
1118
1119    def update_snapshot(self, context, snapshot, fields):
1120        context.authorize(snapshot_policy.UPDATE_POLICY,
1121                          target_obj=snapshot)
1122        snapshot.update(fields)
1123        snapshot.save()
1124
1125    def get_volume_metadata(self, context, volume):
1126        """Get all metadata associated with a volume."""
1127        context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume)
1128        rv = self.db.volume_metadata_get(context, volume['id'])
1129        LOG.info("Get volume metadata completed successfully.",
1130                 resource=volume)
1131        return dict(rv)
1132
1133    def create_volume_metadata(self, context, volume, metadata):
1134        """Creates volume metadata."""
1135        context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume)
1136        db_meta = self._update_volume_metadata(context, volume, metadata)
1137
1138        LOG.info("Create volume metadata completed successfully.",
1139                 resource=volume)
1140        return db_meta
1141
1142    def delete_volume_metadata(self, context, volume,
1143                               key, meta_type=common.METADATA_TYPES.user):
1144        """Delete the given metadata item from a volume."""
1145        context.authorize(vol_meta_policy.DELETE_POLICY, target_obj=volume)
1146        if volume.status in ('maintenance', 'uploading'):
1147            msg = _('Deleting volume metadata is not allowed for volumes in '
1148                    '%s status.') % volume.status
1149            LOG.info(msg, resource=volume)
1150            raise exception.InvalidVolume(reason=msg)
1151        self.db.volume_metadata_delete(context, volume.id, key, meta_type)
1152        LOG.info("Delete volume metadata completed successfully.",
1153                 resource=volume)
1154
1155    def _update_volume_metadata(self, context, volume, metadata, delete=False,
1156                                meta_type=common.METADATA_TYPES.user):
1157        if volume['status'] in ('maintenance', 'uploading'):
1158            msg = _('Updating volume metadata is not allowed for volumes in '
1159                    '%s status.') % volume['status']
1160            LOG.info(msg, resource=volume)
1161            raise exception.InvalidVolume(reason=msg)
1162        utils.check_metadata_properties(metadata)
1163        return self.db.volume_metadata_update(context, volume['id'],
1164                                              metadata, delete, meta_type)
1165
1166    def update_volume_metadata(self, context, volume, metadata, delete=False,
1167                               meta_type=common.METADATA_TYPES.user):
1168        """Updates volume metadata.
1169
1170        If delete is True, metadata items that are not specified in the
1171        `metadata` argument will be deleted.
1172
1173        """
1174        context.authorize(vol_meta_policy.UPDATE_POLICY, target_obj=volume)
1175        db_meta = self._update_volume_metadata(context, volume, metadata,
1176                                               delete, meta_type)
1177
1178        # TODO(jdg): Implement an RPC call for drivers that may use this info
1179
1180        LOG.info("Update volume metadata completed successfully.",
1181                 resource=volume)
1182        return db_meta
1183
1184    def get_volume_admin_metadata(self, context, volume):
1185        """Get all administration metadata associated with a volume."""
1186        rv = self.db.volume_admin_metadata_get(context, volume['id'])
1187        LOG.info("Get volume admin metadata completed successfully.",
1188                 resource=volume)
1189        return dict(rv)
1190
1191    def update_volume_admin_metadata(self, context, volume, metadata,
1192                                     delete=False, add=True, update=True):
1193        """Updates or creates volume administration metadata.
1194
1195        If delete is True, metadata items that are not specified in the
1196        `metadata` argument will be deleted.
1197
1198        """
1199        context.authorize(vol_meta_policy.UPDATE_ADMIN_METADATA_POLICY,
1200                          target_obj=volume)
1201        utils.check_metadata_properties(metadata)
1202        db_meta = self.db.volume_admin_metadata_update(context, volume.id,
1203                                                       metadata, delete, add,
1204                                                       update)
1205
1206        # TODO(jdg): Implement an RPC call for drivers that may use this info
1207
1208        LOG.info("Update volume admin metadata completed successfully.",
1209                 resource=volume)
1210        return db_meta
1211
1212    def get_snapshot_metadata(self, context, snapshot):
1213        """Get all metadata associated with a snapshot."""
1214        context.authorize(s_meta_policy.GET_POLICY,
1215                          target_obj=snapshot)
1216        LOG.info("Get snapshot metadata completed successfully.",
1217                 resource=snapshot)
1218        return snapshot.metadata
1219
1220    def delete_snapshot_metadata(self, context, snapshot, key):
1221        """Delete the given metadata item from a snapshot."""
1222        context.authorize(s_meta_policy.DELETE_POLICY,
1223                          target_obj=snapshot)
1224        snapshot.delete_metadata_key(context, key)
1225        LOG.info("Delete snapshot metadata completed successfully.",
1226                 resource=snapshot)
1227
1228    def update_snapshot_metadata(self, context,
1229                                 snapshot, metadata,
1230                                 delete=False):
1231        """Updates or creates snapshot metadata.
1232
1233        If delete is True, metadata items that are not specified in the
1234        `metadata` argument will be deleted.
1235
1236        """
1237        context.authorize(s_meta_policy.UPDATE_POLICY,
1238                          target_obj=snapshot)
1239        if delete:
1240            _metadata = metadata
1241        else:
1242            orig_meta = snapshot.metadata
1243            _metadata = orig_meta.copy()
1244            _metadata.update(metadata)
1245
1246        utils.check_metadata_properties(_metadata)
1247
1248        snapshot.metadata = _metadata
1249        snapshot.save()
1250
1251        # TODO(jdg): Implement an RPC call for drivers that may use this info
1252
1253        LOG.info("Update snapshot metadata completed successfully.",
1254                 resource=snapshot)
1255        return snapshot.metadata
1256
1257    def get_snapshot_metadata_value(self, snapshot, key):
1258        LOG.info("Get snapshot metadata value not implemented.",
1259                 resource=snapshot)
1260        # FIXME(jdg): Huh?  Pass?
1261        pass
1262
1263    def get_volumes_image_metadata(self, context):
1264        context.authorize(vol_meta_policy.GET_POLICY)
1265        db_data = self.db.volume_glance_metadata_get_all(context)
1266        results = collections.defaultdict(dict)
1267        for meta_entry in db_data:
1268            results[meta_entry['volume_id']].update({meta_entry['key']:
1269                                                     meta_entry['value']})
1270        return results
1271
1272    def get_volume_image_metadata(self, context, volume):
1273        context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume)
1274        db_data = self.db.volume_glance_metadata_get(context, volume['id'])
1275        LOG.info("Get volume image-metadata completed successfully.",
1276                 resource=volume)
1277        return {meta_entry.key: meta_entry.value for meta_entry in db_data}
1278
1279    def get_list_volumes_image_metadata(self, context, volume_id_list):
1280        db_data = self.db.volume_glance_metadata_list_get(context,
1281                                                          volume_id_list)
1282        results = collections.defaultdict(dict)
1283        for meta_entry in db_data:
1284            results[meta_entry['volume_id']].update({meta_entry['key']:
1285                                                     meta_entry['value']})
1286        return results
1287
1288    def copy_volume_to_image(self, context, volume, metadata, force):
1289        """Create a new image from the specified volume."""
1290        if not CONF.enable_force_upload and force:
1291            LOG.info("Force upload to image is disabled, "
1292                     "Force option will be ignored.",
1293                     resource={'type': 'volume', 'id': volume['id']})
1294            force = False
1295
1296        # Build required conditions for conditional update
1297        expected = {'status': ('available', 'in-use') if force
1298                    else 'available'}
1299        values = {'status': 'uploading',
1300                  'previous_status': volume.model.status}
1301
1302        result = volume.conditional_update(values, expected)
1303        if not result:
1304            msg = (_('Volume %(vol_id)s status must be %(statuses)s') %
1305                   {'vol_id': volume.id,
1306                    'statuses': utils.build_or_str(expected['status'])})
1307            raise exception.InvalidVolume(reason=msg)
1308
1309        try:
1310            glance_core_props = CONF.glance_core_properties
1311            if glance_core_props:
1312                try:
1313                    vol_img_metadata = self.get_volume_image_metadata(
1314                        context, volume)
1315                    custom_property_set = (
1316                        set(vol_img_metadata).difference(glance_core_props))
1317                    if custom_property_set:
1318                        metadata['properties'] = {
1319                            custom_prop: vol_img_metadata[custom_prop]
1320                            for custom_prop in custom_property_set}
1321                except exception.GlanceMetadataNotFound:
1322                    # If volume is not created from image, No glance metadata
1323                    # would be available for that volume in
1324                    # volume glance metadata table
1325                    pass
1326
1327            recv_metadata = self.image_service.create(
1328                context, self.image_service._translate_to_glance(metadata))
1329        except Exception:
1330            # NOTE(geguileo): To mimic behavior before conditional_update we
1331            # will rollback status if image create fails
1332            with excutils.save_and_reraise_exception():
1333                volume.conditional_update(
1334                    {'status': volume.model.previous_status,
1335                     'previous_status': None},
1336                    {'status': 'uploading'})
1337
1338        self.volume_rpcapi.copy_volume_to_image(context,
1339                                                volume,
1340                                                recv_metadata)
1341
1342        response = {"id": volume['id'],
1343                    "updated_at": volume['updated_at'],
1344                    "status": 'uploading',
1345                    "display_description": volume['display_description'],
1346                    "size": volume['size'],
1347                    "volume_type": volume['volume_type'],
1348                    "image_id": recv_metadata['id'],
1349                    "container_format": recv_metadata['container_format'],
1350                    "disk_format": recv_metadata['disk_format'],
1351                    "image_name": recv_metadata.get('name', None)}
1352        if 'protected' in recv_metadata:
1353            response['protected'] = recv_metadata.get('protected')
1354        if 'is_public' in recv_metadata:
1355            response['is_public'] = recv_metadata.get('is_public')
1356        elif 'visibility' in recv_metadata:
1357            response['visibility'] = recv_metadata.get('visibility')
1358        LOG.info("Copy volume to image completed successfully.",
1359                 resource=volume)
1360        return response
1361
1362    def _extend(self, context, volume, new_size, attached=False):
1363        value = {'status': 'extending',
1364                 'previous_status': volume.status}
1365        if attached:
1366            expected = {'status': 'in-use'}
1367        else:
1368            expected = {'status': 'available'}
1369        orig_status = {'status': volume.status}
1370
1371        def _roll_back_status():
1372            status = orig_status['status']
1373            msg = _('Could not return volume %(id)s to %(status)s.')
1374            try:
1375                if not volume.conditional_update(orig_status, value):
1376                    LOG.error(msg, {'id': volume.id, 'status': status})
1377            except Exception:
1378                LOG.exception(msg, {'id': volume.id, 'status': status})
1379
1380        size_increase = (int(new_size)) - volume.size
1381        if size_increase <= 0:
1382            msg = (_("New size for extend must be greater "
1383                     "than current size. (current: %(size)s, "
1384                     "extended: %(new_size)s).") % {'new_size': new_size,
1385                                                    'size': volume.size})
1386            raise exception.InvalidInput(reason=msg)
1387
1388        result = volume.conditional_update(value, expected)
1389        if not result:
1390            msg = (_("Volume %(vol_id)s status must be '%(expected)s' "
1391                     "to extend, currently %(status)s.")
1392                   % {'vol_id': volume.id,
1393                      'status': volume.status,
1394                      'expected': six.text_type(expected)})
1395            raise exception.InvalidVolume(reason=msg)
1396
1397        rollback = True
1398        try:
1399            values = {'per_volume_gigabytes': new_size}
1400            QUOTAS.limit_check(context, project_id=context.project_id,
1401                               **values)
1402            rollback = False
1403        except exception.OverQuota as e:
1404            quotas = e.kwargs['quotas']
1405            raise exception.VolumeSizeExceedsLimit(
1406                size=new_size, limit=quotas['per_volume_gigabytes'])
1407        finally:
1408            # NOTE(geguileo): To mimic behavior before conditional_update we
1409            # will rollback status on quota reservation failure regardless of
1410            # the exception that caused the failure.
1411            if rollback:
1412                _roll_back_status()
1413
1414        try:
1415            reservations = None
1416            reserve_opts = {'gigabytes': size_increase}
1417            QUOTAS.add_volume_type_opts(context, reserve_opts,
1418                                        volume.volume_type_id)
1419            reservations = QUOTAS.reserve(context,
1420                                          project_id=volume.project_id,
1421                                          **reserve_opts)
1422        except exception.OverQuota as exc:
1423            gigabytes = exc.kwargs['usages']['gigabytes']
1424            gb_quotas = exc.kwargs['quotas']['gigabytes']
1425
1426            consumed = gigabytes['reserved'] + gigabytes['in_use']
1427            LOG.error("Quota exceeded for %(s_pid)s, tried to extend volume "
1428                      "by %(s_size)sG, (%(d_consumed)dG of %(d_quota)dG "
1429                      "already consumed).",
1430                      {'s_pid': context.project_id,
1431                       's_size': size_increase,
1432                       'd_consumed': consumed,
1433                       'd_quota': gb_quotas})
1434            raise exception.VolumeSizeExceedsAvailableQuota(
1435                requested=size_increase, consumed=consumed, quota=gb_quotas)
1436        finally:
1437            # NOTE(geguileo): To mimic behavior before conditional_update we
1438            # will rollback status on quota reservation failure regardless of
1439            # the exception that caused the failure.
1440            if reservations is None:
1441                _roll_back_status()
1442
1443        volume_type = {}
1444        if volume.volume_type_id:
1445            volume_type = volume_types.get_volume_type(context.elevated(),
1446                                                       volume.volume_type_id)
1447
1448        request_spec = {
1449            'volume_properties': volume,
1450            'volume_type': volume_type,
1451            'volume_id': volume.id
1452        }
1453
1454        self.scheduler_rpcapi.extend_volume(context, volume, new_size,
1455                                            reservations, request_spec)
1456
1457        LOG.info("Extend volume request issued successfully.",
1458                 resource=volume)
1459
1460    def extend(self, context, volume, new_size):
1461        context.authorize(vol_action_policy.EXTEND_POLICY,
1462                          target_obj=volume)
1463        self._extend(context, volume, new_size, attached=False)
1464
1465    # NOTE(tommylikehu): New method is added here so that administrator
1466    # can enable/disable this ability by editing the policy file if the
1467    # cloud environment doesn't allow this operation.
1468    def extend_attached_volume(self, context, volume, new_size):
1469        context.authorize(vol_action_policy.EXTEND_ATTACHED_POLICY,
1470                          target_obj=volume)
1471        self._extend(context, volume, new_size, attached=True)
1472
1473    def migrate_volume(self, context, volume, host, cluster_name, force_copy,
1474                       lock_volume):
1475        """Migrate the volume to the specified host or cluster."""
1476        elevated = context.elevated()
1477        context.authorize(vol_action_policy.MIGRATE_POLICY,
1478                          target_obj=volume)
1479
1480        # If we received a request to migrate to a host
1481        # Look for the service - must be up and enabled
1482        svc_host = host and volume_utils.extract_host(host, 'backend')
1483        svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
1484                                                                 'backend')
1485        # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
1486        # a service from the DB we are getting either one specific service from
1487        # a host or any service from a cluster that is up, which means that the
1488        # cluster itself is also up.
1489        try:
1490            svc = objects.Service.get_by_id(elevated, None, is_up=True,
1491                                            topic=constants.VOLUME_TOPIC,
1492                                            host=svc_host, disabled=False,
1493                                            cluster_name=svc_cluster,
1494                                            backend_match_level='pool')
1495        except exception.ServiceNotFound:
1496            msg = _("No available service named '%s'") % (cluster_name or host)
1497            LOG.error(msg)
1498            raise exception.InvalidHost(reason=msg)
1499        # Even if we were requested to do a migration to a host, if the host is
1500        # in a cluster we will do a cluster migration.
1501        cluster_name = svc.cluster_name
1502
1503        # Build required conditions for conditional update
1504        expected = {'status': ('available', 'in-use'),
1505                    'migration_status': self.AVAILABLE_MIGRATION_STATUS,
1506                    'replication_status': (
1507                        None,
1508                        fields.ReplicationStatus.DISABLED,
1509                        fields.ReplicationStatus.NOT_CAPABLE),
1510                    'consistencygroup_id': (None, ''),
1511                    'group_id': (None, '')}
1512
1513        # We want to make sure that the migration is to another host or
1514        # another cluster.
1515        if cluster_name:
1516            expected['cluster_name'] = db.Not(cluster_name)
1517        else:
1518            expected['host'] = db.Not(host)
1519
1520        filters = [~db.volume_has_snapshots_filter()]
1521
1522        updates = {'migration_status': 'starting',
1523                   'previous_status': volume.model.status}
1524
1525        # When the migration of an available volume starts, both the status
1526        # and the migration status of the volume will be changed.
1527        # If the admin sets lock_volume flag to True, the volume
1528        # status is changed to 'maintenance', telling users
1529        # that this volume is in maintenance mode, and no action is allowed
1530        # on this volume, e.g. attach, detach, retype, migrate, etc.
1531        if lock_volume:
1532            updates['status'] = db.Case(
1533                [(volume.model.status == 'available', 'maintenance')],
1534                else_=volume.model.status)
1535
1536        result = volume.conditional_update(updates, expected, filters)
1537
1538        if not result:
1539            msg = _('Volume %s status must be available or in-use, must not '
1540                    'be migrating, have snapshots, be replicated, be part of '
1541                    'a group and destination host/cluster must be different '
1542                    'than the current one') % volume.id
1543            LOG.error(msg)
1544            raise exception.InvalidVolume(reason=msg)
1545
1546        # Call the scheduler to ensure that the host exists and that it can
1547        # accept the volume
1548        volume_type = {}
1549        if volume.volume_type_id:
1550            volume_type = volume_types.get_volume_type(context.elevated(),
1551                                                       volume.volume_type_id)
1552        request_spec = {'volume_properties': volume,
1553                        'volume_type': volume_type,
1554                        'volume_id': volume.id}
1555        self.scheduler_rpcapi.migrate_volume(context,
1556                                             volume,
1557                                             cluster_name or host,
1558                                             force_copy,
1559                                             request_spec)
1560        LOG.info("Migrate volume request issued successfully.",
1561                 resource=volume)
1562
1563    def migrate_volume_completion(self, context, volume, new_volume, error):
1564        context.authorize(vol_action_policy.MIGRATE_COMPLETE_POLICY,
1565                          target_obj=volume)
1566        if not (volume.migration_status or new_volume.migration_status):
1567            # When we're not migrating and haven't hit any errors, we issue
1568            # volume attach and detach requests so the volumes don't end in
1569            # 'attaching' and 'detaching' state
1570            if not error:
1571                attachments = volume.volume_attachment
1572                for attachment in attachments:
1573                    self.detach(context, volume, attachment.id)
1574
1575                    self.attach(context, new_volume,
1576                                attachment.instance_uuid,
1577                                attachment.attached_host,
1578                                attachment.mountpoint,
1579                                'rw')
1580
1581            return new_volume.id
1582
1583        if not volume.migration_status:
1584            msg = _('Source volume not mid-migration.')
1585            raise exception.InvalidVolume(reason=msg)
1586
1587        if not new_volume.migration_status:
1588            msg = _('Destination volume not mid-migration.')
1589            raise exception.InvalidVolume(reason=msg)
1590
1591        expected_status = 'target:%s' % volume.id
1592        if not new_volume.migration_status == expected_status:
1593            msg = (_('Destination has migration_status %(stat)s, expected '
1594                     '%(exp)s.') % {'stat': new_volume.migration_status,
1595                                    'exp': expected_status})
1596            raise exception.InvalidVolume(reason=msg)
1597
1598        LOG.info("Migrate volume completion issued successfully.",
1599                 resource=volume)
1600        return self.volume_rpcapi.migrate_volume_completion(context, volume,
1601                                                            new_volume, error)
1602
1603    def update_readonly_flag(self, context, volume, flag):
1604        context.authorize(vol_action_policy.UPDATE_READONLY_POLICY,
1605                          target_obj=volume)
1606        if volume['status'] != 'available':
1607            msg = _('Volume %(vol_id)s status must be available '
1608                    'to update readonly flag, but current status is: '
1609                    '%(vol_status)s.') % {'vol_id': volume['id'],
1610                                          'vol_status': volume['status']}
1611            raise exception.InvalidVolume(reason=msg)
1612        self.update_volume_admin_metadata(context.elevated(), volume,
1613                                          {'readonly': six.text_type(flag)})
1614        LOG.info("Update readonly setting on volume "
1615                 "completed successfully.",
1616                 resource=volume)
1617
1618    def retype(self, context, volume, new_type, migration_policy=None):
1619        """Attempt to modify the type associated with an existing volume."""
1620        context.authorize(vol_action_policy.RETYPE_POLICY, target_obj=volume)
1621        if migration_policy and migration_policy not in ('on-demand', 'never'):
1622            msg = _('migration_policy must be \'on-demand\' or \'never\', '
1623                    'passed: %s') % new_type
1624            LOG.error(msg)
1625            raise exception.InvalidInput(reason=msg)
1626
1627        # Support specifying volume type by ID or name
1628        try:
1629            new_type = (
1630                volume_type.VolumeType.get_by_name_or_id(context.elevated(),
1631                                                         new_type))
1632        except exception.InvalidVolumeType:
1633            msg = _('Invalid volume_type passed: %s.') % new_type
1634            LOG.error(msg)
1635            raise exception.InvalidInput(reason=msg)
1636
1637        new_type_id = new_type['id']
1638
1639        # NOTE(jdg): We check here if multiattach is involved in either side
1640        # of the retype, we can't change multiattach on an in-use volume
1641        # because there's things the hypervisor needs when attaching, so
1642        # we just disallow retype of in-use volumes in this case.  You still
1643        # have to get through scheduling if all the conditions are met, we
1644        # should consider an up front capabilities check to give fast feedback
1645        # rather than "No hosts found" and error status
1646        src_is_multiattach = volume.multiattach
1647        tgt_is_multiattach = False
1648
1649        if new_type:
1650            tgt_is_multiattach = self._is_multiattach(new_type)
1651
1652        if src_is_multiattach != tgt_is_multiattach:
1653            if volume.status != "available":
1654                msg = _('Invalid volume_type passed, retypes affecting '
1655                        'multiattach are only allowed on available volumes, '
1656                        'the specified volume however currently has a status '
1657                        'of: %s.') % volume.status
1658                LOG.info(msg)
1659                raise exception.InvalidInput(reason=msg)
1660
1661            # If they are retyping to a multiattach capable, make sure they
1662            # are allowed to do so.
1663            if tgt_is_multiattach:
1664                context.authorize(vol_policy.MULTIATTACH_POLICY,
1665                                  target_obj=volume)
1666
1667        if tgt_is_multiattach and self._is_encrypted(new_type):
1668            msg = ('Retype requested both encryption and multi-attach, '
1669                   'which is not supported.')
1670            raise exception.InvalidInput(reason=msg)
1671
1672        # We're checking here in so that we can report any quota issues as
1673        # early as possible, but won't commit until we change the type. We
1674        # pass the reservations onward in case we need to roll back.
1675        reservations = quota_utils.get_volume_type_reservation(
1676            context, volume, new_type_id, reserve_vol_type_only=True)
1677
1678        # Get old reservations
1679        try:
1680            reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
1681            QUOTAS.add_volume_type_opts(context,
1682                                        reserve_opts,
1683                                        volume.volume_type_id)
1684            # NOTE(wanghao): We don't need to reserve volumes and gigabytes
1685            # quota for retyping operation since they didn't changed, just
1686            # reserve volume_type and type gigabytes is fine.
1687            reserve_opts.pop('volumes')
1688            reserve_opts.pop('gigabytes')
1689            old_reservations = QUOTAS.reserve(context,
1690                                              project_id=volume.project_id,
1691                                              **reserve_opts)
1692        except Exception:
1693            volume.status = volume.previous_status
1694            volume.save()
1695            msg = _("Failed to update quota usage while retyping volume.")
1696            LOG.exception(msg, resource=volume)
1697            raise exception.CinderException(msg)
1698
1699        # Build required conditions for conditional update
1700        expected = {'status': ('available', 'in-use'),
1701                    'migration_status': self.AVAILABLE_MIGRATION_STATUS,
1702                    'consistencygroup_id': (None, ''),
1703                    'group_id': (None, ''),
1704                    'volume_type_id': db.Not(new_type_id)}
1705
1706        # We don't support changing QoS at the front-end yet for in-use volumes
1707        # TODO(avishay): Call Nova to change QoS setting (libvirt has support
1708        # - virDomainSetBlockIoTune() - Nova does not have support yet).
1709        filters = [db.volume_qos_allows_retype(new_type_id)]
1710
1711        updates = {'status': 'retyping',
1712                   'previous_status': objects.Volume.model.status}
1713
1714        if not volume.conditional_update(updates, expected, filters):
1715            msg = _('Retype needs volume to be in available or in-use state, '
1716                    'not be part of an active migration or a consistency '
1717                    'group, requested type has to be different that the '
1718                    'one from the volume, and for in-use volumes front-end '
1719                    'qos specs cannot change.')
1720            LOG.error(msg)
1721            QUOTAS.rollback(context, reservations + old_reservations,
1722                            project_id=volume.project_id)
1723            raise exception.InvalidVolume(reason=msg)
1724
1725        request_spec = {'volume_properties': volume,
1726                        'volume_id': volume.id,
1727                        'volume_type': new_type,
1728                        'migration_policy': migration_policy,
1729                        'quota_reservations': reservations,
1730                        'old_reservations': old_reservations}
1731
1732        self.scheduler_rpcapi.retype(context, volume,
1733                                     request_spec=request_spec,
1734                                     filter_properties={})
1735        volume.multiattach = tgt_is_multiattach
1736        volume.save()
1737        LOG.info("Retype volume request issued successfully.",
1738                 resource=volume)
1739
1740    def _get_service_by_host_cluster(self, context, host, cluster_name,
1741                                     resource='volume'):
1742        elevated = context.elevated()
1743
1744        svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
1745                                                                 'backend')
1746        svc_host = host and volume_utils.extract_host(host, 'backend')
1747
1748        # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
1749        # a service from the DB we are getting either one specific service from
1750        # a host or any service that is up from a cluster, which means that the
1751        # cluster itself is also up.
1752        try:
1753            service = objects.Service.get_by_id(elevated, None, host=svc_host,
1754                                                binary=constants.VOLUME_BINARY,
1755                                                cluster_name=svc_cluster)
1756        except exception.ServiceNotFound:
1757            with excutils.save_and_reraise_exception():
1758                LOG.error('Unable to find service: %(service)s for '
1759                          'given host: %(host)s and cluster %(cluster)s.',
1760                          {'service': constants.VOLUME_BINARY, 'host': host,
1761                           'cluster': cluster_name})
1762
1763        if service.disabled and (not service.cluster_name or
1764                                 service.cluster.disabled):
1765            LOG.error('Unable to manage existing %s on a disabled '
1766                      'service.', resource)
1767            raise exception.ServiceUnavailable()
1768
1769        if not service.is_up:
1770            LOG.error('Unable to manage existing %s on a service that is '
1771                      'down.', resource)
1772            raise exception.ServiceUnavailable()
1773
1774        return service
1775
1776    def manage_existing(self, context, host, cluster_name, ref, name=None,
1777                        description=None, volume_type=None, metadata=None,
1778                        availability_zone=None, bootable=False):
1779
1780        if 'source-name' in ref:
1781            vol_id = volume_utils.extract_id_from_volume_name(
1782                ref['source-name'])
1783            if vol_id and volume_utils.check_already_managed_volume(vol_id):
1784                raise exception.InvalidVolume(
1785                    _("Unable to manage existing volume."
1786                      " The volume is already managed"))
1787
1788        if volume_type and 'extra_specs' not in volume_type:
1789            extra_specs = volume_types.get_volume_type_extra_specs(
1790                volume_type['id'])
1791            volume_type['extra_specs'] = extra_specs
1792
1793        service = self._get_service_by_host_cluster(context, host,
1794                                                    cluster_name)
1795
1796        if availability_zone is None:
1797            availability_zone = service.availability_zone
1798
1799        if not cluster_name and bool(volume_utils.extract_host(host, 'pool')):
1800            manage_host = host
1801        else:
1802            manage_host = service.host
1803
1804        manage_what = {
1805            'context': context,
1806            'name': name,
1807            'description': description,
1808            'host': manage_host,
1809            'cluster_name': service.cluster_name,
1810            'ref': ref,
1811            'volume_type': volume_type,
1812            'metadata': metadata,
1813            'availability_zone': availability_zone,
1814            'bootable': bootable,
1815            'size': 0,
1816            'group_snapshot': None,
1817            'optional_args': {'is_quota_committed': False},
1818            'volume_type_id': None if not volume_type else volume_type['id'],
1819        }
1820
1821        try:
1822            flow_engine = manage_existing.get_flow(self.scheduler_rpcapi,
1823                                                   self.db,
1824                                                   manage_what)
1825        except Exception:
1826            msg = _('Failed to manage api volume flow.')
1827            LOG.exception(msg)
1828            raise exception.CinderException(msg)
1829
1830        # Attaching this listener will capture all of the notifications that
1831        # taskflow sends out and redirect them to a more useful log for
1832        # cinder's debugging (or error reporting) usage.
1833        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
1834            flow_engine.run()
1835            vol_ref = flow_engine.storage.fetch('volume')
1836            LOG.info("Manage volume request issued successfully.",
1837                     resource=vol_ref)
1838            return vol_ref
1839
1840    def get_manageable_volumes(self, context, host, cluster_name, marker=None,
1841                               limit=None, offset=None, sort_keys=None,
1842                               sort_dirs=None):
1843        svc = self._get_service_by_host_cluster(context, host, cluster_name)
1844        return self.volume_rpcapi.get_manageable_volumes(context, svc,
1845                                                         marker, limit,
1846                                                         offset, sort_keys,
1847                                                         sort_dirs)
1848
1849    def manage_existing_snapshot(self, context, ref, volume,
1850                                 name=None, description=None,
1851                                 metadata=None):
1852        service = self._get_service_by_host_cluster(context, volume.host,
1853                                                    volume.cluster_name,
1854                                                    'snapshot')
1855
1856        snapshot_object = self.create_snapshot_in_db(context, volume, name,
1857                                                     description, True,
1858                                                     metadata, None,
1859                                                     commit_quota=True)
1860        self.volume_rpcapi.manage_existing_snapshot(
1861            context, snapshot_object, ref, service.service_topic_queue)
1862        return snapshot_object
1863
1864    def get_manageable_snapshots(self, context, host, cluster_name,
1865                                 marker=None, limit=None, offset=None,
1866                                 sort_keys=None, sort_dirs=None):
1867        svc = self._get_service_by_host_cluster(context, host, cluster_name,
1868                                                'snapshot')
1869        return self.volume_rpcapi.get_manageable_snapshots(context, svc,
1870                                                           marker, limit,
1871                                                           offset, sort_keys,
1872                                                           sort_dirs)
1873
1874    def _get_cluster_and_services_for_replication(self, ctxt, host,
1875                                                  cluster_name):
1876        services = objects.ServiceList.get_all(
1877            ctxt, filters={'host': host, 'cluster_name': cluster_name,
1878                           'binary': constants.VOLUME_BINARY})
1879
1880        if not services:
1881            if host:
1882                msg = _("No service found with host=%s") % host
1883            else:
1884                msg = _("No service found with cluster=%s") % cluster_name
1885
1886            raise exception.ServiceNotFound(msg)
1887
1888        cluster = services[0].cluster
1889        # Check that the host or cluster we received only results in 1 host or
1890        # hosts from the same cluster.
1891        if cluster_name:
1892            check_attribute = 'cluster_name'
1893            expected = cluster.name
1894        else:
1895            check_attribute = 'host'
1896            expected = services[0].host
1897        if any(getattr(s, check_attribute) != expected for s in services):
1898            msg = _('Services from different clusters found.')
1899            raise exception.InvalidParameterValue(msg)
1900
1901        # If we received host parameter but host belongs to a cluster we have
1902        # to change all the services in the cluster, not just one host
1903        if host and cluster:
1904            services = cluster.services
1905
1906        return cluster, services
1907
1908    def _replication_db_change(self, ctxt, field, expected_value, new_value,
1909                               host, cluster_name, check_up=False):
1910        def _error_msg(service):
1911            expected = utils.build_or_str(six.text_type(expected_value))
1912            up_msg = 'and must be up ' if check_up else ''
1913            msg = (_('%(field)s in %(service)s must be %(expected)s '
1914                     '%(up_msg)sto failover.')
1915                   % {'field': field, 'service': service,
1916                      'expected': expected, 'up_msg': up_msg})
1917            LOG.error(msg)
1918            return msg
1919
1920        cluster, services = self._get_cluster_and_services_for_replication(
1921            ctxt, host, cluster_name)
1922
1923        expect = {field: expected_value}
1924        change = {field: new_value}
1925
1926        if cluster:
1927            old_value = getattr(cluster, field)
1928            if ((check_up and not cluster.is_up)
1929                    or not cluster.conditional_update(change, expect)):
1930                msg = _error_msg(cluster.name)
1931                raise exception.InvalidInput(reason=msg)
1932
1933        changed = []
1934        not_changed = []
1935        for service in services:
1936            if ((not check_up or service.is_up)
1937                    and service.conditional_update(change, expect)):
1938                changed.append(service)
1939            else:
1940                not_changed.append(service)
1941
1942        # If there were some services that couldn't be changed we should at
1943        # least log the error.
1944        if not_changed:
1945            msg = _error_msg([s.host for s in not_changed])
1946            # If we couldn't change any of the services
1947            if not changed:
1948                # Undo the cluster change
1949                if cluster:
1950                    setattr(cluster, field, old_value)
1951                    cluster.save()
1952                raise exception.InvalidInput(
1953                    reason=_('No service could be changed: %s') % msg)
1954            LOG.warning('Some services could not be changed: %s', msg)
1955
1956        return cluster, services
1957
1958    def failover(self, ctxt, host, cluster_name, secondary_id=None):
1959        ctxt.authorize(svr_policy.FAILOVER_POLICY)
1960        ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
1961
1962        # TODO(geguileo): In P - Remove this version check
1963        rpc_version = self.volume_rpcapi.determine_rpc_version_cap()
1964        rpc_version = versionutils.convert_version_to_tuple(rpc_version)
1965        if cluster_name and rpc_version < (3, 5):
1966            msg = _('replication operations with cluster field')
1967            raise exception.UnavailableDuringUpgrade(action=msg)
1968
1969        rep_fields = fields.ReplicationStatus
1970        expected_values = [rep_fields.ENABLED, rep_fields.FAILED_OVER]
1971        new_value = rep_fields.FAILING_OVER
1972
1973        cluster, services = self._replication_db_change(
1974            ctxt, 'replication_status', expected_values, new_value, host,
1975            cluster_name, check_up=True)
1976
1977        self.volume_rpcapi.failover(ctxt, services[0], secondary_id)
1978
1979    def freeze_host(self, ctxt, host, cluster_name):
1980        ctxt.authorize(svr_policy.FREEZE_POLICY)
1981        ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
1982
1983        expected = False
1984        new_value = True
1985        cluster, services = self._replication_db_change(
1986            ctxt, 'frozen', expected, new_value, host, cluster_name,
1987            check_up=False)
1988
1989        # Should we set service status to disabled to keep
1990        # scheduler calls from being sent? Just use existing
1991        # `cinder service-disable reason=freeze`
1992        self.volume_rpcapi.freeze_host(ctxt, services[0])
1993
1994    def thaw_host(self, ctxt, host, cluster_name):
1995        ctxt.authorize(svr_policy.THAW_POLICY)
1996        ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
1997
1998        expected = True
1999        new_value = False
2000        cluster, services = self._replication_db_change(
2001            ctxt, 'frozen', expected, new_value, host, cluster_name,
2002            check_up=False)
2003
2004        if not self.volume_rpcapi.thaw_host(ctxt, services[0]):
2005            return "Backend reported error during thaw_host operation."
2006
2007    def check_volume_filters(self, filters, strict=False):
2008        """Sets the user filter value to accepted format"""
2009        booleans = self.db.get_booleans_for_table('volume')
2010
2011        # To translate any true/false equivalent to True/False
2012        # which is only acceptable format in database queries.
2013
2014        for key, val in filters.items():
2015            try:
2016                if key in booleans:
2017                    filters[key] = self._check_boolean_filter_value(
2018                        key, val, strict)
2019                elif key == 'display_name':
2020                    # Use the raw value of display name as is for the filter
2021                    # without passing it through ast.literal_eval(). If the
2022                    # display name is a properly quoted string (e.g. '"foo"')
2023                    # then literal_eval() strips the quotes (i.e. 'foo'), so
2024                    # the filter becomes different from the user input.
2025                    continue
2026                else:
2027                    filters[key] = ast.literal_eval(val)
2028            except (ValueError, SyntaxError):
2029                LOG.debug('Could not evaluate value %s, assuming string', val)
2030
2031    def _check_boolean_filter_value(self, key, val, strict=False):
2032        """Boolean filter values in Volume GET.
2033
2034        Before VOLUME_LIST_BOOTABLE, all values other than 'False', 'false',
2035        'FALSE' were trated as True for specific boolean filter parameters in
2036        Volume GET request.
2037
2038        But VOLUME_LIST_BOOTABLE onwards, only true/True/0/1/False/false
2039        parameters are supported.
2040        All other input values to specific boolean filter parameter will
2041        lead to raising exception.
2042
2043        This changes API behavior. So, micro version introduced for
2044        VOLUME_LIST_BOOTABLE onwards.
2045        """
2046        if strict:
2047            # for updated behavior, from VOLUME_LIST_BOOTABLE onwards.
2048            # To translate any true/false/t/f/0/1 to True/False
2049            # which is only acceptable format in database queries.
2050            try:
2051                return strutils.bool_from_string(val, strict=True)
2052            except ValueError:
2053                msg = _('\'%(key)s = %(value)s\'') % {'key': key,
2054                                                      'value': val}
2055                raise exception.InvalidInput(reason=msg)
2056        else:
2057            # For existing behavior(before version VOLUME_LIST_BOOTABLE)
2058            accepted_true = ['True', 'true', 'TRUE']
2059            accepted_false = ['False', 'false', 'FALSE']
2060
2061            if val in accepted_false:
2062                return False
2063            elif val in accepted_true:
2064                return True
2065            else:
2066                return bool(val)
2067
2068    def _attachment_reserve(self, ctxt, vref, instance_uuid=None):
2069        # NOTE(jdg): Reserved is a special case, we're avoiding allowing
2070        # creation of other new reserves/attachments while in this state
2071        # so we avoid contention issues with shared connections
2072
2073        # Multiattach of bootable volumes is a special case with it's own
2074        # policy, check that here right off the bat
2075        if (vref.get('multiattach', False) and
2076                vref.status == 'in-use' and
2077                vref.bootable):
2078            ctxt.authorize(
2079                attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY,
2080                target_obj=vref)
2081
2082        # FIXME(JDG):  We want to be able to do things here like reserve a
2083        # volume for Nova to do BFV WHILE the volume may be in the process of
2084        # downloading image, we add downloading here; that's easy enough but
2085        # we've got a race between with the attaching/detaching that we do
2086        # locally on the Cinder node.  Just come up with an easy way to
2087        # determine if we're attaching to the Cinder host for some work or if
2088        # we're being used by the outside world.
2089        expected = {'multiattach': vref.multiattach,
2090                    'status': (('available', 'in-use', 'downloading')
2091                               if vref.multiattach
2092                               else ('available', 'downloading'))}
2093
2094        result = vref.conditional_update({'status': 'reserved'}, expected)
2095
2096        if not result:
2097            override = False
2098            if instance_uuid:
2099                # Refresh the volume reference in case multiple instances were
2100                # being concurrently attached to the same non-multiattach
2101                # volume.
2102                vref = objects.Volume.get_by_id(ctxt, vref.id)
2103                for attachment in vref.volume_attachment:
2104                    # If we're attaching the same volume to the same instance,
2105                    # we could be migrating the instance to another host in
2106                    # which case we want to allow the reservation.
2107                    # (LP BUG: 1694530)
2108                    if attachment.instance_uuid == instance_uuid:
2109                        override = True
2110                        break
2111
2112            if not override:
2113                msg = (_('Volume %(vol_id)s status must be %(statuses)s') %
2114                       {'vol_id': vref.id,
2115                        'statuses': utils.build_or_str(expected['status'])})
2116                raise exception.InvalidVolume(reason=msg)
2117
2118        values = {'volume_id': vref.id,
2119                  'volume_host': vref.host,
2120                  'attach_status': 'reserved',
2121                  'instance_uuid': instance_uuid}
2122        db_ref = self.db.volume_attach(ctxt.elevated(), values)
2123        return objects.VolumeAttachment.get_by_id(ctxt, db_ref['id'])
2124
2125    def attachment_create(self,
2126                          ctxt,
2127                          volume_ref,
2128                          instance_uuid,
2129                          connector=None):
2130        """Create an attachment record for the specified volume."""
2131        ctxt.authorize(attachment_policy.CREATE_POLICY, target_obj=volume_ref)
2132        connection_info = {}
2133        if "error" in volume_ref.status:
2134            msg = ('Volume attachments can not be created if the volume '
2135                   'is in an error state. '
2136                   'The Volume %(volume_id)s currently has a status of: '
2137                   '%(volume_status)s ') % {
2138                       'volume_id': volume_ref.id,
2139                       'volume_status': volume_ref.status}
2140            LOG.error(msg)
2141            raise exception.InvalidVolume(reason=msg)
2142        attachment_ref = self._attachment_reserve(ctxt,
2143                                                  volume_ref,
2144                                                  instance_uuid)
2145        if connector:
2146            connection_info = (
2147                self.volume_rpcapi.attachment_update(ctxt,
2148                                                     volume_ref,
2149                                                     connector,
2150                                                     attachment_ref.id))
2151        attachment_ref.connection_info = connection_info
2152        if self.db.volume_admin_metadata_get(
2153                ctxt.elevated(),
2154                volume_ref['id']).get('readonly', False):
2155            attachment_ref.attach_mode = 'ro'
2156        attachment_ref.save()
2157        return attachment_ref
2158
2159    @coordination.synchronized(
2160        '{f_name}-{attachment_ref.volume_id}-{connector[host]}')
2161    def attachment_update(self, ctxt, attachment_ref, connector):
2162        """Update an existing attachment record."""
2163        # Valid items to update (connector includes mode and mountpoint):
2164        #   1. connector (required)
2165        #     a. mode (if None use value from attachment_ref)
2166        #     b. mountpoint (if None use value from attachment_ref)
2167        #     c. instance_uuid(if None use value from attachment_ref)
2168
2169        # This method has a synchronized() lock on the volume id
2170        # because we have to prevent race conditions around checking
2171        # for duplicate attachment requests to the same host.
2172
2173        # We fetch the volume object and pass it to the rpc call because we
2174        # need to direct this to the correct host/backend
2175
2176        ctxt.authorize(attachment_policy.UPDATE_POLICY,
2177                       target_obj=attachment_ref)
2178        volume_ref = objects.Volume.get_by_id(ctxt, attachment_ref.volume_id)
2179        if "error" in volume_ref.status:
2180            msg = ('Volume attachments can not be updated if the volume '
2181                   'is in an error state. The Volume %(volume_id)s '
2182                   'currently has a status of: %(volume_status)s ') % {
2183                       'volume_id': volume_ref.id,
2184                       'volume_status': volume_ref.status}
2185            LOG.error(msg)
2186            raise exception.InvalidVolume(reason=msg)
2187
2188        if (len(volume_ref.volume_attachment) > 1 and
2189            not (volume_ref.multiattach or
2190                 self._is_multiattach(volume_ref.volume_type))):
2191            # Check whether all connection hosts are unique
2192            # Multiple attachments to different hosts is permitted to
2193            # support Nova instance migration.
2194
2195            # This particular check also does not prevent multiple attachments
2196            # for a multiattach volume to the same instance.
2197
2198            connection_hosts = set(a.connector['host']
2199                                   for a in volume_ref.volume_attachment
2200                                   if a.connection_info)
2201
2202            if len(connection_hosts) > 0:
2203                # We raced, and have more than one connection
2204
2205                msg = _('duplicate connectors detected on volume '
2206                        '%(vol)s') % {'vol': volume_ref.id}
2207
2208                raise exception.InvalidVolume(reason=msg)
2209
2210        connection_info = (
2211            self.volume_rpcapi.attachment_update(ctxt,
2212                                                 volume_ref,
2213                                                 connector,
2214                                                 attachment_ref.id))
2215        attachment_ref.connection_info = connection_info
2216        attachment_ref.save()
2217        return attachment_ref
2218
2219    def attachment_delete(self, ctxt, attachment):
2220        ctxt.authorize(attachment_policy.DELETE_POLICY,
2221                       target_obj=attachment)
2222        volume = objects.Volume.get_by_id(ctxt, attachment.volume_id)
2223        if attachment.attach_status == 'reserved':
2224            self.db.volume_detached(ctxt.elevated(), attachment.volume_id,
2225                                    attachment.get('id'))
2226            self.db.volume_admin_metadata_delete(ctxt.elevated(),
2227                                                 attachment.volume_id,
2228                                                 'attached_mode')
2229            volume_utils.notify_about_volume_usage(ctxt, volume, "detach.end")
2230        else:
2231            self.volume_rpcapi.attachment_delete(ctxt,
2232                                                 attachment.id,
2233                                                 volume)
2234        status_updates = {'status': 'available',
2235                          'attach_status': 'detached'}
2236        remaining_attachments = AO_LIST.get_all_by_volume_id(ctxt, volume.id)
2237        LOG.debug("Remaining volume attachments: %s", remaining_attachments,
2238                  resource=volume)
2239
2240        # NOTE(jdg) Try and figure out the > state we have left and set that
2241        # attached > attaching > > detaching > reserved
2242        pending_status_list = []
2243        for attachment in remaining_attachments:
2244            pending_status_list.append(attachment.attach_status)
2245            LOG.debug("Adding status of: %s to pending status list "
2246                      "for volume.", attachment.attach_status,
2247                      resource=volume)
2248
2249        LOG.debug("Pending status list for volume during "
2250                  "attachment-delete: %s",
2251                  pending_status_list, resource=volume)
2252        if 'attached' in pending_status_list:
2253            status_updates['status'] = 'in-use'
2254            status_updates['attach_status'] = 'attached'
2255        elif 'attaching' in pending_status_list:
2256            status_updates['status'] = 'attaching'
2257            status_updates['attach_status'] = 'attaching'
2258        elif 'detaching' in pending_status_list:
2259            status_updates['status'] = 'detaching'
2260            status_updates['attach_status'] = 'detaching'
2261        elif 'reserved' in pending_status_list:
2262            status_updates['status'] = 'reserved'
2263            status_updates['attach_status'] = 'reserved'
2264
2265        volume.status = status_updates['status']
2266        volume.attach_status = status_updates['attach_status']
2267        volume.save()
2268        return remaining_attachments
2269
2270
2271class HostAPI(base.Base):
2272    """Sub-set of the Volume Manager API for managing host operations."""
2273
2274    def set_host_enabled(self, context, host, enabled):
2275        """Sets the specified host's ability to accept new volumes."""
2276        raise NotImplementedError()
2277
2278    def get_host_uptime(self, context, host):
2279        """Returns the result of calling "uptime" on the target host."""
2280        raise NotImplementedError()
2281
2282    def host_power_action(self, context, host, action):
2283        raise NotImplementedError()
2284
2285    def set_host_maintenance(self, context, host, mode):
2286        """Start/Stop host maintenance window.
2287
2288        On start, it triggers volume evacuation.
2289        """
2290        raise NotImplementedError()
2291