1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 #include "pmacct.h"
23 #ifdef WITH_KAFKA
24 #include "kafka_common.h"
25 #endif
26 #ifdef WITH_ZMQ
27 #include "zmq_common.h"
28 #endif
29 #include "tee_plugin.h"
30 #include "tee_recvs.h"
31 
tee_recvs_map_id_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)32 int tee_recvs_map_id_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
33 {
34   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
35   int pool_idx;
36   u_int32_t pool_id;
37   char *endptr = NULL;
38 
39   if (table && table->pools) {
40     if (table->num < config.tee_max_receiver_pools) {
41       pool_id = strtoull(value, &endptr, 10);
42 
43       if (!pool_id || pool_id > UINT32_MAX) {
44         Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Invalid Pool ID specified.\n", config.name, config.type, filename);
45         return TRUE;
46       }
47 
48       /* Ensure no pool ID duplicates */
49       for (pool_idx = 0; pool_idx < table->num; pool_idx++) {
50 	if (pool_id == table->pools[table->num].id) {
51 	  Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Duplicate Pool ID specified: %u.\n", config.name, config.type, filename, pool_id);
52 	  return TRUE;
53 	}
54       }
55 
56       table->pools[table->num].id = pool_id;
57     }
58     else {
59       Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Maximum amount of receivers pool reached: %u.\n", config.name, config.type, filename, config.tee_max_receiver_pools);
60       return TRUE;
61     }
62   }
63   else {
64     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
65     return TRUE;
66   }
67 
68   return FALSE;
69 }
70 
tee_recvs_map_ip_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)71 int tee_recvs_map_ip_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
72 {
73   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
74   struct tee_receiver *target = NULL;
75   int recv_idx;
76   char *str_ptr, *token;
77 
78   if (table && table->pools && table->pools[table->num].receivers) {
79     str_ptr = value;
80     recv_idx = 0;
81 
82     while ((token = extract_token(&str_ptr, ','))) {
83       if (recv_idx < config.tee_max_receivers) {
84 	target = &table->pools[table->num].receivers[recv_idx];
85 	target->dest_len = sizeof(target->dest);
86 	if (!Tee_parse_hostport(token, (struct sockaddr *)&target->dest, &target->dest_len, FALSE)) recv_idx++;
87 	else Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Invalid receiver %s.\n",
88 		config.name, config.type, filename, token);
89       }
90       else {
91 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Maximum amount of receivers pool reached %u.\n",
92 		config.name, config.type, filename, config.tee_max_receiver_pools);
93 	break;
94       }
95     }
96 
97     if (!recv_idx) {
98       Log(LOG_ERR, "ERROR ( %s/%s ): [%s] No valid receivers.\n", config.name, config.type, filename);
99       return TRUE;
100     }
101     else table->pools[table->num].num = recv_idx;
102   }
103   else {
104     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
105     return TRUE;
106   }
107 
108   return FALSE;
109 }
110 
tee_recvs_map_tag_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)111 int tee_recvs_map_tag_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
112 {
113   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
114   int ret;
115 
116   if (table && table->pools) ret = load_tags(filename, &table->pools[table->num].tag_filter, value);
117   else {
118     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
119     return TRUE;
120   }
121 
122   if (!ret) return TRUE;
123   else return FALSE;
124 }
125 
tee_recvs_map_balance_alg_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)126 int tee_recvs_map_balance_alg_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
127 {
128   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
129 
130   if (table && table->pools) {
131     if (!strncmp(value, "rr", 2)) {
132       table->pools[table->num].balance.type = TEE_BALANCE_RR;
133       table->pools[table->num].balance.func = Tee_rr_balance;
134     }
135     else if (!strncmp(value, "hash-agent", 10)) {
136       table->pools[table->num].balance.type = TEE_BALANCE_HASH_AGENT;
137       table->pools[table->num].balance.func = Tee_hash_agent_balance;
138     }
139     else if (!strncmp(value, "hash-tag", 8)) {
140       table->pools[table->num].balance.type = TEE_BALANCE_HASH_TAG;
141       table->pools[table->num].balance.func = Tee_hash_tag_balance;
142     }
143     else {
144       table->pools[table->num].balance.func = NULL;
145       Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Unknown balance algorithm '%s'. Ignoring.\n", config.name, config.type, filename, value);
146     }
147   }
148   else {
149     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
150     return TRUE;
151   }
152 
153   return FALSE;
154 }
155 
tee_recvs_map_src_port_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)156 int tee_recvs_map_src_port_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
157 {
158   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
159   int port;
160 
161   if (table && table->pools) {
162     port = atoi(value);
163 
164     if (port <= UINT16_MAX) table->pools[table->num].src_port = port;
165     else {
166       Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Invalid source port specified '%s'. Ignoring.\n", config.name, config.type, filename, value);
167     }
168   }
169   else {
170     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
171     return TRUE;
172   }
173 
174   return FALSE;
175 }
176 
177 #ifdef WITH_KAFKA
tee_recvs_map_kafka_broker_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)178 int tee_recvs_map_kafka_broker_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
179 {
180   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
181 
182   if (table && table->pools) {
183     int len = sizeof(table->pools[table->num].kafka_broker);
184 
185     memset(table->pools[table->num].kafka_broker, 0, len);
186     strlcpy(table->pools[table->num].kafka_broker, value, len);
187     table->pools[table->num].kafka_broker[len] = '\0';
188   }
189   else {
190     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
191     return TRUE;
192   }
193 
194   return FALSE;
195 }
196 
tee_recvs_map_kafka_topic_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)197 int tee_recvs_map_kafka_topic_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
198 {
199   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
200 
201   if (table && table->pools) {
202     int len = sizeof(table->pools[table->num].kafka_topic);
203 
204     memset(table->pools[table->num].kafka_topic, 0, len);
205     strlcpy(table->pools[table->num].kafka_topic, value, len);
206     table->pools[table->num].kafka_topic[len] = '\0';
207   }
208   else {
209     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
210     return TRUE;
211   }
212 
213   return FALSE;
214 }
215 #endif
216 
217 #ifdef WITH_ZMQ
tee_recvs_map_zmq_address_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)218 int tee_recvs_map_zmq_address_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
219 {
220   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
221 
222   if (table && table->pools) {
223     int len = sizeof(table->pools[table->num].zmq_address);
224 
225     memset(table->pools[table->num].zmq_address, 0, len);
226     strlcpy(table->pools[table->num].zmq_address, value, len);
227     table->pools[table->num].zmq_address[len] = '\0';
228   }
229   else {
230     Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
231     return TRUE;
232   }
233 
234   return FALSE;
235 }
236 #endif
237 
tee_recvs_map_validate(char * filename,int lineno,struct plugin_requests * req)238 void tee_recvs_map_validate(char *filename, int lineno, struct plugin_requests *req)
239 {
240   struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
241   int valid = FALSE, emit_methods = 0;
242 
243   if (table && table->pools && table->pools[table->num].receivers) {
244     /* Check: emit to either IP address(es) or Kafka broker(s) or ZeroMQ queue */
245     if (table->pools[table->num].num > 0) emit_methods++;
246     if (strlen(table->pools[table->num].kafka_broker)) emit_methods++;
247     if (strlen(table->pools[table->num].zmq_address)) emit_methods++;
248 
249     if (emit_methods > 1) {
250       Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'ip', 'kafka_broker' and 'zmq_address' are mutual exclusive. Line ignored.\n",
251 	  config.name, config.type, filename, lineno);
252       valid = FALSE;
253       goto zero_entry;
254     }
255 
256     if (!emit_methods) {
257       Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'ip' or 'kafka_broker' or 'zmq_address' must be specified. Line ignored.\n",
258 	  config.name, config.type, filename, lineno);
259       valid = FALSE;
260       goto zero_entry;
261     }
262 
263     /* Check: valid pool ID */
264     if (table->pools[table->num].id > 0) valid = TRUE;
265     else {
266       Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Invalid pool 'id' specified. Line ignored.\n",
267 	  config.name, config.type, filename, lineno);
268       valid = FALSE;
269       goto zero_entry;
270     }
271 
272     if (table->pools[table->num].num > 0) valid = TRUE;
273 
274     /*
275        Check: if emitting to Kafka:
276        a) make sure we have both broker string and topic,
277        b) balance-alg is not set, tee_transparent is set to true
278     */
279 #ifdef WITH_KAFKA
280     if (strlen(table->pools[table->num].kafka_broker)) {
281       if (!config.tee_transparent) {
282 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] tee_transparent must be set to 'true' when emitting to Kafka. Line ignored.\n",
283 	    config.name, config.type, filename, lineno);
284 	valid = FALSE;
285 	goto zero_entry;
286       }
287 
288       if (table->pools[table->num].balance.func) {
289 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'balance-alg' is not compatible with emitting to Kafka. Line ignored.\n",
290 	    config.name, config.type, filename, lineno);
291 	valid = FALSE;
292 	goto zero_entry;
293       }
294 
295       if (!strlen(table->pools[table->num].kafka_topic)) {
296 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'kafka_topic' missing. Line ignored.\n",
297 	    config.name, config.type, filename, lineno);
298 	valid = FALSE;
299 	goto zero_entry;
300       }
301 
302       valid = TRUE;
303     }
304 #endif
305 
306     /*
307        Check: if emitting via ZeroMQ:
308        a) make sure we have an address string,
309        b) balance-alg is not set, tee_transparent is set to true
310     */
311 #ifdef WITH_ZMQ
312     if (strlen(table->pools[table->num].zmq_address)) {
313       if (!config.tee_transparent) {
314 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] tee_transparent must be set to 'true' when emitting via ZeroMQ. Line ignored.\n",
315 	    config.name, config.type, filename, lineno);
316 	valid = FALSE;
317 	goto zero_entry;
318       }
319 
320       if (table->pools[table->num].balance.func) {
321 	Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'balance-alg' is not compatible with emitting via ZeroMQ. Line ignored.\n",
322 	    config.name, config.type, filename, lineno);
323 	valid = FALSE;
324 	goto zero_entry;
325       }
326 
327       valid = TRUE;
328     }
329 #endif
330 
331     if (valid) table->num++;
332     else {
333       zero_entry:
334 
335       table->pools[table->num].id = 0;
336       table->pools[table->num].num = 0;
337       table->pools[table->num].src_port = 0;
338       memset(table->pools[table->num].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
339       memset(&table->pools[table->num].tag_filter, 0, sizeof(struct pretag_filter));
340       memset(&table->pools[table->num].balance, 0, sizeof(struct tee_balance));
341 
342 #ifdef WITH_KAFKA
343       memset(&table->pools[table->num].kafka_host, 0, sizeof(struct p_kafka_host));
344       memset(&table->pools[table->num].kafka_broker, 0, sizeof(table->pools[table->num].kafka_broker));
345       memset(&table->pools[table->num].kafka_topic, 0, sizeof(table->pools[table->num].kafka_topic));
346 #endif
347 
348 #ifdef WITH_ZMQ
349       memset(&table->pools[table->num].zmq_host, 0, sizeof(struct p_zmq_host));
350       memset(&table->pools[table->num].zmq_address, 0, sizeof(table->pools[table->num].zmq_address));
351 #endif
352     }
353   }
354 }
355