1package rabbithole
2
3import (
4	"encoding/json"
5	"net/http"
6	"net/url"
7)
8
9//
10// GET /api/exchanges
11//
12
13// IngressEgressStats represents common message flow metrics.
14type IngressEgressStats struct {
15	PublishIn        int         `json:"publish_in"`
16	PublishInDetails RateDetails `json:"publish_in_details"`
17
18	PublishOut        int         `json:"publish_out"`
19	PublishOutDetails RateDetails `json:"publish_out_details"`
20}
21
22// ExchangeInfo represents and exchange and its properties.
23type ExchangeInfo struct {
24	Name       string                 `json:"name"`
25	Vhost      string                 `json:"vhost"`
26	Type       string                 `json:"type"`
27	Durable    bool                   `json:"durable"`
28	AutoDelete bool                   `json:"auto_delete"`
29	Internal   bool                   `json:"internal"`
30	Arguments  map[string]interface{} `json:"arguments"`
31
32	MessageStats IngressEgressStats `json:"message_stats"`
33}
34
35// ExchangeSettings is a set of exchange properties. Use this type when declaring
36// an exchange.
37type ExchangeSettings struct {
38	Type       string                 `json:"type"`
39	Durable    bool                   `json:"durable"`
40	AutoDelete bool                   `json:"auto_delete,omitempty"`
41	Arguments  map[string]interface{} `json:"arguments,omitempty"`
42}
43
44// ListExchanges lists all exchanges in a cluster. This only includes exchanges in the
45// virtual hosts accessible to the user.
46func (c *Client) ListExchanges() (rec []ExchangeInfo, err error) {
47	req, err := newGETRequest(c, "exchanges")
48	if err != nil {
49		return []ExchangeInfo{}, err
50	}
51
52	if err = executeAndParseRequest(c, req, &rec); err != nil {
53		return []ExchangeInfo{}, err
54	}
55
56	return rec, nil
57}
58
59//
60// GET /api/exchanges/{vhost}
61//
62
63// ListExchangesIn lists all exchanges in a virtual host.
64func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error) {
65	req, err := newGETRequest(c, "exchanges/"+url.PathEscape(vhost))
66	if err != nil {
67		return []ExchangeInfo{}, err
68	}
69
70	if err = executeAndParseRequest(c, req, &rec); err != nil {
71		return []ExchangeInfo{}, err
72	}
73
74	return rec, nil
75}
76
77//
78// GET /api/exchanges/{vhost}/{name}
79//
80
81// Example response:
82//
83// {
84//   "incoming": [
85//     {
86//       "stats": {
87//         "publish": 2760,
88//         "publish_details": {
89//           "rate": 20
90//         }
91//       },
92//       "channel_details": {
93//         "name": "127.0.0.1:46928 -> 127.0.0.1:5672 (2)",
94//         "number": 2,
95//         "connection_name": "127.0.0.1:46928 -> 127.0.0.1:5672",
96//         "peer_port": 46928,
97//         "peer_host": "127.0.0.1"
98//       }
99//     }
100//   ],
101//   "outgoing": [
102//     {
103//       "stats": {
104//         "publish": 1280,
105//         "publish_details": {
106//           "rate": 20
107//         }
108//       },
109//       "queue": {
110//         "name": "amq.gen-7NhO_yRr4lDdp-8hdnvfuw",
111//         "vhost": "rabbit\/hole"
112//       }
113//     }
114//   ],
115//   "message_stats": {
116//     "publish_in": 2760,
117//     "publish_in_details": {
118//       "rate": 20
119//     },
120//     "publish_out": 1280,
121//     "publish_out_details": {
122//       "rate": 20
123//     }
124//   },
125//   "name": "amq.fanout",
126//   "vhost": "rabbit\/hole",
127//   "type": "fanout",
128//   "durable": true,
129//   "auto_delete": false,
130//   "internal": false,
131//   "arguments": {
132//   }
133// }
134
135// ExchangeIngressDetails represents ingress (inbound) message flow metrics of an exchange.
136type ExchangeIngressDetails struct {
137	Stats          MessageStats      `json:"stats"`
138	ChannelDetails PublishingChannel `json:"channel_details"`
139}
140
141// PublishingChannel represents a channel and its basic properties.
142type PublishingChannel struct {
143	Number         int    `json:"number"`
144	Name           string `json:"name"`
145	ConnectionName string `json:"connection_name"`
146	PeerPort       Port   `json:"peer_port"`
147	PeerHost       string `json:"peer_host"`
148}
149
150// NameAndVhost repesents a named entity in a virtual host.
151type NameAndVhost struct {
152	Name  string `json:"name"`
153	Vhost string `json:"vhost"`
154}
155
156// ExchangeEgressDetails represents egress (outbound) message flow metrics of an exchange.
157type ExchangeEgressDetails struct {
158	Stats MessageStats `json:"stats"`
159	Queue NameAndVhost `json:"queue"`
160}
161
162// DetailedExchangeInfo represents an exchange with all of its properties and metrics.
163type DetailedExchangeInfo struct {
164	Name       string                 `json:"name"`
165	Vhost      string                 `json:"vhost"`
166	Type       string                 `json:"type"`
167	Durable    bool                   `json:"durable"`
168	AutoDelete bool                   `json:"auto_delete"`
169	Internal   bool                   `json:"internal"`
170	Arguments  map[string]interface{} `json:"arguments"`
171
172	Incoming     []ExchangeIngressDetails `json:"incoming"`
173	Outgoing     []ExchangeEgressDetails  `json:"outgoing"`
174	PublishStats IngressEgressStats       `json:"message_stats"`
175}
176
177// GetExchange returns information about an exchange.
178func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error) {
179	req, err := newGETRequest(c, "exchanges/"+url.PathEscape(vhost)+"/"+url.PathEscape(exchange))
180	if err != nil {
181		return nil, err
182	}
183
184	if err = executeAndParseRequest(c, req, &rec); err != nil {
185		return nil, err
186	}
187
188	return rec, nil
189}
190
191//
192// PUT /api/exchanges/{vhost}/{exchange}
193//
194
195// DeclareExchange declares an exchange.
196func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error) {
197	if info.Arguments == nil {
198		info.Arguments = make(map[string]interface{})
199	}
200	body, err := json.Marshal(info)
201	if err != nil {
202		return nil, err
203	}
204
205	req, err := newRequestWithBody(c, "PUT", "exchanges/"+url.PathEscape(vhost)+"/"+url.PathEscape(exchange), body)
206	if err != nil {
207		return nil, err
208	}
209
210	if res, err = executeRequest(c, req); err != nil {
211		return nil, err
212	}
213
214	return res, nil
215}
216
217//
218// DELETE /api/exchanges/{vhost}/{name}
219//
220
221// DeleteExchange deletes an exchange.
222func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error) {
223	req, err := newRequestWithBody(c, "DELETE", "exchanges/"+url.PathEscape(vhost)+"/"+url.PathEscape(exchange), nil)
224	if err != nil {
225		return nil, err
226	}
227
228	if res, err = executeRequest(c, req); err != nil {
229		return nil, err
230	}
231
232	return res, nil
233}
234