"""
Manage RabbitMQ Upstreams
=========================

Example:

.. code-block:: yaml

    rabbit_upstream:
      rabbitmq_upstream.present:
      - name: upstream_1
      - uri: amqp://my_user:my_password@rabbitmq_host
      - trust_user_id: True
      - ack_mode: on-confirm
      - max_hops: 1

.. versionadded:: 3000
"""


import json
import logging

import salt.utils.data
import salt.utils.dictdiffer
from salt.exceptions import CommandExecutionError

log = logging.getLogger(__name__)


def __virtual__():
    """
    Only load if the appropriate rabbitmq module functions are loaded.
    """
    requirements = [
        "rabbitmq.list_upstreams",
        "rabbitmq.upstream_exists",
        "rabbitmq.set_upstream",
        "rabbitmq.delete_upstream",
    ]
    if all(req in __salt__ for req in requirements):
        return True
    return (False, "rabbitmq module could not be loaded")


def present(
    name,
    uri,
    prefetch_count=None,
    reconnect_delay=None,
    ack_mode=None,
    trust_user_id=None,
    exchange=None,
    max_hops=None,
    expires=None,
    message_ttl=None,
    ha_policy=None,
    queue=None,
    runas=None,
):
    """
    Ensure the RabbitMQ upstream exists.

    :param str name: The name of the upstream connection
    :param str uri: The URI to connect to. If upstream is a cluster and can have
        several URIs, you can enter them here separated by spaces.
        Examples:
        - amqp://user:password@server_name
        - amqp://user:password@server_name/vhost
        When connecting with SSL, several URI-parameters need also be specified:
        - cacertfile = /path/to/cacert.pem
        - certfile = /path/to/cert.pem
        - keyfile = /path/to/key.pem
        - verify = verify_peer
        - fail_if_no_peer_cert = true | false
        - auth_mechanism = external
        Example:
        - amqp://user:password@server_name?cacertfile=/path/to/cacert.pem&\
            certfile=/path/to/cert.pem&keyfile=/path/to/key.pem&verify=verify_peer
        - amqp://server-name?cacertfile=/path/to/cacert.pem&certfile=/path/to/cert.pem&\
            keyfile=/path/to/key.pem&verify=verify_peer&fail_if_no_peer_cert=true&auth_mechanism=external
    :param int prefetch_count: Maximum number of unacknowledged messages that may
        be in flight over a federation link at one time. Default: 1000
    :param int reconnect_delay: Time in seconds to wait after a network link
        goes down before attempting reconnection. Default: 5
    :param str ack_mode: The following values are allowed:
        on-confirm: Messages are acknowledged to the upstream broker after they
        have been confirmed downstream. Handles network errors and broker failures
        without losing messages. The slowest option, and the default.
        on-publish: Messages are acknowledged to the upstream broker after they
        have been published downstream. Handles network errors without losing
        messages, but may lose messages in the event of broker failures.
        no-ack: Message acknowledgements are not used. The fastest option, but
        you may lose messages in the event of network or broker failures.
    :param bool trust_user_id: Set ``True`` to preserve the "user-id" field across
        a federation link, even if the user-id does not match that used to republish
        the message. Set to ``False`` to clear the "user-id" field when messages
        are federated. Only set this to ``True`` if you trust the upstream broker
        not to forge user-ids.
    :param str exchange: The name of the upstream exchange. Default is to use the
        same name as the federated exchange.
    :param int max_hops: Maximum number of federation links that messages can
        traverse before being dropped. Defaults to 1 if not set.
    :param int expires: Time in milliseconds that the upstream should remember
        about this node for. After this time all upstream state will be removed.
        Set to ``None`` (Default) to mean "forever".
    :param int message_ttl: Time in milliseconds that undelivered messages should
        be held upstream when there is a network outage or backlog.
        Set to ``None`` (default) to mean "forever".
    :param str ha_policy: Determines the "x-ha-policy"-argument for the upstream
        queue for a federated exchange. Default is "none" meaning the queue is
        not HA.
    :param str queue: The name of the upstream queue. Default is to use the same
        name as the federated queue.
    :param str runas: User to run the command. Passed through to the
        ``rabbitmq.list_upstreams`` and ``rabbitmq.set_upstream`` calls.

    :return: A state return dictionary with the keys ``name``, ``result``,
        ``comment`` and ``changes``.

    .. versionadded:: 3000

    """
    ret = {"name": name, "result": False, "comment": "", "changes": {}}
    action = None

    try:
        current_upstreams = __salt__["rabbitmq.list_upstreams"](runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    # Build the desired configuration using RabbitMQ's dashed parameter names.
    # NOTE(review): filter_falsey presumably drops every falsey value, so an
    # explicit ``False``/``0`` is treated like an unset parameter -- confirm
    # this matches what rabbitmq.set_upstream sends to the broker.
    new_config = salt.utils.data.filter_falsey(
        {
            "uri": uri,
            "prefetch-count": prefetch_count,
            "reconnect-delay": reconnect_delay,
            "ack-mode": ack_mode,
            "trust-user-id": trust_user_id,
            "exchange": exchange,
            "max-hops": max_hops,
            "expires": expires,
            "message-ttl": message_ttl,
            "ha-policy": ha_policy,
            "queue": queue,
        }
    )

    if name in current_upstreams:
        # The existing definition is stored as a JSON document; decode it so
        # it can be compared with the requested configuration.
        current_config = json.loads(current_upstreams[name])
        diff_config = salt.utils.dictdiffer.deep_diff(current_config, new_config)
        if diff_config:
            action = "update"
        else:
            ret["result"] = True
            ret["comment"] = 'Upstream "{}" already present as specified.'.format(name)
    else:
        action = "create"
        diff_config = {"old": None, "new": new_config}

    if action:
        if __opts__["test"]:
            ret["result"] = None
            ret["comment"] = 'Upstream "{}" would have been {}d.'.format(name, action)
        else:
            try:
                res = __salt__["rabbitmq.set_upstream"](
                    name,
                    uri,
                    prefetch_count=prefetch_count,
                    reconnect_delay=reconnect_delay,
                    ack_mode=ack_mode,
                    trust_user_id=trust_user_id,
                    exchange=exchange,
                    max_hops=max_hops,
                    expires=expires,
                    message_ttl=message_ttl,
                    ha_policy=ha_policy,
                    queue=queue,
                    runas=runas,
                )
            except CommandExecutionError as exp:
                ret["comment"] = "Error trying to {} upstream: {}".format(action, exp)
            else:
                if res:
                    ret["result"] = True
                    ret["comment"] = 'Upstream "{}" {}d.'.format(name, action)
                    ret["changes"] = diff_config
                else:
                    # Do not claim success or report changes when the
                    # execution module signals failure without raising.
                    ret["comment"] = 'Failed to {} upstream "{}".'.format(
                        action, name
                    )
    return ret


def absent(name, runas=None):
    """
    Ensure the named upstream is absent.

    :param str name: The name of the upstream to remove
    :param str runas: User to run the command

    :return: A state return dictionary with the keys ``name``, ``result``,
        ``comment`` and ``changes``.

    .. versionadded:: 3000
    """
    ret = {"name": name, "result": False, "comment": "", "changes": {}}

    try:
        upstream_exists = __salt__["rabbitmq.upstream_exists"](name, runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    if not upstream_exists:
        ret["result"] = True
        ret["comment"] = 'The upstream "{}" is already absent.'.format(name)
        return ret

    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = 'Upstream "{}" would have been deleted.'.format(name)
        return ret

    try:
        res = __salt__["rabbitmq.delete_upstream"](name, runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    if res:
        ret["result"] = True
        ret["comment"] = 'Upstream "{}" has been deleted.'.format(name)
        ret["changes"] = {"old": name, "new": None}
    else:
        # Never return a failed state with an empty comment.
        ret["comment"] = 'Failed to delete upstream "{}".'.format(name)
    return ret