# Copyright (c) 2010 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Scheduler Service
"""

import collections
from datetime import datetime

import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import versionutils
import six

from cinder import context
from cinder import db
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _
from cinder import manager
from cinder.message import api as mess_api
from cinder.message import message_field
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder import rpc
from cinder.scheduler.flows import create_volume
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder.volume import rpcapi as volume_rpcapi


scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
                                  default='cinder.scheduler.filter_scheduler.'
                                          'FilterScheduler',
                                  help='Default scheduler driver to use')
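
# An operator can point this option at any importable scheduler class via
# cinder.conf. Illustrative example only -- the option name and default come
# from the definition above; the section/value shown are just the usual
# layout:
#
#   [DEFAULT]
#   scheduler_driver = cinder.scheduler.filter_scheduler.FilterScheduler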

CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)

QUOTAS = quota.QUOTAS

LOG = logging.getLogger(__name__)


class SchedulerManager(manager.CleanableManager, manager.Manager):
    """Chooses a host to create volumes."""

    RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION

    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, scheduler_driver=None, service_name=None,
                 *args, **kwargs):
        if not scheduler_driver:
            scheduler_driver = CONF.scheduler_driver
        self.driver = importutils.import_object(scheduler_driver)
        super(SchedulerManager, self).__init__(*args, **kwargs)
        self._startup_delay = True
        self.volume_api = volume_rpcapi.VolumeAPI()
        self.sch_api = scheduler_rpcapi.SchedulerAPI()
        self.message_api = mess_api.API()
        self.rpc_api_version = versionutils.convert_version_to_int(
            self.RPC_API_VERSION)

    def init_host_with_rpc(self):
        ctxt = context.get_admin_context()
        self.request_service_capabilities(ctxt)

        eventlet.sleep(CONF.periodic_interval)
        self._startup_delay = False

    def reset(self):
        super(SchedulerManager, self).reset()
        self.volume_api = volume_rpcapi.VolumeAPI()
        self.sch_api = scheduler_rpcapi.SchedulerAPI()
        self.driver.reset()

    @periodic_task.periodic_task(spacing=CONF.message_reap_interval,
                                 run_immediately=True)
    def _clean_expired_messages(self, context):
        self.message_api.cleanup_expired_messages(context)

    @periodic_task.periodic_task(spacing=CONF.reservation_clean_interval,
                                 run_immediately=True)
    def _clean_expired_reservation(self, context):
        QUOTAS.expire(context)

    def update_service_capabilities(self, context, service_name=None,
                                    host=None, capabilities=None,
                                    cluster_name=None, timestamp=None,
                                    **kwargs):
        """Process a capability update from a service node."""
        if capabilities is None:
            capabilities = {}
        # If we received the timestamp we have to deserialize it
        elif timestamp:
            timestamp = datetime.strptime(timestamp,
                                          timeutils.PERFECT_TIME_FORMAT)

        self.driver.update_service_capabilities(service_name,
                                                host,
                                                capabilities,
                                                cluster_name,
                                                timestamp)

    def notify_service_capabilities(self, context, service_name,
                                    capabilities, host=None, backend=None,
                                    timestamp=None):
        """Process a capability update from a service node."""
        # TODO(geguileo): On v4 remove host field.
        if capabilities is None:
            capabilities = {}
        # If we received the timestamp we have to deserialize it
        elif timestamp:
            timestamp = datetime.strptime(timestamp,
                                          timeutils.PERFECT_TIME_FORMAT)
        backend = backend or host
        self.driver.notify_service_capabilities(service_name,
                                                backend,
                                                capabilities,
                                                timestamp)
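
    # Both capability handlers above deserialize `timestamp` with
    # timeutils.PERFECT_TIME_FORMAT. Assuming the sender serialized it the
    # same way, an incoming value looks roughly like
    # '2024-01-01T12:30:45.123456' (illustrative); anything else raises
    # ValueError from datetime.strptime.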

    def _wait_for_scheduler(self):
        # NOTE(dulek): We're waiting for scheduler to announce that it's ready
        # or CONF.periodic_interval seconds from service startup has passed.
        while self._startup_delay and not self.driver.is_ready():
            eventlet.sleep(1)

    def create_group(self, context, group, group_spec=None,
                     group_filter_properties=None, request_spec_list=None,
                     filter_properties_list=None):
        self._wait_for_scheduler()
        try:
            self.driver.schedule_create_group(
                context, group,
                group_spec,
                request_spec_list,
                group_filter_properties,
                filter_properties_list)
        except exception.NoValidBackend:
            LOG.error("Could not find a backend for group "
                      "%(group_id)s.",
                      {'group_id': group.id})
            group.status = fields.GroupStatus.ERROR
            group.save()
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception("Failed to create generic group "
                              "%(group_id)s.",
                              {'group_id': group.id})
                group.status = fields.GroupStatus.ERROR
                group.save()

    @objects.Volume.set_workers
    def create_volume(self, context, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
        self._wait_for_scheduler()

        try:
            flow_engine = create_volume.get_flow(context,
                                                 self.driver,
                                                 request_spec,
                                                 filter_properties,
                                                 volume,
                                                 snapshot_id,
                                                 image_id,
                                                 backup_id)
        except Exception:
            msg = _("Failed to create scheduler manager volume flow")
            LOG.exception(msg)
            raise exception.CinderException(msg)

        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
            flow_engine.run()

    def create_snapshot(self, ctxt, volume, snapshot, backend,
                        request_spec=None, filter_properties=None):
        """Create snapshot for a volume.

        The main purpose of this method is to check if target
        backend (of volume and snapshot) has sufficient capacity
        to host to-be-created snapshot.
        """
        self._wait_for_scheduler()

        try:
            tgt_backend = self.driver.backend_passes_filters(
                ctxt, backend, request_spec, filter_properties)
            tgt_backend.consume_from_volume(
                {'size': request_spec['volume_properties']['size']})
        except exception.NoValidBackend as ex:
            self._set_snapshot_state_and_notify('create_snapshot',
                                                snapshot, 'error',
                                                ctxt, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume,
                                                      snapshot)
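
    # NOTE: the capacity check in create_snapshot() above only consumes the
    # size, so the request_spec it receives is assumed here to carry at least
    # {'volume_properties': {'size': <size in GiB>}} (that is the only key
    # read directly in this method); richer specs simply pass extra data
    # through to the filters.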

    def _do_cleanup(self, ctxt, vo_resource):
        # We can only receive cleanup requests for volumes, but we check
        # anyway. We need to clean up the volume status for cases where the
        # scheduler died while scheduling the volume creation.
        if (isinstance(vo_resource, objects.Volume) and
                vo_resource.status == 'creating'):
            vo_resource.status = 'error'
            vo_resource.save()

    def request_service_capabilities(self, context):
        volume_rpcapi.VolumeAPI().publish_service_capabilities(context)

    def migrate_volume(self, context, volume, backend, force_copy,
                       request_spec, filter_properties):
        """Ensure that the backend exists and can accept the volume."""
        self._wait_for_scheduler()

        def _migrate_volume_set_error(self, context, ex, request_spec):
            if volume.status == 'maintenance':
                previous_status = (
                    volume.previous_status or 'maintenance')
                volume_state = {'volume_state': {'migration_status': 'error',
                                                 'status': previous_status}}
            else:
                volume_state = {'volume_state': {'migration_status': 'error'}}
            self._set_volume_state_and_notify('migrate_volume_to_host',
                                              volume_state,
                                              context, ex, request_spec)

        try:
            tgt_backend = self.driver.backend_passes_filters(
                context, backend, request_spec, filter_properties)
        except exception.NoValidBackend as ex:
            _migrate_volume_set_error(self, context, ex, request_spec)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _migrate_volume_set_error(self, context, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
                                                     tgt_backend,
                                                     force_copy)

    # FIXME(geguileo): Remove this in v4.0 of RPC API.
    def migrate_volume_to_host(self, context, volume, host, force_host_copy,
                               request_spec, filter_properties=None):
        return self.migrate_volume(context, volume, host, force_host_copy,
                                   request_spec, filter_properties)
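
    # NOTE: the `backend` (and legacy `host`) destination passed to the
    # migration calls above is the usual Cinder destination string; for a
    # pool-aware backend it typically looks like 'host@backend#pool'
    # (illustrative only -- the exact value is supplied by the caller).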

    def retype(self, context, volume, request_spec, filter_properties=None):
        """Schedule the modification of a volume's type.

        :param context: the request context
        :param volume: the volume object to retype
        :param request_spec: parameters for this retype request
        :param filter_properties: parameters to filter by
        """

        self._wait_for_scheduler()

        def _retype_volume_set_error(self, context, ex, request_spec,
                                     volume_ref, reservations, msg=None):
            if reservations:
                QUOTAS.rollback(context, reservations)
            previous_status = (
                volume_ref.previous_status or volume_ref.status)
            volume_state = {'volume_state': {'status': previous_status}}
            self._set_volume_state_and_notify('retype', volume_state,
                                              context, ex, request_spec, msg)

        reservations = request_spec.get('quota_reservations')
        old_reservations = request_spec.get('old_reservations', None)
        new_type = request_spec.get('volume_type')
        if new_type is None:
            msg = _('New volume type not specified in request_spec.')
            ex = exception.ParameterNotFound(param='volume_type')
            _retype_volume_set_error(self, context, ex, request_spec,
                                     volume, reservations, msg)

        # Default migration policy is 'never'
        migration_policy = request_spec.get('migration_policy')
        if not migration_policy:
            migration_policy = 'never'

        try:
            tgt_backend = self.driver.find_retype_backend(context,
                                                          request_spec,
                                                          filter_properties,
                                                          migration_policy)
        except Exception as ex:
            # Not having a valid host is an expected exception, so we don't
            # reraise on it.
            reraise = not isinstance(ex, exception.NoValidBackend)
            with excutils.save_and_reraise_exception(reraise=reraise):
                _retype_volume_set_error(self, context, ex, request_spec,
                                         volume, reservations)
        else:
            volume_rpcapi.VolumeAPI().retype(context, volume,
                                             new_type['id'], tgt_backend,
                                             migration_policy,
                                             reservations,
                                             old_reservations)

    def manage_existing(self, context, volume, request_spec,
                        filter_properties=None):
        """Ensure that the host exists and can accept the volume."""

        self._wait_for_scheduler()

        def _manage_existing_set_error(self, context, ex, request_spec):
            volume_state = {'volume_state': {'status': 'error_managing'}}
            self._set_volume_state_and_notify('manage_existing', volume_state,
                                              context, ex, request_spec)

        try:
            backend = self.driver.backend_passes_filters(
                context, volume.service_topic_queue, request_spec,
                filter_properties)

            # At the API we didn't have the pool info, so the volume DB entry
            # was created without it, now we add it.
            volume.host = backend.host
            volume.cluster_name = backend.cluster_name
            volume.save()

        except exception.NoValidBackend as ex:
            _manage_existing_set_error(self, context, ex, request_spec)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _manage_existing_set_error(self, context, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().manage_existing(context, volume,
                                                      request_spec.get('ref'))
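
    # NOTE: for manage_existing() above, request_spec['ref'] is the
    # driver-specific reference to the pre-existing volume on the backend,
    # commonly something like {'source-name': '<backend volume name>'} or
    # {'source-id': '<backend volume id>'} (illustrative; each driver defines
    # exactly what it accepts).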

    def get_pools(self, context, filters=None):
        """Get active pools from scheduler's cache.

        NOTE(dulek): There's no self._wait_for_scheduler() because get_pools
        is an RPC call (is blocking for the c-api). Also this is admin-only
        API extension so it won't hurt the user much to retry the request
        manually.
        """
        return self.driver.get_pools(context, filters)

    def validate_host_capacity(self, context, backend, request_spec,
                               filter_properties):
        try:
            backend_state = self.driver.backend_passes_filters(
                context,
                backend,
                request_spec, filter_properties)
            backend_state.consume_from_volume(
                {'size': request_spec['volume_properties']['size']})
        except exception.NoValidBackend:
            LOG.error("Desired host %(host)s does not have enough "
                      "capacity.", {'host': backend})
            return False
        return True

    def extend_volume(self, context, volume, new_size, reservations,
                      request_spec=None, filter_properties=None):

        def _extend_volume_set_error(self, context, ex, request_spec):
            volume_state = {'volume_state': {'status': volume.previous_status,
                                             'previous_status': None}}
            self._set_volume_state_and_notify('extend_volume', volume_state,
                                              context, ex, request_spec)

        if not filter_properties:
            filter_properties = {}

        filter_properties['new_size'] = new_size
        try:
            backend_state = self.driver.backend_passes_filters(
                context,
                volume.service_topic_queue,
                request_spec, filter_properties)
            backend_state.consume_from_volume(
                {'size': new_size - volume.size})
            volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size,
                                                    reservations)
        except exception.NoValidBackend as ex:
            QUOTAS.rollback(context, reservations,
                            project_id=volume.project_id)
            _extend_volume_set_error(self, context, ex, request_spec)
            self.message_api.create(
                context,
                message_field.Action.EXTEND_VOLUME,
                resource_uuid=volume.id,
                exception=ex)

    def _set_volume_state_and_notify(self, method, updates, context, ex,
                                     request_spec, msg=None):
        # TODO(harlowja): move into a task that just does this later.
        if not msg:
            msg = ("Failed to schedule_%(method)s: %(ex)s" %
                   {'method': method, 'ex': six.text_type(ex)})
        LOG.error(msg)

        volume_state = updates['volume_state']
        properties = request_spec.get('volume_properties', {})

        volume_id = request_spec.get('volume_id', None)

        if volume_id:
            db.volume_update(context, volume_id, volume_state)

        if volume_state.get('status') == 'error_managing':
            volume_state['status'] = 'error'

        payload = dict(request_spec=request_spec,
                       volume_properties=properties,
                       volume_id=volume_id,
                       state=volume_state,
                       method=method,
                       reason=ex)

        rpc.get_notifier("scheduler").error(context,
                                            'scheduler.' + method,
                                            payload)

    def _set_snapshot_state_and_notify(self, method, snapshot, state,
                                       context, ex, request_spec,
                                       msg=None):
        if not msg:
            msg = ("Failed to schedule_%(method)s: %(ex)s" %
                   {'method': method, 'ex': six.text_type(ex)})
        LOG.error(msg)

        model_update = dict(status=state)
        snapshot.update(model_update)
        snapshot.save()

        payload = dict(request_spec=request_spec,
                       snapshot_id=snapshot.id,
                       state=state,
                       method=method,
                       reason=ex)

        rpc.get_notifier("scheduler").error(context,
                                            'scheduler.' + method,
                                            payload)

    @property
    def upgrading_cloud(self):
        min_version_str = self.sch_api.determine_rpc_version_cap()
        min_version = versionutils.convert_version_to_int(min_version_str)
        return min_version < self.rpc_api_version
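
    # upgrading_cloud above compares integer-encoded versions: with
    # versionutils.convert_version_to_int a cap such as '3.11' becomes 3011
    # (illustrative), so any service still pinned below our own
    # RPC_API_VERSION makes the property return True.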

    def _cleanup_destination(self, clusters, service):
        """Determines the RPC method, destination service and name.

        The name is only used for logging, and it is the topic queue.
        """
        # For the scheduler we don't have a specific destination, as any
        # scheduler will do and we know we are up, since we are running this
        # code.
        if service.binary == 'cinder-scheduler':
            cleanup_rpc = self.sch_api.do_cleanup
            dest = None
            dest_name = service.host
        else:
            cleanup_rpc = self.volume_api.do_cleanup

            # For clustered volume services we try to get info from the cache.
            if service.is_clustered:
                # Get cluster info from cache
                dest = clusters[service.binary].get(service.cluster_name)
                # Cache miss forces us to get the cluster from the DB via OVO
                if not dest:
                    dest = service.cluster
                    clusters[service.binary][service.cluster_name] = dest
                dest_name = dest.name
            # Non clustered volume services
            else:
                dest = service
                dest_name = service.host
        return cleanup_rpc, dest, dest_name

    def work_cleanup(self, context, cleanup_request):
        """Process request from API to do cleanup on services.

        Here we retrieve from the DB which services we want to clean up based
        on the request from the user.

        Then we send individual cleanup requests to each of the services that
        are up, and finally return a tuple with the services we sent a cleanup
        request to and those that were down and could not receive one.
        """
        if self.upgrading_cloud:
            raise exception.UnavailableDuringUpgrade(action='workers cleanup')

        LOG.info('Workers cleanup request started.')

        filters = dict(service_id=cleanup_request.service_id,
                       cluster_name=cleanup_request.cluster_name,
                       host=cleanup_request.host,
                       binary=cleanup_request.binary,
                       is_up=cleanup_request.is_up,
                       disabled=cleanup_request.disabled)
        # Get the list of all the services that match the request
        services = objects.ServiceList.get_all(context, filters)

        until = cleanup_request.until or timeutils.utcnow()
        requested = []
        not_requested = []

        # To reduce DB queries we'll cache the clusters data
        clusters = collections.defaultdict(dict)

        for service in services:
            cleanup_request.cluster_name = service.cluster_name
            cleanup_request.service_id = service.id
            cleanup_request.host = service.host
            cleanup_request.binary = service.binary
            cleanup_request.until = until

            cleanup_rpc, dest, dest_name = self._cleanup_destination(clusters,
                                                                     service)

            # If it's a scheduler or the service is up, send the request.
            if not dest or dest.is_up:
                LOG.info('Sending cleanup for %(binary)s %(dest_name)s.',
                         {'binary': service.binary,
                          'dest_name': dest_name})
                cleanup_rpc(context, cleanup_request)
                requested.append(service)
            # We don't send cleanup requests when there are no services alive
            # to do the cleanup.
            else:
                LOG.info('No service available to cleanup %(binary)s '
                         '%(dest_name)s.',
                         {'binary': service.binary,
                          'dest_name': dest_name})
                not_requested.append(service)

        LOG.info('Cleanup requests completed.')
        return requested, not_requested