1#    Copyright 2013 OpenStack Foundation
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14"""RADOS Block Device Driver"""
15
16from __future__ import absolute_import
17import binascii
18import json
19import math
20import os
21import tempfile
22
23from castellan import key_manager
24from eventlet import tpool
25from os_brick import encryptors
26from os_brick.initiator import linuxrbd
27from oslo_config import cfg
28from oslo_log import log as logging
29from oslo_utils import encodeutils
30from oslo_utils import excutils
31from oslo_utils import fileutils
32from oslo_utils import units
33import six
34from six.moves import urllib
35
36from cinder import exception
37from cinder.i18n import _
38from cinder.image import image_utils
39from cinder import interface
40from cinder import objects
41from cinder.objects import fields
42from cinder import utils
43from cinder.volume import configuration
44from cinder.volume import driver
45from cinder.volume import utils as volume_utils
46
47try:
48    import rados
49    import rbd
50except ImportError:
51    rados = None
52    rbd = None
53
54
# Module-level logger used by the helper classes and the driver below.
LOG = logging.getLogger(__name__)

# Configuration options understood by the RBD driver. They are registered
# under the shared backend configuration group right after this list and are
# appended to each driver instance's configuration in RBDDriver.__init__.
RBD_OPTS = [
    cfg.StrOpt('rbd_cluster_name',
               default='ceph',
               help='The name of ceph cluster'),
    cfg.StrOpt('rbd_pool',
               default='rbd',
               help='The RADOS pool where rbd volumes are stored'),
    cfg.StrOpt('rbd_user',
               help='The RADOS client name for accessing rbd volumes '
                    '- only set when using cephx authentication'),
    cfg.StrOpt('rbd_ceph_conf',
               default='',  # default determined by librados
               help='Path to the ceph configuration file'),
    cfg.StrOpt('rbd_keyring_conf',
               default='',
               help='Path to the ceph keyring file'),
    cfg.BoolOpt('rbd_flatten_volume_from_snapshot',
                default=False,
                help='Flatten volumes created from snapshots to remove '
                     'dependency from volume to snapshot'),
    cfg.StrOpt('rbd_secret_uuid',
               help='The libvirt uuid of the secret for the rbd_user '
                    'volumes'),
    cfg.IntOpt('rbd_max_clone_depth',
               default=5,
               help='Maximum number of nested volume clones that are '
                    'taken before a flatten occurs. Set to 0 to disable '
                    'cloning.'),
    cfg.IntOpt('rbd_store_chunk_size', default=4,
               help='Volumes will be chunked into objects of this size '
                    '(in megabytes).'),
    cfg.IntOpt('rados_connect_timeout', default=-1,
               help='Timeout value (in seconds) used when connecting to '
                    'ceph cluster. If value < 0, no timeout is set and '
                    'default librados value is used.'),
    cfg.IntOpt('rados_connection_retries', default=3,
               help='Number of retries if connection to ceph cluster '
                    'failed.'),
    cfg.IntOpt('rados_connection_interval', default=5,
               help='Interval value (in seconds) between connection '
                    'retries to ceph cluster.'),
    cfg.IntOpt('replication_connect_timeout', default=5,
               help='Timeout value (in seconds) used when connecting to '
                    'ceph cluster to do a demotion/promotion of volumes. '
                    'If value < 0, no timeout is set and default librados '
                    'value is used.'),
    cfg.BoolOpt('report_dynamic_total_capacity', default=True,
                help='Set to True for driver to report total capacity as a '
                     'dynamic value -used + current free- and to False to '
                     'report a static value -quota max bytes if defined and '
                     'global size of cluster if not-.'),
    cfg.BoolOpt('rbd_exclusive_cinder_pool', default=False,
                help="Set to True if the pool is used exclusively by Cinder. "
                     "On exclusive use driver won't query images' provisioned "
                     "size as they will match the value calculated by the "
                     "Cinder core code for allocated_capacity_gb. This "
                     "reduces the load on the Ceph cluster as well as on the "
                     "volume service."),
]

# Register the options globally under the shared backend group.
CONF = cfg.CONF
CONF.register_opts(RBD_OPTS, group=configuration.SHARED_CONF_GROUP)

# Volume type extra-spec key inspected by RBDDriver._is_replicated_type.
EXTRA_SPECS_REPL_ENABLED = "replication_enabled"
121
122
class RBDVolumeProxy(object):
    """Context manager wrapping an already existing rbd image.

    Unless the caller hands in its own connection, a rados connection and
    ioctx are opened through the driver on construction and released again
    when the context exits. Attribute lookups that are not satisfied by the
    proxy itself are forwarded to the wrapped librbd Image object.

    When both `client` and `ioctx` are supplied they are reused as-is; the
    caller then stays responsible for opening/closing that connection and
    the `pool`, `remote` and `timeout` arguments are ignored.

    The librados client and ioctx in use are exposed as the attributes
    'client' and 'ioctx'.
    """
    def __init__(self, driver, name, pool=None, snapshot=None,
                 read_only=False, remote=None, timeout=None,
                 client=None, ioctx=None):
        # Only a connection opened here is torn down here.
        self._close_conn = not (client and ioctx)
        if self._close_conn:
            rados_client, rados_ioctx = driver._connect_to_rados(
                pool, remote, timeout)
        else:
            rados_client, rados_ioctx = client, ioctx

        snap_name = None
        if snapshot is not None:
            snap_name = utils.convert_str(snapshot)

        try:
            image = driver.rbd.Image(rados_ioctx,
                                     utils.convert_str(name),
                                     snapshot=snap_name,
                                     read_only=read_only)
            self.volume = tpool.Proxy(image)
        except driver.rbd.Error:
            # Do not leave our own connection open if the image
            # cannot be opened.
            if self._close_conn:
                driver._disconnect_from_rados(rados_client, rados_ioctx)
            raise

        self.driver = driver
        self.client = rados_client
        self.ioctx = rados_ioctx

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        try:
            self.volume.close()
        finally:
            # Externally supplied connections are left untouched.
            if self._close_conn:
                self.driver._disconnect_from_rados(self.client, self.ioctx)

    def __getattr__(self, attrib):
        # Anything not found on the proxy is looked up on the image.
        return getattr(self.volume, attrib)
171
172
class RADOSClient(object):
    """Context manager that owns a driver-provided ceph connection.

    The connection is opened through the driver on construction and handed
    back to the driver for disconnection when the context exits.
    """
    def __init__(self, driver, pool=None):
        self.driver = driver
        self.cluster, self.ioctx = driver._connect_to_rados(pool)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.driver._disconnect_from_rados(self.cluster, self.ioctx)

    @property
    def features(self):
        """Feature bits from the cluster's 'rbd_default_features' setting.

        Falls back to the layering feature when the setting is missing
        or evaluates to zero.
        """
        configured = self.cluster.conf_get('rbd_default_features')
        if configured is not None and int(configured) != 0:
            return int(configured)
        return int(self.driver.rbd.RBD_FEATURE_LAYERING)
191
192
193@interface.volumedriver
194class RBDDriver(driver.CloneableImageVD,
195                driver.MigrateVD, driver.ManageableVD, driver.BaseVD):
196    """Implements RADOS block device (RBD) volume commands."""
197
    # Reported as 'driver_version' in the stats built by _update_volume_stats.
    VERSION = '1.2.0'

    # ThirdPartySystems wiki page
    CI_WIKI_NAME = "Cinder_Jenkins"

    # Directory prefix used by _parse_replication_configs to build the default
    # ceph conf path ("<SYSCONFDIR><backend_id>.conf") of a replication target.
    SYSCONFDIR = '/usr/local/etc/ceph/'
204
205    def __init__(self, active_backend_id=None, *args, **kwargs):
206        super(RBDDriver, self).__init__(*args, **kwargs)
207        self.configuration.append_config_values(RBD_OPTS)
208        self._stats = {}
209        # allow overrides for testing
210        self.rados = kwargs.get('rados', rados)
211        self.rbd = kwargs.get('rbd', rbd)
212
213        # All string args used with librbd must be None or utf-8 otherwise
214        # librbd will break.
215        for attr in ['rbd_cluster_name', 'rbd_user',
216                     'rbd_ceph_conf', 'rbd_pool']:
217            val = getattr(self.configuration, attr)
218            if val is not None:
219                setattr(self.configuration, attr, utils.convert_str(val))
220
221        self._backend_name = (self.configuration.volume_backend_name or
222                              self.__class__.__name__)
223        self._active_backend_id = active_backend_id
224        self._active_config = {}
225        self._is_replication_enabled = False
226        self._replication_targets = []
227        self._target_names = []
228
229    def _get_target_config(self, target_id):
230        """Get a replication target from known replication targets."""
231        for target in self._replication_targets:
232            if target['name'] == target_id:
233                return target
234        if not target_id or target_id == 'default':
235            return {
236                'name': self.configuration.rbd_cluster_name,
237                'conf': self.configuration.rbd_ceph_conf,
238                'user': self.configuration.rbd_user
239            }
240        raise exception.InvalidReplicationTarget(
241            reason=_('RBD: Unknown failover target host %s.') % target_id)
242
243    def do_setup(self, context):
244        """Performs initialization steps that could raise exceptions."""
245        self._do_setup_replication()
246        self._active_config = self._get_target_config(self._active_backend_id)
247
248    def _do_setup_replication(self):
249        replication_devices = self.configuration.safe_get(
250            'replication_device')
251        if replication_devices:
252            self._parse_replication_configs(replication_devices)
253            self._is_replication_enabled = True
254            self._target_names.append('default')
255
256    def _parse_replication_configs(self, replication_devices):
257        for replication_device in replication_devices:
258            if 'backend_id' not in replication_device:
259                msg = _('Missing backend_id in replication_device '
260                        'configuration.')
261                raise exception.InvalidConfigurationValue(msg)
262
263            name = replication_device['backend_id']
264            conf = replication_device.get('conf',
265                                          self.SYSCONFDIR + name + '.conf')
266            user = replication_device.get(
267                'user', self.configuration.rbd_user or 'cinder')
268            # Pool has to be the same in all clusters
269            replication_target = {'name': name,
270                                  'conf': utils.convert_str(conf),
271                                  'user': utils.convert_str(user)}
272            LOG.info('Adding replication target: %s.', name)
273            self._replication_targets.append(replication_target)
274            self._target_names.append(name)
275
276    def _get_config_tuple(self, remote=None):
277        if not remote:
278            remote = self._active_config
279        return (remote.get('name'), remote.get('conf'), remote.get('user'))
280
281    def check_for_setup_error(self):
282        """Returns an error if prerequisites aren't met."""
283        if rados is None:
284            msg = _('rados and rbd python libraries not found')
285            raise exception.VolumeBackendAPIException(data=msg)
286
287        for attr in ['rbd_cluster_name', 'rbd_pool']:
288            val = getattr(self.configuration, attr)
289            if not val:
290                raise exception.InvalidConfigurationValue(option=attr,
291                                                          value=val)
292        # NOTE: Checking connection to ceph
293        # RADOSClient __init__ method invokes _connect_to_rados
294        # so no need to check for self.rados.Error here.
295        with RADOSClient(self):
296            pass
297
298    def RBDProxy(self):
299        return tpool.Proxy(self.rbd.RBD())
300
301    def _ceph_args(self):
302        args = []
303
304        name, conf, user = self._get_config_tuple()
305
306        if user:
307            args.extend(['--id', user])
308        if name:
309            args.extend(['--cluster', name])
310        if conf:
311            args.extend(['--conf', conf])
312
313        return args
314
315    def _connect_to_rados(self, pool=None, remote=None, timeout=None):
316        @utils.retry(exception.VolumeBackendAPIException,
317                     self.configuration.rados_connection_interval,
318                     self.configuration.rados_connection_retries)
319        def _do_conn(pool, remote, timeout):
320            name, conf, user = self._get_config_tuple(remote)
321
322            if pool is not None:
323                pool = utils.convert_str(pool)
324            else:
325                pool = self.configuration.rbd_pool
326
327            if timeout is None:
328                timeout = self.configuration.rados_connect_timeout
329
330            LOG.debug("connecting to %(name)s (timeout=%(timeout)s).",
331                      {'name': name, 'timeout': timeout})
332
333            client = self.rados.Rados(rados_id=user,
334                                      clustername=name,
335                                      conffile=conf)
336
337            try:
338                if timeout >= 0:
339                    timeout = six.text_type(timeout)
340                    client.conf_set('rados_osd_op_timeout', timeout)
341                    client.conf_set('rados_mon_op_timeout', timeout)
342                    client.conf_set('client_mount_timeout', timeout)
343
344                client.connect()
345                ioctx = client.open_ioctx(pool)
346                return client, ioctx
347            except self.rados.Error:
348                msg = _("Error connecting to ceph cluster.")
349                LOG.exception(msg)
350                client.shutdown()
351                raise exception.VolumeBackendAPIException(data=msg)
352
353        return _do_conn(pool, remote, timeout)
354
355    def _disconnect_from_rados(self, client, ioctx):
356        # closing an ioctx cannot raise an exception
357        ioctx.close()
358        client.shutdown()
359
360    def _get_backup_snaps(self, rbd_image):
361        """Get list of any backup snapshots that exist on this volume.
362
363        There should only ever be one but accept all since they need to be
364        deleted before the volume can be.
365        """
366        # NOTE(dosaboy): we do the import here otherwise we get import conflict
367        # issues between the rbd driver and the ceph backup driver. These
368        # issues only seem to occur when NOT using them together and are
369        # triggered when the ceph backup driver imports the rbd volume driver.
370        from cinder.backup.drivers import ceph
371        return ceph.CephBackupDriver.get_backup_snaps(rbd_image)
372
373    def _get_mon_addrs(self):
374        args = ['ceph', 'mon', 'dump', '--format=json']
375        args.extend(self._ceph_args())
376        out, _ = self._execute(*args)
377        lines = out.split('\n')
378        if lines[0].startswith('dumped monmap epoch'):
379            lines = lines[1:]
380        monmap = json.loads('\n'.join(lines))
381        addrs = [mon['addr'] for mon in monmap['mons']]
382        hosts = []
383        ports = []
384        for addr in addrs:
385            host_port = addr[:addr.rindex('/')]
386            host, port = host_port.rsplit(':', 1)
387            hosts.append(host.strip('[]'))
388            ports.append(port)
389        return hosts, ports
390
391    def _get_usage_info(self):
392        """Calculate provisioned volume space in GiB.
393
394        Stats report should send provisioned size of volumes (snapshot must not
395        be included) and not the physical size of those volumes.
396
397        We must include all volumes, not only Cinder created volumes, because
398        Cinder created volumes are reported by the Cinder core code as
399        allocated_capacity_gb.
400        """
401        total_provisioned = 0
402        with RADOSClient(self) as client:
403            for t in self.RBDProxy().list(client.ioctx):
404                try:
405                    with RBDVolumeProxy(self, t, read_only=True,
406                                        client=client.cluster,
407                                        ioctx=client.ioctx) as v:
408                        size = v.size()
409                except (self.rbd.ImageNotFound, self.rbd.OSError):
410                    LOG.debug("Image %s is not found.", t)
411                else:
412                    total_provisioned += size
413
414        total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
415        return total_provisioned
416
417    def _get_pool_stats(self):
418        """Gets pool free and total capacity in GiB.
419
420        Calculate free and total capacity of the pool based on the pool's
421        defined quota and pools stats.
422
423        Returns a tuple with (free, total) where they are either unknown or a
424        real number with a 2 digit precision.
425        """
426        pool_name = self.configuration.rbd_pool
427
428        with RADOSClient(self) as client:
429            ret, df_outbuf, __ = client.cluster.mon_command(
430                '{"prefix":"df", "format":"json"}', '')
431            if ret:
432                LOG.warning('Unable to get rados pool stats.')
433                return 'unknown', 'unknown'
434
435            ret, quota_outbuf, __ = client.cluster.mon_command(
436                '{"prefix":"osd pool get-quota", "pool": "%s",'
437                ' "format":"json"}' % pool_name, '')
438            if ret:
439                LOG.warning('Unable to get rados pool quotas.')
440                return 'unknown', 'unknown'
441
442        df_outbuf = encodeutils.safe_decode(df_outbuf)
443        df_data = json.loads(df_outbuf)
444        pool_stats = [pool for pool in df_data['pools']
445                      if pool['name'] == pool_name][0]['stats']
446
447        quota_outbuf = encodeutils.safe_decode(quota_outbuf)
448        bytes_quota = json.loads(quota_outbuf)['quota_max_bytes']
449        # With quota the total is the quota limit and free is quota - used
450        if bytes_quota:
451            total_capacity = bytes_quota
452            free_capacity = max(min(total_capacity - pool_stats['bytes_used'],
453                                    pool_stats['max_avail']),
454                                0)
455        # Without quota free is pools max available and total is global size
456        else:
457            total_capacity = df_data['stats']['total_bytes']
458            free_capacity = pool_stats['max_avail']
459
460        # If we want dynamic total capacity (default behavior)
461        if self.configuration.safe_get('report_dynamic_total_capacity'):
462            total_capacity = free_capacity + pool_stats['bytes_used']
463
464        free_capacity = round((float(free_capacity) / units.Gi), 2)
465        total_capacity = round((float(total_capacity) / units.Gi), 2)
466
467        return free_capacity, total_capacity
468
469    def _update_volume_stats(self):
470        location_info = '%s:%s:%s:%s:%s' % (
471            self.configuration.rbd_cluster_name,
472            self.configuration.rbd_ceph_conf,
473            self._get_fsid(),
474            self.configuration.rbd_user,
475            self.configuration.rbd_pool)
476
477        stats = {
478            'vendor_name': 'Open Source',
479            'driver_version': self.VERSION,
480            'storage_protocol': 'ceph',
481            'total_capacity_gb': 'unknown',
482            'free_capacity_gb': 'unknown',
483            'reserved_percentage': (
484                self.configuration.safe_get('reserved_percentage')),
485            'multiattach': False,
486            'thin_provisioning_support': True,
487            'max_over_subscription_ratio': (
488                self.configuration.safe_get('max_over_subscription_ratio')),
489            'location_info': location_info,
490        }
491
492        backend_name = self.configuration.safe_get('volume_backend_name')
493        stats['volume_backend_name'] = backend_name or 'RBD'
494
495        stats['replication_enabled'] = self._is_replication_enabled
496        if self._is_replication_enabled:
497            stats['replication_targets'] = self._target_names
498
499        try:
500            free_capacity, total_capacity = self._get_pool_stats()
501            stats['free_capacity_gb'] = free_capacity
502            stats['total_capacity_gb'] = total_capacity
503
504            # For exclusive pools let scheduler set provisioned_capacity_gb to
505            # allocated_capacity_gb, and for non exclusive query the value.
506            if not self.configuration.safe_get('rbd_exclusive_cinder_pool'):
507                total_gbi = self._get_usage_info()
508                stats['provisioned_capacity_gb'] = total_gbi
509        except self.rados.Error:
510            # just log and return unknown capacities and let scheduler set
511            # provisioned_capacity_gb = allocated_capacity_gb
512            LOG.exception('error refreshing volume stats')
513        self._stats = stats
514
515    def get_volume_stats(self, refresh=False):
516        """Return the current state of the volume service.
517
518        If 'refresh' is True, run the update first.
519        """
520        if refresh:
521            self._update_volume_stats()
522        return self._stats
523
524    def _get_clone_depth(self, client, volume_name, depth=0):
525        """Returns the number of ancestral clones of the given volume."""
526        parent_volume = self.rbd.Image(client.ioctx, volume_name)
527        try:
528            _pool, parent, _snap = self._get_clone_info(parent_volume,
529                                                        volume_name)
530        finally:
531            parent_volume.close()
532
533        if not parent:
534            return depth
535
536        # If clone depth was reached, flatten should have occurred so if it has
537        # been exceeded then something has gone wrong.
538        if depth > self.configuration.rbd_max_clone_depth:
539            raise Exception(_("clone depth exceeds limit of %s") %
540                            (self.configuration.rbd_max_clone_depth))
541
542        return self._get_clone_depth(client, parent, depth + 1)
543
544    def _extend_if_required(self, volume, src_vref):
545        """Extends a volume if required
546
547        In case src_vref size is smaller than the size if the requested
548        new volume call _resize().
549        """
550        if volume.size != src_vref.size:
551            LOG.debug("resize volume '%(dst_vol)s' from %(src_size)d to "
552                      "%(dst_size)d",
553                      {'dst_vol': volume.name, 'src_size': src_vref.size,
554                       'dst_size': volume.size})
555            self._resize(volume)
556
557    def create_cloned_volume(self, volume, src_vref):
558        """Create a cloned volume from another volume.
559
560        Since we are cloning from a volume and not a snapshot, we must first
561        create a snapshot of the source volume.
562
563        The user has the option to limit how long a volume's clone chain can be
564        by setting rbd_max_clone_depth. If a clone is made of another clone
565        and that clone has rbd_max_clone_depth clones behind it, the dest
566        volume will be flattened.
567        """
568        src_name = utils.convert_str(src_vref.name)
569        dest_name = utils.convert_str(volume.name)
570        clone_snap = "%s.clone_snap" % dest_name
571
572        # Do full copy if requested
573        if self.configuration.rbd_max_clone_depth <= 0:
574            with RBDVolumeProxy(self, src_name, read_only=True) as vol:
575                vol.copy(vol.ioctx, dest_name)
576                self._extend_if_required(volume, src_vref)
577            return
578
579        # Otherwise do COW clone.
580        with RADOSClient(self) as client:
581            src_volume = self.rbd.Image(client.ioctx, src_name)
582            LOG.debug("creating snapshot='%s'", clone_snap)
583            try:
584                # Create new snapshot of source volume
585                src_volume.create_snap(clone_snap)
586                src_volume.protect_snap(clone_snap)
587                # Now clone source volume snapshot
588                LOG.debug("cloning '%(src_vol)s@%(src_snap)s' to "
589                          "'%(dest)s'",
590                          {'src_vol': src_name, 'src_snap': clone_snap,
591                           'dest': dest_name})
592                self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
593                                      client.ioctx, dest_name,
594                                      features=client.features)
595            except Exception as e:
596                src_volume.unprotect_snap(clone_snap)
597                src_volume.remove_snap(clone_snap)
598                msg = (_("Failed to clone '%(src_vol)s@%(src_snap)s' to "
599                         "'%(dest)s', error: %(error)s") %
600                       {'src_vol': src_name,
601                        'src_snap': clone_snap,
602                        'dest': dest_name,
603                        'error': e})
604                LOG.exception(msg)
605                raise exception.VolumeBackendAPIException(data=msg)
606            finally:
607                src_volume.close()
608
609            depth = self._get_clone_depth(client, src_name)
610            # If dest volume is a clone and rbd_max_clone_depth reached,
611            # flatten the dest after cloning. Zero rbd_max_clone_depth means
612            # infinite is allowed.
613            if depth >= self.configuration.rbd_max_clone_depth:
614                LOG.info("maximum clone depth (%d) has been reached - "
615                         "flattening dest volume",
616                         self.configuration.rbd_max_clone_depth)
617                dest_volume = self.rbd.Image(client.ioctx, dest_name)
618                try:
619                    # Flatten destination volume
620                    LOG.debug("flattening dest volume %s", dest_name)
621                    dest_volume.flatten()
622                except Exception as e:
623                    msg = (_("Failed to flatten volume %(volume)s with "
624                             "error: %(error)s.") %
625                           {'volume': dest_name,
626                            'error': e})
627                    LOG.exception(msg)
628                    src_volume.close()
629                    raise exception.VolumeBackendAPIException(data=msg)
630                finally:
631                    dest_volume.close()
632
633                try:
634                    # remove temporary snap
635                    LOG.debug("remove temporary snap %s", clone_snap)
636                    src_volume.unprotect_snap(clone_snap)
637                    src_volume.remove_snap(clone_snap)
638                except Exception as e:
639                    msg = (_("Failed to remove temporary snap "
640                             "%(snap_name)s, error: %(error)s") %
641                           {'snap_name': clone_snap,
642                            'error': e})
643                    LOG.exception(msg)
644                    src_volume.close()
645                    raise exception.VolumeBackendAPIException(data=msg)
646
647            try:
648                volume_update = self._enable_replication_if_needed(volume)
649            except Exception:
650                self.RBDProxy().remove(client.ioctx, dest_name)
651                src_volume.unprotect_snap(clone_snap)
652                src_volume.remove_snap(clone_snap)
653                err_msg = (_('Failed to enable image replication'))
654                raise exception.ReplicationError(reason=err_msg,
655                                                 volume_id=volume.id)
656            finally:
657                src_volume.close()
658
659            self._extend_if_required(volume, src_vref)
660
661        LOG.debug("clone created successfully")
662        return volume_update
663
664    def _enable_replication(self, volume):
665        """Enable replication for a volume.
666
667        Returns required volume update.
668        """
669        vol_name = utils.convert_str(volume.name)
670        with RBDVolumeProxy(self, vol_name) as image:
671            had_exclusive_lock = (image.features() &
672                                  self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK)
673            had_journaling = image.features() & self.rbd.RBD_FEATURE_JOURNALING
674            if not had_exclusive_lock:
675                image.update_features(self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK,
676                                      True)
677            if not had_journaling:
678                image.update_features(self.rbd.RBD_FEATURE_JOURNALING, True)
679            image.mirror_image_enable()
680
681        driver_data = self._dumps({
682            'had_journaling': bool(had_journaling),
683            'had_exclusive_lock': bool(had_exclusive_lock)
684        })
685        return {'replication_status': fields.ReplicationStatus.ENABLED,
686                'replication_driver_data': driver_data}
687
688    def _is_replicated_type(self, volume_type):
689        # We do a safe attribute get because volume_type could be None
690        specs = getattr(volume_type, 'extra_specs', {})
691        return specs.get(EXTRA_SPECS_REPL_ENABLED) == "<is> True"
692
693    def _enable_replication_if_needed(self, volume):
694        if self._is_replicated_type(volume.volume_type):
695            return self._enable_replication(volume)
696        if self._is_replication_enabled:
697            return {'replication_status': fields.ReplicationStatus.DISABLED}
698        return None
699
700    def _check_encryption_provider(self, volume, context):
701        """Check that this is a LUKS encryption provider.
702
703        :returns: encryption dict
704        """
705
706        encryption = self.db.volume_encryption_metadata_get(context, volume.id)
707        provider = encryption['provider']
708        if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
709            provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
710        if provider != encryptors.LUKS:
711            message = _("Provider %s not supported.") % provider
712            raise exception.VolumeDriverException(message=message)
713
714        if 'cipher' not in encryption or 'key_size' not in encryption:
715            msg = _('encryption spec must contain "cipher" and'
716                    '"key_size"')
717            raise exception.VolumeDriverException(message=msg)
718
719        return encryption
720
721    def _create_encrypted_volume(self, volume, context):
722        """Create an encrypted volume.
723
724        This works by creating an encrypted image locally,
725        and then uploading it to the volume.
726        """
727
728        encryption = self._check_encryption_provider(volume, context)
729
730        # Fetch the key associated with the volume and decode the passphrase
731        keymgr = key_manager.API(CONF)
732        key = keymgr.get(context, encryption['encryption_key_id'])
733        passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')
734
735        # create a file
736        tmp_dir = self._image_conversion_dir()
737
738        with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_image:
739            with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_key:
740                with open(tmp_key.name, 'w') as f:
741                    f.write(passphrase)
742
743                cipher_spec = image_utils.decode_cipher(encryption['cipher'],
744                                                        encryption['key_size'])
745
746                create_cmd = (
747                    'qemu-img', 'create', '-f', 'luks',
748                    '-o', 'cipher-alg=%(cipher_alg)s,'
749                    'cipher-mode=%(cipher_mode)s,'
750                    'ivgen-alg=%(ivgen_alg)s' % cipher_spec,
751                    '--object', 'secret,id=luks_sec,'
752                    'format=raw,file=%(passfile)s' % {'passfile':
753                                                      tmp_key.name},
754                    '-o', 'key-secret=luks_sec',
755                    tmp_image.name,
756                    '%sM' % (volume.size * 1024))
757                self._execute(*create_cmd)
758
759            # Copy image into RBD
760            chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
761            order = int(math.log(chunk_size, 2))
762
763            cmd = ['rbd', 'import',
764                   '--pool', self.configuration.rbd_pool,
765                   '--order', order,
766                   tmp_image.name, volume.name]
767            cmd.extend(self._ceph_args())
768            self._execute(*cmd)
769
770    def create_volume(self, volume):
771        """Creates a logical volume."""
772
773        if volume.encryption_key_id:
774            return self._create_encrypted_volume(volume, volume.obj_context)
775
776        size = int(volume.size) * units.Gi
777
778        LOG.debug("creating volume '%s'", volume.name)
779
780        chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
781        order = int(math.log(chunk_size, 2))
782        vol_name = utils.convert_str(volume.name)
783
784        with RADOSClient(self) as client:
785            self.RBDProxy().create(client.ioctx,
786                                   vol_name,
787                                   size,
788                                   order,
789                                   old_format=False,
790                                   features=client.features)
791
792            try:
793                volume_update = self._enable_replication_if_needed(volume)
794            except Exception:
795                self.RBDProxy().remove(client.ioctx, vol_name)
796                err_msg = (_('Failed to enable image replication'))
797                raise exception.ReplicationError(reason=err_msg,
798                                                 volume_id=volume.id)
799        return volume_update
800
801    def _flatten(self, pool, volume_name):
802        LOG.debug('flattening %(pool)s/%(img)s',
803                  dict(pool=pool, img=volume_name))
804        with RBDVolumeProxy(self, volume_name, pool) as vol:
805            vol.flatten()
806
807    def _clone(self, volume, src_pool, src_image, src_snap):
808        LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
809                  dict(pool=src_pool, img=src_image, snap=src_snap,
810                       dst=volume.name))
811
812        chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
813        order = int(math.log(chunk_size, 2))
814        vol_name = utils.convert_str(volume.name)
815
816        with RADOSClient(self, src_pool) as src_client:
817            with RADOSClient(self) as dest_client:
818                self.RBDProxy().clone(src_client.ioctx,
819                                      utils.convert_str(src_image),
820                                      utils.convert_str(src_snap),
821                                      dest_client.ioctx,
822                                      vol_name,
823                                      features=src_client.features,
824                                      order=order)
825
826            try:
827                volume_update = self._enable_replication_if_needed(volume)
828            except Exception:
829                self.RBDProxy().remove(dest_client.ioctx, vol_name)
830                err_msg = (_('Failed to enable image replication'))
831                raise exception.ReplicationError(reason=err_msg,
832                                                 volume_id=volume.id)
833            return volume_update or {}
834
835    def _resize(self, volume, **kwargs):
836        size = kwargs.get('size', None)
837        if not size:
838            size = int(volume.size) * units.Gi
839
840        with RBDVolumeProxy(self, volume.name) as vol:
841            vol.resize(size)
842
843    def create_volume_from_snapshot(self, volume, snapshot):
844        """Creates a volume from a snapshot."""
845        volume_update = self._clone(volume, self.configuration.rbd_pool,
846                                    snapshot.volume_name, snapshot.name)
847        if self.configuration.rbd_flatten_volume_from_snapshot:
848            self._flatten(self.configuration.rbd_pool, volume.name)
849        if int(volume.size):
850            self._resize(volume)
851        return volume_update
852
853    def _delete_backup_snaps(self, rbd_image):
854        backup_snaps = self._get_backup_snaps(rbd_image)
855        if backup_snaps:
856            for snap in backup_snaps:
857                rbd_image.remove_snap(snap['name'])
858        else:
859            LOG.debug("volume has no backup snaps")
860
861    def _get_clone_info(self, volume, volume_name, snap=None):
862        """If volume is a clone, return its parent info.
863
864        Returns a tuple of (pool, parent, snap). A snapshot may optionally be
865        provided for the case where a cloned volume has been flattened but it's
866        snapshot still depends on the parent.
867        """
868        try:
869            if snap:
870                volume.set_snap(snap)
871            pool, parent, parent_snap = tuple(volume.parent_info())
872            if snap:
873                volume.set_snap(None)
874            # Strip the tag off the end of the volume name since it will not be
875            # in the snap name.
876            if volume_name.endswith('.deleted'):
877                volume_name = volume_name[:-len('.deleted')]
878            # Now check the snap name matches.
879            if parent_snap == "%s.clone_snap" % volume_name:
880                return pool, parent, parent_snap
881        except self.rbd.ImageNotFound:
882            LOG.debug("Volume %s is not a clone.", volume_name)
883            volume.set_snap(None)
884
885        return (None, None, None)
886
887    def _get_children_info(self, volume, snap):
888        """List children for the given snapshot of a volume(image).
889
890        Returns a list of (pool, image).
891        """
892
893        children_list = []
894
895        if snap:
896            volume.set_snap(snap)
897            children_list = volume.list_children()
898            volume.set_snap(None)
899
900        return children_list
901
902    def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
903        """Walk back up the clone chain and delete references.
904
905        Deletes references i.e. deleted parent volumes and snapshots.
906        """
907        parent_rbd = self.rbd.Image(client.ioctx, parent_name)
908        parent_has_snaps = False
909        try:
910            # Check for grandparent
911            _pool, g_parent, g_parent_snap = self._get_clone_info(parent_rbd,
912                                                                  parent_name,
913                                                                  parent_snap)
914
915            LOG.debug("deleting parent snapshot %s", parent_snap)
916            parent_rbd.unprotect_snap(parent_snap)
917            parent_rbd.remove_snap(parent_snap)
918
919            parent_has_snaps = bool(list(parent_rbd.list_snaps()))
920        finally:
921            parent_rbd.close()
922
923        # If parent has been deleted in Cinder, delete the silent reference and
924        # keep walking up the chain if it is itself a clone.
925        if (not parent_has_snaps) and parent_name.endswith('.deleted'):
926            LOG.debug("deleting parent %s", parent_name)
927            self.RBDProxy().remove(client.ioctx, parent_name)
928
929            # Now move up to grandparent if there is one
930            if g_parent:
931                self._delete_clone_parent_refs(client, g_parent, g_parent_snap)
932
    def delete_volume(self, volume):
        """Deletes a logical volume.

        Backup snapshots are removed first. If the image has a
        '.clone_snap' snapshot it is renamed to '<name>.deleted' instead of
        being removed. Otherwise the image is removed and, when it is itself
        a clone, the parent references are cleaned up.

        :raises VolumeIsBusy: if the first snapshot listed is not a clone
                              snapshot, or if removal keeps failing with
                              ImageBusy after the configured retries
        """
        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
        #                utf-8 otherwise librbd will barf.
        volume_name = utils.convert_str(volume.name)
        with RADOSClient(self) as client:
            try:
                rbd_image = self.rbd.Image(client.ioctx, volume_name)
            except self.rbd.ImageNotFound:
                # Nothing to delete; treat the request as already done.
                LOG.info("volume %s no longer exists in backend",
                         volume_name)
                return

            clone_snap = None
            parent = None

            # Ensure any backup snapshots are deleted
            self._delete_backup_snaps(rbd_image)

            # If the volume has non-clone snapshots this delete is expected to
            # raise VolumeIsBusy so do so straight away.
            try:
                snaps = rbd_image.list_snaps()
                # NOTE: only the first snapshot returned is examined; the
                # loop either breaks on it or raises.
                for snap in snaps:
                    if snap['name'].endswith('.clone_snap'):
                        LOG.debug("volume has clone snapshot(s)")
                        # We grab one of these and use it when fetching parent
                        # info in case the volume has been flattened.
                        clone_snap = snap['name']
                        break

                    raise exception.VolumeIsBusy(volume_name=volume_name)

                # Determine if this volume is itself a clone
                _pool, parent, parent_snap = self._get_clone_info(rbd_image,
                                                                  volume_name,
                                                                  clone_snap)
            finally:
                # The image handle is closed on every path, including the
                # VolumeIsBusy raise above.
                rbd_image.close()

            # Removal is retried on ImageBusy using the configured interval
            # and number of retries.
            @utils.retry(self.rbd.ImageBusy,
                         self.configuration.rados_connection_interval,
                         self.configuration.rados_connection_retries)
            def _try_remove_volume(client, volume_name):
                self.RBDProxy().remove(client.ioctx, volume_name)

            if clone_snap is None:
                LOG.debug("deleting rbd volume %s", volume_name)
                try:
                    _try_remove_volume(client, volume_name)
                except self.rbd.ImageBusy:
                    msg = (_("ImageBusy error raised while deleting rbd "
                             "volume. This may have been caused by a "
                             "connection from a client that has crashed and, "
                             "if so, may be resolved by retrying the delete "
                             "after 30 seconds has elapsed."))
                    LOG.warning(msg)
                    # Now raise this so that the volume stays available and
                    # the delete can be retried.
                    raise exception.VolumeIsBusy(msg, volume_name=volume_name)
                except self.rbd.ImageNotFound:
                    LOG.info("RBD volume %s not found, allowing delete "
                             "operation to proceed.", volume_name)
                    return

                # If it is a clone, walk back up the parent chain deleting
                # references.
                if parent:
                    LOG.debug("volume is a clone so cleaning references")
                    self._delete_clone_parent_refs(client, parent, parent_snap)
            else:
                # If the volume has copy-on-write clones we will not be able to
                # delete it. Instead we will keep it as a silent volume which
                # will be deleted when its snapshot and clones are deleted.
                new_name = "%s.deleted" % (volume_name)
                self.RBDProxy().rename(client.ioctx, volume_name, new_name)
1009
1010    def create_snapshot(self, snapshot):
1011        """Creates an rbd snapshot."""
1012        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
1013            snap = utils.convert_str(snapshot.name)
1014            volume.create_snap(snap)
1015            volume.protect_snap(snap)
1016
1017    def delete_snapshot(self, snapshot):
1018        """Deletes an rbd snapshot."""
1019        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
1020        #                utf-8 otherwise librbd will barf.
1021        volume_name = utils.convert_str(snapshot.volume_name)
1022        snap_name = utils.convert_str(snapshot.name)
1023
1024        with RBDVolumeProxy(self, volume_name) as volume:
1025            try:
1026                volume.unprotect_snap(snap_name)
1027            except self.rbd.InvalidArgument:
1028                LOG.info(
1029                    "InvalidArgument: Unable to unprotect snapshot %s.",
1030                    snap_name)
1031            except self.rbd.ImageNotFound:
1032                LOG.info(
1033                    "ImageNotFound: Unable to unprotect snapshot %s.",
1034                    snap_name)
1035            except self.rbd.ImageBusy:
1036                children_list = self._get_children_info(volume, snap_name)
1037
1038                if children_list:
1039                    for (pool, image) in children_list:
1040                        LOG.info('Image %(pool)s/%(image)s is dependent '
1041                                 'on the snapshot %(snap)s.',
1042                                 {'pool': pool,
1043                                  'image': image,
1044                                  'snap': snap_name})
1045
1046                raise exception.SnapshotIsBusy(snapshot_name=snap_name)
1047            try:
1048                volume.remove_snap(snap_name)
1049            except self.rbd.ImageNotFound:
1050                LOG.info("Snapshot %s does not exist in backend.",
1051                         snap_name)
1052
1053    def _disable_replication(self, volume):
1054        """Disable replication on the given volume."""
1055        vol_name = utils.convert_str(volume.name)
1056        with RBDVolumeProxy(self, vol_name) as image:
1057            image.mirror_image_disable(False)
1058            driver_data = json.loads(volume.replication_driver_data)
1059            # If 'journaling' and/or 'exclusive-lock' have
1060            # been enabled in '_enable_replication',
1061            # they will be disabled here. If not, it will keep
1062            # what it was before.
1063            if not driver_data['had_journaling']:
1064                image.update_features(self.rbd.RBD_FEATURE_JOURNALING, False)
1065            if not driver_data['had_exclusive_lock']:
1066                image.update_features(self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK,
1067                                      False)
1068        return {'replication_status': fields.ReplicationStatus.DISABLED,
1069                'replication_driver_data': None}
1070
1071    def retype(self, context, volume, new_type, diff, host):
1072        """Retype from one volume type to another on the same backend."""
1073        old_vol_replicated = self._is_replicated_type(volume.volume_type)
1074        new_vol_replicated = self._is_replicated_type(new_type)
1075
1076        if old_vol_replicated and not new_vol_replicated:
1077            try:
1078                return True, self._disable_replication(volume)
1079            except Exception:
1080                err_msg = (_('Failed to disable image replication'))
1081                raise exception.ReplicationError(reason=err_msg,
1082                                                 volume_id=volume.id)
1083        elif not old_vol_replicated and new_vol_replicated:
1084            try:
1085                return True, self._enable_replication(volume)
1086            except Exception:
1087                err_msg = (_('Failed to enable image replication'))
1088                raise exception.ReplicationError(reason=err_msg,
1089                                                 volume_id=volume.id)
1090
1091        if not new_vol_replicated and self._is_replication_enabled:
1092            update = {'replication_status': fields.ReplicationStatus.DISABLED}
1093        else:
1094            update = None
1095        return True, update
1096
1097    def _dumps(self, obj):
1098        return json.dumps(obj, separators=(',', ':'), sort_keys=True)
1099
1100    def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
1101        @utils.retry(rbd.ImageBusy,
1102                     self.configuration.rados_connection_interval,
1103                     self.configuration.rados_connection_retries)
1104        def _do_exec():
1105            timeout = self.configuration.replication_connect_timeout
1106            with RBDVolumeProxy(self, volume_name, self.configuration.rbd_pool,
1107                                remote=remote, timeout=timeout) as rbd_image:
1108                return getattr(rbd_image, operation)(*args, **kwargs)
1109        return _do_exec()
1110
1111    def _failover_volume(self, volume, remote, is_demoted, replication_status):
1112        """Process failover for a volume.
1113
1114        There are 2 different cases that will return different update values
1115        for the volume:
1116
1117        - Volume has replication enabled and failover succeeded: Set
1118          replication status to failed-over.
1119        - Volume has replication enabled and failover fails: Set status to
1120          error, replication status to failover-error, and store previous
1121          status in previous_status field.
1122        """
1123        # Failover is allowed when volume has it enabled or it has already
1124        # failed over, because we may want to do a second failover.
1125        vol_name = utils.convert_str(volume.name)
1126        try:
1127            self._exec_on_volume(vol_name, remote,
1128                                 'mirror_image_promote', not is_demoted)
1129
1130            return {'volume_id': volume.id,
1131                    'updates': {'replication_status': replication_status}}
1132        except Exception as e:
1133            replication_status = fields.ReplicationStatus.FAILOVER_ERROR
1134            LOG.error('Failed to failover volume %(volume)s with '
1135                      'error: %(error)s.',
1136                      {'volume': volume.name, 'error': e})
1137
1138        # Failover failed
1139        error_result = {
1140            'volume_id': volume.id,
1141            'updates': {
1142                'status': 'error',
1143                'previous_status': volume.status,
1144                'replication_status': replication_status
1145            }
1146        }
1147
1148        return error_result
1149
1150    def _demote_volumes(self, volumes, until_failure=True):
1151        """Try to demote volumes on the current primary cluster."""
1152        result = []
1153        try_demoting = True
1154        for volume in volumes:
1155            demoted = False
1156            if try_demoting:
1157                vol_name = utils.convert_str(volume.name)
1158                try:
1159                    self._exec_on_volume(vol_name, self._active_config,
1160                                         'mirror_image_demote')
1161                    demoted = True
1162                except Exception as e:
1163                    LOG.debug('Failed to demote %(volume)s with error: '
1164                              '%(error)s.',
1165                              {'volume': volume.name, 'error': e})
1166                    try_demoting = not until_failure
1167            result.append(demoted)
1168        return result
1169
1170    def _get_failover_target_config(self, secondary_id=None):
1171        if not secondary_id:
1172            # In auto mode exclude failback and active
1173            candidates = set(self._target_names).difference(
1174                ('default', self._active_backend_id))
1175            if not candidates:
1176                raise exception.InvalidReplicationTarget(
1177                    reason=_('RBD: No available failover target host.'))
1178            secondary_id = candidates.pop()
1179        return secondary_id, self._get_target_config(secondary_id)
1180
1181    def failover_host(self, context, volumes, secondary_id=None, groups=None):
1182        """Failover to replication target."""
1183        LOG.info('RBD driver failover started.')
1184        if not self._is_replication_enabled:
1185            raise exception.UnableToFailOver(
1186                reason=_('RBD: Replication is not enabled.'))
1187
1188        if secondary_id == 'default':
1189            replication_status = fields.ReplicationStatus.ENABLED
1190        else:
1191            replication_status = fields.ReplicationStatus.FAILED_OVER
1192
1193        secondary_id, remote = self._get_failover_target_config(secondary_id)
1194
1195        # Try to demote the volumes first
1196        demotion_results = self._demote_volumes(volumes)
1197        # Do the failover taking into consideration if they have been demoted
1198        updates = [self._failover_volume(volume, remote, is_demoted,
1199                                         replication_status)
1200                   for volume, is_demoted in zip(volumes, demotion_results)]
1201        self._active_backend_id = secondary_id
1202        self._active_config = remote
1203        LOG.info('RBD driver failover completed.')
1204        return secondary_id, updates, []
1205
1206    def ensure_export(self, context, volume):
1207        """Synchronously recreates an export for a logical volume."""
1208        pass
1209
1210    def create_export(self, context, volume, connector):
1211        """Exports the volume."""
1212        pass
1213
1214    def remove_export(self, context, volume):
1215        """Removes an export for a logical volume."""
1216        pass
1217
1218    def _get_keyring_contents(self):
1219        # NOTE(danpawlik) If keyring is not provided in Cinder configuration,
1220        # os-brick library will take keyring from default path.
1221        keyring_file = self.configuration.rbd_keyring_conf
1222        keyring_data = None
1223        try:
1224            if os.path.isfile(keyring_file):
1225                with open(keyring_file, 'r') as k_file:
1226                    keyring_data = k_file.read()
1227        except IOError:
1228            LOG.debug('Cannot read RBD keyring file: %s.', keyring_file)
1229
1230        return keyring_data
1231
1232    def initialize_connection(self, volume, connector):
1233        hosts, ports = self._get_mon_addrs()
1234        data = {
1235            'driver_volume_type': 'rbd',
1236            'data': {
1237                'name': '%s/%s' % (self.configuration.rbd_pool,
1238                                   volume.name),
1239                'hosts': hosts,
1240                'ports': ports,
1241                'cluster_name': self.configuration.rbd_cluster_name,
1242                'auth_enabled': (self.configuration.rbd_user is not None),
1243                'auth_username': self.configuration.rbd_user,
1244                'secret_type': 'ceph',
1245                'secret_uuid': self.configuration.rbd_secret_uuid,
1246                'volume_id': volume.id,
1247                "discard": True,
1248                'keyring': self._get_keyring_contents(),
1249            }
1250        }
1251        LOG.debug('connection data: %s', data)
1252        return data
1253
1254    def terminate_connection(self, volume, connector, **kwargs):
1255        pass
1256
1257    def _parse_location(self, location):
1258        prefix = 'rbd://'
1259        if not location.startswith(prefix):
1260            reason = _('Not stored in rbd')
1261            raise exception.ImageUnacceptable(image_id=location, reason=reason)
1262        pieces = [urllib.parse.unquote(loc)
1263                  for loc in location[len(prefix):].split('/')]
1264        if any(map(lambda p: p == '', pieces)):
1265            reason = _('Blank components')
1266            raise exception.ImageUnacceptable(image_id=location, reason=reason)
1267        if len(pieces) != 4:
1268            reason = _('Not an rbd snapshot')
1269            raise exception.ImageUnacceptable(image_id=location, reason=reason)
1270        return pieces
1271
1272    def _get_fsid(self):
1273        with RADOSClient(self) as client:
1274            return client.cluster.get_fsid()
1275
1276    def _is_cloneable(self, image_location, image_meta):
1277        try:
1278            fsid, pool, image, snapshot = self._parse_location(image_location)
1279        except exception.ImageUnacceptable as e:
1280            LOG.debug('not cloneable: %s.', e)
1281            return False
1282
1283        if self._get_fsid() != fsid:
1284            LOG.debug('%s is in a different ceph cluster.', image_location)
1285            return False
1286
1287        if image_meta['disk_format'] != 'raw':
1288            LOG.debug("rbd image clone requires image format to be "
1289                      "'raw' but image %(image)s is '%(format)s'",
1290                      {"image": image_location,
1291                       "format": image_meta['disk_format']})
1292            return False
1293
1294        # check that we can read the image
1295        try:
1296            with RBDVolumeProxy(self, image,
1297                                pool=pool,
1298                                snapshot=snapshot,
1299                                read_only=True):
1300                return True
1301        except self.rbd.Error as e:
1302            LOG.debug('Unable to open image %(loc)s: %(err)s.',
1303                      dict(loc=image_location, err=e))
1304            return False
1305
1306    def clone_image(self, context, volume,
1307                    image_location, image_meta,
1308                    image_service):
1309        if image_location:
1310            # Note: image_location[0] is glance image direct_url.
1311            # image_location[1] contains the list of all locations (including
1312            # direct_url) or None if show_multiple_locations is False in
1313            # glance configuration.
1314            if image_location[1]:
1315                url_locations = [location['url'] for
1316                                 location in image_location[1]]
1317            else:
1318                url_locations = [image_location[0]]
1319
1320            # iterate all locations to look for a cloneable one.
1321            for url_location in url_locations:
1322                if url_location and self._is_cloneable(
1323                        url_location, image_meta):
1324                    _prefix, pool, image, snapshot = \
1325                        self._parse_location(url_location)
1326                    volume_update = self._clone(volume, pool, image, snapshot)
1327                    volume_update['provider_location'] = None
1328                    self._resize(volume)
1329                    return volume_update, True
1330        return ({}, False)
1331
1332    def _image_conversion_dir(self):
1333        tmpdir = (CONF.image_conversion_dir or
1334                  tempfile.gettempdir())
1335
1336        # ensure temporary directory exists
1337        if not os.path.exists(tmpdir):
1338            os.makedirs(tmpdir)
1339
1340        return tmpdir
1341
1342    def copy_image_to_encrypted_volume(self, context, volume, image_service,
1343                                       image_id):
1344        self._copy_image_to_volume(context, volume, image_service, image_id,
1345                                   encrypted=True)
1346
1347    def copy_image_to_volume(self, context, volume, image_service, image_id):
1348        self._copy_image_to_volume(context, volume, image_service, image_id)
1349
1350    def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
1351        encryption = self._check_encryption_provider(volume, context)
1352
1353        # Fetch the key associated with the volume and decode the passphrase
1354        keymgr = key_manager.API(CONF)
1355        key = keymgr.get(context, encryption['encryption_key_id'])
1356        passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')
1357
1358        # Decode the dm-crypt style cipher spec into something qemu-img can use
1359        cipher_spec = image_utils.decode_cipher(encryption['cipher'],
1360                                                encryption['key_size'])
1361
1362        tmp_dir = self._image_conversion_dir()
1363
1364        with tempfile.NamedTemporaryFile(prefix='luks_',
1365                                         dir=tmp_dir) as pass_file:
1366            with open(pass_file.name, 'w') as f:
1367                f.write(passphrase)
1368
1369            # Convert the raw image to luks
1370            dest_image_path = src_image_path + '.luks'
1371            image_utils.convert_image(src_image_path, dest_image_path,
1372                                      'luks', src_format='raw',
1373                                      cipher_spec=cipher_spec,
1374                                      passphrase_file=pass_file.name)
1375
1376            # Replace the original image with the now encrypted image
1377            os.rename(dest_image_path, src_image_path)
1378
1379    def _copy_image_to_volume(self, context, volume, image_service, image_id,
1380                              encrypted=False):
1381
1382        tmp_dir = self._image_conversion_dir()
1383
1384        with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp:
1385            image_utils.fetch_to_raw(context, image_service, image_id,
1386                                     tmp.name,
1387                                     self.configuration.volume_dd_blocksize,
1388                                     size=volume.size)
1389
1390            if encrypted:
1391                self._encrypt_image(context, volume, tmp_dir, tmp.name)
1392
1393            self.delete_volume(volume)
1394
1395            chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
1396            order = int(math.log(chunk_size, 2))
1397            # keep using the command line import instead of librbd since it
1398            # detects zeroes to preserve sparseness in the image
1399            args = ['rbd', 'import',
1400                    '--pool', self.configuration.rbd_pool,
1401                    '--order', order,
1402                    tmp.name, volume.name,
1403                    '--new-format']
1404            args.extend(self._ceph_args())
1405            self._try_execute(*args)
1406        self._resize(volume)
1407        # We may need to re-enable replication because we have deleted the
1408        # original image and created a new one using the command line import.
1409        try:
1410            self._enable_replication_if_needed(volume)
1411        except Exception:
1412            err_msg = (_('Failed to enable image replication'))
1413            raise exception.ReplicationError(reason=err_msg,
1414                                             volume_id=volume.id)
1415
1416    def copy_volume_to_image(self, context, volume, image_service, image_meta):
1417        tmp_dir = self._image_conversion_dir()
1418        tmp_file = os.path.join(tmp_dir,
1419                                volume.name + '-' + image_meta['id'])
1420        with fileutils.remove_path_on_error(tmp_file):
1421            args = ['rbd', 'export',
1422                    '--pool', self.configuration.rbd_pool,
1423                    volume.name, tmp_file]
1424            args.extend(self._ceph_args())
1425            self._try_execute(*args)
1426            image_utils.upload_volume(context, image_service,
1427                                      image_meta, tmp_file)
1428        os.unlink(tmp_file)
1429
1430    def extend_volume(self, volume, new_size):
1431        """Extend an existing volume."""
1432        old_size = volume.size
1433
1434        try:
1435            size = int(new_size) * units.Gi
1436            self._resize(volume, size=size)
1437        except Exception:
1438            msg = _('Failed to Extend Volume '
1439                    '%(volname)s') % {'volname': volume.name}
1440            LOG.error(msg)
1441            raise exception.VolumeBackendAPIException(data=msg)
1442
1443        LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
1444                  {'old_size': old_size, 'new_size': new_size})
1445
1446    def manage_existing(self, volume, existing_ref):
1447        """Manages an existing image.
1448
1449        Renames the image name to match the expected name for the volume.
1450        Error checking done by manage_existing_get_size is not repeated.
1451
1452        :param volume:
1453            volume ref info to be set
1454        :param existing_ref:
1455            existing_ref is a dictionary of the form:
1456            {'source-name': <name of rbd image>}
1457        """
1458        # Raise an exception if we didn't find a suitable rbd image.
1459        with RADOSClient(self) as client:
1460            rbd_name = existing_ref['source-name']
1461            self.RBDProxy().rename(client.ioctx,
1462                                   utils.convert_str(rbd_name),
1463                                   utils.convert_str(volume.name))
1464
1465    def manage_existing_get_size(self, volume, existing_ref):
1466        """Return size of an existing image for manage_existing.
1467
1468        :param volume:
1469            volume ref info to be set
1470        :param existing_ref:
1471            existing_ref is a dictionary of the form:
1472            {'source-name': <name of rbd image>}
1473        """
1474
1475        # Check that the reference is valid
1476        if 'source-name' not in existing_ref:
1477            reason = _('Reference must contain source-name element.')
1478            raise exception.ManageExistingInvalidReference(
1479                existing_ref=existing_ref, reason=reason)
1480
1481        rbd_name = utils.convert_str(existing_ref['source-name'])
1482
1483        with RADOSClient(self) as client:
1484            # Raise an exception if we didn't find a suitable rbd image.
1485            try:
1486                rbd_image = self.rbd.Image(client.ioctx, rbd_name)
1487            except self.rbd.ImageNotFound:
1488                kwargs = {'existing_ref': rbd_name,
1489                          'reason': 'Specified rbd image does not exist.'}
1490                raise exception.ManageExistingInvalidReference(**kwargs)
1491
1492            image_size = rbd_image.size()
1493            rbd_image.close()
1494
1495            # RBD image size is returned in bytes.  Attempt to parse
1496            # size as a float and round up to the next integer.
1497            try:
1498                convert_size = int(math.ceil(float(image_size) / units.Gi))
1499                return convert_size
1500            except ValueError:
1501                exception_message = (_("Failed to manage existing volume "
1502                                       "%(name)s, because reported size "
1503                                       "%(size)s was not a floating-point"
1504                                       " number.")
1505                                     % {'name': rbd_name,
1506                                        'size': image_size})
1507                raise exception.VolumeBackendAPIException(
1508                    data=exception_message)
1509
1510    def _get_image_status(self, image_name):
1511        args = ['rbd', 'status',
1512                '--pool', self.configuration.rbd_pool,
1513                '--format=json',
1514                image_name]
1515        args.extend(self._ceph_args())
1516        out, _ = self._execute(*args)
1517        return json.loads(out)
1518
1519    def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
1520                               sort_keys, sort_dirs):
1521        manageable_volumes = []
1522        cinder_ids = [resource['id'] for resource in cinder_volumes]
1523
1524        with RADOSClient(self) as client:
1525            for image_name in self.RBDProxy().list(client.ioctx):
1526                image_id = volume_utils.extract_id_from_volume_name(image_name)
1527                with RBDVolumeProxy(self, image_name, read_only=True) as image:
1528                    try:
1529                        image_info = {
1530                            'reference': {'source-name': image_name},
1531                            'size': int(math.ceil(
1532                                float(image.size()) / units.Gi)),
1533                            'cinder_id': None,
1534                            'extra_info': None
1535                        }
1536                        if image_id in cinder_ids:
1537                            image_info['cinder_id'] = image_id
1538                            image_info['safe_to_manage'] = False
1539                            image_info['reason_not_safe'] = 'already managed'
1540                        elif len(self._get_image_status(
1541                                image_name)['watchers']) > 0:
1542                            # If the num of watchers of image is >= 1, then the
1543                            # image is considered to be used by client(s).
1544                            image_info['safe_to_manage'] = False
1545                            image_info['reason_not_safe'] = 'volume in use'
1546                        else:
1547                            image_info['safe_to_manage'] = True
1548                            image_info['reason_not_safe'] = None
1549                        manageable_volumes.append(image_info)
1550                    except self.rbd.ImageNotFound:
1551                        LOG.debug("Image %s is not found.", image_name)
1552
1553        return volume_utils.paginate_entries_list(
1554            manageable_volumes, marker, limit, offset, sort_keys, sort_dirs)
1555
1556    def unmanage(self, volume):
1557        pass
1558
1559    def update_migrated_volume(self, ctxt, volume, new_volume,
1560                               original_volume_status):
1561        """Return model update from RBD for migrated volume.
1562
1563        This method should rename the back-end volume name(id) on the
1564        destination host back to its original name(id) on the source host.
1565
1566        :param ctxt: The context used to run the method update_migrated_volume
1567        :param volume: The original volume that was migrated to this backend
1568        :param new_volume: The migration volume object that was created on
1569                           this backend as part of the migration process
1570        :param original_volume_status: The status of the original volume
1571        :returns: model_update to update DB with any needed changes
1572        """
1573        name_id = None
1574        provider_location = None
1575
1576        existing_name = CONF.volume_name_template % new_volume.id
1577        wanted_name = CONF.volume_name_template % volume.id
1578        with RADOSClient(self) as client:
1579            try:
1580                self.RBDProxy().rename(client.ioctx,
1581                                       utils.convert_str(existing_name),
1582                                       utils.convert_str(wanted_name))
1583            except self.rbd.ImageNotFound:
1584                LOG.error('Unable to rename the logical volume '
1585                          'for volume %s.', volume.id)
1586                # If the rename fails, _name_id should be set to the new
1587                # volume id and provider_location should be set to the
1588                # one from the new volume as well.
1589                name_id = new_volume._name_id or new_volume.id
1590                provider_location = new_volume['provider_location']
1591        return {'_name_id': name_id, 'provider_location': provider_location}
1592
1593    def migrate_volume(self, context, volume, host):
1594
1595        refuse_to_migrate = (False, None)
1596
1597        if volume.status not in ('available', 'retyping', 'maintenance'):
1598            LOG.debug('Only available volumes can be migrated using backend '
1599                      'assisted migration. Falling back to generic migration.')
1600            return refuse_to_migrate
1601
1602        if (host['capabilities']['storage_protocol'] != 'ceph'):
1603            LOG.debug('Source and destination drivers need to be RBD '
1604                      'to use backend assisted migration. Falling back to '
1605                      'generic migration.')
1606            return refuse_to_migrate
1607
1608        loc_info = host['capabilities'].get('location_info')
1609
1610        LOG.debug('Attempting RBD assisted volume migration. volume: %(id)s, '
1611                  'host: %(host)s, status=%(status)s.',
1612                  {'id': volume.id, 'host': host, 'status': volume.status})
1613
1614        if not loc_info:
1615            LOG.debug('Could not find location_info in capabilities reported '
1616                      'by the destination driver. Falling back to generic '
1617                      'migration.')
1618            return refuse_to_migrate
1619
1620        try:
1621            (rbd_cluster_name, rbd_ceph_conf, rbd_fsid, rbd_user, rbd_pool) = (
1622                utils.convert_str(loc_info).split(':'))
1623        except ValueError:
1624            LOG.error('Location info needed for backend enabled volume '
1625                      'migration not in correct format: %s. Falling back to '
1626                      'generic volume migration.', loc_info)
1627            return refuse_to_migrate
1628
1629        with linuxrbd.RBDClient(rbd_user, rbd_pool, conffile=rbd_ceph_conf,
1630                                rbd_cluster_name=rbd_cluster_name) as target:
1631            if ((rbd_fsid != self._get_fsid() or
1632                 rbd_fsid != target.client.get_fsid())):
1633                LOG.info('Migration between clusters is not supported. '
1634                         'Falling back to generic migration.')
1635                return refuse_to_migrate
1636
1637            with RBDVolumeProxy(self, volume.name, read_only=True) as source:
1638                try:
1639                    source.copy(target.ioctx, volume.name)
1640                except Exception:
1641                    with excutils.save_and_reraise_exception():
1642                        LOG.error('Error copying rbd image %(vol)s to target '
1643                                  'pool %(pool)s.',
1644                                  {'vol': volume.name, 'pool': rbd_pool})
1645                        self.RBDProxy().remove(target.ioctx, volume.name)
1646
1647        try:
1648            # If the source fails to delete for some reason, we want to leave
1649            # the target volume in place in case deleting it might cause a lose
1650            # of data.
1651            self.delete_volume(volume)
1652        except Exception:
1653            reason = 'Failed to delete migration source volume %s.', volume.id
1654            raise exception.VolumeMigrationFailed(reason=reason)
1655
1656        LOG.info('Successful RBD assisted volume migration.')
1657
1658        return (True, None)
1659
1660    def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
1661        """Return size of an existing image for manage_existing.
1662
1663        :param snapshot:
1664            snapshot ref info to be set
1665        :param existing_ref:
1666            existing_ref is a dictionary of the form:
1667            {'source-name': <name of snapshot>}
1668        """
1669        # Check that the reference is valid
1670        if not isinstance(existing_ref, dict):
1671            existing_ref = {"source-name": existing_ref}
1672        if 'source-name' not in existing_ref:
1673            reason = _('Reference must contain source-name element.')
1674            raise exception.ManageExistingInvalidReference(
1675                existing_ref=existing_ref, reason=reason)
1676
1677        volume_name = utils.convert_str(snapshot.volume_name)
1678        snapshot_name = utils.convert_str(existing_ref['source-name'])
1679
1680        with RADOSClient(self) as client:
1681            # Raise an exception if we didn't find a suitable rbd image.
1682            try:
1683                rbd_snapshot = self.rbd.Image(client.ioctx, volume_name,
1684                                              snapshot=snapshot_name)
1685            except self.rbd.ImageNotFound:
1686                kwargs = {'existing_ref': snapshot_name,
1687                          'reason': 'Specified snapshot does not exist.'}
1688                raise exception.ManageExistingInvalidReference(**kwargs)
1689
1690            snapshot_size = rbd_snapshot.size()
1691            rbd_snapshot.close()
1692
1693            # RBD image size is returned in bytes.  Attempt to parse
1694            # size as a float and round up to the next integer.
1695            try:
1696                convert_size = int(math.ceil(float(snapshot_size) / units.Gi))
1697                return convert_size
1698            except ValueError:
1699                exception_message = (_("Failed to manage existing snapshot "
1700                                       "%(name)s, because reported size "
1701                                       "%(size)s was not a floating-point"
1702                                       " number.")
1703                                     % {'name': snapshot_name,
1704                                        'size': snapshot_size})
1705                raise exception.VolumeBackendAPIException(
1706                    data=exception_message)
1707
1708    def manage_existing_snapshot(self, snapshot, existing_ref):
1709        """Manages an existing snapshot.
1710
1711        Renames the snapshot name to match the expected name for the snapshot.
1712        Error checking done by manage_existing_get_size is not repeated.
1713
1714        :param snapshot:
1715            snapshot ref info to be set
1716        :param existing_ref:
1717            existing_ref is a dictionary of the form:
1718            {'source-name': <name of rbd snapshot>}
1719        """
1720        if not isinstance(existing_ref, dict):
1721            existing_ref = {"source-name": existing_ref}
1722        volume_name = utils.convert_str(snapshot.volume_name)
1723        with RBDVolumeProxy(self, volume_name) as volume:
1724            snapshot_name = existing_ref['source-name']
1725            volume.rename_snap(utils.convert_str(snapshot_name),
1726                               utils.convert_str(snapshot.name))
1727
1728    def get_backup_device(self, context, backup):
1729        """Get a backup device from an existing volume.
1730
1731        To support incremental backups on Ceph to Ceph we don't clone
1732        the volume.
1733        """
1734
1735        if not backup.service.endswith('ceph') or backup.snapshot_id:
1736            return super(RBDDriver, self).get_backup_device(context, backup)
1737
1738        volume = objects.Volume.get_by_id(context, backup.volume_id)
1739        return (volume, False)
1740