1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_common.h"
26 #include "amqp_common.h"
27 
28 /* Global variables */
29 struct p_amqp_host amqpp_amqp_host;
30 struct p_amqp_host bgp_daemon_msglog_amqp_host;
31 struct p_amqp_host bgp_table_dump_amqp_host;
32 struct p_amqp_host bmp_daemon_msglog_amqp_host;
33 struct p_amqp_host bmp_dump_amqp_host;
34 struct p_amqp_host sfacctd_counter_amqp_host;
35 struct p_amqp_host telemetry_daemon_msglog_amqp_host;
36 struct p_amqp_host telemetry_dump_amqp_host;
37 
38 char rabbitmq_user[] = "guest";
39 char rabbitmq_pwd[] = "guest";
40 char default_amqp_exchange[] = "pmacct";
41 char default_amqp_exchange_type[] = "direct";
42 char default_amqp_routing_key[] = "acct";
43 char default_amqp_host[] = "127.0.0.1";
44 char default_amqp_vhost[] = "/";
45 
46 /* Functions */
p_amqp_init_host(struct p_amqp_host * amqp_host)47 void p_amqp_init_host(struct p_amqp_host *amqp_host)
48 {
49   if (amqp_host) {
50     memset(amqp_host, 0, sizeof(struct p_amqp_host));
51     p_amqp_set_frame_max(amqp_host, AMQP_DEFAULT_FRAME_SIZE);
52     p_amqp_set_heartbeat_interval(amqp_host, AMQP_DEFAULT_HEARTBEAT);
53     P_broker_timers_set_retry_interval(&amqp_host->btimers, AMQP_DEFAULT_RETRY);
54   }
55 }
56 
p_amqp_set_user(struct p_amqp_host * amqp_host,char * user)57 void p_amqp_set_user(struct p_amqp_host *amqp_host, char *user)
58 {
59   if (amqp_host) amqp_host->user = user;
60 }
61 
p_amqp_set_passwd(struct p_amqp_host * amqp_host,char * passwd)62 void p_amqp_set_passwd(struct p_amqp_host *amqp_host, char *passwd)
63 {
64   if (amqp_host) amqp_host->passwd = passwd;
65 }
66 
p_amqp_set_exchange(struct p_amqp_host * amqp_host,char * exchange)67 void p_amqp_set_exchange(struct p_amqp_host *amqp_host, char *exchange)
68 {
69   if (amqp_host) amqp_host->exchange = exchange;
70 }
71 
p_amqp_set_routing_key(struct p_amqp_host * amqp_host,char * routing_key)72 void p_amqp_set_routing_key(struct p_amqp_host *amqp_host, char *routing_key)
73 {
74   if (amqp_host) amqp_host->routing_key = routing_key;
75 }
76 
p_amqp_unset_routing_key(struct p_amqp_host * amqp_host)77 void p_amqp_unset_routing_key(struct p_amqp_host *amqp_host)
78 {
79   if (amqp_host) amqp_host->routing_key = NULL;
80 }
81 
p_amqp_get_routing_key(struct p_amqp_host * amqp_host)82 char *p_amqp_get_routing_key(struct p_amqp_host *amqp_host)
83 {
84   if (amqp_host) return amqp_host->routing_key;
85 
86   return NULL;
87 }
88 
p_amqp_init_routing_key_rr(struct p_amqp_host * amqp_host)89 void p_amqp_init_routing_key_rr(struct p_amqp_host *amqp_host)
90 {
91   if (amqp_host) memset(&amqp_host->rk_rr, 0, sizeof(struct p_table_rr));
92 }
93 
p_amqp_set_routing_key_rr(struct p_amqp_host * amqp_host,int rk_rr)94 void p_amqp_set_routing_key_rr(struct p_amqp_host *amqp_host, int rk_rr)
95 {
96   if (amqp_host) amqp_host->rk_rr.max = rk_rr;
97 }
98 
p_amqp_get_routing_key_rr(struct p_amqp_host * amqp_host)99 int p_amqp_get_routing_key_rr(struct p_amqp_host *amqp_host)
100 {
101   if (amqp_host) return amqp_host->rk_rr.max;
102 
103   return FALSE;
104 }
105 
p_amqp_set_exchange_type(struct p_amqp_host * amqp_host,char * exchange_type)106 void p_amqp_set_exchange_type(struct p_amqp_host *amqp_host, char *exchange_type)
107 {
108   if (amqp_host && exchange_type) {
109     lower_string(exchange_type);
110     amqp_host->exchange_type = exchange_type;
111   }
112 }
113 
p_amqp_set_host(struct p_amqp_host * amqp_host,char * host)114 void p_amqp_set_host(struct p_amqp_host *amqp_host, char *host)
115 {
116   if (amqp_host) amqp_host->host = host;
117 }
118 
p_amqp_set_vhost(struct p_amqp_host * amqp_host,char * vhost)119 void p_amqp_set_vhost(struct p_amqp_host *amqp_host, char *vhost)
120 {
121   if (amqp_host) amqp_host->vhost = vhost;
122 }
123 
p_amqp_set_frame_max(struct p_amqp_host * amqp_host,u_int32_t opt)124 void p_amqp_set_frame_max(struct p_amqp_host *amqp_host, u_int32_t opt)
125 {
126   if (amqp_host) {
127     if (opt > PM_AMQP_MIN_FRAME_SIZE) amqp_host->frame_max = opt;
128   }
129 }
130 
p_amqp_set_heartbeat_interval(struct p_amqp_host * amqp_host,int opt)131 void p_amqp_set_heartbeat_interval(struct p_amqp_host *amqp_host, int opt)
132 {
133   if (amqp_host) {
134     amqp_host->heartbeat_interval = opt;
135   }
136 }
137 
p_amqp_set_persistent_msg(struct p_amqp_host * amqp_host,int opt)138 void p_amqp_set_persistent_msg(struct p_amqp_host *amqp_host, int opt)
139 {
140   if (amqp_host) amqp_host->persistent_msg = opt;
141 }
142 
p_amqp_set_content_type_json(struct p_amqp_host * amqp_host)143 void p_amqp_set_content_type_json(struct p_amqp_host *amqp_host)
144 {
145   if (amqp_host) {
146     amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
147     amqp_host->msg_props.content_type = amqp_cstring_bytes("application/json");
148   }
149 }
150 
p_amqp_set_content_type_binary(struct p_amqp_host * amqp_host)151 void p_amqp_set_content_type_binary(struct p_amqp_host *amqp_host)
152 {
153   if (amqp_host) {
154     amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
155     amqp_host->msg_props.content_type = amqp_cstring_bytes("application/octet-stream");
156   }
157 }
158 
p_amqp_get_sockfd(struct p_amqp_host * amqp_host)159 int p_amqp_get_sockfd(struct p_amqp_host *amqp_host)
160 {
161   if (amqp_host) {
162     if (!P_broker_timers_get_last_fail(&amqp_host->btimers)) return amqp_get_sockfd(amqp_host->conn);
163   }
164 
165   return ERR;
166 }
167 
/*
 * Print the version of the linked rabbitmq-c library, as reported by
 * amqp_version(), to standard output.
 */
void p_amqp_get_version(void)
{
  printf("rabbitmq-c %s\n", amqp_version());
}
172 
p_amqp_connect_to_publish(struct p_amqp_host * amqp_host)173 int p_amqp_connect_to_publish(struct p_amqp_host *amqp_host)
174 {
175   amqp_host->conn = amqp_new_connection();
176 
177   amqp_host->socket = amqp_tcp_socket_new(amqp_host->conn);
178   if (!amqp_host->socket) {
179     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): no socket\n", config.name, config.type);
180     p_amqp_close(amqp_host, TRUE);
181     return ERR;
182   }
183 
184   amqp_host->status = amqp_socket_open(amqp_host->socket, amqp_host->host, 5672 /* default port */);
185 
186   if (amqp_host->status != AMQP_STATUS_OK) {
187     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): unable to open socket\n", config.name, config.type);
188     p_amqp_close(amqp_host, TRUE);
189     return ERR;
190   }
191 
192   amqp_host->ret = amqp_login(amqp_host->conn, amqp_host->vhost, 0, amqp_host->frame_max, amqp_host->heartbeat_interval, AMQP_SASL_METHOD_PLAIN, amqp_host->user, amqp_host->passwd);
193   if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
194     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): login\n", config.name, config.type);
195     p_amqp_close(amqp_host, TRUE);
196     return ERR;
197   }
198 
199   amqp_channel_open(amqp_host->conn, 1);
200 
201   amqp_host->ret = amqp_get_rpc_reply(amqp_host->conn);
202   if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
203     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_connect_to_publish(): unable to open channel\n", config.name, config.type);
204     p_amqp_close(amqp_host, TRUE);
205     return ERR;
206   }
207 
208   if (amqp_host->persistent_msg)
209     amqp_exchange_declare(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
210 				amqp_cstring_bytes(amqp_host->exchange_type), 0, 1 /* durable */, 0, 0, amqp_empty_table);
211   else
212     amqp_exchange_declare(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
213 				amqp_cstring_bytes(amqp_host->exchange_type), 0, 0, 0, 0, amqp_empty_table);
214 
215   amqp_host->ret = amqp_get_rpc_reply(amqp_host->conn);
216   if (amqp_host->ret.reply_type != AMQP_RESPONSE_NORMAL) {
217     const char *err_msg;
218 
219     switch (amqp_host->ret.reply_type) {
220     case AMQP_RESPONSE_NONE:
221       err_msg = "Missing RPC reply type";
222       break;
223     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
224       err_msg = "Client library exception";
225       break;
226     case AMQP_RESPONSE_SERVER_EXCEPTION:
227       err_msg = "Server generated an exception";
228       break;
229     default:
230       err_msg = "Unknown";
231       break;
232     }
233 
234     Log(LOG_ERR, "ERROR ( %s/%s ): Handshake with RabbitMQ failed: %s\n", config.name, config.type, err_msg);
235     p_amqp_close(amqp_host, TRUE);
236     return ERR;
237   }
238 
239   // XXX: to be removed
240   amqp_host->msg_props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
241   amqp_host->msg_props.content_type = amqp_cstring_bytes("application/json");
242 
243   if (amqp_host->persistent_msg) {
244     amqp_host->msg_props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
245     amqp_host->msg_props.delivery_mode = 2; /* persistent delivery */
246   }
247 
248   Log(LOG_DEBUG, "DEBUG ( %s/%s ): Connection successful to RabbitMQ: p_amqp_connect_to_publish()\n", config.name, config.type);
249 
250   P_broker_timers_unset_last_fail(&amqp_host->btimers);
251   return SUCCESS;
252 }
253 
p_amqp_publish_string(struct p_amqp_host * amqp_host,char * json_str)254 int p_amqp_publish_string(struct p_amqp_host *amqp_host, char *json_str)
255 {
256   if (p_amqp_is_alive(amqp_host) == ERR) {
257     p_amqp_close(amqp_host, TRUE);
258     return ERR;
259   }
260 
261   amqp_host->status = amqp_basic_publish(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
262 					 amqp_cstring_bytes(amqp_host->routing_key), 0, 0, &amqp_host->msg_props,
263 					 amqp_cstring_bytes(json_str));
264 
265   if (amqp_host->status) {
266     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_publish_string() [E=%s RK=%s DM=%u]\n",
267 				config.name, config.type, amqp_host->exchange,
268 				amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
269     p_amqp_close(amqp_host, TRUE);
270     return ERR;
271   }
272   else {
273     if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing to RabbitMQ: p_amqp_publish_string() [E=%s RK=%s DM=%u]: %s\n",
274 				config.name, config.type, amqp_host->exchange,
275 				amqp_host->routing_key, amqp_host->msg_props.delivery_mode,
276 				json_str);
277   }
278 
279   return SUCCESS;
280 }
281 
p_amqp_publish_binary(struct p_amqp_host * amqp_host,void * data,u_int32_t data_len)282 int p_amqp_publish_binary(struct p_amqp_host *amqp_host, void *data, u_int32_t data_len)
283 {
284   amqp_bytes_t pdata;
285 
286   if (p_amqp_is_alive(amqp_host) == ERR) {
287     p_amqp_close(amqp_host, TRUE);
288     return ERR;
289   }
290 
291   memset(&pdata, 0, sizeof(pdata));
292   pdata.len = data_len;
293   pdata.bytes = data;
294 
295   amqp_host->status = amqp_basic_publish(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
296                                          amqp_cstring_bytes(amqp_host->routing_key), 0, 0, &amqp_host->msg_props,
297                                          pdata);
298 
299   if (amqp_host->status) {
300     Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_publish_binary() [E=%s RK=%s DM=%u]\n",
301 				config.name, config.type, amqp_host->exchange,
302 				amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
303     p_amqp_close(amqp_host, TRUE);
304     return ERR;
305   }
306   else {
307     if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing to RabbitMQ: p_amqp_publish_binary() [E=%s RK=%s DM=%u]\n",
308 				config.name, config.type, amqp_host->exchange,
309 				amqp_host->routing_key, amqp_host->msg_props.delivery_mode);
310   }
311 
312   return SUCCESS;
313 }
314 
p_amqp_close(struct p_amqp_host * amqp_host,int set_fail)315 void p_amqp_close(struct p_amqp_host *amqp_host, int set_fail)
316 {
317   if (amqp_host->conn) {
318     if (amqp_get_socket(amqp_host->conn)) amqp_connection_close(amqp_host->conn, AMQP_REPLY_SUCCESS);
319 
320     if (set_fail) {
321       Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to RabbitMQ: p_amqp_close()\n", config.name, config.type);
322       P_broker_timers_set_last_fail(&amqp_host->btimers, time(NULL));
323     }
324     amqp_destroy_connection(amqp_host->conn);
325     amqp_host->conn = NULL;
326   }
327 }
328 
p_amqp_is_alive(struct p_amqp_host * amqp_host)329 int p_amqp_is_alive(struct p_amqp_host *amqp_host)
330 {
331   if (amqp_host->status == AMQP_STATUS_OK && amqp_host->conn && (amqp_get_sockfd(amqp_host->conn) >= 0)) return SUCCESS;
332   else return ERR;
333 }
334 
335 #if defined WITH_JANSSON
write_and_free_json_amqp(void * amqp_log,void * obj)336 int write_and_free_json_amqp(void *amqp_log, void *obj)
337 {
338   char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
339   struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
340   int ret = ERR;
341 
342   char *tmpbuf = NULL;
343   json_t *json_obj = (json_t *) obj;
344 
345   tmpbuf = json_dumps(json_obj, JSON_PRESERVE_ORDER);
346   json_decref(json_obj);
347 
348   if (tmpbuf) {
349     if (alog->rk_rr.max) {
350       orig_amqp_routing_key = p_amqp_get_routing_key(alog);
351       P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
352       p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
353     }
354 
355     ret = p_amqp_publish_string(alog, tmpbuf);
356     free(tmpbuf);
357 
358     if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
359   }
360 
361   return ret;
362 }
363 #else
write_and_free_json_amqp(void * amqp_log,void * obj)364 int write_and_free_json_amqp(void *amqp_log, void *obj)
365 {
366   if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): write_and_free_json_amqp(): JSON object not created due to missing --enable-jansson\n", config.name, config.type);
367 }
368 #endif
369 
write_binary_amqp(void * amqp_log,void * obj,size_t len)370 int write_binary_amqp(void *amqp_log, void *obj, size_t len)
371 {
372   char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
373   struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
374   int ret = ERR;
375 
376   if (obj && len) {
377     if (alog->rk_rr.max) {
378       orig_amqp_routing_key = p_amqp_get_routing_key(alog);
379       P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
380       p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
381     }
382 
383     ret = p_amqp_publish_binary(alog, obj, len);
384 
385     if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
386   }
387 
388   return ret;
389 }
390 
write_string_amqp(void * amqp_log,char * obj)391 int write_string_amqp(void *amqp_log, char *obj)
392 {
393   char *orig_amqp_routing_key = NULL, dyn_amqp_routing_key[SRVBUFLEN];
394   struct p_amqp_host *alog = (struct p_amqp_host *) amqp_log;
395   int ret = ERR;
396 
397   if (obj) {
398     if (alog->rk_rr.max) {
399       orig_amqp_routing_key = p_amqp_get_routing_key(alog);
400       P_handle_table_dyn_rr(dyn_amqp_routing_key, SRVBUFLEN, orig_amqp_routing_key, &alog->rk_rr);
401       p_amqp_set_routing_key(alog, dyn_amqp_routing_key);
402     }
403 
404     ret = p_amqp_publish_string(alog, obj);
405 
406     if (alog->rk_rr.max) p_amqp_set_routing_key(alog, orig_amqp_routing_key);
407   }
408 
409   return ret;
410 }
411