# Copyright (c) 2011 Intel Corporation
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""The FilterScheduler is for creating volumes.

You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""

from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from cinder import exception
from cinder.i18n import _
from cinder.scheduler import driver
from cinder.scheduler import scheduler_options
from cinder.volume import utils

CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class FilterScheduler(driver.Scheduler):
    """Scheduler that can be used for filtering and weighing."""
    def __init__(self, *args, **kwargs):
        super(FilterScheduler, self).__init__(*args, **kwargs)
        self.cost_function_cache = None
        self.options = scheduler_options.SchedulerOptions()
        self.max_attempts = self._max_attempts()

    def schedule(self, context, topic, method, *args, **kwargs):
        """Schedule contract that returns best-suited host for this request."""
        return self._schedule(context, topic, *args, **kwargs)

    def _get_configuration_options(self):
        """Fetch options dictionary. Broken out for testing."""
        return self.options.get_configuration()

    def populate_filter_properties(self, request_spec, filter_properties):
        """Populate filter_properties with details of the requested volume.

        Can be overridden in a subclass to add more data.
        """
        vol = request_spec['volume_properties']
        filter_properties['size'] = vol['size']
        filter_properties['availability_zone'] = vol.get('availability_zone')
        filter_properties['user_id'] = vol.get('user_id')
        filter_properties['metadata'] = vol.get('metadata')
        filter_properties['qos_specs'] = vol.get('qos_specs')

    def schedule_create_group(self, context, group,
                              group_spec,
                              request_spec_list,
                              group_filter_properties,
                              filter_properties_list):
        weighed_backend = self._schedule_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)

        if not weighed_backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = weighed_backend.obj

        updated_group = driver.generic_group_update_db(context, group,
                                                       backend.host,
                                                       backend.cluster_name)

        self.volume_rpcapi.create_group(context, updated_group)

    def schedule_create_volume(self, context, request_spec, filter_properties):
        backend = self._schedule(context, request_spec, filter_properties)

        if not backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = backend.obj
        volume_id = request_spec['volume_id']

        updated_volume = driver.volume_update_db(context, volume_id,
                                                 backend.host,
                                                 backend.cluster_name)
        self._post_select_populate_filter_properties(filter_properties,
                                                     backend)

        # context is not serializable
        filter_properties.pop('context', None)

        self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
                                         filter_properties,
                                         allow_reschedule=True)

    def backend_passes_filters(self, context, backend, request_spec,
                               filter_properties):
        """Check if the specified backend passes the filters."""
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # If the backend has no pool defined, ignore pools in the comparison.
        ignore_pool = not bool(utils.extract_host(backend, 'pool'))
        for weighed_backend in weighed_backends:
            backend_id = weighed_backend.obj.backend_id
            if ignore_pool:
                backend_id = utils.extract_host(backend_id)
            if backend_id == backend:
                return weighed_backend.obj

        reason_param = {'resource': 'volume',
                        'id': '??id missing??',
                        'backend': backend}
        for resource in ['volume', 'group', 'snapshot']:
            resource_id = request_spec.get('%s_id' % resource, None)
            if resource_id:
                reason_param.update({'resource': resource, 'id': resource_id})
                break
        raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s '
                                         'on %(backend)s.') % reason_param)

    def find_retype_backend(self, context, request_spec,
                            filter_properties=None, migration_policy='never'):
        """Find a backend that can accept the volume with its new type."""
        filter_properties = filter_properties or {}
        backend = (request_spec['volume_properties'].get('cluster_name')
                   or request_spec['volume_properties']['host'])

        # The volume already exists on this backend, so the CapacityFilter
        # should not check whether the backend can accept it again.
        filter_properties['vol_exists_on'] = backend

        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        if not weighed_backends:
            raise exception.NoValidBackend(
                reason=_('No valid backends for volume %(id)s with type '
                         '%(type)s') % {'id': request_spec['volume_id'],
                                        'type': request_spec['volume_type']})

        for weighed_backend in weighed_backends:
            backend_state = weighed_backend.obj
            if backend_state.backend_id == backend:
                return backend_state

        if utils.extract_host(backend, 'pool') is None:
            # Legacy volumes created before pools were introduced have no
            # pool information in their host value, while the backend IDs of
            # the candidates always include pool-level information. So if the
            # exact match above did not succeed, look for candidates that
            # live on the same host as the volume being retyped. In other
            # words, for legacy volumes a retype may move the volume between
            # pools on the same host; we treat that differently from
            # migration between hosts and allow it even when
            # migration_policy is 'never'.
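            # For example (hypothetical names): a legacy volume on
            # 'node1@lvm' may be retyped onto the candidate
            # 'node1@lvm#pool0', since extracting the 'backend' level from
            # 'node1@lvm#pool0' also yields 'node1@lvm'.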
            for weighed_backend in weighed_backends:
                backend_state = weighed_backend.obj
                new_backend = utils.extract_host(backend_state.backend_id,
                                                 'backend')
                if new_backend == backend:
                    return backend_state

        if migration_policy == 'never':
            raise exception.NoValidBackend(
                reason=_('Current backend not valid for volume %(id)s with '
                         'type %(type)s, migration not allowed') %
                {'id': request_spec['volume_id'],
                 'type': request_spec['volume_type']})

        top_backend = self._choose_top_backend(weighed_backends, request_spec)
        return top_backend.obj

    def get_pools(self, context, filters):
        return self.host_manager.get_pools(context, filters)

    def _post_select_populate_filter_properties(self, filter_properties,
                                                backend_state):
        """Populate filter properties with additional information.

        Add additional information to the filter properties after a backend
        has been selected by the scheduling process.
        """
        # Add a retry entry for the selected volume backend:
        self._add_retry_backend(filter_properties, backend_state.backend_id)

    def _add_retry_backend(self, filter_properties, backend):
        """Add a retry entry for the selected volume backend.

        In the event that the request gets re-scheduled, this entry will
        signal that the given backend has already been tried.
        """
        retry = filter_properties.get('retry', None)
        if not retry:
            return
        # TODO(geguileo): In P - change to only use backends
        for key in ('hosts', 'backends'):
            backends = retry.get(key)
            if backends is not None:
                backends.append(backend)

    def _max_attempts(self):
        max_attempts = CONF.scheduler_max_attempts
        if max_attempts < 1:
            raise exception.InvalidParameterValue(
                err=_("Invalid value for 'scheduler_max_attempts', "
                      "must be >=1"))
        return max_attempts

    def _log_volume_error(self, volume_id, retry):
        """Log requests with exceptions from previous volume operations."""
        exc = retry.pop('exc', None)  # string-ified exception from volume
        if not exc:
            return  # no exception info from a previous attempt, skip

        # TODO(geguileo): In P - change to hosts = retry.get('backends')
        backends = retry.get('backends', retry.get('hosts'))
        if not backends:
            return  # no previously attempted hosts, skip

        last_backend = backends[-1]
        LOG.error("Error scheduling %(volume_id)s from last vol-service: "
                  "%(last_backend)s : %(exc)s",
                  {'volume_id': volume_id,
                   'last_backend': last_backend,
                   'exc': exc})

    def _populate_retry(self, filter_properties, properties):
        """Populate filter properties with the retry history of this request.

        If the maximum number of retries is exceeded, raise NoValidBackend.
        """
        max_attempts = self.max_attempts
        retry = filter_properties.pop('retry', {})

        if max_attempts == 1:
            # re-scheduling is disabled.
            return

        # retry is enabled, update attempt count:
        if retry:
            retry['num_attempts'] += 1
        else:
            retry = {
                'num_attempts': 1,
                'backends': [],  # list of volume service backends tried
                'hosts': []  # TODO(geguileo): Remove in P and leave backends
            }
        filter_properties['retry'] = retry

        volume_id = properties.get('volume_id')
        self._log_volume_error(volume_id, retry)

        if retry['num_attempts'] > max_attempts:
            raise exception.NoValidBackend(
                reason=_("Exceeded max scheduling attempts %(max_attempts)d "
                         "for volume %(volume_id)s") %
                {'max_attempts': max_attempts,
                 'volume_id': volume_id})

    def _get_weighted_candidates(self, context, request_spec,
                                 filter_properties=None):
        """Return a list of backends that meet required specs.

        Returned list is ordered by their fitness.
        """
        elevated = context.elevated()

        # Since Cinder uses a mix of filters from Oslo and its own, which take
        # 'resource_XX' and 'volume_XX' as input respectively, copying
        # 'volume_XX' to 'resource_XX' will make both filters happy.
        volume_type = request_spec.get("volume_type")
        resource_type = volume_type if volume_type is not None else {}

        config_options = self._get_configuration_options()

        if filter_properties is None:
            filter_properties = {}
        self._populate_retry(filter_properties,
                             request_spec['volume_properties'])

        request_spec_dict = jsonutils.to_primitive(request_spec)

        filter_properties.update({'context': context,
                                  'request_spec': request_spec_dict,
                                  'config_options': config_options,
                                  'volume_type': volume_type,
                                  'resource_type': resource_type})

        self.populate_filter_properties(request_spec,
                                        filter_properties)

        # If multiattach is enabled on a volume, we need to add
        # 'multiattach' to its type's extra specs so that capability
        # filtering is enabled.
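        # The '<is> True' value below uses the extra-spec operator syntax
        # understood by capability filtering, so only backends reporting
        # multiattach support should pass (the exact behaviour depends on the
        # configured filters).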
        multiattach = request_spec['volume_properties'].get('multiattach',
                                                            False)
        if multiattach and 'multiattach' not in resource_type.get(
                'extra_specs', {}):
            if 'extra_specs' not in resource_type:
                resource_type['extra_specs'] = {}

            resource_type['extra_specs'].update(
                multiattach='<is> True')

        # Revert volume consumed capacity if it's a rescheduled request
        retry = filter_properties.get('retry', {})
        if retry.get('backends', []):
            self.host_manager.revert_volume_consumed_capacity(
                retry['backends'][-1],
                request_spec['volume_properties']['size'])
        # Find our local list of acceptable backends by filtering and
        # weighing our options. We virtually consume resources on
        # it so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        backends = self.host_manager.get_all_backend_states(elevated)

        # Filter local hosts based on requirements ...
        backends = self.host_manager.get_filtered_backends(backends,
                                                           filter_properties)
        if not backends:
            return []

        LOG.debug("Filtered %s", backends)
        # weighted_backends = WeightedHost() ... the best
        # backend for the job.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends, filter_properties)
        return weighed_backends

    def _get_weighted_candidates_generic_group(
            self, context, group_spec, request_spec_list,
            group_filter_properties=None,
            filter_properties_list=None):
        """Finds backends that support the group.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        backends_by_group_type = self._get_weighted_candidates_by_group_type(
            context, group_spec, group_filter_properties)

        weighed_backends = []
        backends_by_vol_type = []
        index = 0
        for request_spec in request_spec_list:
            volume_properties = request_spec['volume_properties']
            # Since Cinder uses a mix of filters from Oslo and its own, which
            # take 'resource_XX' and 'volume_XX' as input respectively,
            # copying 'volume_XX' to 'resource_XX' will make both filters
            # happy.
            resource_properties = volume_properties.copy()
            volume_type = request_spec.get("volume_type", None)
            resource_type = request_spec.get("volume_type", None)
            request_spec.update({'resource_properties': resource_properties})

            config_options = self._get_configuration_options()

            filter_properties = {}
            if filter_properties_list:
                filter_properties = filter_properties_list[index]
                if filter_properties is None:
                    filter_properties = {}
            self._populate_retry(filter_properties, resource_properties)

            # Add group_support in extra_specs if it is not there.
            # Make sure it is populated in filter_properties
            # if 'group_support' not in resource_type.get(
            #         'extra_specs', {}):
            #     resource_type['extra_specs'].update(
            #         group_support='<is> True')

            filter_properties.update({'context': context,
                                      'request_spec': request_spec,
                                      'config_options': config_options,
                                      'volume_type': volume_type,
                                      'resource_type': resource_type})

            self.populate_filter_properties(request_spec,
                                            filter_properties)

            # Find our local list of acceptable backends by filtering and
            # weighing our options. We virtually consume resources on
            # it so subsequent selections can adjust accordingly.

            # Note: remember, we are using an iterator here. So only
            # traverse this list once.
            all_backends = self.host_manager.get_all_backend_states(elevated)
            if not all_backends:
                return []

            # Filter local backends based on requirements ...
            backends = self.host_manager.get_filtered_backends(
                all_backends, filter_properties)

            if not backends:
                return []

            LOG.debug("Filtered %s", backends)

            # weighted_backend = WeightedHost() ... the best
            # backend for the job.
            temp_weighed_backends = self.host_manager.get_weighed_backends(
                backends,
                filter_properties)
            if not temp_weighed_backends:
                return []
            if index == 0:
                backends_by_vol_type = temp_weighed_backends
            else:
                backends_by_vol_type = self._find_valid_backends(
                    backends_by_vol_type, temp_weighed_backends)
                if not backends_by_vol_type:
                    return []

            index += 1

        # Find backends selected by both the group type and volume types.
        weighed_backends = self._find_valid_backends(backends_by_vol_type,
                                                     backends_by_group_type)

        return weighed_backends

    def _find_valid_backends(self, backend_list1, backend_list2):
        new_backends = []
        for backend1 in backend_list1:
            for backend2 in backend_list2:
                # Should schedule creation of the group at the backend level,
                # not the pool level.
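                # For instance (hypothetical names), 'node1@lvm#pool_a' and
                # 'node1@lvm#pool_b' count as the same backend here because
                # extract_host() drops the pool part from both.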
                if (utils.extract_host(backend1.obj.backend_id) ==
                        utils.extract_host(backend2.obj.backend_id)):
                    new_backends.append(backend1)
        if not new_backends:
            return []
        return new_backends

    def _get_weighted_candidates_by_group_type(
            self, context, group_spec,
            group_filter_properties=None):
        """Finds backends that support the group type.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        weighed_backends = []
        volume_properties = group_spec['volume_properties']
        # Since Cinder uses a mix of filters from Oslo and its own, which
        # take 'resource_XX' and 'volume_XX' as input respectively,
        # copying 'volume_XX' to 'resource_XX' will make both filters
        # happy.
        resource_properties = volume_properties.copy()
        group_type = group_spec.get("group_type", None)
        resource_type = group_spec.get("group_type", None)
        group_spec.update({'resource_properties': resource_properties})

        config_options = self._get_configuration_options()

        if group_filter_properties is None:
            group_filter_properties = {}
        self._populate_retry(group_filter_properties, resource_properties)

        group_filter_properties.update({'context': context,
                                        'request_spec': group_spec,
                                        'config_options': config_options,
                                        'group_type': group_type,
                                        'resource_type': resource_type})

        self.populate_filter_properties(group_spec,
                                        group_filter_properties)

        # Find our local list of acceptable backends by filtering and
        # weighing our options. We virtually consume resources on
        # it so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        all_backends = self.host_manager.get_all_backend_states(elevated)
        if not all_backends:
            return []

        # Filter local backends based on requirements ...
        backends = self.host_manager.get_filtered_backends(
            all_backends, group_filter_properties)

        if not backends:
            return []

        LOG.debug("Filtered %s", backends)

        # weighted_backends = WeightedHost() ... the best backend for the job.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends,
            group_filter_properties)
        if not weighed_backends:
            return []

        return weighed_backends

    def _schedule(self, context, request_spec, filter_properties=None):
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # Once we have the weighed_backends, drop those that don't match the
        # resource's backend (which may have been assigned from a group,
        # snapshot or volume).
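        # For example (hypothetical names): a resource_backend of 'node1@lvm'
        # keeps any candidate on that backend regardless of pool, while
        # 'node1@lvm#pool0' only keeps candidates from that exact pool.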
        resource_backend = request_spec.get('resource_backend')
        if weighed_backends and resource_backend:
            resource_backend_has_pool = bool(utils.extract_host(
                resource_backend, 'pool'))
            # Get host name including host@backend#pool info from
            # weighed_backends.
            for backend in weighed_backends[::-1]:
                backend_id = (
                    backend.obj.backend_id if resource_backend_has_pool
                    else utils.extract_host(backend.obj.backend_id)
                )
                if backend_id != resource_backend:
                    weighed_backends.remove(backend)
        if not weighed_backends:
            LOG.warning('No weighed backend found for volume '
                        'with properties: %s',
                        filter_properties['request_spec'].get('volume_type'))
            return None
        return self._choose_top_backend(weighed_backends, request_spec)

    def _schedule_generic_group(self, context, group_spec, request_spec_list,
                                group_filter_properties=None,
                                filter_properties_list=None):
        weighed_backends = self._get_weighted_candidates_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)
        if not weighed_backends:
            return None
        return self._choose_top_backend_generic_group(weighed_backends)

    def _choose_top_backend(self, weighed_backends, request_spec):
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        volume_properties = request_spec['volume_properties']
        backend_state.consume_from_volume(volume_properties)
        return top_backend

    def _choose_top_backend_generic_group(self, weighed_backends):
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        return top_backend