1# Copyright (c) 2011 OpenStack Foundation 2# All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15 16""" 17Manage backends in the current zone. 18""" 19 20import collections 21 22from oslo_config import cfg 23from oslo_log import log as logging 24from oslo_utils import importutils 25from oslo_utils import strutils 26from oslo_utils import timeutils 27 28from cinder.common import constants 29from cinder import context as cinder_context 30from cinder import exception 31from cinder import objects 32from cinder.scheduler import filters 33from cinder import utils 34from cinder.volume import utils as vol_utils 35from cinder.volume import volume_types 36 37 38# FIXME: This file should be renamed to backend_manager, we should also rename 39# HostManager class, and scheduler_host_manager option, and also the weight 40# classes, and add code to maintain backward compatibility. 41 42 43host_manager_opts = [ 44 cfg.ListOpt('scheduler_default_filters', 45 default=[ 46 'AvailabilityZoneFilter', 47 'CapacityFilter', 48 'CapabilitiesFilter' 49 ], 50 help='Which filter class names to use for filtering hosts ' 51 'when not specified in the request.'), 52 cfg.ListOpt('scheduler_default_weighers', 53 default=[ 54 'CapacityWeigher' 55 ], 56 help='Which weigher class names to use for weighing hosts.'), 57 cfg.StrOpt('scheduler_weight_handler', 58 default='cinder.scheduler.weights.OrderedHostWeightHandler', 59 help='Which handler to use for selecting the host/pool ' 60 'after weighing'), 61] 62 63CONF = cfg.CONF 64CONF.register_opts(host_manager_opts) 65CONF.import_opt('scheduler_driver', 'cinder.scheduler.manager') 66CONF.import_opt('max_over_subscription_ratio', 'cinder.volume.driver') 67 68LOG = logging.getLogger(__name__) 69 70 71class ReadOnlyDict(collections.Mapping): 72 """A read-only dict.""" 73 def __init__(self, source=None): 74 if source is not None: 75 self.data = dict(source) 76 else: 77 self.data = {} 78 79 def __getitem__(self, key): 80 return self.data[key] 81 82 def __iter__(self): 83 return iter(self.data) 84 85 def __len__(self): 86 return len(self.data) 87 88 def __repr__(self): 89 return '%s(%r)' % (self.__class__.__name__, self.data) 90 91 92class BackendState(object): 93 """Mutable and immutable information tracked for a volume backend.""" 94 95 def __init__(self, host, cluster_name, capabilities=None, service=None): 96 # NOTE(geguileo): We have a circular dependency between BackendState 97 # and PoolState and we resolve it with an instance attribute instead 98 # of a class attribute that we would assign after the PoolState 99 # declaration because this way we avoid splitting the code. 100 self.pool_state_cls = PoolState 101 102 self.capabilities = None 103 self.service = None 104 self.host = host 105 self.cluster_name = cluster_name 106 self.update_capabilities(capabilities, service) 107 108 self.volume_backend_name = None 109 self.vendor_name = None 110 self.driver_version = 0 111 self.storage_protocol = None 112 self.QoS_support = False 113 # Mutable available resources. 114 # These will change as resources are virtually "consumed". 115 self.total_capacity_gb = 0 116 # capacity has been allocated in cinder POV, which should be 117 # sum(vol['size'] for vol in vols_on_hosts) 118 self.allocated_capacity_gb = 0 119 self.free_capacity_gb = None 120 self.reserved_percentage = 0 121 # The apparent allocated space indicating how much capacity 122 # has been provisioned. This could be the sum of sizes of 123 # all volumes on a backend, which could be greater than or 124 # equal to the allocated_capacity_gb. 125 self.provisioned_capacity_gb = 0 126 self.max_over_subscription_ratio = 1.0 127 self.thin_provisioning_support = False 128 self.thick_provisioning_support = False 129 # Does this backend support attaching a volume to more than 130 # once host/instance? 131 self.multiattach = False 132 133 # PoolState for all pools 134 self.pools = {} 135 136 self.updated = None 137 138 @property 139 def backend_id(self): 140 return self.cluster_name or self.host 141 142 def update_capabilities(self, capabilities=None, service=None): 143 # Read-only capability dicts 144 145 if capabilities is None: 146 capabilities = {} 147 self.capabilities = ReadOnlyDict(capabilities) 148 if service is None: 149 service = {} 150 self.service = ReadOnlyDict(service) 151 152 def update_from_volume_capability(self, capability, service=None): 153 """Update information about a host from its volume_node info. 154 155 'capability' is the status info reported by volume backend, a typical 156 capability looks like this: 157 158 .. code-block:: python 159 160 { 161 capability = { 162 'volume_backend_name': 'Local iSCSI', # 163 'vendor_name': 'OpenStack', # backend level 164 'driver_version': '1.0', # mandatory/fixed 165 'storage_protocol': 'iSCSI', # stats&capabilities 166 167 'active_volumes': 10, # 168 'IOPS_provisioned': 30000, # optional custom 169 'fancy_capability_1': 'eat', # stats & capabilities 170 'fancy_capability_2': 'drink', # 171 172 'pools': [ 173 {'pool_name': '1st pool', # 174 'total_capacity_gb': 500, # mandatory stats for 175 'free_capacity_gb': 230, # pools 176 'allocated_capacity_gb': 270, # 177 'QoS_support': 'False', # 178 'reserved_percentage': 0, # 179 180 'dying_disks': 100, # 181 'super_hero_1': 'spider-man', # optional custom 182 'super_hero_2': 'flash', # stats & capabilities 183 'super_hero_3': 'neoncat' # 184 }, 185 {'pool_name': '2nd pool', 186 'total_capacity_gb': 1024, 187 'free_capacity_gb': 1024, 188 'allocated_capacity_gb': 0, 189 'QoS_support': 'False', 190 'reserved_percentage': 0, 191 192 'dying_disks': 200, 193 'super_hero_1': 'superman', 194 'super_hero_2': ' ', 195 'super_hero_2': 'Hulk' 196 } 197 ] 198 } 199 } 200 201 """ 202 self.update_capabilities(capability, service) 203 204 if capability: 205 if self.updated and self.updated > capability['timestamp']: 206 return 207 208 # Update backend level info 209 self.update_backend(capability) 210 211 # Update pool level info 212 self.update_pools(capability, service) 213 214 def update_pools(self, capability, service): 215 """Update storage pools information from backend reported info.""" 216 if not capability: 217 return 218 219 pools = capability.get('pools', None) 220 active_pools = set() 221 if pools and isinstance(pools, list): 222 # Update all pools stats according to information from list 223 # of pools in volume capacity 224 for pool_cap in pools: 225 pool_name = pool_cap['pool_name'] 226 self._append_backend_info(pool_cap) 227 cur_pool = self.pools.get(pool_name, None) 228 if not cur_pool: 229 # Add new pool 230 cur_pool = self.pool_state_cls(self.host, 231 self.cluster_name, 232 pool_cap, 233 pool_name) 234 self.pools[pool_name] = cur_pool 235 cur_pool.update_from_volume_capability(pool_cap, service) 236 237 active_pools.add(pool_name) 238 elif pools is None: 239 # To handle legacy driver that doesn't report pool 240 # information in the capability, we have to prepare 241 # a pool from backend level info, or to update the one 242 # we created in self.pools. 243 pool_name = self.volume_backend_name 244 if pool_name is None: 245 # To get DEFAULT_POOL_NAME 246 pool_name = vol_utils.extract_host(self.host, 'pool', True) 247 248 if len(self.pools) == 0: 249 # No pool was there 250 single_pool = self.pool_state_cls(self.host, self.cluster_name, 251 capability, pool_name) 252 self._append_backend_info(capability) 253 self.pools[pool_name] = single_pool 254 else: 255 # this is an update from legacy driver 256 try: 257 single_pool = self.pools[pool_name] 258 except KeyError: 259 single_pool = self.pool_state_cls(self.host, 260 self.cluster_name, 261 capability, 262 pool_name) 263 self._append_backend_info(capability) 264 self.pools[pool_name] = single_pool 265 266 single_pool.update_from_volume_capability(capability, service) 267 active_pools.add(pool_name) 268 269 # remove non-active pools from self.pools 270 nonactive_pools = set(self.pools.keys()) - active_pools 271 for pool in nonactive_pools: 272 LOG.debug("Removing non-active pool %(pool)s @ %(host)s " 273 "from scheduler cache.", {'pool': pool, 274 'host': self.host}) 275 del self.pools[pool] 276 277 def _append_backend_info(self, pool_cap): 278 # Fill backend level info to pool if needed. 279 if not pool_cap.get('volume_backend_name', None): 280 pool_cap['volume_backend_name'] = self.volume_backend_name 281 282 if not pool_cap.get('storage_protocol', None): 283 pool_cap['storage_protocol'] = self.storage_protocol 284 285 if not pool_cap.get('vendor_name', None): 286 pool_cap['vendor_name'] = self.vendor_name 287 288 if not pool_cap.get('driver_version', None): 289 pool_cap['driver_version'] = self.driver_version 290 291 if not pool_cap.get('timestamp', None): 292 pool_cap['timestamp'] = self.updated 293 294 def update_backend(self, capability): 295 self.volume_backend_name = capability.get('volume_backend_name', None) 296 self.vendor_name = capability.get('vendor_name', None) 297 self.driver_version = capability.get('driver_version', None) 298 self.storage_protocol = capability.get('storage_protocol', None) 299 self.updated = capability['timestamp'] 300 301 def consume_from_volume(self, volume, update_time=True): 302 """Incrementally update host state from a volume.""" 303 volume_gb = volume['size'] 304 self.allocated_capacity_gb += volume_gb 305 self.provisioned_capacity_gb += volume_gb 306 if self.free_capacity_gb == 'infinite': 307 # There's virtually infinite space on back-end 308 pass 309 elif self.free_capacity_gb == 'unknown': 310 # Unable to determine the actual free space on back-end 311 pass 312 else: 313 self.free_capacity_gb -= volume_gb 314 if update_time: 315 self.updated = timeutils.utcnow() 316 LOG.debug("Consumed %s GB from backend: %s", volume['size'], self) 317 318 def __repr__(self): 319 # FIXME(zhiteng) backend level free_capacity_gb isn't as 320 # meaningful as it used to be before pool is introduced, we'd 321 # come up with better representation of HostState. 322 grouping = 'cluster' if self.cluster_name else 'host' 323 grouping_name = self.backend_id 324 return ("%(grouping)s '%(grouping_name)s':" 325 "free_capacity_gb: %(free_capacity_gb)s, " 326 "total_capacity_gb: %(total_capacity_gb)s," 327 "allocated_capacity_gb: %(allocated_capacity_gb)s, " 328 "max_over_subscription_ratio: %(mosr)s," 329 "reserved_percentage: %(reserved_percentage)s, " 330 "provisioned_capacity_gb: %(provisioned_capacity_gb)s," 331 "thin_provisioning_support: %(thin_provisioning_support)s, " 332 "thick_provisioning_support: %(thick)s," 333 "pools: %(pools)s," 334 "updated at: %(updated)s" % 335 {'grouping': grouping, 'grouping_name': grouping_name, 336 'free_capacity_gb': self.free_capacity_gb, 337 'total_capacity_gb': self.total_capacity_gb, 338 'allocated_capacity_gb': self.allocated_capacity_gb, 339 'mosr': self.max_over_subscription_ratio, 340 'reserved_percentage': self.reserved_percentage, 341 'provisioned_capacity_gb': self.provisioned_capacity_gb, 342 'thin_provisioning_support': self.thin_provisioning_support, 343 'thick': self.thick_provisioning_support, 344 'pools': self.pools, 'updated': self.updated}) 345 346 347class PoolState(BackendState): 348 def __init__(self, host, cluster_name, capabilities, pool_name): 349 new_host = vol_utils.append_host(host, pool_name) 350 new_cluster = vol_utils.append_host(cluster_name, pool_name) 351 super(PoolState, self).__init__(new_host, new_cluster, capabilities) 352 self.pool_name = pool_name 353 # No pools in pool 354 self.pools = None 355 356 def update_from_volume_capability(self, capability, service=None): 357 """Update information about a pool from its volume_node info.""" 358 LOG.debug("Updating capabilities for %s: %s", self.host, capability) 359 self.update_capabilities(capability, service) 360 if capability: 361 if self.updated and self.updated > capability['timestamp']: 362 return 363 self.update_backend(capability) 364 365 self.total_capacity_gb = capability.get('total_capacity_gb', 0) 366 self.free_capacity_gb = capability.get('free_capacity_gb', 0) 367 self.allocated_capacity_gb = capability.get( 368 'allocated_capacity_gb', 0) 369 self.QoS_support = capability.get('QoS_support', False) 370 self.reserved_percentage = capability.get('reserved_percentage', 0) 371 # provisioned_capacity_gb is the apparent total capacity of 372 # all the volumes created on a backend, which is greater than 373 # or equal to allocated_capacity_gb, which is the apparent 374 # total capacity of all the volumes created on a backend 375 # in Cinder. Using allocated_capacity_gb as the default of 376 # provisioned_capacity_gb if it is not set. 377 self.provisioned_capacity_gb = capability.get( 378 'provisioned_capacity_gb', self.allocated_capacity_gb) 379 self.thin_provisioning_support = capability.get( 380 'thin_provisioning_support', False) 381 self.thick_provisioning_support = capability.get( 382 'thick_provisioning_support', False) 383 384 self.max_over_subscription_ratio = ( 385 utils.calculate_max_over_subscription_ratio( 386 capability, CONF.max_over_subscription_ratio)) 387 388 self.multiattach = capability.get('multiattach', False) 389 390 def update_pools(self, capability): 391 # Do nothing, since we don't have pools within pool, yet 392 pass 393 394 395class HostManager(object): 396 """Base HostManager class.""" 397 398 backend_state_cls = BackendState 399 400 REQUIRED_KEYS = frozenset([ 401 'pool_name', 402 'total_capacity_gb', 403 'free_capacity_gb', 404 'allocated_capacity_gb', 405 'provisioned_capacity_gb', 406 'thin_provisioning_support', 407 'thick_provisioning_support', 408 'max_over_subscription_ratio', 409 'reserved_percentage']) 410 411 def __init__(self): 412 self.service_states = {} # { <host|cluster>: {<service>: {cap k : v}}} 413 self.backend_state_map = {} 414 self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.' 415 'filters') 416 self.filter_classes = self.filter_handler.get_all_classes() 417 self.enabled_filters = self._choose_backend_filters( 418 CONF.scheduler_default_filters) 419 self.weight_handler = importutils.import_object( 420 CONF.scheduler_weight_handler, 421 'cinder.scheduler.weights') 422 self.weight_classes = self.weight_handler.get_all_classes() 423 424 self._no_capabilities_backends = set() # Services without capabilities 425 self._update_backend_state_map(cinder_context.get_admin_context()) 426 self.service_states_last_update = {} 427 428 def _choose_backend_filters(self, filter_cls_names): 429 """Return a list of available filter names. 430 431 This function checks input filter names against a predefined set 432 of acceptable filters (all loaded filters). If input is None, 433 it uses CONF.scheduler_default_filters instead. 434 """ 435 if not isinstance(filter_cls_names, (list, tuple)): 436 filter_cls_names = [filter_cls_names] 437 good_filters = [] 438 bad_filters = [] 439 for filter_name in filter_cls_names: 440 found_class = False 441 for cls in self.filter_classes: 442 if cls.__name__ == filter_name: 443 found_class = True 444 good_filters.append(cls) 445 break 446 if not found_class: 447 bad_filters.append(filter_name) 448 if bad_filters: 449 raise exception.SchedulerHostFilterNotFound( 450 filter_name=", ".join(bad_filters)) 451 return good_filters 452 453 def _choose_backend_weighers(self, weight_cls_names): 454 """Return a list of available weigher names. 455 456 This function checks input weigher names against a predefined set 457 of acceptable weighers (all loaded weighers). If input is None, 458 it uses CONF.scheduler_default_weighers instead. 459 """ 460 if weight_cls_names is None: 461 weight_cls_names = CONF.scheduler_default_weighers 462 if not isinstance(weight_cls_names, (list, tuple)): 463 weight_cls_names = [weight_cls_names] 464 465 good_weighers = [] 466 bad_weighers = [] 467 for weigher_name in weight_cls_names: 468 found_class = False 469 for cls in self.weight_classes: 470 if cls.__name__ == weigher_name: 471 good_weighers.append(cls) 472 found_class = True 473 break 474 if not found_class: 475 bad_weighers.append(weigher_name) 476 if bad_weighers: 477 raise exception.SchedulerHostWeigherNotFound( 478 weigher_name=", ".join(bad_weighers)) 479 return good_weighers 480 481 def get_filtered_backends(self, backends, filter_properties, 482 filter_class_names=None): 483 """Filter backends and return only ones passing all filters.""" 484 if filter_class_names is not None: 485 filter_classes = self._choose_backend_filters(filter_class_names) 486 else: 487 filter_classes = self.enabled_filters 488 return self.filter_handler.get_filtered_objects(filter_classes, 489 backends, 490 filter_properties) 491 492 def get_weighed_backends(self, backends, weight_properties, 493 weigher_class_names=None): 494 """Weigh the backends.""" 495 weigher_classes = self._choose_backend_weighers(weigher_class_names) 496 return self.weight_handler.get_weighed_objects(weigher_classes, 497 backends, 498 weight_properties) 499 500 def update_service_capabilities(self, service_name, host, capabilities, 501 cluster_name, timestamp): 502 """Update the per-service capabilities based on this notification.""" 503 if service_name != 'volume': 504 LOG.debug('Ignoring %(service_name)s service update ' 505 'from %(host)s', 506 {'service_name': service_name, 'host': host}) 507 return 508 509 # TODO(geguileo): In P - Remove the next line since we receive the 510 # timestamp 511 timestamp = timestamp or timeutils.utcnow() 512 # Copy the capabilities, so we don't modify the original dict 513 capab_copy = dict(capabilities) 514 capab_copy["timestamp"] = timestamp 515 516 # Set the default capabilities in case None is set. 517 backend = cluster_name or host 518 capab_old = self.service_states.get(backend, {"timestamp": 0}) 519 capab_last_update = self.service_states_last_update.get( 520 backend, {"timestamp": 0}) 521 522 # Ignore older updates 523 if capab_old['timestamp'] and timestamp < capab_old['timestamp']: 524 LOG.info('Ignoring old capability report from %s.', backend) 525 return 526 527 # If the capabilities are not changed and the timestamp is older, 528 # record the capabilities. 529 530 # There are cases: capab_old has the capabilities set, 531 # but the timestamp may be None in it. So does capab_last_update. 532 533 if (not self._get_updated_pools(capab_old, capab_copy)) and ( 534 (not capab_old.get("timestamp")) or 535 (not capab_last_update.get("timestamp")) or 536 (capab_last_update["timestamp"] < capab_old["timestamp"])): 537 self.service_states_last_update[backend] = capab_old 538 539 self.service_states[backend] = capab_copy 540 541 cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name 542 else '') 543 LOG.debug("Received %(service_name)s service update from %(cluster)s" 544 "%(host)s: %(cap)s%(cluster)s", 545 {'service_name': service_name, 'host': host, 546 'cap': capabilities, 547 'cluster': cluster_msg}) 548 549 self._no_capabilities_backends.discard(backend) 550 551 def notify_service_capabilities(self, service_name, backend, capabilities, 552 timestamp): 553 """Notify the ceilometer with updated volume stats""" 554 if service_name != 'volume': 555 return 556 557 updated = [] 558 capa_new = self.service_states.get(backend, {}) 559 timestamp = timestamp or timeutils.utcnow() 560 561 # Compare the capabilities and timestamps to decide notifying 562 if not capa_new: 563 updated = self._get_updated_pools(capa_new, capabilities) 564 else: 565 if timestamp > self.service_states[backend]["timestamp"]: 566 updated = self._get_updated_pools( 567 self.service_states[backend], capabilities) 568 if not updated: 569 updated = self._get_updated_pools( 570 self.service_states_last_update.get(backend, {}), 571 self.service_states.get(backend, {})) 572 573 if updated: 574 capab_copy = dict(capabilities) 575 capab_copy["timestamp"] = timestamp 576 # If capabilities changes, notify and record the capabilities. 577 self.service_states_last_update[backend] = capab_copy 578 self.get_usage_and_notify(capabilities, updated, backend, 579 timestamp) 580 581 def has_all_capabilities(self): 582 return len(self._no_capabilities_backends) == 0 583 584 def _update_backend_state_map(self, context): 585 586 # Get resource usage across the available volume nodes: 587 topic = constants.VOLUME_TOPIC 588 volume_services = objects.ServiceList.get_all(context, 589 {'topic': topic, 590 'disabled': False, 591 'frozen': False}) 592 active_backends = set() 593 active_hosts = set() 594 no_capabilities_backends = set() 595 for service in volume_services.objects: 596 host = service.host 597 if not service.is_up: 598 LOG.warning("volume service is down. (host: %s)", host) 599 continue 600 601 backend_key = service.service_topic_queue 602 # We only pay attention to the first up service of a cluster since 603 # they all refer to the same capabilities entry in service_states 604 if backend_key in active_backends: 605 active_hosts.add(host) 606 continue 607 608 # Capabilities may come from the cluster or the host if the service 609 # has just been converted to a cluster service. 610 capabilities = (self.service_states.get(service.cluster_name, None) 611 or self.service_states.get(service.host, None)) 612 if capabilities is None: 613 no_capabilities_backends.add(backend_key) 614 continue 615 616 # Since the service could have been added or remove from a cluster 617 backend_state = self.backend_state_map.get(backend_key, None) 618 if not backend_state: 619 backend_state = self.backend_state_cls( 620 host, 621 service.cluster_name, 622 capabilities=capabilities, 623 service=dict(service)) 624 self.backend_state_map[backend_key] = backend_state 625 626 # update capabilities and attributes in backend_state 627 backend_state.update_from_volume_capability(capabilities, 628 service=dict(service)) 629 active_backends.add(backend_key) 630 631 self._no_capabilities_backends = no_capabilities_backends 632 633 # remove non-active keys from backend_state_map 634 inactive_backend_keys = set(self.backend_state_map) - active_backends 635 for backend_key in inactive_backend_keys: 636 # NOTE(geguileo): We don't want to log the removal of a host from 637 # the map when we are removing it because it has been added to a 638 # cluster. 639 if backend_key not in active_hosts: 640 LOG.info("Removing non-active backend: %(backend)s from " 641 "scheduler cache.", {'backend': backend_key}) 642 del self.backend_state_map[backend_key] 643 644 def revert_volume_consumed_capacity(self, pool_name, size): 645 for backend_key, state in self.backend_state_map.items(): 646 for key in state.pools: 647 pool_state = state.pools[key] 648 if pool_name == '#'.join([backend_key, pool_state.pool_name]): 649 pool_state.consume_from_volume({'size': -size}, 650 update_time=False) 651 652 def get_all_backend_states(self, context): 653 """Returns a dict of all the backends the HostManager knows about. 654 655 Each of the consumable resources in BackendState are 656 populated with capabilities scheduler received from RPC. 657 658 For example: 659 {'192.168.1.100': BackendState(), ...} 660 """ 661 662 self._update_backend_state_map(context) 663 664 # build a pool_state map and return that map instead of 665 # backend_state_map 666 all_pools = {} 667 for backend_key, state in self.backend_state_map.items(): 668 for key in state.pools: 669 pool = state.pools[key] 670 # use backend_key.pool_name to make sure key is unique 671 pool_key = '.'.join([backend_key, pool.pool_name]) 672 all_pools[pool_key] = pool 673 674 return all_pools.values() 675 676 def _filter_pools_by_volume_type(self, context, volume_type, pools): 677 """Return the pools filtered by volume type specs""" 678 679 # wrap filter properties only with volume_type 680 filter_properties = { 681 'context': context, 682 'volume_type': volume_type, 683 'resource_type': volume_type, 684 'qos_specs': volume_type.get('qos_specs'), 685 } 686 687 filtered = self.get_filtered_backends(pools.values(), 688 filter_properties) 689 690 # filter the pools by value 691 return {k: v for k, v in pools.items() if v in filtered} 692 693 def get_pools(self, context, filters=None): 694 """Returns a dict of all pools on all hosts HostManager knows about.""" 695 696 self._update_backend_state_map(context) 697 698 all_pools = {} 699 name = volume_type = None 700 if filters: 701 name = filters.pop('name', None) 702 volume_type = filters.pop('volume_type', None) 703 704 for backend_key, state in self.backend_state_map.items(): 705 for key in state.pools: 706 filtered = False 707 pool = state.pools[key] 708 # use backend_key.pool_name to make sure key is unique 709 pool_key = vol_utils.append_host(backend_key, pool.pool_name) 710 new_pool = dict(name=pool_key) 711 new_pool.update(dict(capabilities=pool.capabilities)) 712 713 if name and new_pool.get('name') != name: 714 continue 715 716 if filters: 717 # filter all other items in capabilities 718 for (attr, value) in filters.items(): 719 cap = new_pool.get('capabilities').get(attr) 720 if not self._equal_after_convert(cap, value): 721 filtered = True 722 break 723 724 if not filtered: 725 all_pools[pool_key] = pool 726 727 # filter pools by volume type 728 if volume_type: 729 volume_type = volume_types.get_by_name_or_id( 730 context, volume_type) 731 all_pools = ( 732 self._filter_pools_by_volume_type(context, 733 volume_type, 734 all_pools)) 735 736 # encapsulate pools in format:{name: XXX, capabilities: XXX} 737 return [dict(name=key, capabilities=value.capabilities) 738 for key, value in all_pools.items()] 739 740 def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp): 741 context = cinder_context.get_admin_context() 742 usage = self._get_usage(capa_new, updated_pools, host, timestamp) 743 744 self._notify_capacity_usage(context, usage) 745 746 def _get_usage(self, capa_new, updated_pools, host, timestamp): 747 pools = capa_new.get('pools') 748 usage = [] 749 if pools and isinstance(pools, list): 750 backend_usage = dict(type='backend', 751 name_to_id=host, 752 total=0, 753 free=0, 754 allocated=0, 755 provisioned=0, 756 virtual_free=0, 757 reported_at=timestamp) 758 759 # Process the usage. 760 for pool in pools: 761 pool_usage = self._get_pool_usage(pool, host, timestamp) 762 if pool_usage: 763 backend_usage["total"] += pool_usage["total"] 764 backend_usage["free"] += pool_usage["free"] 765 backend_usage["allocated"] += pool_usage["allocated"] 766 backend_usage["provisioned"] += pool_usage["provisioned"] 767 backend_usage["virtual_free"] += pool_usage["virtual_free"] 768 # Only the updated pool is reported. 769 if pool in updated_pools: 770 usage.append(pool_usage) 771 usage.append(backend_usage) 772 return usage 773 774 def _get_pool_usage(self, pool, host, timestamp): 775 total = pool["total_capacity_gb"] 776 free = pool["free_capacity_gb"] 777 778 unknowns = ["unknown", "infinite", None] 779 if (total in unknowns) or (free in unknowns): 780 return {} 781 782 allocated = pool["allocated_capacity_gb"] 783 provisioned = pool["provisioned_capacity_gb"] 784 reserved = pool["reserved_percentage"] 785 ratio = utils.calculate_max_over_subscription_ratio( 786 pool, CONF.max_over_subscription_ratio) 787 support = pool["thin_provisioning_support"] 788 789 virtual_free = utils.calculate_virtual_free_capacity( 790 total, 791 free, 792 provisioned, 793 support, 794 ratio, 795 reserved, 796 support) 797 798 pool_usage = dict( 799 type='pool', 800 name_to_id='#'.join([host, pool['pool_name']]), 801 total=float(total), 802 free=float(free), 803 allocated=float(allocated), 804 provisioned=float(provisioned), 805 virtual_free=float(virtual_free), 806 reported_at=timestamp) 807 808 return pool_usage 809 810 def _get_updated_pools(self, old_capa, new_capa): 811 # Judge if the capabilities should be reported. 812 813 new_pools = new_capa.get('pools', []) 814 if not new_pools: 815 return [] 816 817 if isinstance(new_pools, list): 818 # If the volume_stats is not well prepared, don't notify. 819 if not all( 820 self.REQUIRED_KEYS.issubset(pool) for pool in new_pools): 821 return [] 822 else: 823 LOG.debug("The reported capabilities are not well structured...") 824 return [] 825 826 old_pools = old_capa.get('pools', []) 827 if not old_pools: 828 return new_pools 829 830 updated_pools = [] 831 832 newpools = {} 833 oldpools = {} 834 for new_pool in new_pools: 835 newpools[new_pool['pool_name']] = new_pool 836 837 for old_pool in old_pools: 838 oldpools[old_pool['pool_name']] = old_pool 839 840 for key in newpools.keys(): 841 if key in oldpools.keys(): 842 for k in self.REQUIRED_KEYS: 843 if newpools[key][k] != oldpools[key][k]: 844 updated_pools.append(newpools[key]) 845 break 846 else: 847 updated_pools.append(newpools[key]) 848 849 return updated_pools 850 851 def _notify_capacity_usage(self, context, usage): 852 if usage: 853 for u in usage: 854 vol_utils.notify_about_capacity_usage( 855 context, u, u['type'], None, None) 856 LOG.debug("Publish storage capacity: %s.", usage) 857 858 def _equal_after_convert(self, capability, value): 859 860 if isinstance(value, type(capability)) or capability is None: 861 return value == capability 862 863 if isinstance(capability, bool): 864 return capability == strutils.bool_from_string(value) 865 866 # We can not check or convert value parameter's type in 867 # anywhere else. 868 # If the capability and value are not in the same type, 869 # we just convert them into string to compare them. 870 return str(value) == str(capability) 871