# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Base Manager class.

Managers are responsible for a certain aspect of the system.  A manager is a
logical grouping of code relating to a portion of the system.  In general,
other components should use the manager to make changes to the components
that it is responsible for.

For example, other components that need to deal with volumes in some way
should do so by calling methods on the VolumeManager instead of directly
changing fields in the database.  This allows us to keep all of the code
relating to volumes in the same place.

We have adopted a basic strategy of "smart managers and dumb data": rather
than attaching methods to data objects, components should call manager
methods that act on the data.

Methods on managers that can be executed locally should be called directly.
If a particular method must execute on a remote host, this should be done via
RPC to the service that wraps the manager.

Managers should be responsible for most of the DB access and for
non-implementation-specific data.  Anything implementation specific that
cannot be generalized should be done by the Driver.

In general, we prefer to have one manager with multiple drivers for different
implementations, but sometimes it makes sense to have multiple managers.  You
can think of it this way: abstract different overall strategies at the
manager level (FlatNetwork vs. VlanNetwork), and different implementations at
the driver level (LinuxNetDriver vs. CiscoNetDriver).

Managers will often provide methods for initial setup of a host or periodic
tasks to a wrapping service.

This module provides Manager, a base class for managers.

"""


from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_utils import timeutils

from cinder import context
from cinder import db
from cinder.db import base
from cinder import exception
from cinder import objects
from cinder import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils

from eventlet import greenpool
from eventlet import tpool


CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class PeriodicTasks(periodic_task.PeriodicTasks):
    def __init__(self):
        super(PeriodicTasks, self).__init__(CONF)


class Manager(base.Base, PeriodicTasks):
    # Set RPC API version to 1.0 by default.
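    # Subclasses that implement a newer RPC API typically override both this
    # attribute and the messaging Target below with their own version.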
    RPC_API_VERSION = '1.0'

    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, host=None, db_driver=None, cluster=None, **kwargs):
        if not host:
            host = CONF.host
        self.host = host
        self.cluster = cluster
        self.additional_endpoints = []
        self.availability_zone = CONF.storage_availability_zone
        super(Manager, self).__init__(db_driver)

    def _set_tpool_size(self, nthreads):
        # NOTE(geguileo): Until PR #472 is merged we have to be very careful
        # not to call "tpool.execute" before calling this method.
        tpool.set_num_threads(nthreads)

    @property
    def service_topic_queue(self):
        return self.cluster or self.host

    def init_host(self, service_id=None, added_to_cluster=None):
        """Handle initialization if this is a standalone service.

        A hook point for services to execute tasks before the services are
        made available (i.e. showing up on RPC and starting to accept RPC
        calls) to other components.  Child classes should override this
        method.

        :param service_id: ID of the service where the manager is running.
        :param added_to_cluster: True when a host's cluster configuration has
                                 changed from not being defined or being '' to
                                 any other value and the DB service record
                                 reflects this new value.
        """
        pass

    def init_host_with_rpc(self):
        """A hook for the service to do jobs after RPC is ready.

        Like init_host(), this method is a hook where services get a chance
        to execute tasks that *need* RPC.  Child classes should override
        this method.
        """
        pass

    def is_working(self):
        """Method indicating if the service is working correctly.

        This method should be overridden by subclasses and return whether the
        manager is working correctly.
        """
        return True

    def reset(self):
        """Method executed when SIGHUP is caught by the process.

        We use it to reset RPC API version pins so that the service does not
        need to be restarted when a rolling upgrade is completed.
        """
        LOG.info('Resetting cached RPC version pins.')
        rpc.LAST_OBJ_VERSIONS = {}
        rpc.LAST_RPC_VERSIONS = {}

    def set_log_levels(self, context, log_request):
        """Set log levels for loggers matching the requested prefix."""
        utils.set_log_levels(log_request.prefix, log_request.level)

    def get_log_levels(self, context, log_request):
        """Return current log levels for loggers matching the prefix."""
        levels = utils.get_log_levels(log_request.prefix)
        log_levels = [objects.LogLevel(context, prefix=prefix, level=level)
                      for prefix, level in levels.items()]
        return objects.LogLevelList(context, objects=log_levels)


class ThreadPoolManager(Manager):
    """Manager that can run additional jobs in an eventlet GreenPool."""

    def __init__(self, *args, **kwargs):
        self._tp = greenpool.GreenPool()
        super(ThreadPoolManager, self).__init__(*args, **kwargs)

    def _add_to_threadpool(self, func, *args, **kwargs):
        self._tp.spawn_n(func, *args, **kwargs)


class SchedulerDependentManager(ThreadPoolManager):
    """Periodically send capability updates to the Scheduler services.

    Services that need to report their capabilities to the Scheduler should
    derive from this class.  Otherwise they can derive from manager.Manager
    directly.  Updates are only sent after update_service_capabilities is
    called with non-None values.
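
    A minimal sketch of a subclass reporting its capabilities; the class name
    and the stats it reports are hypothetical and purely illustrative::

        class ExampleManager(SchedulerDependentManager):
            def __init__(self, host=None, db_driver=None, cluster=None):
                super(ExampleManager, self).__init__(
                    host, db_driver, service_name='example', cluster=cluster)

            def _report_backend_stats(self, context):
                # Remembered here and sent to the schedulers the next time
                # _publish_service_capabilities() runs.
                self.update_service_capabilities(
                    {'total_capacity_gb': 100, 'free_capacity_gb': 42})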

    """

    def __init__(self, host=None, db_driver=None, service_name='undefined',
                 cluster=None):
        self.last_capabilities = None
        self.service_name = service_name
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
        super(SchedulerDependentManager, self).__init__(host, db_driver,
                                                        cluster=cluster)

    def update_service_capabilities(self, capabilities):
        """Remember these capabilities to send on next periodic update."""
        self.last_capabilities = capabilities

    def _publish_service_capabilities(self, context):
        """Pass data back to the scheduler at a periodic interval."""
        if self.last_capabilities:
            LOG.debug('Notifying Schedulers of capabilities ...')
            self.scheduler_rpcapi.update_service_capabilities(
                context,
                self.service_name,
                self.host,
                self.last_capabilities,
                self.cluster)
            try:
                self.scheduler_rpcapi.notify_service_capabilities(
                    context,
                    self.service_name,
                    self.service_topic_queue,
                    self.last_capabilities)
            except exception.ServiceTooOld as e:
                # This means we have Newton's c-sch in the deployment, so
                # rpcapi cannot send the message.  We can safely ignore the
                # error.  Log it because it shouldn't happen after upgrade.
                msg = ("Failed to notify about cinder-volume service "
                       "capabilities for host %(host)s. This is normal "
                       "during a live upgrade. Error: %(e)s")
                LOG.warning(msg, {'host': self.host, 'e': e})

    def reset(self):
        super(SchedulerDependentManager, self).reset()
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()


class CleanableManager(object):
    """Mixin for managers that can clean up after interrupted operations."""

    def do_cleanup(self, context, cleanup_request):
        LOG.info('Initiating service %s cleanup',
                 cleanup_request.service_id)

        # If the 'until' field in the cleanup request is not set, we default
        # to this very moment.
        until = cleanup_request.until or timeutils.utcnow()
        keep_entry = False

        to_clean = db.worker_get_all(
            context,
            resource_type=cleanup_request.resource_type,
            resource_id=cleanup_request.resource_id,
            service_id=cleanup_request.service_id,
            until=until)

        for clean in to_clean:
            original_service_id = clean.service_id
            original_time = clean.updated_at
            # Try to do a soft delete to mark the entry as being cleaned up
            # by us (setting service id to our service id).
            res = db.worker_claim_for_cleanup(context,
                                              claimer_id=self.service_id,
                                              orm_worker=clean)

            # The claim may fail if the entry is being cleaned by another
            # service, has been removed (finished cleaning) by another
            # service, or the user started a new cleanable operation.
            # In any of these cases we don't have to do cleanup or remove the
            # worker entry.
            if not res:
                continue

            # Try to get the versioned object for the resource we have to
            # clean up.
            try:
                vo_cls = getattr(objects, clean.resource_type)
                vo = vo_cls.get_by_id(context, clean.resource_id)
                # Set the worker DB entry in the VO and mark it as being a
                # clean operation.
                clean.cleaning = True
                vo.worker = clean
            except exception.NotFound:
                LOG.debug('Skipping cleanup for non-existent %(type)s %(id)s.',
                          {'type': clean.resource_type,
                           'id': clean.resource_id})
            else:
                # Resource status should match.
                if vo.status != clean.status:
                    LOG.debug('Skipping cleanup for mismatching work on '
                              '%(type)s %(id)s: %(exp_sts)s <> %(found_sts)s.',
                              {'type': clean.resource_type,
                               'id': clean.resource_id,
                               'exp_sts': clean.status,
                               'found_sts': vo.status})
                else:
                    LOG.info('Cleaning %(type)s with id %(id)s and status '
                             '%(status)s',
                             {'type': clean.resource_type,
                              'id': clean.resource_id,
                              'status': clean.status},
                             resource=vo)
                    try:
                        # Some cleanup jobs are performed asynchronously, so
                        # we don't delete the worker entry; they'll take care
                        # of it.
                        keep_entry = self._do_cleanup(context, vo)
                    except Exception:
                        LOG.exception('Could not perform cleanup.')
                        # Return the worker DB entry to the original service.
                        db.worker_update(context, clean.id,
                                         service_id=original_service_id,
                                         updated_at=original_time)
                        continue

            # The resource either didn't exist or was properly cleaned up;
            # either way we can remove the entry from the worker table if the
            # cleanup method doesn't want to keep the entry (for example for
            # delayed deletion).
            if not keep_entry and not db.worker_destroy(context, id=clean.id):
                LOG.warning('Could not remove worker entry %s.', clean.id)

        LOG.info('Service %s cleanup completed.', cleanup_request.service_id)

    def _do_cleanup(self, ctxt, vo_resource):
        """Perform the actual cleanup of a resource.

        Meant to be overridden by subclasses.  Return True to keep the worker
        entry (e.g. when cleanup finishes asynchronously) or False when the
        entry can be removed.
        """
        return False

    def init_host(self, service_id, **kwargs):
        ctxt = context.get_admin_context()
        self.service_id = service_id
        # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can
        # remove the call to workers_init.
        db.workers_init()
        cleanup_request = objects.CleanupRequest(service_id=service_id)
        self.do_cleanup(ctxt, cleanup_request)
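
# A minimal sketch of how the mixins above are typically combined; the class
# below and its _delete_resource helper are hypothetical and only illustrate
# the _do_cleanup contract (return True to keep the worker entry because the
# cleanup finishes asynchronously, False once the entry can be removed):
#
#     class ExampleCleanableManager(CleanableManager, ThreadPoolManager):
#         def _do_cleanup(self, ctxt, vo_resource):
#             if vo_resource.status == 'deleting':
#                 # Finish the interrupted deletion in the background and
#                 # keep the worker entry until that job completes.
#                 self._add_to_threadpool(self._delete_resource, ctxt,
#                                         vo_resource)
#                 return True
#             # Nothing left to do for this resource; do_cleanup() may remove
#             # the worker entry.
#             return False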