1# Copyright 2017 Inspur Corp. 2# All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15# 16 17import math 18import random 19import re 20import time 21import unicodedata 22 23from eventlet import greenthread 24from oslo_concurrency import processutils 25from oslo_config import cfg 26from oslo_log import log as logging 27from oslo_serialization import jsonutils as json 28from oslo_service import loopingcall 29from oslo_utils import excutils 30from oslo_utils import strutils 31from oslo_utils import units 32import paramiko 33import six 34 35from cinder import context 36from cinder import exception 37from cinder.i18n import _ 38from cinder.objects import fields 39from cinder import ssh_utils 40from cinder import utils as cinder_utils 41from cinder.volume import driver 42from cinder.volume.drivers.inspur.instorage import ( 43 replication as instorage_rep) 44from cinder.volume.drivers.inspur.instorage import instorage_const 45from cinder.volume.drivers.san import san 46from cinder.volume import qos_specs 47from cinder.volume import utils 48from cinder.volume import volume_types 49 50INTERVAL_1_SEC = 1 51DEFAULT_TIMEOUT = 20 52LOG = logging.getLogger(__name__) 53 54instorage_mcs_opts = [ 55 cfg.BoolOpt('instorage_mcs_vol_autoexpand', 56 default=True, 57 help='Storage system autoexpand parameter for volumes ' 58 '(True/False)'), 59 cfg.BoolOpt('instorage_mcs_vol_compression', 60 default=False, 61 help='Storage system compression option for volumes'), 62 
cfg.BoolOpt('instorage_mcs_vol_intier', 63 default=True, 64 help='Enable InTier for volumes'), 65 cfg.BoolOpt('instorage_mcs_allow_tenant_qos', 66 default=False, 67 help='Allow tenants to specify QOS on create'), 68 cfg.IntOpt('instorage_mcs_vol_grainsize', 69 default=256, 70 min=32, max=256, 71 help='Storage system grain size parameter for volumes ' 72 '(32/64/128/256)'), 73 cfg.IntOpt('instorage_mcs_vol_rsize', 74 default=2, 75 min=-1, max=100, 76 help='Storage system space-efficiency parameter for volumes ' 77 '(percentage)'), 78 cfg.IntOpt('instorage_mcs_vol_warning', 79 default=0, 80 min=-1, max=100, 81 help='Storage system threshold for volume capacity warnings ' 82 '(percentage)'), 83 cfg.IntOpt('instorage_mcs_localcopy_timeout', 84 default=120, 85 min=1, max=600, 86 help='Maximum number of seconds to wait for LocalCopy to be ' 87 'prepared.'), 88 cfg.IntOpt('instorage_mcs_localcopy_rate', 89 default=50, 90 min=1, max=100, 91 help='Specifies the InStorage LocalCopy copy rate to be used ' 92 'when creating a full volume copy. The default is rate ' 93 'is 50, and the valid rates are 1-100.'), 94 cfg.StrOpt('instorage_mcs_vol_iogrp', 95 default='0', 96 help='The I/O group in which to allocate volumes. It can be a ' 97 'comma-separated list in which case the driver will select an ' 98 'io_group based on least number of volumes associated with the ' 99 'io_group.'), 100 cfg.StrOpt('instorage_san_secondary_ip', 101 default=None, 102 help='Specifies secondary management IP or hostname to be ' 103 'used if san_ip is invalid or becomes inaccessible.'), 104 cfg.ListOpt('instorage_mcs_volpool_name', 105 default=['volpool'], 106 help='Comma separated list of storage system storage ' 107 'pools for volumes.'), 108] 109 110CONF = cfg.CONF 111CONF.register_opts(instorage_mcs_opts) 112 113 114class InStorageMCSCommonDriver(driver.VolumeDriver, san.SanDriver): 115 """Inspur InStorage MCS abstract base class for iSCSI/FC volume drivers. 116 117 Version history: 118 119 .. 
def __init__(self, *args, **kwargs):
    """Initialise driver bookkeeping.

    All positional and keyword arguments are forwarded to the parent
    initialiser. ``active_backend_id`` is additionally read from
    ``kwargs`` (``None`` when absent).
    """
    super(InStorageMCSCommonDriver, self).__init__(*args, **kwargs)

    conf = self.configuration
    conf.append_config_values(instorage_mcs_opts)
    self._backend_name = conf.safe_get('volume_backend_name')

    # Management addresses: san_ip is used first, the secondary IP is
    # kept aside as the inactive one.
    self.active_ip = conf.san_ip
    self.inactive_ip = conf.instorage_san_secondary_ip

    # The local assistant issues its commands through _run_ssh; it is
    # also the assistant in use until something replaces it.
    local_assistant = InStorageAssistant(self._run_ssh)
    self._local_backend_assistant = local_assistant
    self._aux_backend_assistant = None
    self._assistant = local_assistant

    # Pending vdisk copy operations and the looping call that polls them.
    self._vdiskcopyops = {}
    self._vdiskcopyops_loop = None

    self.protocol = None
    self.replication = None

    # Cached description of the storage system.
    self._state = dict(
        storage_nodes={},
        enabled_protocols=set(),
        compression_enabled=False,
        available_iogrps=[],
        system_name=None,
        system_id=None,
        code_level=None,
    )
    self._active_backend_id = kwargs.get('active_backend_id')

    # Maps each replication target to its replication manager object.
    self.replica_manager = {}

    # A driver can be configured with only one replication target to
    # fail over to.
    self._replica_target = {}

    # Whether replication is supported by this storage.
    self._replica_enabled = False

    # The supported replication modes.
    self._supported_replica_types = []

    # The pools available while in failed-over status.
    self._secondary_pools = None
def _update_instorage_state(self):
    """Refresh ``self._state`` from the storage system.

    Queries the current assistant for system information, compression
    support, available I/O groups and node details, then works out which
    protocols each node can serve. Nodes that support no protocol are
    dropped from ``self._state['storage_nodes']``.

    ``self._state['enabled_protocols']`` is rebuilt on every call, so it
    only lists protocols found during this refresh.
    """
    assistant = self._assistant
    state = self._state

    # Get storage system name, id, and code level
    state.update(assistant.get_system_info())

    # Check if compression is supported
    state['compression_enabled'] = assistant.compression_enabled()

    # Get the available I/O groups
    state['available_iogrps'] = assistant.get_available_io_groups()

    # Get the iSCSI and FC names of the InStorage/MCS nodes
    nodes = assistant.get_node_info()
    state['storage_nodes'] = nodes

    # Add the iSCSI IP addresses and WWPNs to the storage node info
    assistant.add_iscsi_ip_addrs(nodes)
    assistant.add_fc_wwpns(nodes)

    # For each node, check what connection modes it supports.
    enabled_protocols = set()
    for node in nodes.values():
        if (node['ipv4'] or node['ipv6']) and node['iscsi_name']:
            node['enabled_protocols'].append('iSCSI')
            enabled_protocols.add('iSCSI')
        if node['WWPN']:
            node['enabled_protocols'].append('FC')
            enabled_protocols.add('FC')

    # Delete any nodes that do not support any types (may be partially
    # configured). The keys are collected first because the dict cannot
    # be resized while it is being iterated.
    unusable = [key for key, node in nodes.items()
                if not node['enabled_protocols']]
    for key in unusable:
        del nodes[key]

    state['enabled_protocols'] = enabled_protocols
%(prot)s. ' 259 'Please configure the device to support %(prot)s or ' 260 'switch to a driver using a different protocol.') 261 % {'prot': self.protocol}) 262 263 required_flags = ['san_ip', 'san_ssh_port', 'san_login', 264 'instorage_mcs_volpool_name'] 265 for flag in required_flags: 266 if not self.configuration.safe_get(flag): 267 raise exception.InvalidInput(reason=_('%s is not set.') % flag) 268 269 # Ensure that either password or keyfile were set 270 if not (self.configuration.san_password or 271 self.configuration.san_private_key): 272 raise exception.InvalidInput( 273 reason=_('Password or SSH private key is required for ' 274 'authentication: set either san_password or ' 275 'san_private_key option.')) 276 277 opts = self._assistant.build_default_opts(self.configuration) 278 self._assistant.check_vdisk_opts(self._state, opts) 279 280 def _run_ssh(self, cmd_list, check_exit_code=True, attempts=1): 281 """SSH tool""" 282 cinder_utils.check_ssh_injection(cmd_list) 283 command = ' '.join(cmd_list) 284 if not self.sshpool: 285 try: 286 self.sshpool = self._set_up_sshpool(self.active_ip) 287 except paramiko.SSHException: 288 LOG.warning('Unable to use san_ip to create SSHPool. Now ' 289 'attempting to use instorage_san_secondary_ip ' 290 'to create SSHPool.') 291 if self._switch_ip(): 292 self.sshpool = self._set_up_sshpool(self.active_ip) 293 else: 294 LOG.error('Unable to create SSHPool using san_ip ' 295 'and not able to use ' 296 'instorage_san_secondary_ip since it is ' 297 'not configured.') 298 raise 299 try: 300 return self._ssh_execute(self.sshpool, command, 301 check_exit_code, attempts) 302 303 except Exception: 304 # Need to check if creating an SSHPool instorage_san_secondary_ip 305 # before raising an error. 306 try: 307 if self._switch_ip(): 308 LOG.warning("Unable to execute SSH command with " 309 "%(inactive)s. 
def _ssh_execute(self, sshpool, command,
                 check_exit_code=True, attempts=1):
    """Run ``command`` on a pooled SSH connection, retrying on failure.

    :param sshpool: pool whose ``item()`` context manager yields the SSH
                    connection to use
    :param command: the command line to execute, as a single string
    :param check_exit_code: forwarded to ``processutils.ssh_execute``
    :param attempts: maximum number of tries; with a value below 1 the
                     command is never sent
    :returns: the result of ``processutils.ssh_execute`` for the first
              try that succeeds
    :raises processutils.ProcessExecutionError: when no try succeeded.
            It carries the details of the last failure when that failure
            exposes ``exit_code``/``stdout``/``stderr``/``cmd``, and
            generic details otherwise.
    """
    try:
        with sshpool.item() as ssh:
            # Bound before the loop so the error path below is well
            # defined even when the loop body never runs (attempts < 1).
            last_exception = None
            while attempts > 0:
                attempts -= 1
                try:
                    return processutils.ssh_execute(
                        ssh,
                        command,
                        check_exit_code=check_exit_code)
                except Exception as e:
                    LOG.exception('Error has occurred')
                    last_exception = e
                    greenthread.sleep(self.DEFAULT_GR_SLEEP)

            # Every try failed (or none was made). Reuse the details of
            # the last failure when it has them; any other exception
            # type, or no exception at all, gets a generic error.
            try:
                failure = {'exit_code': last_exception.exit_code,
                           'stdout': last_exception.stdout,
                           'stderr': last_exception.stderr,
                           'cmd': last_exception.cmd}
            except AttributeError:
                failure = {'exit_code': -1,
                           'stdout': "",
                           'stderr': "Error running SSH command",
                           'cmd': command}
            raise processutils.ProcessExecutionError(**failure)

    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.error("Error running SSH command: %s", command)
@cinder_utils.trace
def create_volume(self, volume):
    """Create the vdisk backing ``volume``.

    The vdisk options come from the volume type and volume metadata, and
    the I/O group is chosen by the assistant. QoS is applied when the
    options carry it, and replication is set up when the volume is of a
    replicated type.

    :param volume: the volume object to create
    :returns: ``None`` for a non-replicated volume, otherwise a model
              update that marks replication as enabled
    """
    vdisk_opts = self._get_vdisk_params(
        volume.volume_type_id,
        volume_metadata=volume.get('volume_metadata'))
    pool_name = utils.extract_host(volume.host, 'pool')

    vdisk_opts['iogrp'] = self._assistant.select_io_group(self._state,
                                                          vdisk_opts)
    self._assistant.create_vdisk(volume.name,
                                 six.text_type(volume.size),
                                 'gb',
                                 pool_name,
                                 vdisk_opts)
    if vdisk_opts['qos']:
        self._assistant.add_vdisk_qos(volume.name, vdisk_opts['qos'])

    admin_ctxt = context.get_admin_context()
    replication_type = self._get_volume_replicated_type(admin_ctxt, volume)
    if not replication_type:
        return None

    replica = self._get_replica_obj(replication_type)
    replica.volume_replication_setup(admin_ctxt, volume)
    return {'replication_status': fields.ReplicationStatus.ENABLED}
(_("create_volume_from_snapshot: snapshot %(snapshot_name)s " 438 "size is %(snapshot_size)dGB and doesn't fit in target " 439 "volume %(volume_name)s of size %(volume_size)dGB.") % 440 {'snapshot_name': snapshot.name, 441 'snapshot_size': snapshot.volume_size, 442 'volume_name': volume.name, 443 'volume_size': volume.size}) 444 LOG.error(msg) 445 raise exception.InvalidInput(message=msg) 446 447 opts = self._get_vdisk_params( 448 volume.volume_type_id, 449 volume_metadata=volume.get('volume_metadata')) 450 pool = utils.extract_host(volume.host, 'pool') 451 self._assistant.create_copy(snapshot.name, volume.name, 452 snapshot.id, self.configuration, 453 opts, True, pool=pool) 454 # The volume size is equal to the snapshot size in most 455 # of the cases. But in some scenario, the volume size 456 # may be bigger than the source volume size. 457 # InStorage does not support localcopy between two volumes 458 # with two different size. So InStorage will copy volume 459 # from snapshot first and then extend the volume to 460 # the target size. 461 if volume.size > snapshot.volume_size: 462 # extend the new created target volume to expected size. 
def create_cloned_volume(self, tgt_volume, src_volume):
    """Create ``tgt_volume`` as a copy of ``src_volume``.

    :param tgt_volume: the volume object to create
    :param src_volume: the volume object to copy from
    :returns: a model update marking replication as enabled when the
              target is of a replicated type, otherwise ``None``
    :raises exception.InvalidInput: when the source is larger than the
            target
    """
    source_gb = src_volume.size
    target_gb = tgt_volume.size
    if source_gb > target_gb:
        details = {'src_vol': src_volume.name,
                   'src_size': source_gb,
                   'tgt_vol': tgt_volume.name,
                   'tgt_size': target_gb}
        msg = (_("create_cloned_volume: source volume %(src_vol)s "
                 "size is %(src_size)dGB and doesn't fit in target "
                 "volume %(tgt_vol)s of size %(tgt_size)dGB.") % details)
        LOG.error(msg)
        raise exception.InvalidInput(message=msg)

    vdisk_opts = self._get_vdisk_params(
        tgt_volume.volume_type_id,
        volume_metadata=tgt_volume.get('volume_metadata'))
    pool_name = utils.extract_host(tgt_volume.host, 'pool')
    self._assistant.create_copy(src_volume.name,
                                tgt_volume.name,
                                src_volume.id,
                                self.configuration,
                                vdisk_opts,
                                True,
                                pool=pool_name)

    # InStorage does not support localcopy between two volumes of
    # different sizes, so a larger target is first created as a
    # same-size copy and then extended to the requested size.
    if tgt_volume.size > src_volume.size:
        self._extend_volume_op(tgt_volume, tgt_volume.size,
                               src_volume.size)

    if vdisk_opts['qos']:
        self._assistant.add_vdisk_qos(tgt_volume.name, vdisk_opts['qos'])

    admin_ctxt = context.get_admin_context()
    replication_type = self._get_volume_replicated_type(admin_ctxt,
                                                        tgt_volume)
    if not replication_type:
        return None

    self._validate_replication_enabled()
    replica = self._get_replica_obj(replication_type)
    replica.volume_replication_setup(admin_ctxt, tgt_volume)
    return {'replication_status': fields.ReplicationStatus.ENABLED}
@cinder_utils.trace
def delete_volume(self, volume):
    """Delete the vdisk(s) backing ``volume``.

    For a replicated volume the copy on the aux backend is deleted first
    and then the one on the local backend. While failed over, a failure
    to delete on the local (master) backend is logged and otherwise
    ignored. A non-replicated volume is deleted through the current
    assistant, which is refused while failed over.

    Any vdisk copy operations tracked for the volume are forgotten, and
    the polling loop is stopped once nothing is left to track.

    :param volume: the volume object to delete
    :raises exception.VolumeDriverException: when asked to delete a
            non-replicated volume while failed over
    """
    ctxt = context.get_admin_context()

    rep_type = self._get_volume_replicated_type(ctxt, volume)
    if rep_type:
        self._aux_backend_assistant.delete_rc_volume(volume.name,
                                                     target_vol=True)
        if not self._active_backend_id:
            self._local_backend_assistant.delete_rc_volume(volume.name)
        else:
            # If it's in fail over state, also try to delete the volume
            # in master backend. This is best effort: the failure is
            # logged and the delete is not failed because of it.
            try:
                self._local_backend_assistant.delete_rc_volume(
                    volume.name)
            except Exception as ex:
                LOG.error('Failed to delete volume %(volume)s in '
                          'master backend. Exception: %(err)s.',
                          {'volume': volume.name, 'err': ex})
    elif self._active_backend_id:
        msg = (_('Error: delete non-replicate volume in failover mode'
                 ' is not allowed.'))
        LOG.error(msg)
        raise exception.VolumeDriverException(message=msg)
    else:
        self._assistant.delete_vdisk(volume.name, False)

    if volume.id in self._vdiskcopyops:
        del self._vdiskcopyops[volume.id]

        # Stop polling once the last tracked volume is gone. The loop
        # only exists after a copy op was registered, so guard against
        # it being unset.
        if not self._vdiskcopyops and self._vdiskcopyops_loop is not None:
            self._vdiskcopyops_loop.stop()
            self._vdiskcopyops_loop = None
def _rm_vdisk_copy_op(self, ctxt, vol_id, orig_copy_id, new_copy_id):
    """Stop tracking one vdisk copy operation of a volume.

    Removes the ``(orig_copy_id, new_copy_id)`` pair from the operations
    recorded for ``vol_id``. The volume entry is dropped once it has no
    operations left, and the polling loop is stopped once no volume is
    tracked any more. An unknown volume or unknown pair is logged and
    otherwise ignored.

    :param ctxt: request context (not used here)
    :param vol_id: id of the volume the operation belongs to
    :param orig_copy_id: id of the original vdisk copy
    :param new_copy_id: id of the new vdisk copy
    """
    # Each lookup has its own narrow try block so that only a missing
    # entry is reported as one, not an error raised while stopping the
    # loop.
    try:
        copy_ops = self._vdiskcopyops[vol_id]['copyops']
    except KeyError:
        LOG.error('_rm_vdisk_copy_op: Volume %s does not have any '
                  'registered vdisk copy operations.', vol_id)
        return

    try:
        copy_ops.remove((orig_copy_id, new_copy_id))
    except ValueError:
        LOG.error('_rm_vdisk_copy_op: Volume %(vol)s does not have '
                  'the specified vdisk copy operation: orig=%(orig)s '
                  'new=%(new)s.',
                  {'vol': vol_id, 'orig': orig_copy_id,
                   'new': new_copy_id})
        return

    if not copy_ops:
        del self._vdiskcopyops[vol_id]

    # The loop only exists after a copy op was registered; guard against
    # it being unset.
    if not self._vdiskcopyops and self._vdiskcopyops_loop is not None:
        self._vdiskcopyops_loop.stop()
        self._vdiskcopyops_loop = None
@cinder_utils.trace
def retype(self, ctxt, volume, new_type, diff, host):
    """Convert the volume to be of the new type.

    Options that can be changed in place are applied directly. A change
    of ``rsize``, ``grainsize``, ``compression`` or of the pool needs a
    new vdisk copy, which is registered as a tracked copy operation.

    :param ctxt: Context
    :param volume: A volume object describing the volume to migrate
    :param new_type: A dictionary describing the volume type to convert to
    :param diff: A dictionary with the difference between the two types
                 (not used here)
    :param host: A dictionary describing the host to migrate to, where
                 host['host'] is its name, and host['capabilities'] is a
                 dictionary of its reported capabilities.
    :returns: ``False`` when a copy is needed but the host cannot be
              migrated to, otherwise ``(True, model_update)`` where
              ``model_update`` is ``None`` or a replication status update
    :raises exception.VolumeDriverException: when the requested change is
            not allowed or the vdisk copy cannot be added
    """
    def retype_iogrp_property(volume, new, old):
        if new != old:
            self._assistant.change_vdisk_iogrp(volume.name,
                                               self._state, (new, old))

    no_copy_keys = ['warning', 'autoexpand', 'intier']
    copy_keys = ['rsize', 'grainsize', 'compression']
    all_keys = no_copy_keys + copy_keys
    # The key must be spelled 'volume_metadata'; a misspelled key makes
    # the lookup return nothing and the metadata is silently ignored.
    old_opts = self._get_vdisk_params(
        volume.volume_type_id,
        volume_metadata=volume.get('volume_metadata'))
    new_opts = self._get_vdisk_params(new_type['id'],
                                      volume_type=new_type)

    vdisk_changes = []
    need_copy = False
    # no_copy_keys come first in all_keys, so every in-place change has
    # been collected by the time a copy key ends the loop.
    for key in all_keys:
        if old_opts[key] != new_opts[key]:
            if key in copy_keys:
                need_copy = True
                break
            elif key in no_copy_keys:
                vdisk_changes.append(key)

    if (utils.extract_host(volume.host, 'pool') !=
            utils.extract_host(host['host'], 'pool')):
        need_copy = True

    # Check if retype affects volume replication
    model_update = None
    new_rep_type = self._get_specs_replicated_type(new_type)
    old_rep_type = self._get_volume_replicated_type(ctxt, volume)
    old_io_grp = self._assistant.get_volume_io_group(volume.name)

    # There are three options for rep_type: None, sync, async
    if new_rep_type != old_rep_type:
        if (old_io_grp not in
                InStorageAssistant._get_valid_requested_io_groups(
                    self._state, new_opts)):
            msg = (_('Unable to retype: it is not allowed to change '
                     'replication type and io group at the same time.'))
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)
        if new_rep_type and old_rep_type:
            msg = (_('Unable to retype: it is not allowed to change '
                     '%(old_rep_type)s volume to %(new_rep_type)s '
                     'volume.') %
                   {'old_rep_type': old_rep_type,
                    'new_rep_type': new_rep_type})
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)
        # If volume is replicated, can't copy
        if need_copy:
            msg = (_('Unable to retype: Current action needs volume-copy,'
                     ' it is not allowed when new type is replication.'
                     ' Volume = %s') % volume.id)
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)

    new_io_grp = self._assistant.select_io_group(self._state, new_opts)

    if need_copy:
        self._check_volume_copy_ops()
        dest_pool = self._assistant.can_migrate_to_host(host, self._state)
        if dest_pool is None:
            return False

        retype_iogrp_property(volume,
                              new_io_grp, old_io_grp)
        try:
            new_op = self.add_vdisk_copy(volume.name,
                                         dest_pool,
                                         new_type)
            self._add_vdisk_copy_op(ctxt, volume, new_op)
        except exception.VolumeDriverException:
            # roll back changing iogrp property
            retype_iogrp_property(volume, old_io_grp, new_io_grp)
            # Format the message with '%'; passing the id as a second
            # element would make msg a tuple instead of a string.
            msg = (_('Unable to retype: A copy of volume %s exists. '
                     'Retyping would exceed the limit of 2 copies.') %
                   volume.id)
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)
    else:
        retype_iogrp_property(volume, new_io_grp, old_io_grp)

        self._assistant.change_vdisk_options(volume.name, vdisk_changes,
                                             new_opts, self._state)

    if new_opts['qos']:
        # Add the new QoS setting to the volume. If the volume has an
        # old QoS setting, it will be overwritten.
        self._assistant.update_vdisk_qos(volume.name, new_opts['qos'])
    elif old_opts['qos']:
        # If the old_opts contain QoS keys, disable them.
        self._assistant.disable_vdisk_qos(volume.name, old_opts['qos'])

    # Delete replica if needed
    if old_rep_type and not new_rep_type:
        self._aux_backend_assistant.delete_rc_volume(volume.name,
                                                     target_vol=True)
        model_update = {
            'replication_status': fields.ReplicationStatus.DISABLED,
            'replication_driver_data': None,
            'replication_extended_status': None}
    # Add replica if needed
    if not old_rep_type and new_rep_type:
        replica_obj = self._get_replica_obj(new_rep_type)
        replica_obj.volume_replication_setup(ctxt, volume)
        model_update = {
            'replication_status': fields.ReplicationStatus.ENABLED}

    return True, model_update
def manage_existing(self, volume, ref):
    """Bring an existing backend vdisk under Cinder management.

    The vdisk identified by ``ref`` is checked against the I/O groups
    known to the driver, the replication type of the volume's type, the
    provisioning/compression/I/O-group options of the volume type (when
    the volume has one) and the pool encoded in ``volume.host``. When all
    checks pass the vdisk is renamed to ``volume.name``. The in-use check
    done by manage_existing_get_size is not repeated here.

    :param volume: the Cinder volume that will represent the vdisk
    :param ref: reference dict, validated by _manage_input_check
    :returns: ``{'replication_status': ENABLED}`` when the vdisk is part
              of a relationship, otherwise an empty dict
    :raises ManageExistingVolumeTypeMismatch: if any of the checks fails
    """
    # Check that the reference is valid
    vdisk = self._manage_input_check(ref)
    vdisk_io_grp = self._assistant.get_volume_io_group(vdisk['name'])
    if vdisk_io_grp not in self._state['available_iogrps']:
        msg = (_("Failed to manage existing volume due to "
                 "the volume to be managed is not in a valid "
                 "I/O group."))
        raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

    # Add replication check
    ctxt = context.get_admin_context()
    rep_type = self._get_volume_replicated_type(ctxt, volume)
    vol_rep_type = None
    rel_info = self._assistant.get_relationship_info(vdisk['name'])
    if rel_info:
        vol_rep_type = rel_info['copy_type']
        aux_info = self._aux_backend_assistant.get_system_info()
        # The relationship must point at the configured replication target.
        if rel_info['aux_cluster_id'] != aux_info['system_id']:
            msg = (_("Failed to manage existing volume due to the aux "
                     "cluster for volume %(volume)s is %(aux_id)s. The "
                     "configured cluster id is %(cfg_id)s") %
                   {'volume': vdisk['name'],
                    'aux_id': rel_info['aux_cluster_id'],
                    'cfg_id': aux_info['system_id']})
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

    if vol_rep_type != rep_type:
        msg = (_("Failed to manage existing volume due to "
                 "the replication type of the volume to be managed is "
                 "mismatch with the provided replication type."))
        raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

    if volume.volume_type_id:
        opts = self._get_vdisk_params(
            volume.volume_type_id,
            volume_metadata=volume.get('volume_metadata'))
        vdisk_copy = self._assistant.get_vdisk_copy_attrs(
            vdisk['name'], '0')

        if vdisk_copy['autoexpand'] == 'on' and opts['rsize'] == -1:
            msg = (_("Failed to manage existing volume due to "
                     "the volume to be managed is thin, but "
                     "the volume type chosen is thick."))
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

        if not vdisk_copy['autoexpand'] and opts['rsize'] != -1:
            msg = (_("Failed to manage existing volume due to "
                     "the volume to be managed is thick, but "
                     "the volume type chosen is thin."))
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

        if (vdisk_copy['compressed_copy'] == 'no' and
                opts['compression']):
            msg = (_("Failed to manage existing volume due to the "
                     "volume to be managed is not compress, but "
                     "the volume type chosen is compress."))
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

        if (vdisk_copy['compressed_copy'] == 'yes' and
                not opts['compression']):
            msg = (_("Failed to manage existing volume due to the "
                     "volume to be managed is compress, but "
                     "the volume type chosen is not compress."))
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

        if (vdisk_io_grp not in
                InStorageAssistant._get_valid_requested_io_groups(
                    self._state, opts)):
            # The trailing space after "I/O group" keeps the two adjacent
            # literals from running together as "I/O groupof".
            msg = (_("Failed to manage existing volume due to "
                     "I/O group mismatch. The I/O group of the "
                     "volume to be managed is %(vdisk_iogrp)s. I/O group "
                     "of the chosen type is %(opt_iogrp)s.") %
                   {'vdisk_iogrp': vdisk['IO_group_name'],
                    'opt_iogrp': opts['iogrp']})
            raise exception.ManageExistingVolumeTypeMismatch(reason=msg)
    pool = utils.extract_host(volume.host, 'pool')
    if vdisk['mdisk_grp_name'] != pool:
        msg = (_("Failed to manage existing volume due to the "
                 "pool of the volume to be managed does not "
                 "match the backend pool. Pool of the "
                 "volume to be managed is %(vdisk_pool)s. Pool "
                 "of the backend is %(backend_pool)s.") %
               {'vdisk_pool': vdisk['mdisk_grp_name'],
                'backend_pool':
                    self._get_backend_pools()})
        raise exception.ManageExistingVolumeTypeMismatch(reason=msg)

    model_update = {}
    self._assistant.rename_vdisk(vdisk['name'], volume.name)
    if vol_rep_type:
        # Keep the aux vdisk name in step with the renamed master.
        aux_vol = instorage_const.REPLICA_AUX_VOL_PREFIX + volume.name
        self._aux_backend_assistant.rename_vdisk(
            rel_info['aux_vdisk_name'], aux_vol)
        model_update = {
            'replication_status': fields.ReplicationStatus.ENABLED}
    return model_update
def get_volume_stats(self, refresh=False):
    """Return the cached backend statistics.

    The cache is rebuilt first when the caller asks for a refresh or
    when nothing has been cached yet.
    """
    cache_is_usable = bool(self._stats) and not refresh
    if not cache_is_usable:
        self._update_volume_stats()
    return self._stats
def delete_group(self, context, group, volumes):
    """Delete a group by deleting each of its volumes on the backend.

    Only consistency-group-snapshot style groups are handled; any other
    group type raises NotImplementedError. A volume whose vdisk cannot
    be deleted is reported as error_deleting and flips the group status
    to error_deleting as well; the remaining volumes are still processed.

    :returns: (group model update, list of per-volume model updates)
    """

    # now we only support consistent group
    if not utils.is_group_a_cg_snapshot_type(group):
        raise NotImplementedError()

    LOG.debug("Deleting group.")
    group_status = fields.ConsistencyGroupStatus.DELETED
    per_volume_updates = []

    for member in volumes:
        try:
            self._assistant.delete_vdisk(member.name, True)
        except exception.VolumeBackendAPIException as err:
            member_status = fields.ConsistencyGroupStatus.ERROR_DELETING
            group_status = member_status
            LOG.error("Failed to delete the volume %(vol)s of group. "
                      "Exception: %(exception)s.",
                      {'vol': member.name, 'exception': err})
        else:
            member_status = fields.ConsistencyGroupStatus.DELETED
        per_volume_updates.append({'id': member.id,
                                   'status': member_status})

    return {'status': group_status}, per_volume_updates
def _build_pool_stats(self, pool):
    """Assemble the scheduler statistics dict for one pool.

    Returns an empty dict when the assistant reports no attributes for
    the pool. A VolumeBackendAPIException from the backend is re-raised
    with a message naming the pool.
    """
    stats = {}
    try:
        attrs = self._assistant.get_pool_attrs(pool)
        if not attrs:
            return stats

        tiering = attrs['in_tier'] in ('on', 'auto')
        total_gb = float(attrs['capacity']) / units.Gi
        free_gb = float(attrs['free_capacity']) / units.Gi
        allocated_gb = float(attrs['used_capacity']) / units.Gi
        provisioned_gb = float(attrs['virtual_capacity']) / units.Gi

        # rsize of -1 or 100 means fully allocate the mdisk
        rsize = self.configuration.safe_get('instorage_mcs_vol_rsize')
        thick = rsize in (-1, 100)
        oversubscription = self.configuration.safe_get(
            'max_over_subscription_ratio')
        location = ('InStorageMCSDriver:%(sys_id)s:%(pool)s' %
                    {'sys_id': self._state['system_id'],
                     'pool': attrs['name']})

        stats = {
            'pool_name': attrs['name'],
            'total_capacity_gb': total_gb,
            'free_capacity_gb': free_gb,
            'allocated_capacity_gb': allocated_gb,
            'provisioned_capacity_gb': provisioned_gb,
            'compression_support': self._state['compression_enabled'],
            'reserved_percentage':
                self.configuration.reserved_percentage,
            'QoS_support': True,
            'consistent_group_snapshot_enabled': True,
            'location_info': location,
            'intier_support': tiering,
            'multiattach': False,
            'thin_provisioning_support': not thick,
            'thick_provisioning_support': thick,
            'max_over_subscription_ratio': oversubscription,
        }
        if self._replica_enabled:
            stats['replication_enabled'] = self._replica_enabled
            stats['replication_type'] = self._supported_replica_types
            stats['replication_targets'] = self._get_replication_targets()
            stats['replication_count'] = len(
                self._get_replication_targets())

    except exception.VolumeBackendAPIException:
        msg = _('Failed getting details for pool %s.') % pool
        raise exception.VolumeBackendAPIException(data=msg)

    return stats
[self._replica_target['backend_id']] 1259 1260 def _manage_input_check(self, ref): 1261 """Verify the input of manage function.""" 1262 # Check that the reference is valid 1263 if 'source-name' in ref: 1264 manage_source = ref['source-name'] 1265 vdisk = self._assistant.get_vdisk_attributes(manage_source) 1266 elif 'source-id' in ref: 1267 manage_source = ref['source-id'] 1268 vdisk = self._assistant.vdisk_by_uid(manage_source) 1269 else: 1270 reason = _('Reference must contain source-id or ' 1271 'source-name element.') 1272 raise exception.ManageExistingInvalidReference(existing_ref=ref, 1273 reason=reason) 1274 1275 if vdisk is None: 1276 reason = (_('No vdisk with the UID specified by ref %s.') 1277 % manage_source) 1278 raise exception.ManageExistingInvalidReference(existing_ref=ref, 1279 reason=reason) 1280 return vdisk 1281 1282 # #### V2.1 replication methods #### # 1283 @cinder_utils.trace 1284 def failover_host(self, context, volumes, secondary_id=None): 1285 if not self._replica_enabled: 1286 msg = _("Replication is not properly enabled on backend.") 1287 LOG.error(msg) 1288 raise exception.UnableToFailOver(reason=msg) 1289 1290 if instorage_const.FAILBACK_VALUE == secondary_id: 1291 # In this case the administrator would like to fail back. 1292 secondary_id, volumes_update = self._replication_failback(context, 1293 volumes) 1294 elif (secondary_id == self._replica_target['backend_id'] or 1295 secondary_id is None): 1296 # In this case the administrator would like to fail over. 
def _replication_failback(self, ctxt, volumes):
    """Switch the driver back to the primary backend.

    Returns ``(None, [])`` when no backend is currently failed over.
    Otherwise the primary is probed, replicated volumes are synchronised
    and failed back, non-replicated volumes get their update entries,
    and the driver state is pointed at the local backend again.

    :raises UnableToFailOver: if the primary backend cannot be reached
    """
    if not self._active_backend_id:
        LOG.info("Host has been failed back. doesn't need "
                 "to fail back again")
        return None, []

    # Any failure while probing the primary aborts the fail back.
    try:
        self._local_backend_assistant.get_system_info()
    except Exception:
        msg = (_("Unable to failback due to primary is not reachable."))
        LOG.error(msg)
        raise exception.UnableToFailOver(reason=msg)

    plain_vols, replicated_vols = self._classify_volume(ctxt, volumes)

    # start synchronize from aux volume to master volume
    self._sync_with_aux(ctxt, replicated_vols)
    self._wait_replica_ready(ctxt, replicated_vols)

    updates = list(self._failback_replica_volumes(ctxt, replicated_vols))
    updates.extend(self._failback_normal_volumes(plain_vols))

    self._assistant = self._local_backend_assistant
    self._active_backend_id = None

    # Update the instorage state
    self._update_instorage_state()
    self._update_volume_stats()
    return instorage_const.FAILBACK_VALUE, updates
@cinder_utils.trace
def _sync_with_aux(self, ctxt, volumes):
    """Start the remote-copy relationships of the given volumes.

    The target partnership is (re-)established on a best-effort basis.
    Then, for every volume, the relationship of its aux vdisk is looked
    up through the active assistant and, unless it is already in the
    consistent-synchronized state, started. The relationship is started
    with ``primary='aux'`` unless its current primary is 'master'.
    Failures are logged and do not stop the loop.

    :param ctxt: request context (not used by this method)
    :param volumes: volumes whose aux relationships should be started
    """
    try:
        rep_mgr = self._get_replica_mgr()
        rep_mgr.establish_target_partnership()
    except Exception as ex:
        # The mapping key must match the %(ex)s placeholder, otherwise
        # logging cannot render the record and the warning is lost.
        LOG.warning('Fail to establish partnership in backend. '
                    'error=%(ex)s', {'ex': ex})
    for volume in volumes:
        tgt_volume = instorage_const.REPLICA_AUX_VOL_PREFIX + volume.name
        rep_info = self._assistant.get_relationship_info(tgt_volume)
        if not rep_info:
            LOG.error('_sync_with_aux: no rc-releationship is '
                      'established between master: %(master)s and aux '
                      '%(aux)s. Please re-establish the relationship '
                      'and synchronize the volumes on backend '
                      'storage.', {'master': volume.name,
                                   'aux': tgt_volume})
            continue
        LOG.debug('_sync_with_aux: volume: %(volume)s rep_info:master_vol='
                  '%(master_vol)s, aux_vol=%(aux_vol)s, state=%(state)s, '
                  'primary=%(primary)s',
                  {'volume': volume.name,
                   'master_vol': rep_info['master_vdisk_name'],
                   'aux_vol': rep_info['aux_vdisk_name'],
                   'state': rep_info['state'],
                   'primary': rep_info['primary']})
        try:
            if rep_info['state'] != instorage_const.REP_CONSIS_SYNC:
                if rep_info['primary'] == 'master':
                    self._assistant.start_relationship(tgt_volume)
                else:
                    self._assistant.start_relationship(tgt_volume,
                                                       primary='aux')
        except Exception as ex:
            # Same placeholder/key fix as above: 'ex' feeds %(ex)s.
            LOG.warning('Fail to copy data from aux to master. master:'
                        ' %(master)s and aux %(aux)s. Please '
                        're-establish the relationship and synchronize'
                        ' the volumes on backend storage. error='
                        '%(ex)s', {'master': volume.name,
                                   'aux': tgt_volume,
                                   'ex': ex})
def _replication_failover(self, ctxt, volumes):
    """Switch the driver over to the replication target backend.

    When a backend is already active as failover target, its id is
    returned with an empty update list. Otherwise the target is probed,
    the volumes are classified, replicated and non-replicated volumes
    get their update entries, and the driver state is pointed at the
    aux backend.

    :param ctxt: request context passed on to the helper methods
    :param volumes: volumes hosted by this backend
    :returns: (active backend id, list of volume updates)
    :raises UnableToFailOver: if the replication target is unreachable
    """
    volumes_update = []
    if self._active_backend_id:
        LOG.info("Host has been failed over to %s",
                 self._active_backend_id)
        return self._active_backend_id, volumes_update

    try:
        self._aux_backend_assistant.get_system_info()
    except Exception as ex:
        # Interpolate the message here; a (format, mapping) tuple would
        # be logged and raised verbatim, and the mapping key has to
        # match the %(ex)s placeholder.
        msg = (_("Unable to failover due to replication target is not "
                 "reachable. error=%(ex)s") % {'ex': ex})
        LOG.error(msg)
        raise exception.UnableToFailOver(reason=msg)

    normal_volumes, rep_volumes = self._classify_volume(ctxt, volumes)

    rep_volumes_update = self._failover_replica_volumes(ctxt, rep_volumes)
    volumes_update.extend(rep_volumes_update)

    normal_volumes_update = self._failover_normal_volumes(normal_volumes)
    volumes_update.extend(normal_volumes_update)

    self._assistant = self._aux_backend_assistant
    self._active_backend_id = self._replica_target['backend_id']
    self._secondary_pools = [self._replica_target['pool_name']]

    # Update the instorage state
    self._update_instorage_state()
    self._update_volume_stats()
    return self._active_backend_id, volumes_update
1530 try: 1531 rep_info = self._aux_backend_assistant.get_relationship_info( 1532 instorage_const.REPLICA_AUX_VOL_PREFIX + volume.name) 1533 if not rep_info: 1534 rep_status = fields.ReplicationStatus.FAILOVER_ERROR 1535 volumes_update.append( 1536 {'volume_id': volume.id, 1537 'updates': {'replication_status': rep_status, 1538 'status': 'error'}}) 1539 LOG.error('_failover_replica_volumes: no rc-' 1540 'releationship is established for master:' 1541 '%(master)s. Please re-establish the rc-' 1542 'relationship and synchronize the volumes on' 1543 ' backend storage.', 1544 {'master': volume.name}) 1545 continue 1546 LOG.debug('_failover_replica_volumes: vol=%(vol)s, ' 1547 'master_vol=%(master_vol)s, aux_vol=%(aux_vol)s, ' 1548 'state=%(state)s, primary=%(primary)s', 1549 {'vol': volume.name, 1550 'master_vol': rep_info['master_vdisk_name'], 1551 'aux_vol': rep_info['aux_vdisk_name'], 1552 'state': rep_info['state'], 1553 'primary': rep_info['primary']}) 1554 model_updates = replica_obj.failover_volume_host(ctxt, volume) 1555 volumes_update.append( 1556 {'volume_id': volume.id, 1557 'updates': model_updates}) 1558 except exception.VolumeDriverException: 1559 LOG.error('Unable to failover to aux volume. Please make ' 1560 'sure that the aux volume is ready.') 1561 volumes_update.append( 1562 {'volume_id': volume.id, 1563 'updates': {'status': 'error', 1564 'replication_status': 1565 fields.ReplicationStatus.FAILOVER_ERROR}}) 1566 return volumes_update 1567 1568 def _failover_normal_volumes(self, normal_volumes): 1569 volumes_update = [] 1570 for volume in normal_volumes: 1571 # If the volume is not of replicated type, we need to 1572 # force the status into error state so a user knows they 1573 # do not have access to the volume. 
1574 rep_data = json.dumps({'previous_status': volume.status}) 1575 volumes_update.append( 1576 {'volume_id': volume.id, 1577 'updates': {'status': 'error', 1578 'replication_driver_data': rep_data}}) 1579 return volumes_update 1580 1581 def _classify_volume(self, ctxt, volumes): 1582 normal_volumes = [] 1583 replica_volumes = [] 1584 1585 for v in volumes: 1586 volume_type = self._get_volume_replicated_type(ctxt, v) 1587 if volume_type and v.status == 'available': 1588 replica_volumes.append(v) 1589 else: 1590 normal_volumes.append(v) 1591 1592 return normal_volumes, replica_volumes 1593 1594 def _get_replica_obj(self, rep_type): 1595 replica_manager = self.replica_manager[ 1596 self._replica_target['backend_id']] 1597 return replica_manager.get_replica_obj(rep_type) 1598 1599 def _get_replica_mgr(self): 1600 replica_manager = self.replica_manager[ 1601 self._replica_target['backend_id']] 1602 return replica_manager 1603 1604 def _get_target_vol(self, volume): 1605 tgt_vol = volume.name 1606 if self._active_backend_id: 1607 ctxt = context.get_admin_context() 1608 rep_type = self._get_volume_replicated_type(ctxt, volume) 1609 if rep_type: 1610 tgt_vol = instorage_const.REPLICA_AUX_VOL_PREFIX + volume.name 1611 return tgt_vol 1612 1613 def _validate_replication_enabled(self): 1614 if not self._replica_enabled: 1615 msg = _("Replication is not properly configured on backend.") 1616 LOG.error(msg) 1617 raise exception.VolumeBackendAPIException(data=msg) 1618 1619 def _get_specs_replicated_type(self, volume_type): 1620 replication_type = None 1621 extra_specs = volume_type.get("extra_specs", {}) 1622 rep_val = extra_specs.get('replication_enabled') 1623 if rep_val == "<is> True": 1624 replication_type = extra_specs.get('replication_type', 1625 instorage_const.ASYNC) 1626 # The format for replication_type in extra spec is in 1627 # "<in> async". Otherwise, the code will 1628 # not reach here. 
1629 if replication_type != instorage_const.ASYNC: 1630 # Pick up the replication type specified in the 1631 # extra spec from the format like "<in> async". 1632 replication_type = replication_type.split()[1] 1633 if replication_type not in instorage_const.VALID_REP_TYPES: 1634 msg = (_("Invalid replication type %s.") % replication_type) 1635 LOG.error(msg) 1636 raise exception.InvalidInput(reason=msg) 1637 return replication_type 1638 1639 def _get_volume_replicated_type(self, ctxt, volume): 1640 replication_type = None 1641 if volume.get("volume_type_id"): 1642 volume_type = volume_types.get_volume_type( 1643 ctxt, volume.volume_type_id) 1644 replication_type = self._get_specs_replicated_type(volume_type) 1645 1646 return replication_type 1647 1648 def _get_instorage_config(self): 1649 self._do_replication_setup() 1650 1651 if self._active_backend_id and self._replica_target: 1652 self._assistant = self._aux_backend_assistant 1653 1654 self._replica_enabled = (True if (self._assistant. 1655 replication_licensed() and 1656 self._replica_target) else False) 1657 if self._replica_enabled: 1658 self._supported_replica_types = instorage_const.VALID_REP_TYPES 1659 1660 def _do_replication_setup(self): 1661 rep_devs = self.configuration.safe_get('replication_device') 1662 if not rep_devs: 1663 return 1664 1665 if len(rep_devs) > 1: 1666 raise exception.InvalidInput( 1667 reason=_('Multiple replication devices are configured. 
def _replication_initialize(self, target):
    """Create the replication manager for ``target`` and register it.

    Outside of a failed-over state the partnership with the target is
    established; a VolumeDriverException there is only logged. While
    failed over, the active backend id must equal the target's
    backend_id, otherwise InvalidInput is raised.
    """
    manager = instorage_rep.InStorageMCSReplicationManager(
        self, target, InStorageAssistant)

    active_id = self._active_backend_id
    if not active_id:
        # Setup partnership only in non-failover state
        try:
            manager.establish_target_partnership()
        except exception.VolumeDriverException:
            LOG.error('The replication src %(src)s has not '
                      'successfully established partnership with the '
                      'replica target %(tgt)s.',
                      {'src': self.configuration.san_ip,
                       'tgt': target['backend_id']})
    elif active_id != target['backend_id']:
        msg = (_("Invalid secondary id %s.") % active_id)
        LOG.error(msg)
        raise exception.InvalidInput(reason=msg)

    self._aux_backend_assistant = manager.get_target_assistant()
    self.replica_manager[target['backend_id']] = manager
    self._replica_target = target
def get_system_info(self):
    """Return system's name, ID, and code level.

    :returns: dict with 'code_level' (tuple of four ints taken from the
              first dotted four-part number in lssystem's code_level),
              'system_name' and 'system_id'
    :raises VolumeBackendAPIException: if code_level holds no such
        dotted four-part number
    """
    resp = self.ssh.lssystem()
    level = resp['code_level']
    # The dots are escaped and every component may have several digits;
    # an unescaped single-digit pattern would read '10.2.13.45' as
    # '0.2.1.3' and would not match '3.1.10.2' at all.
    match_obj = re.search(r'([0-9]+\.){3}[0-9]+', level)
    if match_obj is None:
        msg = _('Failed to get code level (%s).') % level
        raise exception.VolumeBackendAPIException(data=msg)
    code_level = match_obj.group().split('.')
    return {'code_level': tuple(int(x) for x in code_level),
            'system_name': resp['name'],
            'system_id': resp['id']}
def get_vdisk_count_by_io_group(self):
    """Return ``{io_group_id: vdisk_count}`` for I/O groups that have nodes.

    Rows from ``lsiogrp`` whose ``node_count`` is not positive are skipped.
    A missing key is delegated to ``handle_keyerror``; a non-integer value
    raises ``VolumeBackendAPIException``.
    """
    counts = {}
    for row in self.ssh.lsiogrp():
        try:
            if int(row['node_count']) <= 0:
                continue
            # Convert the count before the id, as the original assignment
            # evaluated its right-hand side first.
            vdisk_total = int(row['vdisk_count'])
            counts[int(row['id'])] = vdisk_total
        except KeyError:
            self.handle_keyerror('lsiogrp', row)
        except ValueError:
            msg = (_('Expected integer for node_count, '
                     'mcsinq lsiogrp returned: %(node)s') %
                   {'node': row['node_count']})
            raise exception.VolumeBackendAPIException(data=msg)
    return counts
def add_iscsi_ip_addrs(self, storage_nodes):
    """Add iSCSI IP addresses to system node information.

    For every ``lsportip`` row whose node is in ``storage_nodes`` and whose
    state is ``configured`` or ``online``, non-empty IPv4/IPv6 addresses are
    appended to that node's ``ipv4``/``ipv6`` lists in place.  A missing key
    in a row is delegated to ``handle_keyerror``.
    """
    address_fields = (('IP_address', 'ipv4'), ('IP_address_6', 'ipv6'))
    for port in self.ssh.lsportip():
        try:
            port_state = port['state']
            node_id = port['node_id']
            if node_id not in storage_nodes:
                continue
            if port_state not in ('configured', 'online'):
                continue
            node = storage_nodes[node_id]
            for field, family in address_fields:
                address = port[field]
                if len(address):
                    node[family].append(address)
        except KeyError:
            self.handle_keyerror('lsportip', port)
def get_host_from_connector(self, connector, volume_name=None):
    """Return the InStorage host described by the connector.

    Lookup order:

    1. If the connector has ``wwpns``, ask ``lsfabric`` for each WWPN and
       return the first host whose ``remote_wwpn`` matches.
    2. Otherwise walk every host from ``lshost``.  Hosts whose name
       contains the connector's ``host`` value, and hosts already mapped to
       ``volume_name``, are moved to the front so they are queried first.
       A host matches on its iSCSI name when the connector has an
       ``initiator``, else on any of the connector's WWPNs.

    :param connector: dict that may contain ``wwpns``, ``host`` and
                      ``initiator``.
    :param volume_name: optional volume whose mapped hosts are tried first.
    :returns: the matching host name, or None when no host matches.
    """
    LOG.debug('Enter: get_host_from_connector: %s.', connector)

    # If we have FC information, we have a faster lookup option
    host_name = None
    if 'wwpns' in connector:
        for wwpn in connector['wwpns']:
            resp = self.ssh.lsfabric(wwpn=wwpn)
            for wwpn_info in resp:
                try:
                    if (wwpn_info['remote_wwpn'] and
                            wwpn_info['name'] and
                            wwpn_info['remote_wwpn'].lower() ==
                            wwpn.lower()):
                        host_name = wwpn_info['name']
                        break
                except KeyError:
                    self.handle_keyerror('lsfabric', wwpn_info)
            if host_name:
                break
    if host_name:
        LOG.debug('Leave: get_host_from_connector: host %s.', host_name)
        return host_name

    def update_host_list(host, host_list):
        # The two listings are taken at different moments, so a mapped
        # host may be absent from host_list; skip it instead of letting
        # list.index() raise ValueError.
        if host in host_list:
            host_list.remove(host)
            host_list.insert(0, host)

    # That didn't work, so try exhaustive search
    hosts_info = self.ssh.lshost()
    host_list = list(hosts_info.select('name'))
    # If we have a "real" connector, we might be able to find the
    # host entry with fewer queries if we move the host entries
    # that contain the connector's host property value to the front
    # of the list
    if 'host' in connector:
        # Plain substring test: the host name is data, not a regular
        # expression (names such as 'node(1' or 'a+b' broke re.search).
        # Iterate over a snapshot because update_host_list reorders
        # host_list.
        for host in list(host_list):
            if connector['host'] in host:
                update_host_list(host, host_list)
    # If we have a volume name we have a potential fast path
    # for finding the matching host for that volume.
    # Add the host_names that have mappings for our volume to the
    # head of the list of host names to search them first
    if volume_name:
        hosts_map_info = self.ssh.lsvdiskhostmap(volume_name)
        for host in list(hosts_map_info.select('host_name')):
            update_host_list(host, host_list)

    # Decide once how hosts are matched instead of on every iteration.
    match_initiator = 'initiator' in connector
    connector_wwpns = set()
    if not match_initiator and 'wwpns' in connector:
        connector_wwpns = {str(x).lower() for x in connector['wwpns']}

    for name in host_list:
        try:
            resp = self.ssh.lshost(host=name)
        except exception.VolumeBackendAPIException as ex:
            LOG.debug("Exception message: %s", ex.msg)
            if 'CMMVC5754E' in ex.msg:
                LOG.debug("CMMVC5754E found in CLI exception.")
                # CMMVC5754E: The specified object does not exist
                # The host has been deleted while walking the list.
                # This is a result of a host change on the MCS that
                # is out of band to this request.
                continue
            # unexpected error so reraise it
            raise
        if match_initiator:
            if any(iscsi == connector['initiator']
                   for iscsi in resp.select('iscsi_name')):
                host_name = name
        elif connector_wwpns:
            if any(wwpn and wwpn.lower() in connector_wwpns
                   for wwpn in resp.select('WWPN')):
                host_name = name
        if host_name is not None:
            break

    LOG.debug('Leave: get_host_from_connector: host %s.', host_name)
    return host_name
2021 host_name = connector['host'] 2022 if not isinstance(host_name, six.string_types): 2023 msg = _('create_host: Host name is not unicode or string.') 2024 LOG.error(msg) 2025 raise exception.VolumeDriverException(message=msg) 2026 2027 ports = [] 2028 if 'initiator' in connector: 2029 ports.append(['initiator', '%s' % connector['initiator']]) 2030 if 'wwpns' in connector: 2031 for wwpn in connector['wwpns']: 2032 ports.append(['wwpn', '%s' % wwpn]) 2033 if not len(ports): 2034 msg = _('create_host: No initiators or wwpns supplied.') 2035 LOG.error(msg) 2036 raise exception.VolumeDriverException(message=msg) 2037 2038 # Build a host name for the InStorage host - first clean up the name 2039 if isinstance(host_name, six.text_type): 2040 host_name = unicodedata.normalize('NFKD', host_name).encode( 2041 'ascii', 'replace').decode('ascii') 2042 2043 for num in range(0, 128): 2044 ch = str(chr(num)) 2045 if not ch.isalnum() and ch not in [' ', '.', '-', '_']: 2046 host_name = host_name.replace(ch, '-') 2047 2048 # InStorage doesn't expect hostname that doesn't starts with letter or 2049 # _. 
def map_vol_to_host(self, volume_name, host_name, multihostmap):
    """Create a mapping between a volume to a host.

    An existing mapping is reused; otherwise a new one is created with
    ``mkvdiskhostmap``.

    :returns: the mapping's LUN id converted to ``int``.
    """
    LOG.debug('Enter: map_vol_to_host: volume %(volume_name)s to '
              'host %(host_name)s.',
              {'volume_name': volume_name, 'host_name': host_name})

    # Check if this volume is already mapped to this host
    lun = self.ssh.get_vdiskhostmapid(volume_name, host_name)
    if lun is None:
        lun = self.ssh.mkvdiskhostmap(host_name, volume_name, None,
                                      multihostmap)

    leave_args = {'result_lun': lun,
                  'volume_name': volume_name,
                  'host_name': host_name}
    LOG.debug('Leave: map_vol_to_host: LUN %(result_lun)s, volume '
              '%(volume_name)s, host %(host_name)s.', leave_args)
    return int(lun)
self.ssh.lsvdiskhostmap(volume_name) 2104 if not len(resp): 2105 LOG.warning('unmap_vol_from_host: No mapping of volume ' 2106 '%(vol_name)s to any host found.', 2107 {'vol_name': volume_name}) 2108 return host_name 2109 if host_name is None: 2110 if len(resp) > 1: 2111 LOG.warning('unmap_vol_from_host: Multiple mappings of ' 2112 'volume %(vol_name)s found, no host ' 2113 'specified.', {'vol_name': volume_name}) 2114 return 2115 else: 2116 host_name = resp[0]['host_name'] 2117 else: 2118 found = False 2119 for h in resp.select('host_name'): 2120 if h == host_name: 2121 found = True 2122 if not found: 2123 LOG.warning('unmap_vol_from_host: No mapping of volume ' 2124 '%(vol_name)s to host %(host)s found.', 2125 {'vol_name': volume_name, 'host': host_name}) 2126 return host_name 2127 # We now know that the mapping exists 2128 self.ssh.rmvdiskhostmap(host_name, volume_name) 2129 2130 LOG.debug('Leave: unmap_vol_from_host: volume %(volume_name)s from ' 2131 'host %(host_name)s.', 2132 {'volume_name': volume_name, 'host_name': host_name}) 2133 return host_name 2134 2135 @staticmethod 2136 def build_default_opts(config): 2137 # Ignore capitalization 2138 2139 opt = {'rsize': config.instorage_mcs_vol_rsize, 2140 'warning': config.instorage_mcs_vol_warning, 2141 'autoexpand': config.instorage_mcs_vol_autoexpand, 2142 'grainsize': config.instorage_mcs_vol_grainsize, 2143 'compression': config.instorage_mcs_vol_compression, 2144 'intier': config.instorage_mcs_vol_intier, 2145 'iogrp': config.instorage_mcs_vol_iogrp, 2146 'qos': None, 2147 'replication': False} 2148 return opt 2149 2150 @staticmethod 2151 def check_vdisk_opts(state, opts): 2152 # Check that grainsize is 32/64/128/256 2153 if opts['grainsize'] not in [32, 64, 128, 256]: 2154 raise exception.InvalidInput( 2155 reason=_('Illegal value specified for ' 2156 'instorage_mcs_vol_grainsize: set to either ' 2157 '32, 64, 128, or 256.')) 2158 2159 # Check that compression is supported 2160 if opts['compression'] and 
def _get_opts_from_specs(self, opts, specs):
    """Overlay volume-type specs onto ``opts`` and return ``opts``.

    Spec keys may be scoped as ``scope:key``:

    * ``replication`` (unscoped or ``capabilities`` scope) must have the
      form ``'<is> True'`` / ``'<is> False'``; a malformed value is logged
      and ignored.
    * ``qos:<key>`` values for keys listed in ``mcs_qos_keys`` are
      converted with the key's ``type`` and collected under
      ``opts['qos']``; values that fail conversion are skipped.
    * Unscoped or ``drivers``-scoped keys already present in ``opts``
      overwrite the default, coerced to int or bool when the default has
      that type.

    :param opts: option dict, updated in place.
    :param specs: mapping of spec name to string value.
    :returns: the same ``opts`` dict.
    """
    qos = {}
    for k, value in specs.items():
        # Get the scope, if using scope format
        key_split = k.split(':')
        if len(key_split) == 1:
            scope = None
            key = key_split[0]
        else:
            scope = key_split[0]
            key = key_split[1]

        # We generally do not look at capabilities in the driver, but
        # replication is a special case where the user asks for
        # a volume to be replicated, and we want both the scheduler and
        # the driver to act on the value.
        if ((not scope or scope == 'capabilities') and
                key == 'replication'):
            scope = None
            words = value.split()
            if not (len(words) == 2 and words[0] == '<is>'):
                LOG.error("Replication must be specified as "
                          "'<is> True' or '<is> False'.")
                # Skip the malformed spec.  Previously execution fell
                # through to indexing ``words``, which raised IndexError
                # for empty or one-word values and otherwise accepted
                # the value that had just been reported as invalid.
                continue
            value = words[1]

        # Add the QoS.
        if scope and scope == 'qos':
            if key in self.mcs_qos_keys.keys():
                try:
                    type_fn = self.mcs_qos_keys[key]['type']
                    value = type_fn(value)
                    qos[key] = value
                except ValueError:
                    continue

        # Any keys that the driver should look at should have the
        # 'drivers' scope.
        if scope and scope != 'drivers':
            continue
        if key in opts:
            this_type = type(opts[key]).__name__
            if this_type == 'int':
                value = int(value)
            elif this_type == 'bool':
                value = strutils.bool_from_string(value)
            opts[key] = value
    if len(qos) != 0:
        opts['qos'] = qos
    return opts
def _wait_for_a_condition(self, testmethod, timeout=None,
                          interval=INTERVAL_1_SEC,
                          raise_exception=False):
    """Poll ``testmethod`` until it returns a true value or time runs out.

    :param testmethod: callable taking no arguments; a true return value
                       ends the wait.
    :param timeout: seconds to keep polling; ``DEFAULT_TIMEOUT`` when None.
    :param interval: interval passed to the fixed-interval looping call.
    :param raise_exception: when True, an exception raised by
                            ``testmethod`` aborts the wait with
                            ``VolumeBackendAPIException``; when False it is
                            logged at debug level and treated as "not ready".
    :raises VolumeBackendAPIException: on timeout, or on a ``testmethod``
                                       failure when ``raise_exception`` is
                                       True.
    """
    start_time = time.time()
    if timeout is None:
        timeout = DEFAULT_TIMEOUT

    def _inner():
        try:
            test_value = testmethod()
        except Exception as ex:
            if raise_exception:
                LOG.exception("_wait_for_a_condition: %s"
                              " execution failed.",
                              testmethod.__name__)
                raise exception.VolumeBackendAPIException(data=ex)
            test_value = False
            # Log the exception object itself: ``ex.message`` does not
            # exist on Python 3 exceptions, so reading it raised
            # AttributeError here.
            LOG.debug('Assistant.'
                      '_wait_for_condition: %(method_name)s '
                      'execution failed for %(exception)s.',
                      {'method_name': testmethod.__name__,
                       'exception': ex})
        if test_value:
            raise loopingcall.LoopingCallDone()

        # Compare un-truncated timestamps; truncating "now" with int()
        # could delay the timeout by up to a second.
        if time.time() - start_time > timeout:
            msg = (
                _('CommandLineAssistant._wait_for_condition: '
                  '%s timeout.') % testmethod.__name__)
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)

    timer = loopingcall.FixedIntervalLoopingCall(_inner)
    timer.start(interval=interval).wait()
2310 """ 2311 opts = self.build_default_opts(config) 2312 ctxt = context.get_admin_context() 2313 if volume_type is None and type_id is not None: 2314 volume_type = volume_types.get_volume_type(ctxt, type_id) 2315 if volume_type: 2316 qos_specs_id = volume_type.get('qos_specs_id') 2317 specs = dict(volume_type).get('extra_specs') 2318 2319 # NOTE: We prefer the qos_specs association 2320 # and over-ride any existing 2321 # extra-specs settings if present 2322 if qos_specs_id is not None: 2323 kvs = qos_specs.get_qos_specs(ctxt, qos_specs_id)['specs'] 2324 # Merge the qos_specs into extra_specs and qos_specs has higher 2325 # priority than extra_specs if they have different values for 2326 # the same key. 2327 specs.update(kvs) 2328 opts = self._get_opts_from_specs(opts, specs) 2329 if (opts['qos'] is None and config.instorage_mcs_allow_tenant_qos and 2330 volume_metadata): 2331 qos = self._get_qos_from_volume_metadata(volume_metadata) 2332 if len(qos) != 0: 2333 opts['qos'] = qos 2334 2335 self.check_vdisk_opts(state, opts) 2336 return opts 2337 2338 @staticmethod 2339 def _get_vdisk_create_params(opts): 2340 intier = 'on' if opts['intier'] else 'off' 2341 if opts['rsize'] == -1: 2342 params = [] 2343 else: 2344 params = ['-rsize', '%s%%' % str(opts['rsize']), 2345 '-autoexpand', '-warning', 2346 '%s%%' % str(opts['warning'])] 2347 if not opts['autoexpand']: 2348 params.remove('-autoexpand') 2349 2350 if opts['compression']: 2351 params.append('-compressed') 2352 else: 2353 params.extend(['-grainsize', str(opts['grainsize'])]) 2354 2355 params.extend(['-intier', intier]) 2356 return params 2357 2358 def create_vdisk(self, name, size, units, pool, opts): 2359 name = '"%s"' % name 2360 LOG.debug('Enter: create_vdisk: vdisk %s.', name) 2361 params = self._get_vdisk_create_params(opts) 2362 self.ssh.mkvdisk(name, size, units, pool, opts, params) 2363 LOG.debug('Leave: _create_vdisk: volume %s.', name) 2364 2365 def delete_vdisk(self, vdisk, force): 2366 """Ensures that 
def get_vdisk_copy_ids(self, vdisk):
    """Return ``(primary_copy_id, secondary_copy_id)`` for a vdisk.

    :raises VolumeDriverException: ``lsvdiskcopy`` does not report exactly
                                   two copies.
    """
    copies = self.ssh.lsvdiskcopy(vdisk)
    if len(copies) != 2:
        msg = (_('list_vdisk_copy failed: No copy of volume %s exists.')
               % vdisk)
        raise exception.VolumeDriverException(message=msg)

    # The copy not flagged as primary is taken to be the secondary.
    main, other = copies[0], copies[1]
    if main['primary'] != 'yes':
        main, other = other, main
    return main['copy_id'], other['copy_id']
def create_copy(self, src, tgt, src_id, config, opts,
                full_copy, pool=None):
    """Create a new snapshot using LocalCopy.

    A target vdisk with the source's capacity is created in ``pool`` (the
    source's pool when none is given) and a LocalCopy from ``src`` to
    ``tgt`` is started.  ``opts['iogrp']`` is overwritten with the
    source's I/O group id.  If starting the copy fails, the target vdisk
    is deleted and the error is re-raised.

    :raises VolumeDriverException: the source vdisk does not exist.
    """
    LOG.debug('Enter: create_copy: snapshot %(src)s to %(tgt)s.',
              {'tgt': tgt, 'src': src})

    source = self.get_vdisk_attributes(src)
    if source is None:
        msg = (_('create_copy: Source vdisk %(src)s (%(src_id)s) '
                 'does not exist.') % {'src': src, 'src_id': src_id})
        LOG.error(msg)
        raise exception.VolumeDriverException(message=msg)

    capacity = source['capacity']
    # In case we need to use a specific pool
    target_pool = pool if pool else source['mdisk_grp_name']

    opts['iogrp'] = source['IO_group_id']
    self.create_vdisk(tgt, capacity, 'b', target_pool, opts)
    copy_timeout = config.instorage_mcs_localcopy_timeout
    try:
        self.run_localcopy(src, tgt, copy_timeout,
                           config.instorage_mcs_localcopy_rate,
                           full_copy=full_copy)
    except Exception:
        # Remove the half-initialised target before propagating.
        with excutils.save_and_reraise_exception():
            self.delete_vdisk(tgt, True)

    LOG.debug('Leave: _create_copy: snapshot %(tgt)s from '
              'vdisk %(src)s.',
              {'tgt': tgt, 'src': src})
def _prepare_lc_map(self, lc_map_id, timeout):
    """Prepare a LocalCopy mapping and wait until it reports 'prepared'.

    The mapping is polled every ``WAIT_TIME`` seconds.  A 'stopped'
    mapping is prepared again, a 'preparing' mapping is waited on, and any
    other status is an error.  Polling stops early when the mapping's
    attributes or status cannot be read.

    :param lc_map_id: id of the LocalCopy mapping.
    :param timeout: overall time budget in seconds.
    :raises VolumeBackendAPIException: the mapping is in an unexpected
                                       status.
    :raises VolumeDriverException: the mapping did not become prepared.
    """
    self.ssh.prestartlcmap(lc_map_id)
    mapping_ready = False
    # Always poll at least once: for timeout < WAIT_TIME the integer
    # division gives 0, and the mapping used to be reported as timed out
    # without ever being checked.
    max_polls = max(timeout // self.WAIT_TIME, 1)
    for _attempt in range(max_polls):
        mapping_attrs = self._get_localcopy_mapping_attributes(lc_map_id)
        if (mapping_attrs is None or
                'status' not in mapping_attrs):
            break
        status = mapping_attrs['status']
        if status == 'prepared':
            mapping_ready = True
            break
        elif status == 'stopped':
            self.ssh.prestartlcmap(lc_map_id)
        elif status != 'preparing':
            msg = (_('Unexpected mapping status %(status)s for mapping '
                     '%(id)s. Attributes: %(attr)s.')
                   % {'status': status,
                      'id': lc_map_id,
                      'attr': mapping_attrs})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)
        greenthread.sleep(self.WAIT_TIME)

    if not mapping_ready:
        # The original literal lacked the space between "the" and
        # "allotted".
        msg = (_('Mapping %(id)s prepare failed to complete within the '
                 'allotted %(to)d seconds timeout. Terminating.')
               % {'id': lc_map_id,
                  'to': timeout})
        LOG.error(msg)
        raise exception.VolumeDriverException(message=msg)
def delete_consistgrp_snapshots(self, lc_consistgrp, snapshots):
    """Delete localcopy maps and consistent group.

    Every snapshot vdisk is removed with force.  The first
    ``VolumeBackendAPIException`` stops the removal and switches the
    reported status to ERROR_DELETING for the group and all snapshots.

    :returns: ``(model_update, snapshots_model_update)``.
    """
    status = fields.ConsistencyGroupStatus.DELETED
    model_update = {'status': status}

    try:
        for snapshot in snapshots:
            self.ssh.rmvdisk(snapshot.name, True)
    except exception.VolumeBackendAPIException as err:
        status = fields.ConsistencyGroupStatus.ERROR_DELETING
        model_update['status'] = status
        LOG.error("Failed to delete the snapshot %(snap)s of "
                  "CGSnapshot. Exception: %(exception)s.",
                  {'snap': snapshot.name, 'exception': err})

    # All snapshots report the group's final status.
    snapshots_model_update = [
        {'id': snap.id, 'status': model_update['status']}
        for snap in snapshots]

    return model_update, snapshots_model_update
def prepare_lc_consistgrp(self, lc_consistgrp, timeout):
    """Prepare LC Consistency Group.

    Issues the prestart command and then waits, via
    ``_wait_for_a_condition``, until the group's status is 'prepared'.
    A 'stopped' group is prestarted again; 'preparing' keeps waiting.

    :param lc_consistgrp: name/id of the LocalCopy consistency group.
    :param timeout: timeout handed to ``_wait_for_a_condition``.
    """
    self.ssh.prestartlcconsistgrp(lc_consistgrp)

    def prepare_lc_consistgrp_success():
        mapping_attrs = self._get_localcopy_consistgrp_attr(lc_consistgrp)
        # No attributes or no status yet: report "not ready".  The
        # original ``pass`` fell through and subscripted ``mapping_attrs``
        # anyway, raising TypeError/KeyError.
        if (mapping_attrs is None or
                'status' not in mapping_attrs):
            return False
        status = mapping_attrs['status']
        if status == 'prepared':
            return True
        if status == 'stopped':
            self.ssh.prestartlcconsistgrp(lc_consistgrp)
        elif status != 'preparing':
            msg = (_('Unexpected mapping status %(status)s for mapping '
                     '%(id)s. Attributes: %(attr)s.') %
                   {'status': status,
                    'id': lc_consistgrp,
                    'attr': mapping_attrs})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)
        return False

    self._wait_for_a_condition(prepare_lc_consistgrp_success, timeout)
" 2720 "Exception: %s", err) 2721 return model_update, volumes_model_update 2722 2723 LOG.debug('Leave: create_cg_from_source.') 2724 return model_update, volumes_model_update 2725 2726 def _get_volume_model_updates(self, ctxt, volumes, cgId, 2727 status='available'): 2728 """Update the volume model's status and return it.""" 2729 volume_model_updates = [] 2730 LOG.info("Updating status for CG: %(id)s.", {'id': cgId}) 2731 if volumes: 2732 for volume in volumes: 2733 volume_model_updates.append({'id': volume.id, 2734 'status': status}) 2735 else: 2736 LOG.info("No volume found for CG: %(cg)s.", {'cg': cgId}) 2737 return volume_model_updates 2738 2739 def run_localcopy(self, source, target, timeout, copy_rate, 2740 full_copy=True): 2741 """Create a LocalCopy mapping from the source to the target.""" 2742 LOG.debug('Enter: run_localcopy: execute LocalCopy from source ' 2743 '%(source)s to target %(target)s.', 2744 {'source': source, 'target': target}) 2745 2746 lc_map_id = self.ssh.mklcmap(source, target, full_copy, copy_rate) 2747 self._prepare_lc_map(lc_map_id, timeout) 2748 self.ssh.startlcmap(lc_map_id) 2749 2750 LOG.debug('Leave: run_localcopy: LocalCopy started from ' 2751 '%(source)s to %(target)s.', 2752 {'source': source, 'target': target}) 2753 2754 def create_localcopy_to_consistgrp(self, source, target, consistgrp, 2755 config, opts, full_copy=False, 2756 pool=None): 2757 """Create a LocalCopy mapping and add to consistent group.""" 2758 LOG.debug('Enter: create_localcopy_to_consistgrp: create LocalCopy' 2759 ' from source %(source)s to target %(target)s' 2760 'Then add the localcopy to %(cg)s.', 2761 {'source': source, 'target': target, 'cg': consistgrp}) 2762 2763 src_attrs = self.get_vdisk_attributes(source) 2764 if src_attrs is None: 2765 msg = (_('create_copy: Source vdisk %(src)s ' 2766 'does not exist.') % {'src': source}) 2767 LOG.error(msg) 2768 raise exception.VolumeDriverException(message=msg) 2769 2770 src_size = src_attrs['capacity'] 2771 # 
In case we need to use a specific pool 2772 if not pool: 2773 pool = src_attrs['mdisk_grp_name'] 2774 opts['iogrp'] = src_attrs['IO_group_id'] 2775 self.create_vdisk(target, src_size, 'b', pool, opts) 2776 2777 self.ssh.mklcmap(source, target, full_copy, 2778 config.instorage_mcs_localcopy_rate, 2779 consistgrp=consistgrp) 2780 2781 LOG.debug('Leave: create_localcopy_to_consistgrp: ' 2782 'LocalCopy started from %(source)s to %(target)s.', 2783 {'source': source, 'target': target}) 2784 2785 def _get_vdisk_lc_mappings(self, vdisk): 2786 """Return LocalCopy mappings that this vdisk is associated with.""" 2787 mapping_ids = [] 2788 resp = self.ssh.lsvdisklcmappings(vdisk) 2789 for id in resp.select('id'): 2790 mapping_ids.append(id) 2791 return mapping_ids 2792 2793 def _get_localcopy_mapping_attributes(self, lc_map_id): 2794 resp = self.ssh.lslcmap(lc_map_id) 2795 if not len(resp): 2796 return None 2797 return resp[0] 2798 2799 def _get_localcopy_consistgrp_attr(self, lc_map_id): 2800 resp = self.ssh.lslcconsistgrp(lc_map_id) 2801 if not len(resp): 2802 return None 2803 return resp[0] 2804 2805 def _check_vdisk_lc_mappings(self, name, 2806 allow_snaps=True, allow_lctgt=False): 2807 """LocalCopy mapping check helper.""" 2808 LOG.debug('Loopcall: _check_vdisk_lc_mappings(), vdisk %s.', name) 2809 mapping_ids = self._get_vdisk_lc_mappings(name) 2810 wait_for_copy = False 2811 rmlcmap_failed_e = None 2812 for map_id in mapping_ids: 2813 attrs = self._get_localcopy_mapping_attributes(map_id) 2814 if not attrs: 2815 continue 2816 source = attrs['source_vdisk_name'] 2817 target = attrs['target_vdisk_name'] 2818 copy_rate = attrs['copy_rate'] 2819 status = attrs['status'] 2820 2821 if allow_lctgt and target == name and status == 'copying': 2822 self.ssh.stoplcmap(map_id) 2823 attrs = self._get_localcopy_mapping_attributes(map_id) 2824 if attrs: 2825 status = attrs['status'] 2826 2827 if copy_rate == '0': 2828 if source == name: 2829 # Vdisk with snapshots. 
Return False if snapshot 2830 # not allowed. 2831 if not allow_snaps: 2832 raise loopingcall.LoopingCallDone(retvalue=False) 2833 self.ssh.chlcmap(map_id, copyrate='50', autodel='on') 2834 wait_for_copy = True 2835 else: 2836 # A snapshot 2837 if target != name: 2838 msg = (_('Vdisk %(name)s not involved in ' 2839 'mapping %(src)s -> %(tgt)s.') % 2840 {'name': name, 'src': source, 'tgt': target}) 2841 LOG.error(msg) 2842 raise exception.VolumeDriverException(message=msg) 2843 if status in ['copying', 'prepared']: 2844 self.ssh.stoplcmap(map_id) 2845 # Need to wait for the lcmap to change to 2846 # stopped state before remove lcmap 2847 wait_for_copy = True 2848 elif status in ['stopping', 'preparing']: 2849 wait_for_copy = True 2850 else: 2851 try: 2852 self.ssh.rmlcmap(map_id) 2853 except exception.VolumeBackendAPIException as e: 2854 rmlcmap_failed_e = e 2855 # Case 4: Copy in progress - wait and will autodelete 2856 else: 2857 if status == 'prepared': 2858 self.ssh.stoplcmap(map_id) 2859 self.ssh.rmlcmap(map_id) 2860 elif status in ['idle_or_copied', 'stopped']: 2861 # Prepare failed or stopped 2862 self.ssh.rmlcmap(map_id) 2863 else: 2864 wait_for_copy = True 2865 2866 if not wait_for_copy and rmlcmap_failed_e is not None: 2867 raise rmlcmap_failed_e 2868 2869 if not wait_for_copy or not len(mapping_ids): 2870 raise loopingcall.LoopingCallDone(retvalue=True) 2871 2872 def ensure_vdisk_no_lc_mappings(self, name, allow_snaps=True, 2873 allow_lctgt=False): 2874 """Ensure vdisk has no localcopy mappings.""" 2875 timer = loopingcall.FixedIntervalLoopingCall( 2876 self._check_vdisk_lc_mappings, name, 2877 allow_snaps, allow_lctgt) 2878 # Create a timer greenthread. The default volume service heart 2879 # beat is every 10 seconds. The localcopy usually takes hours 2880 # before it finishes. Don't set the sleep interval shorter 2881 # than the heartbeat. Otherwise volume service heartbeat 2882 # will not be serviced. 
2883 LOG.debug('Calling _ensure_vdisk_no_lc_mappings: vdisk %s.', 2884 name) 2885 ret = timer.start(interval=self.check_lcmapping_interval).wait() 2886 timer.stop() 2887 return ret 2888 2889 def start_relationship(self, volume_name, primary=None): 2890 vol_attrs = self.get_vdisk_attributes(volume_name) 2891 if vol_attrs['RC_name']: 2892 self.ssh.startrcrelationship(vol_attrs['RC_name'], primary) 2893 2894 def stop_relationship(self, volume_name, access=False): 2895 vol_attrs = self.get_vdisk_attributes(volume_name) 2896 if vol_attrs['RC_name']: 2897 self.ssh.stoprcrelationship(vol_attrs['RC_name'], access=access) 2898 2899 def create_relationship(self, master, aux, system, asynccopy): 2900 try: 2901 rc_id = self.ssh.mkrcrelationship(master, aux, system, 2902 asynccopy) 2903 except exception.VolumeBackendAPIException as e: 2904 # CMMVC5959E is the code in InStorage, meaning that 2905 # there is a relationship that already has this name on the 2906 # master cluster. 2907 if 'CMMVC5959E' not in six.text_type(e): 2908 # If there is no relation between the primary and the 2909 # secondary back-end storage, the exception is raised. 
2910 raise 2911 if rc_id: 2912 self.start_relationship(master) 2913 2914 def delete_relationship(self, volume_name): 2915 vol_attrs = self.get_vdisk_attributes(volume_name) 2916 if vol_attrs['RC_name']: 2917 self.ssh.rmrcrelationship(vol_attrs['RC_name'], True) 2918 2919 def get_relationship_info(self, volume_name): 2920 vol_attrs = self.get_vdisk_attributes(volume_name) 2921 if not vol_attrs or not vol_attrs['RC_name']: 2922 LOG.info("Unable to get remote copy information for " 2923 "volume %s", volume_name) 2924 return 2925 2926 relationship = self.ssh.lsrcrelationship(vol_attrs['RC_name']) 2927 return relationship[0] if len(relationship) > 0 else None 2928 2929 def delete_rc_volume(self, volume_name, target_vol=False): 2930 vol_name = volume_name 2931 if target_vol: 2932 vol_name = instorage_const.REPLICA_AUX_VOL_PREFIX + volume_name 2933 2934 try: 2935 rel_info = self.get_relationship_info(vol_name) 2936 if rel_info: 2937 self.delete_relationship(vol_name) 2938 self.delete_vdisk(vol_name, False) 2939 except Exception as e: 2940 msg = (_('Unable to delete the volume for ' 2941 'volume %(vol)s. 
Exception: %(err)s.') % 2942 {'vol': vol_name, 'err': e}) 2943 LOG.error(msg) 2944 raise exception.VolumeDriverException(message=msg) 2945 2946 def switch_relationship(self, relationship, aux=True): 2947 self.ssh.switchrelationship(relationship, aux) 2948 2949 def get_partnership_info(self, system_name): 2950 partnership = self.ssh.lspartnership(system_name) 2951 return partnership[0] if len(partnership) > 0 else None 2952 2953 def get_partnershipcandidate_info(self, system_name): 2954 candidates = self.ssh.lspartnershipcandidate() 2955 for candidate in candidates: 2956 if system_name == candidate['name']: 2957 return candidate 2958 return None 2959 2960 def mkippartnership(self, ip_v4, bandwith=1000, copyrate=50): 2961 self.ssh.mkippartnership(ip_v4, bandwith, copyrate) 2962 2963 def mkfcpartnership(self, system_name, bandwith=1000, copyrate=50): 2964 self.ssh.mkfcpartnership(system_name, bandwith, copyrate) 2965 2966 def chpartnership(self, partnership_id): 2967 self.ssh.chpartnership(partnership_id) 2968 2969 @staticmethod 2970 def can_migrate_to_host(host, state): 2971 if 'location_info' not in host['capabilities']: 2972 return None 2973 info = host['capabilities']['location_info'] 2974 try: 2975 (dest_type, dest_id, dest_pool) = info.split(':') 2976 except ValueError: 2977 return None 2978 if (dest_type != 'InStorageMCSDriver' or dest_id != 2979 state['system_id']): 2980 return None 2981 return dest_pool 2982 2983 def add_vdisk_qos(self, vdisk, qos): 2984 """Add the QoS configuration to the volume.""" 2985 for key, value in qos.items(): 2986 if key in self.mcs_qos_keys.keys(): 2987 param = self.mcs_qos_keys[key]['param'] 2988 self.ssh.chvdisk(vdisk, ['-' + param, str(value)]) 2989 2990 def update_vdisk_qos(self, vdisk, qos): 2991 """Update all the QoS in terms of a key and value. 2992 2993 mcs_qos_keys saves all the supported QoS parameters. Going through 2994 this dict, we set the new values to all the parameters. 
If QoS is 2995 available in the QoS configuration, the value is taken from it; 2996 if not, the value will be set to default. 2997 """ 2998 for key, value in self.mcs_qos_keys.items(): 2999 param = value['param'] 3000 if key in qos.keys(): 3001 # If the value is set in QoS, take the value from 3002 # the QoS configuration. 3003 v = qos[key] 3004 else: 3005 # If not, set the value to default. 3006 v = value['default'] 3007 self.ssh.chvdisk(vdisk, ['-' + param, str(v)]) 3008 3009 def disable_vdisk_qos(self, vdisk, qos): 3010 """Disable the QoS.""" 3011 for key, value in qos.items(): 3012 if key in self.mcs_qos_keys.keys(): 3013 param = self.mcs_qos_keys[key]['param'] 3014 # Take the default value. 3015 value = self.mcs_qos_keys[key]['default'] 3016 self.ssh.chvdisk(vdisk, ['-' + param, value]) 3017 3018 def change_vdisk_options(self, vdisk, changes, opts, state): 3019 if 'warning' in opts: 3020 opts['warning'] = '%s%%' % str(opts['warning']) 3021 if 'intier' in opts: 3022 opts['intier'] = 'on' if opts['intier'] else 'off' 3023 if 'autoexpand' in opts: 3024 opts['autoexpand'] = 'on' if opts['autoexpand'] else 'off' 3025 3026 for key in changes: 3027 self.ssh.chvdisk(vdisk, ['-' + key, opts[key]]) 3028 3029 def change_vdisk_iogrp(self, vdisk, state, iogrp): 3030 if state['code_level'] < (3, 0, 0, 0): 3031 LOG.debug('Ignore change IO group as storage code level is ' 3032 '%(code_level)s, below the required 3, 0, 0, 0.', 3033 {'code_level': state['code_level']}) 3034 else: 3035 self.ssh.movevdisk(vdisk, str(iogrp[0])) 3036 self.ssh.addvdiskaccess(vdisk, str(iogrp[0])) 3037 self.ssh.rmvdiskaccess(vdisk, str(iogrp[1])) 3038 3039 def vdisk_by_uid(self, vdisk_uid): 3040 """Returns the properties of the vdisk with the specified UID. 3041 3042 Returns None if no such disk exists. 
3043 """ 3044 3045 vdisks = self.ssh.lsvdisks_from_filter('vdisk_UID', vdisk_uid) 3046 3047 if len(vdisks) == 0: 3048 return None 3049 3050 if len(vdisks) != 1: 3051 msg = (_('Expected single vdisk returned from lsvdisk when ' 3052 'filtering on vdisk_UID. %(count)s were returned.') % 3053 {'count': len(vdisks)}) 3054 LOG.error(msg) 3055 raise exception.VolumeBackendAPIException(data=msg) 3056 3057 vdisk = vdisks.result[0] 3058 3059 return self.ssh.lsvdisk(vdisk['name']) 3060 3061 def is_vdisk_in_use(self, vdisk): 3062 """Returns True if the specified vdisk is mapped to at least 1 host.""" 3063 resp = self.ssh.lsvdiskhostmap(vdisk) 3064 return len(resp) != 0 3065 3066 def rename_vdisk(self, vdisk, new_name): 3067 self.ssh.chvdisk(vdisk, ['-name', new_name]) 3068 3069 def change_vdisk_primary_copy(self, vdisk, copy_id): 3070 self.ssh.chvdisk(vdisk, ['-primary', copy_id]) 3071 3072 3073class InStorageSSH(object): 3074 """SSH interface to Inspur InStorage systems.""" 3075 3076 def __init__(self, run_ssh): 3077 self._ssh = run_ssh 3078 3079 def _run_ssh(self, ssh_cmd): 3080 try: 3081 return self._ssh(ssh_cmd) 3082 except processutils.ProcessExecutionError as e: 3083 msg = (_('CLI Exception output:\n command: %(cmd)s\n ' 3084 'stdout: %(out)s\n stderr: %(err)s.') % 3085 {'cmd': ssh_cmd, 3086 'out': e.stdout, 3087 'err': e.stderr}) 3088 LOG.error(msg) 3089 raise exception.VolumeBackendAPIException(data=msg) 3090 3091 def run_ssh_inq(self, ssh_cmd, delim='!', with_header=False): 3092 """Run an SSH command and return parsed output.""" 3093 raw = self._run_ssh(ssh_cmd) 3094 return CLIParser(raw, ssh_cmd=ssh_cmd, delim=delim, 3095 with_header=with_header) 3096 3097 def run_ssh_assert_no_output(self, ssh_cmd): 3098 """Run an SSH command and assert no output returned.""" 3099 out, err = self._run_ssh(ssh_cmd) 3100 if len(out.strip()) != 0: 3101 msg = (_('Expected no output from CLI command %(cmd)s, ' 3102 'got %(out)s.') % {'cmd': ' '.join(ssh_cmd), 'out': out}) 3103 
class InStorageSSH(object):
    """SSH interface to Inspur InStorage systems.

    Each public method builds one ``mcsinq`` (query) or ``mcsop``
    (operation) command as an argument list and runs it through the
    callable given to the constructor.
    """

    def __init__(self, run_ssh):
        """Store the command runner.

        :param run_ssh: callable taking the command list (and optionally
            ``check_exit_code``) and returning ``(stdout, stderr)``.
        """
        self._ssh = run_ssh

    def _run_ssh(self, ssh_cmd):
        """Run a command, translating execution errors.

        :raises VolumeBackendAPIException: on ``ProcessExecutionError``.
        """
        try:
            return self._ssh(ssh_cmd)
        except processutils.ProcessExecutionError as e:
            msg = (_('CLI Exception output:\n command: %(cmd)s\n '
                     'stdout: %(out)s\n stderr: %(err)s.') %
                   {'cmd': ssh_cmd,
                    'out': e.stdout,
                    'err': e.stderr})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)

    def run_ssh_inq(self, ssh_cmd, delim='!', with_header=False):
        """Run an SSH command and return parsed output."""
        raw = self._run_ssh(ssh_cmd)
        return CLIParser(raw, ssh_cmd=ssh_cmd, delim=delim,
                         with_header=with_header)

    def run_ssh_assert_no_output(self, ssh_cmd):
        """Run an SSH command and assert no output returned."""
        out, err = self._run_ssh(ssh_cmd)
        if len(out.strip()) != 0:
            msg = (_('Expected no output from CLI command %(cmd)s, '
                     'got %(out)s.') % {'cmd': ' '.join(ssh_cmd), 'out': out})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)

    def run_ssh_check_created(self, ssh_cmd):
        """Run an SSH command and return the ID of the created object."""
        out, err = self._run_ssh(ssh_cmd)
        try:
            match_obj = re.search(r'\[([0-9]+)\],? successfully created', out)
            return match_obj.group(1)
        except (AttributeError, IndexError):
            # AttributeError: the pattern did not match (match_obj is None).
            msg = (_('Failed to parse CLI output:\n command: %(cmd)s\n '
                     'stdout: %(out)s\n stderr: %(err)s.') %
                   {'cmd': ssh_cmd,
                    'out': out,
                    'err': err})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)

    def lsnode(self, node_id=None):
        """List all nodes (header table) or one node (detail view)."""
        with_header = True
        ssh_cmd = ['mcsinq', 'lsnode', '-delim', '!']
        if node_id:
            with_header = False
            ssh_cmd.append(node_id)
        return self.run_ssh_inq(ssh_cmd, with_header=with_header)

    def lslicense(self):
        """Return the first section of ``lslicense``."""
        ssh_cmd = ['mcsinq', 'lslicense', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd)[0]

    def lsguicapabilities(self):
        """Return the first section of ``lsguicapabilities``."""
        ssh_cmd = ['mcsinq', 'lsguicapabilities', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd)[0]

    def lssystem(self):
        """Return the first section of ``lssystem``."""
        ssh_cmd = ['mcsinq', 'lssystem', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd)[0]

    def lsmdiskgrp(self, pool):
        """Return the first section of ``lsmdiskgrp -bytes`` for a pool."""
        ssh_cmd = ['mcsinq', 'lsmdiskgrp', '-bytes', '-delim', '!',
                   '"%s"' % pool]
        return self.run_ssh_inq(ssh_cmd)[0]

    def lsiogrp(self):
        """Return the parsed ``lsiogrp`` table."""
        ssh_cmd = ['mcsinq', 'lsiogrp', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsportip(self):
        """Return the parsed ``lsportip`` table."""
        ssh_cmd = ['mcsinq', 'lsportip', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lshost(self, host=None):
        """List all hosts (header table) or one host (detail view)."""
        with_header = True
        ssh_cmd = ['mcsinq', 'lshost', '-delim', '!']
        if host:
            with_header = False
            ssh_cmd.append('"%s"' % host)
        return self.run_ssh_inq(ssh_cmd, with_header=with_header)

    def lsiscsiauth(self):
        """Return the parsed ``lsiscsiauth`` table."""
        ssh_cmd = ['mcsinq', 'lsiscsiauth', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsfabric(self, wwpn=None, host=None):
        """Return ``lsfabric`` rows filtered by WWPN or, failing that, host.

        :raises VolumeDriverException: if neither filter is given.
        """
        ssh_cmd = ['mcsinq', 'lsfabric', '-delim', '!']
        if wwpn:
            ssh_cmd.extend(['-wwpn', wwpn])
        elif host:
            ssh_cmd.extend(['-host', '"%s"' % host])
        else:
            msg = (_('Must pass wwpn or host to lsfabric.'))
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsrcrelationship(self, rc_rel):
        """Return ``lsrcrelationship`` rows filtered by name."""
        key_value = 'name=%s' % rc_rel
        ssh_cmd = ['mcsinq', 'lsrcrelationship', '-filtervalue',
                   key_value, '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lspartnership(self, system_name):
        """Return ``lspartnership`` rows filtered by name."""
        key_value = 'name=%s' % system_name
        ssh_cmd = ['mcsinq', 'lspartnership', '-filtervalue',
                   key_value, '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lspartnershipcandidate(self):
        """Return the parsed ``lspartnershipcandidate`` table."""
        ssh_cmd = ['mcsinq', 'lspartnershipcandidate', '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsvdiskhostmap(self, vdisk):
        """Return the host mappings of a vdisk."""
        ssh_cmd = ['mcsinq', 'lsvdiskhostmap', '-delim', '!', '"%s"' % vdisk]
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lshostvdiskmap(self, host):
        """Return the vdisk mappings of a host."""
        ssh_cmd = ['mcsinq', 'lshostvdiskmap', '-delim', '!', '"%s"' % host]
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsvdisk(self, vdisk):
        """Return vdisk attributes or None if it doesn't exist."""
        ssh_cmd = ['mcsinq', 'lsvdisk', '-bytes', '-delim', '!',
                   '"%s"' % vdisk]
        # Exit code is not checked so that the CMMVC5754E error below can
        # be mapped to None instead of an exception.
        out, err = self._ssh(ssh_cmd, check_exit_code=False)
        if not err:
            return CLIParser((out, err), ssh_cmd=ssh_cmd, delim='!',
                             with_header=False)[0]
        if 'CMMVC5754E' in err:
            return None
        msg = (_('CLI Exception output:\n command: %(cmd)s\n '
                 'stdout: %(out)s\n stderr: %(err)s.') %
               {'cmd': ssh_cmd,
                'out': out,
                'err': err})
        LOG.error(msg)
        raise exception.VolumeBackendAPIException(data=msg)

    def lsvdisks_from_filter(self, filter_name, value):
        """Performs an lsvdisk command, filtering the results as specified.

        Returns an iterable for all matching vdisks.
        """
        ssh_cmd = ['mcsinq', 'lsvdisk', '-bytes', '-delim', '!',
                   '-filtervalue', '%s=%s' % (filter_name, value)]
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lsvdisklcmappings(self, vdisk):
        """Return the LocalCopy mappings a vdisk takes part in."""
        ssh_cmd = ['mcsinq', 'lsvdisklcmappings', '-delim', '!',
                   '"%s"' % vdisk]
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lslcmap(self, lc_map_id):
        """Return ``lslcmap`` rows filtered by mapping id."""
        ssh_cmd = ['mcsinq', 'lslcmap', '-filtervalue',
                   'id=%s' % lc_map_id, '-delim', '!']
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    def lslcconsistgrp(self, lc_consistgrp):
        """Return the detail view of a LocalCopy consistency group.

        :raises VolumeBackendAPIException: if the command fails.
        """
        ssh_cmd = ['mcsinq', 'lslcconsistgrp', '-delim', '!', lc_consistgrp]
        # Go through run_ssh_inq like the other queries so a failing
        # command is reported as VolumeBackendAPIException.
        return self.run_ssh_inq(ssh_cmd, with_header=False)

    def lsvdiskcopy(self, vdisk, copy_id=None):
        """List all copies of a vdisk, or one copy in detail view."""
        ssh_cmd = ['mcsinq', 'lsvdiskcopy', '-delim', '!']
        with_header = True
        if copy_id:
            ssh_cmd += ['-copy', copy_id]
            with_header = False
        ssh_cmd += ['"%s"' % vdisk]
        return self.run_ssh_inq(ssh_cmd, with_header=with_header)

    def lsvdisksyncprogress(self, vdisk, copy_id):
        """Return the first sync-progress row of a vdisk copy."""
        ssh_cmd = ['mcsinq', 'lsvdisksyncprogress', '-delim', '!',
                   '-copy', copy_id, '"%s"' % vdisk]
        return self.run_ssh_inq(ssh_cmd, with_header=True)[0]

    def lsportfc(self, node_id):
        """Return ``lsportfc`` rows filtered by node id."""
        ssh_cmd = ['mcsinq', 'lsportfc', '-delim', '!',
                   '-filtervalue', 'node_id=%s' % node_id]
        return self.run_ssh_inq(ssh_cmd, with_header=True)

    @staticmethod
    def _create_port_arg(port_type, port_name):
        """Return the CLI port arguments for a host port.

        ``'initiator'`` selects ``-iscsiname``; any other type selects
        ``-hbawwpn``.
        """
        if port_type == 'initiator':
            port = ['-iscsiname']
        else:
            port = ['-hbawwpn']
        port.append(port_name)
        return port

    def mkhost(self, host_name, port_type, port_name):
        """Create a host with one port and return its id."""
        port = self._create_port_arg(port_type, port_name)
        ssh_cmd = ['mcsop', 'mkhost', '-force'] + port
        ssh_cmd += ['-name', '"%s"' % host_name]
        return self.run_ssh_check_created(ssh_cmd)

    def addhostport(self, host, port_type, port_name):
        """Add a port to an existing host."""
        port = self._create_port_arg(port_type, port_name)
        ssh_cmd = ['mcsop', 'addhostport', '-force'] + port + ['"%s"' % host]
        self.run_ssh_assert_no_output(ssh_cmd)

    def add_chap_secret(self, secret, host):
        """Set the CHAP secret of a host."""
        ssh_cmd = ['mcsop', 'chhost', '-chapsecret', secret, '"%s"' % host]
        self.run_ssh_assert_no_output(ssh_cmd)

    def mkvdiskhostmap(self, host, vdisk, lun, multihostmap):
        """Map vdisk to host.

        If vdisk already mapped and multihostmap is True, use the force flag.

        :param lun: requested SCSI id, or a false value to let the storage
            choose.
        :returns: the SCSI id reported for the new mapping.
        :raises VolumeDriverException: if the mapping cannot be confirmed,
            or the vdisk is already mapped and ``multihostmap`` is false.
        """
        # Built in final order, equivalent to the earlier insert() calls.
        ssh_cmd = ['mcsop', 'mkvdiskhostmap']
        if multihostmap:
            ssh_cmd.append('-force')
        ssh_cmd.extend(['-host', '"%s"' % host])
        if lun:
            ssh_cmd.extend(['-scsi', lun])
        ssh_cmd.append(vdisk)

        try:
            self.run_ssh_check_created(ssh_cmd)
            result_lun = self.get_vdiskhostmapid(vdisk, host)
            if result_lun is None or (lun and lun != result_lun):
                msg = (_('mkvdiskhostmap error:\n command: %(cmd)s\n '
                         'lun: %(lun)s\n result_lun: %(result_lun)s') %
                       {'cmd': ssh_cmd,
                        'lun': lun,
                        'result_lun': result_lun})
                LOG.error(msg)
                raise exception.VolumeDriverException(message=msg)
            return result_lun
        except Exception as ex:
            # Inspect the exception text rather than ``ex.message``, which
            # Python 3 exceptions do not have, so the check works there.
            if not multihostmap and 'CMMVC6071E' in six.text_type(ex):
                LOG.error('volume is not allowed to be mapped to multi host')
                raise exception.VolumeDriverException(
                    message=_('CMMVC6071E The VDisk-to-host mapping was not '
                              'created because the VDisk is already mapped '
                              'to a host.\n"'))
            with excutils.save_and_reraise_exception():
                LOG.error('Error mapping VDisk-to-host')

    def mkrcrelationship(self, master, aux, system, asynccopy):
        """Create a remote-copy relationship and return its id."""
        ssh_cmd = ['mcsop', 'mkrcrelationship', '-master', master,
                   '-aux', aux, '-cluster', system]
        if asynccopy:
            ssh_cmd.append('-async')
        return self.run_ssh_check_created(ssh_cmd)

    def rmrcrelationship(self, relationship, force=False):
        """Remove a remote-copy relationship."""
        ssh_cmd = ['mcsop', 'rmrcrelationship']
        if force:
            ssh_cmd += ['-force']
        ssh_cmd += [relationship]
        self.run_ssh_assert_no_output(ssh_cmd)

    def switchrelationship(self, relationship, aux=True):
        """Make the aux (default) or master side the primary."""
        primary = 'aux' if aux else 'master'
        ssh_cmd = ['mcsop', 'switchrcrelationship', '-primary',
                   primary, relationship]
        self.run_ssh_assert_no_output(ssh_cmd)

    def startrcrelationship(self, rc_rel, primary=None):
        """Start a remote-copy relationship with ``-force``."""
        ssh_cmd = ['mcsop', 'startrcrelationship', '-force']
        if primary:
            ssh_cmd.extend(['-primary', primary])
        ssh_cmd.append(rc_rel)
        self.run_ssh_assert_no_output(ssh_cmd)

    def stoprcrelationship(self, relationship, access=False):
        """Stop a remote-copy relationship, optionally with ``-access``."""
        ssh_cmd = ['mcsop', 'stoprcrelationship']
        if access:
            ssh_cmd.append('-access')
        ssh_cmd.append(relationship)
        self.run_ssh_assert_no_output(ssh_cmd)

    def mkippartnership(self, ip_v4, bandwith=1000, backgroundcopyrate=50):
        """Create an IPv4 partnership with a remote cluster."""
        ssh_cmd = ['mcsop', 'mkippartnership', '-type', 'ipv4',
                   '-clusterip', ip_v4, '-linkbandwidthmbits',
                   six.text_type(bandwith),
                   '-backgroundcopyrate', six.text_type(backgroundcopyrate)]
        return self.run_ssh_assert_no_output(ssh_cmd)

    def mkfcpartnership(self, system_name, bandwith=1000,
                        backgroundcopyrate=50):
        """Create an FC partnership with a remote system."""
        ssh_cmd = ['mcsop', 'mkfcpartnership', '-linkbandwidthmbits',
                   six.text_type(bandwith),
                   '-backgroundcopyrate', six.text_type(backgroundcopyrate),
                   system_name]
        return self.run_ssh_assert_no_output(ssh_cmd)

    def chpartnership(self, partnership_id, start=True):
        """Start (default) or stop a partnership."""
        action = '-start' if start else '-stop'
        ssh_cmd = ['mcsop', 'chpartnership', action, partnership_id]
        return self.run_ssh_assert_no_output(ssh_cmd)

    def rmvdiskhostmap(self, host, vdisk):
        """Remove the mapping between a host and a vdisk."""
        ssh_cmd = ['mcsop', 'rmvdiskhostmap', '-host', '"%s"' % host,
                   '"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def get_vdiskhostmapid(self, vdisk, host):
        """Return the SCSI id of the vdisk's mapping to ``host``, or None."""
        resp = self.lsvdiskhostmap(vdisk)
        for mapping_info in resp:
            if mapping_info['host_name'] == host:
                return mapping_info['SCSI_id']
        return None

    def rmhost(self, host):
        """Remove a host definition."""
        ssh_cmd = ['mcsop', 'rmhost', '"%s"' % host]
        self.run_ssh_assert_no_output(ssh_cmd)

    def mkvdisk(self, name, size, units, pool, opts, params):
        """Create a vdisk and return its id.

        A failure whose message contains CMMVC6372W is treated as success
        when the vdisk can be found afterwards.
        """
        ssh_cmd = ['mcsop', 'mkvdisk', '-name', name, '-mdiskgrp',
                   '"%s"' % pool, '-iogrp', six.text_type(opts['iogrp']),
                   '-size', size, '-unit', units] + params
        try:
            return self.run_ssh_check_created(ssh_cmd)
        except Exception as ex:
            if hasattr(ex, 'msg') and 'CMMVC6372W' in ex.msg:
                vdisk = self.lsvdisk(name)
                if vdisk:
                    LOG.warning('CMMVC6372W The virtualized storage '
                                'capacity that the cluster is using is '
                                'approaching the virtualized storage '
                                'capacity that is licensed.')
                    return vdisk['id']
            with excutils.save_and_reraise_exception():
                LOG.exception('Failed to create vdisk %(vol)s.', {'vol': name})

    def rmvdisk(self, vdisk, force=True):
        """Remove a vdisk, with ``-force`` by default."""
        ssh_cmd = ['mcsop', 'rmvdisk']
        if force:
            ssh_cmd += ['-force']
        ssh_cmd += ['"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def chvdisk(self, vdisk, params):
        """Run ``chvdisk`` with the given parameter list."""
        ssh_cmd = ['mcsop', 'chvdisk'] + params + ['"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def movevdisk(self, vdisk, iogrp):
        """Move a vdisk to another IO group."""
        ssh_cmd = ['mcsop', 'movevdisk', '-iogrp', iogrp, '"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def expandvdisksize(self, vdisk, amount):
        """Grow a vdisk by ``amount`` gigabytes."""
        ssh_cmd = (
            ['mcsop', 'expandvdisksize', '-size', six.text_type(amount),
             '-unit', 'gb', '"%s"' % vdisk])
        self.run_ssh_assert_no_output(ssh_cmd)

    def mklcmap(self, source, target, full_copy, copy_rate, consistgrp=None):
        """Create an auto-deleting LocalCopy mapping and return its id.

        :param full_copy: when false, the copy rate is forced to ``0``.
        :param copy_rate: copy rate used when ``full_copy`` is true.
        :param consistgrp: optional consistency group to join.
        :raises VolumeBackendAPIException: if creation is not confirmed or
            the id cannot be parsed from the output.
        """
        ssh_cmd = ['mcsop', 'mklcmap', '-source', '"%s"' % source, '-target',
                   '"%s"' % target, '-autodelete']
        if not full_copy:
            ssh_cmd.extend(['-copyrate', '0'])
        else:
            ssh_cmd.extend(['-copyrate', six.text_type(copy_rate)])
        if consistgrp:
            ssh_cmd.extend(['-consistgrp', consistgrp])
        # Success is decided from stdout, not from the exit code.
        out, err = self._ssh(ssh_cmd, check_exit_code=False)
        if 'successfully created' not in out:
            msg = (_('CLI Exception output:\n command: %(cmd)s\n '
                     'stdout: %(out)s\n stderr: %(err)s.') %
                   {'cmd': ssh_cmd,
                    'out': out,
                    'err': err})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)
        try:
            match_obj = re.search(r'LocalCopy Mapping, id \[([0-9]+)\], '
                                  'successfully created', out)
            lc_map_id = match_obj.group(1)
        except (AttributeError, IndexError):
            msg = (_('Failed to parse CLI output:\n command: %(cmd)s\n '
                     'stdout: %(out)s\n stderr: %(err)s.') %
                   {'cmd': ssh_cmd,
                    'out': out,
                    'err': err})
            LOG.error(msg)
            raise exception.VolumeBackendAPIException(data=msg)
        return lc_map_id

    def prestartlcmap(self, lc_map_id):
        """Prepare a LocalCopy mapping."""
        ssh_cmd = ['mcsop', 'prestartlcmap', lc_map_id]
        self.run_ssh_assert_no_output(ssh_cmd)

    def startlcmap(self, lc_map_id):
        """Start a LocalCopy mapping."""
        ssh_cmd = ['mcsop', 'startlcmap', lc_map_id]
        self.run_ssh_assert_no_output(ssh_cmd)

    def prestartlcconsistgrp(self, lc_consist_group):
        """Prepare a LocalCopy consistency group."""
        ssh_cmd = ['mcsop', 'prestartlcconsistgrp', lc_consist_group]
        self.run_ssh_assert_no_output(ssh_cmd)

    def startlcconsistgrp(self, lc_consist_group):
        """Start a LocalCopy consistency group."""
        ssh_cmd = ['mcsop', 'startlcconsistgrp', lc_consist_group]
        self.run_ssh_assert_no_output(ssh_cmd)

    def stoplcconsistgrp(self, lc_consist_group):
        """Stop a LocalCopy consistency group."""
        ssh_cmd = ['mcsop', 'stoplcconsistgrp', lc_consist_group]
        self.run_ssh_assert_no_output(ssh_cmd)

    def chlcmap(self, lc_map_id, copyrate='50', autodel='on'):
        """Change copy rate and auto-delete flag of a LocalCopy mapping."""
        ssh_cmd = ['mcsop', 'chlcmap', '-copyrate', copyrate,
                   '-autodelete', autodel, lc_map_id]
        self.run_ssh_assert_no_output(ssh_cmd)

    def stoplcmap(self, lc_map_id):
        """Stop a LocalCopy mapping."""
        ssh_cmd = ['mcsop', 'stoplcmap', lc_map_id]
        self.run_ssh_assert_no_output(ssh_cmd)

    def rmlcmap(self, lc_map_id):
        """Force-remove a LocalCopy mapping."""
        ssh_cmd = ['mcsop', 'rmlcmap', '-force', lc_map_id]
        self.run_ssh_assert_no_output(ssh_cmd)

    def mklcconsistgrp(self, lc_consist_group):
        """Create a LocalCopy consistency group and return its id."""
        ssh_cmd = ['mcsop', 'mklcconsistgrp', '-name', lc_consist_group]
        return self.run_ssh_check_created(ssh_cmd)

    def rmlcconsistgrp(self, lc_consist_group):
        """Force-remove a LocalCopy consistency group."""
        ssh_cmd = ['mcsop', 'rmlcconsistgrp', '-force', lc_consist_group]
        return self.run_ssh_assert_no_output(ssh_cmd)

    def addvdiskcopy(self, vdisk, dest_pool, params):
        """Add a copy of a vdisk in ``dest_pool`` and return the new id."""
        ssh_cmd = (['mcsop', 'addvdiskcopy'] +
                   params +
                   ['-mdiskgrp', '"%s"' % dest_pool, '"%s"' % vdisk])
        return self.run_ssh_check_created(ssh_cmd)

    def rmvdiskcopy(self, vdisk, copy_id):
        """Remove one copy of a vdisk."""
        ssh_cmd = ['mcsop', 'rmvdiskcopy', '-copy', copy_id, '"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def addvdiskaccess(self, vdisk, iogrp):
        """Grant an IO group access to a vdisk."""
        ssh_cmd = ['mcsop', 'addvdiskaccess', '-iogrp', iogrp,
                   '"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)

    def rmvdiskaccess(self, vdisk, iogrp):
        """Revoke an IO group's access to a vdisk."""
        ssh_cmd = ['mcsop', 'rmvdiskaccess', '-iogrp', iogrp, '"%s"' % vdisk]
        self.run_ssh_assert_no_output(ssh_cmd)
class CLIParser(object):
    """Parse MCS CLI output and generate iterable.

    ``result`` is a list of dicts. With ``with_header`` the first
    non-skipped line names the columns and each later line is one row.
    Without it each line is ``key<delim>value...`` and an empty line ends
    a section. A key that repeats inside one dict collects its values in
    a list.
    """

    def __init__(self, raw, ssh_cmd=None, delim='!', with_header=True):
        """Parse ``raw`` immediately.

        :param raw: stdout string, or a ``(stdout, stderr)`` pair.
        :param ssh_cmd: command list, kept (joined) for error messages.
        :param delim: field delimiter.
        :param with_header: whether the output is a header table.
        """
        super(CLIParser, self).__init__()
        if ssh_cmd:
            self.ssh_cmd = ' '.join(ssh_cmd)
        else:
            self.ssh_cmd = 'None'
        self.raw = raw
        self.delim = delim
        self.with_header = with_header
        self.result = self._parse()

    def select(self, *keys):
        """Yield the values of ``keys`` for every parsed dict.

        One key yields bare values; several keys yield tuples. List values
        are zipped, so each dict yields as many items as its shortest
        selected value has.
        """
        for a in self.result:
            vs = []
            for k in keys:
                v = a.get(k, None)
                if isinstance(v, six.string_types) or v is None:
                    v = [v]
                if isinstance(v, list):
                    vs.append(v)
            for item in zip(*vs):
                if len(item) == 1:
                    yield item[0]
                else:
                    yield item

    def __getitem__(self, key):
        """Return the parsed dict at index ``key``.

        :raises VolumeBackendAPIException: if the index does not exist.
        """
        try:
            return self.result[key]
        except (KeyError, IndexError):
            # ``result`` is a list, so a missing entry raises IndexError.
            # KeyError alone never fired, and an empty CLI response leaked
            # a bare IndexError to callers using ``[0]``.
            msg = (_('Did not find the expected key %(key)s in %(fun)s: '
                     '%(raw)s.') % {'key': key, 'fun': self.ssh_cmd,
                                    'raw': self.raw})
            raise exception.VolumeBackendAPIException(data=msg)

    def __iter__(self):
        for a in self.result:
            yield a

    def __len__(self):
        return len(self.result)

    def _parse(self):
        """Turn the raw CLI output into a list of dicts.

        :raises VolumeBackendAPIException: if a table row does not have as
            many fields as the header.
        """
        def get_reader(content, delim):
            # Yield one field list per line; an empty line yields [].
            for line in content.lstrip().splitlines():
                line = line.strip()
                if line:
                    yield line.split(delim)
                else:
                    yield []

        if isinstance(self.raw, six.string_types):
            stdout, stderr = self.raw, ''
        else:
            stdout, stderr = self.raw
        reader = get_reader(stdout, self.delim)
        result = []

        if self.with_header:
            # The first line is the header; the same generator then
            # continues with the data rows.
            hds = tuple()
            for row in reader:
                hds = row
                break
            for row in reader:
                cur = dict()
                if len(hds) != len(row):
                    msg = (_('Unexpected CLI response: header/row mismatch. '
                             'header: %(header)s, row: %(row)s.')
                           % {'header': hds,
                              'row': row})
                    raise exception.VolumeBackendAPIException(data=msg)
                for k, v in zip(hds, row):
                    CLIParser.append_dict(cur, k, v)
                result.append(cur)
        else:
            cur = dict()
            for row in reader:
                if row:
                    # Extra fields of the line are re-joined with spaces.
                    CLIParser.append_dict(cur, row[0], ' '.join(row[1:]))
                elif cur:  # start new section
                    result.append(cur)
                    cur = dict()
            if cur:
                result.append(cur)
        return result

    @staticmethod
    def append_dict(dict_, key, value):
        """Store ``value`` under ``key``, collecting repeats in a list.

        Both are stripped first. ``dict_`` is modified and returned.
        """
        key, value = key.strip(), value.strip()
        obj = dict_.get(key, None)
        if obj is None:
            dict_[key] = value
        elif isinstance(obj, list):
            obj.append(value)
            dict_[key] = obj
        else:
            dict_[key] = [obj, value]
        return dict_