# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import functools
import inspect
import logging
import threading
import traceback

from oslo_config import cfg
from oslo_service import service
from oslo_utils import eventletutils
from oslo_utils import timeutils
from stevedore import driver

from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions

__all__ = [
    'ExecutorLoadFailure',
    'MessageHandlingServer',
    'MessagingServerError',
    'ServerListenError',
]

LOG = logging.getLogger(__name__)

# The default number of seconds of waiting after which we will emit a log
# message
DEFAULT_LOG_AFTER = 30


_pool_opts = [
    cfg.IntOpt('executor_thread_pool_size',
               default=64,
               deprecated_name="rpc_thread_pool_size",
               help='Size of executor thread pool when'
                    ' executor is threading or eventlet.'),
]


class MessagingServerError(exceptions.MessagingException):
    """Base class for all MessageHandlingServer exceptions."""


class ExecutorLoadFailure(MessagingServerError):
    """Raised if an executor can't be loaded."""

    def __init__(self, executor, ex):
        """Build the error message and keep the failure details.

        :param executor: the name of the executor which failed to load
        :param ex: the underlying exception, or a message describing the
                   failure
        """
        msg = 'Failed to load executor "%s": %s' % (executor, ex)
        super().__init__(msg)
        self.executor = executor
        self.ex = ex


class ServerListenError(MessagingServerError):
    """Raised if we failed to listen on a target."""

    def __init__(self, target, ex):
        """Build the error message and keep the failure details.

        :param target: the target we failed to listen on
        :param ex: the underlying exception
        """
        msg = 'Failed to listen on target "%s": %s' % (target, ex)
        super().__init__(msg)
        self.target = target
        self.ex = ex


class TaskTimeout(MessagingServerError):
    """Raised if we timed out waiting for a task to complete."""


class _OrderedTask(object):
    """A task which must be executed in a particular order.

    A caller may wait for this task to complete by calling
    `wait_for_completion`.

    A caller may run this task with `run_once`, which will ensure that however
    many times the task is called it only runs once. Simultaneous callers will
    block until the running task completes, which means that any caller can be
    sure that the task has completed after run_once returns.
    """

    INIT = 0  # The task has not yet started
    RUNNING = 1  # The task is running somewhere
    COMPLETE = 2  # The task has run somewhere

    def __init__(self, name):
        """Create a new _OrderedTask.

        :param name: The name of this task. Used in log messages.
        """
        super().__init__()

        self._name = name
        self._cond = threading.Condition()
        self._state = self.INIT

    def _wait(self, condition, msg, log_after, timeout_timer):
        """Wait while condition() is true. Write a log message if condition()
        has not become false within `log_after` seconds. Raise TaskTimeout if
        timeout_timer expires while waiting.

        The caller must hold `self._cond`, because this method waits on it.

        :param condition: A callable taking no arguments; waiting continues
                          for as long as it returns a true value.
        :param msg: The text used for the log message and for TaskTimeout.
        :param log_after: Number of seconds after which to log. 0 disables
                          logging.
        :param timeout_timer: A started StopWatch object, or None to wait
                              without a timeout.
        :raises TaskTimeout: if `timeout_timer` expires while waiting.
        """

        log_timer = None
        if log_after != 0:
            log_timer = timeutils.StopWatch(duration=log_after)
            log_timer.start()

        while condition():
            if log_timer is not None and log_timer.expired():
                LOG.warning('Possible hang: %s', msg)
                LOG.debug(''.join(traceback.format_stack()))
                # Only log once. After that we wait indefinitely without
                # logging.
                log_timer = None

            if timeout_timer is not None and timeout_timer.expired():
                raise TaskTimeout(msg)

            # Sleep no longer than the nearest pending deadline so that we
            # wake up in time to log or to raise TaskTimeout.
            timeouts = []
            if log_timer is not None:
                timeouts.append(log_timer.leftover())
            if timeout_timer is not None:
                timeouts.append(timeout_timer.leftover())

            wait = None
            if timeouts:
                wait = min(timeouts)
            self._cond.wait(wait)

    @property
    def complete(self):
        """True if this task has reached the COMPLETE state."""
        return self._state == self.COMPLETE

    def wait_for_completion(self, caller, log_after, timeout_timer):
        """Wait until this task has completed.

        :param caller: The name of the task which is waiting.
        :param log_after: Emit a log message if waiting longer than `log_after`
                          seconds.
        :param timeout_timer: Raise TaskTimeout if StopWatch object
                              `timeout_timer` expires while waiting.
        """
        with self._cond:
            msg = '%s is waiting for %s to complete' % (caller, self._name)
            self._wait(lambda: not self.complete,
                       msg, log_after, timeout_timer)

    def run_once(self, fn, log_after, timeout_timer):
        """Run a task exactly once. If it is currently running in another
        thread, wait for it to complete. If it has already run, return
        immediately without running it again.

        :param fn: The task to run. It must be a callable taking no arguments.
                   It may optionally return another callable, which also takes
                   no arguments, which will be executed after completion has
                   been signaled to other threads.
        :param log_after: Emit a log message if waiting longer than `log_after`
                          seconds.
        :param timeout_timer: Raise TaskTimeout if StopWatch object
                              `timeout_timer` expires while waiting.
        """
        with self._cond:
            if self._state == self.INIT:
                self._state = self.RUNNING
                # Note that nothing waits on RUNNING, so no need to notify

                # We need to release the condition lock before calling out to
                # prevent deadlocks. Reacquire it immediately afterwards.
                self._cond.release()
                try:
                    post_fn = fn()
                finally:
                    # The task is marked COMPLETE even if fn() raised, so
                    # waiters are always released.
                    self._cond.acquire()
                    self._state = self.COMPLETE
                    self._cond.notify_all()

                if post_fn is not None:
                    # Release the condition lock before calling out to prevent
                    # deadlocks. Reacquire it immediately afterwards.
                    self._cond.release()
                    try:
                        post_fn()
                    finally:
                        self._cond.acquire()
            elif self._state == self.RUNNING:
                msg = ('%s is waiting for another thread to complete'
                       % self._name)
                self._wait(lambda: self._state == self.RUNNING,
                           msg, log_after, timeout_timer)


class _OrderedTaskRunner(object):
    """Mixin for a class which executes ordered tasks."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Get a list of methods on this object which have the _ordered
        # attribute
        self._tasks = [name
                       for (name, member) in inspect.getmembers(self)
                       if inspect.ismethod(member) and
                       getattr(member, '_ordered', False)]
        self.reset_states()

        self._reset_lock = threading.Lock()

    def reset_states(self):
        """Replace every task state with a fresh, not-yet-started one."""
        # Create new task states for tasks in reset
        self._states = {task: _OrderedTask(task) for task in self._tasks}

    @staticmethod
    def decorate_ordered(fn, state, after, reset_after):
        """Wrap `fn` so that it is executed as an ordered task.

        :param fn: The method to wrap.
        :param state: The name of the task state associated with `fn`.
        :param after: The name of the task which must complete before `fn`
                      runs, or None.
        :param reset_after: The name of the task whose completion causes all
                            states to be reset when the wrapper is called, or
                            None.
        :returns: The wrapper. It accepts the optional keyword arguments
                  `log_after` and `timeout`, which are consumed by the wrapper
                  and not passed on to `fn`.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            # If the reset_after state has already completed, reset state so
            # we can run again.
            # NOTE(mdbooth): This is ugly and requires external locking to be
            # deterministic when using multiple threads. Consider a thread that
            # does: server.stop(), server.wait(). If another thread causes a
            # reset between stop() and wait(), this will not have the intended
            # behaviour. It is safe without external locking, if the caller
            # instantiates a new object.
            with self._reset_lock:
                if (reset_after is not None and
                        self._states[reset_after].complete):
                    self.reset_states()

            # Store the states we started with in case the state wraps on us
            # while we're sleeping. We must wait and run_once in the same
            # epoch. If the epoch ended while we were sleeping, run_once will
            # safely do nothing.
            states = self._states

            log_after = kwargs.pop('log_after', DEFAULT_LOG_AFTER)
            timeout = kwargs.pop('timeout', None)

            timeout_timer = None
            if timeout is not None:
                timeout_timer = timeutils.StopWatch(duration=timeout)
                timeout_timer.start()

            # Wait for the given preceding state to complete
            if after is not None:
                states[after].wait_for_completion(state,
                                                  log_after, timeout_timer)

            # Run this state
            states[state].run_once(lambda: fn(self, *args, **kwargs),
                                   log_after, timeout_timer)
        return wrapper


def ordered(after=None, reset_after=None):
    """A method which will be executed as an ordered task. The method will be
    called exactly once, however many times it is called. If it is called
    multiple times simultaneously it will only be called once, but all callers
    will wait until execution is complete.

    If `after` is given, this method will not run until `after` has completed.

    If `reset_after` is given and the target method has completed, allow this
    task to run again by resetting all task states.

    :param after: Optionally, the name of another `ordered` method. Wait for
                  the completion of `after` before executing this method.
    :param reset_after: Optionally, the name of another `ordered` method. Reset
                        all states when calling this method if `reset_after`
                        has completed.
    """
    def _ordered(fn):
        # Set an attribute on the method so we can find it later
        setattr(fn, '_ordered', True)
        state = fn.__name__

        return _OrderedTaskRunner.decorate_ordered(fn, state, after,
                                                   reset_after)
    return _ordered


class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner,
                            metaclass=abc.ABCMeta):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks.
    """

    def __init__(self, transport, dispatcher, executor=None):
        """Construct a message handling server.

        The dispatcher parameter is a DispatcherBase instance which is used
        for routing requests to endpoints for processing.

        The executor parameter controls how incoming messages will be received
        and dispatched. If no executor is given, it is detected automatically
        from the execution environment.
        It handles many messages in parallel. If your application needs
        asynchronism then you need to consider using the eventlet executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: has a dispatch() method which is invoked for each
                           incoming request
        :type dispatcher: DispatcherBase
        :param executor: name of message executor - available values are
                         'eventlet' and 'threading'
        :type executor: str
        :raises ExecutorLoadFailure: if the executor name is not supported or
                                     the executor driver cannot be loaded
        """
        if executor and executor not in ("threading", "eventlet"):
            raise ExecutorLoadFailure(
                executor,
                "Executor should be None or 'eventlet' and 'threading'")
        if not executor:
            executor = utils.get_executor_with_context()

        self.conf = transport.conf
        self.conf.register_opts(_pool_opts)

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor_type = executor
        if self.executor_type == "eventlet":
            eventletutils.warn_eventlet_not_patched(
                expected_patched_modules=['thread'],
                what="the 'oslo.messaging eventlet executor'")

        self.listener = None

        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor_type)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor_type, ex) from ex

        self._executor_cls = mgr.driver

        # Created by start(); remains None until then.
        self._work_executor = None

        self._started = False

        super().__init__()

    def _on_incoming(self, incoming):
        """Handles on_incoming event

        :param incoming: incoming request.
        """
        self._work_executor.submit(self._process_incoming, incoming)

    @abc.abstractmethod
    def _process_incoming(self, incoming):
        """Perform processing incoming request

        :param incoming: incoming request.
        """

    @abc.abstractmethod
    def _create_listener(self):
        """Creates listener object for polling requests
        :return: MessageListenerAdapter
        """

    @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.

        :param override_pool_size: if given (and non-zero), used as the
                                   executor's max_workers instead of the
                                   executor_thread_pool_size config option
        :raises ServerListenError: if creating the listener fails with a
                                   TransportDriverError
        """
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex) from ex

        self.listener.start(self._on_incoming)

    @ordered(after='start')
    def stop(self):
        """Stop handling incoming messages.

        Once this method returns, no new incoming messages will be handled by
        the server. However, the server may still be in the process of handling
        some messages, and underlying driver resources associated to this
        server are still in use. See 'wait' for more details.
        """
        if self.listener:
            self.listener.stop()
        self._started = False

    @ordered(after='stop')
    def wait(self):
        """Wait for message processing to complete.

        After calling stop(), there may still be some existing messages
        which have not been completely processed. The wait() method blocks
        until all message processing has completed.

        Once it's finished, the underlying driver resources associated to this
        server are released (like closing useless network connections).
        """
        # The executor is still None if start() failed before creating it;
        # in that case there is nothing to shut down.
        if self._work_executor is not None:
            self._work_executor.shutdown(wait=True)

        # Close listener connection after processing all messages
        if self.listener:
            self.listener.cleanup()

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """
        # TODO(sergey.vilgelm): implement this method
        pass