1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_common.h"
26 #include "amqp_common.h"
27
28 /* Global variables */
29 struct p_amqp_host amqpp_amqp_host;
30 struct p_amqp_host bgp_daemon_msglog_amqp_host;
31 struct p_amqp_host bgp_table_dump_amqp_host;
32 struct p_amqp_host bmp_daemon_msglog_amqp_host;
33 struct p_amqp_host bmp_dump_amqp_host;
34 struct p_amqp_host sfacctd_counter_amqp_host;
35 struct p_amqp_host telemetry_daemon_msglog_amqp_host;
36 struct p_amqp_host telemetry_dump_amqp_host;
37
38 char rabbitmq_user[] = "guest";
39 char rabbitmq_pwd[] = "guest";
40 char default_amqp_exchange[] = "pmacct";
41 char default_amqp_exchange_type[] = "direct";
42 char default_amqp_routing_key[] = "acct";
43 char default_amqp_host[] = "127.0.0.1";
44 char default_amqp_vhost[] = "/";
45
46 /* Functions */
p_amqp_init_host(struct p_amqp_host * amqp_host)47 void p_amqp_init_host(struct p_amqp_host *amqp_host)
48 {
49 if (amqp_host) {
50 memset(amqp_host, 0, sizeof(struct p_amqp_host));
51 p_amqp_set_frame_max(amqp_host, AMQP_DEFAULT_FRAME_SIZE);
52 p_amqp_set_heartbeat_interval(amqp_host, AMQP_DEFAULT_HEARTBEAT);
53 P_broker_timers_set_retry_interval(&amqp_host->btimers, AMQP_DEFAULT_RETRY);
54 }
55 }
56
p_amqp_set_user(struct p_amqp_host * amqp_host,char * user)57 void p_amqp_set_user(struct p_amqp_host *amqp_host, char *user)
58 {
59 if (amqp_host) amqp_host->user = user;
60 }
61
p_amqp_set_passwd(struct p_amqp_host * amqp_host,char * passwd)62 void p_amqp_set_passwd(struct p_amqp_host *amqp_host, char *passwd)
63 {
64 if (amqp_host) amqp_host->passwd = passwd;
65 }
66
p_amqp_set_exchange(struct p_amqp_host * amqp_host,char * exchange)67 void p_amqp_set_exchange(struct p_amqp_host *amqp_host, char *exchange)
68 {
69 if (amqp_host) amqp_host->exchange = exchange;
70 }
71
p_amqp_set_routing_key(struct p_amqp_host * amqp_host,char * routing_key)72 void p_amqp_set_routing_key(struct p_amqp_host *amqp_host, char *routing_key)
73 {
74 if (amqp_host) amqp_host->routing_key = routing_key;
75 }
76
p_amqp_unset_routing_key(struct p_amqp_host * amqp_host)77 void p_amqp_unset_routing_key(struct p_amqp_host *amqp_host)
78 {
79 if (amqp_host) amqp_host->routing_key = NULL;
80 }
81
p_amqp_get_routing_key(struct p_amqp_host * amqp_host)82 char *p_amqp_get_routing_key(struct p_amqp_host *amqp_host)
83 {
84 if (amqp_host) return amqp_host->routing_key;
85
86 return NULL;
87 }
88
p_amqp_init_routing_key_rr(struct p_amqp_host * amqp_host)89 void p_amqp_init_routing_key_rr(struct p_amqp_host *amqp_host)
90 {
91 if (amqp_host) memset(&amqp_host->rk_rr, 0, sizeof(struct p_table_rr));
92 }
93
p_amqp_set_routing_key_rr(struct p_amqp_host * amqp_host,int rk_rr)94 void p_amqp_set_routing_key_rr(struct p_amqp_host *amqp_host, int rk_rr)
95 {
96 if (amqp_host) amqp_host->rk_rr.max = rk_rr;
97 }
98
p_amqp_get_routing_key_rr(struct p_amqp_host * amqp_host)99 int p_amqp_get_routing_key_rr(struct p_amqp_host *amqp_host)
100 {
101 if (amqp_host) return amqp_host->rk_rr.max;
102
103 return FALSE;
104 }
105
p_amqp_set_exchange_type(struct p_amqp_host * amqp_host,char * exchange_type)106 void p_amqp_set_exchange_type(struct p_amqp_host *amqp_host, char *exchange_type)
107 {
108 if (amqp_host && exchange_type) {
109 lower_string(exchange_type);
110 amqp_host->exchange_type = exchange_type;
111 }
112 }
113
p_amqp_set_host(struct p_amqp_host * amqp_host,char * host)114 void p_amqp_set_host(struct p_amqp_host *amqp_host, char *host)
115 {
116 if (amqp_host) amqp_host->host = host;
117 }
118
p_amqp_set_vhost(struct p_amqp_host * amqp_host,char * vhost)119 void p_amqp_set_vhost(struct p_amqp_host *amqp_host, char *vhost)
120 {
121 if (amqp_host) amqp_host->vhost = vhost;
122 }
123
p_amqp_set_frame_max(struct p_amqp_host * amqp_host,u_int32_t opt)124 void p_amqp_set_frame_max(struct p_amqp_host *amqp_host, u_int32_t opt)
125 {
126 if (amqp_host) {
127 if (opt > PM_AMQP_MIN_FRAME_SIZE) amqp_host->frame_max = opt;
128 }
129 }
130
p_amqp_set_heartbeat_interval(struct p_amqp_host * amqp_host,int opt)131 void p_amqp_set_heartbeat_interval(struct p_amqp_host *amqp_host, int opt)
132 {
133 if (amqp_host) {
134 amqp_host->heartbeat_interval = opt;
135 }
136 }
137
p_amqp_set_persistent_msg(struct p_amqp_host * amqp_host,int opt)138 void p_amqp_set_persistent_msg(struct p_amqp_host *amqp_host, int opt)
139 {
140 if (amqp_host) amqp_host->persistent_msg = opt;
141 }
142
p_amqp_set_content_type_json(struct p_amqp_host * amqp_host)143 void p_amqp_set_content_type_json(struct p_amqp_host *amqp_host)
144 {
145 if (amqp_host) {
146 amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
147 amqp_host->msg_props.content_type = amqp_cstring_bytes("application/json");
148 }
149 }
150
p_amqp_set_content_type_binary(struct p_amqp_host * amqp_host)151 void p_amqp_set_content_type_binary(struct p_amqp_host *amqp_host)
152 {
153 if (amqp_host) {
154 amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
155 amqp_host->msg_props.content_type = amqp_cstring_bytes("application/octet-stream");
156 }
157 }
158
p_amqp_get_sockfd(struct p_amqp_host * amqp_host)159 int p_amqp_get_sockfd(struct p_amqp_host *amqp_host)
160 {
161 if (amqp_host) {
162 if (!P_broker_timers_get_last_fail(&amqp_host->btimers)) return amqp_get_sockfd(amqp_host->conn);
163 }
164
165 return ERR;
166 }
167
p_amqp_get_version()168 void p_amqp_get_version()
169 {
170 printf("rabbimq-c %s\n", amqp_version());
171 }
172
p_amqp_connect_to_publish(struct p_amqp_host * amqp_host)173 int p_amqp_connect_to_publish(struct p_amqp_host *amqp_host)
174 {
175 amqp_host->conn = amqp_new_connection();
176
177 amqp_host->socket = amqp_tcp_socket_new(amqp_host->conn);
178 if (!amqp_host->socket) {
179 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): no socket\n", config.name, config.type);
180 p_amqp_close(amqp_host, TRUE);
181 return ERR;
182 }
183
184 amqp_host->status = amqp_socket_open(amqp_host->socket, amqp_host->host, 5672 /* default port */);
185
186 if (amqp_host->status != AMQP_STATUS_OK) {
187 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): unable to open socket\n", config.name, config.type);
188 p_amqp_close(amqp_host, TRUE);
189 return ERR;
190 }
191
192 amqp_host->ret = amqp_login(amqp_host->conn, amqp_host->vhost, 0, amqp_host->frame_max, amqp_host->heartbeat_interval, AMQP_SASL_METHOD_PLAIN, amqp_host->user, amqp_host->passwd);
193 if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
194 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): login\n", config.name, config.type);
195 p_amqp_close(amqp_host, TRUE);
196 return ERR;
197 }
198
199 amqp_channel_open(amqp_host->conn, 1);
200
201 amqp_host->ret = amqp_get_rpc_reply(amqp_host->conn);
202 if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
203 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): unable to open channel\n", config.name, config.type);
204 p_amqp_close(amqp_host, TRUE);
205 return ERR;
206 }
207
208 if (amqp_host->persistent_msg)
209 amqp_exchange_declare(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
210 amqp_cstring_bytes(amqp_host->exchange_type), 0, 1 /* durable */, 0, 0, amqp_empty_table);
211 else
212 amqp_exchange_declare(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
213 amqp_cstring_bytes(amqp_host->exchange_type), 0, 0, 0, 0, amqp_empty_table);
214
215 amqp_host->ret = amqp_get_rpc_reply(amqp_host->conn);
216 if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
217 const char *err_msg;
218
219 switch (amqp_host->ret.reply_type) {
220 case AMQP_RESPONSE_NONE:
221 err_msg = "Missing RPC reply type";
222 break;
223 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
224 err_msg = "Client library exception";
225 break;
226 case AMQP_RESPONSE_SERVER_EXCEPTION:
227 err_msg = "Server generated an exception";
228 break;
229 default:
230 err_msg = "Unknown";
231 break;
232 }
233
234 Log(LOG_ERR, "ERROR ( %s/%s ): Handshake with RabbitMQ failed: %s\n", config.name, config.type, err_msg);
235 p_amqp_close(amqp_host, TRUE);
236 return ERR;
237 }
238
239 // XXX: to be removed
240 amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
241 amqp_host->msg_props.content_type = amqp_cstring_bytes("application/json");
242
243 if (amqp_host->persistent_msg) {
244 amqp_host->msg_props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
245 amqp_host->msg_props.delivery_mode = 2; /* persistent delivery */
246 }
247
248 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Connection successful to RabbitMQ: p_amqp_connect_to_publish()\n", config.name, config.type);
249
250 P_broker_timers_unset_last_fail(&amqp_host->btimers);
251 return SUCCESS;
252 }
253
p_amqp_publish_string(struct p_amqp_host * amqp_host,char * json_str)254 int p_amqp_publish_string(struct p_amqp_host *amqp_host, char *json_str)
255 {
256 if (p_amqp_is_alive(amqp_host) == ERR) {
257 p_amqp_close(amqp_host, TRUE);
258 return ERR;
259 }
260
261 amqp_host->status = amqp_basic_publish(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
262 amqp_cstring_bytes(amqp_host->routing_key), 0, 0, &amqp_host->msg_props,
263 amqp_cstring_bytes(json_str));
264
265 if (amqp_host->status) {
266 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_publish_string() [E=%s RK=%s DM=%u]\n",
267 config.name, config.type, amqp_host->exchange,
268 amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
269 p_amqp_close(amqp_host, TRUE);
270 return ERR;
271 }
272 else {
273 if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing to RabbitMQ: p_amqp_publish_string() [E=%s RK=%s DM=%u]: %s\n",
274 config.name, config.type, amqp_host->exchange,
275 amqp_host->routing_key, amqp_host->msg_props.delivery_mode,
276 json_str);
277 }
278
279 return SUCCESS;
280 }
281
p_amqp_publish_binary(struct p_amqp_host * amqp_host,void * data,u_int32_t data_len)282 int p_amqp_publish_binary(struct p_amqp_host *amqp_host, void *data, u_int32_t data_len)
283 {
284 amqp_bytes_t pdata;
285
286 if (p_amqp_is_alive(amqp_host) == ERR) {
287 p_amqp_close(amqp_host, TRUE);
288 return ERR;
289 }
290
291 memset(&pdata, 0, sizeof(pdata));
292 pdata.len = data_len;
293 pdata.bytes = data;
294
295 amqp_host->status = amqp_basic_publish(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
296 amqp_cstring_bytes(amqp_host->routing_key), 0, 0, &amqp_host->msg_props,
297 pdata);
298
299 if (amqp_host->status) {
300 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_publish_binary() [E=%s RK=%s DM=%u]\n",
301 config.name, config.type, amqp_host->exchange,
302 amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
303 p_amqp_close(amqp_host, TRUE);
304 return ERR;
305 }
306 else {
307 if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing to RabbitMQ: p_amqp_publish_binary() [E=%s RK=%s DM=%u]\n",
308 config.name, config.type, amqp_host->exchange,
309 amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
310 }
311
312 return SUCCESS;
313 }
314
p_amqp_close(struct p_amqp_host * amqp_host,int set_fail)315 void p_amqp_close(struct p_amqp_host *amqp_host, int set_fail)
316 {
317 if (amqp_host->conn) {
318 if (amqp_get_socket(amqp_host->conn)) amqp_connection_close(amqp_host->conn, AMQP_REPLY_SUCCESS);
319
320 if (set_fail) {
321 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_close()\n", config.name, config.type);
322 P_broker_timers_set_last_fail(&amqp_host->btimers, time(NULL));
323 }
324 amqp_destroy_connection(amqp_host->conn);
325 amqp_host->conn = NULL;
326 }
327 }
328
p_amqp_is_alive(struct p_amqp_host * amqp_host)329 int p_amqp_is_alive(struct p_amqp_host *amqp_host)
330 {
331 if (amqp_host->status == AMQP_STATUS_OK && amqp_host->conn && (amqp_get_sockfd(amqp_host->conn) >= 0)) return SUCCESS;
332 else return ERR;
333 }
334
335 #if defined WITH_JANSSON
write_and_free_json_amqp(void * amqp_log,void * obj)336 int write_and_free_json_amqp(void *amqp_log, void *obj)
337 {
338 char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
339 struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
340 int ret = ERR;
341
342 char *tmpbuf = NULL;
343 json_t *json_obj = (json_t *) obj;
344
345 tmpbuf = json_dumps(json_obj, JSON_PRESERVE_ORDER);
346 json_decref(json_obj);
347
348 if (tmpbuf) {
349 if (alog->rk_rr.max) {
350 orig_amqp_routing_key = p_amqp_get_routing_key(alog);
351 P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
352 p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
353 }
354
355 ret = p_amqp_publish_string(alog, tmpbuf);
356 free(tmpbuf);
357
358 if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
359 }
360
361 return ret;
362 }
363 #else
write_and_free_json_amqp(void * amqp_log,void * obj)364 int write_and_free_json_amqp(void *amqp_log, void *obj)
365 {
366 if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): write_and_free_json_amqp(): JSON object not created due to missing --enable-jansson\n", config.name, config.type);
367 }
368 #endif
369
write_binary_amqp(void * amqp_log,void * obj,size_t len)370 int write_binary_amqp(void *amqp_log, void *obj, size_t len)
371 {
372 char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
373 struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
374 int ret = ERR;
375
376 if (obj && len) {
377 if (alog->rk_rr.max) {
378 orig_amqp_routing_key = p_amqp_get_routing_key(alog);
379 P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
380 p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
381 }
382
383 ret = p_amqp_publish_binary(alog, obj, len);
384
385 if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
386 }
387
388 return ret;
389 }
390
write_string_amqp(void * amqp_log,char * obj)391 int write_string_amqp(void *amqp_log, char *obj)
392 {
393 char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
394 struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
395 int ret = ERR;
396
397 if (obj) {
398 if (alog->rk_rr.max) {
399 orig_amqp_routing_key = p_amqp_get_routing_key(alog);
400 P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
401 p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
402 }
403
404 ret = p_amqp_publish_string(alog, obj);
405
406 if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
407 }
408
409 return ret;
410 }
411