# Copyright (c) 2012 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Volume-related Utilities and helpers."""


import ast
import functools
import json
import math
import operator
from os import urandom  # noqa: F401 (kept so the module namespace is stable)
import random
import re
import time
import uuid

from castellan.common.credentials import keystone_password
from castellan.common import exception as castellan_exception
from castellan import key_manager as castellan_key_manager
import eventlet
from eventlet import tpool
from keystoneauth1 import loading as ks_loading
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units
from random import shuffle  # noqa: F401 (kept so the namespace is stable)
import six
from six.moves import range

from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder import rpc
from cinder import utils
from cinder.volume import group_types
from cinder.volume import throttling
from cinder.volume import volume_types


CONF = cfg.CONF

LOG = logging.getLogger(__name__)


def null_safe_str(s):
    """Return ``str(s)``, or an empty string when ``s`` is falsy.

    Note that every falsy value (None, False, 0, empty containers) maps to
    ''.
    """
    return str(s) if s else ''


def _usage_from_volume(context, volume_ref, **kw):
    """Build the notification payload for a volume.

    :param context: request context used for the DB lookups
    :param volume_ref: volume, accessed by key (and ``.get``)
    :param kw: extra entries merged into the payload; they override the
               volume-derived fields, but not the attachment / glance
               metadata entries which are added afterwards
    :returns: dict of usage information
    """
    now = timeutils.utcnow()
    launched_at = volume_ref['launched_at'] or now
    created_at = volume_ref['created_at'] or now
    volume_status = volume_ref['status']
    # Report the internal 'error_managing_deleting' state as plain 'deleting'.
    if volume_status == 'error_managing_deleting':
        volume_status = 'deleting'
    usage_info = dict(
        tenant_id=volume_ref['project_id'],
        host=volume_ref['host'],
        user_id=volume_ref['user_id'],
        availability_zone=volume_ref['availability_zone'],
        volume_id=volume_ref['id'],
        volume_type=volume_ref['volume_type_id'],
        display_name=volume_ref['display_name'],
        launched_at=launched_at.isoformat(),
        created_at=created_at.isoformat(),
        status=volume_status,
        snapshot_id=volume_ref['snapshot_id'],
        size=volume_ref['size'],
        replication_status=volume_ref['replication_status'],
        replication_extended_status=volume_ref['replication_extended_status'],
        replication_driver_data=volume_ref['replication_driver_data'],
        metadata=volume_ref.get('volume_metadata'),)

    usage_info.update(kw)
    try:
        attachments = db.volume_attachment_get_all_by_volume_id(
            context, volume_ref['id'])
        usage_info['volume_attachment'] = attachments

        glance_meta = db.volume_glance_metadata_get(context, volume_ref['id'])
        if glance_meta:
            usage_info['glance_metadata'] = glance_meta
    except exception.GlanceMetadataNotFound:
        # Missing glance metadata is not an error; the key is just omitted.
        pass
    except exception.VolumeNotFound:
        LOG.debug("Can not find volume %s at notify usage", volume_ref['id'])

    return usage_info


def _usage_from_backup(backup, **kw):
    """Build the notification payload for a backup.

    :param backup: backup object, accessed by attribute
    :param kw: extra entries merged into (and overriding) the payload
    :returns: dict of usage information
    """
    num_dependent_backups = backup.num_dependent_backups
    usage_info = dict(tenant_id=backup.project_id,
                      user_id=backup.user_id,
                      availability_zone=backup.availability_zone,
                      backup_id=backup.id,
                      host=backup.host,
                      display_name=backup.display_name,
                      created_at=backup.created_at.isoformat(),
                      status=backup.status,
                      volume_id=backup.volume_id,
                      size=backup.size,
                      service_metadata=backup.service_metadata,
                      service=backup.service,
                      fail_reason=backup.fail_reason,
                      parent_id=backup.parent_id,
                      num_dependent_backups=num_dependent_backups,
                      snapshot_id=backup.snapshot_id,
                      )

    usage_info.update(kw)
    return usage_info


@utils.if_notifications_enabled
def notify_about_volume_usage(context, volume, event_suffix,
                              extra_usage_info=None, host=None):
    """Send a 'volume.<event_suffix>' info notification for a volume.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_volume(context, volume, **extra_usage_info)

    rpc.get_notifier("volume", host).info(context, 'volume.%s' % event_suffix,
                                          usage_info)


@utils.if_notifications_enabled
def notify_about_backup_usage(context, backup, event_suffix,
                              extra_usage_info=None,
                              host=None):
    """Send a 'backup.<event_suffix>' info notification for a backup.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_backup(backup, **extra_usage_info)

    rpc.get_notifier("backup", host).info(context, 'backup.%s' % event_suffix,
                                          usage_info)


def _usage_from_snapshot(snapshot, context, **extra_usage_info):
    """Build the notification payload for a snapshot.

    :param snapshot: snapshot object, accessed by attribute
    :param context: request context used to load the parent volume
    :param extra_usage_info: extra entries merged into (and overriding) the
                             payload
    :returns: dict of usage information
    """
    # (niedbalski) a snapshot might be related to a deleted
    # volume, if that's the case, the volume information is still
    # required for filling the usage_info, so we enforce to read
    # the volume data even if the volume has been deleted.
    # NOTE(review): this mutates the caller's context and the flag is not
    # restored afterwards, so later queries made with the same context will
    # keep read_deleted == "yes".
    context.read_deleted = "yes"
    volume = db.volume_get(context, snapshot.volume_id)
    usage_info = {
        'tenant_id': snapshot.project_id,
        'user_id': snapshot.user_id,
        'availability_zone': volume['availability_zone'],
        'volume_id': snapshot.volume_id,
        'volume_size': snapshot.volume_size,
        'snapshot_id': snapshot.id,
        'display_name': snapshot.display_name,
        'created_at': snapshot.created_at.isoformat(),
        'status': snapshot.status,
        'deleted': null_safe_str(snapshot.deleted),
        'metadata': null_safe_str(snapshot.metadata),
    }

    usage_info.update(extra_usage_info)
    return usage_info


@utils.if_notifications_enabled
def notify_about_snapshot_usage(context, snapshot, event_suffix,
                                extra_usage_info=None, host=None):
    """Send a 'snapshot.<event_suffix>' info notification for a snapshot.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_snapshot(snapshot, context, **extra_usage_info)

    rpc.get_notifier('snapshot', host).info(context,
                                            'snapshot.%s' % event_suffix,
                                            usage_info)


def _usage_from_capacity(capacity, **extra_usage_info):
    """Build the notification payload for a capacity report.

    :param capacity: mapping that must contain the keys copied below
    :param extra_usage_info: extra entries merged into (and overriding) the
                             payload
    :returns: dict of capacity information
    """

    capacity_info = {
        'name_to_id': capacity['name_to_id'],
        'total': capacity['total'],
        'free': capacity['free'],
        'allocated': capacity['allocated'],
        'provisioned': capacity['provisioned'],
        'virtual_free': capacity['virtual_free'],
        'reported_at': capacity['reported_at']
    }

    capacity_info.update(extra_usage_info)
    return capacity_info


@utils.if_notifications_enabled
def notify_about_capacity_usage(context, capacity, suffix,
                                extra_usage_info=None, host=None):
    """Send a 'capacity.<suffix>' info notification.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_capacity(capacity, **extra_usage_info)

    rpc.get_notifier('capacity', host).info(context,
                                            'capacity.%s' % suffix,
                                            usage_info)


@utils.if_notifications_enabled
def notify_about_replication_usage(context, volume, suffix,
                                   extra_usage_info=None, host=None):
    """Send a 'replication.<suffix>' info notification for a volume.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_volume(context, volume,
                                    **extra_usage_info)

    rpc.get_notifier('replication', host).info(context,
                                               'replication.%s' % suffix,
                                               usage_info)


@utils.if_notifications_enabled
def notify_about_replication_error(context, volume, suffix,
                                   extra_error_info=None, host=None):
    """Send a 'replication.<suffix>' error notification for a volume.

    :param extra_error_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_error_info:
        extra_error_info = {}

    usage_info = _usage_from_volume(context, volume,
                                    **extra_error_info)

    rpc.get_notifier('replication', host).error(context,
                                                'replication.%s' % suffix,
                                                usage_info)


def _usage_from_consistencygroup(group_ref, **kw):
    """Build the notification payload for a consistency group.

    :param group_ref: consistency group object, accessed by attribute
    :param kw: extra entries merged into (and overriding) the payload
    :returns: dict of usage information
    """
    usage_info = dict(tenant_id=group_ref.project_id,
                      user_id=group_ref.user_id,
                      availability_zone=group_ref.availability_zone,
                      consistencygroup_id=group_ref.id,
                      name=group_ref.name,
                      created_at=group_ref.created_at.isoformat(),
                      status=group_ref.status)

    usage_info.update(kw)
    return usage_info


@utils.if_notifications_enabled
def notify_about_consistencygroup_usage(context, group, event_suffix,
                                        extra_usage_info=None, host=None):
    """Send a 'consistencygroup.<event_suffix>' info notification.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_consistencygroup(group,
                                              **extra_usage_info)

    rpc.get_notifier("consistencygroup", host).info(
        context,
        'consistencygroup.%s' % event_suffix,
        usage_info)


def _usage_from_group(group_ref, **kw):
    """Build the notification payload for a generic volume group.

    :param group_ref: group object, accessed by attribute
    :param kw: extra entries merged into (and overriding) the payload
    :returns: dict of usage information
    """
    usage_info = dict(tenant_id=group_ref.project_id,
                      user_id=group_ref.user_id,
                      availability_zone=group_ref.availability_zone,
                      group_id=group_ref.id,
                      group_type=group_ref.group_type_id,
                      name=group_ref.name,
                      created_at=group_ref.created_at.isoformat(),
                      status=group_ref.status)

    usage_info.update(kw)
    return usage_info


@utils.if_notifications_enabled
def notify_about_group_usage(context, group, event_suffix,
                             extra_usage_info=None, host=None):
    """Send a 'group.<event_suffix>' info notification.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_group(group,
                                   **extra_usage_info)

    rpc.get_notifier("group", host).info(
        context,
        'group.%s' % event_suffix,
        usage_info)


def _usage_from_cgsnapshot(cgsnapshot, **kw):
    """Build the notification payload for a consistency group snapshot.

    :param cgsnapshot: cgsnapshot object, accessed by attribute
    :param kw: extra entries merged into (and overriding) the payload
    :returns: dict of usage information
    """
    usage_info = dict(
        tenant_id=cgsnapshot.project_id,
        user_id=cgsnapshot.user_id,
        cgsnapshot_id=cgsnapshot.id,
        name=cgsnapshot.name,
        consistencygroup_id=cgsnapshot.consistencygroup_id,
        created_at=cgsnapshot.created_at.isoformat(),
        status=cgsnapshot.status)

    usage_info.update(kw)
    return usage_info


def _usage_from_group_snapshot(group_snapshot, **kw):
    """Build the notification payload for a group snapshot.

    :param group_snapshot: group snapshot object, accessed by attribute
    :param kw: extra entries merged into (and overriding) the payload
    :returns: dict of usage information
    """
    usage_info = dict(
        tenant_id=group_snapshot.project_id,
        user_id=group_snapshot.user_id,
        group_snapshot_id=group_snapshot.id,
        name=group_snapshot.name,
        group_id=group_snapshot.group_id,
        group_type=group_snapshot.group_type_id,
        created_at=group_snapshot.created_at.isoformat(),
        status=group_snapshot.status)

    usage_info.update(kw)
    return usage_info


@utils.if_notifications_enabled
def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
                                  extra_usage_info=None, host=None):
    """Send a 'cgsnapshot.<event_suffix>' info notification.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_cgsnapshot(cgsnapshot,
                                        **extra_usage_info)

    rpc.get_notifier("cgsnapshot", host).info(
        context,
        'cgsnapshot.%s' % event_suffix,
        usage_info)


@utils.if_notifications_enabled
def notify_about_group_snapshot_usage(context, group_snapshot, event_suffix,
                                      extra_usage_info=None, host=None):
    """Send a 'group_snapshot.<event_suffix>' info notification.

    :param extra_usage_info: optional dict merged into the payload
    :param host: publisher host; falls back to CONF.host when falsy
    """
    if not host:
        host = CONF.host

    if not extra_usage_info:
        extra_usage_info = {}

    usage_info = _usage_from_group_snapshot(group_snapshot,
                                            **extra_usage_info)

    rpc.get_notifier("group_snapshot", host).info(
        context,
        'group_snapshot.%s' % event_suffix,
        usage_info)


def _check_blocksize(blocksize):
    """Validate a dd block size string, falling back to the default.

    :param blocksize: block size as a string (e.g. '1M')
    :returns: ``blocksize`` when valid, otherwise the default value of
              CONF.volume_dd_blocksize (any override is cleared)
    """

    # Check if volume_dd_blocksize is valid
    try:
        # Rule out zero-sized/negative/float dd blocksize which
        # cannot be caught by strutils
        if blocksize.startswith(('-', '0')) or '.' in blocksize:
            raise ValueError
        strutils.string_to_bytes('%sB' % blocksize)
    except ValueError:
        LOG.warning("Incorrect value error: %(blocksize)s, "
                    "it may indicate that \'volume_dd_blocksize\' "
                    "was configured incorrectly. Fall back to default.",
                    {'blocksize': blocksize})
        # Fall back to default blocksize
        CONF.clear_override('volume_dd_blocksize')
        blocksize = CONF.volume_dd_blocksize

    return blocksize


def check_for_odirect_support(src, dest, flag='oflag=direct'):
    """Probe whether dd accepts the given direct-I/O flag for src/dest.

    Runs a zero-count dd as root and reports whether it succeeded.

    :param src: source path
    :param dest: destination path
    :param flag: dd flag to probe, 'oflag=direct' or 'iflag=direct'
    :returns: True if the probe succeeded, False otherwise
    """

    # Check whether O_DIRECT is supported
    try:
        # iflag=direct and if=/dev/zero combination does not work
        # error: dd: failed to open '/dev/zero': Invalid argument
        if (src == '/dev/zero' and flag == 'iflag=direct'):
            return False
        else:
            utils.execute('dd', 'count=0', 'if=%s' % src,
                          'of=%s' % dest,
                          flag, run_as_root=True)
            return True
    except processutils.ProcessExecutionError:
        return False


def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize,
                           sync=False, execute=utils.execute, ionice=None,
                           sparse=False):
    """Copy ``size_in_m`` MiB between two paths using dd.

    :param prefix: list of command words placed before the dd invocation
                   (it is copied, not modified)
    :param srcstr: source path
    :param deststr: destination path
    :param size_in_m: amount to copy, in MiB
    :param blocksize: dd block size string; validated by _check_blocksize
    :param sync: add conv=fdatasync unless direct I/O is in use
    :param execute: callable used to run the command
    :param ionice: optional argument passed to the ionice command
    :param sparse: add conv=sparse
    """
    cmd = prefix[:]

    if ionice:
        cmd.extend(('ionice', ionice))

    blocksize = _check_blocksize(blocksize)
    size_in_bytes = size_in_m * units.Mi

    # count is expressed in bytes; iflag=count_bytes is always added below.
    cmd.extend(('dd', 'if=%s' % srcstr, 'of=%s' % deststr,
                'count=%d' % size_in_bytes, 'bs=%s' % blocksize))

    # Use O_DIRECT to avoid thrashing the system buffer cache
    odirect = check_for_odirect_support(srcstr, deststr, 'iflag=direct')

    cmd.append('iflag=count_bytes,direct' if odirect else 'iflag=count_bytes')

    if check_for_odirect_support(srcstr, deststr, 'oflag=direct'):
        cmd.append('oflag=direct')
        odirect = True

    # If the volume is being unprovisioned then
    # request the data is persisted before returning,
    # so that it's not discarded from the cache.
    conv = []
    if sync and not odirect:
        conv.append('fdatasync')
    if sparse:
        conv.append('sparse')
    if conv:
        conv_options = 'conv=' + ",".join(conv)
        cmd.append(conv_options)

    # Perform the copy
    start_time = timeutils.utcnow()
    execute(*cmd, run_as_root=True)
    duration = timeutils.delta_seconds(start_time, timeutils.utcnow())

    # NOTE(jdg): use a default of 1, mostly for unit test, but in
    # some incredible event this is 0 (cirros image?) don't barf
    if duration < 1:
        duration = 1
    mbps = (size_in_m / duration)
    LOG.debug("Volume copy details: src %(src)s, dest %(dest)s, "
              "size %(sz).2f MB, duration %(duration).2f sec",
              {"src": srcstr,
               "dest": deststr,
               "sz": size_in_m,
               "duration": duration})
    LOG.info("Volume copy %(size_in_m).2f MB at %(mbps).2f MB/s",
             {'size_in_m': size_in_m, 'mbps': mbps})


def _open_volume_with_path(path, mode):
    """Open ``path`` under a temporary chown.

    :returns: the open file object, or None if anything went wrong (the
              failure is logged, not raised)
    """
    try:
        with utils.temporary_chown(path):
            handle = open(path, mode)
            return handle
    except Exception:
        LOG.error("Failed to open volume from %(path)s.", {'path': path})
        return None


def _transfer_data(src, dest, length, chunk_size):
    """Transfer data between files (Python IO objects).

    :param src: readable object
    :param dest: writable object; flushed once at the end
    :param length: maximum number of bytes to transfer
    :param chunk_size: maximum number of bytes per read
    """

    # float() keeps the division exact on Python 2, where int / int would
    # floor before math.ceil gets a chance to round up.
    chunks = int(math.ceil(length / float(chunk_size)))
    remaining_length = length

    LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.",
              {'chunks': chunks, 'bytes': chunk_size})

    for chunk in range(0, chunks):
        before = time.time()
        data = tpool.execute(src.read, min(chunk_size, remaining_length))

        # The source is exhausted before 'length' bytes were read: stop
        # writing.
        if data == b'':
            break

        tpool.execute(dest.write, data)
        remaining_length -= len(data)
        delta = (time.time() - before)
        # A chunk can complete within the clock resolution; do not let the
        # purely informational rate computation abort the copy.
        rate = (len(data) / delta) / units.Ki if delta > 0 else 0
        LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).",
                  {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate})

        # yield to any other pending operations
        eventlet.sleep(0)

    tpool.execute(dest.flush)


def _copy_volume_with_file(src, dest, size_in_m):
    """Copy ``size_in_m`` MiB from src to dest using Python file I/O.

    :param src: source path (str) or readable handle
    :param dest: destination path (str) or writable handle
    :param size_in_m: amount to copy, in MiB
    :raises: exception.DeviceUnavailable if either side cannot be used
    """
    # Handles opened here (as opposed to handles passed in by the caller)
    # are tracked so they are closed on every exit path, including errors.
    opened_here = []
    try:
        src_handle = src
        if isinstance(src, six.string_types):
            src_handle = _open_volume_with_path(src, 'rb')
            if src_handle:
                opened_here.append(src_handle)

        dest_handle = dest
        if isinstance(dest, six.string_types):
            dest_handle = _open_volume_with_path(dest, 'wb')
            if dest_handle:
                opened_here.append(dest_handle)

        if not src_handle:
            raise exception.DeviceUnavailable(
                _("Failed to copy volume, source device unavailable."))

        if not dest_handle:
            raise exception.DeviceUnavailable(
                _("Failed to copy volume, destination device unavailable."))

        start_time = timeutils.utcnow()

        _transfer_data(src_handle, dest_handle, size_in_m * units.Mi,
                       units.Mi * 4)

        duration = max(1, timeutils.delta_seconds(start_time,
                                                  timeutils.utcnow()))
    finally:
        for handle in opened_here:
            handle.close()

    mbps = (size_in_m / duration)
    LOG.info("Volume copy completed (%(size_in_m).2f MB at "
             "%(mbps).2f MB/s).",
             {'size_in_m': size_in_m, 'mbps': mbps})


def copy_volume(src, dest, size_in_m, blocksize, sync=False,
                execute=utils.execute, ionice=None, throttle=None,
                sparse=False):
    """Copy data from the source volume to the destination volume.

    The parameters 'src' and 'dest' are both typically of type str, which
    represents the path to each volume on the filesystem.  Connectors can
    optionally return a volume handle of type RawIOBase for volumes that are
    not available on the local filesystem for open/close operations.

    If either 'src' or 'dest' are not of type str, then they are assumed to be
    of type RawIOBase or any derivative that supports file operations such as
    read and write.  In this case, the handles are treated as file handles
    instead of file paths and, at present moment, throttling is unavailable.
    In that case 'blocksize', 'sync', 'execute', 'ionice' and 'sparse' are
    not used either.
    """

    if (isinstance(src, six.string_types) and
            isinstance(dest, six.string_types)):
        if not throttle:
            throttle = throttling.Throttle.get_default()
        with throttle.subcommand(src, dest) as throttle_cmd:
            _copy_volume_with_path(throttle_cmd['prefix'], src, dest,
                                   size_in_m, blocksize, sync=sync,
                                   execute=execute, ionice=ionice,
                                   sparse=sparse)
    else:
        _copy_volume_with_file(src, dest, size_in_m)


def clear_volume(volume_size, volume_path, volume_clear=None,
                 volume_clear_size=None, volume_clear_ionice=None,
                 throttle=None):
    """Unprovision old volumes to prevent data leaking between users.

    Each of volume_clear, volume_clear_size and volume_clear_ionice falls
    back to the configuration option of the same name when None.  A
    volume_clear_size of 0 means "the whole volume" (volume_size).

    :raises: exception.InvalidConfigurationValue if volume_clear is not
             'zero'
    """
    if volume_clear is None:
        volume_clear = CONF.volume_clear

    if volume_clear_size is None:
        volume_clear_size = CONF.volume_clear_size

    if volume_clear_size == 0:
        volume_clear_size = volume_size

    if volume_clear_ionice is None:
        volume_clear_ionice = CONF.volume_clear_ionice

    LOG.info("Performing secure delete on volume: %s", volume_path)

    # We pass sparse=False explicitly here so that zero blocks are not
    # skipped in order to clear the volume.
    if volume_clear == 'zero':
        return copy_volume('/dev/zero', volume_path, volume_clear_size,
                           CONF.volume_dd_blocksize,
                           sync=True, execute=utils.execute,
                           ionice=volume_clear_ionice,
                           throttle=throttle, sparse=False)
    else:
        raise exception.InvalidConfigurationValue(
            option='volume_clear',
            value=volume_clear)


def supports_thin_provisioning():
    """Return brick LVM's answer on thin provisioning support."""
    return brick_lvm.LVM.supports_thin_provisioning(
        utils.get_root_helper())


def get_all_physical_volumes(vg_name=None):
    """Return brick LVM's physical volume list, optionally for one VG."""
    return brick_lvm.LVM.get_all_physical_volumes(
        utils.get_root_helper(),
        vg_name)


def get_all_volume_groups(vg_name=None):
    """Return brick LVM's volume group list, optionally for one VG."""
    return brick_lvm.LVM.get_all_volume_groups(
        utils.get_root_helper(),
        vg_name)


# Default symbols to use for passwords. Avoids visually confusing characters.
# ~6 bits per symbol
DEFAULT_PASSWORD_SYMBOLS = ('23456789',  # Removed: 0,1
                            'ABCDEFGHJKLMNPQRSTUVWXYZ',  # Removed: I, O
                            'abcdefghijkmnopqrstuvwxyz')  # Removed: l


def generate_password(length=16, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
    """Generate a random password from the supplied symbol groups.

    At least one symbol from each group will be included. Unpredictable
    results if length is less than the number of symbol groups.

    Believed to be reasonably secure (with a reasonable password length!)

    :param length: number of characters in the returned password
    :param symbolgroups: iterable of strings to draw symbols from
    :returns: the password as a string
    """
    # All randomness comes from the OS CSPRNG.  SystemRandom.choice avoids
    # the modulo bias of "random byte % len(symbols)", and
    # SystemRandom.shuffle avoids ordering the password with the
    # predictable default generator.
    rng = random.SystemRandom()

    # NOTE(jerdfelt): Some password policies require at least one character
    # from each group of symbols, so start off with one random character
    # from each symbol group
    password = [rng.choice(group) for group in symbolgroups]
    # If length < len(symbolgroups), the leading characters will only
    # be from the first length groups. Try our best to not be predictable
    # by shuffling and then truncating.
    rng.shuffle(password)
    password = password[:length]
    length -= len(password)

    # then fill with random characters from all symbol groups
    symbols = ''.join(symbolgroups)
    password.extend(rng.choice(symbols) for _i in range(length))

    # finally shuffle to ensure first x characters aren't from a
    # predictable group
    rng.shuffle(password)

    return ''.join(password)


def generate_username(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
    """Generate a random user name; see generate_password."""
    # Use the same implementation as the password generation.
    return generate_password(length, symbolgroups)


DEFAULT_POOL_NAME = '_pool0'


def extract_host(host, level='backend', default_pool_name=False):
    """Extract Host, Backend or Pool information from host string.

    :param host: String for host, which could include host@backend#pool info
    :param level: Indicate which level of information should be extracted
                  from host string. Level can be 'host', 'backend' or 'pool',
                  default value is 'backend'
    :param default_pool_name: this flag specify what to do if level == 'pool'
                              and there is no 'pool' info encoded in host
                              string.  default_pool_name=True will return
                              DEFAULT_POOL_NAME, otherwise we return None.
                              Default value of this parameter is False.
    :return: expected information, string or None (None is also returned
             for any other value of level)
    :raises: exception.InvalidVolume

    For example:
        host = 'HostA@BackendB#PoolC'
        ret = extract_host(host, 'host')
        # ret is 'HostA'
        ret = extract_host(host, 'backend')
        # ret is 'HostA@BackendB'
        ret = extract_host(host, 'pool')
        # ret is 'PoolC'

        host = 'HostX@BackendY'
        ret = extract_host(host, 'pool')
        # ret is None
        ret = extract_host(host, 'pool', True)
        # ret is '_pool0'
    """

    if host is None:
        msg = _("volume is not assigned to a host")
        raise exception.InvalidVolume(reason=msg)

    if level == 'host':
        # make sure pool is not included
        hst = host.split('#')[0]
        return hst.split('@')[0]
    elif level == 'backend':
        return host.split('#')[0]
    elif level == 'pool':
        lst = host.split('#')
        if len(lst) == 2:
            return lst[1]
        elif default_pool_name is True:
            return DEFAULT_POOL_NAME
        else:
            return None


def append_host(host, pool):
    """Encode pool into host info.

    Returns ``host`` unchanged when either ``host`` or ``pool`` is falsy.
    """
    if not host or not pool:
        return host

    new_host = "#".join([host, pool])
    return new_host


def matching_backend_name(src_volume_type, volume_type):
    """Tell whether both types name the same 'volume_backend_name'.

    Returns False when either mapping has no (or a falsy) backend name.
    """
    if src_volume_type.get('volume_backend_name') and \
            volume_type.get('volume_backend_name'):
        return src_volume_type.get('volume_backend_name') == \
            volume_type.get('volume_backend_name')
    else:
        return False


def hosts_are_equivalent(host_1, host_2):
    """Compare two host strings at backend level (pool part ignored)."""
    # In case host_1 or host_2 are None
    if not (host_1 and host_2):
        return host_1 == host_2
    return extract_host(host_1) == extract_host(host_2)


def read_proc_mounts():
    """Read the /proc/mounts file.

    It's a dummy function but it eases the writing of unit tests as mocking
    __builtin__open() for a specific file only is not trivial.
    """
    with open('/proc/mounts') as mounts:
        return mounts.readlines()


def extract_id_from_volume_name(vol_name):
    """Return the ID embedded in a volume name, or None if it doesn't match.

    The pattern is CONF.volume_name_template with '%s' turned into a
    capturing group, matched from the start of ``vol_name``.
    """
    regex = re.compile(
        CONF.volume_name_template.replace('%s', '(?P<uuid>.+)'))
    match = regex.match(vol_name)
    return match.group('uuid') if match else None


def check_already_managed_volume(vol_id):
    """Check cinder db for already managed volume.

    :param vol_id: volume id parameter
    :returns: bool -- return True, if db entry with specified
              volume id exists, otherwise return False
    """
    if not vol_id or not isinstance(vol_id, six.string_types):
        return False
    try:
        # Only used as a "does this parse as a UUID" check; a malformed
        # string raises ValueError.
        uuid.UUID(vol_id, version=4)
        return bool(objects.Volume.exists(context.get_admin_context(), vol_id))
    except ValueError:
        return False


def extract_id_from_snapshot_name(snap_name):
    """Return a snapshot's ID from its name on the backend."""
    regex = re.compile(
        CONF.snapshot_name_template.replace('%s', '(?P<uuid>.+)'))
    match = regex.match(snap_name)
    return match.group('uuid') if match else None


def paginate_entries_list(entries, marker, limit, offset, sort_keys,
                          sort_dirs):
    """Paginate a list of entries.

    :param entries: list of dictionaries
    :param marker: The last element previously returned; either a dict or
                   a JSON string, compared against each entry's 'reference'
    :param limit: The maximum number of items to return; None means no
                  limit
    :param offset: The number of items to skip from the marker or from the
                   first element.
    :param sort_keys: A list of keys in the dictionaries to sort by
    :param sort_dirs: A list of sort directions, where each is either 'asc'
                      or 'desc' (anything other than 'asc' sorts descending)
    :raises: exception.InvalidInput if the marker cannot be parsed or found
    """
    comparers = [(operator.itemgetter(key.strip()), multiplier)
                 for (key, multiplier) in zip(sort_keys, sort_dirs)]

    def comparer(left, right):
        for fn, d in comparers:
            left_val = fn(left)
            right_val = fn(right)
            # dict values are compared by their smallest contained value
            if isinstance(left_val, dict):
                left_val = sorted(left_val.values())[0]
            if isinstance(right_val, dict):
                right_val = sorted(right_val.values())[0]
            if left_val == right_val:
                continue
            if d == 'asc':
                return -1 if left_val < right_val else 1
            else:
                return -1 if left_val > right_val else 1
        else:
            return 0
    sorted_entries = sorted(entries, key=functools.cmp_to_key(comparer))

    start_index = 0
    if offset is None:
        offset = 0
    if marker:
        if not isinstance(marker, dict):
            try:
                marker = json.loads(marker)
            except ValueError:
                msg = _('marker %s can not be analysed, please use json like '
                        'format') % marker
                raise exception.InvalidInput(reason=msg)
        start_index = -1
        for i, entry in enumerate(sorted_entries):
            if entry['reference'] == marker:
                start_index = i + 1
                break
        if start_index < 0:
            msg = _('marker not found: %s') % marker
            raise exception.InvalidInput(reason=msg)
    if limit is None:
        # No limit requested: return everything after marker/offset rather
        # than failing on "int + None".
        return sorted_entries[start_index + offset:]
    range_end = start_index + limit
    return sorted_entries[start_index + offset:range_end + offset]


def convert_config_string_to_dict(config_string):
    """Convert config file replication string to a dict.

    The only supported form is as follows:
    "{'key-1'='val-1' 'key-2'='val-2'...}"

    :param config_string: Properly formatted string to convert to dict.
    :response: dict of string values; an empty dict if the string cannot
               be converted (a warning is logged)
    """

    resultant_dict = {}

    try:
        # Rewrite the string into Python dict literal syntax, then parse it
        # with literal_eval.
        st = config_string.replace("=", ":")
        st = st.replace(" ", ", ")
        resultant_dict = ast.literal_eval(st)
    except Exception:
        LOG.warning("Error encountered translating config_string: "
                    "%(config_string)s to dict",
                    {'config_string': config_string})

    return resultant_dict


def create_encryption_key(context, key_manager, volume_type_id):
    """Create an encryption key if the volume type is encrypted.

    :returns: the new key ID, or None when the volume type is not encrypted
    :raises: exception.Invalid if the key manager reports an error
    """
    encryption_key_id = None
    if volume_types.is_encrypted(context, volume_type_id):
        volume_type_encryption = (
            volume_types.get_volume_type_encryption(context,
                                                    volume_type_id))
        cipher = volume_type_encryption.cipher
        length = volume_type_encryption.key_size
        # The algorithm is the part of the cipher name before the first '-'.
        algorithm = cipher.split('-')[0] if cipher else None
        try:
            encryption_key_id = key_manager.create_key(
                context,
                algorithm=algorithm,
                length=length)
        except castellan_exception.KeyManagerError:
            # The messaging back to the client here is
            # purposefully terse, so we don't leak any sensitive
            # details.
            LOG.exception("Key manager error")
            raise exception.Invalid(message="Key manager error")

    return encryption_key_id


def delete_encryption_key(context, key_manager, encryption_key_id):
    """Delete an encryption key, retrying with cinder's service credentials.

    A key that is already gone is not treated as an error.  If the first
    attempt fails with a KeyManagerError, the deletion is retried once using
    the credentials from the [keystone_authtoken] configuration section.
    """
    try:
        key_manager.delete(context, encryption_key_id)
    except castellan_exception.ManagedObjectNotFoundError:
        pass
    except castellan_exception.KeyManagerError:
        LOG.info("First attempt to delete key id %s failed, retrying with "
                 "cinder's service context.", encryption_key_id)
        conf = CONF
        ks_loading.register_auth_conf_options(conf, 'keystone_authtoken')
        service_context = keystone_password.KeystonePassword(
            password=conf.keystone_authtoken.password,
            auth_url=conf.keystone_authtoken.auth_url,
            username=conf.keystone_authtoken.username,
            user_domain_name=conf.keystone_authtoken.user_domain_name,
            project_name=conf.keystone_authtoken.project_name,
            project_domain_name=conf.keystone_authtoken.project_domain_name)
        try:
            castellan_key_manager.API(conf).delete(service_context,
                                                   encryption_key_id)
        except castellan_exception.ManagedObjectNotFoundError:
            pass


def clone_encryption_key(context, key_manager, encryption_key_id):
    """Store a copy of an existing key and return the copy's ID.

    :returns: the ID of the stored copy, or None if encryption_key_id is
              None
    """
    clone_key_id = None
    if encryption_key_id is not None:
        clone_key_id = key_manager.store(
            context,
            key_manager.get(context, encryption_key_id))
    return clone_key_id


def is_replicated_str(str):
    """Tell whether an extra spec value has the form '<is> <true-ish>'."""
    spec = (str or '').split()
    return (len(spec) == 2 and
            spec[0] == '<is>' and strutils.bool_from_string(spec[1]))


def is_replicated_spec(extra_specs):
    """Tell whether extra specs enable replication.

    The result is truthy/falsy rather than strictly boolean: a falsy
    ``extra_specs`` is returned as-is.
    """
    return (extra_specs and
            is_replicated_str(extra_specs.get('replication_enabled')))


def group_get_by_id(group_id):
    """Load a group from the DB using an admin context."""
    ctxt = context.get_admin_context()
    group = db.group_get(ctxt, group_id)
    return group


def is_group_a_cg_snapshot_type(group_or_snap):
    """Tell whether the group's type enables consistent group snapshots.

    :param group_or_snap: group or group snapshot, accessed by key
    :returns: True only if the 'consistent_group_snapshot_enabled' group
              type spec equals '<is> True'
    """
    LOG.debug("Checking if %s is a consistent snapshot group",
              group_or_snap)
    if group_or_snap["group_type_id"] is not None:
        spec = group_types.get_group_type_specs(
            group_or_snap["group_type_id"],
            key="consistent_group_snapshot_enabled"
        )
        return spec == "<is> True"
    return False


def is_group_a_type(group, key):
    """Tell whether the group type spec ``key`` equals '<is> True'.

    :param group: group object, accessed by attribute
    :param key: name of the group type spec to check
    """
    if group.group_type_id is not None:
        spec = group_types.get_group_type_specs(
            group.group_type_id, key=key
        )
        return spec == "<is> True"
    return False


def get_max_over_subscription_ratio(str_value, supports_auto=False):
    """Get the max_over_subscription_ratio from a string

    As some drivers need to do some calculations with the value and we are now
    receiving a string value in the conf, this converts the value to float
    when appropriate.

    :param str_value: The configured value, as a string ('auto' or a number)
    :param supports_auto: Tell if the calling driver supports auto MOSR.
    :response: value of mosr: the string 'auto', or a float
    :raises: exception.VolumeDriverException if 'auto' is given but not
             supported
    :raises: exception.InvalidParameterValue if the ratio is below 1
    """

    if not supports_auto and str_value == "auto":
        msg = _("This driver does not support automatic "
                "max_over_subscription_ratio calculation. Please use a "
                "valid float value.")
        LOG.error(msg)
        raise exception.VolumeDriverException(message=msg)

    if str_value == 'auto':
        return str_value

    mosr = float(str_value)
    # NOTE: a ratio of exactly 1 is accepted; only values below 1 are
    # rejected.
    if mosr < 1:
        msg = _("The value of max_over_subscription_ratio must be "
                "greater than 1.")
        LOG.error(msg)
        raise exception.InvalidParameterValue(message=msg)
    return mosr