# Copyright (c) 2010 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Scheduler Service
"""

import collections
from datetime import datetime

import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import versionutils
import six

from cinder import context
from cinder import db
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _
from cinder import manager
from cinder.message import api as mess_api
from cinder.message import message_field
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder import rpc
from cinder.scheduler.flows import create_volume
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder.volume import rpcapi as volume_rpcapi


scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
                                  default='cinder.scheduler.filter_scheduler.'
                                          'FilterScheduler',
                                  help='Default scheduler driver to use')

CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)

QUOTAS = quota.QUOTAS

LOG = logging.getLogger(__name__)


class SchedulerManager(manager.CleanableManager, manager.Manager):
    """Chooses a host to create volumes."""

    RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION

    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, scheduler_driver=None, service_name=None,
                 *args, **kwargs):
        if not scheduler_driver:
            scheduler_driver = CONF.scheduler_driver
        self.driver = importutils.import_object(scheduler_driver)
        super(SchedulerManager, self).__init__(*args, **kwargs)
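        # Delay incoming scheduling requests until the backends have had a
        # chance to report their capabilities; cleared in init_host_with_rpc.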
        self._startup_delay = True
        self.volume_api = volume_rpcapi.VolumeAPI()
        self.sch_api = scheduler_rpcapi.SchedulerAPI()
        self.message_api = mess_api.API()
        self.rpc_api_version = versionutils.convert_version_to_int(
            self.RPC_API_VERSION)

    def init_host_with_rpc(self):
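        """Ask volume services for capabilities and lift the startup delay.

        We request the current capabilities from all volume services, wait
        one periodic interval for the replies to arrive, and then clear the
        startup delay used by _wait_for_scheduler().
        """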
        ctxt = context.get_admin_context()
        self.request_service_capabilities(ctxt)

        eventlet.sleep(CONF.periodic_interval)
        self._startup_delay = False

    def reset(self):
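        """Recreate the RPC API clients and reset the scheduler driver."""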
        super(SchedulerManager, self).reset()
        self.volume_api = volume_rpcapi.VolumeAPI()
        self.sch_api = scheduler_rpcapi.SchedulerAPI()
        self.driver.reset()

    @periodic_task.periodic_task(spacing=CONF.message_reap_interval,
                                 run_immediately=True)
    def _clean_expired_messages(self, context):
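        """Periodic task that removes expired user messages."""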
        self.message_api.cleanup_expired_messages(context)

    @periodic_task.periodic_task(spacing=CONF.reservation_clean_interval,
                                 run_immediately=True)
    def _clean_expired_reservation(self, context):
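        """Periodic task that expires stale quota reservations."""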
        QUOTAS.expire(context)

    def update_service_capabilities(self, context, service_name=None,
                                    host=None, capabilities=None,
                                    cluster_name=None, timestamp=None,
                                    **kwargs):
        """Process a capability update from a service node."""
        if capabilities is None:
            capabilities = {}
        # If we received the timestamp we have to deserialize it
        elif timestamp:
            timestamp = datetime.strptime(timestamp,
                                          timeutils.PERFECT_TIME_FORMAT)

        self.driver.update_service_capabilities(service_name,
                                                host,
                                                capabilities,
                                                cluster_name,
                                                timestamp)

    def notify_service_capabilities(self, context, service_name,
                                    capabilities, host=None, backend=None,
                                    timestamp=None):
        """Process a capability update from a service node."""
        # TODO(geguileo): On v4 remove host field.
        if capabilities is None:
            capabilities = {}
        # If we received the timestamp we have to deserialize it
        elif timestamp:
            timestamp = datetime.strptime(timestamp,
                                          timeutils.PERFECT_TIME_FORMAT)
        backend = backend or host
        self.driver.notify_service_capabilities(service_name,
                                                backend,
                                                capabilities,
                                                timestamp)

    def _wait_for_scheduler(self):
        # NOTE(dulek): We wait until the scheduler announces that it's ready
        # or until CONF.periodic_interval seconds have passed since service
        # startup.
        while self._startup_delay and not self.driver.is_ready():
            eventlet.sleep(1)

    def create_group(self, context, group, group_spec=None,
                     group_filter_properties=None, request_spec_list=None,
                     filter_properties_list=None):
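        """Schedule a generic volume group on a backend.

        On failure the group is moved to the error status.
        """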
        self._wait_for_scheduler()
        try:
            self.driver.schedule_create_group(
                context, group,
                group_spec,
                request_spec_list,
                group_filter_properties,
                filter_properties_list)
        except exception.NoValidBackend:
            LOG.error("Could not find a backend for group "
                      "%(group_id)s.",
                      {'group_id': group.id})
            group.status = fields.GroupStatus.ERROR
            group.save()
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception("Failed to create generic group "
                              "%(group_id)s.",
                              {'group_id': group.id})
                group.status = fields.GroupStatus.ERROR
                group.save()

    @objects.Volume.set_workers
    def create_volume(self, context, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
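        """Build the volume creation scheduling flow and run it."""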
        self._wait_for_scheduler()

        try:
            flow_engine = create_volume.get_flow(context,
                                                 self.driver,
                                                 request_spec,
                                                 filter_properties,
                                                 volume,
                                                 snapshot_id,
                                                 image_id,
                                                 backup_id)
        except Exception:
            msg = _("Failed to create scheduler manager volume flow")
            LOG.exception(msg)
            raise exception.CinderException(msg)

        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
            flow_engine.run()

    def create_snapshot(self, ctxt, volume, snapshot, backend,
                        request_spec=None, filter_properties=None):
        """Create snapshot for a volume.

        The main purpose of this method is to check whether the target
        backend (of the volume and snapshot) has sufficient capacity to
        host the snapshot that is about to be created.
        """
        self._wait_for_scheduler()

        try:
            tgt_backend = self.driver.backend_passes_filters(
                ctxt, backend, request_spec, filter_properties)
            tgt_backend.consume_from_volume(
                {'size': request_spec['volume_properties']['size']})
        except exception.NoValidBackend as ex:
            self._set_snapshot_state_and_notify('create_snapshot',
                                                snapshot, 'error',
                                                ctxt, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume,
                                                      snapshot)

    def _do_cleanup(self, ctxt, vo_resource):
        # We can only receive cleanup requests for volumes, but we check
        # anyway. We need to clean up the volume status for cases where the
        # scheduler died while scheduling the volume creation.
        if (isinstance(vo_resource, objects.Volume) and
                vo_resource.status == 'creating'):
            vo_resource.status = 'error'
            vo_resource.save()

    def request_service_capabilities(self, context):
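        """Ask all volume services to publish their capabilities."""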
        volume_rpcapi.VolumeAPI().publish_service_capabilities(context)

    def migrate_volume(self, context, volume, backend, force_copy,
                       request_spec, filter_properties):
        """Ensure that the backend exists and can accept the volume."""
        self._wait_for_scheduler()

        def _migrate_volume_set_error(self, context, ex, request_spec):
            if volume.status == 'maintenance':
                previous_status = (
                    volume.previous_status or 'maintenance')
                volume_state = {'volume_state': {'migration_status': 'error',
                                                 'status': previous_status}}
            else:
                volume_state = {'volume_state': {'migration_status': 'error'}}
            self._set_volume_state_and_notify('migrate_volume_to_host',
                                              volume_state,
                                              context, ex, request_spec)

        try:
            tgt_backend = self.driver.backend_passes_filters(context, backend,
                                                             request_spec,
                                                             filter_properties)
        except exception.NoValidBackend as ex:
            _migrate_volume_set_error(self, context, ex, request_spec)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _migrate_volume_set_error(self, context, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
                                                     tgt_backend,
                                                     force_copy)

    # FIXME(geguileo): Remove this in v4.0 of RPC API.
    def migrate_volume_to_host(self, context, volume, host, force_host_copy,
                               request_spec, filter_properties=None):
        return self.migrate_volume(context, volume, host, force_host_copy,
                                   request_spec, filter_properties)

    def retype(self, context, volume, request_spec, filter_properties=None):
        """Schedule the modification of a volume's type.

        :param context: the request context
        :param volume: the volume object to retype
        :param request_spec: parameters for this retype request
        :param filter_properties: parameters to filter by
        """

        self._wait_for_scheduler()

        def _retype_volume_set_error(self, context, ex, request_spec,
                                     volume_ref, reservations, msg=None):
            if reservations:
                QUOTAS.rollback(context, reservations)
            previous_status = (
                volume_ref.previous_status or volume_ref.status)
            volume_state = {'volume_state': {'status': previous_status}}
            self._set_volume_state_and_notify('retype', volume_state,
                                              context, ex, request_spec, msg)

        reservations = request_spec.get('quota_reservations')
        old_reservations = request_spec.get('old_reservations', None)
        new_type = request_spec.get('volume_type')
        if new_type is None:
            msg = _('New volume type not specified in request_spec.')
            ex = exception.ParameterNotFound(param='volume_type')
            _retype_volume_set_error(self, context, ex, request_spec,
                                     volume, reservations, msg)

        # Default migration policy is 'never'
        migration_policy = request_spec.get('migration_policy')
        if not migration_policy:
            migration_policy = 'never'

        try:
            tgt_backend = self.driver.find_retype_backend(context,
                                                          request_spec,
                                                          filter_properties,
                                                          migration_policy)
        except Exception as ex:
            # Not having a valid host is an expected exception, so we don't
            # reraise on it.
            reraise = not isinstance(ex, exception.NoValidBackend)
            with excutils.save_and_reraise_exception(reraise=reraise):
                _retype_volume_set_error(self, context, ex, request_spec,
                                         volume, reservations)
        else:
            volume_rpcapi.VolumeAPI().retype(context, volume,
                                             new_type['id'], tgt_backend,
                                             migration_policy,
                                             reservations,
                                             old_reservations)

    def manage_existing(self, context, volume, request_spec,
                        filter_properties=None):
        """Ensure that the host exists and can accept the volume."""

        self._wait_for_scheduler()

        def _manage_existing_set_error(self, context, ex, request_spec):
            volume_state = {'volume_state': {'status': 'error_managing'}}
            self._set_volume_state_and_notify('manage_existing', volume_state,
                                              context, ex, request_spec)

        try:
            backend = self.driver.backend_passes_filters(
                context, volume.service_topic_queue, request_spec,
                filter_properties)

            # At the API we didn't have the pool info, so the volume DB entry
            # was created without it; now we add it.
            volume.host = backend.host
            volume.cluster_name = backend.cluster_name
            volume.save()

        except exception.NoValidBackend as ex:
            _manage_existing_set_error(self, context, ex, request_spec)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _manage_existing_set_error(self, context, ex, request_spec)
        else:
            volume_rpcapi.VolumeAPI().manage_existing(context, volume,
                                                      request_spec.get('ref'))

    def get_pools(self, context, filters=None):
        """Get active pools from scheduler's cache.

        NOTE(dulek): There's no self._wait_for_scheduler() because get_pools
        is a blocking RPC call for the c-api. Also, this is an admin-only API
        extension, so it won't hurt the user much to retry the request
        manually.
        """
        return self.driver.get_pools(context, filters)

    def validate_host_capacity(self, context, backend, request_spec,
                               filter_properties):
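        """Check that the backend can hold the volume in request_spec.

        Returns True if the backend passes the filters and has enough free
        capacity for the requested volume size, False otherwise.
        """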
        try:
            backend_state = self.driver.backend_passes_filters(
                context,
                backend,
                request_spec, filter_properties)
            backend_state.consume_from_volume(
                {'size': request_spec['volume_properties']['size']})
        except exception.NoValidBackend:
            LOG.error("Desired host %(host)s does not have enough "
                      "capacity.", {'host': backend})
            return False
        return True

    def extend_volume(self, context, volume, new_size, reservations,
                      request_spec=None, filter_properties=None):
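        """Check the backend can fit the size increase, then extend the volume.

        On failure the quota reservations are rolled back, the volume status
        is restored and a user message is created.
        """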

        def _extend_volume_set_error(self, context, ex, request_spec):
            volume_state = {'volume_state': {'status': volume.previous_status,
                                             'previous_status': None}}
            self._set_volume_state_and_notify('extend_volume', volume_state,
                                              context, ex, request_spec)

        if not filter_properties:
            filter_properties = {}

        filter_properties['new_size'] = new_size
        try:
            backend_state = self.driver.backend_passes_filters(
                context,
                volume.service_topic_queue,
                request_spec, filter_properties)
            backend_state.consume_from_volume(
                {'size': new_size - volume.size})
            volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size,
                                                    reservations)
        except exception.NoValidBackend as ex:
            QUOTAS.rollback(context, reservations,
                            project_id=volume.project_id)
            _extend_volume_set_error(self, context, ex, request_spec)
            self.message_api.create(
                context,
                message_field.Action.EXTEND_VOLUME,
                resource_uuid=volume.id,
                exception=ex)

    def _set_volume_state_and_notify(self, method, updates, context, ex,
                                     request_spec, msg=None):
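        """Update the volume's state and emit a scheduler error notification."""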
        # TODO(harlowja): move into a task that just does this later.
        if not msg:
            msg = ("Failed to schedule_%(method)s: %(ex)s" %
                   {'method': method, 'ex': six.text_type(ex)})
        LOG.error(msg)

        volume_state = updates['volume_state']
        properties = request_spec.get('volume_properties', {})

        volume_id = request_spec.get('volume_id', None)

        if volume_id:
            db.volume_update(context, volume_id, volume_state)

        if volume_state.get('status') == 'error_managing':
            volume_state['status'] = 'error'

        payload = dict(request_spec=request_spec,
                       volume_properties=properties,
                       volume_id=volume_id,
                       state=volume_state,
                       method=method,
                       reason=ex)

        rpc.get_notifier("scheduler").error(context,
                                            'scheduler.' + method,
                                            payload)

    def _set_snapshot_state_and_notify(self, method, snapshot, state,
                                       context, ex, request_spec,
                                       msg=None):
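        """Set the snapshot status and emit a scheduler error notification."""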
        if not msg:
            msg = ("Failed to schedule_%(method)s: %(ex)s" %
                   {'method': method, 'ex': six.text_type(ex)})
        LOG.error(msg)

        model_update = dict(status=state)
        snapshot.update(model_update)
        snapshot.save()

        payload = dict(request_spec=request_spec,
                       snapshot_id=snapshot.id,
                       state=state,
                       method=method,
                       reason=ex)

        rpc.get_notifier("scheduler").error(context,
                                            'scheduler.' + method,
                                            payload)

    @property
    def upgrading_cloud(self):
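        """Return True while a rolling upgrade of the cloud is in progress.

        This is the case when the lowest scheduler RPC version in the
        deployment is older than this service's own RPC version.
        """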
        min_version_str = self.sch_api.determine_rpc_version_cap()
        min_version = versionutils.convert_version_to_int(min_version_str)
        return min_version < self.rpc_api_version

    def _cleanup_destination(self, clusters, service):
        """Determine the RPC cleanup method, destination service and name.

        The name is only used for logging and corresponds to the topic queue.
        """
        # For the scheduler we don't have a specific destination, as any
        # scheduler will do and we know we are up, since we are running this
        # code.
        if service.binary == 'cinder-scheduler':
            cleanup_rpc = self.sch_api.do_cleanup
            dest = None
            dest_name = service.host
        else:
            cleanup_rpc = self.volume_api.do_cleanup

            # For clustered volume services we try to get info from the cache.
            if service.is_clustered:
                # Get cluster info from cache
                dest = clusters[service.binary].get(service.cluster_name)
                # Cache miss forces us to get the cluster from the DB via OVO
                if not dest:
                    dest = service.cluster
                    clusters[service.binary][service.cluster_name] = dest
                dest_name = dest.name
            # Non clustered volume services
            else:
                dest = service
                dest_name = service.host
        return cleanup_rpc, dest, dest_name

    def work_cleanup(self, context, cleanup_request):
        """Process request from API to do cleanup on services.

        Here we retrieve from the DB which services we want to clean up based
        on the request from the user.

        Then we send individual cleanup requests to each of the services that
        are up, and finally return a tuple with the services we sent a cleanup
        request to and those that were not up, so no request could be sent.
        """
        if self.upgrading_cloud:
            raise exception.UnavailableDuringUpgrade(action='workers cleanup')

        LOG.info('Workers cleanup request started.')

        filters = dict(service_id=cleanup_request.service_id,
                       cluster_name=cleanup_request.cluster_name,
                       host=cleanup_request.host,
                       binary=cleanup_request.binary,
                       is_up=cleanup_request.is_up,
                       disabled=cleanup_request.disabled)
        # Get the list of all the services that match the request
        services = objects.ServiceList.get_all(context, filters)

        until = cleanup_request.until or timeutils.utcnow()
        requested = []
        not_requested = []

        # To reduce DB queries we'll cache the clusters data
        clusters = collections.defaultdict(dict)

        for service in services:
            cleanup_request.cluster_name = service.cluster_name
            cleanup_request.service_id = service.id
            cleanup_request.host = service.host
            cleanup_request.binary = service.binary
            cleanup_request.until = until

            cleanup_rpc, dest, dest_name = self._cleanup_destination(clusters,
                                                                     service)

            # If it's a scheduler or the service is up, send the request.
            if not dest or dest.is_up:
                LOG.info('Sending cleanup for %(binary)s %(dest_name)s.',
                         {'binary': service.binary,
                          'dest_name': dest_name})
                cleanup_rpc(context, cleanup_request)
                requested.append(service)
            # We don't send cleanup requests when there are no services alive
            # to do the cleanup.
            else:
                LOG.info('No service available to cleanup %(binary)s '
                         '%(dest_name)s.',
                         {'binary': service.binary,
                          'dest_name': dest_name})
                not_requested.append(service)

        LOG.info('Cleanup requests completed.')
        return requested, not_requested