1 /*
2  * pmacct (Promiscuous mode IP Accounting package)
3  *
4  * Copyright (c) 2003-2020 Paolo Lucente <paolo@pmacct.net>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /* includes */
20 #include "pmacct.h"
21 #include "pmacct-data.h"
22 #include "addr.h"
23 #include "thread_pool.h"
24 
25 /* Global variables */
/* Thread pool used by this module; assigned in p_redis_thread_wrapper() */
thread_pool_t *redis_pool;
27 
28 /* Functions */
p_redis_thread_wrapper(struct p_redis_host * redis_host)29 void p_redis_thread_wrapper(struct p_redis_host *redis_host)
30 {
31   /* initialize threads pool */
32   redis_pool = allocate_thread_pool(1);
33 
34   assert(redis_pool);
35   assert(redis_host);
36 
37   Log(LOG_DEBUG, "DEBUG ( %s ): %d thread(s) initialized\n", redis_host->log_id, 1);
38 
39   /* giving a kick to the Redis thread */
40   send_to_pool(redis_pool, p_redis_master_produce_thread, redis_host);
41 }
42 
p_redis_master_produce_thread(void * rh)43 void p_redis_master_produce_thread(void *rh)
44 {
45   struct p_redis_host *redis_host = rh;
46   unsigned int ret = 0, period = 0;
47 
48   p_redis_connect(redis_host, TRUE);
49 
50   for (;;) {
51     if (!ret) {
52       (*redis_host->th_hdlr)(redis_host);
53       period = PM_REDIS_DEFAULT_REFRESH_TIME;
54     }
55     else {
56       period = ret;
57     }
58 
59     ret = sleep(period);
60   }
61 }
62 
/*
 * Resets redis_host and, when config.redis_host is set, configures it
 * (log id, db, expiry time, thread handler) and starts the Redis thread.
 *
 * redis_host: structure to initialize; zeroed by this call
 * log_id:     log prefix; "/redis" gets appended by p_redis_set_log_id()
 * th_hdlr:    handler invoked periodically by the Redis thread
 *
 * Does nothing if any argument is NULL. Calls exit_gracefully(1) when
 * config.redis_host is set but config.cluster_name or config.cluster_id
 * evaluates to false (so a cluster_id of 0 is rejected as well).
 */
void p_redis_init(struct p_redis_host *redis_host, char *log_id, redis_thread_handler th_hdlr)
{
  if (redis_host == NULL || log_id == NULL || th_hdlr == NULL) {
    return;
  }

  memset(redis_host, 0, sizeof(*redis_host));

  /* Redis support is opt-in: nothing more to do without a configured host */
  if (!config.redis_host) {
    return;
  }

  p_redis_set_log_id(redis_host, log_id);
  p_redis_set_db(redis_host, config.redis_db);
  p_redis_set_exp_time(redis_host, PM_REDIS_DEFAULT_EXP_TIME);
  p_redis_set_thread_handler(redis_host, th_hdlr);

  if (!config.cluster_name) {
    Log(LOG_ERR, "ERROR ( %s ): redis_host requires cluster_name to be specified. Exiting...\n\n", redis_host->log_id);
    exit_gracefully(1);
  }

  if (!config.cluster_id) {
    Log(LOG_ERR, "ERROR ( %s ): redis_host requires cluster_id to be specified. Exiting...\n\n", redis_host->log_id);
    exit_gracefully(1);
  }

  p_redis_thread_wrapper(redis_host);
}
88 
p_redis_connect(struct p_redis_host * redis_host,int fatal)89 int p_redis_connect(struct p_redis_host *redis_host, int fatal)
90 {
91   struct sockaddr_storage dest;
92   socklen_t dest_len = sizeof(dest);
93   char dest_str[INET6_ADDRSTRLEN];
94   int dest_port = PM_REDIS_DEFAULT_PORT;
95 
96   time_t now = time(NULL);
97 
98   assert(redis_host);
99 
100   if (config.redis_host) {
101     if (now >= (redis_host->last_conn + PM_REDIS_DEFAULT_CONN_RETRY)) {
102       redis_host->last_conn = now;
103 
104       /* round of parsing and validation */
105       parse_hostport(config.redis_host, (struct sockaddr *)&dest, &dest_len);
106       sa_to_str(dest_str, sizeof(dest_str), (struct sockaddr *)&dest);
107       sa_to_port(&dest_port, (struct sockaddr *)&dest);
108 
109       redis_host->ctx = redisConnect(dest_str, dest_port);
110 
111       if (redis_host->ctx == NULL || redis_host->ctx->err) {
112 	if (redis_host->ctx) {
113 	  if (fatal) {
114 	    Log(LOG_ERR, "ERROR ( %s ): Connection error: %s\n", redis_host->log_id, redis_host->ctx->errstr);
115 	    exit_gracefully(1);
116 	  }
117 	  else {
118 	    return ERR;
119 	  }
120 	}
121 	else {
122 	  Log(LOG_ERR, "ERROR ( %s ): Connection error: can't allocate redis context\n", redis_host->log_id);
123           exit_gracefully(1);
124 	}
125       }
126       else {
127 	Log(LOG_DEBUG, "DEBUG ( %s ): Connection successful\n", redis_host->log_id);
128       }
129     }
130   }
131 
132   return SUCCESS;
133 }
134 
p_redis_close(struct p_redis_host * redis_host)135 void p_redis_close(struct p_redis_host *redis_host)
136 {
137   redisFree(redis_host->ctx);
138 }
139 
p_redis_set_string(struct p_redis_host * redis_host,char * resource,char * value,int expire)140 void p_redis_set_string(struct p_redis_host *redis_host, char *resource, char *value, int expire)
141 {
142   if (expire > 0) {
143     redis_host->reply = redisCommand(redis_host->ctx, "SETEX %s%s%d%s%s %d %s", config.cluster_name, PM_REDIS_DEFAULT_SEP,
144 				     config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, redis_host->exp_time, value);
145   }
146   else {
147     redis_host->reply = redisCommand(redis_host->ctx, "SET %s%s%d%s%s %s", config.cluster_name, PM_REDIS_DEFAULT_SEP,
148 				     config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, value);
149   }
150 
151   p_redis_process_reply(redis_host);
152 }
153 
p_redis_set_int(struct p_redis_host * redis_host,char * resource,int value,int expire)154 void p_redis_set_int(struct p_redis_host *redis_host, char *resource, int value, int expire)
155 {
156   if (expire > 0) {
157     redis_host->reply = redisCommand(redis_host->ctx, "SETEX %s%s%d%s%s %d %d", config.cluster_name, PM_REDIS_DEFAULT_SEP,
158 				     config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, redis_host->exp_time, value);
159   }
160   else {
161     redis_host->reply = redisCommand(redis_host->ctx, "SET %s%s%d%s%s %d", config.cluster_name, PM_REDIS_DEFAULT_SEP,
162 				     config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, value);
163   }
164 
165   p_redis_process_reply(redis_host);
166 }
167 
p_redis_ping(struct p_redis_host * redis_host)168 void p_redis_ping(struct p_redis_host *redis_host)
169 {
170   redis_host->reply = redisCommand(redis_host->ctx, "PING");
171   p_redis_process_reply(redis_host);
172 }
173 
p_redis_select_db(struct p_redis_host * redis_host)174 void p_redis_select_db(struct p_redis_host *redis_host)
175 {
176   char select_cmd[VERYSHORTBUFLEN];
177 
178   if (redis_host->db) {
179     snprintf(select_cmd, sizeof(select_cmd), "SELECT %d", redis_host->db);
180     redis_host->reply = redisCommand(redis_host->ctx, select_cmd);
181     p_redis_process_reply(redis_host);
182   }
183 }
184 
p_redis_process_reply(struct p_redis_host * redis_host)185 void p_redis_process_reply(struct p_redis_host *redis_host)
186 {
187   if (redis_host->reply) {
188     if (redis_host->reply->type == REDIS_REPLY_ERROR) {
189       Log(LOG_WARNING, "WARN ( %s ): reply='%s'\n", redis_host->log_id, redis_host->reply->str);
190     }
191 
192     freeReplyObject(redis_host->reply);
193   }
194   else {
195     p_redis_connect(redis_host, FALSE);
196   }
197 }
198 
p_redis_set_log_id(struct p_redis_host * redis_host,char * log_id)199 void p_redis_set_log_id(struct p_redis_host *redis_host, char *log_id)
200 {
201   if (redis_host) {
202     strlcpy(redis_host->log_id, log_id, sizeof(redis_host->log_id));
203     strncat(redis_host->log_id, "/redis", (sizeof(redis_host->log_id) - strlen(redis_host->log_id)));
204   }
205 }
206 
p_redis_set_db(struct p_redis_host * redis_host,int db)207 void p_redis_set_db(struct p_redis_host *redis_host, int db)
208 {
209   if (redis_host) redis_host->db = db;
210 }
211 
p_redis_set_exp_time(struct p_redis_host * redis_host,int exp_time)212 void p_redis_set_exp_time(struct p_redis_host *redis_host, int exp_time)
213 {
214   if (redis_host) redis_host->exp_time = exp_time;
215 }
216 
/* Registers the handler run periodically by the Redis thread; no-op on a
   NULL redis_host */
void p_redis_set_thread_handler(struct p_redis_host *redis_host, redis_thread_handler th_hdlr)
{
  if (!redis_host) {
    return;
  }

  redis_host->th_hdlr = th_hdlr;
}
221 
p_redis_thread_produce_common_core_handler(void * rh)222 void p_redis_thread_produce_common_core_handler(void *rh)
223 {
224   struct p_redis_host *redis_host = rh;
225   char buf[SRVBUFLEN], name_and_type[SHORTBUFLEN], daemon_type[VERYSHORTBUFLEN];
226 
227   switch (config.acct_type) {
228   case ACCT_NF:
229     snprintf(daemon_type, sizeof(daemon_type), "%s", "nfacctd");
230     break;
231   case ACCT_SF:
232     snprintf(daemon_type, sizeof(daemon_type), "%s", "sfacctd");
233     break;
234   case ACCT_PM:
235     if (config.uacctd_group) {
236       snprintf(daemon_type, sizeof(daemon_type), "%s", "uacctd");
237     }
238     else {
239       snprintf(daemon_type, sizeof(daemon_type), "%s", "pmacctd");
240     }
241     break;
242   case ACCT_PMBGP:
243     snprintf(daemon_type, sizeof(daemon_type), "%s", "pmbgpd");
244     break;
245   case ACCT_PMBMP:
246     snprintf(daemon_type, sizeof(daemon_type), "%s", "pmbmpd");
247     break;
248   case ACCT_PMTELE:
249     snprintf(daemon_type, sizeof(daemon_type), "%s", "pmtelemetryd");
250     break;
251   default:
252     break;
253   }
254   p_redis_set_string(redis_host, "daemon_type", daemon_type, PM_REDIS_DEFAULT_EXP_TIME);
255 
256   snprintf(name_and_type, sizeof(name_and_type), "process%s%s%s%s", PM_REDIS_DEFAULT_SEP,
257 	   config.name, PM_REDIS_DEFAULT_SEP, config.type);
258   p_redis_set_int(redis_host, name_and_type, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
259 
260   if (config.acct_type < ACCT_FWPLANE_MAX) {
261     if (config.nfacctd_isis) {
262       snprintf(buf, sizeof(buf), "%s%sisis", name_and_type, PM_REDIS_DEFAULT_SEP);
263       p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
264     }
265 
266     if (config.bgp_daemon) {
267       snprintf(buf, sizeof(buf), "%s%sbgp", name_and_type, PM_REDIS_DEFAULT_SEP);
268       p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
269     }
270 
271     if (config.bmp_daemon) {
272       snprintf(buf, sizeof(buf), "%s%sbmp", name_and_type, PM_REDIS_DEFAULT_SEP);
273       p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
274     }
275 
276     if (config.telemetry_daemon) {
277       snprintf(buf, sizeof(buf), "%s%stelemetry", name_and_type, PM_REDIS_DEFAULT_SEP);
278       p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
279     }
280   }
281 }
282 
p_redis_thread_produce_common_plugin_handler(void * rh)283 void p_redis_thread_produce_common_plugin_handler(void *rh)
284 {
285   struct p_redis_host *redis_host = rh;
286   char name_and_type[SRVBUFLEN];
287 
288   snprintf(name_and_type, sizeof(name_and_type), "process%s%s%s%s", PM_REDIS_DEFAULT_SEP,
289 	   config.name, PM_REDIS_DEFAULT_SEP, config.type);
290   p_redis_set_int(redis_host, name_and_type, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
291 }
292