1 /*
2 * pmacct (Promiscuous mode IP Accounting package)
3 *
4 * Copyright (c) 2003-2020 Paolo Lucente <paolo@pmacct.net>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /* includes */
20 #include "pmacct.h"
21 #include "pmacct-data.h"
22 #include "addr.h"
23 #include "thread_pool.h"
24
25 /* Global variables */
/* Thread pool for the Redis worker; assigned by p_redis_thread_wrapper() */
thread_pool_t *redis_pool;
27
28 /* Functions */
p_redis_thread_wrapper(struct p_redis_host * redis_host)29 void p_redis_thread_wrapper(struct p_redis_host *redis_host)
30 {
31 /* initialize threads pool */
32 redis_pool = allocate_thread_pool(1);
33
34 assert(redis_pool);
35 assert(redis_host);
36
37 Log(LOG_DEBUG, "DEBUG ( %s ): %d thread(s) initialized\n", redis_host->log_id, 1);
38
39 /* giving a kick to the Redis thread */
40 send_to_pool(redis_pool, p_redis_master_produce_thread, redis_host);
41 }
42
p_redis_master_produce_thread(void * rh)43 void p_redis_master_produce_thread(void *rh)
44 {
45 struct p_redis_host *redis_host = rh;
46 unsigned int ret = 0, period = 0;
47
48 p_redis_connect(redis_host, TRUE);
49
50 for (;;) {
51 if (!ret) {
52 (*redis_host->th_hdlr)(redis_host);
53 period = PM_REDIS_DEFAULT_REFRESH_TIME;
54 }
55 else {
56 period = ret;
57 }
58
59 ret = sleep(period);
60 }
61 }
62
/*
 * Initialise a Redis host descriptor and, if Redis is configured, start its
 * worker thread.
 *
 * redis_host: descriptor to initialise; it is zeroed first.
 * log_id:     identifier handed to p_redis_set_log_id().
 * th_hdlr:    handler stored via p_redis_set_thread_handler().
 *
 * Returns without doing anything if any argument is NULL. When
 * config.redis_host is unset the descriptor is only zeroed. When it is set,
 * config.cluster_name and config.cluster_id must be set too, otherwise an
 * error is logged and exit_gracefully(1) is called.
 */
void p_redis_init(struct p_redis_host *redis_host, char *log_id, redis_thread_handler th_hdlr)
{
  if (redis_host == NULL || log_id == NULL || th_hdlr == NULL) {
    return;
  }

  memset(redis_host, 0, sizeof(*redis_host));

  /* Redis support not requested: leave the descriptor zeroed */
  if (!config.redis_host) {
    return;
  }

  p_redis_set_log_id(redis_host, log_id);
  p_redis_set_db(redis_host, config.redis_db);
  p_redis_set_exp_time(redis_host, PM_REDIS_DEFAULT_EXP_TIME);
  p_redis_set_thread_handler(redis_host, th_hdlr);

  if (!config.cluster_name) {
    Log(LOG_ERR, "ERROR ( %s ): redis_host requires cluster_name to be specified. Exiting...\n\n", redis_host->log_id);
    exit_gracefully(1);
  }

  if (!config.cluster_id) {
    Log(LOG_ERR, "ERROR ( %s ): redis_host requires cluster_id to be specified. Exiting...\n\n", redis_host->log_id);
    exit_gracefully(1);
  }

  p_redis_thread_wrapper(redis_host);
}
88
p_redis_connect(struct p_redis_host * redis_host,int fatal)89 int p_redis_connect(struct p_redis_host *redis_host, int fatal)
90 {
91 struct sockaddr_storage dest;
92 socklen_t dest_len = sizeof(dest);
93 char dest_str[INET6_ADDRSTRLEN];
94 int dest_port = PM_REDIS_DEFAULT_PORT;
95
96 time_t now = time(NULL);
97
98 assert(redis_host);
99
100 if (config.redis_host) {
101 if (now >= (redis_host->last_conn + PM_REDIS_DEFAULT_CONN_RETRY)) {
102 redis_host->last_conn = now;
103
104 /* round of parsing and validation */
105 parse_hostport(config.redis_host, (struct sockaddr *)&dest, &dest_len);
106 sa_to_str(dest_str, sizeof(dest_str), (struct sockaddr *)&dest);
107 sa_to_port(&dest_port, (struct sockaddr *)&dest);
108
109 redis_host->ctx = redisConnect(dest_str, dest_port);
110
111 if (redis_host->ctx == NULL || redis_host->ctx->err) {
112 if (redis_host->ctx) {
113 if (fatal) {
114 Log(LOG_ERR, "ERROR ( %s ): Connection error: %s\n", redis_host->log_id, redis_host->ctx->errstr);
115 exit_gracefully(1);
116 }
117 else {
118 return ERR;
119 }
120 }
121 else {
122 Log(LOG_ERR, "ERROR ( %s ): Connection error: can't allocate redis context\n", redis_host->log_id);
123 exit_gracefully(1);
124 }
125 }
126 else {
127 Log(LOG_DEBUG, "DEBUG ( %s ): Connection successful\n", redis_host->log_id);
128 }
129 }
130 }
131
132 return SUCCESS;
133 }
134
p_redis_close(struct p_redis_host * redis_host)135 void p_redis_close(struct p_redis_host *redis_host)
136 {
137 redisFree(redis_host->ctx);
138 }
139
p_redis_set_string(struct p_redis_host * redis_host,char * resource,char * value,int expire)140 void p_redis_set_string(struct p_redis_host *redis_host, char *resource, char *value, int expire)
141 {
142 if (expire > 0) {
143 redis_host->reply = redisCommand(redis_host->ctx, "SETEX %s%s%d%s%s %d %s", config.cluster_name, PM_REDIS_DEFAULT_SEP,
144 config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, redis_host->exp_time, value);
145 }
146 else {
147 redis_host->reply = redisCommand(redis_host->ctx, "SET %s%s%d%s%s %s", config.cluster_name, PM_REDIS_DEFAULT_SEP,
148 config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, value);
149 }
150
151 p_redis_process_reply(redis_host);
152 }
153
p_redis_set_int(struct p_redis_host * redis_host,char * resource,int value,int expire)154 void p_redis_set_int(struct p_redis_host *redis_host, char *resource, int value, int expire)
155 {
156 if (expire > 0) {
157 redis_host->reply = redisCommand(redis_host->ctx, "SETEX %s%s%d%s%s %d %d", config.cluster_name, PM_REDIS_DEFAULT_SEP,
158 config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, redis_host->exp_time, value);
159 }
160 else {
161 redis_host->reply = redisCommand(redis_host->ctx, "SET %s%s%d%s%s %d", config.cluster_name, PM_REDIS_DEFAULT_SEP,
162 config.cluster_id, PM_REDIS_DEFAULT_SEP, resource, value);
163 }
164
165 p_redis_process_reply(redis_host);
166 }
167
p_redis_ping(struct p_redis_host * redis_host)168 void p_redis_ping(struct p_redis_host *redis_host)
169 {
170 redis_host->reply = redisCommand(redis_host->ctx, "PING");
171 p_redis_process_reply(redis_host);
172 }
173
p_redis_select_db(struct p_redis_host * redis_host)174 void p_redis_select_db(struct p_redis_host *redis_host)
175 {
176 char select_cmd[VERYSHORTBUFLEN];
177
178 if (redis_host->db) {
179 snprintf(select_cmd, sizeof(select_cmd), "SELECT %d", redis_host->db);
180 redis_host->reply = redisCommand(redis_host->ctx, select_cmd);
181 p_redis_process_reply(redis_host);
182 }
183 }
184
p_redis_process_reply(struct p_redis_host * redis_host)185 void p_redis_process_reply(struct p_redis_host *redis_host)
186 {
187 if (redis_host->reply) {
188 if (redis_host->reply->type == REDIS_REPLY_ERROR) {
189 Log(LOG_WARNING, "WARN ( %s ): reply='%s'\n", redis_host->log_id, redis_host->reply->str);
190 }
191
192 freeReplyObject(redis_host->reply);
193 }
194 else {
195 p_redis_connect(redis_host, FALSE);
196 }
197 }
198
p_redis_set_log_id(struct p_redis_host * redis_host,char * log_id)199 void p_redis_set_log_id(struct p_redis_host *redis_host, char *log_id)
200 {
201 if (redis_host) {
202 strlcpy(redis_host->log_id, log_id, sizeof(redis_host->log_id));
203 strncat(redis_host->log_id, "/redis", (sizeof(redis_host->log_id) - strlen(redis_host->log_id)));
204 }
205 }
206
p_redis_set_db(struct p_redis_host * redis_host,int db)207 void p_redis_set_db(struct p_redis_host *redis_host, int db)
208 {
209 if (redis_host) redis_host->db = db;
210 }
211
p_redis_set_exp_time(struct p_redis_host * redis_host,int exp_time)212 void p_redis_set_exp_time(struct p_redis_host *redis_host, int exp_time)
213 {
214 if (redis_host) redis_host->exp_time = exp_time;
215 }
216
/* Record the handler in redis_host->th_hdlr; a NULL redis_host is ignored. */
void p_redis_set_thread_handler(struct p_redis_host *redis_host, redis_thread_handler th_hdlr)
{
  if (!redis_host) {
    return;
  }

  redis_host->th_hdlr = th_hdlr;
}
221
p_redis_thread_produce_common_core_handler(void * rh)222 void p_redis_thread_produce_common_core_handler(void *rh)
223 {
224 struct p_redis_host *redis_host = rh;
225 char buf[SRVBUFLEN], name_and_type[SHORTBUFLEN], daemon_type[VERYSHORTBUFLEN];
226
227 switch (config.acct_type) {
228 case ACCT_NF:
229 snprintf(daemon_type, sizeof(daemon_type), "%s", "nfacctd");
230 break;
231 case ACCT_SF:
232 snprintf(daemon_type, sizeof(daemon_type), "%s", "sfacctd");
233 break;
234 case ACCT_PM:
235 if (config.uacctd_group) {
236 snprintf(daemon_type, sizeof(daemon_type), "%s", "uacctd");
237 }
238 else {
239 snprintf(daemon_type, sizeof(daemon_type), "%s", "pmacctd");
240 }
241 break;
242 case ACCT_PMBGP:
243 snprintf(daemon_type, sizeof(daemon_type), "%s", "pmbgpd");
244 break;
245 case ACCT_PMBMP:
246 snprintf(daemon_type, sizeof(daemon_type), "%s", "pmbmpd");
247 break;
248 case ACCT_PMTELE:
249 snprintf(daemon_type, sizeof(daemon_type), "%s", "pmtelemetryd");
250 break;
251 default:
252 break;
253 }
254 p_redis_set_string(redis_host, "daemon_type", daemon_type, PM_REDIS_DEFAULT_EXP_TIME);
255
256 snprintf(name_and_type, sizeof(name_and_type), "process%s%s%s%s", PM_REDIS_DEFAULT_SEP,
257 config.name, PM_REDIS_DEFAULT_SEP, config.type);
258 p_redis_set_int(redis_host, name_and_type, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
259
260 if (config.acct_type < ACCT_FWPLANE_MAX) {
261 if (config.nfacctd_isis) {
262 snprintf(buf, sizeof(buf), "%s%sisis", name_and_type, PM_REDIS_DEFAULT_SEP);
263 p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
264 }
265
266 if (config.bgp_daemon) {
267 snprintf(buf, sizeof(buf), "%s%sbgp", name_and_type, PM_REDIS_DEFAULT_SEP);
268 p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
269 }
270
271 if (config.bmp_daemon) {
272 snprintf(buf, sizeof(buf), "%s%sbmp", name_and_type, PM_REDIS_DEFAULT_SEP);
273 p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
274 }
275
276 if (config.telemetry_daemon) {
277 snprintf(buf, sizeof(buf), "%s%stelemetry", name_and_type, PM_REDIS_DEFAULT_SEP);
278 p_redis_set_int(redis_host, buf, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
279 }
280 }
281 }
282
p_redis_thread_produce_common_plugin_handler(void * rh)283 void p_redis_thread_produce_common_plugin_handler(void *rh)
284 {
285 struct p_redis_host *redis_host = rh;
286 char name_and_type[SRVBUFLEN];
287
288 snprintf(name_and_type, sizeof(name_and_type), "process%s%s%s%s", PM_REDIS_DEFAULT_SEP,
289 config.name, PM_REDIS_DEFAULT_SEP, config.type);
290 p_redis_set_int(redis_host, name_and_type, TRUE, PM_REDIS_DEFAULT_EXP_TIME);
291 }
292