1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 #include "pmacct.h"
23 #include "addr.h"
24 #ifdef WITH_KAFKA
25 #include "kafka_common.h"
26 #endif
27 #include "pmacct-data.h"
28 #include "plugin_hooks.h"
29 #include "plugin_common.h"
30 #include "tee_plugin.h"
31 #include "nfacctd.h"
32
33 /* Global variables */
34 char tee_send_buf[65535];
35 struct tee_receivers receivers;
36 int err_cant_bridge_af;
37
tee_plugin(int pipe_fd,struct configuration * cfgptr,void * ptr)38 void tee_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
39 {
40 struct pkt_msg *msg;
41 unsigned char *pipebuf;
42 struct pollfd pfd;
43 int refresh_timeout, ret, pool_idx, recv_idx, recv_budget, poll_bypass;
44 struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
45 struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
46 u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
47 unsigned char *dataptr;
48 struct tee_receiver *target = NULL;
49 struct plugin_requests req;
50
51 unsigned char *rgptr;
52 int pollagain = TRUE;
53 u_int32_t seq = 1, rg_err_count = 0;
54
55 #ifdef WITH_ZMQ
56 struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
57 #else
58 void *zmq_host = NULL;
59 #endif
60
61 #ifdef WITH_REDIS
62 struct p_redis_host redis_host;
63 #endif
64
65 memcpy(&config, cfgptr, sizeof(struct configuration));
66 recollect_pipe_memory(ptr);
67 pm_setproctitle("%s [%s]", "Tee Plugin", config.name);
68 if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
69 if (config.logfile) {
70 fclose(config.logfile_fd);
71 config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
72 }
73
74 if (config.proc_priority) {
75 int ret;
76
77 ret = setpriority(PRIO_PROCESS, 0, config.proc_priority);
78 if (ret) Log(LOG_WARNING, "WARN ( %s/%s ): proc_priority failed (errno: %d)\n", config.name, config.type, errno);
79 else Log(LOG_INFO, "INFO ( %s/%s ): proc_priority set to %d\n", config.name, config.type, getpriority(PRIO_PROCESS, 0));
80 }
81
82 /* signal handling */
83 signal(SIGINT, Tee_exit_now);
84 signal(SIGUSR1, SIG_IGN);
85 signal(SIGUSR2, reload_maps); /* sets to true the reload_maps flag */
86 signal(SIGPIPE, SIG_IGN);
87 signal(SIGCHLD, SIG_IGN);
88
89 if (config.tee_transparent && getuid() != 0) {
90 Log(LOG_ERR, "ERROR ( %s/%s ): Transparent mode requires super-user permissions. Exiting ...\n", config.name, config.type);
91 exit_gracefully(1);
92 }
93
94 if (!config.tee_receivers) {
95 Log(LOG_ERR, "ERROR ( %s/%s ): No receivers specified: tee_receivers is required. Exiting ...\n", config.name, config.type);
96 exit_gracefully(1);
97 }
98
99 memset(empty_mem_area_256b, 0, sizeof(empty_mem_area_256b));
100 memset(&receivers, 0, sizeof(receivers));
101 memset(&req, 0, sizeof(req));
102 reload_map = FALSE;
103
104 /* Setting up pools */
105 if (!config.tee_max_receiver_pools) config.tee_max_receiver_pools = MAX_TEE_POOLS;
106
107 receivers.pools = malloc((config.tee_max_receiver_pools+1)*sizeof(struct tee_receivers_pool));
108 if (!receivers.pools) {
109 Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate receiver pools. Exiting ...\n", config.name, config.type);
110 exit_gracefully(1);
111 }
112 else memset(receivers.pools, 0, (config.tee_max_receiver_pools+1)*sizeof(struct tee_receivers_pool));
113
114 /* Setting up receivers per pool */
115 if (!config.tee_max_receivers) config.tee_max_receivers = MAX_TEE_RECEIVERS;
116
117 for (pool_idx = 0; pool_idx < config.tee_max_receiver_pools; pool_idx++) {
118 receivers.pools[pool_idx].receivers = malloc(config.tee_max_receivers*sizeof(struct tee_receiver));
119 if (!receivers.pools[pool_idx].receivers) {
120 Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate receivers for pool #%u. Exiting ...\n", config.name, config.type, pool_idx);
121 exit_gracefully(1);
122 }
123 else memset(receivers.pools[pool_idx].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
124 }
125
126 if (config.tee_receivers) {
127 int recvs_allocated = FALSE;
128
129 req.key_value_table = (void *) &receivers;
130 load_id_file(MAP_TEE_RECVS, config.tee_receivers, NULL, &req, &recvs_allocated);
131 }
132
133 config.sql_refresh_time = DEFAULT_TEE_REFRESH_TIME;
134 refresh_timeout = config.sql_refresh_time*1000;
135
136 pipebuf = (unsigned char *) pm_malloc(config.buffer_size);
137
138 if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
139 else setnonblocking(pipe_fd);
140
141 memset(pipebuf, 0, config.buffer_size);
142 err_cant_bridge_af = 0;
143
144 /* Arrange send socket */
145 Tee_init_socks();
146
147 #ifdef WITH_REDIS
148 if (config.redis_host) {
149 char log_id[SHORTBUFLEN];
150
151 snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
152 p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
153 }
154 #endif
155
156 /* plugin main loop */
157 for (;;) {
158 poll_again:
159 status->wakeup = TRUE;
160 poll_bypass = FALSE;
161
162 pfd.fd = pipe_fd;
163 pfd.events = POLLIN;
164
165 ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);
166
167 if (ret < 0) goto poll_again;
168
169 poll_ops:
170 if (reload_map) {
171 if (config.tee_receivers) {
172 int recvs_allocated = FALSE;
173
174 Tee_destroy_recvs();
175 load_id_file(MAP_TEE_RECVS, config.tee_receivers, NULL, &req, &recvs_allocated);
176
177 Tee_init_socks();
178 }
179
180 reload_map = FALSE;
181 }
182
183 if (reload_log) {
184 reload_logs();
185 reload_log = FALSE;
186 }
187
188 recv_budget = 0;
189 if (poll_bypass) {
190 poll_bypass = FALSE;
191 goto read_data;
192 }
193
194 switch (ret) {
195 case 0: /* timeout */
196 /* reserved for future since we don't currently cache/batch/etc */
197 break;
198 default: /* we received data */
199 read_data:
200 if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
201 poll_bypass = TRUE;
202 goto poll_ops;
203 }
204
205 if (config.pipe_homegrown) {
206 if (!pollagain) {
207 seq++;
208 seq %= MAX_SEQNUM;
209 if (seq == 0) rg_err_count = FALSE;
210 }
211 else {
212 if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
213 exit_gracefully(1); /* we exit silently; something happened at the write end */
214 }
215
216 if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;
217
218 if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
219 if (!pollagain) {
220 pollagain = TRUE;
221 goto poll_again;
222 }
223 else {
224 rg_err_count++;
225 if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
226 Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
227 config.name, config.type, config.buffer_size, config.pipe_size);
228 Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
229 config.name, config.type);
230 }
231
232 rg->ptr = (rg->base + status->last_buf_off);
233 seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
234 }
235 }
236
237 pollagain = FALSE;
238 memcpy(pipebuf, rg->ptr, bufsz);
239 rg->ptr += bufsz;
240 }
241 #ifdef WITH_ZMQ
242 else if (config.pipe_zmq) {
243 ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
244 if (ret > 0) {
245 if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
246 Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
247 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
248 }
249
250 seq = ((struct ch_buf_hdr *)pipebuf)->seq;
251 }
252 else goto poll_again;
253 }
254 #endif
255
256 msg = (struct pkt_msg *) (pipebuf+sizeof(struct ch_buf_hdr));
257 msg->payload = (pipebuf+sizeof(struct ch_buf_hdr)+PmsgSz);
258
259 if (config.debug_internal_msg)
260 Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
261 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
262 ((struct ch_buf_hdr *)pipebuf)->num);
263
264 while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
265 for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
266 if (msg->bcast || !evaluate_tags(&receivers.pools[pool_idx].tag_filter, msg->tag)) {
267 if (!receivers.pools[pool_idx].balance.func) {
268 for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
269 target = &receivers.pools[pool_idx].receivers[recv_idx];
270 Tee_send(msg, (struct sockaddr *) &target->dest, target->fd, config.tee_transparent);
271 }
272
273 #ifdef WITH_KAFKA
274 /* Checking the handler is the most light weight op we can perform
275 in order to ensure we are in business with the Kafka broker */
276 if (p_kafka_get_handler(&receivers.pools[pool_idx].kafka_host)) {
277 Tee_kafka_send(msg, &receivers.pools[pool_idx]);
278 }
279 #endif
280
281 #ifdef WITH_ZMQ
282 if (p_zmq_get_sock(&receivers.pools[pool_idx].zmq_host)) {
283 Tee_zmq_send(msg, &receivers.pools[pool_idx]);
284 }
285 #endif
286 }
287 else {
288 target = receivers.pools[pool_idx].balance.func(&receivers.pools[pool_idx], msg);
289 Tee_send(msg, (struct sockaddr *) &target->dest, target->fd, config.tee_transparent);
290 }
291 }
292 }
293
294 ((struct ch_buf_hdr *)pipebuf)->num--;
295 if (((struct ch_buf_hdr *)pipebuf)->num) {
296 dataptr = (unsigned char *) msg;
297 dataptr += (PmsgSz + msg->len);
298 msg = (struct pkt_msg *) dataptr;
299 msg->payload = (dataptr + PmsgSz);
300 }
301 }
302
303 recv_budget++;
304 goto read_data;
305 }
306 }
307 }
308
Tee_exit_now(int signum)309 void Tee_exit_now(int signum)
310 {
311 wait(NULL);
312 exit_gracefully(0);
313 }
314
Tee_craft_transparent_msg(struct pkt_msg * msg,struct sockaddr * target)315 size_t Tee_craft_transparent_msg(struct pkt_msg *msg, struct sockaddr *target)
316 {
317 char *buf_ptr = tee_send_buf;
318 struct sockaddr *sa = (struct sockaddr *) &msg->agent;
319 struct sockaddr_in *sa4 = (struct sockaddr_in *) &msg->agent;
320 struct pm_iphdr *i4h = (struct pm_iphdr *) buf_ptr;
321 struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &msg->agent;
322 struct ip6_hdr *i6h = (struct ip6_hdr *) buf_ptr;
323 struct pm_udphdr *uh;
324 size_t msglen = 0;
325
326 if (sa->sa_family == target->sa_family) {
327 /* UDP header first */
328 if (target->sa_family == AF_INET) {
329 buf_ptr += IP4HdrSz;
330 uh = (struct pm_udphdr *) buf_ptr;
331 uh->uh_sport = sa4->sin_port;
332 uh->uh_dport = ((struct sockaddr_in *)target)->sin_port;
333 }
334 else if (target->sa_family == AF_INET6) {
335 buf_ptr += IP6HdrSz;
336 uh = (struct pm_udphdr *) buf_ptr;
337 uh->uh_sport = sa6->sin6_port;
338 uh->uh_dport = ((struct sockaddr_in6 *)target)->sin6_port;
339 }
340 else {
341 assert(0);
342 return msglen;
343 }
344
345 uh->uh_ulen = htons(msg->len+UDPHdrSz);
346 uh->uh_sum = 0;
347
348 /* IP header then */
349 if (target->sa_family == AF_INET) {
350 i4h->ip_vhl = 4;
351 i4h->ip_vhl <<= 4;
352 i4h->ip_vhl |= (IP4HdrSz/4);
353
354 if (config.nfprobe_ipprec) {
355 int opt = config.nfprobe_ipprec << 5;
356 i4h->ip_tos = opt;
357 }
358 else i4h->ip_tos = 0;
359
360 #if !defined BSD
361 i4h->ip_len = htons(IP4HdrSz+UDPHdrSz+msg->len);
362 #else
363 i4h->ip_len = IP4HdrSz+UDPHdrSz+msg->len;
364 #endif
365 i4h->ip_id = 0;
366 i4h->ip_off = 0;
367 i4h->ip_ttl = 255;
368 i4h->ip_p = IPPROTO_UDP;
369 i4h->ip_sum = 0;
370 i4h->ip_src.s_addr = sa4->sin_addr.s_addr;
371 i4h->ip_dst.s_addr = ((struct sockaddr_in *)target)->sin_addr.s_addr;
372 }
373 else if (target->sa_family == AF_INET6) {
374 i6h->ip6_vfc = 6;
375 i6h->ip6_vfc <<= 4;
376 i6h->ip6_plen = htons(UDPHdrSz+msg->len);
377 i6h->ip6_nxt = IPPROTO_UDP;
378 i6h->ip6_hlim = 255;
379 memcpy(&i6h->ip6_src, &sa6->sin6_addr, IP6AddrSz);
380 memcpy(&i6h->ip6_dst, &((struct sockaddr_in6 *)target)->sin6_addr, IP6AddrSz);
381 }
382
383 /* Put everything together and send */
384 buf_ptr += UDPHdrSz;
385 memcpy(buf_ptr, msg->payload, msg->len);
386
387 msglen = (IP4HdrSz + UDPHdrSz + msg->len);
388 }
389 else {
390 time_t now = time(NULL);
391
392 if (now > err_cant_bridge_af + 60) {
393 Log(LOG_ERR, "ERROR ( %s/%s ): Can't bridge Address Families when in transparent mode\n", config.name, config.type);
394 err_cant_bridge_af = now;
395 }
396 }
397
398 return msglen;
399 }
400
Tee_send(struct pkt_msg * msg,struct sockaddr * target,int fd,int transparent)401 void Tee_send(struct pkt_msg *msg, struct sockaddr *target, int fd, int transparent)
402 {
403 struct host_addr r;
404 char recv_addr[50];
405 u_int16_t recv_port;
406
407 if (config.debug) {
408 char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
409 struct host_addr a;
410 char agent_addr[50];
411 u_int16_t agent_port;
412
413 sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
414 addr_to_str(agent_addr, &a);
415
416 sa_to_addr((struct sockaddr *)target, &r, &recv_port);
417 addr_to_str(recv_addr, &r);
418
419 if (config.acct_type == ACCT_NF) flow = netflow;
420 else if (config.acct_type == ACCT_SF) flow = sflow;
421
422 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] to [%s:%u]\n",
423 config.name, config.type, flow, agent_addr, agent_port, msg->seqno,
424 recv_addr, recv_port);
425 }
426
427 if (!transparent) {
428 if (send(fd, msg->payload, msg->len, 0) == -1) {
429 struct host_addr a;
430 char agent_addr[50];
431 u_int16_t agent_port;
432
433 sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
434 addr_to_str(agent_addr, &a);
435
436 sa_to_addr((struct sockaddr *)target, &r, &recv_port);
437 addr_to_str(recv_addr, &r);
438
439 Log(LOG_ERR, "ERROR ( %s/%s ): send() from [%s:%u] seqno [%u] to [%s:%u] failed (%s)\n",
440 config.name, config.type, agent_addr, agent_port, msg->seqno, recv_addr,
441 recv_port, strerror(errno));
442 }
443 }
444 else {
445 size_t msglen;
446
447 msglen = Tee_craft_transparent_msg(msg, target);
448
449 if (msglen && send(fd, tee_send_buf, msglen, 0) == -1) {
450 struct host_addr a;
451 char agent_addr[50];
452 u_int16_t agent_port;
453
454 sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
455 addr_to_str(agent_addr, &a);
456
457 sa_to_addr((struct sockaddr *)target, &r, &recv_port);
458 addr_to_str(recv_addr, &r);
459
460 Log(LOG_ERR, "ERROR ( %s/%s ): raw send() from [%s:%u] seqno [%u] to [%s:%u] failed (%s)\n",
461 config.name, config.type, agent_addr, agent_port, msg->seqno, recv_addr,
462 recv_port, strerror(errno));
463 }
464 }
465 }
466
467 #ifdef WITH_KAFKA
Tee_kafka_send(struct pkt_msg * msg,struct tee_receivers_pool * pool)468 void Tee_kafka_send(struct pkt_msg *msg, struct tee_receivers_pool *pool)
469 {
470 struct p_kafka_host *kafka_host = &pool->kafka_host;
471 struct sockaddr *sa, target;
472 time_t last_fail, now;
473 size_t msglen = 0;
474
475 memset(&target, 0, sizeof(target));
476 sa = (struct sockaddr *) &msg->agent;
477
478 target.sa_family = sa->sa_family;
479
480 if (config.debug) {
481 char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
482 char *broker, *topic;
483 struct host_addr a;
484 char agent_addr[50];
485 u_int16_t agent_port;
486
487 sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
488 addr_to_str(agent_addr, &a);
489
490 broker = p_kafka_get_broker(kafka_host);
491 topic = p_kafka_get_topic(kafka_host);
492
493 if (config.acct_type == ACCT_NF) flow = netflow;
494 else if (config.acct_type == ACCT_SF) flow = sflow;
495
496 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] to Kafka [%s-%s]\n",
497 config.name, config.type, flow, agent_addr, agent_port, msg->seqno,
498 broker, topic);
499 }
500
501 last_fail = P_broker_timers_get_last_fail(&kafka_host->btimers);
502 if (last_fail) {
503 now = time(NULL);
504
505 if ((last_fail + P_broker_timers_get_retry_interval(&kafka_host->btimers)) <= now) {
506 Tee_init_kafka_host(kafka_host, pool->kafka_broker, pool->kafka_topic, pool->id);
507 }
508 }
509
510 if (config.tee_transparent) {
511 msglen = Tee_craft_transparent_msg(msg, &target);
512
513 if (msglen) p_kafka_produce_data(kafka_host, tee_send_buf, msglen);
514 }
515 }
516 #endif
517
518 #ifdef WITH_ZMQ
Tee_zmq_send(struct pkt_msg * msg,struct tee_receivers_pool * pool)519 void Tee_zmq_send(struct pkt_msg *msg, struct tee_receivers_pool *pool)
520 {
521 struct p_zmq_host *zmq_host = &pool->zmq_host;
522 struct sockaddr *sa, target;
523 size_t msglen = 0;
524 int ret;
525
526 memset(&target, 0, sizeof(target));
527 sa = (struct sockaddr *) &msg->agent;
528
529 target.sa_family = sa->sa_family;
530
531 if (config.debug) {
532 char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
533 char *address;
534 struct host_addr a;
535 char agent_addr[50];
536 u_int16_t agent_port;
537
538 sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
539 addr_to_str(agent_addr, &a);
540
541 address = p_zmq_get_address(zmq_host);
542
543 if (config.acct_type == ACCT_NF) flow = netflow;
544 else if (config.acct_type == ACCT_SF) flow = sflow;
545
546 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] via ZeroMQ [%s]\n",
547 config.name, config.type, flow, agent_addr, agent_port, msg->seqno, address);
548 }
549
550 if (config.tee_transparent) {
551 msglen = Tee_craft_transparent_msg(msg, &target);
552
553 if (msglen) {
554 ret = p_zmq_send_bin(&zmq_host->sock, tee_send_buf, msglen, TRUE);
555 if (ret == ERR && errno == EAGAIN) {
556 char *address;
557
558 address = p_zmq_get_address(zmq_host);
559 Log(LOG_WARNING, "WARN ( %s/%s ): Queue full: ZeroMQ [%s]\n", config.name, config.type, address);
560 }
561 }
562 }
563 }
564 #endif
565
Tee_destroy_recvs()566 void Tee_destroy_recvs()
567 {
568 struct tee_receiver *target = NULL;
569 int pool_idx, recv_idx;
570
571 for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
572 for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
573 target = &receivers.pools[pool_idx].receivers[recv_idx];
574 if (target->fd) close(target->fd);
575 }
576
577 memset(receivers.pools[pool_idx].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
578 memset(&receivers.pools[pool_idx].tag_filter, 0, sizeof(struct pretag_filter));
579 memset(&receivers.pools[pool_idx].balance, 0, sizeof(struct tee_balance));
580 receivers.pools[pool_idx].id = 0;
581 receivers.pools[pool_idx].num = 0;
582
583 #ifdef WITH_KAFKA
584 if (strlen(receivers.pools[pool_idx].kafka_broker)) {
585 p_kafka_close(&receivers.pools[pool_idx].kafka_host, FALSE);
586 memset(receivers.pools[pool_idx].kafka_broker, 0, sizeof(receivers.pools[pool_idx].kafka_broker));
587 memset(receivers.pools[pool_idx].kafka_topic, 0, sizeof(receivers.pools[pool_idx].kafka_topic));
588 }
589 #endif
590
591 #ifdef WITH_ZMQ
592 if (strlen(receivers.pools[pool_idx].zmq_address)) {
593 p_zmq_close(&receivers.pools[pool_idx].zmq_host);
594 memset(receivers.pools[pool_idx].zmq_address, 0, sizeof(receivers.pools[pool_idx].zmq_address));
595 }
596 #endif
597 }
598
599 receivers.num = 0;
600 }
601
Tee_init_socks()602 void Tee_init_socks()
603 {
604 struct tee_receiver *target = NULL;
605 struct sockaddr *sa;
606 int pool_idx, recv_idx, err;
607 char dest_addr[256], dest_serv[256];
608
609 for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
610 for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
611 target = &receivers.pools[pool_idx].receivers[recv_idx];
612 sa = (struct sockaddr *) &target->dest;
613
614 if (sa->sa_family != 0) {
615 if ((err = getnameinfo(sa, target->dest_len, dest_addr, sizeof(dest_addr),
616 dest_serv, sizeof(dest_serv), NI_NUMERICHOST)) == -1) {
617 Log(LOG_ERR, "ERROR ( %s/%s ): getnameinfo: %d\n", config.name, config.type, err);
618 exit_gracefully(1);
619 }
620 }
621
622 target->fd = Tee_prepare_sock((struct sockaddr *) &target->dest, target->dest_len, receivers.pools[pool_idx].src_port,
623 config.tee_transparent, config.tee_pipe_size);
624
625 if (config.debug) {
626 struct host_addr recv_addr;
627 char recv_addr_str[INET6_ADDRSTRLEN];
628 u_int16_t recv_port;
629
630 sa_to_addr((struct sockaddr *)&target->dest, &recv_addr, &recv_port);
631 addr_to_str(recv_addr_str, &recv_addr);
632 Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u receiver=%s fd=%d\n",
633 config.name, config.type, receivers.pools[pool_idx].id, recv_addr_str, target->fd);
634 }
635 }
636
637 #ifdef WITH_KAFKA
638 if (strlen(receivers.pools[pool_idx].kafka_broker)) {
639 Tee_init_kafka_host(&receivers.pools[pool_idx].kafka_host, receivers.pools[pool_idx].kafka_broker,
640 receivers.pools[pool_idx].kafka_topic, receivers.pools[pool_idx].id);
641 }
642 #endif
643
644 #ifdef WITH_ZMQ
645 if (strlen(receivers.pools[pool_idx].zmq_address)) {
646 Tee_init_zmq_host(&receivers.pools[pool_idx].zmq_host, receivers.pools[pool_idx].zmq_address,
647 receivers.pools[pool_idx].id);
648 }
649 #endif
650 }
651 }
652
653 #ifdef WITH_KAFKA
Tee_init_kafka_host(struct p_kafka_host * kafka_host,char * kafka_broker,char * kafka_topic,u_int32_t pool_id)654 void Tee_init_kafka_host(struct p_kafka_host *kafka_host, char *kafka_broker, char *kafka_topic, u_int32_t pool_id)
655 {
656 p_kafka_init_host(kafka_host, config.tee_kafka_config_file);
657 p_kafka_connect_to_produce(kafka_host);
658 p_kafka_set_broker(kafka_host, kafka_broker, FALSE);
659 p_kafka_set_topic(kafka_host, kafka_topic);
660 p_kafka_set_content_type(kafka_host, PM_KAFKA_CNT_TYPE_BIN);
661 P_broker_timers_set_retry_interval(&kafka_host->btimers, PM_KAFKA_DEFAULT_RETRY);
662
663 if (config.debug) {
664 char *broker, *topic;
665
666 broker = p_kafka_get_broker(kafka_host);
667 topic = p_kafka_get_topic(kafka_host);
668 Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u KafkaBroker=%s KafkaTopic=%s\n",
669 config.name, config.type, pool_id, broker, topic);
670 }
671 }
672 #endif
673
674 #ifdef WITH_ZMQ
Tee_init_zmq_host(struct p_zmq_host * zmq_host,char * zmq_address,u_int32_t pool_id)675 void Tee_init_zmq_host(struct p_zmq_host *zmq_host, char *zmq_address, u_int32_t pool_id)
676 {
677 char log_id[SHORTBUFLEN];
678
679 p_zmq_init_push(zmq_host, zmq_address);
680 snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
681 p_zmq_set_log_id(zmq_host, log_id);
682 p_zmq_set_hwm(zmq_host, PM_ZMQ_DEFAULT_FLOW_HWM);
683 p_zmq_push_setup(zmq_host);
684
685 if (config.debug) {
686 char *broker;
687
688 broker = p_zmq_get_address(zmq_host);
689 Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u ZmqAddress=%s\n", config.name, config.type, pool_id, broker);
690 }
691 }
692 #endif
693
Tee_prepare_sock(struct sockaddr * addr,socklen_t len,u_int16_t src_port,int transparent,int pipe_size)694 int Tee_prepare_sock(struct sockaddr *addr, socklen_t len, u_int16_t src_port, int transparent, int pipe_size)
695 {
696 int s, ret = 0;
697
698 if (!transparent) {
699 struct host_addr source_ip;
700 struct sockaddr_storage ssource_ip;
701
702 memset(&source_ip, 0, sizeof(source_ip));
703 memset(&ssource_ip, 0, sizeof(ssource_ip));
704
705 if (src_port) {
706 source_ip.family = addr->sa_family;
707 ret = addr_to_sa((struct sockaddr *) &ssource_ip, &source_ip, src_port);
708 }
709
710 if ((s = socket(addr->sa_family, SOCK_DGRAM, 0)) == -1) {
711 Log(LOG_ERR, "ERROR ( %s/%s ): socket() error: %s\n", config.name, config.type, strerror(errno));
712 exit_gracefully(1);
713 }
714
715 if (ret && bind(s, (struct sockaddr *) &ssource_ip, sizeof(ssource_ip)) == -1)
716 Log(LOG_WARNING, "WARN ( %s/%s ): bind() error: %s\n", config.name, config.type, strerror(errno));
717 }
718 else {
719 if ((s = socket(addr->sa_family, SOCK_RAW, IPPROTO_RAW)) == -1) {
720 Log(LOG_ERR, "ERROR ( %s/%s ): socket() error: %s\n", config.name, config.type, strerror(errno));
721 exit_gracefully(1);
722 }
723
724
725 #if defined BSD && !defined(__DragonFly__) /* no int hincl in code here */
726 setsockopt(s, IPPROTO_IP, IP_HDRINCL, &hincl, (socklen_t) sizeof(hincl));
727 #endif
728 }
729
730 if (pipe_size) {
731 socklen_t l = sizeof(pipe_size);
732 int saved = 0, obtained = 0;
733
734 getsockopt(s, SOL_SOCKET, SO_SNDBUF, &saved, &l);
735 Setsocksize(s, SOL_SOCKET, SO_SNDBUF, &pipe_size, (socklen_t) sizeof(pipe_size));
736 getsockopt(s, SOL_SOCKET, SO_SNDBUF, &obtained, &l);
737
738 if (obtained < saved) {
739 Setsocksize(s, SOL_SOCKET, SO_SNDBUF, &saved, l);
740 getsockopt(s, SOL_SOCKET, SO_SNDBUF, &obtained, &l);
741 }
742 Log(LOG_INFO, "INFO ( %s/%s ): tee_pipe_size: obtained=%d target=%d.\n", config.name, config.type, obtained, pipe_size);
743 }
744
745 if (connect(s, (struct sockaddr *)addr, len) == -1) {
746 Log(LOG_ERR, "ERROR ( %s/%s ): connect() error: %s\n", config.name, config.type, strerror(errno));
747 exit_gracefully(1);
748 }
749
750 return(s);
751 }
752
Tee_parse_hostport(const char * s,struct sockaddr * addr,socklen_t * len,int dont_check_port)753 int Tee_parse_hostport(const char *s, struct sockaddr *addr, socklen_t *len, int dont_check_port)
754 {
755 char *orig, *host, *port, zero_port[] = "]:0";
756 struct addrinfo hints, *res;
757 int herr;
758
759 memset(&hints, '\0', sizeof(hints));
760
761 if ((host = orig = strdup(s)) == NULL) {
762 Log(LOG_ERR, "ERROR ( %s/%s ): Tee_parse_hostport() out of memory. Exiting ..\n", config.name, config.type);
763 exit_gracefully(1);
764 }
765
766 trim_spaces(host);
767 trim_spaces(orig);
768
769 if ((port = strrchr(host, ':')) == NULL || *(++port) == '\0') {
770 if (dont_check_port) {
771 port = zero_port;
772 ++port; ++port;
773 }
774 else return TRUE;
775 }
776
777 if (*host == '\0') return TRUE;
778
779 *(port - 1) = '\0';
780
781 /* Accept [host]:port for numeric IPv6 addresses;
782 XXX: if dont_check_port is set to true, check for ']' will be inaccurate */
783 if (*host == '[' && *(port - 2) == ']') {
784 host++;
785 *(port - 2) = '\0';
786 hints.ai_family = AF_INET6;
787 }
788
789 hints.ai_socktype = SOCK_DGRAM;
790
791 /* Validations */
792 if ((herr = getaddrinfo(host, port, &hints, &res)) == -1) return TRUE;
793 if (res == NULL || res->ai_addr == NULL) return TRUE;
794 if (res->ai_addrlen > *len) return TRUE;
795
796 memcpy(addr, res->ai_addr, res->ai_addrlen);
797 free(orig);
798 *len = res->ai_addrlen;
799
800 return FALSE;
801 }
802
Tee_rr_balance(void * pool,struct pkt_msg * msg)803 struct tee_receiver *Tee_rr_balance(void *pool, struct pkt_msg *msg)
804 {
805 struct tee_receivers_pool *p = pool;
806 struct tee_receiver *target = NULL;
807
808 if (p) {
809 target = &p->receivers[p->balance.next % p->num];
810 p->balance.next++;
811 p->balance.next %= p->num;
812 }
813
814 return target;
815 }
816
Tee_hash_agent_balance(void * pool,struct pkt_msg * msg)817 struct tee_receiver *Tee_hash_agent_balance(void *pool, struct pkt_msg *msg)
818 {
819 struct tee_receivers_pool *p = pool;
820 struct tee_receiver *target = NULL;
821 struct sockaddr *sa = (struct sockaddr *) &msg->agent;
822 struct sockaddr_in *sa4 = (struct sockaddr_in *) &msg->agent;
823
824 if (p) {
825 if (sa->sa_family == AF_INET) target = &p->receivers[sa4->sin_addr.s_addr & p->num];
826 /* XXX: hashing against IPv6 agents is not supported (yet) */
827 }
828
829 return target;
830 }
831
Tee_hash_tag_balance(void * pool,struct pkt_msg * msg)832 struct tee_receiver *Tee_hash_tag_balance(void *pool, struct pkt_msg *msg)
833 {
834 struct tee_receivers_pool *p = pool;
835 struct tee_receiver *target = NULL;
836
837 if (p) target = &p->receivers[msg->tag % p->num];
838
839 return target;
840 }
841
Tee_select_templates(unsigned char * pkt,int pkt_len,int nfv,unsigned char * tpl_pkt,int * tpl_pkt_len)842 void Tee_select_templates(unsigned char *pkt, int pkt_len, int nfv, unsigned char *tpl_pkt, int *tpl_pkt_len)
843 {
844 struct struct_header_v9 *hdr_v9 = NULL;
845 struct struct_header_ipfix *hdr_v10 = NULL;
846 struct data_hdr_v9 *hdr_flowset = NULL;
847
848 unsigned char *src_ptr = pkt, *dst_ptr = tpl_pkt;
849 u_int16_t flowsetNo = 0, flowsetCount = 0, flowsetTplCount = 0;
850 int tmp_len = 0, hdr_len = 0, term1 = 0, term2 = 0;
851
852 if (!pkt || !pkt_len || !tpl_pkt || !tpl_pkt_len) return;
853 if (nfv != 9 && nfv != 10) return;
854 if (pkt_len > NETFLOW_MSG_SIZE) return;
855
856 (*tpl_pkt_len) = 0;
857
858 /* NetFlow v9 */
859 if (nfv == 9) {
860 hdr_v9 = (struct struct_header_v9 *) tpl_pkt;
861 hdr_len = sizeof(struct struct_header_v9);
862 }
863 else if (nfv == 10) {
864 hdr_v10 = (struct struct_header_ipfix *) tpl_pkt;
865 hdr_len = sizeof(struct struct_header_ipfix);
866 }
867
868 if (pkt_len < hdr_len) return;
869
870 memcpy(dst_ptr, src_ptr, hdr_len);
871 src_ptr += hdr_len;
872 dst_ptr += hdr_len;
873 pkt_len -= hdr_len;
874 tmp_len += hdr_len;
875
876 if (nfv == 9) {
877 flowsetNo = htons(hdr_v9->count);
878 term1 = (flowsetNo + 1); /* trick to use > operator */
879 term2 = flowsetCount;
880 }
881 else if (nfv == 10) {
882 term1 = pkt_len;
883 term2 = 0;
884 }
885
886 hdr_flowset = (struct data_hdr_v9 *) src_ptr;
887
888 while (term1 > term2) {
889 int fset_id = ntohs(hdr_flowset->flow_id);
890 int fset_len = ntohs(hdr_flowset->flow_len);
891 int fset_hdr_len = sizeof(struct data_hdr_v9);
892
893 if (!fset_len || (fset_hdr_len + fset_len) > pkt_len) break;
894
895 /* if template, copy over */
896 if (((nfv == 9) && (fset_id == 0 || fset_id == 1)) ||
897 ((nfv == 10) && (fset_id == 2 || fset_id == 3))) {
898 memcpy(dst_ptr, src_ptr, fset_len);
899
900 src_ptr += fset_len;
901 dst_ptr += fset_len;
902
903 pkt_len -= fset_len;
904 tmp_len += fset_len;
905
906 flowsetTplCount++;
907 }
908 /* if data, skip */
909 else {
910 src_ptr += fset_len;
911 pkt_len -= fset_len;
912 }
913
914 if (nfv == 9) {
915 flowsetCount++;
916 term2 = flowsetCount;
917 }
918 else if (nfv == 10) {
919 term1 = pkt_len;
920 }
921
922 hdr_flowset = (struct data_hdr_v9 *) src_ptr;
923 }
924
925 /* if we have at least one template, let's update the template packet */
926 if (flowsetTplCount) {
927 if (nfv == 9) {
928 hdr_v9->count = htons(flowsetTplCount);
929 }
930 else if (nfv == 10) {
931 hdr_v10->len = htons(tmp_len);
932 }
933
934 (*tpl_pkt_len) = tmp_len;
935 }
936 }
937