# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""RADOS Block Device Driver"""

from __future__ import absolute_import
import binascii
import json
import math
import os
import tempfile

from castellan import key_manager
from eventlet import tpool
from os_brick import encryptors
from os_brick.initiator import linuxrbd
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
from oslo_utils import fileutils
from oslo_utils import units
import six
from six.moves import urllib

from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import utils as volume_utils

# The Ceph python bindings are optional at import time: the module must
# remain importable (e.g. for config generation or unit tests) on hosts
# that do not have librados/librbd installed.
try:
    import rados
    import rbd
except ImportError:
    rados = None
    rbd = None


LOG = logging.getLogger(__name__)

# Backend configuration options for this driver, registered below under the
# shared backend-defaults group.
RBD_OPTS = [
    cfg.StrOpt('rbd_cluster_name',
               default='ceph',
               help='The name of ceph cluster'),
    cfg.StrOpt('rbd_pool',
               default='rbd',
               help='The RADOS pool where rbd volumes are stored'),
    cfg.StrOpt('rbd_user',
               help='The RADOS client name for accessing rbd volumes '
                    '- only set when using cephx authentication'),
    cfg.StrOpt('rbd_ceph_conf',
               default='',  # default determined by librados
               help='Path to the ceph configuration file'),
    cfg.StrOpt('rbd_keyring_conf',
               default='',
               help='Path to the ceph keyring file'),
    cfg.BoolOpt('rbd_flatten_volume_from_snapshot',
                default=False,
                help='Flatten volumes created from snapshots to remove '
                     'dependency from volume to snapshot'),
    cfg.StrOpt('rbd_secret_uuid',
               help='The libvirt uuid of the secret for the rbd_user '
                    'volumes'),
    cfg.IntOpt('rbd_max_clone_depth',
               default=5,
               help='Maximum number of nested volume clones that are '
                    'taken before a flatten occurs. Set to 0 to disable '
                    'cloning.'),
    cfg.IntOpt('rbd_store_chunk_size', default=4,
               help='Volumes will be chunked into objects of this size '
                    '(in megabytes).'),
    cfg.IntOpt('rados_connect_timeout', default=-1,
               help='Timeout value (in seconds) used when connecting to '
                    'ceph cluster. If value < 0, no timeout is set and '
                    'default librados value is used.'),
    cfg.IntOpt('rados_connection_retries', default=3,
               help='Number of retries if connection to ceph cluster '
                    'failed.'),
    cfg.IntOpt('rados_connection_interval', default=5,
               help='Interval value (in seconds) between connection '
                    'retries to ceph cluster.'),
    cfg.IntOpt('replication_connect_timeout', default=5,
               help='Timeout value (in seconds) used when connecting to '
                    'ceph cluster to do a demotion/promotion of volumes. '
                    'If value < 0, no timeout is set and default librados '
                    'value is used.'),
    cfg.BoolOpt('report_dynamic_total_capacity', default=True,
                help='Set to True for driver to report total capacity as a '
                     'dynamic value -used + current free- and to False to '
                     'report a static value -quota max bytes if defined and '
                     'global size of cluster if not-.'),
    cfg.BoolOpt('rbd_exclusive_cinder_pool', default=False,
                help="Set to True if the pool is used exclusively by Cinder. "
                     "On exclusive use driver won't query images' provisioned "
                     "size as they will match the value calculated by the "
                     "Cinder core code for allocated_capacity_gb. This "
                     "reduces the load on the Ceph cluster as well as on the "
                     "volume service."),
]

CONF = cfg.CONF
CONF.register_opts(RBD_OPTS, group=configuration.SHARED_CONF_GROUP)

# Volume-type extra-spec key that marks a type as replication enabled; the
# driver compares its value against the literal "<is> True".
EXTRA_SPECS_REPL_ENABLED = "replication_enabled"


class RBDVolumeProxy(object):
    """Context manager for dealing with an existing rbd volume.

    This handles connecting to rados and opening an ioctx automatically, and
    otherwise acts like a librbd Image object.

    Also this may reuse an external connection (client and ioctx args), but
    note, that caller will be responsible for opening/closing connection.
    Also `pool`, `remote`, `timeout` args will be ignored in that case.

    The underlying librados client and ioctx can be accessed as the attributes
    'client' and 'ioctx'.
    """
    def __init__(self, driver, name, pool=None, snapshot=None,
                 read_only=False, remote=None, timeout=None,
                 client=None, ioctx=None):
        # Only tear the connection down on exit if we opened it ourselves;
        # a caller-supplied client/ioctx pair stays the caller's to close.
        self._close_conn = not (client and ioctx)
        rados_client, rados_ioctx = driver._connect_to_rados(
            pool, remote, timeout) if self._close_conn else (client, ioctx)

        if snapshot is not None:
            snapshot = utils.convert_str(snapshot)
        try:
            self.volume = driver.rbd.Image(rados_ioctx,
                                           utils.convert_str(name),
                                           snapshot=snapshot,
                                           read_only=read_only)
            # Proxy the image through eventlet's thread pool so blocking
            # librbd calls do not stall the green-thread reactor.
            self.volume = tpool.Proxy(self.volume)
        except driver.rbd.Error:
            # Opening the image failed; release the connection we created
            # before propagating, so nothing leaks.
            if self._close_conn:
                driver._disconnect_from_rados(rados_client, rados_ioctx)
            raise
        self.driver = driver
        self.client = rados_client
        self.ioctx = rados_ioctx

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        # Always close the image; disconnect only if we own the connection.
        try:
            self.volume.close()
        finally:
            if self._close_conn:
                self.driver._disconnect_from_rados(self.client, self.ioctx)

    def __getattr__(self, attrib):
        # Delegate anything not defined on the proxy to the wrapped
        # librbd Image object.
        return getattr(self.volume, attrib)


class RADOSClient(object):
    """Context manager to simplify error handling for connecting to ceph."""
    def __init__(self, driver, pool=None):
        self.driver = driver
        self.cluster, self.ioctx = driver._connect_to_rados(pool)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.driver._disconnect_from_rados(self.cluster, self.ioctx)

    @property
    def features(self):
        # RBD image features to use for newly created images: honor the
        # cluster's configured default, falling back to layering (required
        # for cloning) when unset or zero.
        features = self.cluster.conf_get('rbd_default_features')
        if ((features is None) or (int(features) == 0)):
            features = self.driver.rbd.RBD_FEATURE_LAYERING
        return int(features)


@interface.volumedriver
class RBDDriver(driver.CloneableImageVD,
                driver.MigrateVD, driver.ManageableVD, driver.BaseVD):
    """Implements RADOS block device (RBD) volume commands."""

    VERSION = '1.2.0'

    # ThirdPartySystems wiki page
    CI_WIKI_NAME = "Cinder_Jenkins"

    # Directory searched for per-target ceph config files when a replication
    # device does not specify one explicitly.
    SYSCONFDIR = '/usr/local/etc/ceph/'

    def __init__(self, active_backend_id=None, *args, **kwargs):
        super(RBDDriver, self).__init__(*args, **kwargs)
        self.configuration.append_config_values(RBD_OPTS)
        self._stats = {}
        # allow overrides for testing
        self.rados = kwargs.get('rados', rados)
        self.rbd = kwargs.get('rbd', rbd)

        # All string args used with librbd must be None or utf-8 otherwise
        # librbd will break.
        for attr in ['rbd_cluster_name', 'rbd_user',
                     'rbd_ceph_conf', 'rbd_pool']:
            val = getattr(self.configuration, attr)
            if val is not None:
                setattr(self.configuration, attr, utils.convert_str(val))

        self._backend_name = (self.configuration.volume_backend_name or
                              self.__class__.__name__)
        self._active_backend_id = active_backend_id
        self._active_config = {}
        self._is_replication_enabled = False
        self._replication_targets = []
        self._target_names = []

    def _get_target_config(self, target_id):
        """Get a replication target from known replication targets."""
        for target in self._replication_targets:
            if target['name'] == target_id:
                return target
        # No explicit target (or the 'default' alias) means the primary
        # cluster described by this backend's own configuration.
        if not target_id or target_id == 'default':
            return {
                'name': self.configuration.rbd_cluster_name,
                'conf': self.configuration.rbd_ceph_conf,
                'user': self.configuration.rbd_user
            }
        raise exception.InvalidReplicationTarget(
            reason=_('RBD: Unknown failover target host %s.') % target_id)

    def do_setup(self, context):
        """Performs initialization steps that could raise exceptions."""
        self._do_setup_replication()
        self._active_config = self._get_target_config(self._active_backend_id)

    def _do_setup_replication(self):
        """Parse replication_device config and flag replication support."""
        replication_devices = self.configuration.safe_get(
            'replication_device')
        if replication_devices:
            self._parse_replication_configs(replication_devices)
            self._is_replication_enabled = True
            self._target_names.append('default')

    def _parse_replication_configs(self, replication_devices):
        """Validate and record each configured replication target.

        :raises InvalidConfigurationValue: when a device lacks backend_id.
        """
        for replication_device in replication_devices:
            if 'backend_id' not in replication_device:
                msg = _('Missing backend_id in replication_device '
                        'configuration.')
                raise exception.InvalidConfigurationValue(msg)

            name = replication_device['backend_id']
            # Default conf path is derived from the target name under
            # SYSCONFDIR; default user falls back to this backend's rbd_user
            # or 'cinder'.
            conf = replication_device.get('conf',
                                          self.SYSCONFDIR + name + '.conf')
            user = replication_device.get(
                'user', self.configuration.rbd_user or 'cinder')
            # Pool has to be the same in all clusters
            replication_target = {'name': name,
                                  'conf': utils.convert_str(conf),
                                  'user': utils.convert_str(user)}
            LOG.info('Adding replication target: %s.', name)
            self._replication_targets.append(replication_target)
            self._target_names.append(name)

    def _get_config_tuple(self, remote=None):
        """Return (cluster name, conf path, user) for a target.

        Defaults to the currently active configuration when no remote
        is given.
        """
        if not remote:
            remote = self._active_config
        return (remote.get('name'), remote.get('conf'), remote.get('user'))

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        # NOTE(review): this tests the module-level ``rados`` import rather
        # than ``self.rados`` (the per-instance override used for testing) —
        # presumably intentional, but verify.
        if rados is None:
            msg = _('rados and rbd python libraries not found')
            raise exception.VolumeBackendAPIException(data=msg)

        for attr in ['rbd_cluster_name', 'rbd_pool']:
            val = getattr(self.configuration, attr)
            if not val:
                raise exception.InvalidConfigurationValue(option=attr,
                                                          value=val)
        # NOTE: Checking connection to ceph
        # RADOSClient __init__ method invokes _connect_to_rados
        # so no need to check for self.rados.Error here.
        with RADOSClient(self):
            pass

    def RBDProxy(self):
        # Wrap a fresh librbd RBD() handle in an eventlet tpool proxy so
        # its blocking calls run in native threads.
        return tpool.Proxy(self.rbd.RBD())

    def _ceph_args(self):
        """Build the common CLI arguments for invoking ceph/rbd tools."""
        args = []

        name, conf, user = self._get_config_tuple()

        if user:
            args.extend(['--id', user])
        if name:
            args.extend(['--cluster', name])
        if conf:
            args.extend(['--conf', conf])

        return args

    def _connect_to_rados(self, pool=None, remote=None, timeout=None):
        """Open a rados client and ioctx, retrying on connection failure.

        :returns: (client, ioctx) tuple; caller must close both via
                  _disconnect_from_rados.
        """
        # Retries are driven by the rados_connection_interval/retries
        # options; each attempt builds a brand new client.
        @utils.retry(exception.VolumeBackendAPIException,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
        def _do_conn(pool, remote, timeout):
            name, conf, user = self._get_config_tuple(remote)

            if pool is not None:
                pool = utils.convert_str(pool)
            else:
                pool = self.configuration.rbd_pool

            if timeout is None:
                timeout = self.configuration.rados_connect_timeout

            LOG.debug("connecting to %(name)s (timeout=%(timeout)s).",
                      {'name': name, 'timeout': timeout})

            client = self.rados.Rados(rados_id=user,
                                      clustername=name,
                                      conffile=conf)

            try:
                # A negative timeout means "leave librados defaults alone".
                if timeout >= 0:
                    timeout = six.text_type(timeout)
                    client.conf_set('rados_osd_op_timeout', timeout)
                    client.conf_set('rados_mon_op_timeout', timeout)
                    client.conf_set('client_mount_timeout', timeout)

                client.connect()
                ioctx = client.open_ioctx(pool)
                return client, ioctx
            except self.rados.Error:
                msg = _("Error connecting to ceph cluster.")
                LOG.exception(msg)
                client.shutdown()
                raise exception.VolumeBackendAPIException(data=msg)

        return _do_conn(pool, remote, timeout)

    def _disconnect_from_rados(self, client, ioctx):
        # closing an ioctx cannot raise an exception
        ioctx.close()
        client.shutdown()

    def _get_backup_snaps(self, rbd_image):
        """Get list of any backup snapshots that exist on this volume.

        There should only ever be one but accept all since they need to be
        deleted before the volume can be.
        """
        # NOTE(dosaboy): we do the import here otherwise we get import conflict
        # issues between the rbd driver and the ceph backup driver. These
        # issues only seem to occur when NOT using them together and are
        # triggered when the ceph backup driver imports the rbd volume driver.
        from cinder.backup.drivers import ceph
        return ceph.CephBackupDriver.get_backup_snaps(rbd_image)

    def _get_mon_addrs(self):
        """Return the monitor (hosts, ports) lists from 'ceph mon dump'."""
        args = ['ceph', 'mon', 'dump', '--format=json']
        args.extend(self._ceph_args())
        # NOTE(review): the throwaway name here shadows the i18n ``_``
        # imported at module scope; harmless within this function but worth
        # renaming (e.g. ``_err``) to keep hacking checks happy.
        out, _ = self._execute(*args)
        lines = out.split('\n')
        # Older ceph prepends a human-readable banner line before the JSON.
        if lines[0].startswith('dumped monmap epoch'):
            lines = lines[1:]
        monmap = json.loads('\n'.join(lines))
        addrs = [mon['addr'] for mon in monmap['mons']]
        hosts = []
        ports = []
        for addr in addrs:
            # Addresses look like "host:port/nonce"; split off the nonce,
            # then the port, stripping IPv6 brackets from the host.
            host_port = addr[:addr.rindex('/')]
            host, port = host_port.rsplit(':', 1)
            hosts.append(host.strip('[]'))
            ports.append(port)
        return hosts, ports

    def _get_usage_info(self):
        """Calculate provisioned volume space in GiB.

        Stats report should send provisioned size of volumes (snapshot must not
        be included) and not the physical size of those volumes.

        We must include all volumes, not only Cinder created volumes, because
        Cinder created volumes are reported by the Cinder core code as
        allocated_capacity_gb.
        """
        total_provisioned = 0
        with RADOSClient(self) as client:
            for t in self.RBDProxy().list(client.ioctx):
                try:
                    # Reuse the already open connection for each image.
                    with RBDVolumeProxy(self, t, read_only=True,
                                        client=client.cluster,
                                        ioctx=client.ioctx) as v:
                        size = v.size()
                except (self.rbd.ImageNotFound, self.rbd.OSError):
                    # Image may have been deleted between list() and open.
                    LOG.debug("Image %s is not found.", t)
                else:
                    total_provisioned += size

        total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
        return total_provisioned

    def _get_pool_stats(self):
        """Gets pool free and total capacity in GiB.

        Calculate free and total capacity of the pool based on the pool's
        defined quota and pools stats.

        Returns a tuple with (free, total) where they are either unknown or a
        real number with a 2 digit precision.
        """
        pool_name = self.configuration.rbd_pool

        with RADOSClient(self) as client:
            ret, df_outbuf, __ = client.cluster.mon_command(
                '{"prefix":"df", "format":"json"}', '')
            if ret:
                LOG.warning('Unable to get rados pool stats.')
                return 'unknown', 'unknown'

            ret, quota_outbuf, __ = client.cluster.mon_command(
                '{"prefix":"osd pool get-quota", "pool": "%s",'
                ' "format":"json"}' % pool_name, '')
            if ret:
                LOG.warning('Unable to get rados pool quotas.')
                return 'unknown', 'unknown'

        df_outbuf = encodeutils.safe_decode(df_outbuf)
        df_data = json.loads(df_outbuf)
        pool_stats = [pool for pool in df_data['pools']
                      if pool['name'] == pool_name][0]['stats']

        quota_outbuf = encodeutils.safe_decode(quota_outbuf)
        bytes_quota = json.loads(quota_outbuf)['quota_max_bytes']
        # With quota the total is the quota limit and free is quota - used
        if bytes_quota:
            total_capacity = bytes_quota
            free_capacity = max(min(total_capacity - pool_stats['bytes_used'],
                                    pool_stats['max_avail']),
                                0)
        # Without quota free is pools max available and total is global size
        else:
            total_capacity = df_data['stats']['total_bytes']
            free_capacity = pool_stats['max_avail']

        # If we want dynamic total capacity (default behavior)
        if self.configuration.safe_get('report_dynamic_total_capacity'):
            total_capacity = free_capacity + pool_stats['bytes_used']

        free_capacity = round((float(free_capacity) / units.Gi), 2)
        total_capacity = round((float(total_capacity) / units.Gi), 2)

        return free_capacity, total_capacity

    def _update_volume_stats(self):
        """Refresh self._stats with capacity and capability information."""
        location_info = '%s:%s:%s:%s:%s' % (
            self.configuration.rbd_cluster_name,
            self.configuration.rbd_ceph_conf,
            self._get_fsid(),
            self.configuration.rbd_user,
            self.configuration.rbd_pool)

        stats = {
            'vendor_name': 'Open Source',
            'driver_version': self.VERSION,
            'storage_protocol': 'ceph',
            'total_capacity_gb': 'unknown',
            'free_capacity_gb': 'unknown',
            'reserved_percentage': (
                self.configuration.safe_get('reserved_percentage')),
            'multiattach': False,
            'thin_provisioning_support': True,
            'max_over_subscription_ratio': (
                self.configuration.safe_get('max_over_subscription_ratio')),
            'location_info': location_info,
        }

        backend_name = self.configuration.safe_get('volume_backend_name')
        stats['volume_backend_name'] = backend_name or 'RBD'

        stats['replication_enabled'] = self._is_replication_enabled
        if self._is_replication_enabled:
            stats['replication_targets'] = self._target_names

        try:
            free_capacity, total_capacity = self._get_pool_stats()
            stats['free_capacity_gb'] = free_capacity
            stats['total_capacity_gb'] = total_capacity

            # For exclusive pools let scheduler set provisioned_capacity_gb to
            # allocated_capacity_gb, and for non exclusive query the value.
            if not self.configuration.safe_get('rbd_exclusive_cinder_pool'):
                total_gbi = self._get_usage_info()
                stats['provisioned_capacity_gb'] = total_gbi
        except self.rados.Error:
            # just log and return unknown capacities and let scheduler set
            # provisioned_capacity_gb = allocated_capacity_gb
            LOG.exception('error refreshing volume stats')
        self._stats = stats

    def get_volume_stats(self, refresh=False):
        """Return the current state of the volume service.

        If 'refresh' is True, run the update first.
        """
        if refresh:
            self._update_volume_stats()
        return self._stats

    def _get_clone_depth(self, client, volume_name, depth=0):
        """Returns the number of ancestral clones of the given volume."""
        parent_volume = self.rbd.Image(client.ioctx, volume_name)
        try:
            _pool, parent, _snap = self._get_clone_info(parent_volume,
                                                        volume_name)
        finally:
            parent_volume.close()

        if not parent:
            return depth

        # If clone depth was reached, flatten should have occurred so if it has
        # been exceeded then something has gone wrong.
        if depth > self.configuration.rbd_max_clone_depth:
            raise Exception(_("clone depth exceeds limit of %s") %
                            (self.configuration.rbd_max_clone_depth))

        # Recurse up the chain via the parent image.
        return self._get_clone_depth(client, parent, depth + 1)

    def _extend_if_required(self, volume, src_vref):
        """Extends a volume if required

        In case src_vref size is smaller than the size if the requested
        new volume call _resize().
        """
        if volume.size != src_vref.size:
            LOG.debug("resize volume '%(dst_vol)s' from %(src_size)d to "
                      "%(dst_size)d",
                      {'dst_vol': volume.name, 'src_size': src_vref.size,
                       'dst_size': volume.size})
            self._resize(volume)

    def create_cloned_volume(self, volume, src_vref):
        """Create a cloned volume from another volume.

        Since we are cloning from a volume and not a snapshot, we must first
        create a snapshot of the source volume.

        The user has the option to limit how long a volume's clone chain can be
        by setting rbd_max_clone_depth. If a clone is made of another clone
        and that clone has rbd_max_clone_depth clones behind it, the dest
        volume will be flattened.
        """
        src_name = utils.convert_str(src_vref.name)
        dest_name = utils.convert_str(volume.name)
        clone_snap = "%s.clone_snap" % dest_name

        # Do full copy if requested
        if self.configuration.rbd_max_clone_depth <= 0:
            with RBDVolumeProxy(self, src_name, read_only=True) as vol:
                vol.copy(vol.ioctx, dest_name)
            self._extend_if_required(volume, src_vref)
            return

        # Otherwise do COW clone.
        with RADOSClient(self) as client:
            src_volume = self.rbd.Image(client.ioctx, src_name)
            LOG.debug("creating snapshot='%s'", clone_snap)
            try:
                # Create new snapshot of source volume
                src_volume.create_snap(clone_snap)
                src_volume.protect_snap(clone_snap)
                # Now clone source volume snapshot
                LOG.debug("cloning '%(src_vol)s@%(src_snap)s' to "
                          "'%(dest)s'",
                          {'src_vol': src_name, 'src_snap': clone_snap,
                           'dest': dest_name})
                self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
                                      client.ioctx, dest_name,
                                      features=client.features)
            except Exception as e:
                src_volume.unprotect_snap(clone_snap)
                src_volume.remove_snap(clone_snap)
                msg = (_("Failed to clone '%(src_vol)s@%(src_snap)s' to "
                         "'%(dest)s', error: %(error)s") %
                       {'src_vol': src_name,
                        'src_snap': clone_snap,
                        'dest': dest_name,
                        'error': e})
                LOG.exception(msg)
                raise exception.VolumeBackendAPIException(data=msg)
            finally:
                src_volume.close()

            # NOTE(review): src_volume was closed in the finally above, yet
            # the flatten / temporary-snap / replication paths below call
            # methods on it again (and close it again). That looks like a
            # use-after-close — confirm against librbd behavior before
            # relying on the depth-limit path.
            depth = self._get_clone_depth(client, src_name)
            # If dest volume is a clone and rbd_max_clone_depth reached,
            # flatten the dest after cloning. Zero rbd_max_clone_depth means
            # infinite is allowed.
            if depth >= self.configuration.rbd_max_clone_depth:
                LOG.info("maximum clone depth (%d) has been reached - "
                         "flattening dest volume",
                         self.configuration.rbd_max_clone_depth)
                dest_volume = self.rbd.Image(client.ioctx, dest_name)
                try:
                    # Flatten destination volume
                    LOG.debug("flattening dest volume %s", dest_name)
                    dest_volume.flatten()
                except Exception as e:
                    msg = (_("Failed to flatten volume %(volume)s with "
                             "error: %(error)s.") %
                           {'volume': dest_name,
                            'error': e})
                    LOG.exception(msg)
                    src_volume.close()
                    raise exception.VolumeBackendAPIException(data=msg)
                finally:
                    dest_volume.close()

                try:
                    # remove temporary snap
                    LOG.debug("remove temporary snap %s", clone_snap)
                    src_volume.unprotect_snap(clone_snap)
                    src_volume.remove_snap(clone_snap)
                except Exception as e:
                    msg = (_("Failed to remove temporary snap "
                             "%(snap_name)s, error: %(error)s") %
                           {'snap_name': clone_snap,
                            'error': e})
                    LOG.exception(msg)
                    src_volume.close()
                    raise exception.VolumeBackendAPIException(data=msg)

            try:
                volume_update = self._enable_replication_if_needed(volume)
            except Exception:
                # Replication setup failed: undo the clone and its snap so
                # we do not leave a half-created volume behind.
                self.RBDProxy().remove(client.ioctx, dest_name)
                src_volume.unprotect_snap(clone_snap)
                src_volume.remove_snap(clone_snap)
                err_msg = (_('Failed to enable image replication'))
                raise exception.ReplicationError(reason=err_msg,
                                                 volume_id=volume.id)
            finally:
                src_volume.close()

        self._extend_if_required(volume, src_vref)

        LOG.debug("clone created successfully")
        return volume_update

    def _enable_replication(self, volume):
        """Enable replication for a volume.

        Returns required volume update.
        """
        vol_name = utils.convert_str(volume.name)
        with RBDVolumeProxy(self, vol_name) as image:
            # RBD journaling (required for mirroring) depends on the
            # exclusive-lock feature; enable both only if not already set,
            # and remember the prior state so a later disable can restore it.
            had_exclusive_lock = (image.features() &
                                  self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK)
            had_journaling = image.features() & self.rbd.RBD_FEATURE_JOURNALING
            if not had_exclusive_lock:
                image.update_features(self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK,
                                      True)
            if not had_journaling:
                image.update_features(self.rbd.RBD_FEATURE_JOURNALING, True)
            image.mirror_image_enable()

        # Persist the original feature flags in the volume's replication
        # driver data.
        driver_data = self._dumps({
            'had_journaling': bool(had_journaling),
            'had_exclusive_lock': bool(had_exclusive_lock)
        })
        return {'replication_status': fields.ReplicationStatus.ENABLED,
                'replication_driver_data': driver_data}

    def _is_replicated_type(self, volume_type):
        # We do a safe attribute get because volume_type could be None
        specs = getattr(volume_type, 'extra_specs', {})
        return specs.get(EXTRA_SPECS_REPL_ENABLED) == "<is> True"

    def _enable_replication_if_needed(self, volume):
        """Enable replication when the volume's type requires it.

        Returns the volume update dict, or None when replication is not
        configured on this backend at all.
        """
        if self._is_replicated_type(volume.volume_type):
            return self._enable_replication(volume)
        if self._is_replication_enabled:
            return {'replication_status': fields.ReplicationStatus.DISABLED}
        return None

    def _check_encryption_provider(self, volume, context):
        """Check that this is a LUKS encryption provider.

        :returns: encryption dict
        """

        encryption = self.db.volume_encryption_metadata_get(context, volume.id)
        provider = encryption['provider']
        # Map legacy provider class paths onto their format names first.
        if provider in encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP:
            provider = encryptors.LEGACY_PROVIDER_CLASS_TO_FORMAT_MAP[provider]
        if provider != encryptors.LUKS:
            message = _("Provider %s not supported.") % provider
            raise exception.VolumeDriverException(message=message)

        if 'cipher' not in encryption or 'key_size' not in encryption:
            msg = _('encryption spec must contain "cipher" and'
                    '"key_size"')
            raise exception.VolumeDriverException(message=msg)

        return encryption

    def _create_encrypted_volume(self, volume, context):
        """Create an encrypted volume.

        This works by creating an encrypted image locally,
        and then uploading it to the volume.
        """

        encryption = self._check_encryption_provider(volume, context)

        # Fetch the key associated with the volume and decode the passphrase
        keymgr = key_manager.API(CONF)
        key = keymgr.get(context, encryption['encryption_key_id'])
        passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')

        # create a file
        tmp_dir = self._image_conversion_dir()

        with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_image:
            with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_key:
                with open(tmp_key.name, 'w') as f:
                    f.write(passphrase)

                cipher_spec = image_utils.decode_cipher(encryption['cipher'],
                                                        encryption['key_size'])

                # Build a LUKS image with qemu-img, feeding the passphrase
                # through a qemu secret object backed by the temp key file.
                create_cmd = (
                    'qemu-img', 'create', '-f', 'luks',
                    '-o', 'cipher-alg=%(cipher_alg)s,'
                    'cipher-mode=%(cipher_mode)s,'
                    'ivgen-alg=%(ivgen_alg)s' % cipher_spec,
                    '--object', 'secret,id=luks_sec,'
                    'format=raw,file=%(passfile)s' % {'passfile':
                                                      tmp_key.name},
                    '-o', 'key-secret=luks_sec',
                    tmp_image.name,
                    '%sM' % (volume.size * 1024))
                self._execute(*create_cmd)

            # Copy image into RBD
            chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
            # RBD object size is expressed as a power of two ("order").
            order = int(math.log(chunk_size, 2))

            cmd = ['rbd', 'import',
                   '--pool', self.configuration.rbd_pool,
                   '--order', order,
                   tmp_image.name, volume.name]
            cmd.extend(self._ceph_args())
            self._execute(*cmd)

    def create_volume(self, volume):
        """Creates a logical volume."""

        if volume.encryption_key_id:
            return self._create_encrypted_volume(volume, volume.obj_context)

        size = int(volume.size) * units.Gi

        LOG.debug("creating volume '%s'", volume.name)

        chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
        order = int(math.log(chunk_size, 2))
        vol_name = utils.convert_str(volume.name)

        with RADOSClient(self) as client:
            self.RBDProxy().create(client.ioctx,
                                   vol_name,
                                   size,
                                   order,
                                   old_format=False,
                                   features=client.features)

            try:
                volume_update = self._enable_replication_if_needed(volume)
            except Exception:
                # Roll the image back out so no orphan remains on failure.
                self.RBDProxy().remove(client.ioctx, vol_name)
                err_msg = (_('Failed to enable image replication'))
                raise exception.ReplicationError(reason=err_msg,
                                                 volume_id=volume.id)
        return volume_update

    def _flatten(self, pool, volume_name):
        """Detach an image from its parent by flattening it."""
        LOG.debug('flattening %(pool)s/%(img)s',
                  dict(pool=pool, img=volume_name))
        with RBDVolumeProxy(self, volume_name, pool) as vol:
            vol.flatten()

    def _clone(self, volume, src_pool, src_image, src_snap):
        """COW-clone src_pool/src_image@src_snap into the configured pool."""
        LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
                  dict(pool=src_pool, img=src_image, snap=src_snap,
                       dst=volume.name))

        chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
        order = int(math.log(chunk_size, 2))
        vol_name = utils.convert_str(volume.name)

        with RADOSClient(self, src_pool) as src_client:
            with RADOSClient(self) as dest_client:
                self.RBDProxy().clone(src_client.ioctx,
                                      utils.convert_str(src_image),
                                      utils.convert_str(src_snap),
                                      dest_client.ioctx,
                                      vol_name,
                                      features=src_client.features,
                                      order=order)

                try:
                    volume_update = self._enable_replication_if_needed(volume)
                except Exception:
                    self.RBDProxy().remove(dest_client.ioctx, vol_name)
                    err_msg = (_('Failed to enable image replication'))
                    raise exception.ReplicationError(reason=err_msg,
                                                     volume_id=volume.id)
        return volume_update or {}

    def _resize(self, volume, **kwargs):
        """Resize the backing image to `size` bytes (default: volume.size)."""
        size = kwargs.get('size', None)
        if not size:
            size = int(volume.size) * units.Gi

        with RBDVolumeProxy(self, volume.name) as vol:
            vol.resize(size)

    def create_volume_from_snapshot(self, volume, snapshot):
        """Creates a volume from a snapshot."""
        volume_update = self._clone(volume, self.configuration.rbd_pool,
                                    snapshot.volume_name, snapshot.name)
        if self.configuration.rbd_flatten_volume_from_snapshot:
            self._flatten(self.configuration.rbd_pool, volume.name)
        if int(volume.size):
            self._resize(volume)
        return volume_update

    def _delete_backup_snaps(self, rbd_image):
        """Remove any backup snapshots present on the image."""
        backup_snaps = self._get_backup_snaps(rbd_image)
        if backup_snaps:
            for snap in backup_snaps:
                rbd_image.remove_snap(snap['name'])
        else:
            LOG.debug("volume has no backup snaps")

    def _get_clone_info(self, volume, volume_name, snap=None):
        """If volume is a clone, return its parent info.

        Returns a tuple of (pool, parent, snap). A snapshot may optionally be
        provided for the case where a cloned volume has been flattened but it's
        snapshot still depends on the parent.
        """
        try:
            if snap:
                volume.set_snap(snap)
            pool, parent, parent_snap = tuple(volume.parent_info())
            if snap:
                volume.set_snap(None)
            # Strip the tag off the end of the volume name since it will not be
            # in the snap name.
            if volume_name.endswith('.deleted'):
                volume_name = volume_name[:-len('.deleted')]
            # Now check the snap name matches.
            if parent_snap == "%s.clone_snap" % volume_name:
                return pool, parent, parent_snap
        except self.rbd.ImageNotFound:
            LOG.debug("Volume %s is not a clone.", volume_name)
            volume.set_snap(None)

        return (None, None, None)

    def _get_children_info(self, volume, snap):
        """List children for the given snapshot of a volume(image).

        Returns a list of (pool, image).
        """

        children_list = []

        if snap:
            volume.set_snap(snap)
            children_list = volume.list_children()
            volume.set_snap(None)

        return children_list

    def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
        """Walk back up the clone chain and delete references.

        Deletes references i.e. deleted parent volumes and snapshots.
        """
        parent_rbd = self.rbd.Image(client.ioctx, parent_name)
        parent_has_snaps = False
        try:
            # Check for grandparent
            _pool, g_parent, g_parent_snap = self._get_clone_info(parent_rbd,
                                                                  parent_name,
                                                                  parent_snap)

            LOG.debug("deleting parent snapshot %s", parent_snap)
            parent_rbd.unprotect_snap(parent_snap)
            parent_rbd.remove_snap(parent_snap)

            parent_has_snaps = bool(list(parent_rbd.list_snaps()))
        finally:
            parent_rbd.close()

        # If parent has been deleted in Cinder, delete the silent reference and
        # keep walking up the chain if it is itself a clone.
        if (not parent_has_snaps) and parent_name.endswith('.deleted'):
            LOG.debug("deleting parent %s", parent_name)
            self.RBDProxy().remove(client.ioctx, parent_name)

            # Now move up to grandparent if there is one
            if g_parent:
                self._delete_clone_parent_refs(client, g_parent, g_parent_snap)

    def delete_volume(self, volume):
        """Deletes a logical volume."""
        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
        # utf-8 otherwise librbd will barf.
        volume_name = utils.convert_str(volume.name)
        with RADOSClient(self) as client:
            try:
                rbd_image = self.rbd.Image(client.ioctx, volume_name)
            except self.rbd.ImageNotFound:
                # Nothing to do; treat a missing backend image as success.
                LOG.info("volume %s no longer exists in backend",
                         volume_name)
                return

            clone_snap = None
            parent = None

            # Ensure any backup snapshots are deleted
            self._delete_backup_snaps(rbd_image)

            # If the volume has non-clone snapshots this delete is expected to
            # raise VolumeIsBusy so do so straight away.
            try:
                snaps = rbd_image.list_snaps()
                for snap in snaps:
                    if snap['name'].endswith('.clone_snap'):
                        LOG.debug("volume has clone snapshot(s)")
                        # We grab one of these and use it when fetching parent
                        # info in case the volume has been flattened.
                        clone_snap = snap['name']
                        break

                    # Any snapshot that is not a clone_snap blocks deletion.
                    raise exception.VolumeIsBusy(volume_name=volume_name)

                # Determine if this volume is itself a clone
                _pool, parent, parent_snap = self._get_clone_info(rbd_image,
                                                                  volume_name,
                                                                  clone_snap)
            finally:
                rbd_image.close()

            # Retry removal on ImageBusy (e.g. a crashed client holding a
            # watch) using the same interval/retry settings as connections.
            @utils.retry(self.rbd.ImageBusy,
                         self.configuration.rados_connection_interval,
                         self.configuration.rados_connection_retries)
            def _try_remove_volume(client, volume_name):
                self.RBDProxy().remove(client.ioctx, volume_name)

            if clone_snap is None:
                LOG.debug("deleting rbd volume %s", volume_name)
                try:
                    _try_remove_volume(client, volume_name)
                except self.rbd.ImageBusy:
                    msg = (_("ImageBusy error raised while deleting rbd "
                             "volume. This may have been caused by a "
                             "connection from a client that has crashed and, "
                             "if so, may be resolved by retrying the delete "
                             "after 30 seconds has elapsed."))
                    LOG.warning(msg)
                    # Now raise this so that volume stays available so that we
                    # delete can be retried.
                    raise exception.VolumeIsBusy(msg, volume_name=volume_name)
                except self.rbd.ImageNotFound:
                    LOG.info("RBD volume %s not found, allowing delete "
                             "operation to proceed.", volume_name)
                    return

                # If it is a clone, walk back up the parent chain deleting
                # references.
                if parent:
                    LOG.debug("volume is a clone so cleaning references")
                    self._delete_clone_parent_refs(client, parent, parent_snap)
            else:
                # If the volume has copy-on-write clones we will not be able to
                # delete it. Instead we will keep it as a silent volume which
                # will be deleted when it's snapshot and clones are deleted.
                new_name = "%s.deleted" % (volume_name)
                self.RBDProxy().rename(client.ioctx, volume_name, new_name)

    def create_snapshot(self, snapshot):
        """Creates an rbd snapshot."""
        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
            snap = utils.convert_str(snapshot.name)
            volume.create_snap(snap)
            # Protect the snapshot so it can safely serve as a clone parent.
            volume.protect_snap(snap)

    def delete_snapshot(self, snapshot):
        """Deletes an rbd snapshot."""
        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
        # utf-8 otherwise librbd will barf.
        volume_name = utils.convert_str(snapshot.volume_name)
        snap_name = utils.convert_str(snapshot.name)

        with RBDVolumeProxy(self, volume_name) as volume:
            # Unprotect first; a snapshot cannot be removed while protected.
            try:
                volume.unprotect_snap(snap_name)
            except self.rbd.InvalidArgument:
                # Snapshot was never protected - safe to proceed with removal.
                LOG.info(
                    "InvalidArgument: Unable to unprotect snapshot %s.",
                    snap_name)
            except self.rbd.ImageNotFound:
                LOG.info(
                    "ImageNotFound: Unable to unprotect snapshot %s.",
                    snap_name)
            except self.rbd.ImageBusy:
                # Busy means clones still depend on this snapshot; report
                # them and refuse the delete.
                children_list = self._get_children_info(volume, snap_name)

                if children_list:
                    for (pool, image) in children_list:
                        LOG.info('Image %(pool)s/%(image)s is dependent '
                                 'on the snapshot %(snap)s.',
                                 {'pool': pool,
                                  'image': image,
                                  'snap': snap_name})

                raise exception.SnapshotIsBusy(snapshot_name=snap_name)
            try:
                volume.remove_snap(snap_name)
            except self.rbd.ImageNotFound:
                LOG.info("Snapshot %s does not exist in backend.",
                         snap_name)

    def _disable_replication(self, volume):
        """Disable replication on the given volume."""
        vol_name = utils.convert_str(volume.name)
        with RBDVolumeProxy(self, vol_name) as image:
            image.mirror_image_disable(False)
            driver_data = json.loads(volume.replication_driver_data)
            # If 'journaling' and/or 'exclusive-lock' have
            # been enabled in '_enable_replication',
            # they will be disabled here. If not, it will keep
            # what it was before.
            if not driver_data['had_journaling']:
                image.update_features(self.rbd.RBD_FEATURE_JOURNALING, False)
            if not driver_data['had_exclusive_lock']:
                image.update_features(self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK,
                                      False)
        return {'replication_status': fields.ReplicationStatus.DISABLED,
                'replication_driver_data': None}

    def retype(self, context, volume, new_type, diff, host):
        """Retype from one volume type to another on the same backend.

        Only the replication aspect of the type is acted on here; any other
        difference is accepted as-is (always returns ``True``).
        """
        old_vol_replicated = self._is_replicated_type(volume.volume_type)
        new_vol_replicated = self._is_replicated_type(new_type)

        if old_vol_replicated and not new_vol_replicated:
            try:
                return True, self._disable_replication(volume)
            except Exception:
                err_msg = (_('Failed to disable image replication'))
                raise exception.ReplicationError(reason=err_msg,
                                                 volume_id=volume.id)
        elif not old_vol_replicated and new_vol_replicated:
            try:
                return True, self._enable_replication(volume)
            except Exception:
                err_msg = (_('Failed to enable image replication'))
                raise exception.ReplicationError(reason=err_msg,
                                                 volume_id=volume.id)

        if not new_vol_replicated and self._is_replication_enabled:
            update = {'replication_status': fields.ReplicationStatus.DISABLED}
        else:
            update = None
        return True, update

    def _dumps(self, obj):
        """Serialize obj to a compact, deterministically-ordered JSON string."""
        return json.dumps(obj, separators=(',', ':'), sort_keys=True)

    def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
        """Run an arbitrary librbd image method, retrying on ImageBusy.

        :param volume_name: rbd image name
        :param remote: remote cluster config dict (or None for local)
        :param operation: name of the rbd.Image method to invoke
        """
        # NOTE(review): uses the module-level ``rbd.ImageBusy`` here while
        # sibling methods use ``self.rbd.ImageBusy`` - presumably the same
        # class; confirm before unifying.
        @utils.retry(rbd.ImageBusy,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
        def _do_exec():
            timeout = self.configuration.replication_connect_timeout
            with RBDVolumeProxy(self, volume_name, self.configuration.rbd_pool,
                                remote=remote, timeout=timeout) as rbd_image:
                return getattr(rbd_image, operation)(*args, **kwargs)
        return _do_exec()

    def _failover_volume(self, volume, remote, is_demoted, replication_status):
        """Process failover for a volume.

        There are 2 different cases that will return different update values
        for the volume:

        - Volume has replication enabled and failover succeeded: Set
          replication status to failed-over.
        - Volume has replication enabled and failover fails: Set status to
          error, replication status to failover-error, and store previous
          status in previous_status field.
        """
        # Failover is allowed when volume has it enabled or it has already
        # failed over, because we may want to do a second failover.
        vol_name = utils.convert_str(volume.name)
        try:
            # Force-promote only when the primary could not be demoted first.
            self._exec_on_volume(vol_name, remote,
                                 'mirror_image_promote', not is_demoted)

            return {'volume_id': volume.id,
                    'updates': {'replication_status': replication_status}}
        except Exception as e:
            replication_status = fields.ReplicationStatus.FAILOVER_ERROR
            LOG.error('Failed to failover volume %(volume)s with '
                      'error: %(error)s.',
                      {'volume': volume.name, 'error': e})

        # Failover failed
        error_result = {
            'volume_id': volume.id,
            'updates': {
                'status': 'error',
                'previous_status': volume.status,
                'replication_status': replication_status
            }
        }

        return error_result

    def _demote_volumes(self, volumes, until_failure=True):
        """Try to demote volumes on the current primary cluster.

        :param until_failure: when True, stop attempting demotions after the
            first failure (remaining volumes are recorded as not demoted).
        :returns: list of booleans, parallel to ``volumes``.
        """
        result = []
        try_demoting = True
        for volume in volumes:
            demoted = False
            if try_demoting:
                vol_name = utils.convert_str(volume.name)
                try:
                    self._exec_on_volume(vol_name, self._active_config,
                                         'mirror_image_demote')
                    demoted = True
                except Exception as e:
                    LOG.debug('Failed to demote %(volume)s with error: '
                              '%(error)s.',
                              {'volume': volume.name, 'error': e})
                    try_demoting = not until_failure
            result.append(demoted)
        return result

    def _get_failover_target_config(self, secondary_id=None):
        """Resolve the failover target, auto-selecting one when not given.

        :returns: tuple of (secondary_id, target config dict)
        :raises InvalidReplicationTarget: when no candidate remains.
        """
        if not secondary_id:
            # In auto mode exclude failback and active
            candidates = set(self._target_names).difference(
                ('default', self._active_backend_id))
            if not candidates:
                raise exception.InvalidReplicationTarget(
                    reason=_('RBD: No available failover target host.'))
            secondary_id = candidates.pop()
        return secondary_id, self._get_target_config(secondary_id)

    def failover_host(self, context, volumes, secondary_id=None, groups=None):
        """Failover to replication target."""
        LOG.info('RBD driver failover started.')
        if not self._is_replication_enabled:
            raise exception.UnableToFailOver(
                reason=_('RBD: Replication is not enabled.'))

        if secondary_id == 'default':
            replication_status = fields.ReplicationStatus.ENABLED
        else:
            replication_status = fields.ReplicationStatus.FAILED_OVER

        secondary_id, remote = self._get_failover_target_config(secondary_id)

        # Try to demote the volumes first
        demotion_results = self._demote_volumes(volumes)
        # Do the failover taking into consideration if they have been demoted
        updates = [self._failover_volume(volume, remote, is_demoted,
                                         replication_status)
                   for volume, is_demoted in zip(volumes, demotion_results)]
        self._active_backend_id = secondary_id
        self._active_config = remote
        LOG.info('RBD driver failover completed.')
        return secondary_id, updates, []

    def ensure_export(self, context, volume):
        """Synchronously recreates an export for a logical volume."""
        pass

    def create_export(self, context, volume, connector):
        """Exports the volume."""
        pass

    def remove_export(self, context, volume):
        """Removes an export for a logical volume."""
        pass

    def _get_keyring_contents(self):
        # NOTE(danpawlik) If keyring is not provided in Cinder configuration,
        # os-brick library will take keyring from default path.
        keyring_file = self.configuration.rbd_keyring_conf
        keyring_data = None
        try:
            if os.path.isfile(keyring_file):
                with open(keyring_file, 'r') as k_file:
                    keyring_data = k_file.read()
        except IOError:
            # Best effort: an unreadable keyring just falls back to defaults.
            LOG.debug('Cannot read RBD keyring file: %s.', keyring_file)

        return keyring_data

    def initialize_connection(self, volume, connector):
        """Build the connection info dict consumed by the os-brick connector."""
        hosts, ports = self._get_mon_addrs()
        data = {
            'driver_volume_type': 'rbd',
            'data': {
                'name': '%s/%s' % (self.configuration.rbd_pool,
                                   volume.name),
                'hosts': hosts,
                'ports': ports,
                'cluster_name': self.configuration.rbd_cluster_name,
                'auth_enabled': (self.configuration.rbd_user is not None),
                'auth_username': self.configuration.rbd_user,
                'secret_type': 'ceph',
                'secret_uuid': self.configuration.rbd_secret_uuid,
                'volume_id': volume.id,
                "discard": True,
                'keyring': self._get_keyring_contents(),
            }
        }
        LOG.debug('connection data: %s', data)
        return data

    def terminate_connection(self, volume, connector, **kwargs):
        # Nothing to tear down: RBD connections are stateless on this side.
        pass

    def _parse_location(self, location):
        """Split an rbd:// image location into its four components.

        :param location: URL of the form
            ``rbd://<fsid>/<pool>/<image>/<snapshot>``
        :returns: list ``[fsid, pool, image, snapshot]`` (unquoted)
        :raises ImageUnacceptable: if the URL is not a valid rbd snapshot URL.
        """
        prefix = 'rbd://'
        if not location.startswith(prefix):
            reason = _('Not stored in rbd')
            raise exception.ImageUnacceptable(image_id=location, reason=reason)
        pieces = [urllib.parse.unquote(loc)
                  for loc in location[len(prefix):].split('/')]
        if any(map(lambda p: p == '', pieces)):
            reason = _('Blank components')
            raise exception.ImageUnacceptable(image_id=location, reason=reason)
        if len(pieces) != 4:
            reason = _('Not an rbd snapshot')
            raise exception.ImageUnacceptable(image_id=location, reason=reason)
        return pieces

    def _get_fsid(self):
        """Return the fsid (cluster uuid) of the connected Ceph cluster."""
        with RADOSClient(self) as client:
            return client.cluster.get_fsid()

    def _is_cloneable(self, image_location, image_meta):
        """Check whether a glance image location can be cloned directly.

        Requires the image to live in this cluster, be in raw format, and be
        readable as an rbd snapshot.
        """
        try:
            fsid, pool, image, snapshot = self._parse_location(image_location)
        except exception.ImageUnacceptable as e:
            LOG.debug('not cloneable: %s.', e)
            return False

        if self._get_fsid() != fsid:
            LOG.debug('%s is in a different ceph cluster.', image_location)
            return False

        if image_meta['disk_format'] != 'raw':
            LOG.debug("rbd image clone requires image format to be "
                      "'raw' but image %(image)s is '%(format)s'",
                      {"image": image_location,
                       "format": image_meta['disk_format']})
            return False

        # check that we can read the image
        try:
            with RBDVolumeProxy(self, image,
                                pool=pool,
                                snapshot=snapshot,
                                read_only=True):
                return True
        except self.rbd.Error as e:
            LOG.debug('Unable to open image %(loc)s: %(err)s.',
                      dict(loc=image_location, err=e))
            return False

    def clone_image(self, context, volume,
                    image_location, image_meta,
                    image_service):
        """Clone a glance image into a volume when a cloneable location exists.

        :returns: tuple of (volume model update, cloned) where ``cloned`` is
            False when no usable rbd location was found.
        """
        if image_location:
            # Note: image_location[0] is glance image direct_url.
            # image_location[1] contains the list of all locations (including
            # direct_url) or None if show_multiple_locations is False in
            # glance configuration.
            if image_location[1]:
                url_locations = [location['url'] for
                                 location in image_location[1]]
            else:
                url_locations = [image_location[0]]

            # iterate all locations to look for a cloneable one.
            for url_location in url_locations:
                if url_location and self._is_cloneable(
                        url_location, image_meta):
                    _prefix, pool, image, snapshot = \
                        self._parse_location(url_location)
                    volume_update = self._clone(volume, pool, image, snapshot)
                    volume_update['provider_location'] = None
                    self._resize(volume)
                    return volume_update, True
        return ({}, False)

    def _image_conversion_dir(self):
        """Return (creating if needed) the directory for image conversions."""
        tmpdir = (CONF.image_conversion_dir or
                  tempfile.gettempdir())

        # ensure temporary directory exists
        if not os.path.exists(tmpdir):
            os.makedirs(tmpdir)

        return tmpdir

    def copy_image_to_encrypted_volume(self, context, volume, image_service,
                                       image_id):
        """Fetch a glance image into an encrypted volume."""
        self._copy_image_to_volume(context, volume, image_service, image_id,
                                   encrypted=True)

    def copy_image_to_volume(self, context, volume, image_service, image_id):
        """Fetch a glance image into a (plain) volume."""
        self._copy_image_to_volume(context, volume, image_service, image_id)

    def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
        """Convert the raw image at src_image_path to LUKS, in place.

        Uses the volume's encryption key (via the key manager) as the LUKS
        passphrase.
        """
        # NOTE(review): the ``tmp_dir`` argument is shadowed by the
        # recomputation below, so the passed-in value is effectively
        # ignored - confirm intent before relying on it.
        encryption = self._check_encryption_provider(volume, context)

        # Fetch the key associated with the volume and decode the passphrase
        keymgr = key_manager.API(CONF)
        key = keymgr.get(context, encryption['encryption_key_id'])
        passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')

        # Decode the dm-crypt style cipher spec into something qemu-img can use
        cipher_spec = image_utils.decode_cipher(encryption['cipher'],
                                                encryption['key_size'])

        tmp_dir = self._image_conversion_dir()

        with tempfile.NamedTemporaryFile(prefix='luks_',
                                         dir=tmp_dir) as pass_file:
            with open(pass_file.name, 'w') as f:
                f.write(passphrase)

            # Convert the raw image to luks
            dest_image_path = src_image_path + '.luks'
            image_utils.convert_image(src_image_path, dest_image_path,
                                      'luks', src_format='raw',
                                      cipher_spec=cipher_spec,
                                      passphrase_file=pass_file.name)

            # Replace the original image with the now encrypted image
            os.rename(dest_image_path, src_image_path)

    def _copy_image_to_volume(self, context, volume, image_service, image_id,
                              encrypted=False):
        """Download a glance image and import it as the volume's rbd image."""

        tmp_dir = self._image_conversion_dir()

        with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp:
            image_utils.fetch_to_raw(context, image_service, image_id,
                                     tmp.name,
                                     self.configuration.volume_dd_blocksize,
                                     size=volume.size)

            if encrypted:
                self._encrypt_image(context, volume, tmp_dir, tmp.name)

            # The empty image created for the volume is replaced wholesale
            # by the rbd import below.
            self.delete_volume(volume)

            chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
            order = int(math.log(chunk_size, 2))
            # keep using the command line import instead of librbd since it
            # detects zeroes to preserve sparseness in the image
            args = ['rbd', 'import',
                    '--pool', self.configuration.rbd_pool,
                    '--order', order,
                    tmp.name, volume.name,
                    '--new-format']
            args.extend(self._ceph_args())
            self._try_execute(*args)
        self._resize(volume)
        # We may need to re-enable replication because we have deleted the
        # original image and created a new one using the command line import.
1409 try: 1410 self._enable_replication_if_needed(volume) 1411 except Exception: 1412 err_msg = (_('Failed to enable image replication')) 1413 raise exception.ReplicationError(reason=err_msg, 1414 volume_id=volume.id) 1415 1416 def copy_volume_to_image(self, context, volume, image_service, image_meta): 1417 tmp_dir = self._image_conversion_dir() 1418 tmp_file = os.path.join(tmp_dir, 1419 volume.name + '-' + image_meta['id']) 1420 with fileutils.remove_path_on_error(tmp_file): 1421 args = ['rbd', 'export', 1422 '--pool', self.configuration.rbd_pool, 1423 volume.name, tmp_file] 1424 args.extend(self._ceph_args()) 1425 self._try_execute(*args) 1426 image_utils.upload_volume(context, image_service, 1427 image_meta, tmp_file) 1428 os.unlink(tmp_file) 1429 1430 def extend_volume(self, volume, new_size): 1431 """Extend an existing volume.""" 1432 old_size = volume.size 1433 1434 try: 1435 size = int(new_size) * units.Gi 1436 self._resize(volume, size=size) 1437 except Exception: 1438 msg = _('Failed to Extend Volume ' 1439 '%(volname)s') % {'volname': volume.name} 1440 LOG.error(msg) 1441 raise exception.VolumeBackendAPIException(data=msg) 1442 1443 LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.", 1444 {'old_size': old_size, 'new_size': new_size}) 1445 1446 def manage_existing(self, volume, existing_ref): 1447 """Manages an existing image. 1448 1449 Renames the image name to match the expected name for the volume. 1450 Error checking done by manage_existing_get_size is not repeated. 1451 1452 :param volume: 1453 volume ref info to be set 1454 :param existing_ref: 1455 existing_ref is a dictionary of the form: 1456 {'source-name': <name of rbd image>} 1457 """ 1458 # Raise an exception if we didn't find a suitable rbd image. 
1459 with RADOSClient(self) as client: 1460 rbd_name = existing_ref['source-name'] 1461 self.RBDProxy().rename(client.ioctx, 1462 utils.convert_str(rbd_name), 1463 utils.convert_str(volume.name)) 1464 1465 def manage_existing_get_size(self, volume, existing_ref): 1466 """Return size of an existing image for manage_existing. 1467 1468 :param volume: 1469 volume ref info to be set 1470 :param existing_ref: 1471 existing_ref is a dictionary of the form: 1472 {'source-name': <name of rbd image>} 1473 """ 1474 1475 # Check that the reference is valid 1476 if 'source-name' not in existing_ref: 1477 reason = _('Reference must contain source-name element.') 1478 raise exception.ManageExistingInvalidReference( 1479 existing_ref=existing_ref, reason=reason) 1480 1481 rbd_name = utils.convert_str(existing_ref['source-name']) 1482 1483 with RADOSClient(self) as client: 1484 # Raise an exception if we didn't find a suitable rbd image. 1485 try: 1486 rbd_image = self.rbd.Image(client.ioctx, rbd_name) 1487 except self.rbd.ImageNotFound: 1488 kwargs = {'existing_ref': rbd_name, 1489 'reason': 'Specified rbd image does not exist.'} 1490 raise exception.ManageExistingInvalidReference(**kwargs) 1491 1492 image_size = rbd_image.size() 1493 rbd_image.close() 1494 1495 # RBD image size is returned in bytes. Attempt to parse 1496 # size as a float and round up to the next integer. 
1497 try: 1498 convert_size = int(math.ceil(float(image_size) / units.Gi)) 1499 return convert_size 1500 except ValueError: 1501 exception_message = (_("Failed to manage existing volume " 1502 "%(name)s, because reported size " 1503 "%(size)s was not a floating-point" 1504 " number.") 1505 % {'name': rbd_name, 1506 'size': image_size}) 1507 raise exception.VolumeBackendAPIException( 1508 data=exception_message) 1509 1510 def _get_image_status(self, image_name): 1511 args = ['rbd', 'status', 1512 '--pool', self.configuration.rbd_pool, 1513 '--format=json', 1514 image_name] 1515 args.extend(self._ceph_args()) 1516 out, _ = self._execute(*args) 1517 return json.loads(out) 1518 1519 def get_manageable_volumes(self, cinder_volumes, marker, limit, offset, 1520 sort_keys, sort_dirs): 1521 manageable_volumes = [] 1522 cinder_ids = [resource['id'] for resource in cinder_volumes] 1523 1524 with RADOSClient(self) as client: 1525 for image_name in self.RBDProxy().list(client.ioctx): 1526 image_id = volume_utils.extract_id_from_volume_name(image_name) 1527 with RBDVolumeProxy(self, image_name, read_only=True) as image: 1528 try: 1529 image_info = { 1530 'reference': {'source-name': image_name}, 1531 'size': int(math.ceil( 1532 float(image.size()) / units.Gi)), 1533 'cinder_id': None, 1534 'extra_info': None 1535 } 1536 if image_id in cinder_ids: 1537 image_info['cinder_id'] = image_id 1538 image_info['safe_to_manage'] = False 1539 image_info['reason_not_safe'] = 'already managed' 1540 elif len(self._get_image_status( 1541 image_name)['watchers']) > 0: 1542 # If the num of watchers of image is >= 1, then the 1543 # image is considered to be used by client(s). 
1544 image_info['safe_to_manage'] = False 1545 image_info['reason_not_safe'] = 'volume in use' 1546 else: 1547 image_info['safe_to_manage'] = True 1548 image_info['reason_not_safe'] = None 1549 manageable_volumes.append(image_info) 1550 except self.rbd.ImageNotFound: 1551 LOG.debug("Image %s is not found.", image_name) 1552 1553 return volume_utils.paginate_entries_list( 1554 manageable_volumes, marker, limit, offset, sort_keys, sort_dirs) 1555 1556 def unmanage(self, volume): 1557 pass 1558 1559 def update_migrated_volume(self, ctxt, volume, new_volume, 1560 original_volume_status): 1561 """Return model update from RBD for migrated volume. 1562 1563 This method should rename the back-end volume name(id) on the 1564 destination host back to its original name(id) on the source host. 1565 1566 :param ctxt: The context used to run the method update_migrated_volume 1567 :param volume: The original volume that was migrated to this backend 1568 :param new_volume: The migration volume object that was created on 1569 this backend as part of the migration process 1570 :param original_volume_status: The status of the original volume 1571 :returns: model_update to update DB with any needed changes 1572 """ 1573 name_id = None 1574 provider_location = None 1575 1576 existing_name = CONF.volume_name_template % new_volume.id 1577 wanted_name = CONF.volume_name_template % volume.id 1578 with RADOSClient(self) as client: 1579 try: 1580 self.RBDProxy().rename(client.ioctx, 1581 utils.convert_str(existing_name), 1582 utils.convert_str(wanted_name)) 1583 except self.rbd.ImageNotFound: 1584 LOG.error('Unable to rename the logical volume ' 1585 'for volume %s.', volume.id) 1586 # If the rename fails, _name_id should be set to the new 1587 # volume id and provider_location should be set to the 1588 # one from the new volume as well. 
1589 name_id = new_volume._name_id or new_volume.id 1590 provider_location = new_volume['provider_location'] 1591 return {'_name_id': name_id, 'provider_location': provider_location} 1592 1593 def migrate_volume(self, context, volume, host): 1594 1595 refuse_to_migrate = (False, None) 1596 1597 if volume.status not in ('available', 'retyping', 'maintenance'): 1598 LOG.debug('Only available volumes can be migrated using backend ' 1599 'assisted migration. Falling back to generic migration.') 1600 return refuse_to_migrate 1601 1602 if (host['capabilities']['storage_protocol'] != 'ceph'): 1603 LOG.debug('Source and destination drivers need to be RBD ' 1604 'to use backend assisted migration. Falling back to ' 1605 'generic migration.') 1606 return refuse_to_migrate 1607 1608 loc_info = host['capabilities'].get('location_info') 1609 1610 LOG.debug('Attempting RBD assisted volume migration. volume: %(id)s, ' 1611 'host: %(host)s, status=%(status)s.', 1612 {'id': volume.id, 'host': host, 'status': volume.status}) 1613 1614 if not loc_info: 1615 LOG.debug('Could not find location_info in capabilities reported ' 1616 'by the destination driver. Falling back to generic ' 1617 'migration.') 1618 return refuse_to_migrate 1619 1620 try: 1621 (rbd_cluster_name, rbd_ceph_conf, rbd_fsid, rbd_user, rbd_pool) = ( 1622 utils.convert_str(loc_info).split(':')) 1623 except ValueError: 1624 LOG.error('Location info needed for backend enabled volume ' 1625 'migration not in correct format: %s. Falling back to ' 1626 'generic volume migration.', loc_info) 1627 return refuse_to_migrate 1628 1629 with linuxrbd.RBDClient(rbd_user, rbd_pool, conffile=rbd_ceph_conf, 1630 rbd_cluster_name=rbd_cluster_name) as target: 1631 if ((rbd_fsid != self._get_fsid() or 1632 rbd_fsid != target.client.get_fsid())): 1633 LOG.info('Migration between clusters is not supported. 
' 1634 'Falling back to generic migration.') 1635 return refuse_to_migrate 1636 1637 with RBDVolumeProxy(self, volume.name, read_only=True) as source: 1638 try: 1639 source.copy(target.ioctx, volume.name) 1640 except Exception: 1641 with excutils.save_and_reraise_exception(): 1642 LOG.error('Error copying rbd image %(vol)s to target ' 1643 'pool %(pool)s.', 1644 {'vol': volume.name, 'pool': rbd_pool}) 1645 self.RBDProxy().remove(target.ioctx, volume.name) 1646 1647 try: 1648 # If the source fails to delete for some reason, we want to leave 1649 # the target volume in place in case deleting it might cause a lose 1650 # of data. 1651 self.delete_volume(volume) 1652 except Exception: 1653 reason = 'Failed to delete migration source volume %s.', volume.id 1654 raise exception.VolumeMigrationFailed(reason=reason) 1655 1656 LOG.info('Successful RBD assisted volume migration.') 1657 1658 return (True, None) 1659 1660 def manage_existing_snapshot_get_size(self, snapshot, existing_ref): 1661 """Return size of an existing image for manage_existing. 1662 1663 :param snapshot: 1664 snapshot ref info to be set 1665 :param existing_ref: 1666 existing_ref is a dictionary of the form: 1667 {'source-name': <name of snapshot>} 1668 """ 1669 # Check that the reference is valid 1670 if not isinstance(existing_ref, dict): 1671 existing_ref = {"source-name": existing_ref} 1672 if 'source-name' not in existing_ref: 1673 reason = _('Reference must contain source-name element.') 1674 raise exception.ManageExistingInvalidReference( 1675 existing_ref=existing_ref, reason=reason) 1676 1677 volume_name = utils.convert_str(snapshot.volume_name) 1678 snapshot_name = utils.convert_str(existing_ref['source-name']) 1679 1680 with RADOSClient(self) as client: 1681 # Raise an exception if we didn't find a suitable rbd image. 
1682 try: 1683 rbd_snapshot = self.rbd.Image(client.ioctx, volume_name, 1684 snapshot=snapshot_name) 1685 except self.rbd.ImageNotFound: 1686 kwargs = {'existing_ref': snapshot_name, 1687 'reason': 'Specified snapshot does not exist.'} 1688 raise exception.ManageExistingInvalidReference(**kwargs) 1689 1690 snapshot_size = rbd_snapshot.size() 1691 rbd_snapshot.close() 1692 1693 # RBD image size is returned in bytes. Attempt to parse 1694 # size as a float and round up to the next integer. 1695 try: 1696 convert_size = int(math.ceil(float(snapshot_size) / units.Gi)) 1697 return convert_size 1698 except ValueError: 1699 exception_message = (_("Failed to manage existing snapshot " 1700 "%(name)s, because reported size " 1701 "%(size)s was not a floating-point" 1702 " number.") 1703 % {'name': snapshot_name, 1704 'size': snapshot_size}) 1705 raise exception.VolumeBackendAPIException( 1706 data=exception_message) 1707 1708 def manage_existing_snapshot(self, snapshot, existing_ref): 1709 """Manages an existing snapshot. 1710 1711 Renames the snapshot name to match the expected name for the snapshot. 1712 Error checking done by manage_existing_get_size is not repeated. 1713 1714 :param snapshot: 1715 snapshot ref info to be set 1716 :param existing_ref: 1717 existing_ref is a dictionary of the form: 1718 {'source-name': <name of rbd snapshot>} 1719 """ 1720 if not isinstance(existing_ref, dict): 1721 existing_ref = {"source-name": existing_ref} 1722 volume_name = utils.convert_str(snapshot.volume_name) 1723 with RBDVolumeProxy(self, volume_name) as volume: 1724 snapshot_name = existing_ref['source-name'] 1725 volume.rename_snap(utils.convert_str(snapshot_name), 1726 utils.convert_str(snapshot.name)) 1727 1728 def get_backup_device(self, context, backup): 1729 """Get a backup device from an existing volume. 1730 1731 To support incremental backups on Ceph to Ceph we don't clone 1732 the volume. 
1733 """ 1734 1735 if not backup.service.endswith('ceph') or backup.snapshot_id: 1736 return super(RBDDriver, self).get_backup_device(context, backup) 1737 1738 volume = objects.Volume.get_by_id(context, backup.volume_id) 1739 return (volume, False) 1740