/*
    pmacct (Promiscuous mode IP Accounting package)
    pmacct is Copyright (C) 2003-2019 by Paolo Lucente
*/

/*
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

/* includes */
#include "pmacct.h"
#include "pmacct-data.h"
#include "thread_pool.h"
#include "plugin_hooks.h"
#include "plugin_common.h"
#include "pkt_handlers.h"

/* functions */

/* load_plugins() starts plugin processes; it creates pipes and handles
   them by inserting them into the channels_list structure */

/* no AMQP: when not using map_shared, 'pipe_size' is the size of the pipe
   created with socketpair(); when map_shared is enabled, it refers to the
   size of the shared memory area */
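/* Two transports are supported below: the homegrown one, where the data sits
   in a map_shared() ring and buffer pointers travel over a socketpair()
   control channel, and ZeroMQ (plugin_pipe_zmq), where whole buffers are
   published to the plugin over a pub/sub socket. */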
void load_plugins(struct plugin_requests *req)
{
  u_int64_t buf_pipe_ratio_sz = 0, pipe_idx = 0;
  int snd_buflen = 0, rcv_buflen = 0, socklen = 0, target_buflen = 0;

  int nfprobe_id = 0, min_sz = 0, extra_sz = 0, offset = 0;
  struct plugins_list_entry *list = plugins_list;
  socklen_t l = sizeof(list->cfg.pipe_size);
  struct channels_list_entry *chptr = NULL;


  init_random_seed();
  init_pipe_channels();

#ifdef WITH_ZMQ
  char username[SHORTBUFLEN], password[SHORTBUFLEN];
  memset(username, 0, sizeof(username));
  memset(password, 0, sizeof(password));

  generate_random_string(username, (sizeof(username) - 1));
  generate_random_string(password, (sizeof(password) - 1));
#endif

  while (list) {
    if ((*list->type.func)) {
      if (list->cfg.data_type & (PIPE_TYPE_METADATA|PIPE_TYPE_PAYLOAD|PIPE_TYPE_MSG));
      else {
	Log(LOG_ERR, "ERROR ( %s/%s ): Data type not supported: %d\n", list->name, list->type.string, list->cfg.data_type);
	exit_gracefully(1);
      }

      min_sz = ChBufHdrSz;
      list->cfg.buffer_immediate = FALSE;
      if (list->cfg.data_type & PIPE_TYPE_METADATA) min_sz += PdataSz;
      if (list->cfg.data_type & PIPE_TYPE_PAYLOAD) {
	if (list->cfg.acct_type == ACCT_PM && list->cfg.snaplen) min_sz += (PpayloadSz+list->cfg.snaplen);
	else min_sz += (PpayloadSz+DEFAULT_PLOAD_SIZE);
      }
      if (list->cfg.data_type & PIPE_TYPE_EXTRAS) min_sz += PextrasSz;
      if (list->cfg.data_type & PIPE_TYPE_MSG) {
	min_sz += PmsgSz;
        if (!list->cfg.buffer_size) {
          extra_sz = PKT_MSG_SIZE;
          list->cfg.buffer_immediate = TRUE;
        }
      }
      if (list->cfg.data_type & PIPE_TYPE_BGP) min_sz += sizeof(struct pkt_bgp_primitives);
      if (list->cfg.data_type & PIPE_TYPE_LBGP) min_sz += sizeof(struct pkt_legacy_bgp_primitives);
      if (list->cfg.data_type & PIPE_TYPE_NAT) min_sz += sizeof(struct pkt_nat_primitives);
      if (list->cfg.data_type & PIPE_TYPE_TUN) min_sz += sizeof(struct pkt_tunnel_primitives);
      if (list->cfg.data_type & PIPE_TYPE_MPLS) min_sz += sizeof(struct pkt_mpls_primitives);
      if (list->cfg.cpptrs.len) min_sz += list->cfg.cpptrs.len;
      if (list->cfg.data_type & PIPE_TYPE_VLEN) {
	min_sz += sizeof(struct pkt_vlen_hdr_primitives);
	if (!list->cfg.buffer_size) {
	  extra_sz = 1024; /* wild shot: 1Kb added for the actual variable-length data */
	  list->cfg.buffer_immediate = TRUE;
	}
      }

      /* If nothing is supplied, let's hint some working default values */
      if (!list->cfg.pipe_size || !list->cfg.buffer_size) {
        if (!list->cfg.pipe_size) list->cfg.pipe_size = 4096000; /* 4Mb */
        if (!list->cfg.buffer_size) {
	  if (list->cfg.pcap_savefile) list->cfg.buffer_size = 10240; /* 10Kb */
	  else list->cfg.buffer_size = MIN((min_sz + extra_sz), 10240);
	}
      }
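      /* worked example: with the hinted defaults above (pipe_size=4096000
	 bytes, buffer_size up to 10240 bytes) a ring holds 4096000 / 10240
	 = 400 buffers */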

      /* some validations */
      if (list->cfg.pipe_size < min_sz) list->cfg.pipe_size = min_sz;
      if (list->cfg.buffer_size < min_sz) list->cfg.buffer_size = min_sz;
      if (list->cfg.buffer_size > list->cfg.pipe_size) list->cfg.buffer_size = list->cfg.pipe_size;

      /* if required, let's align plugin_buffer_size to a 4-byte boundary */
#if NEED_ALIGN
      while (list->cfg.buffer_size % 4 != 0) list->cfg.buffer_size--;
#endif

      if (!list->cfg.pipe_zmq) {
        /* creating communication channel */
        socketpair(AF_UNIX, SOCK_DGRAM, 0, list->pipe);

        /* checking SO_RCVBUF and SO_SNDBUF values; if different we take the smaller one */
        getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
        getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
        socklen = (rcv_buflen < snd_buflen) ? rcv_buflen : snd_buflen;

        buf_pipe_ratio_sz = (list->cfg.pipe_size/list->cfg.buffer_size)*sizeof(char *);
        if (buf_pipe_ratio_sz > INT_MAX) {
	  Log(LOG_ERR, "ERROR ( %s/%s ): Current plugin_buffer_size elems per plugin_pipe_size: %llu. Max: %d.\nExiting.\n",
		list->name, list->type.string, (unsigned long long)(list->cfg.pipe_size/list->cfg.buffer_size),
		(INT_MAX/(int)sizeof(char *)));
          exit_gracefully(1);
        }
        else target_buflen = buf_pipe_ratio_sz;
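        /* each committed buffer is handed to the plugin as a single char
           pointer written on the control socketpair(), so the socket buffers
           must be able to queue one pointer per buffer the ring can hold;
           hence the INT_MAX check above and the resize attempts below */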

        if (target_buflen > socklen) {
	  Setsocksize(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &target_buflen, l);
	  Setsocksize(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &target_buflen, l);
        }

        getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
        getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
        if (rcv_buflen < snd_buflen) snd_buflen = rcv_buflen;

        if (snd_buflen < socklen) {
	  Setsocksize(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &socklen, l);
	  Setsocksize(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &socklen, l);

          getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
          getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
          if (rcv_buflen < snd_buflen) snd_buflen = rcv_buflen;
        }

        if (list->cfg.debug || (list->cfg.pipe_size > WARNING_PIPE_SIZE)) {
	  Log(LOG_INFO, "INFO ( %s/%s ): plugin_pipe_size=%" PRIu64 " bytes plugin_buffer_size=%" PRIu64 " bytes\n",
		list->name, list->type.string, list->cfg.pipe_size, list->cfg.buffer_size);
	  if (target_buflen <= snd_buflen)
            Log(LOG_INFO, "INFO ( %s/%s ): ctrl channel: obtained=%d bytes target=%d bytes\n",
		list->name, list->type.string, snd_buflen, target_buflen);
	  else
	    /* This should return an error and exit but we fall back to a
	       warning in order to be backward compatible */
            Log(LOG_WARNING, "WARN ( %s/%s ): ctrl channel: obtained=%d bytes target=%d bytes\n",
		list->name, list->type.string, snd_buflen, target_buflen);
        }
      }
      else {
	pipe_idx++;
        list->pipe[0] = list->pipe[1] = pipe_idx;
      }

      list->cfg.name = list->name;
      list->cfg.type = list->type.string;
      list->cfg.type_id = list->type.id;
      chptr = insert_pipe_channel(list->type.id, &list->cfg, list->pipe[1]);
      if (!chptr) {
	Log(LOG_ERR, "ERROR ( %s/%s ): Unable to setup a new Core Process <-> Plugin channel.\nExiting.\n", list->name, list->type.string);
	exit_gracefully(1);
      }
      else chptr->plugin = list;

      /* sets new value to be assigned to 'wakeup'; 'TRUE' disables on-request wakeup */
      if (list->type.id == PLUGIN_ID_MEMORY) chptr->request = TRUE;

      /* sets fixed/vlen offsets and cleaner routine; XXX: we should refine the cleaner
	 part: 1) i.e. extras assumes it's automagically piled with metadata; 2) what if
	 multiple vlen components are stacked up? */
      if (list->cfg.data_type & PIPE_TYPE_METADATA) {
	chptr->clean_func = pkt_data_clean;
	offset = sizeof(struct pkt_data);
      }
      if (list->cfg.data_type & PIPE_TYPE_PAYLOAD) chptr->clean_func = pkt_payload_clean;

      if (list->cfg.data_type & PIPE_TYPE_EXTRAS) {
	chptr->extras.off_pkt_extras = offset;
	offset += sizeof(struct pkt_extras);
      }
      if (list->cfg.data_type & PIPE_TYPE_MSG) chptr->clean_func = pkt_msg_clean;

      if (list->cfg.data_type & PIPE_TYPE_BGP) {
        chptr->extras.off_pkt_bgp_primitives = offset;
	offset += sizeof(struct pkt_bgp_primitives);
      }
      else chptr->extras.off_pkt_bgp_primitives = 0;

      if (list->cfg.data_type & PIPE_TYPE_LBGP) {
        chptr->extras.off_pkt_lbgp_primitives = offset;
        offset += sizeof(struct pkt_legacy_bgp_primitives);
      }
      else chptr->extras.off_pkt_lbgp_primitives = 0;

      if (list->cfg.data_type & PIPE_TYPE_NAT) {
        chptr->extras.off_pkt_nat_primitives = offset;
        offset += sizeof(struct pkt_nat_primitives);
      }
      else chptr->extras.off_pkt_nat_primitives = 0;

      if (list->cfg.data_type & PIPE_TYPE_TUN) {
        chptr->extras.off_pkt_tun_primitives = offset;
        offset += sizeof(struct pkt_tunnel_primitives);
      }
      else chptr->extras.off_pkt_tun_primitives = 0;

      if (list->cfg.data_type & PIPE_TYPE_MPLS) {
        chptr->extras.off_pkt_mpls_primitives = offset;
        offset += sizeof(struct pkt_mpls_primitives);
      }
      else chptr->extras.off_pkt_mpls_primitives = 0;

      if (list->cfg.cpptrs.len) {
	chptr->extras.off_custom_primitives = offset;
	offset += list->cfg.cpptrs.len;
      }
      /* PIPE_TYPE_VLEN goes at the end of the stack so that it does not turn
	 the other structures into variable-length ones (possible, but it would
	 not make much sense) */
      if (list->cfg.data_type & PIPE_TYPE_VLEN) {
        chptr->extras.off_pkt_vlen_hdr_primitives = offset;
        offset += sizeof(struct pkt_vlen_hdr_primitives);
      }
      else chptr->extras.off_pkt_vlen_hdr_primitives = 0;
      /* any further offset beyond this point must be set to
         PM_VARIABLE_LENGTH so as to tell plugins to resolve the
         value at runtime. */
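      /* resulting record layout within a buffer, with only the parts selected
	 via data_type actually present, e.g.:
	 [pkt_data][pkt_extras][bgp][lbgp][nat][tun][mpls][custom][vlen hdr][vlen data] */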

      chptr->datasize = min_sz-ChBufHdrSz;

      /* sets nfprobe ID */
      if (list->type.id == PLUGIN_ID_NFPROBE) {
	list->cfg.nfprobe_id = nfprobe_id;
	nfprobe_id++;
      }

      /* ZMQ inits, if required */
#ifdef WITH_ZMQ
      if (list->cfg.pipe_zmq) {
	char log_id[LARGEBUFLEN];

	p_zmq_plugin_pipe_init_core(&chptr->zmq_host, list->id, username, password);
	snprintf(log_id, sizeof(log_id), "%s/%s", list->name, list->type.string);
	p_zmq_set_log_id(&chptr->zmq_host, log_id);
	p_zmq_pub_setup(&chptr->zmq_host);
      }
#endif

      switch (list->pid = fork()) {
      case -1: /* Something went wrong */
	Log(LOG_WARNING, "WARN ( %s/%s ): Unable to initialize plugin: %s\n", list->name, list->type.string, strerror(errno));
	delete_pipe_channel(list->pipe[1]);
	break;
      case 0: /* Child */
	/* SIGCHLD handling issue: SysV avoids zombies by ignoring SIGCHLD; to emulate
	   such semantics on BSD systems, we need a handler like handle_falling_child() */
#if defined (SOLARIS)
	signal(SIGCHLD, SIG_IGN);
#else
	signal(SIGCHLD, ignore_falling_child);
#endif

#if defined HAVE_MALLOPT
        mallopt(M_CHECK_ACTION, 0);
#endif

	if (device.dev_desc) pcap_close(device.dev_desc);
	close(config.sock);
	close(config.bgp_sock);
	if (!list->cfg.pipe_zmq) close(list->pipe[1]);
	(*list->type.func)(list->pipe[0], &list->cfg, chptr);
	exit_gracefully(0);
      default: /* Parent */
	if (!list->cfg.pipe_zmq) {
	  close(list->pipe[0]);
	  setnonblocking(list->pipe[1]);
	}
	break;
      }

      /* some residual check */
      if (chptr && list->cfg.a_filter) req->bpf_filter = TRUE;
    }
    list = list->next;
  }

  sort_pipe_channels();

  /* define pre_tag_map(s) now so that they don't end up unnecessarily in plugin memory space */
  {
    int ptm_index = 0, ptm_global = FALSE;
    char *ptm_ptr = NULL;

    list = plugins_list;

    while (list) {
      if (list->cfg.pre_tag_map) {
        if (!ptm_index) {
          ptm_ptr = list->cfg.pre_tag_map;
          ptm_global = TRUE;
        }
        else {
          if (!ptm_ptr || strcmp(ptm_ptr, list->cfg.pre_tag_map))
            ptm_global = FALSE;
        }

	if (list->cfg.type_id == PLUGIN_ID_TEE) {
	  req->ptm_c.load_ptm_plugin = list->cfg.type_id;
	  req->ptm_c.load_ptm_res = FALSE;
	}

        load_pre_tag_map(config.acct_type, list->cfg.pre_tag_map, &list->cfg.ptm, req, &list->cfg.ptm_alloc,
                         list->cfg.maps_entries, list->cfg.maps_row_len);

	if (list->cfg.type_id == PLUGIN_ID_TEE) {
	  list->cfg.ptm_complex = req->ptm_c.load_ptm_res;
	  if (req->ptm_c.load_ptm_res) req->ptm_c.exec_ptm_dissect = TRUE;
	}
      }

      list = list->next;
      ptm_index++;
    }

    /* enforcing global flag */
    list = plugins_list;

    while (list) {
      list->cfg.ptm_global = ptm_global;
      list = list->next;
    }
  }
}

void exec_plugins(struct packet_ptrs *pptrs, struct plugin_requests *req)
{
  int saved_have_tag = FALSE, saved_have_tag2 = FALSE, saved_have_label = FALSE;
  pm_id_t saved_tag = 0, saved_tag2 = 0;
  pt_label_t *saved_label = malloc(sizeof(pt_label_t));

  int num, fixed_size;
  u_int32_t savedptr;
  char *bptr;
  int index, got_tags = FALSE;

  pretag_init_label(saved_label);

#if defined WITH_GEOIPV2
  if (reload_geoipv2_file && config.geoipv2_file) {
    pm_geoipv2_close();
    pm_geoipv2_init();

    reload_geoipv2_file = FALSE;
  }
#endif

  for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
    struct plugins_list_entry *p = channels_list[index].plugin;

    channels_list[index].already_reprocessed = FALSE;

    if (p->cfg.pre_tag_map && find_id_func) {
      if (p->cfg.type_id == PLUGIN_ID_TEE) {
	/*
	   replicate and compute tagging if:
	   - a dissected flow hits a complex pre_tag_map or
	   - a non-dissected (full) packet hits a simple pre_tag_map
	*/
	if ((req->ptm_c.exec_ptm_res && !p->cfg.ptm_complex) || (!req->ptm_c.exec_ptm_res && p->cfg.ptm_complex))
	  continue;
      }

      if (p->cfg.ptm_global && got_tags) {
        pptrs->tag = saved_tag;
        pptrs->tag2 = saved_tag2;
	pretag_copy_label(&pptrs->label, saved_label);

        pptrs->have_tag = saved_have_tag;
        pptrs->have_tag2 = saved_have_tag2;
        pptrs->have_label = saved_have_label;
      }
      else {
	if (p->cfg.type_id == PLUGIN_ID_TEE && req->ptm_c.exec_ptm_res && pptrs->tee_dissect_bcast) /* noop */;
        else {
	  find_id_func(&p->cfg.ptm, pptrs, &pptrs->tag, &pptrs->tag2);

	  if (p->cfg.ptm_global) {
	    saved_tag = pptrs->tag;
	    saved_tag2 = pptrs->tag2;
	    pretag_copy_label(saved_label, &pptrs->label);

	    saved_have_tag = pptrs->have_tag;
	    saved_have_tag2 = pptrs->have_tag2;
	    saved_have_label = pptrs->have_label;

            got_tags = TRUE;
	  }
        }
      }
    }
    else {
      if (p->cfg.type_id == PLUGIN_ID_TEE) {
        /* stop dissected flows from being replicated in case of no pre_tag_map */
        if (req->ptm_c.exec_ptm_res) continue;
      }
    }

    if (evaluate_filters(&channels_list[index].agg_filter, pptrs->packet_ptr, pptrs->pkthdr) &&
        !evaluate_tags(&channels_list[index].tag_filter, pptrs->tag) &&
        !evaluate_tags(&channels_list[index].tag2_filter, pptrs->tag2) &&
        !evaluate_labels(&channels_list[index].label_filter, &pptrs->label) &&
	!check_shadow_status(pptrs, &channels_list[index])) {
      /* arranging buffer: supported primitives + packet total length */
reprocess:
      channels_list[index].reprocess = FALSE;
      num = 0;

      /* rg.ptr points to the slot's base address into the ring (shared memory); bufptr
	 works as a displacement into the slot to place packets sequentially */
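      /* ring layout: rg.base .. rg.end is carved into 'bufsize' slots, each
	 starting with a struct ch_buf_hdr followed by the accounted records;
	 rg.ptr always addresses the slot currently being filled */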
      bptr = channels_list[index].rg.ptr+ChBufHdrSz+channels_list[index].bufptr;
      fixed_size = (*channels_list[index].clean_func)(bptr, channels_list[index].datasize);
      channels_list[index].var_size = 0;
      savedptr = channels_list[index].bufptr;
      reset_fallback_status(pptrs);

      while (channels_list[index].phandler[num]) {
        (*channels_list[index].phandler[num])(&channels_list[index], pptrs, &bptr);
        num++;
      }

      if (channels_list[index].s.rate && !channels_list[index].s.sampled_pkts) {
	channels_list[index].reprocess = FALSE;
	channels_list[index].bufptr = savedptr;
	channels_list[index].hdr.num--; /* let's cheat this value as it will get increased later */
	fixed_size = 0;
	channels_list[index].var_size = 0;
      }

      if (channels_list[index].reprocess) {
        /* Let's check if we have an issue with the buffer size */
        if (channels_list[index].already_reprocessed) {
          struct plugins_list_entry *list = channels_list[index].plugin;

          Log(LOG_ERR, "ERROR ( %s/%s ): plugin_buffer_size is too short.\n", list->name, list->type.string);
          exit_gracefully(1);
        }

        channels_list[index].already_reprocessed = TRUE;

	/* Let's cheat the size in order to send out the current buffer */
	fixed_size = channels_list[index].plugin->cfg.pipe_size;
      }
      else {
        channels_list[index].hdr.num++;
        channels_list[index].bufptr += (fixed_size + channels_list[index].var_size);
      }

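      /* the buffer is handed over to the plugin when the next record would not
	 fit, when the entry counter is about to wrap or when buffer_immediate
	 is set (vlen/msg data with no explicit plugin_buffer_size) */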
      if (((channels_list[index].bufptr + fixed_size) > channels_list[index].bufend) ||
	  (channels_list[index].hdr.num == INT_MAX) || channels_list[index].buffer_immediate) {
	channels_list[index].hdr.seq++;
	channels_list[index].hdr.seq %= MAX_SEQNUM;

	/* let's commit the buffer we just finished writing */
	((struct ch_buf_hdr *)channels_list[index].rg.ptr)->len = channels_list[index].bufptr;
	((struct ch_buf_hdr *)channels_list[index].rg.ptr)->seq = channels_list[index].hdr.seq;
	((struct ch_buf_hdr *)channels_list[index].rg.ptr)->num = channels_list[index].hdr.num;

	channels_list[index].status->last_buf_off = (u_int64_t)(channels_list[index].rg.ptr - channels_list[index].rg.base);

        if (config.debug_internal_msg) {
	  struct plugins_list_entry *list = channels_list[index].plugin;
	  Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer released len=%" PRIu64 " seq=%u num_entries=%u off=%" PRIu64 "\n",
		list->name, list->type.string, channels_list[index].bufptr, channels_list[index].hdr.seq,
		channels_list[index].hdr.num, channels_list[index].status->last_buf_off);
	}

	/* sending buffer to connected ZMQ subscriber(s) */
	if (channels_list[index].plugin->cfg.pipe_zmq) {
#ifdef WITH_ZMQ
          struct channels_list_entry *chptr = &channels_list[index];

	  int ret = p_zmq_topic_send(&chptr->zmq_host, chptr->rg.ptr, chptr->bufsize);
          (void)ret; //Check error?
#endif
	}
	else {
	  if (channels_list[index].status->wakeup) {
	    channels_list[index].status->wakeup = channels_list[index].request;
	    if (write(channels_list[index].pipe, &channels_list[index].rg.ptr, CharPtrSz) != CharPtrSz) {
	      struct plugins_list_entry *list = channels_list[index].plugin;
	      Log(LOG_WARNING, "WARN ( %s/%s ): Failed during write: %s\n", list->name, list->type.string, strerror(errno));
	    }
	  }
	}

	channels_list[index].rg.ptr += channels_list[index].bufsize;

	if ((channels_list[index].rg.ptr+channels_list[index].bufsize) > channels_list[index].rg.end)
	  channels_list[index].rg.ptr = channels_list[index].rg.base;

	/* let's protect the buffer we are going to write */
        ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->seq = -1;
        ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->num = 0;

        /* rewind pointer */
        channels_list[index].bufptr = channels_list[index].buf;
        channels_list[index].hdr.num = 0;

	if (channels_list[index].reprocess) goto reprocess;

	/* if reading from a savefile, let's sleep a bit after
	   having sent over a buffer worth of data */
	if (channels_list[index].plugin->cfg.pcap_savefile) usleep(1000); /* 1 msec */
      }
    }

    pptrs->tag = 0;
    pptrs->tag2 = 0;
    pretag_free_label(&pptrs->label);
  }

  /* check if we have to reload the map: new loop is to
     ensure we reload it for all plugins and prevent any
     timing issues with pointers to labels */
  if (reload_map_exec_plugins) {
    memset(&req->ptm_c, 0, sizeof(struct ptm_complex));

    for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
      struct plugins_list_entry *p = channels_list[index].plugin;

      if (p->cfg.pre_tag_map && find_id_func) {
        if (p->cfg.type_id == PLUGIN_ID_TEE) {
          req->ptm_c.load_ptm_plugin = p->cfg.type_id;
          req->ptm_c.load_ptm_res = FALSE;
        }

        load_pre_tag_map(config.acct_type, p->cfg.pre_tag_map, &p->cfg.ptm, req, &p->cfg.ptm_alloc,
                         p->cfg.maps_entries, p->cfg.maps_row_len);

        if (p->cfg.type_id == PLUGIN_ID_TEE) {
          p->cfg.ptm_complex = req->ptm_c.load_ptm_res;
          if (req->ptm_c.load_ptm_res) req->ptm_c.exec_ptm_dissect = TRUE;
        }
      }
    }
  }

  /* cleanups */
  reload_map_exec_plugins = FALSE;
  pretag_free_label(saved_label);
  if (saved_label) free(saved_label);
}

struct channels_list_entry *insert_pipe_channel(int plugin_type, struct configuration *cfg, int pipe)
{
  struct channels_list_entry *chptr;
  int index = 0;

  while (index < MAX_N_PLUGINS) {
    chptr = &channels_list[index];
    if (!chptr->aggregation && !chptr->aggregation_2) { /* found room */
      chptr->aggregation = cfg->what_to_count;
      chptr->aggregation_2 = cfg->what_to_count_2;
      chptr->pipe = pipe;
      chptr->agg_filter.table = cfg->bpfp_a_table;
      chptr->agg_filter.num = (int *) &cfg->bpfp_a_num;
      chptr->bufsize = cfg->buffer_size;
      chptr->buffer_immediate = cfg->buffer_immediate;
      chptr->core_pid = getpid();
      chptr->tag = cfg->post_tag;
      chptr->tag2 = cfg->post_tag2;
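      /* when sampling applies, NetFlow/IPFIX (ACCT_NF) gets systematic 1-in-N
	 skips while the other daemons get randomized skips around the same rate */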
      if (cfg->sampling_rate && plugin_type != PLUGIN_ID_SFPROBE) { /* sfprobe cares for itself */
	chptr->s.rate = cfg->sampling_rate;

	if (cfg->acct_type == ACCT_NF) chptr->s.sf = &take_simple_systematic_skip;
	else chptr->s.sf = &take_simple_random_skip;
      }
      memcpy(&chptr->tag_filter, &cfg->ptf, sizeof(struct pretag_filter));
      memcpy(&chptr->tag2_filter, &cfg->pt2f, sizeof(struct pretag_filter));
      memcpy(&chptr->label_filter, &cfg->ptlf, sizeof(struct pretag_label_filter));
      chptr->buf = 0;
      chptr->bufptr = chptr->buf;
      chptr->bufend = cfg->buffer_size-sizeof(struct ch_buf_hdr);

      // XXX: no need to map_shared() if using AMQP
      /* +PKT_MSG_SIZE has been introduced as a margin as a
         countermeasure against the reception of malicious NetFlow v9
	 templates */
      chptr->rg.base = map_shared(0, cfg->pipe_size+PKT_MSG_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
      if (chptr->rg.base == MAP_FAILED) {
        Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate pipe buffer. Exiting ...\n", cfg->name, cfg->type);
	exit_gracefully(1);
      }
      memset(chptr->rg.base, 0, cfg->pipe_size);
      chptr->rg.ptr = chptr->rg.base;
      chptr->rg.end = chptr->rg.base+cfg->pipe_size;

      chptr->status = map_shared(0, sizeof(struct ch_status), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
      if (chptr->status == MAP_FAILED) {
        Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate status buffer. Exiting ...\n", cfg->name, cfg->type);
        exit_gracefully(1);
      }
      memset(chptr->status, 0, sizeof(struct ch_status));

      break;
    }
    else chptr = NULL;

    index++;
  }

  return chptr;
}

void delete_pipe_channel(int pipe)
{
  struct channels_list_entry *chptr;
  int index = 0, index2;

  while (index < MAX_N_PLUGINS) {
    chptr = &channels_list[index];

    if (chptr->pipe == pipe) {
      chptr->aggregation = FALSE;
      chptr->aggregation_2 = FALSE;

      /* we check whether any plugin depends on the one being
	 removed via the 'same_aggregate' flag */
      if (!chptr->same_aggregate) {
	index2 = index;
	for (index2++; index2 < MAX_N_PLUGINS; index2++) {
	  chptr = &channels_list[index2];

	  if (!chptr->aggregation && !chptr->aggregation_2) break; /* we finished channels */
	  if (chptr->same_aggregate) {
	    chptr->same_aggregate = FALSE;
	    break;
	  }
	  else break; /* we have nothing to do */
	}
      }

      index2 = index;
      for (index2++; index2 < MAX_N_PLUGINS; index2++) {
	chptr = &channels_list[index2];
	if (chptr->aggregation || chptr->aggregation_2) {
	  memcpy(&channels_list[index], chptr, sizeof(struct channels_list_entry));
	  memset(chptr, 0, sizeof(struct channels_list_entry));
	  index++;
	}
	else break; /* we finished channels */
      }

      break;
    }

    index++;
  }
}

/* trivial sorting(tm) :-) */
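/* groups channels sharing an identical aggregation method next to each other
   and flags the duplicates with 'same_aggregate' */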
void sort_pipe_channels()
{
  struct channels_list_entry ctmp;
  int x = 0, y = 0;

  while (x < MAX_N_PLUGINS) {
    if (!channels_list[x].aggregation && !channels_list[x].aggregation_2) break;
    y = x+1;
    while (y < MAX_N_PLUGINS) {
      if (!channels_list[y].aggregation && !channels_list[y].aggregation_2) break;
      if (channels_list[x].aggregation == channels_list[y].aggregation &&
          channels_list[x].aggregation_2 == channels_list[y].aggregation_2) {
	channels_list[y].same_aggregate = TRUE;
	if (y == x+1) x++;
	else {
	  memcpy(&ctmp, &channels_list[x+1], sizeof(struct channels_list_entry));
	  memcpy(&channels_list[x+1], &channels_list[y], sizeof(struct channels_list_entry));
	  memcpy(&channels_list[y], &ctmp, sizeof(struct channels_list_entry));
	  x++;
	}
      }
      y++;
    }
    x++;
  }
}

void init_pipe_channels()
{
  memset(&channels_list, 0, MAX_N_PLUGINS*sizeof(struct channels_list_entry));
}

void evaluate_sampling(struct sampling *smp, pm_counter_t *pkt_len, pm_counter_t *pkt_num, pm_counter_t *sample_pool)
{
  pm_counter_t delta, pkts = *pkt_num;

  if (!smp->rate) { /* sampling is disabled */
    smp->sample_pool = pkts;
    smp->sampled_pkts = pkts;
    return;
  }

  smp->sampled_pkts = 0;

run_again:
  if (!smp->counter) smp->counter = (smp->sf)(smp->rate);

  delta = MIN(smp->counter, pkts);
  smp->counter -= delta;
  pkts -= delta;
  smp->sample_pool += delta;

  if (!smp->counter) {
    smp->sampled_pkts++;
    *sample_pool = smp->sample_pool;
    smp->sample_pool = 0;
    if (pkts > 0) goto run_again;
  }

  /* Let's handle flows meaningfully */
  if (smp->sampled_pkts && *pkt_num > 1) {
    *pkt_len = ( *pkt_len / *pkt_num ) * smp->sampled_pkts;
    *pkt_num = smp->sampled_pkts;
  }
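  /* e.g. a flow of 10 packets / 5000 bytes with 2 packets picked by the
     sampler is accounted as 2 packets / 1000 bytes */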
}

/* simple random algorithm */
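/* the skip is drawn uniformly from [1, 2*mean - 1], hence its expected value
   is 'mean', i.e. on average one packet in 'mean' is sampled */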
pm_counter_t take_simple_random_skip(pm_counter_t mean)
{
  pm_counter_t skip;

  if (mean > 1) {
    skip = ((random() % ((2 * mean) - 1)) + 1);
    srandom(random());
  }
  else skip = 1; /* smp->rate == 1 */

  return skip;
}

/* simple systematic algorithm */
pm_counter_t take_simple_systematic_skip(pm_counter_t mean)
{
  pm_counter_t skip = mean;

  return skip;
}

/* return value:
   TRUE: We want it!
   FALSE: Discard it!
*/
int evaluate_filters(struct aggregate_filter *filter, u_char *pkt, struct pcap_pkthdr *pkthdr)
{
  int index;

  if (*filter->num == 0) return TRUE;  /* no entries in the filter array: aggregate filtering disabled */

  for (index = 0; index < *filter->num; index++) {
    if (bpf_filter(filter->table[index]->bf_insns, pkt, pkthdr->len, pkthdr->caplen)) return TRUE;
  }

  return FALSE;
}

void recollect_pipe_memory(struct channels_list_entry *mychptr)
{
  struct channels_list_entry *chptr;
  int index = 0;

  while (index < MAX_N_PLUGINS) {
    chptr = &channels_list[index];
    if (mychptr->rg.base != chptr->rg.base) {
      munmap(chptr->rg.base, (chptr->rg.end-chptr->rg.base)+PKT_MSG_SIZE);
      munmap(chptr->status, sizeof(struct ch_status));
    }
    index++;
  }
}

void init_random_seed()
{
  struct timeval tv;

  gettimeofday(&tv, NULL);
  srandom((unsigned int)tv.tv_usec);
}

void fill_pipe_buffer()
{
  struct channels_list_entry *chptr;
  int index;

  for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
    chptr = &channels_list[index];

    chptr->hdr.seq++;
    chptr->hdr.seq %= MAX_SEQNUM;

    ((struct ch_buf_hdr *)chptr->rg.ptr)->seq = chptr->hdr.seq;
    ((struct ch_buf_hdr *)chptr->rg.ptr)->num = chptr->hdr.num;

    if (chptr->plugin->cfg.pipe_zmq) {
#ifdef WITH_ZMQ
      p_zmq_topic_send(&chptr->zmq_host, chptr->rg.ptr, chptr->bufsize);
#endif
    }
    else {
      if (chptr->status->wakeup) {
        chptr->status->wakeup = chptr->request;
        if (write(chptr->pipe, &chptr->rg.ptr, CharPtrSz) != CharPtrSz)
	  Log(LOG_WARNING, "WARN ( %s/%s ): Failed during write: %s\n", chptr->plugin->cfg.name, chptr->plugin->cfg.type, strerror(errno));
      }
    }
  }
}

int check_pipe_buffer_space(struct channels_list_entry *mychptr, struct pkt_vlen_hdr_primitives *pvlen, int len)
{
  int buf_space = 0;

  if (!mychptr) return ERR;

  /* init to the room left in the current buffer */
  buf_space = mychptr->bufend - mychptr->bufptr;

  /* subtract fixed part, current variable part and new var part (len) */
  buf_space -= mychptr->datasize;
  if (pvlen) buf_space -= pvlen->tot_len;
  buf_space -= len;

  /* return verdict: if positive, fix sizes; if negative, take care of triggering a reprocess */
  if (buf_space >= 0) {
    mychptr->var_size += len;
    return FALSE;
  }
  else {
    mychptr->bufptr += (mychptr->bufend - mychptr->bufptr);
    mychptr->reprocess = TRUE;
    mychptr->var_size = 0;

    return TRUE;
  }
}

void return_pipe_buffer_space(struct channels_list_entry *mychptr, int len)
{
  if (!mychptr || !len) return;

  if (mychptr->var_size < len) return;

  mychptr->var_size -= len;
}

int check_shadow_status(struct packet_ptrs *pptrs, struct channels_list_entry *mychptr)
{
  if (pptrs->shadow) {
    if (pptrs->tag && mychptr->aggregation & COUNT_TAG) return FALSE;
    else if (pptrs->tag2 && mychptr->aggregation & COUNT_TAG2) return FALSE;
    else return TRUE;
  }
  else return FALSE;
}

void load_plugin_filters(int link_type)
{
  struct plugins_list_entry *list = plugins_list;

  while (list) {
    if ((*list->type.func)) {

      /* compiling aggregation filter if needed */
      if (list->cfg.a_filter) {
	pcap_t *dev_desc;
	bpf_u_int32 localnet, netmask = 0;  /* pcap library stuff */
	char errbuf[PCAP_ERRBUF_SIZE], *count_token;
	int idx = 0;

	dev_desc = pcap_open_dead(link_type, 128); /* 128 bytes should be long enough */

	if (config.pcap_if) pcap_lookupnet(config.pcap_if, &localnet, &netmask, errbuf);

	list->cfg.bpfp_a_table[idx] = malloc(sizeof(struct bpf_program));
	while ( (count_token = extract_token(&list->cfg.a_filter, ',')) && idx < AGG_FILTER_ENTRIES ) {
	  if (pcap_compile(dev_desc, list->cfg.bpfp_a_table[idx], count_token, 0, netmask) < 0) {
	    Log(LOG_WARNING, "WARN: %s\nWARN ( %s/%s ): aggregation filter disabled.\n",
				pcap_geterr(dev_desc), list->cfg.name, list->cfg.type);
	  }
	  else {
	    idx++;
	    list->cfg.bpfp_a_table[idx] = malloc(sizeof(struct bpf_program));
	  }
	}

	list->cfg.bpfp_a_num = idx;
      }
    }
    list = list->next;
  }
}

int pkt_data_clean(void *pdata, int len)
{
  memset(pdata, 0, len);

  return len;
}

int pkt_payload_clean(void *ppayload, int len)
{
  memset(ppayload, 0, PpayloadSz);

  return PpayloadSz;
}

int pkt_msg_clean(void *ppayload, int len)
{
  memset(ppayload, 0, PmsgSz);

  return PmsgSz;
}

int pkt_extras_clean(void *pextras, int len)
{
  memset(pextras, 0, PdataSz+PextrasSz);

  return PdataSz+PextrasSz;
}

void plugin_pipe_zmq_compile_check()
{
#ifndef WITH_ZMQ
  Log(LOG_ERR, "ERROR ( %s/%s ): 'plugin_pipe_zmq' requires compiling with --enable-zmq. Exiting ..\n", config.name, config.type);
  exit_gracefully(1);
#endif
}

void plugin_pipe_check(struct configuration *cfg)
{
  if (!cfg->pipe_zmq) cfg->pipe_homegrown = TRUE;
}

void P_zmq_pipe_init(void *zh, int *pipe_fd, u_int32_t *seq)
{
  plugin_pipe_zmq_compile_check();

#ifdef WITH_ZMQ
  if (zh) {
    struct p_zmq_host *zmq_host = zh;
    char log_id[LARGEBUFLEN];

    p_zmq_plugin_pipe_init_plugin(zmq_host);

    snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
    p_zmq_set_log_id(zmq_host, log_id);

    p_zmq_set_hwm(zmq_host, config.pipe_zmq_hwm);
    p_zmq_sub_setup(zmq_host);
    p_zmq_set_retry_timeout(zmq_host, config.pipe_zmq_retry);

    if (pipe_fd) (*pipe_fd) = p_zmq_get_fd(zmq_host);
    if (seq) (*seq) = 0;
  }
#endif
}