# Copyright (c) 2011 Intel Corporation
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""The FilterScheduler is for creating volumes.

You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""

from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from cinder import exception
from cinder.i18n import _
from cinder.scheduler import driver
from cinder.scheduler import scheduler_options
from cinder.volume import utils

CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class FilterScheduler(driver.Scheduler):
    """Scheduler that can be used for filtering and weighing."""

    def __init__(self, *args, **kwargs):
        """Initialise scheduler options and the retry limit.

        :raises InvalidParameterValue: if ``scheduler_max_attempts`` is
                                       configured below 1 (raised by
                                       :meth:`_max_attempts`)
        """
        super(FilterScheduler, self).__init__(*args, **kwargs)
        self.cost_function_cache = None
        self.options = scheduler_options.SchedulerOptions()
        self.max_attempts = self._max_attempts()

    def schedule(self, context, topic, method, *args, **kwargs):
        """Schedule contract that returns best-suited host for this request.

        ``method`` is accepted but not used. ``topic`` and any extra
        arguments are forwarded positionally to :meth:`_schedule`.

        :returns: the value returned by :meth:`_schedule` (the chosen
                  weighed backend, or None when nothing is available)
        """
        # NOTE(review): ``topic`` ends up in the ``request_spec`` position of
        # ``_schedule``; callers must therefore pass a request spec there.
        return self._schedule(context, topic, *args, **kwargs)

    def _get_configuration_options(self):
        """Fetch options dictionary. Broken out for testing."""
        return self.options.get_configuration()

    def populate_filter_properties(self, request_spec, filter_properties):
        """Stuff things into filter_properties.

        Copies ``size`` (required) plus the optional ``availability_zone``,
        ``user_id``, ``metadata`` and ``qos_specs`` entries from
        ``request_spec['volume_properties']`` into ``filter_properties``.
        Missing optional entries are stored as None.

        Can be overridden in a subclass to add more data.
        """
        vol = request_spec['volume_properties']
        filter_properties['size'] = vol['size']
        for key in ('availability_zone', 'user_id', 'metadata', 'qos_specs'):
            filter_properties[key] = vol.get(key)

    def schedule_create_group(self, context, group,
                              group_spec,
                              request_spec_list,
                              group_filter_properties,
                              filter_properties_list):
        """Pick a backend for a generic group and ask it to create the group.

        The chosen backend's host and cluster name are recorded through
        ``driver.generic_group_update_db`` before the create request is sent
        over ``volume_rpcapi``.

        :raises NoValidBackend: if no backend satisfies the group type and
                                every volume type in the group
        """
        weighed_backend = self._schedule_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)

        if not weighed_backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = weighed_backend.obj

        updated_group = driver.generic_group_update_db(context, group,
                                                       backend.host,
                                                       backend.cluster_name)

        self.volume_rpcapi.create_group(context, updated_group)

    def schedule_create_volume(self, context, request_spec, filter_properties):
        """Pick a backend for a volume and ask it to create the volume.

        The chosen backend's host and cluster name are recorded through
        ``driver.volume_update_db``, the backend is appended to the retry
        history in ``filter_properties`` and the create request is sent over
        ``volume_rpcapi`` with rescheduling allowed.

        :raises NoValidBackend: if no backend passes filtering
        """
        backend = self._schedule(context, request_spec, filter_properties)

        if not backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = backend.obj
        volume_id = request_spec['volume_id']

        updated_volume = driver.volume_update_db(context, volume_id,
                                                 backend.host,
                                                 backend.cluster_name)
        self._post_select_populate_filter_properties(filter_properties,
                                                     backend)

        # context is not serializable
        filter_properties.pop('context', None)

        self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
                                         filter_properties,
                                         allow_reschedule=True)

    def backend_passes_filters(self, context, backend, request_spec,
                               filter_properties):
        """Check if the specified backend passes the filters.

        :param backend: backend identifier to look for; when it carries no
                        pool part, candidates are compared with their pool
                        part stripped as well
        :returns: the matching candidate's backend state object
        :raises NoValidBackend: if ``backend`` is not among the candidates
        """
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # If backend has no pool defined we will ignore it in the comparison
        ignore_pool = not bool(utils.extract_host(backend, 'pool'))
        for weighed_backend in weighed_backends:
            backend_id = weighed_backend.obj.backend_id
            if ignore_pool:
                backend_id = utils.extract_host(backend_id)
            if backend_id == backend:
                return weighed_backend.obj

        # Build the error message from whichever resource id the request
        # spec carries; the first one found wins.
        reason_param = {'resource': 'volume',
                        'id': '??id missing??',
                        'backend': backend}
        for resource in ['volume', 'group', 'snapshot']:
            resource_id = request_spec.get('%s_id' % resource, None)
            if resource_id:
                reason_param.update({'resource': resource, 'id': resource_id})
                break
        raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s '
                                         'on %(backend)s.') % reason_param)

    def find_retype_backend(self, context, request_spec,
                            filter_properties=None, migration_policy='never'):
        """Find a backend that can accept the volume with its new type.

        The volume's current backend is preferred. Another backend is only
        returned when the current one is not a valid candidate and
        ``migration_policy`` is not ``'never'``.

        :returns: a backend state object
        :raises NoValidBackend: if no candidate exists at all, or if the
                                current backend is not valid and migration
                                is not allowed
        """
        filter_properties = filter_properties or {}
        backend = (request_spec['volume_properties'].get('cluster_name')
                   or request_spec['volume_properties']['host'])

        # The volume already exists on this backend, and so we shouldn't check
        # if it can accept the volume again in the CapacityFilter.
        filter_properties['vol_exists_on'] = backend

        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        if not weighed_backends:
            raise exception.NoValidBackend(
                reason=_('No valid backends for volume %(id)s with type '
                         '%(type)s') % {'id': request_spec['volume_id'],
                                        'type': request_spec['volume_type']})

        for weighed_backend in weighed_backends:
            backend_state = weighed_backend.obj
            if backend_state.backend_id == backend:
                return backend_state

        if utils.extract_host(backend, 'pool') is None:
            # Legacy volumes created before pools were introduced have no
            # pool info in their host, but the candidates' backend ids always
            # include pool level info. If the exact match above did not work
            # out, look for a candidate on the same host as the volume being
            # retyped. In other words, for legacy volumes, retyping could
            # cause migration between pools on the same host, which we
            # consider different from migration between hosts and thus allow
            # even when the migration policy is 'never'.
            for weighed_backend in weighed_backends:
                backend_state = weighed_backend.obj
                new_backend = utils.extract_host(backend_state.backend_id,
                                                 'backend')
                if new_backend == backend:
                    return backend_state

        if migration_policy == 'never':
            raise exception.NoValidBackend(
                reason=_('Current backend not valid for volume %(id)s with '
                         'type %(type)s, migration not allowed') %
                {'id': request_spec['volume_id'],
                 'type': request_spec['volume_type']})

        top_backend = self._choose_top_backend(weighed_backends, request_spec)
        return top_backend.obj

    def get_pools(self, context, filters):
        """Return the pools reported by the host manager for ``filters``."""
        return self.host_manager.get_pools(context, filters)

    def _post_select_populate_filter_properties(self, filter_properties,
                                                backend_state):
        """Populate filter properties with additional information.

        Add additional information to the filter properties after a backend has
        been selected by the scheduling process.
        """
        # Add a retry entry for the selected volume backend:
        self._add_retry_backend(filter_properties, backend_state.backend_id)

    def _add_retry_backend(self, filter_properties, backend):
        """Add a retry entry for the selected volume backend.

        In the event that the request gets re-scheduled, this entry will signal
        that the given backend has already been tried. Nothing is recorded
        when ``filter_properties`` has no (or an empty) ``retry`` entry.
        """
        retry = filter_properties.get('retry', None)
        if not retry:
            return
        # TODO(geguileo): In P - change to only use backends
        for key in ('hosts', 'backends'):
            backends = retry.get(key)
            if backends is not None:
                backends.append(backend)

    def _max_attempts(self):
        """Return the configured ``scheduler_max_attempts``.

        :raises InvalidParameterValue: if the configured value is below 1
        """
        max_attempts = CONF.scheduler_max_attempts
        if max_attempts < 1:
            raise exception.InvalidParameterValue(
                err=_("Invalid value for 'scheduler_max_attempts', "
                      "must be >=1"))
        return max_attempts

    def _log_volume_error(self, volume_id, retry):
        """Log requests with exceptions from previous volume operations.

        The ``exc`` entry is removed from ``retry`` as a side effect.
        """
        exc = retry.pop('exc', None)  # string-ified exception from volume
        if not exc:
            return  # no exception info from a previous attempt, skip

        # TODO(geguileo): In P - change to hosts = retry.get('backends')
        backends = retry.get('backends', retry.get('hosts'))
        if not backends:
            return  # no previously attempted hosts, skip

        last_backend = backends[-1]
        LOG.error("Error scheduling %(volume_id)s from last vol-service: "
                  "%(last_backend)s : %(exc)s",
                  {'volume_id': volume_id,
                   'last_backend': last_backend,
                   'exc': exc})

    def _populate_retry(self, filter_properties, properties):
        """Populate filter properties with history of retries for request.

        When re-scheduling is disabled (``max_attempts == 1``) any existing
        ``retry`` entry is removed from ``filter_properties`` and nothing
        else happens. Otherwise the attempt counter is created or
        incremented and stored back under ``retry``.

        If maximum retries is exceeded, raise NoValidBackend.
        """
        max_attempts = self.max_attempts
        retry = filter_properties.pop('retry', {})

        if max_attempts == 1:
            # re-scheduling is disabled.
            return

        # retry is enabled, update attempt count:
        if retry:
            retry['num_attempts'] += 1
        else:
            retry = {
                'num_attempts': 1,
                'backends': [],  # list of volume service backends tried
                'hosts': []  # TODO(geguileo): Remove in P and leave backends
            }
        filter_properties['retry'] = retry

        volume_id = properties.get('volume_id')
        self._log_volume_error(volume_id, retry)

        if retry['num_attempts'] > max_attempts:
            raise exception.NoValidBackend(
                reason=_("Exceeded max scheduling attempts %(max_attempts)d "
                         "for volume %(volume_id)s") %
                {'max_attempts': max_attempts,
                 'volume_id': volume_id})

    def _get_weighted_candidates(self, context, request_spec,
                                 filter_properties=None):
        """Return a list of backends that meet required specs.

        Returned list is ordered by their fitness.

        ``filter_properties`` is updated in place with the context, a
        primitive copy of the request spec, the configuration options, the
        volume type and the retry history. An empty list is returned when no
        backend passes filtering.

        :raises NoValidBackend: if the retry limit has been exceeded
        """
        elevated = context.elevated()

        # Since Cinder is using mixed filters from Oslo and its own, which
        # takes 'resource_XX' and 'volume_XX' as input respectively, copying
        # 'volume_XX' to 'resource_XX' will make both filters happy.
        volume_type = request_spec.get("volume_type")
        resource_type = volume_type if volume_type is not None else {}

        config_options = self._get_configuration_options()

        if filter_properties is None:
            filter_properties = {}
        self._populate_retry(filter_properties,
                             request_spec['volume_properties'])

        request_spec_dict = jsonutils.to_primitive(request_spec)

        filter_properties.update({'context': context,
                                  'request_spec': request_spec_dict,
                                  'config_options': config_options,
                                  'volume_type': volume_type,
                                  'resource_type': resource_type})

        self.populate_filter_properties(request_spec,
                                        filter_properties)

        # If multiattach is enabled on a volume, we need to add
        # multiattach to extra specs, so that the capability
        # filtering is enabled.
        multiattach = request_spec['volume_properties'].get('multiattach',
                                                            False)
        if multiattach and 'multiattach' not in resource_type.get(
                'extra_specs', {}):
            if 'extra_specs' not in resource_type:
                resource_type['extra_specs'] = {}

            resource_type['extra_specs'].update(
                multiattach='<is> True')

        # Revert volume consumed capacity if it's a rescheduled request
        retry = filter_properties.get('retry', {})
        if retry.get('backends', []):
            self.host_manager.revert_volume_consumed_capacity(
                retry['backends'][-1],
                request_spec['volume_properties']['size'])
        # Find our local list of acceptable backends by filtering and
        # weighing our options. we virtually consume resources on
        # it so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        backends = self.host_manager.get_all_backend_states(elevated)

        # Filter local hosts based on requirements ...
        backends = self.host_manager.get_filtered_backends(backends,
                                                           filter_properties)
        if not backends:
            return []

        LOG.debug("Filtered %s", backends)
        # Weigh the surviving backends; the first entry is the best
        # backend for the job.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends, filter_properties)
        return weighed_backends

    def _get_weighted_candidates_generic_group(
            self, context, group_spec, request_spec_list,
            group_filter_properties=None,
            filter_properties_list=None):
        """Finds backends that supports the group.

        Candidates are computed for the group type and for every request
        spec in ``request_spec_list``; only backends whose host appears in
        all of those candidate lists are kept.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        backends_by_group_type = self._get_weighted_candidates_by_group_type(
            context, group_spec, group_filter_properties)

        backends_by_vol_type = []
        for index, request_spec in enumerate(request_spec_list):
            volume_properties = request_spec['volume_properties']
            # Since Cinder is using mixed filters from Oslo and its own, which
            # takes 'resource_XX' and 'volume_XX' as input respectively,
            # copying 'volume_XX' to 'resource_XX' will make both filters
            # happy.
            resource_properties = volume_properties.copy()
            volume_type = request_spec.get("volume_type", None)
            resource_type = volume_type
            request_spec.update({'resource_properties': resource_properties})

            config_options = self._get_configuration_options()

            filter_properties = {}
            if filter_properties_list:
                filter_properties = filter_properties_list[index]
                if filter_properties is None:
                    filter_properties = {}
            self._populate_retry(filter_properties, resource_properties)

            # Add group_support in extra_specs if it is not there.
            # Make sure it is populated in filter_properties
            # if 'group_support' not in resource_type.get(
            #         'extra_specs', {}):
            #     resource_type['extra_specs'].update(
            #         group_support='<is> True')

            filter_properties.update({'context': context,
                                      'request_spec': request_spec,
                                      'config_options': config_options,
                                      'volume_type': volume_type,
                                      'resource_type': resource_type})

            self.populate_filter_properties(request_spec,
                                            filter_properties)

            # Find our local list of acceptable backends by filtering and
            # weighing our options. we virtually consume resources on
            # it so subsequent selections can adjust accordingly.

            # Note: remember, we are using an iterator here. So only
            # traverse this list once.
            all_backends = self.host_manager.get_all_backend_states(elevated)
            if not all_backends:
                return []

            # Filter local backends based on requirements ...
            backends = self.host_manager.get_filtered_backends(
                all_backends, filter_properties)

            if not backends:
                return []

            LOG.debug("Filtered %s", backends)

            # Weigh the surviving backends; the first entry is the best
            # backend for the job.
            temp_weighed_backends = self.host_manager.get_weighed_backends(
                backends,
                filter_properties)
            if not temp_weighed_backends:
                return []
            if index == 0:
                backends_by_vol_type = temp_weighed_backends
            else:
                # Keep only backends that were valid for every volume type
                # seen so far.
                backends_by_vol_type = self._find_valid_backends(
                    backends_by_vol_type, temp_weighed_backends)
                if not backends_by_vol_type:
                    return []

        # Find backends selected by both the group type and volume types.
        return self._find_valid_backends(backends_by_vol_type,
                                         backends_by_group_type)

    def _find_valid_backends(self, backend_list1, backend_list2):
        """Return entries of ``backend_list1`` whose host is in both lists.

        Backends are compared with their pool part stripped. Each entry of
        ``backend_list1`` appears at most once in the result, and the order
        of ``backend_list1`` is preserved.
        """
        # Should schedule creation of group on backend level,
        # not pool level.
        hosts_in_list2 = {utils.extract_host(candidate.obj.backend_id)
                          for candidate in backend_list2}
        return [candidate for candidate in backend_list1
                if utils.extract_host(candidate.obj.backend_id)
                in hosts_in_list2]

    def _get_weighted_candidates_by_group_type(
            self, context, group_spec,
            group_filter_properties=None):
        """Finds backends that supports the group type.

        ``group_spec`` gains a ``resource_properties`` entry (a shallow copy
        of its ``volume_properties``) and ``group_filter_properties`` is
        updated in place.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        volume_properties = group_spec['volume_properties']
        # Since Cinder is using mixed filters from Oslo and its own, which
        # takes 'resource_XX' and 'volume_XX' as input respectively,
        # copying 'volume_XX' to 'resource_XX' will make both filters
        # happy.
        resource_properties = volume_properties.copy()
        group_type = group_spec.get("group_type", None)
        resource_type = group_type
        group_spec.update({'resource_properties': resource_properties})

        config_options = self._get_configuration_options()

        if group_filter_properties is None:
            group_filter_properties = {}
        self._populate_retry(group_filter_properties, resource_properties)

        group_filter_properties.update({'context': context,
                                        'request_spec': group_spec,
                                        'config_options': config_options,
                                        'group_type': group_type,
                                        'resource_type': resource_type})

        self.populate_filter_properties(group_spec,
                                        group_filter_properties)

        # Find our local list of acceptable backends by filtering and
        # weighing our options. we virtually consume resources on
        # it so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        all_backends = self.host_manager.get_all_backend_states(elevated)
        if not all_backends:
            return []

        # Filter local backends based on requirements ...
        backends = self.host_manager.get_filtered_backends(
            all_backends, group_filter_properties)

        if not backends:
            return []

        LOG.debug("Filtered %s", backends)

        # Weigh the surviving backends; the first entry is the best backend
        # for the job.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends,
            group_filter_properties)
        if not weighed_backends:
            return []

        return weighed_backends

    def _schedule(self, context, request_spec, filter_properties=None):
        """Choose the best backend for ``request_spec``.

        When the request spec names a ``resource_backend``, candidates on
        any other backend are discarded before the top one is chosen.

        :returns: the top weighed backend, or None if no candidate is left
        """
        # Normalise here so this method shares the dict that
        # _get_weighted_candidates fills in; otherwise the warning below
        # would dereference None when the caller relied on the default.
        if filter_properties is None:
            filter_properties = {}
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # When we get the weighed_backends, we clear those backends that don't
        # match the resource's backend (it could be assigned from group,
        # snapshot or volume).
        resource_backend = request_spec.get('resource_backend')
        if weighed_backends and resource_backend:
            resource_backend_has_pool = bool(utils.extract_host(
                resource_backend, 'pool'))
            # Compare at the granularity the resource backend was given in:
            # the full host@backend#pool id, or with the pool stripped.
            weighed_backends = [
                candidate for candidate in weighed_backends
                if (candidate.obj.backend_id if resource_backend_has_pool
                    else utils.extract_host(candidate.obj.backend_id)
                    ) == resource_backend
            ]
        if not weighed_backends:
            logged_spec = filter_properties.get('request_spec', request_spec)
            LOG.warning('No weighed backend found for volume '
                        'with properties: %s',
                        logged_spec.get('volume_type'))
            return None
        return self._choose_top_backend(weighed_backends, request_spec)

    def _schedule_generic_group(self, context, group_spec, request_spec_list,
                                group_filter_properties=None,
                                filter_properties_list=None):
        """Choose the best backend for a generic group.

        :returns: the top weighed backend, or None if there is no candidate
        """
        weighed_backends = self._get_weighted_candidates_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)
        if not weighed_backends:
            return None
        return self._choose_top_backend_generic_group(weighed_backends)

    def _choose_top_backend(self, weighed_backends, request_spec):
        """Return the first weighed backend after consuming the volume on it.

        ``consume_from_volume`` is called on the backend state with
        ``request_spec['volume_properties']`` so that later selections see
        the updated state.
        """
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        volume_properties = request_spec['volume_properties']
        backend_state.consume_from_volume(volume_properties)
        return top_backend

    def _choose_top_backend_generic_group(self, weighed_backends):
        """Return the first weighed backend without consuming any capacity."""
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        return top_backend