# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Manage backends in the current zone.
"""

import collections

from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import strutils
from oslo_utils import timeutils

from cinder.common import constants
from cinder import context as cinder_context
from cinder import exception
from cinder import objects
from cinder.scheduler import filters
from cinder import utils
from cinder.volume import utils as vol_utils
from cinder.volume import volume_types


# FIXME: This file should be renamed to backend_manager; we should also
# rename the HostManager class, the scheduler_host_manager option and the
# weight classes, and add code to maintain backward compatibility.


host_manager_opts = [
    cfg.ListOpt('scheduler_default_filters',
                default=[
                    'AvailabilityZoneFilter',
                    'CapacityFilter',
                    'CapabilitiesFilter'
                ],
                help='Which filter class names to use for filtering hosts '
                     'when not specified in the request.'),
    cfg.ListOpt('scheduler_default_weighers',
                default=[
                    'CapacityWeigher'
                ],
                help='Which weigher class names to use for weighing hosts.'),
    cfg.StrOpt('scheduler_weight_handler',
               default='cinder.scheduler.weights.OrderedHostWeightHandler',
               help='Which handler to use for selecting the host/pool '
                    'after weighing'),
]

CONF = cfg.CONF
CONF.register_opts(host_manager_opts)
CONF.import_opt('scheduler_driver', 'cinder.scheduler.manager')
CONF.import_opt('max_over_subscription_ratio', 'cinder.volume.driver')
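
# For illustration only: these options are normally tuned in cinder.conf
# rather than in code. A hypothetical deployment that only wants
# capacity-based scheduling might set:
#
#     [DEFAULT]
#     scheduler_default_filters = CapacityFilter,CapabilitiesFilter
#     scheduler_default_weighers = CapacityWeigher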

LOG = logging.getLogger(__name__)


class ReadOnlyDict(collections.abc.Mapping):
    """A read-only dict."""
    def __init__(self, source=None):
        if source is not None:
            self.data = dict(source)
        else:
            self.data = {}

    def __getitem__(self, key):
        return self.data[key]

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.data)
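
# A minimal illustration (comments only, nothing here is executed): since
# ReadOnlyDict implements just the Mapping protocol, lookups and iteration
# work but mutation fails:
#
#     caps = ReadOnlyDict({'total_capacity_gb': 500})
#     caps['total_capacity_gb']      # -> 500
#     len(caps)                      # -> 1
#     caps['total_capacity_gb'] = 0  # -> TypeError: 'ReadOnlyDict' object
#                                    #    does not support item assignment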


class BackendState(object):
    """Mutable and immutable information tracked for a volume backend."""

    def __init__(self, host, cluster_name, capabilities=None, service=None):
        # NOTE(geguileo): We have a circular dependency between BackendState
        # and PoolState, and we resolve it with an instance attribute instead
        # of a class attribute assigned after the PoolState declaration,
        # because this way we avoid splitting the code.
        self.pool_state_cls = PoolState

        self.capabilities = None
        self.service = None
        self.host = host
        self.cluster_name = cluster_name
        self.update_capabilities(capabilities, service)

        self.volume_backend_name = None
        self.vendor_name = None
        self.driver_version = 0
        self.storage_protocol = None
        self.QoS_support = False
        # Mutable available resources.
        # These will change as resources are virtually "consumed".
        self.total_capacity_gb = 0
        # Capacity that has been allocated from Cinder's point of view;
        # this should be sum(vol['size'] for vol in vols_on_hosts).
        self.allocated_capacity_gb = 0
        self.free_capacity_gb = None
        self.reserved_percentage = 0
        # The apparent allocated space, indicating how much capacity
        # has been provisioned. This could be the sum of the sizes of
        # all volumes on a backend, which could be greater than or
        # equal to allocated_capacity_gb.
        self.provisioned_capacity_gb = 0
        self.max_over_subscription_ratio = 1.0
        self.thin_provisioning_support = False
        self.thick_provisioning_support = False
        # Does this backend support attaching a volume to more than
        # one host/instance?
        self.multiattach = False

        # PoolState for all pools
        self.pools = {}

        self.updated = None

    @property
    def backend_id(self):
        return self.cluster_name or self.host

    def update_capabilities(self, capabilities=None, service=None):
        # Read-only capability dicts

        if capabilities is None:
            capabilities = {}
        self.capabilities = ReadOnlyDict(capabilities)
        if service is None:
            service = {}
        self.service = ReadOnlyDict(service)

    def update_from_volume_capability(self, capability, service=None):
        """Update information about a host from its volume_node info.

        'capability' is the status info reported by the volume backend; a
        typical capability looks like this:

        .. code-block:: python

            capability = {
                'volume_backend_name': 'Local iSCSI',  #
                'vendor_name': 'OpenStack',            #  backend level
                'driver_version': '1.0',               #  mandatory/fixed
                'storage_protocol': 'iSCSI',           #  stats&capabilities

                'active_volumes': 10,                  #
                'IOPS_provisioned': 30000,             #  optional custom
                'fancy_capability_1': 'eat',           #  stats & capabilities
                'fancy_capability_2': 'drink',         #

                'pools': [
                    {'pool_name': '1st pool',          #
                     'total_capacity_gb': 500,         #  mandatory stats for
                     'free_capacity_gb': 230,          #  pools
                     'allocated_capacity_gb': 270,     #
                     'QoS_support': 'False',           #
                     'reserved_percentage': 0,         #

                     'dying_disks': 100,               #
                     'super_hero_1': 'spider-man',     #  optional custom
                     'super_hero_2': 'flash',          #  stats & capabilities
                     'super_hero_3': 'neoncat'         #
                     },
                    {'pool_name': '2nd pool',
                     'total_capacity_gb': 1024,
                     'free_capacity_gb': 1024,
                     'allocated_capacity_gb': 0,
                     'QoS_support': 'False',
                     'reserved_percentage': 0,

                     'dying_disks': 200,
                     'super_hero_1': 'superman',
                     'super_hero_2': 'Hulk'
                     }
                ]
            }
        """
        self.update_capabilities(capability, service)

        if capability:
            if self.updated and self.updated > capability['timestamp']:
                return

            # Update backend level info
            self.update_backend(capability)

            # Update pool level info
            self.update_pools(capability, service)

    def update_pools(self, capability, service):
        """Update storage pools information from backend reported info."""
        if not capability:
            return

        pools = capability.get('pools', None)
        active_pools = set()
        if pools and isinstance(pools, list):
            # Update the stats of each pool according to the list of pools
            # in the volume capability.
            for pool_cap in pools:
                pool_name = pool_cap['pool_name']
                self._append_backend_info(pool_cap)
                cur_pool = self.pools.get(pool_name, None)
                if not cur_pool:
                    # Add new pool
                    cur_pool = self.pool_state_cls(self.host,
                                                   self.cluster_name,
                                                   pool_cap,
                                                   pool_name)
                    self.pools[pool_name] = cur_pool
                cur_pool.update_from_volume_capability(pool_cap, service)

                active_pools.add(pool_name)
        elif pools is None:
            # To handle legacy drivers that don't report pool information
            # in the capability, we have to create a pool from the backend
            # level info, or update the one we already created in
            # self.pools.
            pool_name = self.volume_backend_name
            if pool_name is None:
                # To get DEFAULT_POOL_NAME
                pool_name = vol_utils.extract_host(self.host, 'pool', True)

            if len(self.pools) == 0:
                # No pool was there
                single_pool = self.pool_state_cls(self.host,
                                                  self.cluster_name,
                                                  capability, pool_name)
                self._append_backend_info(capability)
                self.pools[pool_name] = single_pool
            else:
                # This is an update from a legacy driver.
                try:
                    single_pool = self.pools[pool_name]
                except KeyError:
                    single_pool = self.pool_state_cls(self.host,
                                                      self.cluster_name,
                                                      capability,
                                                      pool_name)
                    self._append_backend_info(capability)
                    self.pools[pool_name] = single_pool

            single_pool.update_from_volume_capability(capability, service)
            active_pools.add(pool_name)

        # Remove non-active pools from self.pools
        nonactive_pools = set(self.pools.keys()) - active_pools
        for pool in nonactive_pools:
            LOG.debug("Removing non-active pool %(pool)s @ %(host)s "
                      "from scheduler cache.", {'pool': pool,
                                                'host': self.host})
            del self.pools[pool]
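
    # Illustrative note on the legacy path above (values hypothetical,
    # assuming cinder.volume.utils semantics): for a backend that reports
    # no 'pools', the fallback pool name comes from the backend name or:
    #
    #     vol_utils.extract_host('host1@lvm', 'pool', True)  # -> '_pool0'
    #     vol_utils.append_host('host1@lvm', '_pool0')
    #     # -> 'host1@lvm#_pool0', the host the PoolState ends up using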

    def _append_backend_info(self, pool_cap):
        # Fill in backend-level info on the pool if needed.
        if not pool_cap.get('volume_backend_name', None):
            pool_cap['volume_backend_name'] = self.volume_backend_name

        if not pool_cap.get('storage_protocol', None):
            pool_cap['storage_protocol'] = self.storage_protocol

        if not pool_cap.get('vendor_name', None):
            pool_cap['vendor_name'] = self.vendor_name

        if not pool_cap.get('driver_version', None):
            pool_cap['driver_version'] = self.driver_version

        if not pool_cap.get('timestamp', None):
            pool_cap['timestamp'] = self.updated

    def update_backend(self, capability):
        self.volume_backend_name = capability.get('volume_backend_name', None)
        self.vendor_name = capability.get('vendor_name', None)
        self.driver_version = capability.get('driver_version', None)
        self.storage_protocol = capability.get('storage_protocol', None)
        self.updated = capability['timestamp']

    def consume_from_volume(self, volume, update_time=True):
        """Incrementally update host state from a volume."""
        volume_gb = volume['size']
        self.allocated_capacity_gb += volume_gb
        self.provisioned_capacity_gb += volume_gb
        if self.free_capacity_gb == 'infinite':
            # There's virtually infinite space on the back-end.
            pass
        elif self.free_capacity_gb == 'unknown':
            # Unable to determine the actual free space on the back-end.
            pass
        else:
            self.free_capacity_gb -= volume_gb
        if update_time:
            self.updated = timeutils.utcnow()
        LOG.debug("Consumed %s GB from backend: %s", volume['size'], self)
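
    # Worked example (illustrative numbers): with free_capacity_gb=230 and
    # allocated_capacity_gb=270, consume_from_volume({'size': 10}) leaves
    # free_capacity_gb=220, allocated_capacity_gb=280, and adds 10 to
    # provisioned_capacity_gb; 'infinite' and 'unknown' free capacities
    # are left as-is.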

    def __repr__(self):
        # FIXME(zhiteng): backend level free_capacity_gb isn't as meaningful
        # as it used to be before pools were introduced; we should come up
        # with a better representation of HostState.
        grouping = 'cluster' if self.cluster_name else 'host'
        grouping_name = self.backend_id
        return ("%(grouping)s '%(grouping_name)s': "
                "free_capacity_gb: %(free_capacity_gb)s, "
                "total_capacity_gb: %(total_capacity_gb)s, "
                "allocated_capacity_gb: %(allocated_capacity_gb)s, "
                "max_over_subscription_ratio: %(mosr)s, "
                "reserved_percentage: %(reserved_percentage)s, "
                "provisioned_capacity_gb: %(provisioned_capacity_gb)s, "
                "thin_provisioning_support: %(thin_provisioning_support)s, "
                "thick_provisioning_support: %(thick)s, "
                "pools: %(pools)s, "
                "updated at: %(updated)s" %
                {'grouping': grouping, 'grouping_name': grouping_name,
                 'free_capacity_gb': self.free_capacity_gb,
                 'total_capacity_gb': self.total_capacity_gb,
                 'allocated_capacity_gb': self.allocated_capacity_gb,
                 'mosr': self.max_over_subscription_ratio,
                 'reserved_percentage': self.reserved_percentage,
                 'provisioned_capacity_gb': self.provisioned_capacity_gb,
                 'thin_provisioning_support': self.thin_provisioning_support,
                 'thick': self.thick_provisioning_support,
                 'pools': self.pools, 'updated': self.updated})


class PoolState(BackendState):
    def __init__(self, host, cluster_name, capabilities, pool_name):
        new_host = vol_utils.append_host(host, pool_name)
        new_cluster = vol_utils.append_host(cluster_name, pool_name)
        super(PoolState, self).__init__(new_host, new_cluster, capabilities)
        self.pool_name = pool_name
        # No pools in pool
        self.pools = None

    def update_from_volume_capability(self, capability, service=None):
        """Update information about a pool from its volume_node info."""
        LOG.debug("Updating capabilities for %s: %s", self.host, capability)
        self.update_capabilities(capability, service)
        if capability:
            if self.updated and self.updated > capability['timestamp']:
                return
            self.update_backend(capability)

            self.total_capacity_gb = capability.get('total_capacity_gb', 0)
            self.free_capacity_gb = capability.get('free_capacity_gb', 0)
            self.allocated_capacity_gb = capability.get(
                'allocated_capacity_gb', 0)
            self.QoS_support = capability.get('QoS_support', False)
            self.reserved_percentage = capability.get('reserved_percentage',
                                                      0)
            # provisioned_capacity_gb is the apparent total capacity of
            # all the volumes created on a backend, which is greater than
            # or equal to allocated_capacity_gb, the total size of all the
            # volumes created on that backend through Cinder. Use
            # allocated_capacity_gb as the default for
            # provisioned_capacity_gb when it is not reported.
            self.provisioned_capacity_gb = capability.get(
                'provisioned_capacity_gb', self.allocated_capacity_gb)
            self.thin_provisioning_support = capability.get(
                'thin_provisioning_support', False)
            self.thick_provisioning_support = capability.get(
                'thick_provisioning_support', False)

            self.max_over_subscription_ratio = (
                utils.calculate_max_over_subscription_ratio(
                    capability, CONF.max_over_subscription_ratio))

            self.multiattach = capability.get('multiattach', False)

    def update_pools(self, capability):
        # Do nothing, since we don't have pools within a pool, yet
        pass


class HostManager(object):
    """Base HostManager class."""

    backend_state_cls = BackendState

    REQUIRED_KEYS = frozenset([
        'pool_name',
        'total_capacity_gb',
        'free_capacity_gb',
        'allocated_capacity_gb',
        'provisioned_capacity_gb',
        'thin_provisioning_support',
        'thick_provisioning_support',
        'max_over_subscription_ratio',
        'reserved_percentage'])

    def __init__(self):
        # { <host|cluster>: {<service>: {cap k: v}} }
        self.service_states = {}
        self.backend_state_map = {}
        self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
                                                           'filters')
        self.filter_classes = self.filter_handler.get_all_classes()
        self.enabled_filters = self._choose_backend_filters(
            CONF.scheduler_default_filters)
        self.weight_handler = importutils.import_object(
            CONF.scheduler_weight_handler,
            'cinder.scheduler.weights')
        self.weight_classes = self.weight_handler.get_all_classes()

        # Services without capabilities
        self._no_capabilities_backends = set()
        self._update_backend_state_map(cinder_context.get_admin_context())
        self.service_states_last_update = {}

    def _choose_backend_filters(self, filter_cls_names):
        """Return a list of available filter classes.

        This function checks the input filter names against all loaded
        filter classes; any name that does not match a loaded filter
        class raises SchedulerHostFilterNotFound.
        """
        if not isinstance(filter_cls_names, (list, tuple)):
            filter_cls_names = [filter_cls_names]
        good_filters = []
        bad_filters = []
        for filter_name in filter_cls_names:
            found_class = False
            for cls in self.filter_classes:
                if cls.__name__ == filter_name:
                    found_class = True
                    good_filters.append(cls)
                    break
            if not found_class:
                bad_filters.append(filter_name)
        if bad_filters:
            raise exception.SchedulerHostFilterNotFound(
                filter_name=", ".join(bad_filters))
        return good_filters

    def _choose_backend_weighers(self, weight_cls_names):
        """Return a list of available weigher classes.

        This function checks the input weigher names against all loaded
        weigher classes; any name that does not match a loaded weigher
        class raises SchedulerHostWeigherNotFound. If the input is None,
        CONF.scheduler_default_weighers is used instead.
        """
        if weight_cls_names is None:
            weight_cls_names = CONF.scheduler_default_weighers
        if not isinstance(weight_cls_names, (list, tuple)):
            weight_cls_names = [weight_cls_names]

        good_weighers = []
        bad_weighers = []
        for weigher_name in weight_cls_names:
            found_class = False
            for cls in self.weight_classes:
                if cls.__name__ == weigher_name:
                    good_weighers.append(cls)
                    found_class = True
                    break
            if not found_class:
                bad_weighers.append(weigher_name)
        if bad_weighers:
            raise exception.SchedulerHostWeigherNotFound(
                weigher_name=", ".join(bad_weighers))
        return good_weighers

    def get_filtered_backends(self, backends, filter_properties,
                              filter_class_names=None):
        """Filter backends and return only ones passing all filters."""
        if filter_class_names is not None:
            filter_classes = self._choose_backend_filters(filter_class_names)
        else:
            filter_classes = self.enabled_filters
        return self.filter_handler.get_filtered_objects(filter_classes,
                                                        backends,
                                                        filter_properties)

    def get_weighed_backends(self, backends, weight_properties,
                             weigher_class_names=None):
        """Weigh the backends."""
        weigher_classes = self._choose_backend_weighers(weigher_class_names)
        return self.weight_handler.get_weighed_objects(weigher_classes,
                                                       backends,
                                                       weight_properties)
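
    # A sketch of how a scheduler driver typically uses the two methods
    # above (hypothetical names, not part of this module):
    #
    #     backends = host_manager.get_all_backend_states(ctxt)
    #     passing = host_manager.get_filtered_backends(backends, props)
    #     weighed = host_manager.get_weighed_backends(passing, props)
    #     best = weighed[0]  # highest-weighed candidate first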

    def update_service_capabilities(self, service_name, host, capabilities,
                                    cluster_name, timestamp):
        """Update the per-service capabilities based on this notification."""
        if service_name != 'volume':
            LOG.debug('Ignoring %(service_name)s service update '
                      'from %(host)s',
                      {'service_name': service_name, 'host': host})
            return

        # TODO(geguileo): In P - Remove the next line since we receive the
        # timestamp
        timestamp = timestamp or timeutils.utcnow()
        # Copy the capabilities, so we don't modify the original dict
        capab_copy = dict(capabilities)
        capab_copy["timestamp"] = timestamp

        # Set the default capabilities in case None is set.
        backend = cluster_name or host
        capab_old = self.service_states.get(backend, {"timestamp": 0})
        capab_last_update = self.service_states_last_update.get(
            backend, {"timestamp": 0})

        # Ignore older updates
        if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
            LOG.info('Ignoring old capability report from %s.', backend)
            return

        # If the capabilities have not changed, record the previous
        # capabilities as the last update (but don't overwrite a last
        # update that is already at least as recent).
        #
        # Note that capab_old may have capabilities set while its
        # timestamp is None; the same applies to capab_last_update.
        if (not self._get_updated_pools(capab_old, capab_copy)) and (
                (not capab_old.get("timestamp")) or
                (not capab_last_update.get("timestamp")) or
                (capab_last_update["timestamp"] < capab_old["timestamp"])):
            self.service_states_last_update[backend] = capab_old

        self.service_states[backend] = capab_copy

        cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name
                       else '')
        LOG.debug("Received %(service_name)s service update from "
                  "%(cluster)s%(host)s: %(cap)s",
                  {'service_name': service_name, 'host': host,
                   'cap': capabilities, 'cluster': cluster_msg})

        self._no_capabilities_backends.discard(backend)

    def notify_service_capabilities(self, service_name, backend, capabilities,
                                    timestamp):
        """Notify Ceilometer of updated volume stats."""
        if service_name != 'volume':
            return

        updated = []
        capa_new = self.service_states.get(backend, {})
        timestamp = timestamp or timeutils.utcnow()

        # Compare the capabilities and timestamps to decide whether to
        # notify.
        if not capa_new:
            updated = self._get_updated_pools(capa_new, capabilities)
        else:
            if timestamp > self.service_states[backend]["timestamp"]:
                updated = self._get_updated_pools(
                    self.service_states[backend], capabilities)
                if not updated:
                    updated = self._get_updated_pools(
                        self.service_states_last_update.get(backend, {}),
                        self.service_states.get(backend, {}))

        if updated:
            capab_copy = dict(capabilities)
            capab_copy["timestamp"] = timestamp
            # If the capabilities changed, notify and record them.
            self.service_states_last_update[backend] = capab_copy
            self.get_usage_and_notify(capabilities, updated, backend,
                                      timestamp)

    def has_all_capabilities(self):
        return len(self._no_capabilities_backends) == 0

    def _update_backend_state_map(self, context):

        # Get resource usage across the available volume nodes:
        topic = constants.VOLUME_TOPIC
        volume_services = objects.ServiceList.get_all(context,
                                                      {'topic': topic,
                                                       'disabled': False,
                                                       'frozen': False})
        active_backends = set()
        active_hosts = set()
        no_capabilities_backends = set()
        for service in volume_services.objects:
            host = service.host
            if not service.is_up:
                LOG.warning("Volume service is down. (host: %s)", host)
                continue

            backend_key = service.service_topic_queue
            # We only pay attention to the first up service of a cluster
            # since they all refer to the same capabilities entry in
            # service_states.
            if backend_key in active_backends:
                active_hosts.add(host)
                continue

            # Capabilities may come from the cluster or the host if the
            # service has just been converted to a cluster service.
            capabilities = (self.service_states.get(service.cluster_name,
                                                    None) or
                            self.service_states.get(service.host, None))
            if capabilities is None:
                no_capabilities_backends.add(backend_key)
                continue

            # The service could have been added to or removed from a
            # cluster, so the backend state may not exist yet.
            backend_state = self.backend_state_map.get(backend_key, None)
            if not backend_state:
                backend_state = self.backend_state_cls(
                    host,
                    service.cluster_name,
                    capabilities=capabilities,
                    service=dict(service))
                self.backend_state_map[backend_key] = backend_state

            # Update capabilities and attributes in backend_state
            backend_state.update_from_volume_capability(capabilities,
                                                        service=dict(service))
            active_backends.add(backend_key)

        self._no_capabilities_backends = no_capabilities_backends

        # Remove non-active keys from backend_state_map
        inactive_backend_keys = set(self.backend_state_map) - active_backends
        for backend_key in inactive_backend_keys:
            # NOTE(geguileo): We don't want to log the removal of a host from
            # the map when we are removing it because it has been added to a
            # cluster.
            if backend_key not in active_hosts:
                LOG.info("Removing non-active backend: %(backend)s from "
                         "scheduler cache.", {'backend': backend_key})
            del self.backend_state_map[backend_key]

    def revert_volume_consumed_capacity(self, pool_name, size):
        for backend_key, state in self.backend_state_map.items():
            for key in state.pools:
                pool_state = state.pools[key]
                if pool_name == '#'.join([backend_key, pool_state.pool_name]):
                    # Consume a negative size to give the capacity back.
                    pool_state.consume_from_volume({'size': -size},
                                                   update_time=False)

    def get_all_backend_states(self, context):
        """Return the PoolStates for all backends the manager knows about.

        Each of the consumable resources in a PoolState is populated with
        the capabilities the scheduler received over RPC.
        """

        self._update_backend_state_map(context)

        # Build a pool_state map and return its values instead of
        # backend_state_map.
        all_pools = {}
        for backend_key, state in self.backend_state_map.items():
            for key in state.pools:
                pool = state.pools[key]
                # Use backend_key.pool_name to make sure the key is unique.
                pool_key = '.'.join([backend_key, pool.pool_name])
                all_pools[pool_key] = pool

        return all_pools.values()

    def _filter_pools_by_volume_type(self, context, volume_type, pools):
        """Return the pools filtered by volume type specs."""

        # Wrap the filter properties with only the volume_type info.
        filter_properties = {
            'context': context,
            'volume_type': volume_type,
            'resource_type': volume_type,
            'qos_specs': volume_type.get('qos_specs'),
        }

        filtered = self.get_filtered_backends(pools.values(),
                                              filter_properties)

        # Keep only the pools whose state passed the filters.
        return {k: v for k, v in pools.items() if v in filtered}

    def get_pools(self, context, filters=None):
        """Return a list of all pools on all hosts the manager knows about.

        Each entry has the form {'name': ..., 'capabilities': ...}.
        """

        self._update_backend_state_map(context)

        all_pools = {}
        name = volume_type = None
        if filters:
            name = filters.pop('name', None)
            volume_type = filters.pop('volume_type', None)

        for backend_key, state in self.backend_state_map.items():
            for key in state.pools:
                filtered = False
                pool = state.pools[key]
                # Use backend_key#pool_name to make sure the key is unique.
                pool_key = vol_utils.append_host(backend_key, pool.pool_name)
                new_pool = dict(name=pool_key)
                new_pool.update(dict(capabilities=pool.capabilities))

                if name and new_pool.get('name') != name:
                    continue

                if filters:
                    # Filter on all the other items in the capabilities.
                    for (attr, value) in filters.items():
                        cap = new_pool.get('capabilities').get(attr)
                        if not self._equal_after_convert(cap, value):
                            filtered = True
                            break

                if not filtered:
                    all_pools[pool_key] = pool

        # Filter the pools by volume type.
        if volume_type:
            volume_type = volume_types.get_by_name_or_id(
                context, volume_type)
            all_pools = (
                self._filter_pools_by_volume_type(context,
                                                  volume_type,
                                                  all_pools))

        # Encapsulate each pool as {name: XXX, capabilities: XXX}.
        return [dict(name=key, capabilities=value.capabilities)
                for key, value in all_pools.items()]
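
    # The returned structure looks like this (hypothetical values):
    #
    #     [{'name': 'host1@lvm#pool-a',
    #       'capabilities': {'pool_name': 'pool-a',
    #                        'total_capacity_gb': 500,
    #                        'free_capacity_gb': 230,
    #                        ...}},
    #      ...]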

    def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp):
        context = cinder_context.get_admin_context()
        usage = self._get_usage(capa_new, updated_pools, host, timestamp)

        self._notify_capacity_usage(context, usage)

    def _get_usage(self, capa_new, updated_pools, host, timestamp):
        pools = capa_new.get('pools')
        usage = []
        if pools and isinstance(pools, list):
            backend_usage = dict(type='backend',
                                 name_to_id=host,
                                 total=0,
                                 free=0,
                                 allocated=0,
                                 provisioned=0,
                                 virtual_free=0,
                                 reported_at=timestamp)

            # Process the usage.
            for pool in pools:
                pool_usage = self._get_pool_usage(pool, host, timestamp)
                if pool_usage:
                    backend_usage["total"] += pool_usage["total"]
                    backend_usage["free"] += pool_usage["free"]
                    backend_usage["allocated"] += pool_usage["allocated"]
                    backend_usage["provisioned"] += pool_usage["provisioned"]
                    backend_usage["virtual_free"] += (
                        pool_usage["virtual_free"])
                # Only the updated pools are reported.
                if pool in updated_pools:
                    usage.append(pool_usage)
            usage.append(backend_usage)
        return usage

    def _get_pool_usage(self, pool, host, timestamp):
        total = pool["total_capacity_gb"]
        free = pool["free_capacity_gb"]

        unknowns = ["unknown", "infinite", None]
        if (total in unknowns) or (free in unknowns):
            return {}

        allocated = pool["allocated_capacity_gb"]
        provisioned = pool["provisioned_capacity_gb"]
        reserved = pool["reserved_percentage"]
        ratio = utils.calculate_max_over_subscription_ratio(
            pool, CONF.max_over_subscription_ratio)
        support = pool["thin_provisioning_support"]

        # Note: 'support' is intentionally passed twice; the capacity
        # helper takes both a thin_provisioning_support flag and a final
        # 'thin' argument.
        virtual_free = utils.calculate_virtual_free_capacity(
            total,
            free,
            provisioned,
            support,
            ratio,
            reserved,
            support)

        pool_usage = dict(
            type='pool',
            name_to_id='#'.join([host, pool['pool_name']]),
            total=float(total),
            free=float(free),
            allocated=float(allocated),
            provisioned=float(provisioned),
            virtual_free=float(virtual_free),
            reported_at=timestamp)

        return pool_usage

    def _get_updated_pools(self, old_capa, new_capa):
        # Decide whether the new capabilities should be reported.

        new_pools = new_capa.get('pools', [])
        if not new_pools:
            return []

        if isinstance(new_pools, list):
            # If the volume stats are not well formed, don't notify.
            if not all(
                    self.REQUIRED_KEYS.issubset(pool) for pool in new_pools):
                return []
        else:
            LOG.debug("The reported capabilities are not well structured...")
            return []

        old_pools = old_capa.get('pools', [])
        if not old_pools:
            return new_pools

        updated_pools = []

        newpools = {}
        oldpools = {}
        for new_pool in new_pools:
            newpools[new_pool['pool_name']] = new_pool

        for old_pool in old_pools:
            oldpools[old_pool['pool_name']] = old_pool

        for key in newpools:
            if key in oldpools:
                for k in self.REQUIRED_KEYS:
                    if newpools[key][k] != oldpools[key][k]:
                        updated_pools.append(newpools[key])
                        break
            else:
                updated_pools.append(newpools[key])

        return updated_pools
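
    # Illustrative behaviour (hypothetical pools): a pool is returned only
    # if it is new or one of its REQUIRED_KEYS changed; e.g. when only
    # 'pool-b' reports a different free_capacity_gb, the result is the
    # single new 'pool-b' dict, while identical reports yield [].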

    def _notify_capacity_usage(self, context, usage):
        if usage:
            for u in usage:
                vol_utils.notify_about_capacity_usage(
                    context, u, u['type'], None, None)
        LOG.debug("Publish storage capacity: %s.", usage)

    def _equal_after_convert(self, capability, value):

        if isinstance(value, type(capability)) or capability is None:
            return value == capability

        if isinstance(capability, bool):
            return capability == strutils.bool_from_string(value)

        # We cannot check or convert the value parameter's type anywhere
        # else, so if the capability and the value are not of the same
        # type, convert both to strings to compare them.
        return str(value) == str(capability)
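
    # Examples of the conversions above (illustrative):
    #
    #     _equal_after_convert(True, 'true')   # True, via bool_from_string
    #     _equal_after_convert(500, '500')     # True, via str() comparison
    #     _equal_after_convert('iSCSI', 'FC')  # False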