1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 #include "pmacct.h"
23 #ifdef WITH_KAFKA
24 #include "kafka_common.h"
25 #endif
26 #ifdef WITH_ZMQ
27 #include "zmq_common.h"
28 #endif
29 #include "tee_plugin.h"
30 #include "tee_recvs.h"
31
tee_recvs_map_id_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)32 int tee_recvs_map_id_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
33 {
34 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
35 int pool_idx;
36 u_int32_t pool_id;
37 char *endptr = NULL;
38
39 if (table && table->pools) {
40 if (table->num < config.tee_max_receiver_pools) {
41 pool_id = strtoull(value, &endptr, 10);
42
43 if (!pool_id || pool_id > UINT32_MAX) {
44 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Invalid Pool ID specified.\n", config.name, config.type, filename);
45 return TRUE;
46 }
47
48 /* Ensure no pool ID duplicates */
49 for (pool_idx = 0; pool_idx < table->num; pool_idx++) {
50 if (pool_id == table->pools[table->num].id) {
51 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Duplicate Pool ID specified: %u.\n", config.name, config.type, filename, pool_id);
52 return TRUE;
53 }
54 }
55
56 table->pools[table->num].id = pool_id;
57 }
58 else {
59 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Maximum amount of receivers pool reached: %u.\n", config.name, config.type, filename, config.tee_max_receiver_pools);
60 return TRUE;
61 }
62 }
63 else {
64 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
65 return TRUE;
66 }
67
68 return FALSE;
69 }
70
tee_recvs_map_ip_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)71 int tee_recvs_map_ip_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
72 {
73 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
74 struct tee_receiver *target = NULL;
75 int recv_idx;
76 char *str_ptr, *token;
77
78 if (table && table->pools && table->pools[table->num].receivers) {
79 str_ptr = value;
80 recv_idx = 0;
81
82 while ((token = extract_token(&str_ptr, ','))) {
83 if (recv_idx < config.tee_max_receivers) {
84 target = &table->pools[table->num].receivers[recv_idx];
85 target->dest_len = sizeof(target->dest);
86 if (!Tee_parse_hostport(token, (struct sockaddr *)&target->dest, &target->dest_len, FALSE)) recv_idx++;
87 else Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Invalid receiver %s.\n",
88 config.name, config.type, filename, token);
89 }
90 else {
91 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Maximum amount of receivers pool reached %u.\n",
92 config.name, config.type, filename, config.tee_max_receiver_pools);
93 break;
94 }
95 }
96
97 if (!recv_idx) {
98 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] No valid receivers.\n", config.name, config.type, filename);
99 return TRUE;
100 }
101 else table->pools[table->num].num = recv_idx;
102 }
103 else {
104 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
105 return TRUE;
106 }
107
108 return FALSE;
109 }
110
tee_recvs_map_tag_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)111 int tee_recvs_map_tag_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
112 {
113 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
114 int ret;
115
116 if (table && table->pools) ret = load_tags(filename, &table->pools[table->num].tag_filter, value);
117 else {
118 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
119 return TRUE;
120 }
121
122 if (!ret) return TRUE;
123 else return FALSE;
124 }
125
tee_recvs_map_balance_alg_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)126 int tee_recvs_map_balance_alg_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
127 {
128 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
129
130 if (table && table->pools) {
131 if (!strncmp(value, "rr", 2)) {
132 table->pools[table->num].balance.type = TEE_BALANCE_RR;
133 table->pools[table->num].balance.func = Tee_rr_balance;
134 }
135 else if (!strncmp(value, "hash-agent", 10)) {
136 table->pools[table->num].balance.type = TEE_BALANCE_HASH_AGENT;
137 table->pools[table->num].balance.func = Tee_hash_agent_balance;
138 }
139 else if (!strncmp(value, "hash-tag", 8)) {
140 table->pools[table->num].balance.type = TEE_BALANCE_HASH_TAG;
141 table->pools[table->num].balance.func = Tee_hash_tag_balance;
142 }
143 else {
144 table->pools[table->num].balance.func = NULL;
145 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Unknown balance algorithm '%s'. Ignoring.\n", config.name, config.type, filename, value);
146 }
147 }
148 else {
149 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
150 return TRUE;
151 }
152
153 return FALSE;
154 }
155
tee_recvs_map_src_port_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)156 int tee_recvs_map_src_port_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
157 {
158 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
159 int port;
160
161 if (table && table->pools) {
162 port = atoi(value);
163
164 if (port <= UINT16_MAX) table->pools[table->num].src_port = port;
165 else {
166 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Invalid source port specified '%s'. Ignoring.\n", config.name, config.type, filename, value);
167 }
168 }
169 else {
170 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
171 return TRUE;
172 }
173
174 return FALSE;
175 }
176
177 #ifdef WITH_KAFKA
tee_recvs_map_kafka_broker_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)178 int tee_recvs_map_kafka_broker_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
179 {
180 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
181
182 if (table && table->pools) {
183 int len = sizeof(table->pools[table->num].kafka_broker);
184
185 memset(table->pools[table->num].kafka_broker, 0, len);
186 strlcpy(table->pools[table->num].kafka_broker, value, len);
187 table->pools[table->num].kafka_broker[len] = '\0';
188 }
189 else {
190 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
191 return TRUE;
192 }
193
194 return FALSE;
195 }
196
tee_recvs_map_kafka_topic_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)197 int tee_recvs_map_kafka_topic_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
198 {
199 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
200
201 if (table && table->pools) {
202 int len = sizeof(table->pools[table->num].kafka_topic);
203
204 memset(table->pools[table->num].kafka_topic, 0, len);
205 strlcpy(table->pools[table->num].kafka_topic, value, len);
206 table->pools[table->num].kafka_topic[len] = '\0';
207 }
208 else {
209 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
210 return TRUE;
211 }
212
213 return FALSE;
214 }
215 #endif
216
217 #ifdef WITH_ZMQ
tee_recvs_map_zmq_address_handler(char * filename,struct id_entry * e,char * value,struct plugin_requests * req,int acct_type)218 int tee_recvs_map_zmq_address_handler(char *filename, struct id_entry *e, char *value, struct plugin_requests *req, int acct_type)
219 {
220 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
221
222 if (table && table->pools) {
223 int len = sizeof(table->pools[table->num].zmq_address);
224
225 memset(table->pools[table->num].zmq_address, 0, len);
226 strlcpy(table->pools[table->num].zmq_address, value, len);
227 table->pools[table->num].zmq_address[len] = '\0';
228 }
229 else {
230 Log(LOG_ERR, "ERROR ( %s/%s ): [%s] Receivers table not allocated.\n", config.name, config.type, filename);
231 return TRUE;
232 }
233
234 return FALSE;
235 }
236 #endif
237
tee_recvs_map_validate(char * filename,int lineno,struct plugin_requests * req)238 void tee_recvs_map_validate(char *filename, int lineno, struct plugin_requests *req)
239 {
240 struct tee_receivers *table = (struct tee_receivers *) req->key_value_table;
241 int valid = FALSE, emit_methods = 0;
242
243 if (table && table->pools && table->pools[table->num].receivers) {
244 /* Check: emit to either IP address(es) or Kafka broker(s) or ZeroMQ queue */
245 if (table->pools[table->num].num > 0) emit_methods++;
246 if (strlen(table->pools[table->num].kafka_broker)) emit_methods++;
247 if (strlen(table->pools[table->num].zmq_address)) emit_methods++;
248
249 if (emit_methods > 1) {
250 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'ip', 'kafka_broker' and 'zmq_address' are mutual exclusive. Line ignored.\n",
251 config.name, config.type, filename, lineno);
252 valid = FALSE;
253 goto zero_entry;
254 }
255
256 if (!emit_methods) {
257 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'ip' or 'kafka_broker' or 'zmq_address' must be specified. Line ignored.\n",
258 config.name, config.type, filename, lineno);
259 valid = FALSE;
260 goto zero_entry;
261 }
262
263 /* Check: valid pool ID */
264 if (table->pools[table->num].id > 0) valid = TRUE;
265 else {
266 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Invalid pool 'id' specified. Line ignored.\n",
267 config.name, config.type, filename, lineno);
268 valid = FALSE;
269 goto zero_entry;
270 }
271
272 if (table->pools[table->num].num > 0) valid = TRUE;
273
274 /*
275 Check: if emitting to Kafka:
276 a) make sure we have both broker string and topic,
277 b) balance-alg is not set, tee_transparent is set to true
278 */
279 #ifdef WITH_KAFKA
280 if (strlen(table->pools[table->num].kafka_broker)) {
281 if (!config.tee_transparent) {
282 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] tee_transparent must be set to 'true' when emitting to Kafka. Line ignored.\n",
283 config.name, config.type, filename, lineno);
284 valid = FALSE;
285 goto zero_entry;
286 }
287
288 if (table->pools[table->num].balance.func) {
289 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'balance-alg' is not compatible with emitting to Kafka. Line ignored.\n",
290 config.name, config.type, filename, lineno);
291 valid = FALSE;
292 goto zero_entry;
293 }
294
295 if (!strlen(table->pools[table->num].kafka_topic)) {
296 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'kafka_topic' missing. Line ignored.\n",
297 config.name, config.type, filename, lineno);
298 valid = FALSE;
299 goto zero_entry;
300 }
301
302 valid = TRUE;
303 }
304 #endif
305
306 /*
307 Check: if emitting via ZeroMQ:
308 a) make sure we have an address string,
309 b) balance-alg is not set, tee_transparent is set to true
310 */
311 #ifdef WITH_ZMQ
312 if (strlen(table->pools[table->num].zmq_address)) {
313 if (!config.tee_transparent) {
314 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] tee_transparent must be set to 'true' when emitting via ZeroMQ. Line ignored.\n",
315 config.name, config.type, filename, lineno);
316 valid = FALSE;
317 goto zero_entry;
318 }
319
320 if (table->pools[table->num].balance.func) {
321 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] 'balance-alg' is not compatible with emitting via ZeroMQ. Line ignored.\n",
322 config.name, config.type, filename, lineno);
323 valid = FALSE;
324 goto zero_entry;
325 }
326
327 valid = TRUE;
328 }
329 #endif
330
331 if (valid) table->num++;
332 else {
333 zero_entry:
334
335 table->pools[table->num].id = 0;
336 table->pools[table->num].num = 0;
337 table->pools[table->num].src_port = 0;
338 memset(table->pools[table->num].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
339 memset(&table->pools[table->num].tag_filter, 0, sizeof(struct pretag_filter));
340 memset(&table->pools[table->num].balance, 0, sizeof(struct tee_balance));
341
342 #ifdef WITH_KAFKA
343 memset(&table->pools[table->num].kafka_host, 0, sizeof(struct p_kafka_host));
344 memset(&table->pools[table->num].kafka_broker, 0, sizeof(table->pools[table->num].kafka_broker));
345 memset(&table->pools[table->num].kafka_topic, 0, sizeof(table->pools[table->num].kafka_topic));
346 #endif
347
348 #ifdef WITH_ZMQ
349 memset(&table->pools[table->num].zmq_host, 0, sizeof(struct p_zmq_host));
350 memset(&table->pools[table->num].zmq_address, 0, sizeof(table->pools[table->num].zmq_address));
351 #endif
352 }
353 }
354 }
355