1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2013 Red Hat, Inc.
5# Copyright 2013 New Dream Network, LLC (DreamHost)
6#
7#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8#    not use this file except in compliance with the License. You may obtain
9#    a copy of the License at
10#
11#         http://www.apache.org/licenses/LICENSE-2.0
12#
13#    Unless required by applicable law or agreed to in writing, software
14#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16#    License for the specific language governing permissions and limitations
17#    under the License.
18
19import abc
20import functools
21import inspect
22import logging
23import threading
24import traceback
25
26from oslo_config import cfg
27from oslo_service import service
28from oslo_utils import eventletutils
29from oslo_utils import timeutils
30from stevedore import driver
31
32from oslo_messaging._drivers import base as driver_base
33from oslo_messaging import _utils as utils
34from oslo_messaging import exceptions
35
# Public names of this module, i.e. what ``from <module> import *`` exposes.
__all__ = [
    'ExecutorLoadFailure',
    'MessageHandlingServer',
    'MessagingServerError',
    'ServerListenError',
]

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# The default number of seconds of waiting after which we will emit a log
# message. Used as the default for the `log_after` keyword argument accepted
# by ordered tasks.
DEFAULT_LOG_AFTER = 30
48
49
# Configuration options for the executor's worker pool. They are registered
# on the transport's configuration object by MessageHandlingServer.__init__,
# and `executor_thread_pool_size` is read back in MessageHandlingServer.start
# unless the caller overrides the pool size.
_pool_opts = [
    cfg.IntOpt('executor_thread_pool_size',
               default=64,
               deprecated_name="rpc_thread_pool_size",
               help='Size of executor thread pool when'
               ' executor is threading or eventlet.'),
]
57
58
class MessagingServerError(exceptions.MessagingException):
    """Base class for all MessageHandlingServer exceptions.

    Adds no behaviour of its own. The other server exceptions defined in this
    module derive from it, so callers can catch them all with one handler.
    """
61
62
class ExecutorLoadFailure(MessagingServerError):
    """Signals that the requested executor could not be loaded.

    :ivar executor: the executor name that failed to load
    :ivar ex: the underlying error (or explanatory text) describing why
    """

    def __init__(self, executor, ex):
        # The human-readable message combines the executor name and cause.
        super().__init__(
            'Failed to load executor "%s": %s' % (executor, ex))
        self.executor = executor
        self.ex = ex
71
72
class ServerListenError(MessagingServerError):
    """Signals that the server could not start listening on a target.

    :ivar target: the target the server tried to listen on
    :ivar ex: the underlying error describing why listening failed
    """

    def __init__(self, target, ex):
        # The human-readable message combines the target and the cause.
        super().__init__(
            'Failed to listen on target "%s": %s' % (target, ex))
        self.target = target
        self.ex = ex
81
82
class TaskTimeout(MessagingServerError):
    """Raised if we timed out waiting for a task to complete.

    Raised by ``_OrderedTask._wait`` when the supplied timeout timer expires
    while waiting; the exception message is the description of the wait.
    """
85
86
class _OrderedTask(object):
    """A task which must be executed in a particular order.

    A caller may wait for this task to complete by calling
    `wait_for_completion`.

    A caller may run this task with `run_once`, which will ensure that however
    many times the task is called it only runs once. Simultaneous callers will
    block until the running task completes, which means that any caller can be
    sure that the task has completed after run_once returns.
    """

    INIT = 0      # The task has not yet started
    RUNNING = 1   # The task is running somewhere
    COMPLETE = 2  # The task has run somewhere

    def __init__(self, name):
        """Create a new _OrderedTask.

        :param name: The name of this task. Used in log messages.
        """
        super(_OrderedTask, self).__init__()

        self._name = name
        # Guards `_state` and is used to wake up waiters on completion.
        self._cond = threading.Condition()
        self._state = self.INIT

    def _wait(self, condition, msg, log_after, timeout_timer):
        """Wait while condition() is true.

        Must be called with `self._cond` held. Writes a log message if
        condition() has not become false within `log_after` seconds.

        :param condition: Callable taking no arguments; waiting continues
                          for as long as it returns a true value.
        :param msg: Description of the wait, used in the log message and as
                    the TaskTimeout message.
        :param log_after: Number of seconds after which a warning is logged.
                          A falsy value (0 or None) disables the warning.
        :param timeout_timer: A started StopWatch-like object, or None to
                              wait without a timeout.
        :raises TaskTimeout: if `timeout_timer` expires while waiting.
        """

        log_timer = None
        # Treat None like 0: no "possible hang" warning. Without this a
        # None value would be handed to StopWatch as "no duration", and the
        # leftover() call below is not expected to work on such a timer.
        if log_after:
            log_timer = timeutils.StopWatch(duration=log_after)
            log_timer.start()

        while condition():
            if log_timer is not None and log_timer.expired():
                LOG.warning('Possible hang: %s', msg)
                LOG.debug(''.join(traceback.format_stack()))
                # Only log once. After that we wait indefinitely without
                # logging.
                log_timer = None

            if timeout_timer is not None and timeout_timer.expired():
                raise TaskTimeout(msg)

            # Sleep until the nearest pending deadline, or indefinitely when
            # neither a log timer nor a timeout timer is active.
            timeouts = []
            if log_timer is not None:
                timeouts.append(log_timer.leftover())
            if timeout_timer is not None:
                timeouts.append(timeout_timer.leftover())

            wait = None
            if timeouts:
                wait = min(timeouts)
            self._cond.wait(wait)

    @property
    def complete(self):
        """True once the task has finished running (state is COMPLETE)."""
        return self._state == self.COMPLETE

    def wait_for_completion(self, caller, log_after, timeout_timer):
        """Wait until this task has completed.

        :param caller: The name of the task which is waiting.
        :param log_after: Emit a log message if waiting longer than `log_after`
                          seconds. 0 or None disables the message.
        :param timeout_timer: Raise TaskTimeout if StopWatch object
                              `timeout_timer` expires while waiting.
        """
        with self._cond:
            msg = '%s is waiting for %s to complete' % (caller, self._name)
            self._wait(lambda: not self.complete,
                       msg, log_after, timeout_timer)

    def run_once(self, fn, log_after, timeout_timer):
        """Run a task exactly once. If it is currently running in another
        thread, wait for it to complete. If it has already run, return
        immediately without running it again.

        The task is marked COMPLETE even if `fn` raises; the exception then
        propagates to the caller that ran it.

        :param fn: The task to run. It must be a callable taking no arguments.
                   It may optionally return another callable, which also takes
                   no arguments, which will be executed after completion has
                   been signaled to other threads.
        :param log_after: Emit a log message if waiting longer than `log_after`
                          seconds. 0 or None disables the message.
        :param timeout_timer: Raise TaskTimeout if StopWatch object
                              `timeout_timer` expires while waiting.
        """
        with self._cond:
            if self._state == self.INIT:
                self._state = self.RUNNING
                # Note that nothing waits on RUNNING, so no need to notify

                # We need to release the condition lock before calling out to
                # prevent deadlocks. Reacquire it immediately afterwards.
                self._cond.release()
                try:
                    post_fn = fn()
                finally:
                    self._cond.acquire()
                    self._state = self.COMPLETE
                    self._cond.notify_all()

                if post_fn is not None:
                    # Release the condition lock before calling out to prevent
                    # deadlocks. Reacquire it immediately afterwards.
                    self._cond.release()
                    try:
                        post_fn()
                    finally:
                        self._cond.acquire()
            elif self._state == self.RUNNING:
                msg = ('%s is waiting for another thread to complete'
                       % self._name)
                self._wait(lambda: self._state == self.RUNNING,
                           msg, log_after, timeout_timer)
207
208
class _OrderedTaskRunner(object):
    """Mixin that tracks and sequences methods marked as ordered tasks."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Discover the names of all bound methods on this instance that carry
        # a truthy `_ordered` attribute.
        ordered_names = []
        for attr_name, attr in inspect.getmembers(self):
            if not inspect.ismethod(attr):
                continue
            if getattr(attr, '_ordered', False):
                ordered_names.append(attr_name)
        self._tasks = ordered_names
        self.reset_states()

        self._reset_lock = threading.Lock()

    def reset_states(self):
        # Start a new epoch: give every known task a brand new state object.
        self._states = dict(
            (task_name, _OrderedTask(task_name)) for task_name in self._tasks)

    @staticmethod
    def decorate_ordered(fn, state, after, reset_after):
        """Wrap `fn` so that it runs as the ordered task named `state`.

        :param fn: The method to wrap.
        :param state: The name of the task state tracking `fn`.
        :param after: Name of the task that must complete before `fn` runs,
                      or None.
        :param reset_after: Name of the task whose completion triggers a
                            reset of all states when the wrapper is called,
                            or None.
        :returns: The wrapping function. It accepts the extra keyword
                  arguments `log_after` and `timeout`, which are removed
                  before `fn` is called.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            # If the reset_after state has already completed, reset state so
            # we can run again.
            # NOTE(mdbooth): This is ugly and requires external locking to be
            # deterministic when using multiple threads. Consider a thread that
            # does: server.stop(), server.wait(). If another thread causes a
            # reset between stop() and wait(), this will not have the intended
            # behaviour. It is safe without external locking, if the caller
            # instantiates a new object.
            with self._reset_lock:
                if reset_after is not None:
                    if self._states[reset_after].complete:
                        self.reset_states()

            # Pin the states of the current epoch. Waiting and running must
            # use the same epoch; if a reset happens while we sleep, run_once
            # on the stale state will safely do nothing.
            epoch_states = self._states

            log_after = kwargs.pop('log_after', DEFAULT_LOG_AFTER)
            timeout = kwargs.pop('timeout', None)

            if timeout is None:
                timeout_timer = None
            else:
                timeout_timer = timeutils.StopWatch(duration=timeout)
                timeout_timer.start()

            # Block until the prerequisite task, if any, has completed.
            if after is not None:
                predecessor = epoch_states[after]
                predecessor.wait_for_completion(state, log_after,
                                                timeout_timer)

            def task():
                return fn(self, *args, **kwargs)

            # Execute this task (at most once per epoch).
            epoch_states[state].run_once(task, log_after, timeout_timer)
        return wrapper
270
271
def ordered(after=None, reset_after=None):
    """Decorator factory turning a method into an ordered task.

    An ordered method runs exactly once no matter how often it is called.
    Concurrent callers all block until the single execution has finished.

    When `after` is given, the method does not run before the `after` task
    has completed.

    When `reset_after` is given and that task has already completed, all
    task states are reset on the call so that this method may run again.

    :param after: Optionally, the name of another `ordered` method. Wait for
                  the completion of `after` before executing this method.
    :param reset_after: Optionally, the name of another `ordered` method. Reset
                        all states when calling this method if `reset_after`
                        has completed.
    """
    def _ordered(fn):
        # Flag the function so the task runner can discover it later.
        fn._ordered = True

        # The task's state name is the name of the decorated function.
        return _OrderedTaskRunner.decorate_ordered(
            fn, fn.__name__, after, reset_after)
    return _ordered
297
298
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner,
                            metaclass=abc.ABCMeta):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks.

    The lifecycle methods `start`, `stop` and `wait` are ordered tasks: each
    runs at most once per cycle, `stop` waits for `start` and `wait` waits
    for `stop`. Calling `start` after `stop` has completed begins a new cycle.
    """

    def __init__(self, transport, dispatcher, executor=None):
        """Construct a message handling server.

        The dispatcher parameter is a DispatcherBase instance which is used
        for routing requests to endpoints for processing.

        The executor parameter controls how incoming messages will be received
        and dispatched. When no executor is given, one is selected through
        `utils.get_executor_with_context()`.
        The executor handles many messages in parallel. If your application
        needs asynchronism, consider using the eventlet executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: has a dispatch() method which is invoked for each
                           incoming request
        :type dispatcher: DispatcherBase
        :param executor: name of message executor - available values are
                         'eventlet' and 'threading'
        :type executor: str
        :raises ExecutorLoadFailure: if the executor name is not supported or
                                     the executor driver cannot be loaded
        """
        if executor and executor not in ("threading", "eventlet"):
            raise ExecutorLoadFailure(
                executor,
                "Executor should be None or 'eventlet' and 'threading'")
        if not executor:
            executor = utils.get_executor_with_context()

        self.conf = transport.conf
        self.conf.register_opts(_pool_opts)

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor_type = executor
        if self.executor_type == "eventlet":
            eventletutils.warn_eventlet_not_patched(
                expected_patched_modules=['thread'],
                what="the 'oslo.messaging eventlet executor'")

        # Created in start() by _create_listener().
        self.listener = None

        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor_type)
        except RuntimeError as ex:
            # Chain explicitly so the original loading error is the cause.
            raise ExecutorLoadFailure(self.executor_type, ex) from ex

        self._executor_cls = mgr.driver

        # Instantiated in start(); stays None if start() never got that far.
        self._work_executor = None

        self._started = False

        # NOTE(review): the base initialisers run last; _OrderedTaskRunner's
        # initialiser inspects the members of this instance, so presumably
        # the attributes above are meant to exist first - confirm before
        # reordering.
        super(MessageHandlingServer, self).__init__()

    def _on_incoming(self, incoming):
        """Handles on_incoming event

        Submits the request to the work executor for processing by
        `_process_incoming`.

        :param incoming: incoming request.
        """
        self._work_executor.submit(self._process_incoming, incoming)

    @abc.abstractmethod
    def _process_incoming(self, incoming):
        """Perform processing incoming request

        :param incoming: incoming request.
        """

    @abc.abstractmethod
    def _create_listener(self):
        """Creates listener object for polling requests
        :return: MessageListenerAdapter
        """

    @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.

        :param override_pool_size: if given (and non-zero), the number of
                                   executor workers to use instead of the
                                   configured `executor_thread_pool_size`
        :raises ServerListenError: if the listener cannot be created because
                                   of a transport driver error
        """
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        pool_size = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
        self._work_executor = self._executor_cls(max_workers=pool_size)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            # This class never sets `self.target` itself; read it defensively
            # so that a subclass without the attribute still gets the
            # intended ServerListenError rather than an AttributeError.
            target = getattr(self, 'target', None)
            raise ServerListenError(target, ex) from ex

        self.listener.start(self._on_incoming)

    @ordered(after='start')
    def stop(self):
        """Stop handling incoming messages.

        Once this method returns, no new incoming messages will be handled by
        the server. However, the server may still be in the process of handling
        some messages, and underlying driver resources associated to this
        server are still in use. See 'wait' for more details.
        """
        # The listener is missing if start() failed before creating it.
        if self.listener:
            self.listener.stop()
        self._started = False

    @ordered(after='stop')
    def wait(self):
        """Wait for message processing to complete.

        After calling stop(), there may still be some existing messages
        which have not been completely processed. The wait() method blocks
        until all message processing has completed.

        Once it's finished, the underlying driver resources associated to this
        server are released (like closing useless network connections).
        """
        # The executor is missing if start() failed before creating it; in
        # that case there is nothing to drain.
        if self._work_executor is not None:
            self._work_executor.shutdown(wait=True)

        # Close listener connection after processing all messages
        if self.listener:
            self.listener.cleanup()

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        Currently a no-op.
        """
        # TODO(sergey.vilgelm): implement this method
        pass
456