1# Copyright (c) 2011 OpenStack Foundation
2# All Rights Reserved.
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
8#         http://www.apache.org/licenses/LICENSE-2.0
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
17Manage backends in the current zone.
20import collections
22from oslo_config import cfg
23from oslo_log import log as logging
24from oslo_utils import importutils
25from oslo_utils import strutils
26from oslo_utils import timeutils
28from cinder.common import constants
29from cinder import context as cinder_context
30from cinder import exception
31from cinder import objects
32from cinder.scheduler import filters
33from cinder import utils
34from cinder.volume import utils as vol_utils
35from cinder.volume import volume_types
38# FIXME: This file should be renamed to backend_manager, we should also rename
39# HostManager class, and scheduler_host_manager option, and also the weight
40# classes, and add code to maintain backward compatibility.
43host_manager_opts = [
44    cfg.ListOpt('scheduler_default_filters',
45                default=[
46                    'AvailabilityZoneFilter',
47                    'CapacityFilter',
48                    'CapabilitiesFilter'
49                ],
50                help='Which filter class names to use for filtering hosts '
51                     'when not specified in the request.'),
52    cfg.ListOpt('scheduler_default_weighers',
53                default=[
54                    'CapacityWeigher'
55                ],
56                help='Which weigher class names to use for weighing hosts.'),
57    cfg.StrOpt('scheduler_weight_handler',
58               default='cinder.scheduler.weights.OrderedHostWeightHandler',
59               help='Which handler to use for selecting the host/pool '
60                    'after weighing'),
63CONF = cfg.CONF
65CONF.import_opt('scheduler_driver', 'cinder.scheduler.manager')
66CONF.import_opt('max_over_subscription_ratio', 'cinder.volume.driver')
68LOG = logging.getLogger(__name__)
71class ReadOnlyDict(collections.Mapping):
72    """A read-only dict."""
73    def __init__(self, source=None):
74        if source is not None:
75            self.data = dict(source)
76        else:
77            self.data = {}
79    def __getitem__(self, key):
80        return self.data[key]
82    def __iter__(self):
83        return iter(self.data)
85    def __len__(self):
86        return len(self.data)
88    def __repr__(self):
89        return '%s(%r)' % (self.__class__.__name__, self.data)
92class BackendState(object):
93    """Mutable and immutable information tracked for a volume backend."""
95    def __init__(self, host, cluster_name, capabilities=None, service=None):
96        # NOTE(geguileo): We have a circular dependency between BackendState
97        # and PoolState and we resolve it with an instance attribute instead
98        # of a class attribute that we would assign after the PoolState
99        # declaration because this way we avoid splitting the code.
100        self.pool_state_cls = PoolState
102        self.capabilities = None
103        self.service = None
104        self.host = host
105        self.cluster_name = cluster_name
106        self.update_capabilities(capabilities, service)
108        self.volume_backend_name = None
109        self.vendor_name = None
110        self.driver_version = 0
111        self.storage_protocol = None
112        self.QoS_support = False
113        # Mutable available resources.
114        # These will change as resources are virtually "consumed".
115        self.total_capacity_gb = 0
116        # capacity has been allocated in cinder POV, which should be
117        # sum(vol['size'] for vol in vols_on_hosts)
118        self.allocated_capacity_gb = 0
119        self.free_capacity_gb = None
120        self.reserved_percentage = 0
121        # The apparent allocated space indicating how much capacity
122        # has been provisioned. This could be the sum of sizes of
123        # all volumes on a backend, which could be greater than or
124        # equal to the allocated_capacity_gb.
125        self.provisioned_capacity_gb = 0
126        self.max_over_subscription_ratio = 1.0
127        self.thin_provisioning_support = False
128        self.thick_provisioning_support = False
129        # Does this backend support attaching a volume to more than
130        # once host/instance?
131        self.multiattach = False
133        # PoolState for all pools
134        self.pools = {}
136        self.updated = None
138    @property
139    def backend_id(self):
140        return self.cluster_name or self.host
142    def update_capabilities(self, capabilities=None, service=None):
143        # Read-only capability dicts
145        if capabilities is None:
146            capabilities = {}
147        self.capabilities = ReadOnlyDict(capabilities)
148        if service is None:
149            service = {}
150        self.service = ReadOnlyDict(service)
152    def update_from_volume_capability(self, capability, service=None):
153        """Update information about a host from its volume_node info.
155        'capability' is the status info reported by volume backend, a typical
156        capability looks like this:
158        .. code-block:: python
160         {
161          capability = {
162              'volume_backend_name': 'Local iSCSI', #
163              'vendor_name': 'OpenStack',           #  backend level
164              'driver_version': '1.0',              #  mandatory/fixed
165              'storage_protocol': 'iSCSI',          #  stats&capabilities
167              'active_volumes': 10,                 #
168              'IOPS_provisioned': 30000,            #  optional custom
169              'fancy_capability_1': 'eat',          #  stats & capabilities
170              'fancy_capability_2': 'drink',        #
172              'pools': [
173                  {'pool_name': '1st pool',         #
174                   'total_capacity_gb': 500,        #  mandatory stats for
175                   'free_capacity_gb': 230,         #  pools
176                   'allocated_capacity_gb': 270,    #
177                   'QoS_support': 'False',          #
178                   'reserved_percentage': 0,        #
180                   'dying_disks': 100,              #
181                   'super_hero_1': 'spider-man',    #  optional custom
182                   'super_hero_2': 'flash',         #  stats & capabilities
183                   'super_hero_3': 'neoncat'        #
184                  },
185                  {'pool_name': '2nd pool',
186                   'total_capacity_gb': 1024,
187                   'free_capacity_gb': 1024,
188                   'allocated_capacity_gb': 0,
189                   'QoS_support': 'False',
190                   'reserved_percentage': 0,
192                   'dying_disks': 200,
193                   'super_hero_1': 'superman',
194                   'super_hero_2': ' ',
195                   'super_hero_2': 'Hulk'
196                  }
197              ]
198          }
199         }
201        """
202        self.update_capabilities(capability, service)
204        if capability:
205            if self.updated and self.updated > capability['timestamp']:
206                return
208            # Update backend level info
209            self.update_backend(capability)
211            # Update pool level info
212            self.update_pools(capability, service)
214    def update_pools(self, capability, service):
215        """Update storage pools information from backend reported info."""
216        if not capability:
217            return
219        pools = capability.get('pools', None)
220        active_pools = set()
221        if pools and isinstance(pools, list):
222            # Update all pools stats according to information from list
223            # of pools in volume capacity
224            for pool_cap in pools:
225                pool_name = pool_cap['pool_name']
226                self._append_backend_info(pool_cap)
227                cur_pool = self.pools.get(pool_name, None)
228                if not cur_pool:
229                    # Add new pool
230                    cur_pool = self.pool_state_cls(self.host,
231                                                   self.cluster_name,
232                                                   pool_cap,
233                                                   pool_name)
234                    self.pools[pool_name] = cur_pool
235                cur_pool.update_from_volume_capability(pool_cap, service)
237                active_pools.add(pool_name)
238        elif pools is None:
239            # To handle legacy driver that doesn't report pool
240            # information in the capability, we have to prepare
241            # a pool from backend level info, or to update the one
242            # we created in self.pools.
243            pool_name = self.volume_backend_name
244            if pool_name is None:
245                # To get DEFAULT_POOL_NAME
246                pool_name = vol_utils.extract_host(self.host, 'pool', True)
248            if len(self.pools) == 0:
249                # No pool was there
250                single_pool = self.pool_state_cls(self.host, self.cluster_name,
251                                                  capability, pool_name)
252                self._append_backend_info(capability)
253                self.pools[pool_name] = single_pool
254            else:
255                # this is an update from legacy driver
256                try:
257                    single_pool = self.pools[pool_name]
258                except KeyError:
259                    single_pool = self.pool_state_cls(self.host,
260                                                      self.cluster_name,
261                                                      capability,
262                                                      pool_name)
263                    self._append_backend_info(capability)
264                    self.pools[pool_name] = single_pool
266            single_pool.update_from_volume_capability(capability, service)
267            active_pools.add(pool_name)
269        # remove non-active pools from self.pools
270        nonactive_pools = set(self.pools.keys()) - active_pools
271        for pool in nonactive_pools:
272            LOG.debug("Removing non-active pool %(pool)s @ %(host)s "
273                      "from scheduler cache.", {'pool': pool,
274                                                'host': self.host})
275            del self.pools[pool]
277    def _append_backend_info(self, pool_cap):
278        # Fill backend level info to pool if needed.
279        if not pool_cap.get('volume_backend_name', None):
280            pool_cap['volume_backend_name'] = self.volume_backend_name
282        if not pool_cap.get('storage_protocol', None):
283            pool_cap['storage_protocol'] = self.storage_protocol
285        if not pool_cap.get('vendor_name', None):
286            pool_cap['vendor_name'] = self.vendor_name
288        if not pool_cap.get('driver_version', None):
289            pool_cap['driver_version'] = self.driver_version
291        if not pool_cap.get('timestamp', None):
292            pool_cap['timestamp'] = self.updated
294    def update_backend(self, capability):
295        self.volume_backend_name = capability.get('volume_backend_name', None)
296        self.vendor_name = capability.get('vendor_name', None)
297        self.driver_version = capability.get('driver_version', None)
298        self.storage_protocol = capability.get('storage_protocol', None)
299        self.updated = capability['timestamp']
301    def consume_from_volume(self, volume, update_time=True):
302        """Incrementally update host state from a volume."""
303        volume_gb = volume['size']
304        self.allocated_capacity_gb += volume_gb
305        self.provisioned_capacity_gb += volume_gb
306        if self.free_capacity_gb == 'infinite':
307            # There's virtually infinite space on back-end
308            pass
309        elif self.free_capacity_gb == 'unknown':
310            # Unable to determine the actual free space on back-end
311            pass
312        else:
313            self.free_capacity_gb -= volume_gb
314        if update_time:
315            self.updated = timeutils.utcnow()
316        LOG.debug("Consumed %s GB from backend: %s", volume['size'], self)
318    def __repr__(self):
319        # FIXME(zhiteng) backend level free_capacity_gb isn't as
320        # meaningful as it used to be before pool is introduced, we'd
321        # come up with better representation of HostState.
322        grouping = 'cluster' if self.cluster_name else 'host'
323        grouping_name = self.backend_id
324        return ("%(grouping)s '%(grouping_name)s':"
325                "free_capacity_gb: %(free_capacity_gb)s, "
326                "total_capacity_gb: %(total_capacity_gb)s,"
327                "allocated_capacity_gb: %(allocated_capacity_gb)s, "
328                "max_over_subscription_ratio: %(mosr)s,"
329                "reserved_percentage: %(reserved_percentage)s, "
330                "provisioned_capacity_gb: %(provisioned_capacity_gb)s,"
331                "thin_provisioning_support: %(thin_provisioning_support)s, "
332                "thick_provisioning_support: %(thick)s,"
333                "pools: %(pools)s,"
334                "updated at: %(updated)s" %
335                {'grouping': grouping, 'grouping_name': grouping_name,
336                 'free_capacity_gb': self.free_capacity_gb,
337                 'total_capacity_gb': self.total_capacity_gb,
338                 'allocated_capacity_gb': self.allocated_capacity_gb,
339                 'mosr': self.max_over_subscription_ratio,
340                 'reserved_percentage': self.reserved_percentage,
341                 'provisioned_capacity_gb': self.provisioned_capacity_gb,
342                 'thin_provisioning_support': self.thin_provisioning_support,
343                 'thick': self.thick_provisioning_support,
344                 'pools': self.pools, 'updated': self.updated})
347class PoolState(BackendState):
348    def __init__(self, host, cluster_name, capabilities, pool_name):
349        new_host = vol_utils.append_host(host, pool_name)
350        new_cluster = vol_utils.append_host(cluster_name, pool_name)
351        super(PoolState, self).__init__(new_host, new_cluster, capabilities)
352        self.pool_name = pool_name
353        # No pools in pool
354        self.pools = None
356    def update_from_volume_capability(self, capability, service=None):
357        """Update information about a pool from its volume_node info."""
358        LOG.debug("Updating capabilities for %s: %s", self.host, capability)
359        self.update_capabilities(capability, service)
360        if capability:
361            if self.updated and self.updated > capability['timestamp']:
362                return
363            self.update_backend(capability)
365            self.total_capacity_gb = capability.get('total_capacity_gb', 0)
366            self.free_capacity_gb = capability.get('free_capacity_gb', 0)
367            self.allocated_capacity_gb = capability.get(
368                'allocated_capacity_gb', 0)
369            self.QoS_support = capability.get('QoS_support', False)
370            self.reserved_percentage = capability.get('reserved_percentage', 0)
371            # provisioned_capacity_gb is the apparent total capacity of
372            # all the volumes created on a backend, which is greater than
373            # or equal to allocated_capacity_gb, which is the apparent
374            # total capacity of all the volumes created on a backend
375            # in Cinder. Using allocated_capacity_gb as the default of
376            # provisioned_capacity_gb if it is not set.
377            self.provisioned_capacity_gb = capability.get(
378                'provisioned_capacity_gb', self.allocated_capacity_gb)
379            self.thin_provisioning_support = capability.get(
380                'thin_provisioning_support', False)
381            self.thick_provisioning_support = capability.get(
382                'thick_provisioning_support', False)
384            self.max_over_subscription_ratio = (
385                utils.calculate_max_over_subscription_ratio(
386                    capability, CONF.max_over_subscription_ratio))
388            self.multiattach = capability.get('multiattach', False)
390    def update_pools(self, capability):
391        # Do nothing, since we don't have pools within pool, yet
392        pass
395class HostManager(object):
396    """Base HostManager class."""
398    backend_state_cls = BackendState
400    REQUIRED_KEYS = frozenset([
401        'pool_name',
402        'total_capacity_gb',
403        'free_capacity_gb',
404        'allocated_capacity_gb',
405        'provisioned_capacity_gb',
406        'thin_provisioning_support',
407        'thick_provisioning_support',
408        'max_over_subscription_ratio',
409        'reserved_percentage'])
411    def __init__(self):
412        self.service_states = {}  # { <host|cluster>: {<service>: {cap k : v}}}
413        self.backend_state_map = {}
414        self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
415                                                           'filters')
416        self.filter_classes = self.filter_handler.get_all_classes()
417        self.enabled_filters = self._choose_backend_filters(
418            CONF.scheduler_default_filters)
419        self.weight_handler = importutils.import_object(
420            CONF.scheduler_weight_handler,
421            'cinder.scheduler.weights')
422        self.weight_classes = self.weight_handler.get_all_classes()
424        self._no_capabilities_backends = set()  # Services without capabilities
425        self._update_backend_state_map(cinder_context.get_admin_context())
426        self.service_states_last_update = {}
428    def _choose_backend_filters(self, filter_cls_names):
429        """Return a list of available filter names.
431        This function checks input filter names against a predefined set
432        of acceptable filters (all loaded filters). If input is None,
433        it uses CONF.scheduler_default_filters instead.
434        """
435        if not isinstance(filter_cls_names, (list, tuple)):
436            filter_cls_names = [filter_cls_names]
437        good_filters = []
438        bad_filters = []
439        for filter_name in filter_cls_names:
440            found_class = False
441            for cls in self.filter_classes:
442                if cls.__name__ == filter_name:
443                    found_class = True
444                    good_filters.append(cls)
445                    break
446            if not found_class:
447                bad_filters.append(filter_name)
448        if bad_filters:
449            raise exception.SchedulerHostFilterNotFound(
450                filter_name=", ".join(bad_filters))
451        return good_filters
453    def _choose_backend_weighers(self, weight_cls_names):
454        """Return a list of available weigher names.
456        This function checks input weigher names against a predefined set
457        of acceptable weighers (all loaded weighers).  If input is None,
458        it uses CONF.scheduler_default_weighers instead.
459        """
460        if weight_cls_names is None:
461            weight_cls_names = CONF.scheduler_default_weighers
462        if not isinstance(weight_cls_names, (list, tuple)):
463            weight_cls_names = [weight_cls_names]
465        good_weighers = []
466        bad_weighers = []
467        for weigher_name in weight_cls_names:
468            found_class = False
469            for cls in self.weight_classes:
470                if cls.__name__ == weigher_name:
471                    good_weighers.append(cls)
472                    found_class = True
473                    break
474            if not found_class:
475                bad_weighers.append(weigher_name)
476        if bad_weighers:
477            raise exception.SchedulerHostWeigherNotFound(
478                weigher_name=", ".join(bad_weighers))
479        return good_weighers
481    def get_filtered_backends(self, backends, filter_properties,
482                              filter_class_names=None):
483        """Filter backends and return only ones passing all filters."""
484        if filter_class_names is not None:
485            filter_classes = self._choose_backend_filters(filter_class_names)
486        else:
487            filter_classes = self.enabled_filters
488        return self.filter_handler.get_filtered_objects(filter_classes,
489                                                        backends,
490                                                        filter_properties)
492    def get_weighed_backends(self, backends, weight_properties,
493                             weigher_class_names=None):
494        """Weigh the backends."""
495        weigher_classes = self._choose_backend_weighers(weigher_class_names)
496        return self.weight_handler.get_weighed_objects(weigher_classes,
497                                                       backends,
498                                                       weight_properties)
500    def update_service_capabilities(self, service_name, host, capabilities,
501                                    cluster_name, timestamp):
502        """Update the per-service capabilities based on this notification."""
503        if service_name != 'volume':
504            LOG.debug('Ignoring %(service_name)s service update '
505                      'from %(host)s',
506                      {'service_name': service_name, 'host': host})
507            return
509        # TODO(geguileo): In P - Remove the next line since we receive the
510        # timestamp
511        timestamp = timestamp or timeutils.utcnow()
512        # Copy the capabilities, so we don't modify the original dict
513        capab_copy = dict(capabilities)
514        capab_copy["timestamp"] = timestamp
516        # Set the default capabilities in case None is set.
517        backend = cluster_name or host
518        capab_old = self.service_states.get(backend, {"timestamp": 0})
519        capab_last_update = self.service_states_last_update.get(
520            backend, {"timestamp": 0})
522        # Ignore older updates
523        if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
524            LOG.info('Ignoring old capability report from %s.', backend)
525            return
527        # If the capabilities are not changed and the timestamp is older,
528        # record the capabilities.
530        # There are cases: capab_old has the capabilities set,
531        # but the timestamp may be None in it. So does capab_last_update.
533        if (not self._get_updated_pools(capab_old, capab_copy)) and (
534                (not capab_old.get("timestamp")) or
535                (not capab_last_update.get("timestamp")) or
536                (capab_last_update["timestamp"] < capab_old["timestamp"])):
537            self.service_states_last_update[backend] = capab_old
539        self.service_states[backend] = capab_copy
541        cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name
542                       else '')
543        LOG.debug("Received %(service_name)s service update from %(cluster)s"
544                  "%(host)s: %(cap)s%(cluster)s",
545                  {'service_name': service_name, 'host': host,
546                   'cap': capabilities,
547                   'cluster': cluster_msg})
549        self._no_capabilities_backends.discard(backend)
551    def notify_service_capabilities(self, service_name, backend, capabilities,
552                                    timestamp):
553        """Notify the ceilometer with updated volume stats"""
554        if service_name != 'volume':
555            return
557        updated = []
558        capa_new = self.service_states.get(backend, {})
559        timestamp = timestamp or timeutils.utcnow()
561        # Compare the capabilities and timestamps to decide notifying
562        if not capa_new:
563            updated = self._get_updated_pools(capa_new, capabilities)
564        else:
565            if timestamp > self.service_states[backend]["timestamp"]:
566                updated = self._get_updated_pools(
567                    self.service_states[backend], capabilities)
568                if not updated:
569                    updated = self._get_updated_pools(
570                        self.service_states_last_update.get(backend, {}),
571                        self.service_states.get(backend, {}))
573        if updated:
574            capab_copy = dict(capabilities)
575            capab_copy["timestamp"] = timestamp
576            # If capabilities changes, notify and record the capabilities.
577            self.service_states_last_update[backend] = capab_copy
578            self.get_usage_and_notify(capabilities, updated, backend,
579                                      timestamp)
581    def has_all_capabilities(self):
582        return len(self._no_capabilities_backends) == 0
584    def _update_backend_state_map(self, context):
586        # Get resource usage across the available volume nodes:
587        topic = constants.VOLUME_TOPIC
588        volume_services = objects.ServiceList.get_all(context,
589                                                      {'topic': topic,
590                                                       'disabled': False,
591                                                       'frozen': False})
592        active_backends = set()
593        active_hosts = set()
594        no_capabilities_backends = set()
595        for service in volume_services.objects:
596            host = service.host
597            if not service.is_up:
598                LOG.warning("volume service is down. (host: %s)", host)
599                continue
601            backend_key = service.service_topic_queue
602            # We only pay attention to the first up service of a cluster since
603            # they all refer to the same capabilities entry in service_states
604            if backend_key in active_backends:
605                active_hosts.add(host)
606                continue
608            # Capabilities may come from the cluster or the host if the service
609            # has just been converted to a cluster service.
610            capabilities = (self.service_states.get(service.cluster_name, None)
611                            or self.service_states.get(service.host, None))
612            if capabilities is None:
613                no_capabilities_backends.add(backend_key)
614                continue
616            # Since the service could have been added or remove from a cluster
617            backend_state = self.backend_state_map.get(backend_key, None)
618            if not backend_state:
619                backend_state = self.backend_state_cls(
620                    host,
621                    service.cluster_name,
622                    capabilities=capabilities,
623                    service=dict(service))
624                self.backend_state_map[backend_key] = backend_state
626            # update capabilities and attributes in backend_state
627            backend_state.update_from_volume_capability(capabilities,
628                                                        service=dict(service))
629            active_backends.add(backend_key)
631        self._no_capabilities_backends = no_capabilities_backends
633        # remove non-active keys from backend_state_map
634        inactive_backend_keys = set(self.backend_state_map) - active_backends
635        for backend_key in inactive_backend_keys:
636            # NOTE(geguileo): We don't want to log the removal of a host from
637            # the map when we are removing it because it has been added to a
638            # cluster.
639            if backend_key not in active_hosts:
640                LOG.info("Removing non-active backend: %(backend)s from "
641                         "scheduler cache.", {'backend': backend_key})
642            del self.backend_state_map[backend_key]
644    def revert_volume_consumed_capacity(self, pool_name, size):
645        for backend_key, state in self.backend_state_map.items():
646            for key in state.pools:
647                pool_state = state.pools[key]
648                if pool_name == '#'.join([backend_key, pool_state.pool_name]):
649                    pool_state.consume_from_volume({'size': -size},
650                                                   update_time=False)
652    def get_all_backend_states(self, context):
653        """Returns a dict of all the backends the HostManager knows about.
655        Each of the consumable resources in BackendState are
656        populated with capabilities scheduler received from RPC.
658        For example:
659          {'': BackendState(), ...}
660        """
662        self._update_backend_state_map(context)
664        # build a pool_state map and return that map instead of
665        # backend_state_map
666        all_pools = {}
667        for backend_key, state in self.backend_state_map.items():
668            for key in state.pools:
669                pool = state.pools[key]
670                # use backend_key.pool_name to make sure key is unique
671                pool_key = '.'.join([backend_key, pool.pool_name])
672                all_pools[pool_key] = pool
674        return all_pools.values()
676    def _filter_pools_by_volume_type(self, context, volume_type, pools):
677        """Return the pools filtered by volume type specs"""
679        # wrap filter properties only with volume_type
680        filter_properties = {
681            'context': context,
682            'volume_type': volume_type,
683            'resource_type': volume_type,
684            'qos_specs': volume_type.get('qos_specs'),
685        }
687        filtered = self.get_filtered_backends(pools.values(),
688                                              filter_properties)
690        # filter the pools by value
691        return {k: v for k, v in pools.items() if v in filtered}
693    def get_pools(self, context, filters=None):
694        """Returns a dict of all pools on all hosts HostManager knows about."""
696        self._update_backend_state_map(context)
698        all_pools = {}
699        name = volume_type = None
700        if filters:
701            name = filters.pop('name', None)
702            volume_type = filters.pop('volume_type', None)
704        for backend_key, state in self.backend_state_map.items():
705            for key in state.pools:
706                filtered = False
707                pool = state.pools[key]
708                # use backend_key.pool_name to make sure key is unique
709                pool_key = vol_utils.append_host(backend_key, pool.pool_name)
710                new_pool = dict(name=pool_key)
711                new_pool.update(dict(capabilities=pool.capabilities))
713                if name and new_pool.get('name') != name:
714                    continue
716                if filters:
717                    # filter all other items in capabilities
718                    for (attr, value) in filters.items():
719                        cap = new_pool.get('capabilities').get(attr)
720                        if not self._equal_after_convert(cap, value):
721                            filtered = True
722                            break
724                if not filtered:
725                    all_pools[pool_key] = pool
727        # filter pools by volume type
728        if volume_type:
729            volume_type = volume_types.get_by_name_or_id(
730                context, volume_type)
731            all_pools = (
732                self._filter_pools_by_volume_type(context,
733                                                  volume_type,
734                                                  all_pools))
736        # encapsulate pools in format:{name: XXX, capabilities: XXX}
737        return [dict(name=key, capabilities=value.capabilities)
738                for key, value in all_pools.items()]
740    def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp):
741        context = cinder_context.get_admin_context()
742        usage = self._get_usage(capa_new, updated_pools, host, timestamp)
744        self._notify_capacity_usage(context, usage)
746    def _get_usage(self, capa_new, updated_pools, host, timestamp):
747        pools = capa_new.get('pools')
748        usage = []
749        if pools and isinstance(pools, list):
750            backend_usage = dict(type='backend',
751                                 name_to_id=host,
752                                 total=0,
753                                 free=0,
754                                 allocated=0,
755                                 provisioned=0,
756                                 virtual_free=0,
757                                 reported_at=timestamp)
759            # Process the usage.
760            for pool in pools:
761                pool_usage = self._get_pool_usage(pool, host, timestamp)
762                if pool_usage:
763                    backend_usage["total"] += pool_usage["total"]
764                    backend_usage["free"] += pool_usage["free"]
765                    backend_usage["allocated"] += pool_usage["allocated"]
766                    backend_usage["provisioned"] += pool_usage["provisioned"]
767                    backend_usage["virtual_free"] += pool_usage["virtual_free"]
768                # Only the updated pool is reported.
769                if pool in updated_pools:
770                    usage.append(pool_usage)
771            usage.append(backend_usage)
772        return usage
774    def _get_pool_usage(self, pool, host, timestamp):
775        total = pool["total_capacity_gb"]
776        free = pool["free_capacity_gb"]
778        unknowns = ["unknown", "infinite", None]
779        if (total in unknowns) or (free in unknowns):
780            return {}
782        allocated = pool["allocated_capacity_gb"]
783        provisioned = pool["provisioned_capacity_gb"]
784        reserved = pool["reserved_percentage"]
785        ratio = utils.calculate_max_over_subscription_ratio(
786            pool, CONF.max_over_subscription_ratio)
787        support = pool["thin_provisioning_support"]
789        virtual_free = utils.calculate_virtual_free_capacity(
790            total,
791            free,
792            provisioned,
793            support,
794            ratio,
795            reserved,
796            support)
798        pool_usage = dict(
799            type='pool',
800            name_to_id='#'.join([host, pool['pool_name']]),
801            total=float(total),
802            free=float(free),
803            allocated=float(allocated),
804            provisioned=float(provisioned),
805            virtual_free=float(virtual_free),
806            reported_at=timestamp)
808        return pool_usage
810    def _get_updated_pools(self, old_capa, new_capa):
811        # Judge if the capabilities should be reported.
813        new_pools = new_capa.get('pools', [])
814        if not new_pools:
815            return []
817        if isinstance(new_pools, list):
818            # If the volume_stats is not well prepared, don't notify.
819            if not all(
820                    self.REQUIRED_KEYS.issubset(pool) for pool in new_pools):
821                return []
822        else:
823            LOG.debug("The reported capabilities are not well structured...")
824            return []
826        old_pools = old_capa.get('pools', [])
827        if not old_pools:
828            return new_pools
830        updated_pools = []
832        newpools = {}
833        oldpools = {}
834        for new_pool in new_pools:
835            newpools[new_pool['pool_name']] = new_pool
837        for old_pool in old_pools:
838            oldpools[old_pool['pool_name']] = old_pool
840        for key in newpools.keys():
841            if key in oldpools.keys():
842                for k in self.REQUIRED_KEYS:
843                    if newpools[key][k] != oldpools[key][k]:
844                        updated_pools.append(newpools[key])
845                        break
846            else:
847                updated_pools.append(newpools[key])
849        return updated_pools
851    def _notify_capacity_usage(self, context, usage):
852        if usage:
853            for u in usage:
854                vol_utils.notify_about_capacity_usage(
855                    context, u, u['type'], None, None)
856        LOG.debug("Publish storage capacity: %s.", usage)
858    def _equal_after_convert(self, capability, value):
860        if isinstance(value, type(capability)) or capability is None:
861            return value == capability
863        if isinstance(capability, bool):
864            return capability == strutils.bool_from_string(value)
866        # We can not check or convert value parameter's type in
867        # anywhere else.
868        # If the capability and value are not in the same type,
869        # we just convert them into string to compare them.
870        return str(value) == str(capability)