1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17"""Base Manager class.
18
19Managers are responsible for a certain aspect of the system.  It is a logical
20grouping of code relating to a portion of the system.  In general other
21components should be using the manager to make changes to the components that
22it is responsible for.
23
24For example, other components that need to deal with volumes in some way,
25should do so by calling methods on the VolumeManager instead of directly
26changing fields in the database.  This allows us to keep all of the code
27relating to volumes in the same place.
28
29We have adopted a basic strategy of Smart managers and dumb data, which means
30rather than attaching methods to data objects, components should call manager
31methods that act on the data.
32
33Methods on managers that can be executed locally should be called directly. If
34a particular method must execute on a remote host, this should be done via rpc
35to the service that wraps the manager
36
37Managers should be responsible for most of the db access, and
38non-implementation specific data.  Anything implementation specific that can't
39be generalized should be done by the Driver.
40
41In general, we prefer to have one manager with multiple drivers for different
42implementations, but sometimes it makes sense to have multiple managers.  You
43can think of it this way: Abstract different overall strategies at the manager
44level(FlatNetwork vs VlanNetwork), and different implementations at the driver
45level(LinuxNetDriver vs CiscoNetDriver).
46
47Managers will often provide methods for initial setup of a host or periodic
48tasks to a wrapping service.
49
50This module provides Manager, a base class for managers.
51
52"""
53
54
55from oslo_config import cfg
56from oslo_log import log as logging
57import oslo_messaging as messaging
58from oslo_service import periodic_task
59from oslo_utils import timeutils
60
61from cinder import context
62from cinder import db
63from cinder.db import base
64from cinder import exception
65from cinder import objects
66from cinder import rpc
67from cinder.scheduler import rpcapi as scheduler_rpcapi
68from cinder import utils
69
70from eventlet import greenpool
71from eventlet import tpool
72
73
74CONF = cfg.CONF
75LOG = logging.getLogger(__name__)
76
77
78class PeriodicTasks(periodic_task.PeriodicTasks):
79    def __init__(self):
80        super(PeriodicTasks, self).__init__(CONF)
81
82
83class Manager(base.Base, PeriodicTasks):
84    # Set RPC API version to 1.0 by default.
85    RPC_API_VERSION = '1.0'
86
87    target = messaging.Target(version=RPC_API_VERSION)
88
89    def __init__(self, host=None, db_driver=None, cluster=None, **kwargs):
90        if not host:
91            host = CONF.host
92        self.host = host
93        self.cluster = cluster
94        self.additional_endpoints = []
95        self.availability_zone = CONF.storage_availability_zone
96        super(Manager, self).__init__(db_driver)
97
98    def _set_tpool_size(self, nthreads):
99        # NOTE(geguileo): Until PR #472 is merged we have to be very careful
100        # not to call "tpool.execute" before calling this method.
101        tpool.set_num_threads(nthreads)
102
103    @property
104    def service_topic_queue(self):
105        return self.cluster or self.host
106
107    def init_host(self, service_id=None, added_to_cluster=None):
108        """Handle initialization if this is a standalone service.
109
110        A hook point for services to execute tasks before the services are made
111        available (i.e. showing up on RPC and starting to accept RPC calls) to
112        other components.  Child classes should override this method.
113
114        :param service_id: ID of the service where the manager is running.
115        :param added_to_cluster: True when a host's cluster configuration has
116                                 changed from not being defined or being '' to
117                                 any other value and the DB service record
118                                 reflects this new value.
119        """
120        pass
121
122    def init_host_with_rpc(self):
123        """A hook for service to do jobs after RPC is ready.
124
125        Like init_host(), this method is a hook where services get a chance
126        to execute tasks that *need* RPC. Child classes should override
127        this method.
128
129        """
130        pass
131
132    def is_working(self):
133        """Method indicating if service is working correctly.
134
135        This method is supposed to be overridden by subclasses and return if
136        manager is working correctly.
137        """
138        return True
139
140    def reset(self):
141        """Method executed when SIGHUP is caught by the process.
142
143        We're utilizing it to reset RPC API version pins to avoid restart of
144        the service when rolling upgrade is completed.
145        """
146        LOG.info('Resetting cached RPC version pins.')
147        rpc.LAST_OBJ_VERSIONS = {}
148        rpc.LAST_RPC_VERSIONS = {}
149
150    def set_log_levels(self, context, log_request):
151        utils.set_log_levels(log_request.prefix, log_request.level)
152
153    def get_log_levels(self, context, log_request):
154        levels = utils.get_log_levels(log_request.prefix)
155        log_levels = [objects.LogLevel(context, prefix=prefix, level=level)
156                      for prefix, level in levels.items()]
157        return objects.LogLevelList(context, objects=log_levels)
158
159
160class ThreadPoolManager(Manager):
161    def __init__(self, *args, **kwargs):
162        self._tp = greenpool.GreenPool()
163        super(ThreadPoolManager, self).__init__(*args, **kwargs)
164
165    def _add_to_threadpool(self, func, *args, **kwargs):
166        self._tp.spawn_n(func, *args, **kwargs)
167
168
169class SchedulerDependentManager(ThreadPoolManager):
170    """Periodically send capability updates to the Scheduler services.
171
172    Services that need to update the Scheduler of their capabilities
173    should derive from this class. Otherwise they can derive from
174    manager.Manager directly. Updates are only sent after
175    update_service_capabilities is called with non-None values.
176
177    """
178
179    def __init__(self, host=None, db_driver=None, service_name='undefined',
180                 cluster=None):
181        self.last_capabilities = None
182        self.service_name = service_name
183        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
184        super(SchedulerDependentManager, self).__init__(host, db_driver,
185                                                        cluster=cluster)
186
187    def update_service_capabilities(self, capabilities):
188        """Remember these capabilities to send on next periodic update."""
189        self.last_capabilities = capabilities
190
191    def _publish_service_capabilities(self, context):
192        """Pass data back to the scheduler at a periodic interval."""
193        if self.last_capabilities:
194            LOG.debug('Notifying Schedulers of capabilities ...')
195            self.scheduler_rpcapi.update_service_capabilities(
196                context,
197                self.service_name,
198                self.host,
199                self.last_capabilities,
200                self.cluster)
201            try:
202                self.scheduler_rpcapi.notify_service_capabilities(
203                    context,
204                    self.service_name,
205                    self.service_topic_queue,
206                    self.last_capabilities)
207            except exception.ServiceTooOld as e:
208                # This means we have Newton's c-sch in the deployment, so
209                # rpcapi cannot send the message. We can safely ignore the
210                # error. Log it because it shouldn't happen after upgrade.
211                msg = ("Failed to notify about cinder-volume service "
212                       "capabilities for host %(host)s. This is normal "
213                       "during a live upgrade. Error: %(e)s")
214                LOG.warning(msg, {'host': self.host, 'e': e})
215
216    def reset(self):
217        super(SchedulerDependentManager, self).reset()
218        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
219
220
221class CleanableManager(object):
222    def do_cleanup(self, context, cleanup_request):
223        LOG.info('Initiating service %s cleanup',
224                 cleanup_request.service_id)
225
226        # If the 'until' field in the cleanup request is not set, we default to
227        # this very moment.
228        until = cleanup_request.until or timeutils.utcnow()
229        keep_entry = False
230
231        to_clean = db.worker_get_all(
232            context,
233            resource_type=cleanup_request.resource_type,
234            resource_id=cleanup_request.resource_id,
235            service_id=cleanup_request.service_id,
236            until=until)
237
238        for clean in to_clean:
239            original_service_id = clean.service_id
240            original_time = clean.updated_at
241            # Try to do a soft delete to mark the entry as being cleaned up
242            # by us (setting service id to our service id).
243            res = db.worker_claim_for_cleanup(context,
244                                              claimer_id=self.service_id,
245                                              orm_worker=clean)
246
247            # Claim may fail if entry is being cleaned by another service, has
248            # been removed (finished cleaning) by another service or the user
249            # started a new cleanable operation.
250            # In any of these cases we don't have to do cleanup or remove the
251            # worker entry.
252            if not res:
253                continue
254
255            # Try to get versioned object for resource we have to cleanup
256            try:
257                vo_cls = getattr(objects, clean.resource_type)
258                vo = vo_cls.get_by_id(context, clean.resource_id)
259                # Set the worker DB entry in the VO and mark it as being a
260                # clean operation
261                clean.cleaning = True
262                vo.worker = clean
263            except exception.NotFound:
264                LOG.debug('Skipping cleanup for non existent %(type)s %(id)s.',
265                          {'type': clean.resource_type,
266                           'id': clean.resource_id})
267            else:
268                # Resource status should match
269                if vo.status != clean.status:
270                    LOG.debug('Skipping cleanup for mismatching work on '
271                              '%(type)s %(id)s: %(exp_sts)s <> %(found_sts)s.',
272                              {'type': clean.resource_type,
273                               'id': clean.resource_id,
274                               'exp_sts': clean.status,
275                               'found_sts': vo.status})
276                else:
277                    LOG.info('Cleaning %(type)s with id %(id)s and status '
278                             '%(status)s',
279                             {'type': clean.resource_type,
280                              'id': clean.resource_id,
281                              'status': clean.status},
282                             resource=vo)
283                    try:
284                        # Some cleanup jobs are performed asynchronously, so
285                        # we don't delete the worker entry, they'll take care
286                        # of it
287                        keep_entry = self._do_cleanup(context, vo)
288                    except Exception:
289                        LOG.exception('Could not perform cleanup.')
290                        # Return the worker DB entry to the original service
291                        db.worker_update(context, clean.id,
292                                         service_id=original_service_id,
293                                         updated_at=original_time)
294                        continue
295
296            # The resource either didn't exist or was properly cleaned, either
297            # way we can remove the entry from the worker table if the cleanup
298            # method doesn't want to keep the entry (for example for delayed
299            # deletion).
300            if not keep_entry and not db.worker_destroy(context, id=clean.id):
301                LOG.warning('Could not remove worker entry %s.', clean.id)
302
303        LOG.info('Service %s cleanup completed.', cleanup_request.service_id)
304
305    def _do_cleanup(self, ctxt, vo_resource):
306        return False
307
308    def init_host(self, service_id, **kwargs):
309        ctxt = context.get_admin_context()
310        self.service_id = service_id
311        # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove
312        # call to workers_init.
313        db.workers_init()
314        cleanup_request = objects.CleanupRequest(service_id=service_id)
315        self.do_cleanup(ctxt, cleanup_request)
316