1#    Copyright 2014, Red Hat, Inc.
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15"""
16Controller that manages the interface between the driver and the messaging
17service.
18
19This module defines a Controller class that is responsible for performing
20messaging-related operations (Tasks) requested by the driver, and for managing
21the connection to the messaging service.  The Controller creates a background
22thread which performs all messaging operations and socket I/O.  The
23Controller's messaging logic is executed in the background thread via lambda
24functions scheduled by the Controller.
25"""
26
27import abc
28import collections
29import logging
30import os
31import platform
32import queue
33import random
34import sys
35import threading
36import time
37import uuid
38
39from oslo_utils import eventletutils
40import proton
41import pyngus
42
43from oslo_messaging._drivers.amqp1_driver.addressing import AddresserFactory
44from oslo_messaging._drivers.amqp1_driver.addressing import keyify
45from oslo_messaging._drivers.amqp1_driver.addressing import SERVICE_NOTIFY
46from oslo_messaging._drivers.amqp1_driver.addressing import SERVICE_RPC
47from oslo_messaging._drivers.amqp1_driver import eventloop
48from oslo_messaging import exceptions
49from oslo_messaging.target import Target
50from oslo_messaging import transport
51
# Logger shared by every class in this module.
LOG = logging.getLogger(name=__name__)
53
54
class Task(abc.ABC):
    """Run a command on the eventloop thread, wait until it completes.

    A client thread hands a Task to the Controller and then blocks in
    wait().  The Controller runs _execute() on its eventloop thread to
    perform the messaging operation; implementations are expected to wake
    the waiting client once the operation has finished.

    The class derives from abc.ABC so that the @abc.abstractmethod markers
    below are actually enforced: neither Task itself nor a subclass that
    fails to implement both methods can be instantiated.  (Without an
    ABCMeta metaclass the decorator is silently ignored.)
    """

    @abc.abstractmethod
    def wait(self):
        """Called by the client thread to wait for the operation to
        complete. The implementation may optionally return a value.
        """

    @abc.abstractmethod
    def _execute(self, controller):
        """This method will be run on the eventloop thread to perform the
        messaging operation.

        :param controller: the Controller that is running this task
        """
70
71
class SubscribeTask(Task):
    """Subscribe a listener to a target.

    When executed on the eventloop thread the task asks the Controller to
    create the subscription.  Messages arriving from the target are then
    given to the listener (its ``incoming`` queue is recorded here).
    """
    def __init__(self, target, listener, notifications=False):
        super().__init__()
        # Target objects are mutable, so keep a private copy
        self._target = target()
        self._subscriber_id = listener.id
        self._in_queue = listener.incoming
        if notifications:
            self._service = SERVICE_NOTIFY
        else:
            self._service = SERVICE_RPC
        self._wakeup = eventletutils.Event()

    def wait(self):
        """Block the client thread until the subscription has been made."""
        self._wakeup.wait()

    def _execute(self, controller):
        """Create the subscription, then release the waiting client."""
        controller.subscribe(self)
        self._wakeup.set()
90
91
class SendTask(Task):
    """Transmit one message to a destination on behalf of the driver.

    The Controller runs _execute() on the eventloop thread while the client
    thread blocks in wait(), which returns the error (if any) once the send
    has been acknowledged, has failed, or has timed out.  Completion is
    reported through _on_ack(), _on_timeout() or _on_error(); each records
    any error, releases resources via _cleanup() and wakes the client.
    """
    def __init__(self, name, message, target, deadline, retry,
                 wait_for_ack, notification=False):
        super().__init__()
        self.name = name
        # target may be a Target instance or a plain address string.  A
        # Target is mutable, so work on a private copy of it.
        if isinstance(target, Target):
            target = target()
        self.target = target
        self.message = message
        self.deadline = deadline
        self.wait_for_ack = wait_for_ack
        if notification:
            self.service = SERVICE_NOTIFY
        else:
            self.service = SERVICE_RPC
        self.timer = None
        # None means "no retry limit"; a negative count is treated the same
        if retry is not None and retry >= 0:
            self._retry = retry
        else:
            self._retry = None
        self._wakeup = eventletutils.Event()
        self._error = None
        self._sender = None

    def wait(self):
        """Block until the send completes; return the error or None."""
        self._wakeup.wait()
        return self._error

    def _execute(self, controller):
        """Arm the deadline timer (if a deadline was given) and hand the
        task to the Controller for transmission.
        """
        if self.deadline:
            on_expiry = self._on_timeout
            self.timer = controller.processor.alarm(on_expiry, self.deadline)
        controller.send(self)

    def _prepare(self, sender):
        """Remember the Sender that is about to write the message.

        Called right before the message is handed to the i/o layer, which
        implies that the sender's link is up.
        """
        self._sender = sender

    def _on_ack(self, state, info):
        """Eventloop callback reporting the outcome of the send.

        With wait_for_ack set this runs when the peer settles the message;
        otherwise it runs right after the message is written to the link,
        with state always ACCEPTED.  Any other state is a delivery failure.
        """
        if state != pyngus.SenderLink.ACCEPTED:
            msg = ("{name} message send to {target} failed: remote"
                   " disposition: {disp}, info:"
                   "{info}".format(name=self.name,
                                   target=self.target,
                                   disp=state,
                                   info=info))
            self._error = exceptions.MessageDeliveryFailure(msg)
            LOG.warning("%s", msg)
        self._complete()

    def _on_timeout(self):
        """Eventloop callback fired when the deadline timer expires."""
        self.timer = None
        if self._sender:
            self._sender.cancel_send(self)
        msg = ("{name} message sent to {target} failed: timed"
               " out".format(name=self.name, target=self.target))
        LOG.warning("%s", msg)
        # MessagingTimeout is reported only when the caller explicitly set a
        # timeout (a message ttl); otherwise this is a plain delivery failure
        if self.message.ttl:
            error_cls = exceptions.MessagingTimeout
        else:
            error_cls = exceptions.MessageDeliveryFailure
        self._error = error_cls(msg)
        self._complete()

    def _on_error(self, description):
        """Eventloop callback for failures other than timeout or nack."""
        msg = ("{name} message sent to {target} failed:"
               " {reason}".format(name=self.name,
                                  target=self.target,
                                  reason=description))
        LOG.warning("%s", msg)
        self._error = exceptions.MessageDeliveryFailure(msg)
        self._complete()

    def _complete(self):
        # release per-send resources, then wake the client blocked in wait()
        self._cleanup()
        self._wakeup.set()

    def _cleanup(self):
        self._sender = None
        if self.timer:
            self.timer.cancel()
            self.timer = None

    @property
    def _can_retry(self):
        # NOTE: evaluating this property consumes one retry attempt
        if self._retry is None:
            return True
        self._retry -= 1
        return self._retry >= 0
191
192
class RPCCallTask(SendTask):
    """Perform an RPC call: send the request, then wait for the reply.

    The request is stamped with a message id registered on the sender's
    reply link; the task completes when the matching reply arrives (or the
    send fails / times out).
    """
    def __init__(self, target, message, deadline, retry, wait_for_ack):
        super().__init__("RPC Call", message, target, deadline, retry,
                         wait_for_ack)
        self._reply_link = None
        self._reply_msg = None
        self._msg_id = None

    def wait(self):
        """Return the send error if there was one, else the reply."""
        error = super().wait()
        return error if error else self._reply_msg

    def _prepare(self, sender):
        super()._prepare(sender)
        if self._msg_id:
            # This is a re-transmit: release the previous id and reserve a
            # fresh one to be safe.
            self._reply_link.cancel_response(self._msg_id)
        link = sender._reply_link
        self._reply_link = link
        self._msg_id = link.prepare_for_response(self.message, self._on_reply)

    def _on_reply(self, message):
        # invoked by the reply link if/when the response arrives
        self._reply_msg = message
        self._cleanup()
        self._wakeup.set()

    def _on_ack(self, state, info):
        # An ACCEPTED request is not finished until its reply arrives, so
        # only failed dispositions are passed on to the base class.
        if state == pyngus.SenderLink.ACCEPTED:
            return
        super()._on_ack(state, info)

    def _cleanup(self):
        if self._msg_id:
            self._reply_link.cancel_response(self._msg_id)
            self._msg_id = None
        self._reply_link = None
        super()._cleanup()
236
237
class RPCMonitoredCallTask(RPCCallTask):
    """An RPC call which expects a periodic heartbeat until the response is
    received.  There are two timeouts:
    deadline - overall hard timeout, implemented in RPCCallTask
    call_monitor_timeout - keep alive timeout, reset when heartbeat arrives

    A reply whose body is None is treated as a heartbeat and re-arms the
    keep-alive timer; any other reply completes the call.
    """
    def __init__(self, target, message, deadline, call_monitor_timeout,
                 retry, wait_for_ack):
        super(RPCMonitoredCallTask, self).__init__(target, message, deadline,
                                                   retry, wait_for_ack)
        assert call_monitor_timeout is not None  # nosec
        self._monitor_timeout = call_monitor_timeout
        self._monitor_timer = None
        # scheduler function used to (re)arm the keep-alive timer; set in
        # _execute() and cleared again by _cleanup()
        self._set_alarm = None

    def _execute(self, controller):
        self._set_alarm = controller.processor.defer
        self._monitor_timer = self._set_alarm(self._call_timeout,
                                              self._monitor_timeout)
        super(RPCMonitoredCallTask, self)._execute(controller)

    def _call_timeout(self):
        """Keep-alive timer expired: no heartbeat or reply arrived in time."""
        self._monitor_timer = None
        if self._sender:
            self._sender.cancel_send(self)
        msg = ("{name} message sent to {target} failed: call monitor timed"
               " out".format(name=self.name, target=self.target))
        LOG.warning("%s", msg)
        self._error = exceptions.MessagingTimeout(msg)
        self._cleanup()
        self._wakeup.set()

    def _on_reply(self, message):
        # if reply is null, then this is the call monitor heartbeat
        if message.body is None:
            if self._set_alarm is None:
                # The call has already completed (_cleanup() ran).  A late
                # heartbeat must not re-arm the timer - doing so would raise
                # on the eventloop thread since the scheduler is gone.
                return
            if self._monitor_timer:
                self._monitor_timer.cancel()
            self._monitor_timer = self._set_alarm(self._call_timeout,
                                                  self._monitor_timeout)
        else:
            super(RPCMonitoredCallTask, self)._on_reply(message)

    def _cleanup(self):
        self._set_alarm = None
        if self._monitor_timer:
            self._monitor_timer.cancel()
            self._monitor_timer = None
        super(RPCMonitoredCallTask, self)._cleanup()
285
286
class MessageDispositionTask(Task):
    """Settle a message received by a Server.

    Runs the disposition callable that accompanied the message, marking it
    accepted or - when *released* is true - released.
    """
    def __init__(self, disposition, released=False):
        super().__init__()
        self._disposition = disposition
        self._released = released

    def wait(self):
        # There is no result to wait for, so the client is not blocked.
        # Skipping the wait avoids a thread context switch on every RPC call.
        return None

    def _execute(self, controller):
        try:
            self._disposition(self._released)
        except Exception as err:
            # Nothing can be done to recover a failed settlement; log it.
            LOG.exception("Message acknowledgment failed: %s", err)
308
309
class Sender(pyngus.SenderEventHandler):
    """A link for sending to a particular destination on the message bus.

    Send requests (SendTask objects) are queued on ``_pending_sends`` while
    the link cannot send, or while earlier requests are still queued, so
    that submission order is preserved.  A request that waits for an ack is
    tracked in ``_unacked`` from the moment it is written to the link until
    the peer settles it, the task is cancelled, or the link fails.
    """
    def __init__(self, destination, scheduler, delay, service):
        super(Sender, self).__init__()
        self._destination = destination
        self._service = service
        self._address = None
        self._link = None
        self._scheduler = scheduler
        self._delay = delay  # for re-connecting/re-transmitting
        # holds all pending SendTasks, in submission order
        self._pending_sends = collections.deque()
        # holds all messages sent but not yet acked
        self._unacked = set()
        self._reply_link = None
        self._connection = None
        # timer armed by _resend() to flush the pending queue later
        self._resend_timer = None

    @property
    def pending_messages(self):
        """Number of send requests not yet written to the link."""
        return len(self._pending_sends)

    @property
    def unacked_messages(self):
        """Number of sent messages still waiting for an ack."""
        return len(self._unacked)

    def attach(self, connection, reply_link, addresser):
        """Open the link. Called by the Controller when the AMQP connection
        becomes active.

        :param connection: connection on which the sender link is created
        :param reply_link: reply link made available to RPC call tasks
        :param addresser: resolves the destination into an AMQP address
        """
        self._connection = connection
        self._reply_link = reply_link
        self._address = addresser.resolve(self._destination, self._service)
        LOG.debug("Sender %s attached", self._address)
        self._link = self._open_link()

    def detach(self):
        """Close the link.  Called by the controller when shutting down or in
        response to a close requested by the remote.  May be re-attached later
        (after a reset is done)
        """
        LOG.debug("Sender %s detached", self._address)
        self._connection = None
        self._reply_link = None
        self._cancel_resend_timer()
        if self._link:
            self._link.close()

    def reset(self, reason="Link reset"):
        """Called by the controller on connection failover. Release all link
        resources, abort any in-flight messages, and check the retry limit on
        all pending send requests.
        """
        self._address = None
        self._connection = None
        self._reply_link = None
        # A scheduled resend is pointless once the link is gone; any queued
        # sends are flushed by sender_active() after the next attach().
        self._cancel_resend_timer()
        if self._link:
            self._link.destroy()
            self._link = None
        self._abort_unacked(reason)
        self._check_retry_limit(reason)

    def destroy(self, reason="Link destroyed"):
        """Destroy the sender and all pending messages.  Called on driver
        shutdown.
        """
        LOG.debug("Sender %s destroyed", self._address)
        self.reset(reason)
        self._abort_pending(reason)

    def send_message(self, send_task):
        """Send a message out the link.

        The task is queued instead if the link cannot send yet or if other
        tasks are already queued (to preserve ordering).
        """
        if not self._can_send or self._pending_sends:
            self._pending_sends.append(send_task)
        else:
            self._send(send_task)

    def cancel_send(self, send_task):
        """Attempts to cancel a send request.  It is possible that the send has
        already completed, so this is best-effort.
        """
        # may be in either list, or none
        self._unacked.discard(send_task)
        try:
            self._pending_sends.remove(send_task)
        except ValueError:
            pass

    # Pyngus callbacks:

    def sender_active(self, sender_link):
        LOG.debug("Sender %s active", self._address)
        self._send_pending()

    def credit_granted(self, sender_link):
        pass

    def sender_remote_closed(self, sender_link, pn_condition):
        # The remote has initiated a close.  This could happen when the message
        # bus is shutting down, or it detected an error
        LOG.warning("Sender %(addr)s failed due to remote initiated close:"
                    " condition=%(cond)s",
                    {'addr': self._address, 'cond': pn_condition})
        self._link.close()
        # sender_closed() will be called once the link completes closing

    def sender_closed(self, sender_link):
        self._handle_sender_closed()

    def sender_failed(self, sender_link, error):
        """Protocol error occurred."""
        LOG.warning("Sender %(addr)s failed error=%(error)s",
                    {'addr': self._address, 'error': error})
        self._handle_sender_closed(str(error))

    # end Pyngus callbacks

    def _cancel_resend_timer(self):
        # stop a scheduled _resend_pending() call, if any
        if self._resend_timer:
            self._resend_timer.cancel()
            self._resend_timer = None

    def _handle_sender_closed(self, reason="Sender closed"):
        self._abort_unacked(reason)
        if self._connection:
            # still attached, so attempt to restart the link
            self._check_retry_limit(reason)
            self._scheduler.defer(self._reopen_link, self._delay)

    def _check_retry_limit(self, reason):
        # Called on recoverable connection or link failure.  Fail and drop any
        # pending sends that have exhausted their retry count, keeping the
        # survivors in their original order.  Note that evaluating
        # _can_retry consumes one retry from every pending task.
        survivors = collections.deque()
        for send_task in self._pending_sends:
            if send_task._can_retry:
                survivors.append(send_task)
            else:
                send_task._on_error("Message send failed: %s" % reason)
        self._pending_sends = survivors

    def _abort_unacked(self, error):
        # fail all messages that have been sent to the message bus and have not
        # been acked yet
        while self._unacked:
            send_task = self._unacked.pop()
            send_task._on_error("Message send failed: %s" % error)

    def _abort_pending(self, error):
        # fail all messages that have yet to be sent to the message bus
        while self._pending_sends:
            send_task = self._pending_sends.popleft()
            send_task._on_error("Message send failed: %s" % error)

    @property
    def _can_send(self):
        return self._link and self._link.active

    # acknowledge status
    _TIMED_OUT = pyngus.SenderLink.TIMED_OUT
    _ACCEPTED = pyngus.SenderLink.ACCEPTED
    _RELEASED = pyngus.SenderLink.RELEASED
    _MODIFIED = pyngus.SenderLink.MODIFIED

    def _send(self, send_task):
        send_task._prepare(self)
        send_task.message.address = self._address
        if send_task.wait_for_ack:
            self._unacked.add(send_task)

            def pyngus_callback(link, handle, state, info):
                # invoked when the message bus (n)acks this message
                if state == Sender._TIMED_OUT:
                    # ignore pyngus timeout - we maintain our own timer
                    # which will properly deal with this case
                    return
                if send_task not in self._unacked:
                    # The task has already been completed elsewhere: it
                    # was cancelled by its timer (cancel_send) or failed
                    # by _abort_unacked().  Its caller has been woken, so
                    # a late disposition must neither complete it a second
                    # time nor re-queue it through _resend().
                    return
                self._unacked.discard(send_task)
                if state == Sender._ACCEPTED:
                    send_task._on_ack(Sender._ACCEPTED, info)
                elif (state == Sender._RELEASED or
                      (state == Sender._MODIFIED and
                          # assuming delivery-failed means in-doubt:
                          not info.get("delivery-failed") and
                          not info.get("undeliverable-here"))):
                    # These states indicate that the message was never
                    # forwarded beyond the next hop so they can be
                    # re-transmitted without risk of duplication
                    self._resend(send_task)
                else:
                    # some error - let task figure it out...
                    send_task._on_ack(state, info)

            self._link.send(send_task.message,
                            delivery_callback=pyngus_callback,
                            handle=self,
                            deadline=send_task.deadline)
        else:  # do not wait for ack
            self._link.send(send_task.message,
                            delivery_callback=None,
                            handle=self,
                            deadline=send_task.deadline)
            send_task._on_ack(pyngus.SenderLink.ACCEPTED, {})

    def _resend(self, send_task):
        # the message bus returned the message without forwarding it. Wait a
        # bit for other outstanding sends to finish - most likely ending up
        # here since they are all going to the same destination - then resend
        # this message
        if send_task._can_retry:
            # note well: once there is something on the pending list no further
            # messages will be sent (they will all queue up behind this one).
            self._pending_sends.append(send_task)
            if self._resend_timer is None:
                sched = self._scheduler
                # this will get the pending sends going again
                self._resend_timer = sched.defer(self._resend_pending,
                                                 self._delay)
        else:
            send_task._on_error("Send retries exhausted")

    def _resend_pending(self):
        # run from the _resend_timer, attempt to resend pending messages
        self._resend_timer = None
        self._send_pending()

    def _send_pending(self):
        # flush all pending messages out
        if self._can_send:
            while self._pending_sends:
                self._send(self._pending_sends.popleft())

    def _open_link(self):
        name = "openstack.org/om/sender/[%s]/%s" % (self._address,
                                                    uuid.uuid4().hex)
        link = self._connection.create_sender(name=name,
                                              source_address=self._address,
                                              target_address=self._address,
                                              event_handler=self)
        link.open()
        return link

    def _reopen_link(self):
        if self._connection:
            if self._link:
                self._link.destroy()
            self._link = self._open_link()
554
555
class Replies(pyngus.ReceiverEventHandler):
    """This is the receiving link for all RPC reply messages.

    Each outgoing RPC request is stamped with a unique message id and this
    link's address (see prepare_for_response()).  When a reply arrives, its
    correlation-id header is used to look up the callback registered for
    that request, and the reply is handed to it.
    """
    def __init__(self, connection, on_ready, on_down, capacity):
        """Create and open the reply receiver link.

        :param connection: connection used to create the receiver link
        :param on_ready: called (no arguments) when the link becomes active
        :param on_down: called (no arguments) when the link closes or fails
        :param capacity: maximum credit granted to the link
        """
        self._correlation = {}  # map of correlation-id to reply callback
        self._on_ready = on_ready
        self._on_down = on_down
        rname = ("openstack.org/om/receiver/[rpc-response]/%s"
                 % uuid.uuid4().hex)
        self._receiver = connection.create_receiver("rpc-response",
                                                    event_handler=self,
                                                    name=rname)

        # capacity determines the maximum number of reply messages this link is
        # willing to receive. As messages are received and capacity is
        # consumed, this driver will 'top up' the capacity back to max
        # capacity.  This number should be large enough to avoid needlessly
        # flow-controlling the replies.
        self._capacity = capacity
        # top-up threshold: roughly half of the maximum credit (integer
        # division gives the same comparisons as before without a float)
        self._capacity_low = (capacity + 1) // 2
        self._receiver.open()

    def detach(self):
        # close the link
        if self._receiver:
            self._receiver.close()

    def destroy(self):
        self._correlation.clear()
        if self._receiver:
            self._receiver.destroy()
            self._receiver = None

    def prepare_for_response(self, request, callback):
        """Apply a unique message identifier to this request message. This will
        be used to identify messages received in reply.  The identifier is
        placed in the 'id' field of the request message.  It is expected that
        the identifier will appear in the 'correlation-id' field of the
        corresponding response message.

        When the caller is done receiving replies, it must call cancel_response

        :param request: the outgoing request message (modified in place)
        :param callback: called with each reply message for this request
        :returns: the message id assigned to the request
        """
        request.id = uuid.uuid4().hex
        # replies carrying this id will be passed to the callback
        self._correlation[request.id] = callback
        request.reply_to = self._receiver.source_address
        return request.id

    def cancel_response(self, msg_id):
        """Abort waiting for the response message corresponding to msg_id.
        This can be used if the request fails and no reply is expected.
        """
        try:
            del self._correlation[msg_id]
        except KeyError:
            pass

    @property
    def active(self):
        return self._receiver and self._receiver.active

    # Pyngus ReceiverLink event callbacks:

    def receiver_active(self, receiver_link):
        """This is a Pyngus callback, invoked by Pyngus when the receiver_link
        has transitioned to the open state and is able to receive incoming
        messages.
        """
        LOG.debug("Replies link active src=%s", self._receiver.source_address)
        receiver_link.add_capacity(self._capacity)
        self._on_ready()

    def receiver_remote_closed(self, receiver, pn_condition):
        """This is a Pyngus callback, invoked by Pyngus when the peer of this
        receiver link has initiated closing the connection.
        """
        if pn_condition:
            LOG.error("Reply subscription closed by peer: %s",
                      pn_condition)
        receiver.close()

    def receiver_failed(self, receiver_link, error):
        """Protocol error occurred."""
        LOG.error("Link to reply queue failed. error=%(error)s",
                  {"error": error})
        self._on_down()

    def receiver_closed(self, receiver_link):
        self._on_down()

    def message_received(self, receiver, message, handle):
        """This is a Pyngus callback, invoked by Pyngus when a new message
        arrives on this receiver link from the peer.
        """
        key = message.correlation_id
        # Only the lookup may legitimately miss.  The callback is invoked
        # outside of any KeyError handler so that an error raised by the
        # callback itself is not misreported as an unknown correlation id.
        callback = self._correlation.get(key)
        if callback is None:
            LOG.warning("Can't find receiver for response msg id=%s, "
                        "dropping!", key)
            receiver.message_modified(handle, True, True, None)
        else:
            callback(message)
            receiver.message_accepted(handle)
        # ensure we have enough credit
        if receiver.capacity <= self._capacity_low:
            receiver.add_capacity(self._capacity - receiver.capacity)
663
664
class Server(pyngus.ReceiverEventHandler):
    """Receive messages on every address derived from a target.

    One receiver link is opened per entry of ``_addresses`` (subclasses fill
    the list in before calling attach()).  Each message arriving on any of
    the links is put on the ``incoming`` queue as a dict holding the message
    and a callable that settles it.
    """
    def __init__(self, target, incoming, scheduler, delay, capacity):
        self._target = target
        self._incoming = incoming
        self._scheduler = scheduler
        self._delay = delay  # wait before re-attaching a closed link
        self._capacity = capacity  # credit granted to each link
        self._capacity_low = (capacity + 1) / 2
        self._addresses = []
        self._receivers = []
        self._connection = None
        self._reopen_scheduled = False

    def attach(self, connection):
        """Open one receiver link per configured address over *connection*."""
        self._connection = connection
        for address in self._addresses:
            link_name = "openstack.org/om/receiver/[%s]/%s" % (
                address, uuid.uuid4().hex)
            self._receivers.append(self._open_link(address, link_name))

    def detach(self):
        """Attempt a clean shutdown of the links"""
        self._connection = None
        self._addresses = []
        for link in self._receivers:
            link.close()

    def reset(self):
        # Destroy every link and drop the connection and the generated
        # addresses.  The target itself is retained so that the addresses can
        # be regenerated when attach() is called again after a failover.
        # Because the links are destroyed, this must not be called from any of
        # the ReceiverLink callbacks below.
        self._connection = None
        self._addresses = []
        self._reopen_scheduled = False
        for link in self._receivers:
            link.destroy()
        self._receivers = []

    # Pyngus ReceiverLink event callbacks.  A single Server instance is the
    # event handler for all of its links.

    def receiver_remote_closed(self, receiver, pn_condition):
        """Pyngus callback: the peer started closing this receiver link."""
        LOG.debug("Server subscription to %s remote detach",
                  receiver.source_address)
        if pn_condition:
            details = {
                "addr": receiver.source_address or receiver.target_address,
                "err_msg": pn_condition,
            }
            LOG.error("Server subscription %(addr)s closed "
                      "by peer: %(err_msg)s", details)
        receiver.close()

    def receiver_failed(self, receiver_link, error):
        """Protocol error occurred."""
        LOG.error("Listener link queue failed. error=%(error)s",
                  {"error": error})
        self.receiver_closed(receiver_link)

    def receiver_closed(self, receiver_link):
        LOG.debug("Server subscription to %s closed",
                  receiver_link.source_address)
        # Nothing to do once detached, or if a reopen is already pending
        if not self._connection or self._reopen_scheduled:
            return
        LOG.debug("Server subscription reopen scheduled")
        self._reopen_scheduled = True
        self._scheduler.defer(self._reopen_links, self._delay)

    def message_received(self, receiver, message, handle):
        """Pyngus callback: queue an arriving message for the listener."""
        def settle(released=False):
            # the link may have been replaced or closed since delivery
            if receiver not in self._receivers or receiver.closed:
                LOG.debug("Can't find receiver for settlement")
                return
            if released:
                receiver.message_released(handle)
            else:
                receiver.message_accepted(handle)
            # top the credit back up once it falls to the low-water mark
            if receiver.capacity <= self._capacity_low:
                receiver.add_capacity(self._capacity - receiver.capacity)

        self._incoming.put({"message": message, "disposition": settle})

    def _open_link(self, address, name):
        link = self._connection.create_receiver(
            source_address=address,
            target_address=address,
            event_handler=self,
            name=name,
            properties={"snd-settle-mode": "mixed"})
        link.add_capacity(self._capacity)
        link.open()
        return link

    def _reopen_links(self):
        # attempt to re-establish any closed links, reusing address and name
        LOG.debug("Server subscription reopening")
        self._reopen_scheduled = False
        if not self._connection:
            return
        for index, old_link in enumerate(self._receivers):
            if not old_link.closed:
                continue
            address = old_link.target_address
            link_name = old_link.name
            old_link.destroy()
            self._receivers[index] = self._open_link(address, link_name)
784
785
class RPCServer(Server):
    """Server that subscribes to the RPC addresses of its target."""

    def __init__(self, target, incoming, scheduler, delay, capacity):
        super().__init__(target, incoming, scheduler, delay, capacity)

    def attach(self, connection, addresser):
        """Subscribe to the RPC unicast, multicast and anycast addresses.

        ``addresser`` generates the AMQP 1.0 addresses for this server's
        target; they are stored for the base class, which is then attached
        to ``connection``.
        """
        target = self._target
        self._addresses = [
            addresser.unicast_address(target, SERVICE_RPC),
            addresser.multicast_address(target, SERVICE_RPC),
            addresser.anycast_address(target, SERVICE_RPC),
        ]
        super().attach(connection)
801
802
class NotificationServer(Server):
    """Server that subscribes to the notification address of its target."""

    def __init__(self, target, incoming, scheduler, delay, capacity):
        super().__init__(target, incoming, scheduler, delay, capacity)

    def attach(self, connection, addresser):
        """Subscribe to the notification anycast address.

        ``addresser`` generates the AMQP 1.0 address for this server's
        target; it is stored for the base class, which is then attached to
        ``connection``.
        """
        notify_address = addresser.anycast_address(self._target,
                                                   SERVICE_NOTIFY)
        self._addresses = [notify_address]
        super().attach(connection)
816
817
class Hosts(object):
    """An ordered list of TransportHost addresses.  Connection failover
    progresses from one host to the next.  The default realm comes from the
    configuration and is only used if no realm is present in the URL.

    The entries are shallow copies of the URL's hosts, so the port default
    and realm qualification applied here never modify the caller's URL.
    """
    def __init__(self, url, default_realm=None):
        import copy

        self.virtual_host = url.virtual_host
        if url.hosts:
            # Copy each host so the fix-ups below do not leak back into the
            # caller's (possibly shared) TransportURL object.
            self._entries = [copy.copy(host) for host in url.hosts]
        else:
            self._entries = [transport.TransportHost(hostname="localhost",
                                                     port=5672)]
        for entry in self._entries:
            # port 5672 is used whenever the URL does not give one
            entry.port = entry.port or 5672
            # qualify a bare user name with the configured default realm
            if default_realm and entry.username and '@' not in entry.username:
                entry.username = entry.username + '@' + default_realm
        # failover starts at a randomly chosen host
        self._current = random.randint(0, len(self._entries) - 1)  # nosec

    @property
    def current(self):
        """The host currently selected for connecting."""
        return self._entries[self._current]

    def next(self):
        """Advance to the next host (wrapping around) and return it."""
        if len(self._entries) > 1:
            self._current = (self._current + 1) % len(self._entries)
        return self.current

    def __repr__(self):
        return '<Hosts ' + str(self) + '>'

    def __str__(self):
        r = ', vhost=%s' % self.virtual_host if self.virtual_host else ''
        return ", ".join(["%r" % th for th in self._entries]) + r
853
854
class Controller(pyngus.ConnectionEventHandler):
    """Controls the connection to the AMQP messaging service.

    This object is the 'brains' of the driver: it owns the addressing logic,
    the sending and receiving of messages, and the management of the
    connection.  All messaging and I/O work is done on the eventloop thread,
    which lets the driver run asynchronously from the messaging clients.
    """
    def __init__(self, url, default_exchange, config):
        opts = config.oslo_messaging_amqp

        self.processor = None
        self._socket_connection = None
        self._node = platform.node() or "<UNKNOWN>"
        self._command = os.path.basename(sys.argv[0])
        self._pid = os.getpid()
        # bounded queue of Task objects to run on the eventloop thread
        self._tasks = queue.Queue(maxsize=500)
        # at most this many Tasks run per _process_tasks() call, so the
        # eventloop returns to servicing socket I/O in a timely manner
        self._max_task_batch = 50
        # every Sender link, keyed by keyify(target, service)
        self._all_senders = {}
        # keys of Sender links used since the last maintenance pass
        self._active_senders = set()
        # detached Sender links waiting to be destroyed
        self._purged_senders = []
        # keyify(target, service) -> {subscriber id: Server}
        self._servers = {}

        # connection, SSL and SASL settings
        self._container_name = opts.container_name
        self.idle_timeout = opts.idle_timeout
        self.trace_protocol = opts.trace
        self.ssl = opts.ssl
        self.ssl_ca_file = opts.ssl_ca_file
        self.ssl_cert_file = opts.ssl_cert_file
        self.ssl_key_file = opts.ssl_key_file
        self.ssl_key_password = opts.ssl_key_password
        self.ssl_verify_vhost = opts.ssl_verify_vhost
        self.pseudo_vhost = opts.pseudo_vhost
        self.sasl_mechanisms = opts.sasl_mechanisms
        self.sasl_config_dir = opts.sasl_config_dir
        self.sasl_config_name = opts.sasl_config_name
        self.hosts = Hosts(url, opts.sasl_default_realm)
        # reconnect back-off parameters
        self.conn_retry_interval = opts.connection_retry_interval
        self.conn_retry_backoff = opts.connection_retry_backoff
        self.conn_retry_interval_max = opts.connection_retry_interval_max
        self.link_retry_delay = opts.link_retry_delay

        self.addresser_factory = AddresserFactory(
            default_exchange,
            opts.addressing_mode,
            legacy_server_prefix=opts.server_request_prefix,
            legacy_broadcast_prefix=opts.broadcast_prefix,
            legacy_group_prefix=opts.group_request_prefix,
            rpc_prefix=opts.rpc_address_prefix,
            notify_prefix=opts.notify_address_prefix,
            multicast=opts.multicast_address,
            unicast=opts.unicast_address,
            anycast=opts.anycast_address,
            notify_exchange=opts.default_notification_exchange,
            rpc_exchange=opts.default_rpc_exchange)
        # the addresser in use is built in connection_active()
        self.addresser = None

        # RPC requests cannot be sent until the reply link is active, since
        # they need its peer-assigned address
        self.reply_link = None
        # True once the driver is shutting down
        self._closing = False
        # at most one reconnect attempt is outstanding at a time
        self._reconnecting = False
        self._delay = self.conn_retry_interval  # seconds between retries
        # prevents queuing multiple requests to run _process_tasks()
        self._process_tasks_scheduled = False
        self._process_tasks_lock = threading.Lock()
        # credit levels for incoming links
        self._reply_credit = opts.reply_link_credit
        self._rpc_credit = opts.rpc_server_credit
        self._notify_credit = opts.notify_server_credit
        # sender link maintenance timer and its interval
        self._link_maint_timer = None
        self._link_maint_timeout = opts.default_sender_link_timeout
942
943    def connect(self):
944        """Connect to the messaging service."""
945        self.processor = eventloop.Thread(self._container_name, self._node,
946                                          self._command, self._pid)
947        self.processor.wakeup(lambda: self._do_connect())
948
949    def add_task(self, task):
950        """Add a Task for execution on processor thread."""
951        self._tasks.put(task)
952        self._schedule_task_processing()
953
954    def shutdown(self, timeout=30):
955        """Shutdown the messaging service."""
956        LOG.info("Shutting down the AMQP 1.0 connection")
957        if self.processor:
958            self.processor.wakeup(self._start_shutdown)
959            LOG.debug("Waiting for eventloop to exit")
960            self.processor.join(timeout)
961            self._hard_reset("Shutting down")
962            for sender in self._all_senders.values():
963                sender.destroy()
964            self._all_senders.clear()
965            self._servers.clear()
966            self.processor.destroy()
967            self.processor = None
968        LOG.debug("Eventloop exited, driver shut down")
969
970    # The remaining methods are reserved to run from the eventloop thread only!
971    # They must not be invoked directly!
972
973    # methods executed by Tasks created by the driver:
974
975    def send(self, send_task):
976        if send_task.deadline and send_task.deadline <= time.monotonic():
977            send_task._on_timeout()
978            return
979        key = keyify(send_task.target, send_task.service)
980        sender = self._all_senders.get(key)
981        if not sender:
982            sender = Sender(send_task.target, self.processor,
983                            self.link_retry_delay, send_task.service)
984            self._all_senders[key] = sender
985            if self.reply_link and self.reply_link.active:
986                sender.attach(self._socket_connection.pyngus_conn,
987                              self.reply_link, self.addresser)
988        self._active_senders.add(key)
989        sender.send_message(send_task)
990
991    def subscribe(self, subscribe_task):
992        """Subscribe to a given target"""
993        if subscribe_task._service == SERVICE_NOTIFY:
994            t = "notification"
995            server = NotificationServer(subscribe_task._target,
996                                        subscribe_task._in_queue,
997                                        self.processor,
998                                        self.link_retry_delay,
999                                        self._notify_credit)
1000        else:
1001            t = "RPC"
1002            server = RPCServer(subscribe_task._target,
1003                               subscribe_task._in_queue,
1004                               self.processor,
1005                               self.link_retry_delay,
1006                               self._rpc_credit)
1007
1008        LOG.debug("Subscribing to %(type)s target %(target)s",
1009                  {'type': t, 'target': subscribe_task._target})
1010        key = keyify(subscribe_task._target, subscribe_task._service)
1011        servers = self._servers.get(key)
1012        if servers is None:
1013            servers = {}
1014            self._servers[key] = servers
1015        servers[subscribe_task._subscriber_id] = server
1016        if self._active:
1017            server.attach(self._socket_connection.pyngus_conn,
1018                          self.addresser)
1019
1020    # commands executed on the processor (eventloop) via 'wakeup()':
1021
1022    def _do_connect(self):
1023        """Establish connection and reply subscription on processor thread."""
1024        host = self.hosts.current
1025        conn_props = {'properties': {'process': self._command,
1026                                     'pid': self._pid,
1027                                     'node': self._node}}
1028        # only set hostname in the AMQP 1.0 Open performative if the message
1029        # bus can interpret it as the virtual host.  We leave it unspecified
1030        # since apparently noone can agree on how it should be used otherwise!
1031        if self.hosts.virtual_host and not self.pseudo_vhost:
1032            conn_props['hostname'] = self.hosts.virtual_host
1033        if self.idle_timeout:
1034            conn_props["idle-time-out"] = float(self.idle_timeout)
1035        if self.trace_protocol:
1036            conn_props["x-trace-protocol"] = self.trace_protocol
1037
1038        # SSL configuration
1039        ssl_enabled = False
1040        if self.ssl:
1041            ssl_enabled = True
1042            conn_props["x-ssl"] = self.ssl
1043        if self.ssl_ca_file:
1044            conn_props["x-ssl-ca-file"] = self.ssl_ca_file
1045            ssl_enabled = True
1046        if self.ssl_cert_file:
1047            ssl_enabled = True
1048            conn_props["x-ssl-identity"] = (self.ssl_cert_file,
1049                                            self.ssl_key_file,
1050                                            self.ssl_key_password)
1051        if ssl_enabled:
1052            # Set the identity of the remote server for SSL to use when
1053            # verifying the received certificate.  Typically this is the DNS
1054            # name used to set up the TCP connections.  However some servers
1055            # may provide a certificate for the virtual host instead.  If that
1056            # is the case we need to use the virtual hostname instead.
1057            # Refer to SSL Server Name Indication (SNI) for the entire story:
1058            # https://tools.ietf.org/html/rfc6066
1059            if self.ssl_verify_vhost:
1060                if self.hosts.virtual_host:
1061                    conn_props['x-ssl-peer-name'] = self.hosts.virtual_host
1062            else:
1063                conn_props['x-ssl-peer-name'] = host.hostname
1064
1065        # SASL configuration:
1066        if self.sasl_mechanisms:
1067            conn_props["x-sasl-mechs"] = self.sasl_mechanisms
1068        if self.sasl_config_dir:
1069            conn_props["x-sasl-config-dir"] = self.sasl_config_dir
1070        if self.sasl_config_name:
1071            conn_props["x-sasl-config-name"] = self.sasl_config_name
1072
1073        self._socket_connection = self.processor.connect(host,
1074                                                         handler=self,
1075                                                         properties=conn_props)
1076        LOG.debug("Connection initiated")
1077
1078    def _process_tasks(self):
1079        """Execute Task objects in the context of the processor thread."""
1080        with self._process_tasks_lock:
1081            self._process_tasks_scheduled = False
1082        count = 0
1083        while (not self._tasks.empty() and
1084               count < self._max_task_batch):
1085            try:
1086                self._tasks.get(False)._execute(self)
1087            except Exception as e:
1088                LOG.exception("Error processing task: %s", e)
1089            count += 1
1090
1091        # if we hit _max_task_batch, resume task processing later:
1092        if not self._tasks.empty():
1093            self._schedule_task_processing()
1094
1095    def _schedule_task_processing(self):
1096        """_process_tasks() helper: prevent queuing up multiple requests for
1097        task processing.  This method is called both by the application thread
1098        and the processing thread.
1099        """
1100        if self.processor:
1101            with self._process_tasks_lock:
1102                already_scheduled = self._process_tasks_scheduled
1103                self._process_tasks_scheduled = True
1104            if not already_scheduled:
1105                self.processor.wakeup(lambda: self._process_tasks())
1106
1107    def _start_shutdown(self):
1108        """Called when the application is closing the transport.
1109        Attempt to cleanly flush/close all links.
1110        """
1111        self._closing = True
1112        if self._active:
1113            # try a clean shutdown
1114            self._detach_senders()
1115            self._detach_servers()
1116            self.reply_link.detach()
1117            self._socket_connection.pyngus_conn.close()
1118        else:
1119            # don't wait for a close from the remote, may never happen
1120            self.processor.shutdown()
1121
1122    # reply link callbacks:
1123
1124    def _reply_link_ready(self):
1125        """Invoked when the Replies reply link has become active.  At this
1126        point, we are ready to receive messages, so start all pending RPC
1127        requests.
1128        """
1129        LOG.info("Messaging is active (%(hostname)s:%(port)s%(vhost)s)",
1130                 {'hostname': self.hosts.current.hostname,
1131                  'port': self.hosts.current.port,
1132                  'vhost': ("/" + self.hosts.virtual_host
1133                            if self.hosts.virtual_host else "")})
1134
1135        for sender in self._all_senders.values():
1136            sender.attach(self._socket_connection.pyngus_conn,
1137                          self.reply_link, self.addresser)
1138
1139    def _reply_link_down(self):
1140        # Treat it as a recoverable failure because the RPC reply address is
1141        # now invalid for all in-flight RPC requests.
1142        if not self._closing:
1143            self._detach_senders()
1144            self._detach_servers()
1145            self._socket_connection.pyngus_conn.close()
1146            # once closed, _handle_connection_loss() will initiate reconnect
1147
1148    # callback from eventloop on socket error
1149
1150    def socket_error(self, error):
1151        """Called by eventloop when a socket error occurs."""
1152        LOG.error("Socket failure: %s", error)
1153        self._handle_connection_loss(str(error))
1154
1155    # Pyngus connection event callbacks (and their helpers), all invoked from
1156    # the eventloop thread:
1157
1158    def connection_failed(self, connection, error):
1159        """This is a Pyngus callback, invoked by Pyngus when a non-recoverable
1160        error occurs on the connection.
1161        """
1162        if connection is not self._socket_connection.pyngus_conn:
1163            # pyngus bug: ignore failure callback on destroyed connections
1164            return
1165        LOG.debug("AMQP Connection failure: %s", error)
1166        self._handle_connection_loss(str(error))
1167
1168    def connection_active(self, connection):
1169        """This is a Pyngus callback, invoked by Pyngus when the connection to
1170        the peer is up.  At this point, the driver will activate all subscriber
1171        links (server) and the reply link.
1172        """
1173        LOG.debug("Connection active (%(hostname)s:%(port)s), subscribing...",
1174                  {'hostname': self.hosts.current.hostname,
1175                   'port': self.hosts.current.port})
1176        # allocate an addresser based on the advertised properties of the
1177        # message bus
1178        props = connection.remote_properties or {}
1179        self.addresser = self.addresser_factory(props,
1180                                                self.hosts.virtual_host
1181                                                if self.pseudo_vhost else None)
1182        for servers in self._servers.values():
1183            for server in servers.values():
1184                server.attach(self._socket_connection.pyngus_conn,
1185                              self.addresser)
1186        self.reply_link = Replies(self._socket_connection.pyngus_conn,
1187                                  self._reply_link_ready,
1188                                  self._reply_link_down,
1189                                  self._reply_credit)
1190        self._delay = self.conn_retry_interval   # reset
1191        # schedule periodic maintenance of sender links
1192        self._link_maint_timer = self.processor.defer(self._purge_sender_links,
1193                                                      self._link_maint_timeout)
1194
1195    def connection_closed(self, connection):
1196        """This is a Pyngus callback, invoked by Pyngus when the connection has
1197        cleanly closed.  This occurs after the driver closes the connection
1198        locally, and the peer has acknowledged the close.  At this point, the
1199        shutdown of the driver's connection is complete.
1200        """
1201        LOG.debug("AMQP connection closed.")
1202        # if the driver isn't being shutdown, failover and reconnect
1203        self._handle_connection_loss("AMQP connection closed.")
1204
1205    def connection_remote_closed(self, connection, reason):
1206        """This is a Pyngus callback, invoked by Pyngus when the peer has
1207        requested that the connection be closed.
1208        """
1209        # The messaging service/broker is trying to shut down the
1210        # connection. Acknowledge the close, and try to reconnect/failover
1211        # later once the connection has closed (connection_closed is called).
1212        if reason:
1213            LOG.info("Connection closed by peer: %s", reason)
1214        self._detach_senders()
1215        self._detach_servers()
1216        self.reply_link.detach()
1217        self._socket_connection.pyngus_conn.close()
1218
1219    def sasl_done(self, connection, pn_sasl, outcome):
1220        """This is a Pyngus callback invoked when the SASL handshake
1221        has completed.  The outcome of the handshake is passed in the outcome
1222        argument.
1223        """
1224        if outcome == proton.SASL.OK:
1225            return
1226        LOG.error("AUTHENTICATION FAILURE: Cannot connect to "
1227                  "%(hostname)s:%(port)s as user %(username)s",
1228                  {'hostname': self.hosts.current.hostname,
1229                   'port': self.hosts.current.port,
1230                   'username': self.hosts.current.username})
1231        # pyngus will invoke connection_failed() eventually
1232
1233    def _handle_connection_loss(self, reason):
1234        """The connection to the messaging service has been lost.  Try to
1235        reestablish the connection/failover if not shutting down the driver.
1236        """
1237        self.addresser = None
1238        self._socket_connection.close()
1239        if self._closing:
1240            # we're in the middle of shutting down the driver anyways,
1241            # just consider it done:
1242            self.processor.shutdown()
1243        else:
1244            # for some reason, we've lost the connection to the messaging
1245            # service.  Try to re-establish the connection:
1246            if not self._reconnecting:
1247                self._reconnecting = True
1248                LOG.info("Delaying reconnect attempt for %d seconds",
1249                         self._delay)
1250                self.processor.defer(lambda: self._do_reconnect(reason),
1251                                     self._delay)
1252                self._delay = min(self._delay * self.conn_retry_backoff,
1253                                  self.conn_retry_interval_max)
1254            if self._link_maint_timer:
1255                self._link_maint_timer.cancel()
1256                self._link_maint_timer = None
1257
1258    def _do_reconnect(self, reason):
1259        """Invoked on connection/socket failure, failover and re-connect to the
1260        messaging service.
1261        """
1262        self._reconnecting = False
1263        if not self._closing:
1264            self._hard_reset(reason)
1265            host = self.hosts.next()
1266            LOG.info("Reconnecting to: %(hostname)s:%(port)s",
1267                     {'hostname': host.hostname, 'port': host.port})
1268            self.processor.wakeup(lambda: self._do_connect())
1269
1270    def _hard_reset(self, reason):
1271        """Reset the controller to its pre-connection state"""
1272        # note well: since this method destroys the connection, it cannot be
1273        # invoked directly from a pyngus callback.  Use processor.defer() to
1274        # run this method on the main loop instead.
1275        for sender in self._purged_senders:
1276            sender.destroy(reason)
1277        del self._purged_senders[:]
1278        self._active_senders.clear()
1279        unused = []
1280        for key, sender in self._all_senders.items():
1281            # clean up any sender links that no longer have messages to send
1282            if sender.pending_messages == 0:
1283                unused.append(key)
1284            else:
1285                sender.reset(reason)
1286                self._active_senders.add(key)
1287        for key in unused:
1288            self._all_senders[key].destroy(reason)
1289            del self._all_senders[key]
1290        for servers in self._servers.values():
1291            for server in servers.values():
1292                server.reset()
1293        if self.reply_link:
1294            self.reply_link.destroy()
1295            self.reply_link = None
1296        if self._socket_connection:
1297            self._socket_connection.reset()
1298
1299    def _detach_senders(self):
1300        """Close all sender links"""
1301        for sender in self._all_senders.values():
1302            sender.detach()
1303
1304    def _detach_servers(self):
1305        """Close all listener links"""
1306        for servers in self._servers.values():
1307            for server in servers.values():
1308                server.detach()
1309
1310    def _purge_sender_links(self):
1311        """Purge inactive sender links"""
1312        if not self._closing:
1313            # destroy links that have already been closed
1314            for sender in self._purged_senders:
1315                sender.destroy("Idle link purged")
1316            del self._purged_senders[:]
1317
1318            # determine next set to purge
1319            purge = set(self._all_senders.keys()) - self._active_senders
1320            for key in purge:
1321                sender = self._all_senders[key]
1322                if not sender.pending_messages and not sender.unacked_messages:
1323                    sender.detach()
1324                    self._purged_senders.append(self._all_senders.pop(key))
1325            self._active_senders.clear()
1326            self._link_maint_timer = \
1327                self.processor.defer(self._purge_sender_links,
1328                                     self._link_maint_timeout)
1329
1330    @property
1331    def _active(self):
1332        # Is the connection up
1333        return (self._socket_connection and
1334                self._socket_connection.pyngus_conn.active)
1335