# Copyright 2014, Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Controller that manages the interface between the driver and the messaging
service.

This module defines a Controller class that is responsible for performing
messaging-related operations (Tasks) requested by the driver, and for managing
the connection to the messaging service.  The Controller creates a background
thread which performs all messaging operations and socket I/O.  The
Controller's messaging logic is executed in the background thread via lambda
functions scheduled by the Controller.
"""

import abc
import collections
import logging
import os
import platform
import queue
import random
import sys
import threading
import time
import uuid

from oslo_utils import eventletutils
import proton
import pyngus

from oslo_messaging._drivers.amqp1_driver.addressing import AddresserFactory
from oslo_messaging._drivers.amqp1_driver.addressing import keyify
from oslo_messaging._drivers.amqp1_driver.addressing import SERVICE_NOTIFY
from oslo_messaging._drivers.amqp1_driver.addressing import SERVICE_RPC
from oslo_messaging._drivers.amqp1_driver import eventloop
from oslo_messaging import exceptions
from oslo_messaging.target import Target
from oslo_messaging import transport

LOG = logging.getLogger(__name__)


class Task(object, metaclass=abc.ABCMeta):
    """Run a command on the eventloop thread, wait until it completes.

    A Task is handed to Controller.add_task() by a client thread.  The
    eventloop thread later runs _execute(), while the client thread blocks in
    wait().  ABCMeta makes the abstract methods below mandatory for
    subclasses.
    """

    @abc.abstractmethod
    def wait(self):
        """Called by the client thread to wait for the operation to
        complete.  The implementation may optionally return a value.
        """

    @abc.abstractmethod
    def _execute(self, controller):
        """This method will be run on the eventloop thread to perform the
        messaging operation.
        """


class SubscribeTask(Task):
    """A task that creates a subscription to the given target.  Messages
    arriving from the target are given to the listener.
    """
    def __init__(self, target, listener, notifications=False):
        super(SubscribeTask, self).__init__()
        self._target = target()  # mutable - need a copy
        self._subscriber_id = listener.id
        self._in_queue = listener.incoming
        self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC
        self._wakeup = eventletutils.Event()

    def wait(self):
        """Block until the eventloop thread has registered the subscription."""
        self._wakeup.wait()

    def _execute(self, controller):
        """Register the subscription with the controller, then release the
        waiting client thread.
        """
        controller.subscribe(self)
        self._wakeup.set()


class SendTask(Task):
    """This is the class used by the Controller to send messages to a given
    destination.

    :param name: label used in log and error messages (e.g. "RPC Call")
    :param message: the message to send; its address is filled in by the
        Sender just before transmission
    :param target: a Target (copied, since Targets are mutable) or a string
    :param deadline: absolute time.monotonic() value after which the send is
        failed, or a falsy value for no deadline
    :param retry: maximum number of retransmits; None or a negative value
        means retry without limit
    :param wait_for_ack: if True, completion waits for the peer's disposition
    :param notification: selects the notification service instead of RPC
    """
    def __init__(self, name, message, target, deadline, retry,
                 wait_for_ack, notification=False):
        super(SendTask, self).__init__()
        self.name = name
        # note: target can be either a Target class or a string
        # target is mutable - make copy
        self.target = target() if isinstance(target, Target) else target
        self.message = message
        self.deadline = deadline
        self.wait_for_ack = wait_for_ack
        self.service = SERVICE_NOTIFY if notification else SERVICE_RPC
        self.timer = None
        self._retry = None if retry is None or retry < 0 else retry
        self._wakeup = eventletutils.Event()
        self._error = None
        self._sender = None

    def wait(self):
        """Block until the send completes.

        :returns: None on success, otherwise the exception describing why the
            send failed.
        """
        self._wakeup.wait()
        return self._error

    def _execute(self, controller):
        """Arm the deadline timer (if any) and hand the task to the
        controller for transmission.
        """
        if self.deadline:
            # time out the send
            self.timer = controller.processor.alarm(self._on_timeout,
                                                    self.deadline)
        controller.send(self)

    def _prepare(self, sender):
        """Called immediately before the message is handed off to the i/o
        system.  This implies that the sender link is up.
        """
        self._sender = sender

    def _on_ack(self, state, info):
        """If wait_for_ack is True, this is called by the eventloop thread when
        the ack/nack is received from the peer.  If wait_for_ack is False this
        is called by the eventloop right after the message is written to the
        link.  In the last case state will always be set to ACCEPTED.
        """
        if state != pyngus.SenderLink.ACCEPTED:
            msg = ("{name} message send to {target} failed: remote"
                   " disposition: {disp}, info:"
                   "{info}".format(name=self.name,
                                   target=self.target,
                                   disp=state,
                                   info=info))
            self._error = exceptions.MessageDeliveryFailure(msg)
            LOG.warning("%s", msg)
        self._cleanup()
        self._wakeup.set()

    def _on_timeout(self):
        """Invoked by the eventloop when our timer expires
        """
        self.timer = None
        if self._sender:
            self._sender.cancel_send(self)
        msg = ("{name} message sent to {target} failed: timed"
               " out".format(name=self.name, target=self.target))
        LOG.warning("%s", msg)
        # Only raise a MessagingTimeout if the caller has explicitly specified
        # a timeout.
        self._error = exceptions.MessagingTimeout(msg) \
            if self.message.ttl else \
            exceptions.MessageDeliveryFailure(msg)
        self._cleanup()
        self._wakeup.set()

    def _on_error(self, description):
        """Invoked by the eventloop if the send operation fails for reasons
        other than timeout and nack.
        """
        msg = ("{name} message sent to {target} failed:"
               " {reason}".format(name=self.name,
                                  target=self.target,
                                  reason=description))
        LOG.warning("%s", msg)
        self._error = exceptions.MessageDeliveryFailure(msg)
        self._cleanup()
        self._wakeup.set()

    def _cleanup(self):
        """Detach from the sender and cancel any pending deadline timer."""
        self._sender = None
        if self.timer:
            self.timer.cancel()
            self.timer = None

    @property
    def _can_retry(self):
        """Consume one retry attempt.

        Note: reading this property decrements the remaining retry count.

        :returns: False once the retry budget is exhausted, True otherwise
            (always True when retries are unlimited).
        """
        # has the retry count expired?
        if self._retry is not None:
            self._retry -= 1
            if self._retry < 0:
                return False
        return True


class RPCCallTask(SendTask):
    """Performs an RPC Call.  Sends the request and waits for a response from
    the destination.
    """
    def __init__(self, target, message, deadline, retry, wait_for_ack):
        super(RPCCallTask, self).__init__("RPC Call", message, target,
                                          deadline, retry, wait_for_ack)
        self._reply_link = None
        self._reply_msg = None
        self._msg_id = None

    def wait(self):
        """Block until the call completes.

        :returns: the send error if the call failed, otherwise the reply
            message.
        """
        error = super(RPCCallTask, self).wait()
        return error or self._reply_msg

    def _prepare(self, sender):
        super(RPCCallTask, self)._prepare(sender)
        # reserve a message id for mapping the received response
        if self._msg_id:
            # already set so this is a re-transmit.  To be safe cancel the old
            # msg_id and allocate a fresh one.
            self._reply_link.cancel_response(self._msg_id)
        self._reply_link = sender._reply_link
        rl = self._reply_link
        self._msg_id = rl.prepare_for_response(self.message, self._on_reply)

    def _on_reply(self, message):
        # called if/when the reply message arrives
        self._reply_msg = message
        self._cleanup()
        self._wakeup.set()

    def _on_ack(self, state, info):
        if state != pyngus.SenderLink.ACCEPTED:
            super(RPCCallTask, self)._on_ack(state, info)
        # must wait for reply if ACCEPTED

    def _cleanup(self):
        """Stop waiting for a reply, then perform the base class cleanup."""
        if self._msg_id:
            self._reply_link.cancel_response(self._msg_id)
            self._msg_id = None
        self._reply_link = None
        super(RPCCallTask, self)._cleanup()


class RPCMonitoredCallTask(RPCCallTask):
    """An RPC call which expects a periodic heartbeat until the response is
    received.  There are two timeouts:
    deadline - overall hard timeout, implemented in RPCCallTask
    call_monitor_timeout - keep alive timeout, reset when heartbeat arrives
    """
    def __init__(self, target, message, deadline, call_monitor_timeout,
                 retry, wait_for_ack):
        super(RPCMonitoredCallTask, self).__init__(target, message, deadline,
                                                   retry, wait_for_ack)
        assert call_monitor_timeout is not None  # nosec
        self._monitor_timeout = call_monitor_timeout
        self._monitor_timer = None
        self._set_alarm = None

    def _execute(self, controller):
        """Arm the keep-alive timer, then perform the normal RPC call."""
        self._set_alarm = controller.processor.defer
        self._monitor_timer = self._set_alarm(self._call_timeout,
                                              self._monitor_timeout)
        super(RPCMonitoredCallTask, self)._execute(controller)

    def _call_timeout(self):
        # monitor_timeout expired
        self._monitor_timer = None
        if self._sender:
            self._sender.cancel_send(self)
        msg = ("{name} message sent to {target} failed: call monitor timed"
               " out".format(name=self.name, target=self.target))
        LOG.warning("%s", msg)
        self._error = exceptions.MessagingTimeout(msg)
        self._cleanup()
        self._wakeup.set()

    def _on_reply(self, message):
        # if reply is null, then this is the call monitor heartbeat
        if message.body is None:
            # restart the keep-alive timer
            self._monitor_timer.cancel()
            self._monitor_timer = self._set_alarm(self._call_timeout,
                                                  self._monitor_timeout)
        else:
            super(RPCMonitoredCallTask, self)._on_reply(message)

    def _cleanup(self):
        """Cancel the keep-alive timer, then perform the RPC call cleanup."""
        self._set_alarm = None
        if self._monitor_timer:
            self._monitor_timer.cancel()
            self._monitor_timer = None
        super(RPCMonitoredCallTask, self)._cleanup()


class MessageDispositionTask(Task):
    """A task that updates the message disposition as accepted or released
    for a Server.

    :param disposition: callable invoked on the eventloop thread with the
        ``released`` flag as its only argument
    :param released: True to release the message, False to accept it
    """
    def __init__(self, disposition, released=False):
        super(MessageDispositionTask, self).__init__()
        self._disposition = disposition
        self._released = released

    def wait(self):
        # disposition update does not have to block the sender since there is
        # no result to pend for.  This avoids a thread context switch with
        # every RPC call
        pass

    def _execute(self, controller):
        try:
            self._disposition(self._released)
        except Exception as e:
            # there's really nothing we can do about a failed disposition.
            LOG.exception("Message acknowledgment failed: %s", e)


class Sender(pyngus.SenderEventHandler):
    """A link for sending to a particular destination on the message bus.
    """
    def __init__(self, destination, scheduler, delay, service):
        super(Sender, self).__init__()
        self._destination = destination
        self._service = service
        self._address = None
        self._link = None
        self._scheduler = scheduler
        self._delay = delay  # for re-connecting/re-transmitting
        # holds all pending SendTasks
        self._pending_sends = collections.deque()
        # holds all messages sent but not yet acked
        self._unacked = set()
        self._reply_link = None
        self._connection = None
        self._resend_timer = None

    @property
    def pending_messages(self):
        """Number of send requests queued but not yet written to the link."""
        return len(self._pending_sends)

    @property
    def unacked_messages(self):
        """Number of messages written to the link and still awaiting an ack."""
        return len(self._unacked)

    def attach(self, connection, reply_link, addresser):
        """Open the link.  Called by the Controller when the AMQP connection
        becomes active.
        """
        self._connection = connection
        self._reply_link = reply_link
        self._address = addresser.resolve(self._destination, self._service)
        LOG.debug("Sender %s attached", self._address)
        self._link = self._open_link()

    def detach(self):
        """Close the link.  Called by the controller when shutting down or in
        response to a close requested by the remote.  May be re-attached later
        (after a reset is done)
        """
        LOG.debug("Sender %s detached", self._address)
        self._connection = None
        self._reply_link = None
        if self._resend_timer:
            self._resend_timer.cancel()
            self._resend_timer = None
        if self._link:
            self._link.close()

    def reset(self, reason="Link reset"):
        """Called by the controller on connection failover.  Release all link
        resources, abort any in-flight messages, and check the retry limit on
        all pending send requests.
        """
        self._address = None
        self._connection = None
        self._reply_link = None
        if self._link:
            self._link.destroy()
            self._link = None
        self._abort_unacked(reason)
        self._check_retry_limit(reason)

    def destroy(self, reason="Link destroyed"):
        """Destroy the sender and all pending messages.  Called on driver
        shutdown.
        """
        LOG.debug("Sender %s destroyed", self._address)
        self.reset(reason)
        self._abort_pending(reason)

    def send_message(self, send_task):
        """Send a message out the link.
        """
        # queue behind any already-pending sends to preserve ordering
        if not self._can_send or self._pending_sends:
            self._pending_sends.append(send_task)
        else:
            self._send(send_task)

    def cancel_send(self, send_task):
        """Attempts to cancel a send request.  It is possible that the send has
        already completed, so this is best-effort.
        """
        # may be in either list, or none
        self._unacked.discard(send_task)
        try:
            self._pending_sends.remove(send_task)
        except ValueError:
            pass

    # Pyngus callbacks:

    def sender_active(self, sender_link):
        """The link is open: flush any queued send requests."""
        LOG.debug("Sender %s active", self._address)
        self._send_pending()

    def credit_granted(self, sender_link):
        pass

    def sender_remote_closed(self, sender_link, pn_condition):
        # The remote has initiated a close.  This could happen when the message
        # bus is shutting down, or it detected an error
        LOG.warning("Sender %(addr)s failed due to remote initiated close:"
                    " condition=%(cond)s",
                    {'addr': self._address, 'cond': pn_condition})
        self._link.close()
        # sender_closed() will be called once the link completes closing

    def sender_closed(self, sender_link):
        self._handle_sender_closed()

    def sender_failed(self, sender_link, error):
        """Protocol error occurred."""
        LOG.warning("Sender %(addr)s failed error=%(error)s",
                    {'addr': self._address, 'error': error})
        self._handle_sender_closed(str(error))

    # end Pyngus callbacks

    def _handle_sender_closed(self, reason="Sender closed"):
        """Fail in-flight sends and, if still attached, schedule a re-open."""
        self._abort_unacked(reason)
        if self._connection:
            # still attached, so attempt to restart the link
            self._check_retry_limit(reason)
            self._scheduler.defer(self._reopen_link, self._delay)

    def _check_retry_limit(self, reason):
        # Called on recoverable connection or link failure.  Remove any pending
        # sends that have exhausted their retry count:
        expired = set()
        for send_task in self._pending_sends:
            if not send_task._can_retry:
                expired.add(send_task)
                send_task._on_error("Message send failed: %s" % reason)
        while expired:
            self._pending_sends.remove(expired.pop())

    def _abort_unacked(self, error):
        # fail all messages that have been sent to the message bus and have not
        # been acked yet
        while self._unacked:
            send_task = self._unacked.pop()
            send_task._on_error("Message send failed: %s" % error)

    def _abort_pending(self, error):
        # fail all messages that have yet to be sent to the message bus
        while self._pending_sends:
            send_task = self._pending_sends.popleft()
            send_task._on_error("Message send failed: %s" % error)

    @property
    def _can_send(self):
        return self._link and self._link.active

    # acknowledge status
    _TIMED_OUT = pyngus.SenderLink.TIMED_OUT
    _ACCEPTED = pyngus.SenderLink.ACCEPTED
    _RELEASED = pyngus.SenderLink.RELEASED
    _MODIFIED = pyngus.SenderLink.MODIFIED

    def _send(self, send_task):
        """Write one message to the link, tracking it for an ack if the task
        asks to wait for one.
        """
        send_task._prepare(self)
        send_task.message.address = self._address
        if send_task.wait_for_ack:
            self._unacked.add(send_task)

            def pyngus_callback(link, handle, state, info):
                # invoked when the message bus (n)acks this message
                if state == Sender._TIMED_OUT:
                    # ignore pyngus timeout - we maintain our own timer
                    # which will properly deal with this case
                    return
                self._unacked.discard(send_task)
                if state == Sender._ACCEPTED:
                    send_task._on_ack(Sender._ACCEPTED, info)
                elif (state == Sender._RELEASED or
                      (state == Sender._MODIFIED and
                       # assuming delivery-failed means in-doubt:
                       not info.get("delivery-failed") and
                       not info.get("undeliverable-here"))):
                    # These states indicate that the message was never
                    # forwarded beyond the next hop so they can be
                    # re-transmitted without risk of duplication
                    self._resend(send_task)
                else:
                    # some error - let task figure it out...
                    send_task._on_ack(state, info)

            self._link.send(send_task.message,
                            delivery_callback=pyngus_callback,
                            handle=self,
                            deadline=send_task.deadline)
        else:  # do not wait for ack
            self._link.send(send_task.message,
                            delivery_callback=None,
                            handle=self,
                            deadline=send_task.deadline)
            send_task._on_ack(pyngus.SenderLink.ACCEPTED, {})

    def _resend(self, send_task):
        # the message bus returned the message without forwarding it.  Wait a
        # bit for other outstanding sends to finish - most likely ending up
        # here since they are all going to the same destination - then resend
        # this message
        if send_task._can_retry:
            # note well: once there is something on the pending list no further
            # messages will be sent (they will all queue up behind this one).
            self._pending_sends.append(send_task)
            if self._resend_timer is None:
                sched = self._scheduler
                # this will get the pending sends going again
                self._resend_timer = sched.defer(self._resend_pending,
                                                 self._delay)
        else:
            send_task._on_error("Send retries exhausted")

    def _resend_pending(self):
        # run from the _resend_timer, attempt to resend pending messages
        self._resend_timer = None
        self._send_pending()

    def _send_pending(self):
        # flush all pending messages out
        if self._can_send:
            while self._pending_sends:
                self._send(self._pending_sends.popleft())

    def _open_link(self):
        """Create and open a new sender link to this sender's address."""
        name = "openstack.org/om/sender/[%s]/%s" % (self._address,
                                                    uuid.uuid4().hex)
        link = self._connection.create_sender(name=name,
                                              source_address=self._address,
                                              target_address=self._address,
                                              event_handler=self)
        link.open()
        return link

    def _reopen_link(self):
        """Replace the current link with a fresh one, if still attached."""
        if self._connection:
            if self._link:
                self._link.destroy()
            self._link = self._open_link()


class Replies(pyngus.ReceiverEventHandler):
    """This is the receiving link for all RPC reply messages.  Messages are
    routed to the proper incoming queue using the correlation-id header in the
    message.
    """
    def __init__(self, connection, on_ready, on_down, capacity):
        self._correlation = {}  # map of correlation-id to response queue
        self._on_ready = on_ready
        self._on_down = on_down
        rname = ("openstack.org/om/receiver/[rpc-response]/%s"
                 % uuid.uuid4().hex)
        self._receiver = connection.create_receiver("rpc-response",
                                                    event_handler=self,
                                                    name=rname)

        # capacity determines the maximum number of reply messages this link is
        # willing to receive.  As messages are received and capacity is
        # consumed, this driver will 'top up' the capacity back to max
        # capacity.  This number should be large enough to avoid needlessly
        # flow-controlling the replies.
        self._capacity = capacity
        # integer threshold at which credit is topped up (ceil(capacity / 2))
        self._capacity_low = (capacity + 1) // 2
        self._receiver.open()

    def detach(self):
        # close the link
        if self._receiver:
            self._receiver.close()

    def destroy(self):
        """Drop all pending reply callbacks and destroy the link."""
        self._correlation.clear()
        if self._receiver:
            self._receiver.destroy()
            self._receiver = None

    def prepare_for_response(self, request, callback):
        """Apply a unique message identifier to this request message.  This will
        be used to identify messages received in reply.  The identifier is
        placed in the 'id' field of the request message.  It is expected that
        the identifier will appear in the 'correlation-id' field of the
        corresponding response message.

        When the caller is done receiving replies, it must call cancel_response
        """
        request.id = uuid.uuid4().hex
        # reply is placed on reply_queue
        self._correlation[request.id] = callback
        request.reply_to = self._receiver.source_address
        return request.id

    def cancel_response(self, msg_id):
        """Abort waiting for the response message corresponding to msg_id.
        This can be used if the request fails and no reply is expected.
        """
        try:
            del self._correlation[msg_id]
        except KeyError:
            pass

    @property
    def active(self):
        return self._receiver and self._receiver.active

    # Pyngus ReceiverLink event callbacks:

    def receiver_active(self, receiver_link):
        """This is a Pyngus callback, invoked by Pyngus when the receiver_link
        has transitioned to the open state and is able to receive incoming
        messages.
        """
        LOG.debug("Replies link active src=%s", self._receiver.source_address)
        receiver_link.add_capacity(self._capacity)
        self._on_ready()

    def receiver_remote_closed(self, receiver, pn_condition):
        """This is a Pyngus callback, invoked by Pyngus when the peer of this
        receiver link has initiated closing the connection.
        """
        if pn_condition:
            LOG.error("Reply subscription closed by peer: %s",
                      pn_condition)
        receiver.close()

    def receiver_failed(self, receiver_link, error):
        """Protocol error occurred."""
        LOG.error("Link to reply queue failed. error=%(error)s",
                  {"error": error})
        self._on_down()

    def receiver_closed(self, receiver_link):
        self._on_down()

    def message_received(self, receiver, message, handle):
        """This is a Pyngus callback, invoked by Pyngus when a new message
        arrives on this receiver link from the peer.
        """
        key = message.correlation_id
        try:
            self._correlation[key](message)
            receiver.message_accepted(handle)
        except KeyError:
            LOG.warning("Can't find receiver for response msg id=%s, "
                        "dropping!", key)
            receiver.message_modified(handle, True, True, None)
        # ensure we have enough credit
        if receiver.capacity <= self._capacity_low:
            receiver.add_capacity(self._capacity - receiver.capacity)


class Server(pyngus.ReceiverEventHandler):
    """A group of links that receive messages from a set of addresses derived
    from a given target.  Messages arriving on the links are placed on the
    'incoming' queue.
    """
    def __init__(self, target, incoming, scheduler, delay, capacity):
        self._target = target
        self._incoming = incoming
        self._addresses = []
        self._capacity = capacity   # credit per each link
        # integer threshold at which credit is topped up (ceil(capacity / 2))
        self._capacity_low = (capacity + 1) // 2
        self._receivers = []
        self._scheduler = scheduler
        self._delay = delay  # for link re-attach
        self._connection = None
        self._reopen_scheduled = False

    def attach(self, connection):
        """Create receiver links over the given connection for all the
        configured addresses.
        """
        self._connection = connection
        for a in self._addresses:
            name = "openstack.org/om/receiver/[%s]/%s" % (a, uuid.uuid4().hex)
            r = self._open_link(a, name)
            self._receivers.append(r)

    def detach(self):
        """Attempt a clean shutdown of the links"""
        self._connection = None
        self._addresses = []
        for receiver in self._receivers:
            receiver.close()

    def reset(self):
        # Destroy the links and forget the addresses.  The subclasses'
        # attach() regenerates the addresses when the server is re-attached
        # (e.g. after failover).  Since links are destroyed, this cannot be
        # called from any of the following ReceiverLink callbacks.
        self._connection = None
        self._addresses = []
        self._reopen_scheduled = False
        for r in self._receivers:
            r.destroy()
        self._receivers = []

    # Pyngus ReceiverLink event callbacks.  Note that all of the Server's links
    # share this handler

    def receiver_remote_closed(self, receiver, pn_condition):
        """This is a Pyngus callback, invoked by Pyngus when the peer of this
        receiver link has initiated closing the connection.
        """
        LOG.debug("Server subscription to %s remote detach",
                  receiver.source_address)
        if pn_condition:
            vals = {
                "addr": receiver.source_address or receiver.target_address,
                "err_msg": pn_condition
            }
            LOG.error("Server subscription %(addr)s closed "
                      "by peer: %(err_msg)s", vals)
        receiver.close()

    def receiver_failed(self, receiver_link, error):
        """Protocol error occurred."""
        LOG.error("Listener link queue failed. error=%(error)s",
                  {"error": error})
        self.receiver_closed(receiver_link)

    def receiver_closed(self, receiver_link):
        LOG.debug("Server subscription to %s closed",
                  receiver_link.source_address)
        # If still attached, attempt to re-start link
        if self._connection and not self._reopen_scheduled:
            LOG.debug("Server subscription reopen scheduled")
            self._reopen_scheduled = True
            self._scheduler.defer(self._reopen_links, self._delay)

    def message_received(self, receiver, message, handle):
        """This is a Pyngus callback, invoked by Pyngus when a new message
        arrives on this receiver link from the peer.
        """
        def message_disposition(released=False):
            # settle the message only if its link is still current and open
            if receiver in self._receivers and not receiver.closed:
                if released:
                    receiver.message_released(handle)
                else:
                    receiver.message_accepted(handle)
                if receiver.capacity <= self._capacity_low:
                    receiver.add_capacity(self._capacity - receiver.capacity)
            else:
                LOG.debug("Can't find receiver for settlement")

        qentry = {"message": message, "disposition": message_disposition}
        self._incoming.put(qentry)

    def _open_link(self, address, name):
        """Create, credit and open a receiver link for the given address."""
        props = {"snd-settle-mode": "mixed"}
        r = self._connection.create_receiver(source_address=address,
                                             target_address=address,
                                             event_handler=self,
                                             name=name,
                                             properties=props)
        r.add_capacity(self._capacity)
        r.open()
        return r

    def _reopen_links(self):
        # attempt to re-establish any closed links
        LOG.debug("Server subscription reopening")
        self._reopen_scheduled = False
        if self._connection:
            # replacing elements in place is safe while enumerating
            for i, link in enumerate(self._receivers):
                if link.closed:
                    addr = link.target_address
                    name = link.name
                    link.destroy()
                    self._receivers[i] = self._open_link(addr, name)


class RPCServer(Server):
    """Subscribes to RPC addresses"""
    def __init__(self, target, incoming, scheduler, delay, capacity):
        super(RPCServer, self).__init__(target, incoming, scheduler, delay,
                                        capacity)

    def attach(self, connection, addresser):
        # Generate the AMQP 1.0 addresses for the base class
        self._addresses = [
            addresser.unicast_address(self._target, SERVICE_RPC),
            addresser.multicast_address(self._target, SERVICE_RPC),
            addresser.anycast_address(self._target, SERVICE_RPC)
        ]
        # now invoke the base class with the generated addresses
        super(RPCServer, self).attach(connection)


class NotificationServer(Server):
    """Subscribes to Notification addresses"""
    def __init__(self, target, incoming, scheduler, delay, capacity):
        super(NotificationServer, self).__init__(target, incoming, scheduler,
                                                 delay, capacity)

    def attach(self, connection, addresser):
        # Generate the AMQP 1.0 addresses for the base class
        self._addresses = [
            addresser.anycast_address(self._target, SERVICE_NOTIFY)
        ]
        # now invoke the base class with the generated addresses
        super(NotificationServer, self).attach(connection)


class Hosts(object):
    """An ordered list of TransportHost addresses.  Connection failover
    progresses from one host to the next.  The default realm comes from the
    configuration and is only used if no realm is present in the URL.
    """
    def __init__(self, url, default_realm=None):
        self.virtual_host = url.virtual_host
        if url.hosts:
            self._entries = url.hosts[:]
        else:
            self._entries = [transport.TransportHost(hostname="localhost",
                                                     port=5672)]
        for entry in self._entries:
            entry.port = entry.port or 5672
            if default_realm and entry.username and '@' not in entry.username:
                entry.username = entry.username + '@' + default_realm
        # start failover from a random host
        self._current = random.randint(0, len(self._entries) - 1)  # nosec

    @property
    def current(self):
        """The host currently selected for connecting."""
        return self._entries[self._current]

    def next(self):
        """Advance (round-robin) to the next host and return it."""
        if len(self._entries) > 1:
            self._current = (self._current + 1) % len(self._entries)
        return self.current

    def __repr__(self):
        return '<Hosts ' + str(self) + '>'

    def __str__(self):
        r = ', vhost=%s' % self.virtual_host if self.virtual_host else ''
        return ", ".join(["%r" % th for th in self._entries]) + r


class Controller(pyngus.ConnectionEventHandler):
    """Controls the connection to the AMQP messaging service.  This object is
    the 'brains' of the driver.  It maintains the logic for addressing, sending
    and receiving messages, and managing the connection.  All messaging and I/O
    work is done on the Eventloop thread, allowing the driver to run
    asynchronously from the messaging clients.
    """
    def __init__(self, url, default_exchange, config):
        self.processor = None
        self._socket_connection = None
        self._node = platform.node() or "<UNKNOWN>"
        self._command = os.path.basename(sys.argv[0])
        self._pid = os.getpid()
        # queue of drivertask objects to execute on the eventloop thread
        self._tasks = queue.Queue(maxsize=500)
        # limit the number of Task()'s to execute per call to _process_tasks().
        # This allows the eventloop main thread to return to servicing socket
        # I/O in a timely manner
        self._max_task_batch = 50
        # cache of all Sender links indexed by address:
        self._all_senders = {}
        # active Sender links indexed by address:
        self._active_senders = set()
        # closing Sender links indexed by address:
        self._purged_senders = []
        # Servers indexed by target. Each entry is a map indexed by the
        # specific ProtonListener's identifier:
        self._servers = {}

        self._container_name = config.oslo_messaging_amqp.container_name
        self.idle_timeout = config.oslo_messaging_amqp.idle_timeout
        self.trace_protocol = config.oslo_messaging_amqp.trace
        self.ssl = config.oslo_messaging_amqp.ssl
        self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file
        self.ssl_cert_file = config.oslo_messaging_amqp.ssl_cert_file
        self.ssl_key_file = config.oslo_messaging_amqp.ssl_key_file
        self.ssl_key_password = config.oslo_messaging_amqp.ssl_key_password
        self.ssl_verify_vhost = config.oslo_messaging_amqp.ssl_verify_vhost
        self.pseudo_vhost = config.oslo_messaging_amqp.pseudo_vhost
        self.sasl_mechanisms = config.oslo_messaging_amqp.sasl_mechanisms
        self.sasl_config_dir = config.oslo_messaging_amqp.sasl_config_dir
        self.sasl_config_name = config.oslo_messaging_amqp.sasl_config_name
        self.hosts = Hosts(url, config.oslo_messaging_amqp.sasl_default_realm)
        self.conn_retry_interval = \
            config.oslo_messaging_amqp.connection_retry_interval
        self.conn_retry_backoff = \
            config.oslo_messaging_amqp.connection_retry_backoff
        self.conn_retry_interval_max = \
            config.oslo_messaging_amqp.connection_retry_interval_max
        self.link_retry_delay = config.oslo_messaging_amqp.link_retry_delay

        _opts = config.oslo_messaging_amqp
        factory_args = {"legacy_server_prefix": _opts.server_request_prefix,
                        "legacy_broadcast_prefix": _opts.broadcast_prefix,
                        "legacy_group_prefix": _opts.group_request_prefix,
                        "rpc_prefix": _opts.rpc_address_prefix,
                        "notify_prefix": _opts.notify_address_prefix,
                        "multicast": _opts.multicast_address,
                        "unicast": _opts.unicast_address,
                        "anycast": _opts.anycast_address,
                        "notify_exchange": _opts.default_notification_exchange,
                        "rpc_exchange": _opts.default_rpc_exchange}

        self.addresser_factory = AddresserFactory(default_exchange,
                                                  _opts.addressing_mode,
                                                  **factory_args)
        self.addresser = None

        # cannot send an RPC request until the replies link is active, as we
        # need the peer assigned address, so need to delay sending any RPC
        # requests until this link is active:
        self.reply_link = None
        # Set True when the driver is shutting down
        self._closing = False
        # only schedule one outstanding reconnect attempt at a time
        self._reconnecting = False
        self._delay = self.conn_retry_interval  # seconds between retries
        # prevent queuing up multiple requests to run _process_tasks()
        self._process_tasks_scheduled = False
        self._process_tasks_lock = threading.Lock()
        # credit levels for incoming links
        self._reply_credit = _opts.reply_link_credit
        self._rpc_credit = _opts.rpc_server_credit
        self._notify_credit = _opts.notify_server_credit
        # sender link maintenance timer and interval
        self._link_maint_timer = None
        self._link_maint_timeout = _opts.default_sender_link_timeout

    def connect(self):
        """Connect to the messaging service."""
        self.processor = eventloop.Thread(self._container_name, self._node,
                                          self._command, self._pid)
        self.processor.wakeup(lambda: self._do_connect())

    def add_task(self, task):
        """Add a Task for execution on processor thread."""
        self._tasks.put(task)
        self._schedule_task_processing()

    def shutdown(self, timeout=30):
        """Shutdown the messaging service."""
        LOG.info("Shutting down the AMQP 1.0 connection")
        if self.processor:
            self.processor.wakeup(self._start_shutdown)
            LOG.debug("Waiting for eventloop to exit")
            self.processor.join(timeout)
            self._hard_reset("Shutting down")
            for sender in self._all_senders.values():
                sender.destroy()
            self._all_senders.clear()
            self._servers.clear()
            self.processor.destroy()
            self.processor = None
        LOG.debug("Eventloop exited, driver shut down")

    # The remaining methods are reserved to run from the eventloop thread only!
    # They must not be invoked directly!

    # methods executed by Tasks created by the driver:

    def send(self, send_task):
        if send_task.deadline and send_task.deadline <= time.monotonic():
            send_task._on_timeout()
            return
        key = keyify(send_task.target, send_task.service)
        sender = self._all_senders.get(key)
        if not sender:
            sender = Sender(send_task.target, self.processor,
                            self.link_retry_delay, send_task.service)
            self._all_senders[key] = sender
            if self.reply_link and self.reply_link.active:
                sender.attach(self._socket_connection.pyngus_conn,
                              self.reply_link, self.addresser)
        self._active_senders.add(key)
        sender.send_message(send_task)

    def subscribe(self, subscribe_task):
        """Subscribe to a given target"""
        if subscribe_task._service == SERVICE_NOTIFY:
            t = "notification"
            server = NotificationServer(subscribe_task._target,
                                        subscribe_task._in_queue,
                                        self.processor,
                                        self.link_retry_delay,
                                        self._notify_credit)
        else:
            t = "RPC"
            server = RPCServer(subscribe_task._target,
                               subscribe_task._in_queue,
                               self.processor,
                               self.link_retry_delay,
                               self._rpc_credit)

        LOG.debug("Subscribing to %(type)s target %(target)s",
                  {'type': t, 'target': subscribe_task._target})
        key = keyify(subscribe_task._target, subscribe_task._service)
        servers = self._servers.get(key)
        if servers is None:
            servers = {}
            self._servers[key] = servers
        servers[subscribe_task._subscriber_id] = server
        if self._active:
            server.attach(self._socket_connection.pyngus_conn,
                          self.addresser)

    # commands executed on the processor (eventloop) via 'wakeup()':

    def _do_connect(self):
        """Establish connection and reply subscription on processor thread."""
        host = self.hosts.current
        conn_props = {'properties': {'process': self._command,
                                     'pid': self._pid,
                                     'node': self._node}}
        # only set hostname in the AMQP 1.0 Open performative if the
message 1029 # bus can interpret it as the virtual host. We leave it unspecified 1030 # since apparently noone can agree on how it should be used otherwise! 1031 if self.hosts.virtual_host and not self.pseudo_vhost: 1032 conn_props['hostname'] = self.hosts.virtual_host 1033 if self.idle_timeout: 1034 conn_props["idle-time-out"] = float(self.idle_timeout) 1035 if self.trace_protocol: 1036 conn_props["x-trace-protocol"] = self.trace_protocol 1037 1038 # SSL configuration 1039 ssl_enabled = False 1040 if self.ssl: 1041 ssl_enabled = True 1042 conn_props["x-ssl"] = self.ssl 1043 if self.ssl_ca_file: 1044 conn_props["x-ssl-ca-file"] = self.ssl_ca_file 1045 ssl_enabled = True 1046 if self.ssl_cert_file: 1047 ssl_enabled = True 1048 conn_props["x-ssl-identity"] = (self.ssl_cert_file, 1049 self.ssl_key_file, 1050 self.ssl_key_password) 1051 if ssl_enabled: 1052 # Set the identity of the remote server for SSL to use when 1053 # verifying the received certificate. Typically this is the DNS 1054 # name used to set up the TCP connections. However some servers 1055 # may provide a certificate for the virtual host instead. If that 1056 # is the case we need to use the virtual hostname instead. 
1057 # Refer to SSL Server Name Indication (SNI) for the entire story: 1058 # https://tools.ietf.org/html/rfc6066 1059 if self.ssl_verify_vhost: 1060 if self.hosts.virtual_host: 1061 conn_props['x-ssl-peer-name'] = self.hosts.virtual_host 1062 else: 1063 conn_props['x-ssl-peer-name'] = host.hostname 1064 1065 # SASL configuration: 1066 if self.sasl_mechanisms: 1067 conn_props["x-sasl-mechs"] = self.sasl_mechanisms 1068 if self.sasl_config_dir: 1069 conn_props["x-sasl-config-dir"] = self.sasl_config_dir 1070 if self.sasl_config_name: 1071 conn_props["x-sasl-config-name"] = self.sasl_config_name 1072 1073 self._socket_connection = self.processor.connect(host, 1074 handler=self, 1075 properties=conn_props) 1076 LOG.debug("Connection initiated") 1077 1078 def _process_tasks(self): 1079 """Execute Task objects in the context of the processor thread.""" 1080 with self._process_tasks_lock: 1081 self._process_tasks_scheduled = False 1082 count = 0 1083 while (not self._tasks.empty() and 1084 count < self._max_task_batch): 1085 try: 1086 self._tasks.get(False)._execute(self) 1087 except Exception as e: 1088 LOG.exception("Error processing task: %s", e) 1089 count += 1 1090 1091 # if we hit _max_task_batch, resume task processing later: 1092 if not self._tasks.empty(): 1093 self._schedule_task_processing() 1094 1095 def _schedule_task_processing(self): 1096 """_process_tasks() helper: prevent queuing up multiple requests for 1097 task processing. This method is called both by the application thread 1098 and the processing thread. 1099 """ 1100 if self.processor: 1101 with self._process_tasks_lock: 1102 already_scheduled = self._process_tasks_scheduled 1103 self._process_tasks_scheduled = True 1104 if not already_scheduled: 1105 self.processor.wakeup(lambda: self._process_tasks()) 1106 1107 def _start_shutdown(self): 1108 """Called when the application is closing the transport. 1109 Attempt to cleanly flush/close all links. 
1110 """ 1111 self._closing = True 1112 if self._active: 1113 # try a clean shutdown 1114 self._detach_senders() 1115 self._detach_servers() 1116 self.reply_link.detach() 1117 self._socket_connection.pyngus_conn.close() 1118 else: 1119 # don't wait for a close from the remote, may never happen 1120 self.processor.shutdown() 1121 1122 # reply link callbacks: 1123 1124 def _reply_link_ready(self): 1125 """Invoked when the Replies reply link has become active. At this 1126 point, we are ready to receive messages, so start all pending RPC 1127 requests. 1128 """ 1129 LOG.info("Messaging is active (%(hostname)s:%(port)s%(vhost)s)", 1130 {'hostname': self.hosts.current.hostname, 1131 'port': self.hosts.current.port, 1132 'vhost': ("/" + self.hosts.virtual_host 1133 if self.hosts.virtual_host else "")}) 1134 1135 for sender in self._all_senders.values(): 1136 sender.attach(self._socket_connection.pyngus_conn, 1137 self.reply_link, self.addresser) 1138 1139 def _reply_link_down(self): 1140 # Treat it as a recoverable failure because the RPC reply address is 1141 # now invalid for all in-flight RPC requests. 1142 if not self._closing: 1143 self._detach_senders() 1144 self._detach_servers() 1145 self._socket_connection.pyngus_conn.close() 1146 # once closed, _handle_connection_loss() will initiate reconnect 1147 1148 # callback from eventloop on socket error 1149 1150 def socket_error(self, error): 1151 """Called by eventloop when a socket error occurs.""" 1152 LOG.error("Socket failure: %s", error) 1153 self._handle_connection_loss(str(error)) 1154 1155 # Pyngus connection event callbacks (and their helpers), all invoked from 1156 # the eventloop thread: 1157 1158 def connection_failed(self, connection, error): 1159 """This is a Pyngus callback, invoked by Pyngus when a non-recoverable 1160 error occurs on the connection. 
1161 """ 1162 if connection is not self._socket_connection.pyngus_conn: 1163 # pyngus bug: ignore failure callback on destroyed connections 1164 return 1165 LOG.debug("AMQP Connection failure: %s", error) 1166 self._handle_connection_loss(str(error)) 1167 1168 def connection_active(self, connection): 1169 """This is a Pyngus callback, invoked by Pyngus when the connection to 1170 the peer is up. At this point, the driver will activate all subscriber 1171 links (server) and the reply link. 1172 """ 1173 LOG.debug("Connection active (%(hostname)s:%(port)s), subscribing...", 1174 {'hostname': self.hosts.current.hostname, 1175 'port': self.hosts.current.port}) 1176 # allocate an addresser based on the advertised properties of the 1177 # message bus 1178 props = connection.remote_properties or {} 1179 self.addresser = self.addresser_factory(props, 1180 self.hosts.virtual_host 1181 if self.pseudo_vhost else None) 1182 for servers in self._servers.values(): 1183 for server in servers.values(): 1184 server.attach(self._socket_connection.pyngus_conn, 1185 self.addresser) 1186 self.reply_link = Replies(self._socket_connection.pyngus_conn, 1187 self._reply_link_ready, 1188 self._reply_link_down, 1189 self._reply_credit) 1190 self._delay = self.conn_retry_interval # reset 1191 # schedule periodic maintenance of sender links 1192 self._link_maint_timer = self.processor.defer(self._purge_sender_links, 1193 self._link_maint_timeout) 1194 1195 def connection_closed(self, connection): 1196 """This is a Pyngus callback, invoked by Pyngus when the connection has 1197 cleanly closed. This occurs after the driver closes the connection 1198 locally, and the peer has acknowledged the close. At this point, the 1199 shutdown of the driver's connection is complete. 
1200 """ 1201 LOG.debug("AMQP connection closed.") 1202 # if the driver isn't being shutdown, failover and reconnect 1203 self._handle_connection_loss("AMQP connection closed.") 1204 1205 def connection_remote_closed(self, connection, reason): 1206 """This is a Pyngus callback, invoked by Pyngus when the peer has 1207 requested that the connection be closed. 1208 """ 1209 # The messaging service/broker is trying to shut down the 1210 # connection. Acknowledge the close, and try to reconnect/failover 1211 # later once the connection has closed (connection_closed is called). 1212 if reason: 1213 LOG.info("Connection closed by peer: %s", reason) 1214 self._detach_senders() 1215 self._detach_servers() 1216 self.reply_link.detach() 1217 self._socket_connection.pyngus_conn.close() 1218 1219 def sasl_done(self, connection, pn_sasl, outcome): 1220 """This is a Pyngus callback invoked when the SASL handshake 1221 has completed. The outcome of the handshake is passed in the outcome 1222 argument. 1223 """ 1224 if outcome == proton.SASL.OK: 1225 return 1226 LOG.error("AUTHENTICATION FAILURE: Cannot connect to " 1227 "%(hostname)s:%(port)s as user %(username)s", 1228 {'hostname': self.hosts.current.hostname, 1229 'port': self.hosts.current.port, 1230 'username': self.hosts.current.username}) 1231 # pyngus will invoke connection_failed() eventually 1232 1233 def _handle_connection_loss(self, reason): 1234 """The connection to the messaging service has been lost. Try to 1235 reestablish the connection/failover if not shutting down the driver. 1236 """ 1237 self.addresser = None 1238 self._socket_connection.close() 1239 if self._closing: 1240 # we're in the middle of shutting down the driver anyways, 1241 # just consider it done: 1242 self.processor.shutdown() 1243 else: 1244 # for some reason, we've lost the connection to the messaging 1245 # service. 
Try to re-establish the connection: 1246 if not self._reconnecting: 1247 self._reconnecting = True 1248 LOG.info("Delaying reconnect attempt for %d seconds", 1249 self._delay) 1250 self.processor.defer(lambda: self._do_reconnect(reason), 1251 self._delay) 1252 self._delay = min(self._delay * self.conn_retry_backoff, 1253 self.conn_retry_interval_max) 1254 if self._link_maint_timer: 1255 self._link_maint_timer.cancel() 1256 self._link_maint_timer = None 1257 1258 def _do_reconnect(self, reason): 1259 """Invoked on connection/socket failure, failover and re-connect to the 1260 messaging service. 1261 """ 1262 self._reconnecting = False 1263 if not self._closing: 1264 self._hard_reset(reason) 1265 host = self.hosts.next() 1266 LOG.info("Reconnecting to: %(hostname)s:%(port)s", 1267 {'hostname': host.hostname, 'port': host.port}) 1268 self.processor.wakeup(lambda: self._do_connect()) 1269 1270 def _hard_reset(self, reason): 1271 """Reset the controller to its pre-connection state""" 1272 # note well: since this method destroys the connection, it cannot be 1273 # invoked directly from a pyngus callback. Use processor.defer() to 1274 # run this method on the main loop instead. 
1275 for sender in self._purged_senders: 1276 sender.destroy(reason) 1277 del self._purged_senders[:] 1278 self._active_senders.clear() 1279 unused = [] 1280 for key, sender in self._all_senders.items(): 1281 # clean up any sender links that no longer have messages to send 1282 if sender.pending_messages == 0: 1283 unused.append(key) 1284 else: 1285 sender.reset(reason) 1286 self._active_senders.add(key) 1287 for key in unused: 1288 self._all_senders[key].destroy(reason) 1289 del self._all_senders[key] 1290 for servers in self._servers.values(): 1291 for server in servers.values(): 1292 server.reset() 1293 if self.reply_link: 1294 self.reply_link.destroy() 1295 self.reply_link = None 1296 if self._socket_connection: 1297 self._socket_connection.reset() 1298 1299 def _detach_senders(self): 1300 """Close all sender links""" 1301 for sender in self._all_senders.values(): 1302 sender.detach() 1303 1304 def _detach_servers(self): 1305 """Close all listener links""" 1306 for servers in self._servers.values(): 1307 for server in servers.values(): 1308 server.detach() 1309 1310 def _purge_sender_links(self): 1311 """Purge inactive sender links""" 1312 if not self._closing: 1313 # destroy links that have already been closed 1314 for sender in self._purged_senders: 1315 sender.destroy("Idle link purged") 1316 del self._purged_senders[:] 1317 1318 # determine next set to purge 1319 purge = set(self._all_senders.keys()) - self._active_senders 1320 for key in purge: 1321 sender = self._all_senders[key] 1322 if not sender.pending_messages and not sender.unacked_messages: 1323 sender.detach() 1324 self._purged_senders.append(self._all_senders.pop(key)) 1325 self._active_senders.clear() 1326 self._link_maint_timer = \ 1327 self.processor.defer(self._purge_sender_links, 1328 self._link_maint_timeout) 1329 1330 @property 1331 def _active(self): 1332 # Is the connection up 1333 return (self._socket_connection and 1334 self._socket_connection.pyngus_conn.active) 1335