1"""
2Manage RabbitMQ Upstreams
3=========================
4
5Example:
6
7.. code-block:: yaml
8
9    rabbit_upstream:
10      rabbitmq_upstream.present:
11      - name: upstream_1
12      - uri: amqp://my_user:my_password@rabbitmq_host
13      - trust_user_id: True
14      - ack_mode: on-confirm
15      - max_hops: 1
16
17.. versionadded:: 3000
18"""
19
20
21import json
22import logging
23
24import salt.utils.data
25import salt.utils.dictdiffer
26from salt.exceptions import CommandExecutionError
27
28log = logging.getLogger(__name__)
29
30
def __virtual__():
    """
    Decide whether this state module can be loaded.

    The module is usable only when every rabbitmq execution function it
    calls is available in ``__salt__``.

    :return: ``True`` when all needed functions are present, otherwise a
        ``(False, reason)`` tuple.
    """
    needed_functions = (
        "rabbitmq.list_upstreams",
        "rabbitmq.upstream_exists",
        "rabbitmq.set_upstream",
        "rabbitmq.delete_upstream",
    )
    # Stop at the first missing function; one gap is enough to refuse loading.
    for function_name in needed_functions:
        if function_name not in __salt__:
            return (False, "rabbitmq module could not be loaded")
    return True
44
45
def present(
    name,
    uri,
    prefetch_count=None,
    reconnect_delay=None,
    ack_mode=None,
    trust_user_id=None,
    exchange=None,
    max_hops=None,
    expires=None,
    message_ttl=None,
    ha_policy=None,
    queue=None,
    runas=None,
):
    """
    Ensure the RabbitMQ upstream exists.

    :param str name: The name of the upstream connection
    :param str uri: The URI to connect to. If upstream is a cluster and can have
        several URIs, you can enter them here separated by spaces.
        Examples:
        - amqp://user:password@server_name
        - amqp://user:password@server_name/vhost
        When connecting with SSL, several URI-parameters need also be specified:
        - cacertfile = /path/to/cacert.pem
        - certfile = /path/to/cert.pem
        - keyfile = /path/to/key.pem
        - verify = verify_peer
        - fail_if_no_peer_cert = true | false
        - auth_mechanism = external
        Example:
        - amqp://user:password@server_name?cacertfile=/path/to/cacert.pem&\
            certfile=/path/to/cert.pem&keyfile=/path/to/key.pem&verify=verify_peer
        - amqp://server-name?cacertfile=/path/to/cacert.pem&certfile=/path/to/cert.pem&\
            keyfile=/path/to/key.pem&verify=verify_peer&fail_if_no_peer_cert=true&auth_mechanism=external
    :param int prefetch_count: Maximum number of unacknowledged messages that may
        be in flight over a federation link at one time. Default: 1000
    :param int reconnect_delay: Time in seconds to wait after a network link
        goes down before attempting reconnection. Default: 5
    :param str ack_mode: The following values are allowed:
        on-confirm: Messages are acknowledged to the upstream broker after they
        have been confirmed downstream. Handles network errors and broker failures
        without losing messages. The slowest option, and the default.
        on-publish: Messages are acknowledged to the upstream broker after they
        have been published downstream. Handles network errors without losing
        messages, but may lose messages in the event of broker failures.
        no-ack: Message acknowledgements are not used. The fastest option, but
        you may lose messages in the event of network or broker failures.
    :param bool trust_user_id: Set ``True`` to preserve the "user-id" field across
        a federation link, even if the user-id does not match that used to republish
        the message. Set to ``False`` to clear the "user-id" field when messages
        are federated. Only set this to ``True`` if you trust the upstream broker
        not to forge user-ids.
    :param str exchange: The name of the upstream exchange. Default is to use the
        same name as the federated exchange.
    :param int max_hops: Maximum number of federation links that messages can
        traverse before being dropped. Defaults to 1 if not set.
    :param int expires: Time in milliseconds that the upstream should remember
        about this node for. After this time all upstream state will be removed.
        Set to ``None`` (Default) to mean "forever".
    :param int message_ttl: Time in milliseconds that undelivered messages should
        be held upstream when there is a network outage or backlog.
        Set to ``None`` (default) to mean "forever".
    :param str ha_policy: Determines the "x-ha-policy"-argument for the upstream
        queue for a federated exchange. Default is "none" meaning the queue is
        not HA.
    :param str queue: The name of the upstream queue. Default is to use the same
        name as the federated queue.
    :param str runas: User to run the command

    :return: A state return dictionary with the keys ``name``, ``result``,
        ``comment`` and ``changes``. ``result`` is ``True`` when the upstream
        already matches or was created/updated, ``None`` in test mode when a
        change is pending, and ``False`` on any failure.

    .. versionadded:: 3000

    """
    ret = {"name": name, "result": False, "comment": "", "changes": {}}

    try:
        current_upstreams = __salt__["rabbitmq.list_upstreams"](runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    # Keep only the options that were actually set, so unset (falsey)
    # options are not compared against the existing definition.
    new_config = salt.utils.data.filter_falsey(
        {
            "uri": uri,
            "prefetch-count": prefetch_count,
            "reconnect-delay": reconnect_delay,
            "ack-mode": ack_mode,
            "trust-user-id": trust_user_id,
            "exchange": exchange,
            "max-hops": max_hops,
            "expires": expires,
            "message-ttl": message_ttl,
            "ha-policy": ha_policy,
            "queue": queue,
        }
    )

    if name in current_upstreams:
        # The listed definition is JSON text; decode it before diffing.
        current_config = json.loads(current_upstreams[name])
        diff_config = salt.utils.dictdiffer.deep_diff(current_config, new_config)
        if not diff_config:
            ret["result"] = True
            ret["comment"] = 'Upstream "{}" already present as specified.'.format(name)
            return ret
        action = "update"
    else:
        action = "create"
        diff_config = {"old": None, "new": new_config}

    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = 'Upstream "{}" would have been {}d.'.format(name, action)
        return ret

    try:
        res = __salt__["rabbitmq.set_upstream"](
            name,
            uri,
            prefetch_count=prefetch_count,
            reconnect_delay=reconnect_delay,
            ack_mode=ack_mode,
            trust_user_id=trust_user_id,
            exchange=exchange,
            max_hops=max_hops,
            expires=expires,
            message_ttl=message_ttl,
            ha_policy=ha_policy,
            queue=queue,
            runas=runas,
        )
    except CommandExecutionError as exp:
        ret["comment"] = "Error trying to {} upstream: {}".format(action, exp)
        return ret

    if res:
        ret["result"] = True
        ret["comment"] = 'Upstream "{}" {}d.'.format(name, action)
        ret["changes"] = diff_config
    else:
        # A falsy return must not be reported as a success: leave "result"
        # False and "changes" empty instead of copying the raw value (a None
        # would be read by Salt as a pending test-mode change).
        ret["comment"] = 'Failed to {} upstream "{}".'.format(action, name)
    return ret
182
183
def absent(name, runas=None):
    """
    Ensure the named upstream is absent.

    :param str name: The name of the upstream to remove
    :param str runas: User to run the command

    :return: A state return dictionary with the keys ``name``, ``result``,
        ``comment`` and ``changes``. ``result`` is ``True`` when the upstream
        is already absent or was deleted, ``None`` in test mode when a
        deletion is pending, and ``False`` on any failure.

    .. versionadded:: 3000
    """
    ret = {"name": name, "result": False, "comment": "", "changes": {}}

    try:
        upstream_exists = __salt__["rabbitmq.upstream_exists"](name, runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    if not upstream_exists:
        ret["result"] = True
        ret["comment"] = 'The upstream "{}" is already absent.'.format(name)
        return ret

    if __opts__["test"]:
        ret["result"] = None
        ret["comment"] = 'Upstream "{}" would have been deleted.'.format(name)
        return ret

    try:
        res = __salt__["rabbitmq.delete_upstream"](name, runas=runas)
    except CommandExecutionError as err:
        ret["comment"] = "Error: {}".format(err)
        return ret

    if res:
        ret["result"] = True
        ret["comment"] = 'Upstream "{}" has been deleted.'.format(name)
        ret["changes"] = {"old": name, "new": None}
    else:
        # Explain the failure instead of returning a failed state with an
        # empty comment.
        ret["comment"] = 'Failed to delete upstream "{}".'.format(name)
    return ret
218