1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "thread_pool.h"
26 #include "plugin_hooks.h"
27 #include "plugin_common.h"
28 #include "pkt_handlers.h"
29
30 /* functions */
31
32 /* load_plugins() starts plugin processes; creates pipes
33 and handles them inserting in channels_list structure */
34
35 /* no AMQP: when not using map_shared, 'pipe_size' is the size of the pipe
36 created with socketpair(); when map_shared is enabled, it refers to the
37 size of the shared memory area */
load_plugins(struct plugin_requests * req)38 void load_plugins(struct plugin_requests *req)
39 {
40 u_int64_t buf_pipe_ratio_sz = 0, pipe_idx = 0;
41 int snd_buflen = 0, rcv_buflen = 0, socklen = 0, target_buflen = 0;
42
43 int nfprobe_id = 0, min_sz = 0, extra_sz = 0, offset = 0;
44 struct plugins_list_entry *list = plugins_list;
45 socklen_t l = sizeof(list->cfg.pipe_size);
46 struct channels_list_entry *chptr = NULL;
47
48
49 init_random_seed();
50 init_pipe_channels();
51
52 #ifdef WITH_ZMQ
53 char username[SHORTBUFLEN], password[SHORTBUFLEN];
54 memset(username, 0, sizeof(username));
55 memset(password, 0, sizeof(password));
56
57 generate_random_string(username, (sizeof(username) - 1));
58 generate_random_string(password, (sizeof(password) - 1));
59 #endif
60
61 while (list) {
62 if ((*list->type.func)) {
63 if (list->cfg.data_type & (PIPE_TYPE_METADATA|PIPE_TYPE_PAYLOAD|PIPE_TYPE_MSG));
64 else {
65 Log(LOG_ERR, "ERROR ( %s/%s ): Data type not supported: %d\n", list->name, list->type.string, list->cfg.data_type);
66 exit_gracefully(1);
67 }
68
69 min_sz = ChBufHdrSz;
70 list->cfg.buffer_immediate = FALSE;
71 if (list->cfg.data_type & PIPE_TYPE_METADATA) min_sz += PdataSz;
72 if (list->cfg.data_type & PIPE_TYPE_PAYLOAD) {
73 if (list->cfg.acct_type == ACCT_PM && list->cfg.snaplen) min_sz += (PpayloadSz+list->cfg.snaplen);
74 else min_sz += (PpayloadSz+DEFAULT_PLOAD_SIZE);
75 }
76 if (list->cfg.data_type & PIPE_TYPE_EXTRAS) min_sz += PextrasSz;
77 if (list->cfg.data_type & PIPE_TYPE_MSG) {
78 min_sz += PmsgSz;
79 if (!list->cfg.buffer_size) {
80 extra_sz = PKT_MSG_SIZE;
81 list->cfg.buffer_immediate = TRUE;
82 }
83 }
84 if (list->cfg.data_type & PIPE_TYPE_BGP) min_sz += sizeof(struct pkt_bgp_primitives);
85 if (list->cfg.data_type & PIPE_TYPE_LBGP) min_sz += sizeof(struct pkt_legacy_bgp_primitives);
86 if (list->cfg.data_type & PIPE_TYPE_NAT) min_sz += sizeof(struct pkt_nat_primitives);
87 if (list->cfg.data_type & PIPE_TYPE_TUN) min_sz += sizeof(struct pkt_tunnel_primitives);
88 if (list->cfg.data_type & PIPE_TYPE_MPLS) min_sz += sizeof(struct pkt_mpls_primitives);
89 if (list->cfg.cpptrs.len) min_sz += list->cfg.cpptrs.len;
90 if (list->cfg.data_type & PIPE_TYPE_VLEN) {
91 min_sz += sizeof(struct pkt_vlen_hdr_primitives);
92 if (!list->cfg.buffer_size) {
93 extra_sz = 1024; /* wild shot: 1Kb added for the actual variable-length data */
94 list->cfg.buffer_immediate = TRUE;
95 }
96 }
97
98 /* If nothing is supplied, let's hint some working default values */
99 if (!list->cfg.pipe_size || !list->cfg.buffer_size) {
100 if (!list->cfg.pipe_size) list->cfg.pipe_size = 4096000; /* 4Mb */
101 if (!list->cfg.buffer_size) {
102 if (list->cfg.pcap_savefile) list->cfg.buffer_size = 10240; /* 10Kb */
103 else list->cfg.buffer_size = MIN((min_sz + extra_sz), 10240);
104 }
105 }
106
107 /* some validations */
108 if (list->cfg.pipe_size < min_sz) list->cfg.pipe_size = min_sz;
109 if (list->cfg.buffer_size < min_sz) list->cfg.buffer_size = min_sz;
110 if (list->cfg.buffer_size > list->cfg.pipe_size) list->cfg.buffer_size = list->cfg.pipe_size;
111
112 /* if required let's align plugin_buffer_size to 4 bytes boundary */
113 #if NEED_ALIGN
114 while (list->cfg.buffer_size % 4 != 0) list->cfg.buffer_size--;
115 #endif
116
117 if (!list->cfg.pipe_zmq) {
118 /* creating communication channel */
119 socketpair(AF_UNIX, SOCK_DGRAM, 0, list->pipe);
120
121 /* checking SO_RCVBUF and SO_SNDBUF values; if different we take the smaller one */
122 getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
123 getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
124 socklen = (rcv_buflen < snd_buflen) ? rcv_buflen : snd_buflen;
125
126 buf_pipe_ratio_sz = (list->cfg.pipe_size/list->cfg.buffer_size)*sizeof(char *);
127 if (buf_pipe_ratio_sz > INT_MAX) {
128 Log(LOG_ERR, "ERROR ( %s/%s ): Current plugin_buffer_size elems per plugin_pipe_size: %llu. Max: %d.\nExiting.\n",
129 list->name, list->type.string, (unsigned long long)(list->cfg.pipe_size/list->cfg.buffer_size),
130 (INT_MAX/(int)sizeof(char *)));
131 exit_gracefully(1);
132 }
133 else target_buflen = buf_pipe_ratio_sz;
134
135 if (target_buflen > socklen) {
136 Setsocksize(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &target_buflen, l);
137 Setsocksize(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &target_buflen, l);
138 }
139
140 getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
141 getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
142 if (rcv_buflen < snd_buflen) snd_buflen = rcv_buflen;
143
144 if (snd_buflen < socklen) {
145 Setsocksize(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &socklen, l);
146 Setsocksize(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &socklen, l);
147
148 getsockopt(list->pipe[0], SOL_SOCKET, SO_RCVBUF, &rcv_buflen, &l);
149 getsockopt(list->pipe[1], SOL_SOCKET, SO_SNDBUF, &snd_buflen, &l);
150 if (rcv_buflen < snd_buflen) snd_buflen = rcv_buflen;
151 }
152
153 if (list->cfg.debug || (list->cfg.pipe_size > WARNING_PIPE_SIZE)) {
154 Log(LOG_INFO, "INFO ( %s/%s ): plugin_pipe_size=%" PRIu64 " bytes plugin_buffer_size=%" PRIu64 " bytes\n",
155 list->name, list->type.string, list->cfg.pipe_size, list->cfg.buffer_size);
156 if (target_buflen <= snd_buflen)
157 Log(LOG_INFO, "INFO ( %s/%s ): ctrl channel: obtained=%d bytes target=%d bytes\n",
158 list->name, list->type.string, snd_buflen, target_buflen);
159 else
160 /* This should return an error and exit but we fallback to a
161 warning in order to be backward compatible */
162 Log(LOG_WARNING, "WARN ( %s/%s ): ctrl channel: obtained=%d bytes target=%d bytes\n",
163 list->name, list->type.string, snd_buflen, target_buflen);
164 }
165 }
166 else {
167 pipe_idx++;
168 list->pipe[0] = list->pipe[1] = pipe_idx;
169 }
170
171 list->cfg.name = list->name;
172 list->cfg.type = list->type.string;
173 list->cfg.type_id = list->type.id;
174 chptr = insert_pipe_channel(list->type.id, &list->cfg, list->pipe[1]);
175 if (!chptr) {
176 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to setup a new Core Process <-> Plugin channel.\nExiting.\n", list->name, list->type.string);
177 exit_gracefully(1);
178 }
179 else chptr->plugin = list;
180
181 /* sets new value to be assigned to 'wakeup'; 'TRUE' disables on-request wakeup */
182 if (list->type.id == PLUGIN_ID_MEMORY) chptr->request = TRUE;
183
184 /* sets fixed/vlen offsets and cleaner routine; XXX: we should refine the cleaner
185 part: 1) ie. extras assumes it's automagically piled with metadata; 2) what if
186 multiple vlen components are stacked up? */
187 if (list->cfg.data_type & PIPE_TYPE_METADATA) {
188 chptr->clean_func = pkt_data_clean;
189 offset = sizeof(struct pkt_data);
190 }
191 if (list->cfg.data_type & PIPE_TYPE_PAYLOAD) chptr->clean_func = pkt_payload_clean;
192
193 if (list->cfg.data_type & PIPE_TYPE_EXTRAS) {
194 chptr->extras.off_pkt_extras = offset;
195 offset += sizeof(struct pkt_extras);
196 }
197 if (list->cfg.data_type & PIPE_TYPE_MSG) chptr->clean_func = pkt_msg_clean;
198
199 if (list->cfg.data_type & PIPE_TYPE_BGP) {
200 chptr->extras.off_pkt_bgp_primitives = offset;
201 offset += sizeof(struct pkt_bgp_primitives);
202 }
203 else chptr->extras.off_pkt_bgp_primitives = 0;
204
205 if (list->cfg.data_type & PIPE_TYPE_LBGP) {
206 chptr->extras.off_pkt_lbgp_primitives = offset;
207 offset += sizeof(struct pkt_legacy_bgp_primitives);
208 }
209 else chptr->extras.off_pkt_lbgp_primitives = 0;
210
211 if (list->cfg.data_type & PIPE_TYPE_NAT) {
212 chptr->extras.off_pkt_nat_primitives = offset;
213 offset += sizeof(struct pkt_nat_primitives);
214 }
215 else chptr->extras.off_pkt_nat_primitives = 0;
216
217 if (list->cfg.data_type & PIPE_TYPE_TUN) {
218 chptr->extras.off_pkt_tun_primitives = offset;
219 offset += sizeof(struct pkt_tunnel_primitives);
220 }
221 else chptr->extras.off_pkt_tun_primitives = 0;
222
223 if (list->cfg.data_type & PIPE_TYPE_MPLS) {
224 chptr->extras.off_pkt_mpls_primitives = offset;
225 offset += sizeof(struct pkt_mpls_primitives);
226 }
227 else chptr->extras.off_pkt_mpls_primitives = 0;
228
229 if (list->cfg.cpptrs.len) {
230 chptr->extras.off_custom_primitives = offset;
231 offset += list->cfg.cpptrs.len;
232 }
233 /* PIPE_TYPE_VLEN at the end of the stack so to not make
234 vlen other structures (although possible it would not
235 make much sense) */
236 if (list->cfg.data_type & PIPE_TYPE_VLEN) {
237 chptr->extras.off_pkt_vlen_hdr_primitives = offset;
238 offset += sizeof(struct pkt_vlen_hdr_primitives);
239 }
240 else chptr->extras.off_pkt_vlen_hdr_primitives = 0;
241 /* any further offset beyond this point must be set to
242 PM_VARIABLE_LENGTH so to indicate plugins to resolve
243 value at runtime. */
244
245 chptr->datasize = min_sz-ChBufHdrSz;
246
247 /* sets nfprobe ID */
248 if (list->type.id == PLUGIN_ID_NFPROBE) {
249 list->cfg.nfprobe_id = nfprobe_id;
250 nfprobe_id++;
251 }
252
253 /* ZMQ inits, if required */
254 #ifdef WITH_ZMQ
255 if (list->cfg.pipe_zmq) {
256 char log_id[LARGEBUFLEN];
257
258 p_zmq_plugin_pipe_init_core(&chptr->zmq_host, list->id, username, password);
259 snprintf(log_id, sizeof(log_id), "%s/%s", list->name, list->type.string);
260 p_zmq_set_log_id(&chptr->zmq_host, log_id);
261 p_zmq_pub_setup(&chptr->zmq_host);
262 }
263 #endif
264
265 switch (list->pid = fork()) {
266 case -1: /* Something went wrong */
267 Log(LOG_WARNING, "WARN ( %s/%s ): Unable to initialize plugin: %s\n", list->name, list->type.string, strerror(errno));
268 delete_pipe_channel(list->pipe[1]);
269 break;
270 case 0: /* Child */
271 /* SIGCHLD handling issue: SysV avoids zombies by ignoring SIGCHLD; to emulate
272 such semantics on BSD systems, we need an handler like handle_falling_child() */
273 #if defined (SOLARIS)
274 signal(SIGCHLD, SIG_IGN);
275 #else
276 signal(SIGCHLD, ignore_falling_child);
277 #endif
278
279 #if defined HAVE_MALLOPT
280 mallopt(M_CHECK_ACTION, 0);
281 #endif
282
283 if (device.dev_desc) pcap_close(device.dev_desc);
284 close(config.sock);
285 close(config.bgp_sock);
286 if (!list->cfg.pipe_zmq) close(list->pipe[1]);
287 (*list->type.func)(list->pipe[0], &list->cfg, chptr);
288 exit_gracefully(0);
289 default: /* Parent */
290 if (!list->cfg.pipe_zmq) {
291 close(list->pipe[0]);
292 setnonblocking(list->pipe[1]);
293 }
294 break;
295 }
296
297 /* some residual check */
298 if (chptr && list->cfg.a_filter) req->bpf_filter = TRUE;
299 }
300 list = list->next;
301 }
302
303 sort_pipe_channels();
304
305 /* define pre_tag_map(s) now so that they don't finish unnecessarily in plugin memory space */
306 {
307 int ptm_index = 0, ptm_global = FALSE;
308 char *ptm_ptr = NULL;
309
310 list = plugins_list;
311
312 while (list) {
313 if (list->cfg.pre_tag_map) {
314 if (!ptm_index) {
315 ptm_ptr = list->cfg.pre_tag_map;
316 ptm_global = TRUE;
317 }
318 else {
319 if (!ptm_ptr || strcmp(ptm_ptr, list->cfg.pre_tag_map))
320 ptm_global = FALSE;
321 }
322
323 if (list->cfg.type_id == PLUGIN_ID_TEE) {
324 req->ptm_c.load_ptm_plugin = list->cfg.type_id;
325 req->ptm_c.load_ptm_res = FALSE;
326 }
327
328 load_pre_tag_map(config.acct_type, list->cfg.pre_tag_map, &list->cfg.ptm, req, &list->cfg.ptm_alloc,
329 list->cfg.maps_entries, list->cfg.maps_row_len);
330
331 if (list->cfg.type_id == PLUGIN_ID_TEE) {
332 list->cfg.ptm_complex = req->ptm_c.load_ptm_res;
333 if (req->ptm_c.load_ptm_res) req->ptm_c.exec_ptm_dissect = TRUE;
334 }
335 }
336
337 list = list->next;
338 ptm_index++;
339 }
340
341 /* enforcing global flag */
342 list = plugins_list;
343
344 while (list) {
345 list->cfg.ptm_global = ptm_global;
346 list = list->next;
347 }
348 }
349 }
350
exec_plugins(struct packet_ptrs * pptrs,struct plugin_requests * req)351 void exec_plugins(struct packet_ptrs *pptrs, struct plugin_requests *req)
352 {
353 int saved_have_tag = FALSE, saved_have_tag2 = FALSE, saved_have_label = FALSE;
354 pm_id_t saved_tag = 0, saved_tag2 = 0;
355 pt_label_t *saved_label = malloc(sizeof(pt_label_t));
356
357 int num, fixed_size;
358 u_int32_t savedptr;
359 char *bptr;
360 int index, got_tags = FALSE;
361
362 pretag_init_label(saved_label);
363
364 #if defined WITH_GEOIPV2
365 if (reload_geoipv2_file && config.geoipv2_file) {
366 pm_geoipv2_close();
367 pm_geoipv2_init();
368
369 reload_geoipv2_file = FALSE;
370 }
371 #endif
372
373 for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
374 struct plugins_list_entry *p = channels_list[index].plugin;
375
376 channels_list[index].already_reprocessed = FALSE;
377
378 if (p->cfg.pre_tag_map && find_id_func) {
379 if (p->cfg.type_id == PLUGIN_ID_TEE) {
380 /*
381 replicate and compute tagging if:
382 - a dissected flow hits a complex pre_tag_map or
383 - a non-dissected (full) packet hits a simple pre_tag_map
384 */
385 if ((req->ptm_c.exec_ptm_res && !p->cfg.ptm_complex) || (!req->ptm_c.exec_ptm_res && p->cfg.ptm_complex))
386 continue;
387 }
388
389 if (p->cfg.ptm_global && got_tags) {
390 pptrs->tag = saved_tag;
391 pptrs->tag2 = saved_tag2;
392 pretag_copy_label(&pptrs->label, saved_label);
393
394 pptrs->have_tag = saved_have_tag;
395 pptrs->have_tag2 = saved_have_tag2;
396 pptrs->have_label = saved_have_label;
397 }
398 else {
399 if (p->cfg.type_id == PLUGIN_ID_TEE && req->ptm_c.exec_ptm_res && pptrs->tee_dissect_bcast) /* noop */;
400 else {
401 find_id_func(&p->cfg.ptm, pptrs, &pptrs->tag, &pptrs->tag2);
402
403 if (p->cfg.ptm_global) {
404 saved_tag = pptrs->tag;
405 saved_tag2 = pptrs->tag2;
406 pretag_copy_label(saved_label, &pptrs->label);
407
408 saved_have_tag = pptrs->have_tag;
409 saved_have_tag2 = pptrs->have_tag2;
410 saved_have_label = pptrs->have_label;
411
412 got_tags = TRUE;
413 }
414 }
415 }
416 }
417 else {
418 if (p->cfg.type_id == PLUGIN_ID_TEE) {
419 /* stop dissected flows from being replicated in case of no pre_tag_map */
420 if (req->ptm_c.exec_ptm_res) continue;
421 }
422 }
423
424 if (evaluate_filters(&channels_list[index].agg_filter, pptrs->packet_ptr, pptrs->pkthdr) &&
425 !evaluate_tags(&channels_list[index].tag_filter, pptrs->tag) &&
426 !evaluate_tags(&channels_list[index].tag2_filter, pptrs->tag2) &&
427 !evaluate_labels(&channels_list[index].label_filter, &pptrs->label) &&
428 !check_shadow_status(pptrs, &channels_list[index])) {
429 /* arranging buffer: supported primitives + packet total length */
430 reprocess:
431 channels_list[index].reprocess = FALSE;
432 num = 0;
433
434 /* rg.ptr points to slot's base address into the ring (shared memory); bufptr works
435 as a displacement into the slot to place sequentially packets */
436 bptr = channels_list[index].rg.ptr+ChBufHdrSz+channels_list[index].bufptr;
437 fixed_size = (*channels_list[index].clean_func)(bptr, channels_list[index].datasize);
438 channels_list[index].var_size = 0;
439 savedptr = channels_list[index].bufptr;
440 reset_fallback_status(pptrs);
441
442 while (channels_list[index].phandler[num]) {
443 (*channels_list[index].phandler[num])(&channels_list[index], pptrs, &bptr);
444 num++;
445 }
446
447 if (channels_list[index].s.rate && !channels_list[index].s.sampled_pkts) {
448 channels_list[index].reprocess = FALSE;
449 channels_list[index].bufptr = savedptr;
450 channels_list[index].hdr.num--; /* let's cheat this value as it will get increased later */
451 fixed_size = 0;
452 channels_list[index].var_size = 0;
453 }
454
455 if (channels_list[index].reprocess) {
456 /* Let's check if we have an issue with the buffer size */
457 if (channels_list[index].already_reprocessed) {
458 struct plugins_list_entry *list = channels_list[index].plugin;
459
460 Log(LOG_ERR, "ERROR ( %s/%s ): plugin_buffer_size is too short.\n", list->name, list->type.string);
461 exit_gracefully(1);
462 }
463
464 channels_list[index].already_reprocessed = TRUE;
465
466 /* Let's cheat the size in order to send out the current buffer */
467 fixed_size = channels_list[index].plugin->cfg.pipe_size;
468 }
469 else {
470 channels_list[index].hdr.num++;
471 channels_list[index].bufptr += (fixed_size + channels_list[index].var_size);
472 }
473
474 if (((channels_list[index].bufptr + fixed_size) > channels_list[index].bufend) ||
475 (channels_list[index].hdr.num == INT_MAX) || channels_list[index].buffer_immediate) {
476 channels_list[index].hdr.seq++;
477 channels_list[index].hdr.seq %= MAX_SEQNUM;
478
479 /* let's commit the buffer we just finished writing */
480 ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->len = channels_list[index].bufptr;
481 ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->seq = channels_list[index].hdr.seq;
482 ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->num = channels_list[index].hdr.num;
483
484 channels_list[index].status->last_buf_off = (u_int64_t)(channels_list[index].rg.ptr - channels_list[index].rg.base);
485
486 if (config.debug_internal_msg) {
487 struct plugins_list_entry *list = channels_list[index].plugin;
488 Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer released len=%" PRIu64 " seq=%u num_entries=%u off=%" PRIu64 "\n",
489 list->name, list->type.string, channels_list[index].bufptr, channels_list[index].hdr.seq,
490 channels_list[index].hdr.num, channels_list[index].status->last_buf_off);
491 }
492
493 /* sending buffer to connected ZMQ subscriber(s) */
494 if (channels_list[index].plugin->cfg.pipe_zmq) {
495 #ifdef WITH_ZMQ
496 struct channels_list_entry *chptr = &channels_list[index];
497
498 int ret = p_zmq_topic_send(&chptr->zmq_host, chptr->rg.ptr, chptr->bufsize);
499 (void)ret; //Check error?
500 #endif
501 }
502 else {
503 if (channels_list[index].status->wakeup) {
504 channels_list[index].status->wakeup = channels_list[index].request;
505 if (write(channels_list[index].pipe, &channels_list[index].rg.ptr, CharPtrSz) != CharPtrSz) {
506 struct plugins_list_entry *list = channels_list[index].plugin;
507 Log(LOG_WARNING, "WARN ( %s/%s ): Failed during write: %s\n", list->name, list->type.string, strerror(errno));
508 }
509 }
510 }
511
512 channels_list[index].rg.ptr += channels_list[index].bufsize;
513
514 if ((channels_list[index].rg.ptr+channels_list[index].bufsize) > channels_list[index].rg.end)
515 channels_list[index].rg.ptr = channels_list[index].rg.base;
516
517 /* let's protect the buffer we are going to write */
518 ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->seq = -1;
519 ((struct ch_buf_hdr *)channels_list[index].rg.ptr)->num = 0;
520
521 /* rewind pointer */
522 channels_list[index].bufptr = channels_list[index].buf;
523 channels_list[index].hdr.num = 0;
524
525 if (channels_list[index].reprocess) goto reprocess;
526
527 /* if reading from a savefile, let's sleep a bit after
528 having sent over a buffer worth of data */
529 if (channels_list[index].plugin->cfg.pcap_savefile) usleep(1000); /* 1 msec */
530 }
531 }
532
533 pptrs->tag = 0;
534 pptrs->tag2 = 0;
535 pretag_free_label(&pptrs->label);
536 }
537
538 /* check if we have to reload the map: new loop is to
539 ensure we reload it for all plugins and prevent any
540 timing issues with pointers to labels */
541 if (reload_map_exec_plugins) {
542 memset(&req->ptm_c, 0, sizeof(struct ptm_complex));
543
544 for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
545 struct plugins_list_entry *p = channels_list[index].plugin;
546
547 if (p->cfg.pre_tag_map && find_id_func) {
548 if (p->cfg.type_id == PLUGIN_ID_TEE) {
549 req->ptm_c.load_ptm_plugin = p->cfg.type_id;
550 req->ptm_c.load_ptm_res = FALSE;
551 }
552
553 load_pre_tag_map(config.acct_type, p->cfg.pre_tag_map, &p->cfg.ptm, req, &p->cfg.ptm_alloc,
554 p->cfg.maps_entries, p->cfg.maps_row_len);
555
556 if (p->cfg.type_id == PLUGIN_ID_TEE) {
557 p->cfg.ptm_complex = req->ptm_c.load_ptm_res;
558 if (req->ptm_c.load_ptm_res) req->ptm_c.exec_ptm_dissect = TRUE;
559 }
560 }
561 }
562 }
563
564 /* cleanups */
565 reload_map_exec_plugins = FALSE;
566 pretag_free_label(saved_label);
567 if (saved_label) free(saved_label);
568 }
569
insert_pipe_channel(int plugin_type,struct configuration * cfg,int pipe)570 struct channels_list_entry *insert_pipe_channel(int plugin_type, struct configuration *cfg, int pipe)
571 {
572 struct channels_list_entry *chptr;
573 int index = 0;
574
575 while (index < MAX_N_PLUGINS) {
576 chptr = &channels_list[index];
577 if (!chptr->aggregation && !chptr->aggregation_2) { /* found room */
578 chptr->aggregation = cfg->what_to_count;
579 chptr->aggregation_2 = cfg->what_to_count_2;
580 chptr->pipe = pipe;
581 chptr->agg_filter.table = cfg->bpfp_a_table;
582 chptr->agg_filter.num = (int *) &cfg->bpfp_a_num;
583 chptr->bufsize = cfg->buffer_size;
584 chptr->buffer_immediate = cfg->buffer_immediate;
585 chptr->core_pid = getpid();
586 chptr->tag = cfg->post_tag;
587 chptr->tag2 = cfg->post_tag2;
588 if (cfg->sampling_rate && plugin_type != PLUGIN_ID_SFPROBE) { /* sfprobe cares for itself */
589 chptr->s.rate = cfg->sampling_rate;
590
591 if (cfg->acct_type == ACCT_NF) chptr->s.sf = &take_simple_systematic_skip;
592 else chptr->s.sf = &take_simple_random_skip;
593 }
594 memcpy(&chptr->tag_filter, &cfg->ptf, sizeof(struct pretag_filter));
595 memcpy(&chptr->tag2_filter, &cfg->pt2f, sizeof(struct pretag_filter));
596 memcpy(&chptr->label_filter, &cfg->ptlf, sizeof(struct pretag_label_filter));
597 chptr->buf = 0;
598 chptr->bufptr = chptr->buf;
599 chptr->bufend = cfg->buffer_size-sizeof(struct ch_buf_hdr);
600
601 // XXX: no need to map_shared() if using AMQP
602 /* +PKT_MSG_SIZE has been introduced as a margin as a
603 countermeasure against the reception of malicious NetFlow v9
604 templates */
605 chptr->rg.base = map_shared(0, cfg->pipe_size+PKT_MSG_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
606 if (chptr->rg.base == MAP_FAILED) {
607 Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate pipe buffer. Exiting ...\n", cfg->name, cfg->type);
608 exit_gracefully(1);
609 }
610 memset(chptr->rg.base, 0, cfg->pipe_size);
611 chptr->rg.ptr = chptr->rg.base;
612 chptr->rg.end = chptr->rg.base+cfg->pipe_size;
613
614 chptr->status = map_shared(0, sizeof(struct ch_status), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
615 if (chptr->status == MAP_FAILED) {
616 Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate status buffer. Exiting ...\n", cfg->name, cfg->type);
617 exit_gracefully(1);
618 }
619 memset(chptr->status, 0, sizeof(struct ch_status));
620
621 break;
622 }
623 else chptr = NULL;
624
625 index++;
626 }
627
628 return chptr;
629 }
630
delete_pipe_channel(int pipe)631 void delete_pipe_channel(int pipe)
632 {
633 struct channels_list_entry *chptr;
634 int index = 0, index2;
635
636 while (index < MAX_N_PLUGINS) {
637 chptr = &channels_list[index];
638
639 if (chptr->pipe == pipe) {
640 chptr->aggregation = FALSE;
641 chptr->aggregation_2 = FALSE;
642
643 /* we ensure that any plugin is depending on the one
644 being removed via the 'same_aggregate' flag */
645 if (!chptr->same_aggregate) {
646 index2 = index;
647 for (index2++; index2 < MAX_N_PLUGINS; index2++) {
648 chptr = &channels_list[index2];
649
650 if (!chptr->aggregation && !chptr->aggregation_2) break; /* we finished channels */
651 if (chptr->same_aggregate) {
652 chptr->same_aggregate = FALSE;
653 break;
654 }
655 else break; /* we have nothing to do */
656 }
657 }
658
659 index2 = index;
660 for (index2++; index2 < MAX_N_PLUGINS; index2++) {
661 chptr = &channels_list[index2];
662 if (chptr->aggregation || chptr->aggregation_2) {
663 memcpy(&channels_list[index], chptr, sizeof(struct channels_list_entry));
664 memset(chptr, 0, sizeof(struct channels_list_entry));
665 index++;
666 }
667 else break; /* we finished channels */
668 }
669
670 break;
671 }
672
673 index++;
674 }
675 }
676
677 /* trivial sorting(tm) :-) */
sort_pipe_channels()678 void sort_pipe_channels()
679 {
680 struct channels_list_entry ctmp;
681 int x = 0, y = 0;
682
683 while (x < MAX_N_PLUGINS) {
684 if (!channels_list[x].aggregation && !channels_list[x].aggregation_2) break;
685 y = x+1;
686 while (y < MAX_N_PLUGINS) {
687 if (!channels_list[y].aggregation && !channels_list[y].aggregation_2) break;
688 if (channels_list[x].aggregation == channels_list[y].aggregation &&
689 channels_list[x].aggregation_2 == channels_list[y].aggregation_2) {
690 channels_list[y].same_aggregate = TRUE;
691 if (y == x+1) x++;
692 else {
693 memcpy(&ctmp, &channels_list[x+1], sizeof(struct channels_list_entry));
694 memcpy(&channels_list[x+1], &channels_list[y], sizeof(struct channels_list_entry));
695 memcpy(&channels_list[y], &ctmp, sizeof(struct channels_list_entry));
696 x++;
697 }
698 }
699 y++;
700 }
701 x++;
702 }
703 }
704
init_pipe_channels()705 void init_pipe_channels()
706 {
707 memset(&channels_list, 0, MAX_N_PLUGINS*sizeof(struct channels_list_entry));
708 }
709
/*
 * evaluate_sampling() applies the sampling state in smp to *pkt_num packets.
 *
 * smp:         sampling state; 'counter' holds the packets still to skip
 *              before the next sample and is re-armed via smp->sf(rate).
 * pkt_len:     in/out, rescaled to the sampled packets when more than one
 *              packet was supplied and at least one was sampled.
 * pkt_num:     in/out, replaced by the number of sampled packets under the
 *              same condition.
 * sample_pool: out, written with the pool size each time a sample is taken.
 *
 * With sampling disabled (rate == 0) only smp->sample_pool and
 * smp->sampled_pkts are set, both to *pkt_num.
 */
void evaluate_sampling(struct sampling *smp, pm_counter_t *pkt_len, pm_counter_t *pkt_num, pm_counter_t *sample_pool)
{
  pm_counter_t step, remaining = *pkt_num;

  if (!smp->rate) { /* sampling is disabled */
    smp->sample_pool = remaining;
    smp->sampled_pkts = remaining;
    return;
  }

  smp->sampled_pkts = 0;

  for (;;) {
    if (!smp->counter) smp->counter = (smp->sf)(smp->rate);

    /* consume as many packets as the current skip allows */
    step = MIN(smp->counter, remaining);
    smp->counter -= step;
    remaining -= step;
    smp->sample_pool += step;

    /* skip not exhausted yet: we ran out of packets */
    if (smp->counter) break;

    smp->sampled_pkts++;
    *sample_pool = smp->sample_pool;
    smp->sample_pool = 0;

    if (!(remaining > 0)) break;
  }

  /* Let's handle flows meaningfully */
  if (smp->sampled_pkts && *pkt_num > 1) {
    *pkt_len = ( *pkt_len / *pkt_num ) * smp->sampled_pkts;
    *pkt_num = smp->sampled_pkts;
  }
}
743
744 /* simple random algorithm */
take_simple_random_skip(pm_counter_t mean)745 pm_counter_t take_simple_random_skip(pm_counter_t mean)
746 {
747 pm_counter_t skip;
748
749 if (mean > 1) {
750 skip = ((random() % ((2 * mean) - 1)) + 1);
751 srandom(random());
752 }
753 else skip = 1; /* smp->rate == 1 */
754
755 return skip;
756 }
757
758 /* simple systematic algorithm */
take_simple_systematic_skip(pm_counter_t mean)759 pm_counter_t take_simple_systematic_skip(pm_counter_t mean)
760 {
761 pm_counter_t skip = mean;
762
763 return skip;
764 }
765
766 /* return value:
767 TRUE: We want it!
768 FALSE: Discard it!
769 */
evaluate_filters(struct aggregate_filter * filter,u_char * pkt,struct pcap_pkthdr * pkthdr)770 int evaluate_filters(struct aggregate_filter *filter, u_char *pkt, struct pcap_pkthdr *pkthdr)
771 {
772 int index;
773
774 if (*filter->num == 0) return TRUE; /* no entries in the filter array: aggregate filtering disabled */
775
776 for (index = 0; index < *filter->num; index++) {
777 if (bpf_filter(filter->table[index]->bf_insns, pkt, pkthdr->len, pkthdr->caplen)) return TRUE;
778 }
779
780 return FALSE;
781 }
782
recollect_pipe_memory(struct channels_list_entry * mychptr)783 void recollect_pipe_memory(struct channels_list_entry *mychptr)
784 {
785 struct channels_list_entry *chptr;
786 int index = 0;
787
788 while (index < MAX_N_PLUGINS) {
789 chptr = &channels_list[index];
790 if (mychptr->rg.base != chptr->rg.base) {
791 munmap(chptr->rg.base, (chptr->rg.end-chptr->rg.base)+PKT_MSG_SIZE);
792 munmap(chptr->status, sizeof(struct ch_status));
793 }
794 index++;
795 }
796 }
797
/* init_random_seed() seeds random() with the microseconds part of the current time. */
void init_random_seed()
{
  struct timeval now;
  unsigned int seed;

  gettimeofday(&now, NULL);
  seed = (unsigned int)now.tv_usec;
  srandom(seed);
}
805
fill_pipe_buffer()806 void fill_pipe_buffer()
807 {
808 struct channels_list_entry *chptr;
809 int index;
810
811 for (index = 0; channels_list[index].aggregation || channels_list[index].aggregation_2; index++) {
812 chptr = &channels_list[index];
813
814 chptr->hdr.seq++;
815 chptr->hdr.seq %= MAX_SEQNUM;
816
817 ((struct ch_buf_hdr *)chptr->rg.ptr)->seq = chptr->hdr.seq;
818 ((struct ch_buf_hdr *)chptr->rg.ptr)->num = chptr->hdr.num;
819
820 if (chptr->plugin->cfg.pipe_zmq) {
821 #ifdef WITH_ZMQ
822 p_zmq_topic_send(&chptr->zmq_host, chptr->rg.ptr, chptr->bufsize);
823 #endif
824 }
825 else {
826 if (chptr->status->wakeup) {
827 chptr->status->wakeup = chptr->request;
828 if (write(chptr->pipe, &chptr->rg.ptr, CharPtrSz) != CharPtrSz)
829 Log(LOG_WARNING, "WARN ( %s/%s ): Failed during write: %s\n", chptr->plugin->cfg.name, chptr->plugin->cfg.type, strerror(errno));
830 }
831 }
832 }
833 }
834
check_pipe_buffer_space(struct channels_list_entry * mychptr,struct pkt_vlen_hdr_primitives * pvlen,int len)835 int check_pipe_buffer_space(struct channels_list_entry *mychptr, struct pkt_vlen_hdr_primitives *pvlen, int len)
836 {
837 int buf_space = 0;
838
839 if (!mychptr) return ERR;
840
841 /* init to base of current element */
842 buf_space = mychptr->bufend - mychptr->bufptr;
843
844 /* subtract fixed part, current variable part and new var part (len) */
845 buf_space -= mychptr->datasize;
846 if (pvlen) buf_space -= pvlen->tot_len;
847 buf_space -= len;
848
849 /* return virdict. if positive fix sizes. if negative take care of triggering a reprocess */
850 if (buf_space >= 0) {
851 mychptr->var_size += len;
852 return FALSE;
853 }
854 else {
855 mychptr->bufptr += (mychptr->bufend - mychptr->bufptr);
856 mychptr->reprocess = TRUE;
857 mychptr->var_size = 0;
858
859 return TRUE;
860 }
861 }
862
return_pipe_buffer_space(struct channels_list_entry * mychptr,int len)863 void return_pipe_buffer_space(struct channels_list_entry *mychptr, int len)
864 {
865 if (!mychptr || !len) return;
866
867 if (mychptr->var_size < len) return;
868
869 mychptr->var_size -= len;
870 }
871
check_shadow_status(struct packet_ptrs * pptrs,struct channels_list_entry * mychptr)872 int check_shadow_status(struct packet_ptrs *pptrs, struct channels_list_entry *mychptr)
873 {
874 if (pptrs->shadow) {
875 if (pptrs->tag && mychptr->aggregation & COUNT_TAG) return FALSE;
876 else if (pptrs->tag2 && mychptr->aggregation & COUNT_TAG2) return FALSE;
877 else return TRUE;
878 }
879 else return FALSE;
880 }
881
load_plugin_filters(int link_type)882 void load_plugin_filters(int link_type)
883 {
884 struct plugins_list_entry *list = plugins_list;
885
886 while (list) {
887 if ((*list->type.func)) {
888
889 /* compiling aggregation filter if needed */
890 if (list->cfg.a_filter) {
891 pcap_t *dev_desc;
892 bpf_u_int32 localnet, netmask = 0; /* pcap library stuff */
893 char errbuf[PCAP_ERRBUF_SIZE], *count_token;
894 int idx = 0;
895
896 dev_desc = pcap_open_dead(link_type, 128); /* 128 bytes should be long enough */
897
898 if (config.pcap_if) pcap_lookupnet(config.pcap_if, &localnet, &netmask, errbuf);
899
900 list->cfg.bpfp_a_table[idx] = malloc(sizeof(struct bpf_program));
901 while ( (count_token = extract_token(&list->cfg.a_filter, ',')) && idx < AGG_FILTER_ENTRIES ) {
902 if (pcap_compile(dev_desc, list->cfg.bpfp_a_table[idx], count_token, 0, netmask) < 0) {
903 Log(LOG_WARNING, "WARN: %s\nWARN ( %s/%s ): aggregation filter disabled.\n",
904 pcap_geterr(dev_desc), list->cfg.name, list->cfg.type);
905 }
906 else {
907 idx++;
908 list->cfg.bpfp_a_table[idx] = malloc(sizeof(struct bpf_program));
909 }
910 }
911
912 list->cfg.bpfp_a_num = idx;
913 }
914 }
915 list = list->next;
916 }
917 }
918
/* pkt_data_clean() zeroes 'len' bytes starting at pdata and returns len. */
int pkt_data_clean(void *pdata, int len)
{
  size_t nbytes = len;

  memset(pdata, 0, nbytes);

  return len;
}
925
pkt_payload_clean(void * ppayload,int len)926 int pkt_payload_clean(void *ppayload, int len)
927 {
928 memset(ppayload, 0, PpayloadSz);
929
930 return PpayloadSz;
931 }
932
pkt_msg_clean(void * ppayload,int len)933 int pkt_msg_clean(void *ppayload, int len)
934 {
935 memset(ppayload, 0, PmsgSz);
936
937 return PmsgSz;
938 }
939
pkt_extras_clean(void * pextras,int len)940 int pkt_extras_clean(void *pextras, int len)
941 {
942 memset(pextras, 0, PdataSz+PextrasSz);
943
944 return PdataSz+PextrasSz;
945 }
946
plugin_pipe_zmq_compile_check()947 void plugin_pipe_zmq_compile_check()
948 {
949 #ifndef WITH_ZMQ
950 Log(LOG_ERR, "ERROR ( %s/%s ): 'plugin_pipe_zmq' requires compiling with --enable-zmq. Exiting ..\n", config.name, config.type);
951 exit_gracefully(1);
952 #endif
953 }
954
plugin_pipe_check(struct configuration * cfg)955 void plugin_pipe_check(struct configuration *cfg)
956 {
957 if (!cfg->pipe_zmq) cfg->pipe_homegrown = TRUE;
958 }
959
/*
 * P_zmq_pipe_init() sets up the plugin side of a ZeroMQ pipe.
 *
 * zh:      the struct p_zmq_host to initialise; nothing is set up if NULL.
 * pipe_fd: optional, receives the descriptor returned by p_zmq_get_fd().
 * seq:     optional, reset to zero.
 *
 * The build-time ZeroMQ check always runs first.
 */
void P_zmq_pipe_init(void *zh, int *pipe_fd, u_int32_t *seq)
{
  plugin_pipe_zmq_compile_check();

#ifdef WITH_ZMQ
  {
    struct p_zmq_host *host = zh;
    char log_id[LARGEBUFLEN];

    if (!host) return;

    p_zmq_plugin_pipe_init_plugin(host);

    snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
    p_zmq_set_log_id(host, log_id);

    p_zmq_set_hwm(host, config.pipe_zmq_hwm);
    p_zmq_sub_setup(host);
    p_zmq_set_retry_timeout(host, config.pipe_zmq_retry);

    if (pipe_fd) (*pipe_fd) = p_zmq_get_fd(host);
    if (seq) (*seq) = 0;
  }
#endif
}
983