/*
    pmacct (Promiscuous mode IP Accounting package)
    pmacct is Copyright (C) 2003-2020 by Paolo Lucente
*/

/*
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

#include "pmacct.h"
#include "addr.h"
#ifdef WITH_KAFKA
#include "kafka_common.h"
#endif
#include "pmacct-data.h"
#include "plugin_hooks.h"
#include "plugin_common.h"
#include "tee_plugin.h"
#include "nfacctd.h"

/* Global variables */
char tee_send_buf[65535];
struct tee_receivers receivers;
int err_cant_bridge_af;

void tee_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
{
  struct pkt_msg *msg;
  unsigned char *pipebuf;
  struct pollfd pfd;
  int refresh_timeout, ret, pool_idx, recv_idx, recv_budget, poll_bypass;
  struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
  struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
  u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
  unsigned char *dataptr;
  struct tee_receiver *target = NULL;
  struct plugin_requests req;

  unsigned char *rgptr;
  int pollagain = TRUE;
  u_int32_t seq = 1, rg_err_count = 0;

#ifdef WITH_ZMQ
  struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
#else
  void *zmq_host = NULL;
#endif

#ifdef WITH_REDIS
  struct p_redis_host redis_host;
#endif

  memcpy(&config, cfgptr, sizeof(struct configuration));
  recollect_pipe_memory(ptr);
  pm_setproctitle("%s [%s]", "Tee Plugin", config.name);
  if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
  if (config.logfile) {
    fclose(config.logfile_fd);
    config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
  }

  if (config.proc_priority) {
    int ret;

    ret = setpriority(PRIO_PROCESS, 0, config.proc_priority);
    if (ret) Log(LOG_WARNING, "WARN ( %s/%s ): proc_priority failed (errno: %d)\n", config.name, config.type, errno);
    else Log(LOG_INFO, "INFO ( %s/%s ): proc_priority set to %d\n", config.name, config.type, getpriority(PRIO_PROCESS, 0));
  }

  /* signal handling */
  signal(SIGINT, Tee_exit_now);
  signal(SIGUSR1, SIG_IGN);
  signal(SIGUSR2, reload_maps); /* sets to true the reload_maps flag */
  signal(SIGPIPE, SIG_IGN);
  signal(SIGCHLD, SIG_IGN);

  if (config.tee_transparent && getuid() != 0) {
    Log(LOG_ERR, "ERROR ( %s/%s ): Transparent mode requires super-user permissions. Exiting ...\n", config.name, config.type);
    exit_gracefully(1);
  }

  if (!config.tee_receivers) {
    Log(LOG_ERR, "ERROR ( %s/%s ): No receivers specified: tee_receivers is required. Exiting ...\n", config.name, config.type);
    exit_gracefully(1);
  }

  memset(empty_mem_area_256b, 0, sizeof(empty_mem_area_256b));
  memset(&receivers, 0, sizeof(receivers));
  memset(&req, 0, sizeof(req));
  reload_map = FALSE;

  /* Setting up pools */
  if (!config.tee_max_receiver_pools) config.tee_max_receiver_pools = MAX_TEE_POOLS;

  receivers.pools = malloc((config.tee_max_receiver_pools+1)*sizeof(struct tee_receivers_pool));
  if (!receivers.pools) {
    Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate receiver pools. Exiting ...\n", config.name, config.type);
    exit_gracefully(1);
  }
  else memset(receivers.pools, 0, (config.tee_max_receiver_pools+1)*sizeof(struct tee_receivers_pool));

  /* Setting up receivers per pool */
  if (!config.tee_max_receivers) config.tee_max_receivers = MAX_TEE_RECEIVERS;

  for (pool_idx = 0; pool_idx < config.tee_max_receiver_pools; pool_idx++) {
    receivers.pools[pool_idx].receivers = malloc(config.tee_max_receivers*sizeof(struct tee_receiver));
    if (!receivers.pools[pool_idx].receivers) {
      Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate receivers for pool #%u. Exiting ...\n", config.name, config.type, pool_idx);
      exit_gracefully(1);
    }
    else memset(receivers.pools[pool_idx].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
  }

  if (config.tee_receivers) {
    int recvs_allocated = FALSE;

    req.key_value_table = (void *) &receivers;
    load_id_file(MAP_TEE_RECVS, config.tee_receivers, NULL, &req, &recvs_allocated);
  }

  config.sql_refresh_time = DEFAULT_TEE_REFRESH_TIME;
  refresh_timeout = config.sql_refresh_time*1000;

  pipebuf = (unsigned char *) pm_malloc(config.buffer_size);

  if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
  else setnonblocking(pipe_fd);

  memset(pipebuf, 0, config.buffer_size);
  err_cant_bridge_af = 0;

  /* Arrange send socket */
  Tee_init_socks();

#ifdef WITH_REDIS
  if (config.redis_host) {
    char log_id[SHORTBUFLEN];

    snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
    p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
  }
#endif

  /* plugin main loop */
  for (;;) {
    poll_again:
    status->wakeup = TRUE;
    poll_bypass = FALSE;

    pfd.fd = pipe_fd;
    pfd.events = POLLIN;

    ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);

    if (ret < 0) goto poll_again;

    poll_ops:
    if (reload_map) {
      if (config.tee_receivers) {
        int recvs_allocated = FALSE;

        Tee_destroy_recvs();
        load_id_file(MAP_TEE_RECVS, config.tee_receivers, NULL, &req, &recvs_allocated);

        Tee_init_socks();
      }

      reload_map = FALSE;
    }

    if (reload_log) {
      reload_logs();
      reload_log = FALSE;
    }

    recv_budget = 0;
    if (poll_bypass) {
      poll_bypass = FALSE;
      goto read_data;
    }

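    /* poll() outcome: 0 is a timeout, > 0 means data is available on the pipe;
       negative returns were already sent back to poll_again above */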
    switch (ret) {
    case 0: /* timeout */
      /* reserved for future since we don't currently cache/batch/etc */
      break;
    default: /* we received data */
      read_data:
      if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
	poll_bypass = TRUE;
	goto poll_ops;
      }

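      /* Home-grown shared-memory ring: validate buffer sequence numbers so that
	 missed buffers (the writer overtaking the reader) can be detected and logged */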
      if (config.pipe_homegrown) {
        if (!pollagain) {
          seq++;
          seq %= MAX_SEQNUM;
          if (seq == 0) rg_err_count = FALSE;
        }
        else {
          if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
            exit_gracefully(1); /* we exit silently; something happened at the write end */
        }

        if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;

        if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
          if (!pollagain) {
            pollagain = TRUE;
            goto poll_again;
          }
          else {
            rg_err_count++;
            if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
              Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
                        config.name, config.type, config.buffer_size, config.pipe_size);
              Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
                        config.name, config.type);
            }

	    rg->ptr = (rg->base + status->last_buf_off);
            seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
          }
        }

        pollagain = FALSE;
        memcpy(pipebuf, rg->ptr, bufsz);
        rg->ptr += bufsz;
      }
#ifdef WITH_ZMQ
      else if (config.pipe_zmq) {
	ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
	if (ret > 0) {
	  if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
	    Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
		config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
	  }

	  seq = ((struct ch_buf_hdr *)pipebuf)->seq;
	}
	else goto poll_again;
      }
#endif

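      /* Each buffer begins with a ch_buf_hdr, followed by pkt_msg entries; every
	 entry carries its payload right after the fixed-size pkt_msg structure */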
      msg = (struct pkt_msg *) (pipebuf+sizeof(struct ch_buf_hdr));
      msg->payload = (pipebuf+sizeof(struct ch_buf_hdr)+PmsgSz);

      if (config.debug_internal_msg)
        Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
                config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
                ((struct ch_buf_hdr *)pipebuf)->num);

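      /* Replicate every message to all receiver pools whose tag filter matches
	 (or to every pool, if the message is flagged for broadcast) */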
      while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
	for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
	  if (msg->bcast || !evaluate_tags(&receivers.pools[pool_idx].tag_filter, msg->tag)) {
	    if (!receivers.pools[pool_idx].balance.func) {
	      for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
	        target = &receivers.pools[pool_idx].receivers[recv_idx];
	        Tee_send(msg, (struct sockaddr *) &target->dest, target->fd, config.tee_transparent);
	      }

#ifdef WITH_KAFKA
	      /* Checking the handler is the most light weight op we can perform
		 in order to ensure we are in business with the Kafka broker */
	      if (p_kafka_get_handler(&receivers.pools[pool_idx].kafka_host)) {
		Tee_kafka_send(msg, &receivers.pools[pool_idx]);
	      }
#endif

#ifdef WITH_ZMQ
	      if (p_zmq_get_sock(&receivers.pools[pool_idx].zmq_host)) {
		Tee_zmq_send(msg, &receivers.pools[pool_idx]);
	      }
#endif
	    }
	    else {
	      target = receivers.pools[pool_idx].balance.func(&receivers.pools[pool_idx], msg);
	      Tee_send(msg, (struct sockaddr *) &target->dest, target->fd, config.tee_transparent);
	    }
	  }
	}

        ((struct ch_buf_hdr *)pipebuf)->num--;
        if (((struct ch_buf_hdr *)pipebuf)->num) {
	  dataptr = (unsigned char *) msg;
          dataptr += (PmsgSz + msg->len);
	  msg = (struct pkt_msg *) dataptr;
	  msg->payload = (dataptr + PmsgSz);
	}
      }

      recv_budget++;
      goto read_data;
    }
  }
}

void Tee_exit_now(int signum)
{
  wait(NULL);
  exit_gracefully(0);
}

size_t Tee_craft_transparent_msg(struct pkt_msg *msg, struct sockaddr *target)
{
  char *buf_ptr = tee_send_buf;
  struct sockaddr *sa = (struct sockaddr *) &msg->agent;
  struct sockaddr_in *sa4 = (struct sockaddr_in *) &msg->agent;
  struct pm_iphdr *i4h = (struct pm_iphdr *) buf_ptr;
  struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &msg->agent;
  struct ip6_hdr *i6h = (struct ip6_hdr *) buf_ptr;
  struct pm_udphdr *uh;
  size_t msglen = 0;

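  /* Transparent mode: rebuild a raw IP/UDP datagram in tee_send_buf whose source
     address and port are those of the original exporter, so the receiver still
     sees the agent's address rather than the replicator's */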
  if (sa->sa_family == target->sa_family) {
    /* UDP header first */
    if (target->sa_family == AF_INET) {
      buf_ptr += IP4HdrSz;
      uh = (struct pm_udphdr *) buf_ptr;
      uh->uh_sport = sa4->sin_port;
      uh->uh_dport = ((struct sockaddr_in *)target)->sin_port;
    }
    else if (target->sa_family == AF_INET6) {
      buf_ptr += IP6HdrSz;
      uh = (struct pm_udphdr *) buf_ptr;
      uh->uh_sport = sa6->sin6_port;
      uh->uh_dport = ((struct sockaddr_in6 *)target)->sin6_port;
    }
    else {
      assert(0);
      return msglen;
    }

    uh->uh_ulen = htons(msg->len+UDPHdrSz);
    uh->uh_sum = 0;

    /* IP header then */
    if (target->sa_family == AF_INET) {
      i4h->ip_vhl = 4;
      i4h->ip_vhl <<= 4;
      i4h->ip_vhl |= (IP4HdrSz/4);

      if (config.nfprobe_ipprec) {
	int opt = config.nfprobe_ipprec << 5;
        i4h->ip_tos = opt;
      }
      else i4h->ip_tos = 0;

#if !defined BSD
      i4h->ip_len = htons(IP4HdrSz+UDPHdrSz+msg->len);
#else
      i4h->ip_len = IP4HdrSz+UDPHdrSz+msg->len;
#endif
      i4h->ip_id = 0;
      i4h->ip_off = 0;
      i4h->ip_ttl = 255;
      i4h->ip_p = IPPROTO_UDP;
      i4h->ip_sum = 0;
      i4h->ip_src.s_addr = sa4->sin_addr.s_addr;
      i4h->ip_dst.s_addr = ((struct sockaddr_in *)target)->sin_addr.s_addr;
    }
    else if (target->sa_family == AF_INET6) {
      i6h->ip6_vfc = 6;
      i6h->ip6_vfc <<= 4;
      i6h->ip6_plen = htons(UDPHdrSz+msg->len);
      i6h->ip6_nxt = IPPROTO_UDP;
      i6h->ip6_hlim = 255;
      memcpy(&i6h->ip6_src, &sa6->sin6_addr, IP6AddrSz);
      memcpy(&i6h->ip6_dst, &((struct sockaddr_in6 *)target)->sin6_addr, IP6AddrSz);
    }

    /* Put everything together and send */
    buf_ptr += UDPHdrSz;
    memcpy(buf_ptr, msg->payload, msg->len);

    /* total length depends on the IP header size of the target family */
    if (target->sa_family == AF_INET) msglen = (IP4HdrSz + UDPHdrSz + msg->len);
    else if (target->sa_family == AF_INET6) msglen = (IP6HdrSz + UDPHdrSz + msg->len);
  }
  else {
    time_t now = time(NULL);

    if (now > err_cant_bridge_af + 60) {
      Log(LOG_ERR, "ERROR ( %s/%s ): Can't bridge Address Families when in transparent mode\n", config.name, config.type);
      err_cant_bridge_af = now;
    }
  }

  return msglen;
}

void Tee_send(struct pkt_msg *msg, struct sockaddr *target, int fd, int transparent)
{
  struct host_addr r;
  char recv_addr[50];
  u_int16_t recv_port;

  if (config.debug) {
    char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
    struct host_addr a;
    char agent_addr[50];
    u_int16_t agent_port;

    sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
    addr_to_str(agent_addr, &a);

    sa_to_addr((struct sockaddr *)target, &r, &recv_port);
    addr_to_str(recv_addr, &r);

    if (config.acct_type == ACCT_NF) flow = netflow;
    else if (config.acct_type == ACCT_SF) flow = sflow;

    Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] to [%s:%u]\n",
                        config.name, config.type, flow, agent_addr, agent_port, msg->seqno,
			recv_addr, recv_port);
  }

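  /* Non-transparent: plain UDP send() on the connected socket; transparent: send
     the raw IP datagram crafted by Tee_craft_transparent_msg() on the raw socket */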
  if (!transparent) {
    if (send(fd, msg->payload, msg->len, 0) == -1) {
      struct host_addr a;
      char agent_addr[50];
      u_int16_t agent_port;

      sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
      addr_to_str(agent_addr, &a);

      sa_to_addr((struct sockaddr *)target, &r, &recv_port);
      addr_to_str(recv_addr, &r);

      Log(LOG_ERR, "ERROR ( %s/%s ): send() from [%s:%u] seqno [%u] to [%s:%u] failed (%s)\n",
			config.name, config.type, agent_addr, agent_port, msg->seqno, recv_addr,
			recv_port, strerror(errno));
    }
  }
  else {
    size_t msglen;

    msglen = Tee_craft_transparent_msg(msg, target);

    if (msglen && send(fd, tee_send_buf, msglen, 0) == -1) {
      struct host_addr a;
      char agent_addr[50];
      u_int16_t agent_port;

      sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
      addr_to_str(agent_addr, &a);

      sa_to_addr((struct sockaddr *)target, &r, &recv_port);
      addr_to_str(recv_addr, &r);

      Log(LOG_ERR, "ERROR ( %s/%s ): raw send() from [%s:%u] seqno [%u] to [%s:%u] failed (%s)\n",
	  config.name, config.type, agent_addr, agent_port, msg->seqno, recv_addr,
	  recv_port, strerror(errno));
    }
  }
}

#ifdef WITH_KAFKA
void Tee_kafka_send(struct pkt_msg *msg, struct tee_receivers_pool *pool)
{
  struct p_kafka_host *kafka_host = &pool->kafka_host;
  struct sockaddr *sa, target;
  time_t last_fail, now;
  size_t msglen = 0;

  memset(&target, 0, sizeof(target));
  sa = (struct sockaddr *) &msg->agent;

  target.sa_family = sa->sa_family;

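  /* The Kafka path only carries the transparent (agent-spoofed) encapsulation:
     target is a zeroed sockaddr of the agent's address family, so the crafted
     datagram keeps the agent as source and an unspecified destination */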
  if (config.debug) {
    char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
    char *broker, *topic;
    struct host_addr a;
    char agent_addr[50];
    u_int16_t agent_port;

    sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
    addr_to_str(agent_addr, &a);

    broker = p_kafka_get_broker(kafka_host);
    topic = p_kafka_get_topic(kafka_host);

    if (config.acct_type == ACCT_NF) flow = netflow;
    else if (config.acct_type == ACCT_SF) flow = sflow;

    Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] to Kafka [%s-%s]\n",
                        config.name, config.type, flow, agent_addr, agent_port, msg->seqno,
                        broker, topic);
  }

  last_fail = P_broker_timers_get_last_fail(&kafka_host->btimers);
  if (last_fail) {
    now = time(NULL);

    if ((last_fail + P_broker_timers_get_retry_interval(&kafka_host->btimers)) <= now) {
      Tee_init_kafka_host(kafka_host, pool->kafka_broker, pool->kafka_topic, pool->id);
    }
  }

  if (config.tee_transparent) {
    msglen = Tee_craft_transparent_msg(msg, &target);

    if (msglen) p_kafka_produce_data(kafka_host, tee_send_buf, msglen);
  }
}
#endif

#ifdef WITH_ZMQ
void Tee_zmq_send(struct pkt_msg *msg, struct tee_receivers_pool *pool)
{
  struct p_zmq_host *zmq_host = &pool->zmq_host;
  struct sockaddr *sa, target;
  size_t msglen = 0;
  int ret;

  memset(&target, 0, sizeof(target));
  sa = (struct sockaddr *) &msg->agent;

  target.sa_family = sa->sa_family;

  if (config.debug) {
    char *flow = NULL, netflow[] = "NetFlow/IPFIX", sflow[] = "sFlow";
    char *address;
    struct host_addr a;
    char agent_addr[50];
    u_int16_t agent_port;

    sa_to_addr((struct sockaddr *)msg, &a, &agent_port);
    addr_to_str(agent_addr, &a);

    address = p_zmq_get_address(zmq_host);

    if (config.acct_type == ACCT_NF) flow = netflow;
    else if (config.acct_type == ACCT_SF) flow = sflow;

    Log(LOG_DEBUG, "DEBUG ( %s/%s ): Sending %s packet from [%s:%u] seqno [%u] via ZeroMQ [%s]\n",
	config.name, config.type, flow, agent_addr, agent_port, msg->seqno, address);
  }

  if (config.tee_transparent) {
    msglen = Tee_craft_transparent_msg(msg, &target);

    if (msglen) {
      ret = p_zmq_send_bin(&zmq_host->sock, tee_send_buf, msglen, TRUE);
      if (ret == ERR && errno == EAGAIN) {
	char *address;

	address = p_zmq_get_address(zmq_host);
	Log(LOG_WARNING, "WARN ( %s/%s ): Queue full: ZeroMQ [%s]\n", config.name, config.type, address);
      }
    }
  }
}
#endif

void Tee_destroy_recvs()
{
  struct tee_receiver *target = NULL;
  int pool_idx, recv_idx;

  for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
    for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
      target = &receivers.pools[pool_idx].receivers[recv_idx];
      if (target->fd) close(target->fd);
    }

    memset(receivers.pools[pool_idx].receivers, 0, config.tee_max_receivers*sizeof(struct tee_receiver));
    memset(&receivers.pools[pool_idx].tag_filter, 0, sizeof(struct pretag_filter));
    memset(&receivers.pools[pool_idx].balance, 0, sizeof(struct tee_balance));
    receivers.pools[pool_idx].id = 0;
    receivers.pools[pool_idx].num = 0;

#ifdef WITH_KAFKA
    if (strlen(receivers.pools[pool_idx].kafka_broker)) {
      p_kafka_close(&receivers.pools[pool_idx].kafka_host, FALSE);
      memset(receivers.pools[pool_idx].kafka_broker, 0, sizeof(receivers.pools[pool_idx].kafka_broker));
      memset(receivers.pools[pool_idx].kafka_topic, 0, sizeof(receivers.pools[pool_idx].kafka_topic));
    }
#endif

#ifdef WITH_ZMQ
    if (strlen(receivers.pools[pool_idx].zmq_address)) {
      p_zmq_close(&receivers.pools[pool_idx].zmq_host);
      memset(receivers.pools[pool_idx].zmq_address, 0, sizeof(receivers.pools[pool_idx].zmq_address));
    }
#endif
  }

  receivers.num = 0;
}

void Tee_init_socks()
{
  struct tee_receiver *target = NULL;
  struct sockaddr *sa;
  int pool_idx, recv_idx, err;
  char dest_addr[256], dest_serv[256];

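  /* Open (and connect) one socket per configured receiver; then bring up the
     optional per-pool Kafka and ZeroMQ emitters */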
  for (pool_idx = 0; pool_idx < receivers.num; pool_idx++) {
    for (recv_idx = 0; recv_idx < receivers.pools[pool_idx].num; recv_idx++) {
      target = &receivers.pools[pool_idx].receivers[recv_idx];
      sa = (struct sockaddr *) &target->dest;

      if (sa->sa_family != 0) {
        if ((err = getnameinfo(sa, target->dest_len, dest_addr, sizeof(dest_addr),
            dest_serv, sizeof(dest_serv), NI_NUMERICHOST)) == -1) {
          Log(LOG_ERR, "ERROR ( %s/%s ): getnameinfo: %d\n", config.name, config.type, err);
          exit_gracefully(1);
        }
      }

      target->fd = Tee_prepare_sock((struct sockaddr *) &target->dest, target->dest_len, receivers.pools[pool_idx].src_port,
				    config.tee_transparent, config.tee_pipe_size);

      if (config.debug) {
	struct host_addr recv_addr;
        char recv_addr_str[INET6_ADDRSTRLEN];
	u_int16_t recv_port;

	sa_to_addr((struct sockaddr *)&target->dest, &recv_addr, &recv_port);
        addr_to_str(recv_addr_str, &recv_addr);
        Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u receiver=%s fd=%d\n",
	    config.name, config.type, receivers.pools[pool_idx].id, recv_addr_str, target->fd);
      }
    }

#ifdef WITH_KAFKA
    if (strlen(receivers.pools[pool_idx].kafka_broker)) {
      Tee_init_kafka_host(&receivers.pools[pool_idx].kafka_host, receivers.pools[pool_idx].kafka_broker,
			  receivers.pools[pool_idx].kafka_topic, receivers.pools[pool_idx].id);
    }
#endif

#ifdef WITH_ZMQ
    if (strlen(receivers.pools[pool_idx].zmq_address)) {
      Tee_init_zmq_host(&receivers.pools[pool_idx].zmq_host, receivers.pools[pool_idx].zmq_address,
			receivers.pools[pool_idx].id);
    }
#endif
  }
}

#ifdef WITH_KAFKA
void Tee_init_kafka_host(struct p_kafka_host *kafka_host, char *kafka_broker, char *kafka_topic, u_int32_t pool_id)
{
  p_kafka_init_host(kafka_host, config.tee_kafka_config_file);
  p_kafka_connect_to_produce(kafka_host);
  p_kafka_set_broker(kafka_host, kafka_broker, FALSE);
  p_kafka_set_topic(kafka_host, kafka_topic);
  p_kafka_set_content_type(kafka_host, PM_KAFKA_CNT_TYPE_BIN);
  P_broker_timers_set_retry_interval(&kafka_host->btimers, PM_KAFKA_DEFAULT_RETRY);

  if (config.debug) {
    char *broker, *topic;

    broker = p_kafka_get_broker(kafka_host);
    topic = p_kafka_get_topic(kafka_host);
    Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u KafkaBroker=%s KafkaTopic=%s\n",
	config.name, config.type, pool_id, broker, topic);
  }
}
#endif

#ifdef WITH_ZMQ
void Tee_init_zmq_host(struct p_zmq_host *zmq_host, char *zmq_address, u_int32_t pool_id)
{
  char log_id[SHORTBUFLEN];

  p_zmq_init_push(zmq_host, zmq_address);
  snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
  p_zmq_set_log_id(zmq_host, log_id);
  p_zmq_set_hwm(zmq_host, PM_ZMQ_DEFAULT_FLOW_HWM);
  p_zmq_push_setup(zmq_host);

  if (config.debug) {
    char *broker;

    broker = p_zmq_get_address(zmq_host);
    Log(LOG_DEBUG, "DEBUG ( %s/%s ): PoolID=%u ZmqAddress=%s\n", config.name, config.type, pool_id, broker);
  }
}
#endif

int Tee_prepare_sock(struct sockaddr *addr, socklen_t len, u_int16_t src_port, int transparent, int pipe_size)
{
  int s, ret = 0;

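  /* Non-transparent mode: connected UDP socket, optionally bound to a local source
     port; transparent mode: raw socket, so the IP header crafted by the caller is
     emitted as-is */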
  if (!transparent) {
    struct host_addr source_ip;
    struct sockaddr_storage ssource_ip;

    memset(&source_ip, 0, sizeof(source_ip));
    memset(&ssource_ip, 0, sizeof(ssource_ip));

    if (src_port) {
      source_ip.family = addr->sa_family;
      ret = addr_to_sa((struct sockaddr *) &ssource_ip, &source_ip, src_port);
    }

    if ((s = socket(addr->sa_family, SOCK_DGRAM, 0)) == -1) {
      Log(LOG_ERR, "ERROR ( %s/%s ): socket() error: %s\n", config.name, config.type, strerror(errno));
      exit_gracefully(1);
    }

    if (ret && bind(s, (struct sockaddr *) &ssource_ip, sizeof(ssource_ip)) == -1)
      Log(LOG_WARNING, "WARN ( %s/%s ): bind() error: %s\n", config.name, config.type, strerror(errno));
  }
  else {
    int hincl = 1; /* 1 = on, 0 = off */

    if ((s = socket(addr->sa_family, SOCK_RAW, IPPROTO_RAW)) == -1) {
      Log(LOG_ERR, "ERROR ( %s/%s ): socket() error: %s\n", config.name, config.type, strerror(errno));
      exit_gracefully(1);
    }

#if defined BSD
    setsockopt(s, IPPROTO_IP, IP_HDRINCL, &hincl, (socklen_t) sizeof(hincl));
#endif
  }

  if (pipe_size) {
    socklen_t l = sizeof(pipe_size);
    int saved = 0, obtained = 0;

    getsockopt(s, SOL_SOCKET, SO_SNDBUF, &saved, &l);
    Setsocksize(s, SOL_SOCKET, SO_SNDBUF, &pipe_size, (socklen_t) sizeof(pipe_size));
    getsockopt(s, SOL_SOCKET, SO_SNDBUF, &obtained, &l);

    if (obtained < saved) {
      Setsocksize(s, SOL_SOCKET, SO_SNDBUF, &saved, l);
      getsockopt(s, SOL_SOCKET, SO_SNDBUF, &obtained, &l);
    }
    Log(LOG_INFO, "INFO ( %s/%s ): tee_pipe_size: obtained=%d target=%d.\n", config.name, config.type, obtained, pipe_size);
  }

  if (connect(s, (struct sockaddr *)addr, len) == -1) {
    Log(LOG_ERR, "ERROR ( %s/%s ): connect() error: %s\n", config.name, config.type, strerror(errno));
    exit_gracefully(1);
  }

  return(s);
}

int Tee_parse_hostport(const char *s, struct sockaddr *addr, socklen_t *len, int dont_check_port)
{
  char *orig, *host, *port, zero_port[] = "]:0";
  struct addrinfo hints, *res;
  int herr;

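  /* Accepted formats: "host:port" and "[v6addr]:port"; when dont_check_port is set
     the ":port" part may be omitted and a port of 0 is assumed */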
  memset(&hints, '\0', sizeof(hints));

  if ((host = orig = strdup(s)) == NULL) {
    Log(LOG_ERR, "ERROR ( %s/%s ): Tee_parse_hostport() out of memory. Exiting ..\n", config.name, config.type);
    exit_gracefully(1);
  }

  trim_spaces(host);
  trim_spaces(orig);

  if ((port = strrchr(host, ':')) == NULL || *(++port) == '\0') {
    if (dont_check_port) {
      port = zero_port;
      ++port; ++port;
    }
    else return TRUE;
  }

  if (*host == '\0') return TRUE;

  *(port - 1) = '\0';

  /* Accept [host]:port for numeric IPv6 addresses;
     XXX: if dont_check_port is set to true, check for ']' will be inaccurate */
  if (*host == '[' && *(port - 2) == ']') {
    host++;
    *(port - 2) = '\0';
    hints.ai_family = AF_INET6;
  }

  hints.ai_socktype = SOCK_DGRAM;

  /* Validations */
  if ((herr = getaddrinfo(host, port, &hints, &res)) == -1) return TRUE;
  if (res == NULL || res->ai_addr == NULL) return TRUE;
  if (res->ai_addrlen > *len) return TRUE;

  memcpy(addr, res->ai_addr, res->ai_addrlen);
  free(orig);
  *len = res->ai_addrlen;

  return FALSE;
}

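/* Balancing helpers: given a receiver pool and a message, pick a single receiver
   (round-robin, hash on the IPv4 agent address, or hash on the message tag) */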
struct tee_receiver *Tee_rr_balance(void *pool, struct pkt_msg *msg)
{
  struct tee_receivers_pool *p = pool;
  struct tee_receiver *target = NULL;

  if (p) {
    target = &p->receivers[p->balance.next % p->num];
    p->balance.next++;
    p->balance.next %= p->num;
  }

  return target;
}

struct tee_receiver *Tee_hash_agent_balance(void *pool, struct pkt_msg *msg)
{
  struct tee_receivers_pool *p = pool;
  struct tee_receiver *target = NULL;
  struct sockaddr *sa = (struct sockaddr *) &msg->agent;
  struct sockaddr_in *sa4 = (struct sockaddr_in *) &msg->agent;

  if (p) {
    if (sa->sa_family == AF_INET) target = &p->receivers[sa4->sin_addr.s_addr % p->num];
    /* XXX: hashing against IPv6 agents is not supported (yet) */
  }

  return target;
}

struct tee_receiver *Tee_hash_tag_balance(void *pool, struct pkt_msg *msg)
{
  struct tee_receivers_pool *p = pool;
  struct tee_receiver *target = NULL;

  if (p) target = &p->receivers[msg->tag % p->num];

  return target;
}

void Tee_select_templates(unsigned char *pkt, int pkt_len, int nfv, unsigned char *tpl_pkt, int *tpl_pkt_len)
{
  struct struct_header_v9 *hdr_v9 = NULL;
  struct struct_header_ipfix *hdr_v10 = NULL;
  struct data_hdr_v9 *hdr_flowset = NULL;

  unsigned char *src_ptr = pkt, *dst_ptr = tpl_pkt;
  u_int16_t flowsetNo = 0, flowsetCount = 0, flowsetTplCount = 0;
  int tmp_len = 0, hdr_len = 0, term1 = 0, term2 = 0;

  if (!pkt || !pkt_len || !tpl_pkt || !tpl_pkt_len) return;
  if (nfv != 9 && nfv != 10) return;
  if (pkt_len > NETFLOW_MSG_SIZE) return;

  (*tpl_pkt_len) = 0;

  /* NetFlow v9 */
  if (nfv == 9) {
    hdr_v9 = (struct struct_header_v9 *) tpl_pkt;
    hdr_len = sizeof(struct struct_header_v9);
  }
  else if (nfv == 10) {
    hdr_v10 = (struct struct_header_ipfix *) tpl_pkt;
    hdr_len = sizeof(struct struct_header_ipfix);
  }

  if (pkt_len < hdr_len) return;

  memcpy(dst_ptr, src_ptr, hdr_len);
  src_ptr += hdr_len;
  dst_ptr += hdr_len;
  pkt_len -= hdr_len;
  tmp_len += hdr_len;

  if (nfv == 9) {
    flowsetNo = htons(hdr_v9->count);
    term1 = (flowsetNo + 1); /* trick to use > operator */
    term2 = flowsetCount;
  }
  else if (nfv == 10) {
    term1 = pkt_len;
    term2 = 0;
  }

  hdr_flowset = (struct data_hdr_v9 *) src_ptr;

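  /* Walk the flowsets: copy template and option-template sets (v9 ids 0-1, IPFIX
     set ids 2-3) into tpl_pkt and skip data sets; for v9 iterate by flowset count,
     for IPFIX by remaining packet length */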
  while (term1 > term2) {
    int fset_id = ntohs(hdr_flowset->flow_id);
    int fset_len = ntohs(hdr_flowset->flow_len);
    int fset_hdr_len = sizeof(struct data_hdr_v9);

    if (!fset_len || (fset_hdr_len + fset_len) > pkt_len) break;

    /* if template, copy over */
    if (((nfv == 9) && (fset_id == 0 || fset_id == 1)) ||
       ((nfv == 10) && (fset_id == 2 || fset_id == 3))) {
      memcpy(dst_ptr, src_ptr, fset_len);

      src_ptr += fset_len;
      dst_ptr += fset_len;

      pkt_len -= fset_len;
      tmp_len += fset_len;

      flowsetTplCount++;
    }
    /* if data, skip */
    else {
      src_ptr += fset_len;
      pkt_len -= fset_len;
    }

    if (nfv == 9) {
      flowsetCount++;
      term2 = flowsetCount;
    }
    else if (nfv == 10) {
      term1 = pkt_len;
    }

    hdr_flowset = (struct data_hdr_v9 *) src_ptr;
  }

  /* if we have at least one template, let's update the template packet */
  if (flowsetTplCount) {
    if (nfv == 9) {
      hdr_v9->count = htons(flowsetTplCount);
    }
    else if (nfv == 10) {
      hdr_v10->len = htons(tmp_len);
    }

    (*tpl_pkt_len) = tmp_len;
  }
}