1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "addr.h"
25 #ifdef WITH_KAFKA
26 #include "kafka_common.h"
27 #endif
28 #include "nfacctd.h"
29 #include "pretag_handlers.h"
30 #include "pmacct-data.h"
31 #include "plugin_hooks.h"
32 #include "pkt_handlers.h"
33 #include "ip_flow.h"
34 #include "ip_frag.h"
35 #include "classifier.h"
36 #include "net_aggr.h"
37 #include "bgp/bgp_packet.h"
38 #include "bgp/bgp.h"
39 #include "isis/isis.h"
40 #include "bmp/bmp.h"
41 #include "telemetry/telemetry.h"
42 #if defined (WITH_NDPI)
43 #include "ndpi/ndpi.h"
44 #endif
45 #include "tee_plugin/tee_plugin.h"
46 
47 /* Global variables */
48 struct template_cache tpl_cache;
49 struct host_addr debug_a;
50 char debug_agent_addr[50];
51 u_int16_t debug_agent_port;
52 extern struct channels_list_entry channels_list[MAX_N_PLUGINS]; /* communication channels: core <-> plugins */
53 
54 /* Functions */
usage_daemon(char * prog_name)55 void usage_daemon(char *prog_name)
56 {
57   printf("%s %s (%s)\n", NFACCTD_USAGE_HEADER, PMACCT_VERSION, PMACCT_BUILD);
58   printf("Usage: %s [ -D | -d ] [ -L IP address ] [ -l port ] [ -c primitive [ , ... ] ] [ -P plugin [ , ... ] ]\n", prog_name);
59   printf("       %s [ -f config_file ]\n", prog_name);
60   printf("       %s [ -h ]\n", prog_name);
61   printf("\nGeneral options:\n");
62   printf("  -h  \tShow this page\n");
63   printf("  -V  \tShow version and compile-time options and exit\n");
64   printf("  -L  \tBind to the specified IP address\n");
65   printf("  -l  \tListen on the specified UDP port\n");
66   printf("  -f  \tLoad configuration from the specified file\n");
67   printf("  -a  \tPrint list of supported aggregation primitives\n");
68   printf("  -c  \tAggregation method, see full list of primitives with -a (DEFAULT: src_host)\n");
69   printf("  -D  \tDaemonize\n");
70   printf("  -n  \tPath to a file containing networks and/or ASNs definitions\n");
71   printf("  -t  \tPath to a file containing ports definitions\n");
72   printf("  -P  \t[ memory | print | mysql | pgsql | sqlite3 | amqp | kafka | tee ] \n\tActivate plugin\n");
73   printf("  -d  \tEnable debug\n");
74   printf("  -S  \t[ auth | mail | daemon | kern | user | local[0-7] ] \n\tLog to the specified syslog facility\n");
75   printf("  -F  \tWrite Core Process PID into the specified file\n");
76   printf("  -R  \tRenormalize sampled data\n");
77   printf("  -u  \tLeave IP protocols in numerical format\n");
78   printf("  -I  \tRead packets from the specified savefile\n");
79   printf("  -Z  \tReading from a savefile, sleep the given amount of seconds at startup and between replays\n");
80   printf("  -W  \tReading from a savefile, don't exit but sleep when finished\n");
81   printf("  -Y  \tReading from a savefile, replay the number of times specified\n");
82   printf("\nMemory plugin (-P memory) options:\n");
83   printf("  -p  \tSocket for client-server communication (DEFAULT: /tmp/collect.pipe)\n");
84   printf("  -b  \tNumber of buckets\n");
85   printf("  -m  \tNumber of memory pools\n");
86   printf("  -s  \tMemory pool size\n");
87   printf("\nPrint plugin (-P print) plugin options:\n");
88   printf("  -r  \tRefresh time (in seconds)\n");
89   printf("  -O  \t[ formatted | csv | json | avro ] \n\tOutput format\n");
90   printf("  -o  \tPath to output file\n");
91   printf("  -M  \tPrint event init/close marker messages\n");
92   printf("  -A  \tAppend output (applies to -o)\n");
93   printf("  -E  \tCSV format separator (applies to -O csv, DEFAULT: ',')\n");
94   printf("\n");
95   printf("For examples, see:\n");
96   printf("  https://github.com/pmacct/pmacct/blob/master/QUICKSTART or\n");
97   printf("  https://github.com/pmacct/pmacct/wiki\n");
98   printf("\n");
99   printf("For suggestions, critics, bugs, contact me: %s.\n", MANTAINER);
100 }
101 
102 
main(int argc,char ** argv,char ** envp)103 int main(int argc,char **argv, char **envp)
104 {
105   struct plugins_list_entry *list;
106   struct plugin_requests req;
107   struct packet_ptrs_vector pptrs;
108   char config_file[SRVBUFLEN];
109   unsigned char *netflow_packet;
110   unsigned char *netflow_templates_packet;
111   int logf, rc = 0, yes=1, allowed;
112   struct host_addr addr;
113   struct hosts_table allow;
114   struct id_table bpas_table;
115   struct id_table blp_table;
116   struct id_table bmed_table;
117   struct id_table biss_table;
118   struct id_table bta_table;
119   struct id_table bitr_table;
120   struct id_table sampling_table;
121   u_int32_t idx;
122   int ret;
123   int capture_methods = 0;
124 
125   struct sockaddr_storage server, server_templates;
126   struct sockaddr_storage client;
127   struct ipv6_mreq multi_req6;
128   struct tee_receiver tee_templates;
129   socklen_t clen = sizeof(client), slen = 0;
130   struct ip_mreq multi_req4;
131   int templates_sock = 0;
132 
133   int pm_pcap_savefile_round = 0;
134 
135   unsigned char dummy_packet[64];
136   unsigned char dummy_packet_vlan[64];
137   unsigned char dummy_packet_mpls[128];
138   unsigned char dummy_packet_vlan_mpls[128];
139   struct pcap_pkthdr dummy_pkthdr;
140   struct pcap_pkthdr dummy_pkthdr_vlan;
141   struct pcap_pkthdr dummy_pkthdr_mpls;
142   struct pcap_pkthdr dummy_pkthdr_vlan_mpls;
143 
144   unsigned char dummy_packet6[92];
145   unsigned char dummy_packet_vlan6[92];
146   unsigned char dummy_packet_mpls6[128];
147   unsigned char dummy_packet_vlan_mpls6[128];
148   struct pcap_pkthdr dummy_pkthdr6;
149   struct pcap_pkthdr dummy_pkthdr_vlan6;
150   struct pcap_pkthdr dummy_pkthdr_mpls6;
151   struct pcap_pkthdr dummy_pkthdr_vlan_mpls6;
152 
153   struct packet_ptrs recv_pptrs;
154   struct pcap_pkthdr recv_pkthdr;
155 
156   sigset_t signal_set;
157 
158   /* getopt() stuff */
159   extern char *optarg;
160   extern int optind, opterr, optopt;
161   int errflag, cp;
162 
163   /* select() stuff */
164   fd_set read_descs, bkp_read_descs;
165   int select_fd, bkp_select_fd, num_descs;
166 
167 #ifdef WITH_REDIS
168   struct p_redis_host redis_host;
169 #endif
170 
171 #if defined HAVE_MALLOPT
172   mallopt(M_CHECK_ACTION, 0);
173 #endif
174 
175   umask(077);
176   NF_compute_once();
177 
178   /* a bunch of default definitions */
179   reload_map = FALSE;
180   print_stats = FALSE;
181   reload_geoipv2_file = FALSE;
182   sampling_map_allocated = FALSE;
183   bpas_map_allocated = FALSE;
184   blp_map_allocated = FALSE;
185   bmed_map_allocated = FALSE;
186   biss_map_allocated = FALSE;
187   bta_map_allocated = FALSE;
188   bitr_map_allocated = FALSE;
189   custom_primitives_allocated = FALSE;
190   bta_map_caching = TRUE;
191   sampling_map_caching = TRUE;
192   find_id_func = NF_find_id;
193   plugins_list = NULL;
194 
195   netflow_packet = malloc(NETFLOW_MSG_SIZE);
196   netflow_templates_packet = malloc(NETFLOW_MSG_SIZE);
197 
198   data_plugins = 0;
199   tee_plugins = 0;
200   xflow_status_table_entries = 0;
201   xflow_tot_bad_datagrams = 0;
202   errflag = 0;
203 
204   memset(cfg_cmdline, 0, sizeof(cfg_cmdline));
205   memset(&server, 0, sizeof(server));
206   memset(&client, 0, sizeof(client));
207   memset(&config, 0, sizeof(struct configuration));
208   memset(&config_file, 0, sizeof(config_file));
209   memset(&failed_plugins, 0, sizeof(failed_plugins));
210   memset(&pptrs, 0, sizeof(pptrs));
211   memset(&req, 0, sizeof(req));
212   memset(&class, 0, sizeof(class));
213   memset(&xflow_status_table, 0, sizeof(xflow_status_table));
214   memset(empty_mem_area_256b, 0, sizeof(empty_mem_area_256b));
215 
216   memset(&bpas_table, 0, sizeof(bpas_table));
217   memset(&blp_table, 0, sizeof(blp_table));
218   memset(&bmed_table, 0, sizeof(bmed_table));
219   memset(&biss_table, 0, sizeof(biss_table));
220   memset(&bta_table, 0, sizeof(bta_table));
221   memset(&bitr_table, 0, sizeof(bitr_table));
222   memset(&sampling_table, 0, sizeof(sampling_table));
223   memset(&reload_map_tstamp, 0, sizeof(reload_map_tstamp));
224   log_notifications_init(&log_notifications);
225   config.acct_type = ACCT_NF;
226 
227   rows = 0;
228   memset(&device, 0, sizeof(device));
229 
230   memset(&recv_pptrs, 0, sizeof(recv_pptrs));
231   memset(&recv_pkthdr, 0, sizeof(recv_pkthdr));
232 
233   select_fd = 0;
234   bkp_select_fd = 0;
235   num_descs = 0;
236   FD_ZERO(&read_descs);
237   FD_ZERO(&bkp_read_descs);
238 
239   /* getting commandline values */
240   while (!errflag && ((cp = getopt(argc, argv, ARGS_NFACCTD)) != -1)) {
241     if (!cfg_cmdline[rows]) cfg_cmdline[rows] = malloc(SRVBUFLEN);
242     memset(cfg_cmdline[rows], 0, SRVBUFLEN);
243     switch (cp) {
244     case 'L':
245       strlcpy(cfg_cmdline[rows], "nfacctd_ip: ", SRVBUFLEN);
246       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
247       rows++;
248       break;
249     case 'l':
250       strlcpy(cfg_cmdline[rows], "nfacctd_port: ", SRVBUFLEN);
251       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
252       rows++;
253       break;
254     case 'P':
255       strlcpy(cfg_cmdline[rows], "plugins: ", SRVBUFLEN);
256       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
257       rows++;
258       break;
259     case 'D':
260       strlcpy(cfg_cmdline[rows], "daemonize: true", SRVBUFLEN);
261       rows++;
262       break;
263     case 'd':
264       debug = TRUE;
265       strlcpy(cfg_cmdline[rows], "debug: true", SRVBUFLEN);
266       rows++;
267       break;
268     case 'n':
269       strlcpy(cfg_cmdline[rows], "networks_file: ", SRVBUFLEN);
270       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
271       rows++;
272       break;
273     case 't':
274       strlcpy(cfg_cmdline[rows], "ports_file: ", SRVBUFLEN);
275       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
276       rows++;
277       break;
278     case 'O':
279       strlcpy(cfg_cmdline[rows], "print_output: ", SRVBUFLEN);
280       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
281       rows++;
282       break;
283     case 'o':
284       strlcpy(cfg_cmdline[rows], "print_output_file: ", SRVBUFLEN);
285       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
286       rows++;
287       break;
288     case 'M':
289       strlcpy(cfg_cmdline[rows], "print_markers: true", SRVBUFLEN);
290       rows++;
291       break;
292     case 'A':
293       strlcpy(cfg_cmdline[rows], "print_output_file_append: true", SRVBUFLEN);
294       rows++;
295       break;
296     case 'E':
297       strlcpy(cfg_cmdline[rows], "print_output_separator: ", SRVBUFLEN);
298       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
299       rows++;
300       break;
301     case 'u':
302       strlcpy(cfg_cmdline[rows], "print_num_protos: true", SRVBUFLEN);
303       rows++;
304       break;
305     case 'f':
306       strlcpy(config_file, optarg, sizeof(config_file));
307       free(cfg_cmdline[rows]);
308       cfg_cmdline[rows] = NULL;
309       break;
310     case 'F':
311       strlcpy(cfg_cmdline[rows], "pidfile: ", SRVBUFLEN);
312       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
313       rows++;
314       break;
315     case 'c':
316       strlcpy(cfg_cmdline[rows], "aggregate: ", SRVBUFLEN);
317       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
318       rows++;
319       break;
320     case 'b':
321       strlcpy(cfg_cmdline[rows], "imt_buckets: ", SRVBUFLEN);
322       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
323       rows++;
324       break;
325     case 'm':
326       strlcpy(cfg_cmdline[rows], "imt_mem_pools_number: ", SRVBUFLEN);
327       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
328       rows++;
329       break;
330     case 'p':
331       strlcpy(cfg_cmdline[rows], "imt_path: ", SRVBUFLEN);
332       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
333       rows++;
334       break;
335     case 'r':
336       strlcpy(cfg_cmdline[rows], "sql_refresh_time: ", SRVBUFLEN);
337       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
338       rows++;
339       break;
340     case 's':
341       strlcpy(cfg_cmdline[rows], "imt_mem_pools_size: ", SRVBUFLEN);
342       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
343       rows++;
344       break;
345     case 'S':
346       strlcpy(cfg_cmdline[rows], "syslog: ", SRVBUFLEN);
347       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
348       rows++;
349       break;
350     case 'R':
351       strlcpy(cfg_cmdline[rows], "sfacctd_renormalize: true", SRVBUFLEN);
352       rows++;
353       break;
354     case 'I':
355       strlcpy(cfg_cmdline[rows], "pcap_savefile: ", SRVBUFLEN);
356       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
357       rows++;
358       break;
359     case 'W':
360       strlcpy(cfg_cmdline[rows], "pcap_savefile_wait: true", SRVBUFLEN);
361       rows++;
362       break;
363     case 'Z':
364       strlcpy(cfg_cmdline[rows], "pcap_savefile_delay: ", SRVBUFLEN);
365       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
366       rows++;
367       break;
368     case 'Y':
369       strlcpy(cfg_cmdline[rows], "pcap_savefile_replay: ", SRVBUFLEN);
370       strncat(cfg_cmdline[rows], optarg, CFG_LINE_LEN(cfg_cmdline[rows]));
371       rows++;
372       break;
373     case 'h':
374       usage_daemon(argv[0]);
375       exit(0);
376       break;
377     case 'V':
378       version_daemon(NFACCTD_USAGE_HEADER);
379       exit(0);
380       break;
381     case 'a':
382       print_primitives(config.acct_type, NFACCTD_USAGE_HEADER);
383       exit(0);
384       break;
385     default:
386       usage_daemon(argv[0]);
387       exit(1);
388       break;
389     }
390   }
391 
392   /* post-checks and resolving conflicts */
393   if (strlen(config_file)) {
394     if (parse_configuration_file(config_file) != SUCCESS)
395       exit(1);
396   }
397   else {
398     if (parse_configuration_file(NULL) != SUCCESS)
399       exit(1);
400   }
401 
402   /* XXX: glue; i'm conscious it's a dirty solution from an engineering viewpoint;
403      someday later i'll fix this */
404   list = plugins_list;
405   while (list) {
406     list->cfg.acct_type = ACCT_NF;
407     set_default_preferences(&list->cfg);
408     if (!strcmp(list->type.string, "core")) {
409       memcpy(&config, &list->cfg, sizeof(struct configuration));
410       config.name = list->name;
411       config.type = list->type.string;
412     }
413     list = list->next;
414   }
415 
416   if (config.files_umask) umask(config.files_umask);
417 
418   initsetproctitle(argc, argv, envp);
419   if (config.syslog) {
420     logf = parse_log_facility(config.syslog);
421     if (logf == ERR) {
422       config.syslog = NULL;
423       printf("WARN ( %s/core ): specified syslog facility is not supported. Logging to standard error (stderr).\n", config.name);
424     }
425     else openlog(NULL, LOG_PID, logf);
426     Log(LOG_INFO, "INFO ( %s/core ): Start logging ...\n", config.name);
427   }
428 
429   if (config.logfile) {
430     config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
431     list = plugins_list;
432     while (list) {
433       list->cfg.logfile_fd = config.logfile_fd ;
434       list = list->next;
435     }
436   }
437 
438   if (config.daemon) {
439     list = plugins_list;
440     while (list) {
441       if (!strcmp(list->type.string, "print") && !list->cfg.print_output_file)
442 	printf("INFO ( %s/%s ): Daemonizing. Bye bye screen.\n", list->name, list->type.string);
443       list = list->next;
444     }
445 
446     if (!config.syslog && !config.logfile) {
447       if (debug || config.debug) {
448 	printf("WARN ( %s/core ): debug is enabled; forking in background. Logging to standard error (stderr) will get lost.\n", config.name);
449       }
450     }
451 
452     daemonize();
453   }
454 
455   if (config.proc_priority) {
456     int ret;
457 
458     ret = setpriority(PRIO_PROCESS, 0, config.proc_priority);
459     if (ret) Log(LOG_WARNING, "WARN ( %s/core ): proc_priority failed (errno: %d)\n", config.name, errno);
460     else Log(LOG_INFO, "INFO ( %s/core ): proc_priority set to %d\n", config.name, getpriority(PRIO_PROCESS, 0));
461   }
462 
463   Log(LOG_INFO, "INFO ( %s/core ): %s %s (%s)\n", config.name, NFACCTD_USAGE_HEADER, PMACCT_VERSION, PMACCT_BUILD);
464   Log(LOG_INFO, "INFO ( %s/core ): %s\n", config.name, PMACCT_COMPILE_ARGS);
465 
466   if (strlen(config_file)) {
467     char canonical_path[PATH_MAX], *canonical_path_ptr;
468 
469     canonical_path_ptr = realpath(config_file, canonical_path);
470     if (canonical_path_ptr) Log(LOG_INFO, "INFO ( %s/core ): Reading configuration file '%s'.\n", config.name, canonical_path);
471   }
472   else Log(LOG_INFO, "INFO ( %s/core ): Reading configuration from cmdline.\n", config.name);
473 
474   /* Enforcing policies over aggregation methods */
475   list = plugins_list;
476   while (list) {
477     if (list->type.id != PLUGIN_ID_CORE) {
478       /* applies to all plugins */
479       plugin_pipe_check(&list->cfg);
480 
481       if (list->cfg.sampling_rate && config.ext_sampling_rate) {
482         Log(LOG_ERR, "ERROR ( %s/core ): Internal packet sampling and external packet sampling are mutual exclusive.\n", config.name);
483         exit_gracefully(1);
484       }
485 
486       /* applies to specific plugins */
487       if (list->type.id == PLUGIN_ID_NFPROBE || list->type.id == PLUGIN_ID_SFPROBE) {
488 	Log(LOG_ERR, "ERROR ( %s/core ): 'nfprobe' and 'sfprobe' plugins not supported in 'nfacctd'.\n", config.name);
489 	exit_gracefully(1);
490       }
491       else if (list->type.id == PLUGIN_ID_TEE) {
492         tee_plugins++;
493 	list->cfg.what_to_count = COUNT_NONE;
494 	list->cfg.data_type = PIPE_TYPE_MSG;
495       }
496       else {
497 	list->cfg.data_type = PIPE_TYPE_METADATA;
498 
499 	if (list->cfg.what_to_count_2 & (COUNT_POST_NAT_SRC_HOST|COUNT_POST_NAT_DST_HOST|
500 			COUNT_POST_NAT_SRC_PORT|COUNT_POST_NAT_DST_PORT|COUNT_NAT_EVENT|
501 			COUNT_TIMESTAMP_START|COUNT_TIMESTAMP_END|COUNT_TIMESTAMP_ARRIVAL))
502 	  list->cfg.data_type |= PIPE_TYPE_NAT;
503 
504 	if (list->cfg.what_to_count_2 & (COUNT_MPLS_LABEL_TOP|COUNT_MPLS_LABEL_BOTTOM|
505 			COUNT_MPLS_STACK_DEPTH))
506 	  list->cfg.data_type |= PIPE_TYPE_MPLS;
507 
508 	if (list->cfg.what_to_count_2 & (COUNT_TUNNEL_SRC_MAC|COUNT_TUNNEL_DST_MAC|
509 			COUNT_TUNNEL_SRC_HOST|COUNT_TUNNEL_DST_HOST|COUNT_TUNNEL_IP_PROTO|
510 			COUNT_TUNNEL_IP_TOS|COUNT_TUNNEL_SRC_PORT|COUNT_TUNNEL_DST_PORT|
511 			COUNT_VXLAN))
512 	  list->cfg.data_type |= PIPE_TYPE_TUN;
513 
514 	if (list->cfg.what_to_count_2 & (COUNT_LABEL))
515 	  list->cfg.data_type |= PIPE_TYPE_VLEN;
516 
517         if (list->cfg.what_to_count & (COUNT_SRC_PORT|COUNT_DST_PORT|COUNT_SUM_PORT|COUNT_TCPFLAGS)) {
518           enable_ip_fragment_handler();
519 	}
520 
521 	evaluate_sums(&list->cfg.what_to_count, &list->cfg.what_to_count_2, list->name, list->type.string);
522 	if (!list->cfg.what_to_count && !list->cfg.what_to_count_2 && !list->cfg.cpptrs.num) {
523 	  Log(LOG_WARNING, "WARN ( %s/%s ): defaulting to SRC HOST aggregation.\n", list->name, list->type.string);
524 	  list->cfg.what_to_count |= COUNT_SRC_HOST;
525 	}
526 	if (list->cfg.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) {
527 	  if (!list->cfg.networks_file && list->cfg.nfacctd_as & NF_AS_NEW) {
528 	    Log(LOG_ERR, "ERROR ( %s/%s ): AS aggregation selected but NO 'networks_file' specified. Exiting...\n\n", list->name, list->type.string);
529 	    exit_gracefully(1);
530 	  }
531           if (!list->cfg.bgp_daemon && !list->cfg.bmp_daemon && list->cfg.nfacctd_as == NF_AS_BGP) {
532             Log(LOG_ERR, "ERROR ( %s/%s ): AS aggregation selected but 'bgp_daemon' or 'bmp_daemon' is not enabled. Exiting...\n\n", list->name, list->type.string);
533             exit_gracefully(1);
534 	  }
535           if (list->cfg.nfacctd_as & NF_AS_FALLBACK && list->cfg.networks_file)
536             list->cfg.nfacctd_as |= NF_AS_NEW;
537         }
538 	if (list->cfg.what_to_count & (COUNT_SRC_NET|COUNT_DST_NET|COUNT_SUM_NET|COUNT_SRC_NMASK|COUNT_DST_NMASK|COUNT_PEER_DST_IP)) {
539 	  if (!list->cfg.nfacctd_net) {
540 	    if (list->cfg.networks_file) list->cfg.nfacctd_net |= NF_NET_NEW;
541 	    if (list->cfg.networks_mask) list->cfg.nfacctd_net |= NF_NET_STATIC;
542 	    if (!list->cfg.nfacctd_net) list->cfg.nfacctd_net = NF_NET_KEEP;
543 	  }
544 	  else {
545 	    if ((list->cfg.nfacctd_net == NF_NET_NEW && !list->cfg.networks_file) ||
546 	        (list->cfg.nfacctd_net == NF_NET_STATIC && !list->cfg.networks_mask) ||
547 	        (list->cfg.nfacctd_net == NF_NET_BGP && !list->cfg.bgp_daemon && !list->cfg.bmp_daemon) ||
548 	        (list->cfg.nfacctd_net == NF_NET_IGP && !list->cfg.nfacctd_isis)) {
549 	      Log(LOG_ERR, "ERROR ( %s/%s ): network aggregation selected but none of 'bgp_daemon', 'bmp_daemon', 'isis_daemon', 'networks_file', 'networks_mask' is specified. Exiting ...\n\n", list->name, list->type.string);
550 	      exit_gracefully(1);
551 	    }
552             if (list->cfg.nfacctd_net & NF_NET_FALLBACK && list->cfg.networks_file)
553               list->cfg.nfacctd_net |= NF_NET_NEW;
554 	  }
555 	}
556 
557 #if defined (WITH_NDPI)
558         if (list->cfg.what_to_count_2 & COUNT_NDPI_CLASS) {
559 	  enable_ip_fragment_handler();
560           config.classifier_ndpi = TRUE;
561         }
562 
563         if ((list->cfg.what_to_count & COUNT_CLASS) && (list->cfg.what_to_count_2 & COUNT_NDPI_CLASS)) {
564           Log(LOG_ERR, "ERROR ( %s/%s ): 'class_legacy' and 'class' primitives are mutual exclusive. Exiting...\n\n", list->name, list->type.string);
565           exit_gracefully(1);
566         }
567 #endif
568 
569 	list->cfg.type_id = list->type.id;
570 	bgp_config_checks(&list->cfg);
571 
572 	data_plugins++;
573 	list->cfg.what_to_count |= COUNT_COUNTERS;
574       }
575     }
576 
577     list = list->next;
578   }
579 
580   if (tee_plugins && data_plugins) {
581     Log(LOG_ERR, "ERROR ( %s/core ): 'tee' plugins are not compatible with data (memory/mysql/pgsql/etc.) plugins. Exiting...\n\n", config.name);
582     exit_gracefully(1);
583   }
584 
585   if (config.pcap_savefile) capture_methods++;
586   if (config.nfacctd_port || config.nfacctd_ip) capture_methods++;
587 #ifdef WITH_KAFKA
588   if (config.nfacctd_kafka_broker_host || config.nfacctd_kafka_topic) capture_methods++;
589 #endif
590 #ifdef WITH_ZMQ
591   if (config.nfacctd_zmq_address) capture_methods++;
592 #endif
593 
594   if (capture_methods > 1) {
595     Log(LOG_ERR, "ERROR ( %s/core ): pcap_savefile, nfacctd_ip, nfacctd_kafka_* and nfacctd_zmq_* are mutual exclusive. Exiting...\n\n", config.name);
596     exit_gracefully(1);
597   }
598 
599   if (config.nfacctd_templates_receiver) {
600     if (!config.nfacctd_port && !config.nfacctd_ip && capture_methods) {
601       Log(LOG_ERR, "ERROR ( %s/core ): nfacctd_templates_receiver only applies to live UDP collection (nfacctd_ip, nfacctd_port). Exiting...\n\n", config.name);
602       exit_gracefully(1);
603     }
604 
605     if (tee_plugins) {
606       Log(LOG_ERR, "ERROR ( %s/core ): nfacctd_templates_receiver and tee plugin ae mutual exclusive. Exiting...\n\n", config.name);
607       exit_gracefully(1);
608     }
609   }
610 
611 #ifdef WITH_KAFKA
612   if ((config.nfacctd_kafka_broker_host && !config.nfacctd_kafka_topic) || (config.nfacctd_kafka_topic && !config.nfacctd_kafka_broker_host)) {
613     Log(LOG_ERR, "ERROR ( %s/core ): Kafka collection requires both nfacctd_kafka_broker_host and nfacctd_kafka_topic to be specified. Exiting...\n\n", config.name);
614     exit_gracefully(1);
615   }
616 
617   if (config.nfacctd_kafka_broker_host && tee_plugins) {
618     Log(LOG_ERR, "ERROR ( %s/core ): Kafka collection is mutual exclusive with 'tee' plugins. Exiting...\n\n", config.name);
619     exit_gracefully(1);
620   }
621 #endif
622 
623 #ifdef WITH_ZMQ
624   if (config.nfacctd_zmq_address && tee_plugins) {
625     Log(LOG_ERR, "ERROR ( %s/core ): ZeroMQ collection is mutual exclusive with 'tee' plugins. Exiting...\n\n", config.name);
626     exit_gracefully(1);
627   }
628 #endif
629 
630   /* signal handling we want to inherit to plugins (when not re-defined elsewhere) */
631   memset(&sighandler_action, 0, sizeof(sighandler_action)); /* To ensure the struct holds no garbage values */
632   sigemptyset(&sighandler_action.sa_mask);  /* Within a signal handler all the signals are enabled */
633   sighandler_action.sa_flags = SA_RESTART;  /* To enable re-entering a system call afer done with signal handling */
634 
635   sighandler_action.sa_handler = startup_handle_falling_child;
636   sigaction(SIGCHLD, &sighandler_action, NULL);
637 
638   /* handles reopening of syslog channel */
639   sighandler_action.sa_handler = reload;
640   sigaction(SIGHUP, &sighandler_action, NULL);
641 
642   /* logs various statistics via Log() calls */
643   sighandler_action.sa_handler = push_stats;
644   sigaction(SIGUSR1, &sighandler_action, NULL);
645 
646   /* sets to true the reload_maps flag */
647   sighandler_action.sa_handler = reload_maps;
648   sigaction(SIGUSR2, &sighandler_action, NULL);
649 
650   /* we want to exit gracefully when a pipe is broken */
651   sighandler_action.sa_handler = SIG_IGN;
652   sigaction(SIGPIPE, &sighandler_action, NULL);
653 
654   sighandler_action.sa_handler = PM_sigalrm_noop_handler;
655   sigaction(SIGALRM, &sighandler_action, NULL);
656 
657   if (config.pcap_savefile) {
658     open_pcap_savefile(&device, config.pcap_savefile);
659     pm_pcap_savefile_round = 1;
660 
661     enable_ip_fragment_handler();
662   }
663 #ifdef WITH_KAFKA
664   else if (config.nfacctd_kafka_broker_host) {
665     NF_init_kafka_host(&nfacctd_kafka_host);
666     recv_pptrs.pkthdr = &recv_pkthdr;
667 
668     enable_ip_fragment_handler();
669   }
670 #endif
671 
672 #ifdef WITH_ZMQ
673   else if (config.nfacctd_zmq_address) {
674     int pipe_fd = 0;
675     NF_init_zmq_host(&nfacctd_zmq_host, &pipe_fd);
676     recv_pptrs.pkthdr = &recv_pkthdr;
677 
678     enable_ip_fragment_handler();
679   }
680 #endif
681   else {
682     /* If no IP address is supplied, let's set our default
683        behaviour: IPv4 address, INADDR_ANY, port 2100 */
684     if (!config.nfacctd_port) config.nfacctd_port = DEFAULT_NFACCTD_PORT;
685     collector_port = config.nfacctd_port;
686 
687     if (!config.nfacctd_ip) {
688       struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&server;
689 
690       sa6->sin6_family = AF_INET6;
691       sa6->sin6_port = htons(config.nfacctd_port);
692       slen = sizeof(struct sockaddr_in6);
693 
694       if (config.nfacctd_templates_port) {
695 	sa6 = (struct sockaddr_in6 *)&server_templates;
696 
697 	sa6->sin6_family = AF_INET6;
698 	sa6->sin6_port = htons(config.nfacctd_templates_port);
699       }
700     }
701     else {
702       trim_spaces(config.nfacctd_ip);
703       ret = str_to_addr(config.nfacctd_ip, &addr);
704       if (!ret) {
705 	Log(LOG_ERR, "ERROR ( %s/core ): 'nfacctd_ip' value is not valid. Exiting.\n", config.name);
706 	exit_gracefully(1);
707       }
708       slen = addr_to_sa((struct sockaddr *)&server, &addr, config.nfacctd_port);
709 
710       if (config.nfacctd_templates_port) {
711         addr_to_sa((struct sockaddr *)&server_templates, &addr, config.nfacctd_templates_port);
712       }
713     }
714 
715     /* socket creation */
716     config.sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_DGRAM, 0);
717     if (config.sock < 0) {
718       /* retry with IPv4 */
719       if (!config.nfacctd_ip) {
720 	struct sockaddr_in *sa4 = (struct sockaddr_in *)&server;
721 
722 	sa4->sin_family = AF_INET;
723 	sa4->sin_addr.s_addr = htonl(0);
724 	sa4->sin_port = htons(config.nfacctd_port);
725 	slen = sizeof(struct sockaddr_in);
726 
727 	config.sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_DGRAM, 0);
728       }
729 
730       if (config.sock < 0) {
731 	Log(LOG_ERR, "ERROR ( %s/core ): socket() failed.\n", config.name);
732 	exit_gracefully(1);
733       }
734     }
735 
736     if (config.nfacctd_templates_port) {
737       config.nfacctd_templates_sock = socket(((struct sockaddr *)&server_templates)->sa_family, SOCK_DGRAM, 0);
738       if (config.nfacctd_templates_sock < 0) {
739 	/* retry with IPv4 */
740 	if (!config.nfacctd_ip) {
741 	  struct sockaddr_in *sa4 = (struct sockaddr_in *)&server_templates;
742 
743 	  sa4->sin_family = AF_INET;
744 	  sa4->sin_addr.s_addr = htonl(0);
745 	  sa4->sin_port = htons(config.nfacctd_templates_port);
746 	  slen = sizeof(struct sockaddr_in);
747 
748 	  config.nfacctd_templates_sock = socket(((struct sockaddr *)&server_templates)->sa_family, SOCK_DGRAM, 0);
749 	}
750 
751 	if (config.nfacctd_templates_sock < 0) {
752 	  Log(LOG_ERR, "ERROR ( %s/core ): socket() failed.\n", config.name);
753 	  exit_gracefully(1);
754 	}
755       }
756 
757       FD_SET(config.sock, &bkp_read_descs);
758       FD_SET(config.nfacctd_templates_sock, &bkp_read_descs);
759       bkp_select_fd = (config.sock < config.nfacctd_templates_sock) ? config.nfacctd_templates_sock : config.sock;
760       bkp_select_fd++;
761     }
762 
763     /* bind socket to port */
764 #if (defined LINUX) && (defined HAVE_SO_REUSEPORT)
765     rc = setsockopt(config.sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, (char *) &yes, (socklen_t) sizeof(yes));
766     if (rc < 0) Log(LOG_ERR, "WARN ( %s/core ): setsockopt() failed for SO_REUSEADDR|SO_REUSEPORT.\n", config.name);
767 
768     if (config.nfacctd_templates_port) {
769       rc = setsockopt(config.nfacctd_templates_sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, (char *) &yes, (socklen_t) sizeof(yes));
770       if (rc < 0) Log(LOG_ERR, "WARN ( %s/core ): setsockopt() failed for SO_REUSEADDR|SO_REUSEPORT.\n", config.name);
771     }
772 #else
773     rc = setsockopt(config.sock, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, (socklen_t) sizeof(yes));
774     if (rc < 0) Log(LOG_ERR, "WARN ( %s/core ): setsockopt() failed for SO_REUSEADDR.\n", config.name);
775 
776     if (config.nfacctd_templates_port) {
777       rc = setsockopt(config.nfacctd_templates_sock, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, (socklen_t) sizeof(yes));
778       if (rc < 0) Log(LOG_ERR, "WARN ( %s/core ): setsockopt() failed for SO_REUSEADDR.\n", config.name);
779     }
780 #endif
781 
782 #if (defined IPV6_BINDV6ONLY)
783     {
784      int no=0;
785 
786       rc = setsockopt(config.sock, IPPROTO_IPV6, IPV6_BINDV6ONLY, (char *) &no, (socklen_t) sizeof(no));
787       if (rc < 0) Log(LOG_ERR, "WARN ( %s/core ): setsockopt() failed for IPV6_BINDV6ONLY.\n", config.name);
788     }
789 #endif
790 
791     if (config.nfacctd_pipe_size) {
792       socklen_t l = sizeof(config.nfacctd_pipe_size);
793       int saved = 0, obtained = 0;
794 
795       getsockopt(config.sock, SOL_SOCKET, SO_RCVBUF, &saved, &l);
796       Setsocksize(config.sock, SOL_SOCKET, SO_RCVBUF, &config.nfacctd_pipe_size, (socklen_t) sizeof(config.nfacctd_pipe_size));
797       getsockopt(config.sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
798 
799       if (obtained < saved) {
800 	Setsocksize(config.sock, SOL_SOCKET, SO_RCVBUF, &saved, l);
801 	getsockopt(config.sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
802       }
803       Log(LOG_INFO, "INFO ( %s/core ): nfacctd_pipe_size: obtained=%d target=%d.\n", config.name, obtained, config.nfacctd_pipe_size);
804     }
805 
806     /* Multicast: memberships handling */
807     for (idx = 0; mcast_groups[idx].family && idx < MAX_MCAST_GROUPS; idx++) {
808       if (mcast_groups[idx].family == AF_INET) {
809 	memset(&multi_req4, 0, sizeof(multi_req4));
810 	multi_req4.imr_multiaddr.s_addr = mcast_groups[idx].address.ipv4.s_addr;
811 	if (setsockopt(config.sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&multi_req4, (socklen_t) sizeof(multi_req4)) < 0) {
812 	  Log(LOG_ERR, "ERROR ( %s/core ): IPv4 multicast address - ADD membership failed.\n", config.name);
813 	  exit_gracefully(1);
814 	}
815       }
816       if (mcast_groups[idx].family == AF_INET6) {
817 	memset(&multi_req6, 0, sizeof(multi_req6));
818 	ip6_addr_cpy(&multi_req6.ipv6mr_multiaddr, &mcast_groups[idx].address.ipv6);
819 	if (setsockopt(config.sock, IPPROTO_IPV6, IPV6_JOIN_GROUP, (char *)&multi_req6, (socklen_t) sizeof(multi_req6)) < 0) {
820 	  Log(LOG_ERR, "ERROR ( %s/core ): IPv6 multicast address - ADD membership failed.\n", config.name);
821 	  exit_gracefully(1);
822 	}
823       }
824     }
825 
826     memset(&tee_templates, 0, sizeof(struct tee_receiver));
827 
828     if (config.nfacctd_templates_receiver) {
829       tee_templates.dest_len = sizeof(tee_templates.dest);
830 
831       ret = Tee_parse_hostport(config.nfacctd_templates_receiver, (struct sockaddr *) &tee_templates.dest, &tee_templates.dest_len, FALSE);
832       if (ret) {
833 	Log(LOG_ERR, "ERROR ( %s/core ): Invalid receiver: %s.\n", config.name, config.nfacctd_templates_receiver);
834 	exit_gracefully(1);
835       }
836 
837       tee_templates.fd = Tee_prepare_sock((struct sockaddr *) &tee_templates.dest, tee_templates.dest_len, FALSE, TRUE, FALSE);
838     }
839   }
840 
841   if (config.nfacctd_allow_file) load_allow_file(config.nfacctd_allow_file, &allow);
842   else memset(&allow, 0, sizeof(allow));
843 
844   if (config.sampling_map) {
845     load_id_file(MAP_SAMPLING, config.sampling_map, &sampling_table, &req, &sampling_map_allocated);
846     set_sampling_table(&pptrs, (u_char *) &sampling_table);
847   }
848   else set_sampling_table(&pptrs, NULL);
849 
850   if (config.nfacctd_flow_to_rd_map) {
851     load_id_file(MAP_FLOW_TO_RD, config.nfacctd_flow_to_rd_map, &bitr_table, &req, &bitr_map_allocated);
852     pptrs.v4.bitr_table = (u_char *) &bitr_table;
853   }
854   else pptrs.v4.bitr_table = NULL;
855 
856   if (config.aggregate_primitives) {
857     req.key_value_table = (void *) &custom_primitives_registry;
858     load_id_file(MAP_CUSTOM_PRIMITIVES, config.aggregate_primitives, NULL, &req, &custom_primitives_allocated);
859   }
860   else memset(&custom_primitives_registry, 0, sizeof(custom_primitives_registry));
861 
862   /* fixing per plugin custom primitives pointers, offsets and lengths */
863   list = plugins_list;
864   while(list) {
865     custom_primitives_reconcile(&list->cfg.cpptrs, &custom_primitives_registry);
866     if (custom_primitives_vlen(&list->cfg.cpptrs)) list->cfg.data_type |= PIPE_TYPE_VLEN;
867     list = list->next;
868   }
869 
870   if (config.bgp_daemon && config.bmp_daemon) {
871     Log(LOG_ERR, "ERROR ( %s/core ): bgp_daemon and bmp_daemon are currently mutual exclusive. Exiting.\n", config.name);
872     exit_gracefully(1);
873   }
874 
875   /* starting the ISIS threa */
876   if (config.nfacctd_isis) {
877     req.bpf_filter = TRUE;
878 
879     nfacctd_isis_wrapper();
880 
881     /* Let's give the ISIS thread some advantage to create its structures */
882     sleep(DEFAULT_SLOTH_SLEEP_TIME);
883   }
884 
885   /* starting the BGP thread */
886   if (config.bgp_daemon) {
887     int sleep_time = DEFAULT_SLOTH_SLEEP_TIME;
888 
889     req.bpf_filter = TRUE;
890 
891     if (config.bgp_daemon_stdcomm_pattern_to_asn && config.bgp_daemon_lrgcomm_pattern_to_asn) {
892       Log(LOG_ERR, "ERROR ( %s/core ): bgp_stdcomm_pattern_to_asn and bgp_lrgcomm_pattern_to_asn are mutual exclusive. Exiting.\n", config.name);
893       exit_gracefully(1);
894     }
895 
896     load_comm_patterns(&config.bgp_daemon_stdcomm_pattern, &config.bgp_daemon_extcomm_pattern,
897 			&config.bgp_daemon_lrgcomm_pattern, &config.bgp_daemon_stdcomm_pattern_to_asn,
898 			&config.bgp_daemon_lrgcomm_pattern_to_asn);
899 
900     if (config.bgp_daemon_peer_as_src_type == BGP_SRC_PRIMITIVES_MAP) {
901       if (config.bgp_daemon_peer_as_src_map) {
902         load_id_file(MAP_BGP_PEER_AS_SRC, config.bgp_daemon_peer_as_src_map, &bpas_table, &req, &bpas_map_allocated);
903         pptrs.v4.bpas_table = (u_char *) &bpas_table;
904       }
905       else {
906 	Log(LOG_ERR, "ERROR ( %s/core ): bgp_peer_as_src_type set to 'map' but no map defined. Exiting.\n", config.name);
907 	exit_gracefully(1);
908       }
909     }
910     else pptrs.v4.bpas_table = NULL;
911 
912     if (config.bgp_daemon_src_local_pref_type == BGP_SRC_PRIMITIVES_MAP) {
913       if (config.bgp_daemon_src_local_pref_map) {
914         load_id_file(MAP_BGP_SRC_LOCAL_PREF, config.bgp_daemon_src_local_pref_map, &blp_table, &req, &blp_map_allocated);
915         pptrs.v4.blp_table = (u_char *) &blp_table;
916       }
917       else {
918 	Log(LOG_ERR, "ERROR ( %s/core ): bgp_src_local_pref_type set to 'map' but no map defined. Exiting.\n", config.name);
919 	exit_gracefully(1);
920       }
921     }
922     else pptrs.v4.blp_table = NULL;
923 
924     if (config.bgp_daemon_src_med_type == BGP_SRC_PRIMITIVES_MAP) {
925       if (config.bgp_daemon_src_med_map) {
926         load_id_file(MAP_BGP_SRC_MED, config.bgp_daemon_src_med_map, &bmed_table, &req, &bmed_map_allocated);
927         pptrs.v4.bmed_table = (u_char *) &bmed_table;
928       }
929       else {
930 	Log(LOG_ERR, "ERROR ( %s/core ): bgp_src_med_type set to 'map' but no map defined. Exiting.\n", config.name);
931 	exit_gracefully(1);
932       }
933     }
934     else pptrs.v4.bmed_table = NULL;
935 
936     if (config.bgp_daemon_to_xflow_agent_map) {
937       load_id_file(MAP_BGP_TO_XFLOW_AGENT, config.bgp_daemon_to_xflow_agent_map, &bta_table, &req, &bta_map_allocated);
938       pptrs.v4.bta_table = (u_char *) &bta_table;
939     }
940     else pptrs.v4.bta_table = NULL;
941 
942     bgp_daemon_wrapper();
943 
944     /* Let's give the BGP thread some advantage to create its structures */
945     if (config.rpki_roas_file || config.rpki_rtr_cache) sleep_time += DEFAULT_SLOTH_SLEEP_TIME;
946     sleep(sleep_time);
947   }
948 
949   /* starting the BMP thread */
950   if (config.bmp_daemon) {
951     int sleep_time = DEFAULT_SLOTH_SLEEP_TIME;
952 
953     req.bpf_filter = TRUE;
954 
955     bmp_daemon_wrapper();
956 
957     /* Let's give the BMP thread some advantage to create its structures */
958     if (config.rpki_roas_file || config.rpki_rtr_cache) sleep_time += DEFAULT_SLOTH_SLEEP_TIME;
959     sleep(sleep_time);
960   }
961 
962   /* starting the telemetry thread */
963   if (config.telemetry_daemon) {
964     telemetry_wrapper();
965 
966     /* Let's give the telemetry thread some advantage to create its structures */
967     sleep(DEFAULT_SLOTH_SLEEP_TIME);
968   }
969 
970 #if defined WITH_GEOIP
971   if (config.geoip_ipv4_file || config.geoip_ipv6_file) {
972     req.bpf_filter = TRUE;
973   }
974 #endif
975 
976 #if defined WITH_GEOIPV2
977   if (config.geoipv2_file) {
978     req.bpf_filter = TRUE;
979   }
980 #endif
981 
982   if (!config.pcap_savefile && !config.nfacctd_kafka_broker_host && !config.nfacctd_zmq_address) {
983     rc = bind(config.sock, (struct sockaddr *) &server, slen);
984     if (rc < 0) {
985       Log(LOG_ERR, "ERROR ( %s/core ): bind() to ip=%s port=%d/udp failed (errno: %d).\n", config.name, config.nfacctd_ip, config.nfacctd_port, errno);
986       exit_gracefully(1);
987     }
988 
989     if (config.nfacctd_templates_port) {
990       rc = bind(config.nfacctd_templates_sock, (struct sockaddr *) &server_templates, slen);
991       if (rc < 0) {
992 	Log(LOG_ERR, "ERROR ( %s/core ): bind() to ip=%s port=%d/udp failed (errno: %d).\n", config.name, config.nfacctd_ip, config.nfacctd_templates_port, errno);
993 	exit_gracefully(1);
994       }
995     }
996   }
997 
998   init_classifiers(NULL);
999 
1000 #if defined (WITH_NDPI)
1001   if (config.classifier_ndpi) {
1002     enable_ip_fragment_handler();
1003     pm_ndpi_wfl = pm_ndpi_workflow_init();
1004     pm_ndpi_export_proto_to_class(pm_ndpi_wfl);
1005   }
1006   else pm_ndpi_wfl = NULL;
1007 #endif
1008 
1009   /* plugins glue: creation */
1010   load_plugins(&req);
1011   load_plugin_filters(1);
1012   evaluate_packet_handlers();
1013   pm_setproctitle("%s [%s]", "Core Process", config.proc_name);
1014   if (config.pidfile) write_pid_file(config.pidfile);
1015   load_networks(config.networks_file, &nt, &nc);
1016 
1017   /* signals to be handled only by the core process;
1018      we set proper handlers after plugin creation */
1019   sighandler_action.sa_handler = PM_sigint_handler;
1020   sigaction(SIGINT, &sighandler_action, NULL);
1021 
1022   sighandler_action.sa_handler = PM_sigint_handler;
1023   sigaction(SIGTERM, &sighandler_action, NULL);
1024 
1025   sighandler_action.sa_handler = handle_falling_child;
1026   sigaction(SIGCHLD, &sighandler_action, NULL);
1027 
1028   kill(getpid(), SIGCHLD);
1029 
1030   /* initializing template cache */
1031   memset(&tpl_cache, 0, sizeof(tpl_cache));
1032   tpl_cache.num = TEMPLATE_CACHE_ENTRIES;
1033 
1034   if (config.nfacctd_templates_file) {
1035     load_templates_from_file(config.nfacctd_templates_file);
1036   }
1037 
1038   /* arranging static pointers to dummy packet; to speed up things into the
1039      main loop we mantain two packet_ptrs structures when IPv6 is enabled:
1040      we will sync here 'pptrs6' for common tables and pointers */
1041   memset(dummy_packet, 0, sizeof(dummy_packet));
1042   pptrs.v4.f_agent = (u_char *) &client;
1043   pptrs.v4.packet_ptr = dummy_packet;
1044   pptrs.v4.pkthdr = &dummy_pkthdr;
1045   Assign16(((struct eth_header *)pptrs.v4.packet_ptr)->ether_type, htons(ETHERTYPE_IP)); /* 0x800 */
1046   pptrs.v4.mac_ptr = (u_char *)((struct eth_header *)pptrs.v4.packet_ptr)->ether_dhost;
1047   pptrs.v4.iph_ptr = pptrs.v4.packet_ptr + ETHER_HDRLEN;
1048   pptrs.v4.tlh_ptr = pptrs.v4.packet_ptr + ETHER_HDRLEN + sizeof(struct pm_iphdr);
1049   Assign8(((struct pm_iphdr *)pptrs.v4.iph_ptr)->ip_vhl, 5);
1050   // pptrs.v4.pkthdr->caplen = 38; /* eth_header + pm_iphdr + pm_tlhdr */
1051   pptrs.v4.pkthdr->caplen = 55;
1052   pptrs.v4.pkthdr->len = 100; /* fake len */
1053   pptrs.v4.l3_proto = ETHERTYPE_IP;
1054 
1055   memset(dummy_packet_vlan, 0, sizeof(dummy_packet_vlan));
1056   pptrs.vlan4.f_agent = (u_char *) &client;
1057   pptrs.vlan4.packet_ptr = dummy_packet_vlan;
1058   pptrs.vlan4.pkthdr = &dummy_pkthdr_vlan;
1059   Assign16(((struct eth_header *)pptrs.vlan4.packet_ptr)->ether_type, htons(ETHERTYPE_8021Q));
1060   pptrs.vlan4.mac_ptr = (u_char *)((struct eth_header *)pptrs.vlan4.packet_ptr)->ether_dhost;
1061   pptrs.vlan4.vlan_ptr = pptrs.vlan4.packet_ptr + ETHER_HDRLEN;
1062   Assign16(*(pptrs.vlan4.vlan_ptr+2), htons(ETHERTYPE_IP));
1063   pptrs.vlan4.iph_ptr = pptrs.vlan4.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN;
1064   pptrs.vlan4.tlh_ptr = pptrs.vlan4.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN + sizeof(struct pm_iphdr);
1065   Assign8(((struct pm_iphdr *)pptrs.vlan4.iph_ptr)->ip_vhl, 5);
1066   // pptrs.vlan4.pkthdr->caplen = 42; /* eth_header + vlan + pm_iphdr + pm_tlhdr */
1067   pptrs.vlan4.pkthdr->caplen = 59;
1068   pptrs.vlan4.pkthdr->len = 100; /* fake len */
1069   pptrs.vlan4.l3_proto = ETHERTYPE_IP;
1070 
1071   memset(dummy_packet_mpls, 0, sizeof(dummy_packet_mpls));
1072   pptrs.mpls4.f_agent = (u_char *) &client;
1073   pptrs.mpls4.packet_ptr = dummy_packet_mpls;
1074   pptrs.mpls4.pkthdr = &dummy_pkthdr_mpls;
1075   Assign16(((struct eth_header *)pptrs.mpls4.packet_ptr)->ether_type, htons(ETHERTYPE_MPLS));
1076   pptrs.mpls4.mac_ptr = (u_char *)((struct eth_header *)pptrs.mpls4.packet_ptr)->ether_dhost;
1077   pptrs.mpls4.mpls_ptr = pptrs.mpls4.packet_ptr + ETHER_HDRLEN;
1078   // pptrs.mpls4.pkthdr->caplen = 78; /* eth_header + upto 10 MPLS labels + pm_iphdr + pm_tlhdr */
1079   pptrs.mpls4.pkthdr->caplen = 95;
1080   pptrs.mpls4.pkthdr->len = 100; /* fake len */
1081   pptrs.mpls4.l3_proto = ETHERTYPE_IP;
1082 
1083   memset(dummy_packet_vlan_mpls, 0, sizeof(dummy_packet_vlan_mpls));
1084   pptrs.vlanmpls4.f_agent = (u_char *) &client;
1085   pptrs.vlanmpls4.packet_ptr = dummy_packet_vlan_mpls;
1086   pptrs.vlanmpls4.pkthdr = &dummy_pkthdr_vlan_mpls;
1087   Assign16(((struct eth_header *)pptrs.vlanmpls4.packet_ptr)->ether_type, htons(ETHERTYPE_8021Q));
1088   pptrs.vlanmpls4.mac_ptr = (u_char *)((struct eth_header *)pptrs.vlanmpls4.packet_ptr)->ether_dhost;
1089   pptrs.vlanmpls4.vlan_ptr = pptrs.vlanmpls4.packet_ptr + ETHER_HDRLEN;
1090   Assign16(((struct vlan_header *)pptrs.vlanmpls4.vlan_ptr)->proto, htons(ETHERTYPE_MPLS));
1091   pptrs.vlanmpls4.mpls_ptr = pptrs.vlanmpls4.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN;
1092   // pptrs.vlanmpls4.pkthdr->caplen = 82; /* eth_header + vlan + upto 10 MPLS labels + pm_iphdr + pm_tlhdr */
1093   pptrs.vlanmpls4.pkthdr->caplen = 99;
1094   pptrs.vlanmpls4.pkthdr->len = 100; /* fake len */
1095   pptrs.vlanmpls4.l3_proto = ETHERTYPE_IP;
1096 
1097   memset(dummy_packet6, 0, sizeof(dummy_packet6));
1098   pptrs.v6.f_agent = (u_char *) &client;
1099   pptrs.v6.packet_ptr = dummy_packet6;
1100   pptrs.v6.pkthdr = &dummy_pkthdr6;
1101   Assign16(((struct eth_header *)pptrs.v6.packet_ptr)->ether_type, htons(ETHERTYPE_IPV6));
1102   pptrs.v6.mac_ptr = (u_char *)((struct eth_header *)pptrs.v6.packet_ptr)->ether_dhost;
1103   pptrs.v6.iph_ptr = pptrs.v6.packet_ptr + ETHER_HDRLEN;
1104   pptrs.v6.tlh_ptr = pptrs.v6.packet_ptr + ETHER_HDRLEN + sizeof(struct ip6_hdr);
1105   Assign16(((struct ip6_hdr *)pptrs.v6.iph_ptr)->ip6_plen, htons(100));
1106   ((struct ip6_hdr *)pptrs.v6.iph_ptr)->ip6_hlim = 64;
1107   // pptrs.v6.pkthdr->caplen = 60; /* eth_header + ip6_hdr + pm_tlhdr */
1108   pptrs.v6.pkthdr->caplen = 77;
1109   pptrs.v6.pkthdr->len = 100; /* fake len */
1110   pptrs.v6.l3_proto = ETHERTYPE_IPV6;
1111 
1112   memset(dummy_packet_vlan6, 0, sizeof(dummy_packet_vlan6));
1113   pptrs.vlan6.f_agent = (u_char *) &client;
1114   pptrs.vlan6.packet_ptr = dummy_packet_vlan6;
1115   pptrs.vlan6.pkthdr = &dummy_pkthdr_vlan6;
1116   Assign16(((struct eth_header *)pptrs.vlan6.packet_ptr)->ether_type, htons(ETHERTYPE_8021Q));
1117   pptrs.vlan6.mac_ptr = (u_char *)((struct eth_header *)pptrs.vlan6.packet_ptr)->ether_dhost;
1118   pptrs.vlan6.vlan_ptr = pptrs.vlan6.packet_ptr + ETHER_HDRLEN;
1119   Assign8(*(pptrs.vlan6.vlan_ptr+2), 0x86);
1120   Assign8(*(pptrs.vlan6.vlan_ptr+3), 0xDD);
1121   pptrs.vlan6.iph_ptr = pptrs.vlan6.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN;
1122   pptrs.vlan6.tlh_ptr = pptrs.vlan6.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN + sizeof(struct ip6_hdr);
1123   Assign16(((struct ip6_hdr *)pptrs.vlan6.iph_ptr)->ip6_plen, htons(100));
1124   ((struct ip6_hdr *)pptrs.vlan6.iph_ptr)->ip6_hlim = 64;
1125   // pptrs.vlan6.pkthdr->caplen = 64; /* eth_header + vlan + ip6_hdr + pm_tlhdr */
1126   pptrs.vlan6.pkthdr->caplen = 81;
1127   pptrs.vlan6.pkthdr->len = 100; /* fake len */
1128   pptrs.vlan6.l3_proto = ETHERTYPE_IPV6;
1129 
1130   memset(dummy_packet_mpls6, 0, sizeof(dummy_packet_mpls6));
1131   pptrs.mpls6.f_agent = (u_char *) &client;
1132   pptrs.mpls6.packet_ptr = dummy_packet_mpls6;
1133   pptrs.mpls6.pkthdr = &dummy_pkthdr_mpls6;
1134   Assign16(((struct eth_header *)pptrs.mpls6.packet_ptr)->ether_type, htons(ETHERTYPE_MPLS));
1135   pptrs.mpls6.mac_ptr = (u_char *)((struct eth_header *)pptrs.mpls6.packet_ptr)->ether_dhost;
1136   pptrs.mpls6.mpls_ptr = pptrs.mpls6.packet_ptr + ETHER_HDRLEN;
1137   // pptrs.mpls6.pkthdr->caplen = 100; /* eth_header + upto 10 MPLS labels + ip6_hdr + pm_tlhdr */
1138   pptrs.mpls6.pkthdr->caplen = 117;
1139   pptrs.mpls6.pkthdr->len = 128; /* fake len */
1140   pptrs.mpls6.l3_proto = ETHERTYPE_IPV6;
1141 
1142   memset(dummy_packet_vlan_mpls6, 0, sizeof(dummy_packet_vlan_mpls6));
1143   pptrs.vlanmpls6.f_agent = (u_char *) &client;
1144   pptrs.vlanmpls6.packet_ptr = dummy_packet_vlan_mpls6;
1145   pptrs.vlanmpls6.pkthdr = &dummy_pkthdr_vlan_mpls6;
1146   Assign16(((struct eth_header *)pptrs.vlanmpls6.packet_ptr)->ether_type, htons(ETHERTYPE_8021Q));
1147   pptrs.vlanmpls6.mac_ptr = (u_char *)((struct eth_header *)pptrs.vlanmpls6.packet_ptr)->ether_dhost;
1148   pptrs.vlanmpls6.vlan_ptr = pptrs.vlanmpls6.packet_ptr + ETHER_HDRLEN;
1149   Assign8(*(pptrs.vlanmpls6.vlan_ptr+2), 0x88);
1150   Assign8(*(pptrs.vlanmpls6.vlan_ptr+3), 0x47);
1151   pptrs.vlanmpls6.mpls_ptr = pptrs.vlanmpls6.packet_ptr + ETHER_HDRLEN + IEEE8021Q_TAGLEN;
1152   // pptrs.vlanmpls6.pkthdr->caplen = 104; /* eth_header + vlan + upto 10 MPLS labels + ip6_hdr + pm_tlhdr */
1153   pptrs.vlanmpls6.pkthdr->caplen = 121;
1154   pptrs.vlanmpls6.pkthdr->len = 128; /* fake len */
1155   pptrs.vlanmpls6.l3_proto = ETHERTYPE_IPV6;
1156 
1157   if (config.pcap_savefile) {
1158     Log(LOG_INFO, "INFO ( %s/core ): reading NetFlow/IPFIX data from: %s\n", config.name, config.pcap_savefile);
1159     allowed = TRUE;
1160 
1161     if (!config.pcap_sf_delay) sleep(2);
1162     else sleep(config.pcap_sf_delay);
1163   }
1164 #ifdef WITH_KAFKA
1165   else if (config.nfacctd_kafka_broker_host) {
1166     Log(LOG_INFO, "INFO ( %s/core ): reading NetFlow/IPFIX data from Kafka %s:%s\n", config.name,
1167         p_kafka_get_broker(&nfacctd_kafka_host), p_kafka_get_topic(&nfacctd_kafka_host));
1168     allowed = TRUE;
1169   }
1170 #endif
1171 #ifdef WITH_ZMQ
1172   else if (config.nfacctd_zmq_address) {
1173     Log(LOG_INFO, "INFO ( %s/core ): reading NetFlow/IPFIX data from ZeroMQ %s\n", config.name,
1174         p_zmq_get_address(&nfacctd_zmq_host));
1175     allowed = TRUE;
1176   }
1177 #endif
1178   else {
1179     char srv_string[INET6_ADDRSTRLEN];
1180     struct host_addr srv_addr;
1181     u_int16_t srv_port;
1182 
1183     sa_to_addr((struct sockaddr *)&server, &srv_addr, &srv_port);
1184     addr_to_str(srv_string, &srv_addr);
1185     Log(LOG_INFO, "INFO ( %s/core ): waiting for NetFlow/IPFIX data on %s:%u\n", config.name, srv_string, srv_port);
1186     allowed = TRUE;
1187 
1188     if (config.nfacctd_templates_port) {
1189       sa_to_addr((struct sockaddr *)&server_templates, &srv_addr, &srv_port);
1190       addr_to_str(srv_string, &srv_addr);
1191       Log(LOG_INFO, "INFO ( %s/core ): waiting for NetFlow/IPFIX templates on %s:%u\n", config.name, srv_string, srv_port);
1192     }
1193   }
1194 
1195 #ifdef WITH_REDIS
1196   if (config.redis_host) {
1197     char log_id[SHORTBUFLEN];
1198 
1199     snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
1200     p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_core_handler);
1201   }
1202 #endif
1203 
1204   /* fixing NetFlow v9/IPFIX template func pointers */
1205   get_ext_db_ie_by_type = &ext_db_get_ie;
1206 
1207   sigemptyset(&signal_set);
1208   sigaddset(&signal_set, SIGCHLD);
1209   sigaddset(&signal_set, SIGHUP);
1210   sigaddset(&signal_set, SIGUSR1);
1211   sigaddset(&signal_set, SIGUSR2);
1212   sigaddset(&signal_set, SIGTERM);
1213   if (config.daemon) {
1214     sigaddset(&signal_set, SIGINT);
1215   }
1216 
1217   /* Main loop */
1218   for (;;) {
1219     sigprocmask(SIG_BLOCK, &signal_set, NULL);
1220 
1221     if (config.pcap_savefile) {
1222       ret = recvfrom_savefile(&device, (void **) &netflow_packet, (struct sockaddr *) &client, NULL, &pm_pcap_savefile_round, &recv_pptrs);
1223     }
1224 #ifdef WITH_KAFKA
1225     else if (config.nfacctd_kafka_broker_host) {
1226       int kafka_reconnect = FALSE;
1227       void *kafka_msg = NULL;
1228 
1229       ret = p_kafka_consume_poller(&nfacctd_kafka_host, &kafka_msg, 1000);
1230 
1231       switch (ret) {
1232       case TRUE: /* got data */
1233         ret = p_kafka_consume_data(&nfacctd_kafka_host, kafka_msg, netflow_packet, NETFLOW_MSG_SIZE);
1234 	if (ret < 0) kafka_reconnect = TRUE;
1235 	break;
1236       case FALSE: /* timeout */
1237 	continue;
1238 	break;
1239       case ERR: /* error */
1240       default:
1241 	kafka_reconnect = TRUE;
1242 	break;
1243       }
1244 
1245       if (kafka_reconnect) {
1246 	/* Close */
1247         p_kafka_manage_consumer(&nfacctd_kafka_host, FALSE);
1248 
1249 	/* Re-open */
1250 	NF_init_kafka_host(&nfacctd_kafka_host);
1251 
1252 	continue;
1253       }
1254 
1255       ret = recvfrom_rawip(netflow_packet, ret, (struct sockaddr *) &client, &recv_pptrs);
1256     }
1257 #endif
1258 #ifdef WITH_ZMQ
1259     else if (config.nfacctd_zmq_address) {
1260       ret = p_zmq_recv_poll(&nfacctd_zmq_host.sock, 1000);
1261 
1262       switch (ret) {
1263       case TRUE: /* got data */
1264         ret = p_zmq_recv_bin(&nfacctd_zmq_host.sock, netflow_packet, NETFLOW_MSG_SIZE);
1265 	if (ret < 0) continue; /* ZMQ_RECONNECT_IVL */
1266 	break;
1267       case FALSE: /* timeout */
1268 	continue;
1269 	break;
1270       case ERR: /* error */
1271       default:
1272 	continue; /* ZMQ_RECONNECT_IVL */
1273 	break;
1274       }
1275 
1276       ret = recvfrom_rawip(netflow_packet, ret, (struct sockaddr *) &client, &recv_pptrs);
1277     }
1278 #endif
1279     else {
1280       if (!config.nfacctd_templates_port) {
1281         ret = recvfrom(config.sock, (unsigned char *)netflow_packet, NETFLOW_MSG_SIZE, 0, (struct sockaddr *) &client, &clen);
1282       }
1283       else {
1284 	select_func_again:
1285 	select_fd = bkp_select_fd;
1286 	memcpy(&read_descs, &bkp_read_descs, sizeof(bkp_read_descs));
1287 
1288 	num_descs = select(select_fd, &read_descs, NULL, NULL, NULL);
1289 
1290 	select_read_again:
1291 	if (num_descs > 0) {
1292 	  if (FD_ISSET(config.sock, &read_descs)) {
1293             ret = recvfrom(config.sock, (unsigned char *)netflow_packet, NETFLOW_MSG_SIZE, 0, (struct sockaddr *) &client, &clen);
1294 	    FD_CLR(config.sock, &read_descs);
1295 	    num_descs--;
1296 
1297 	    templates_sock = FALSE;
1298 	    collector_port = config.nfacctd_port;
1299 	  }
1300 	  else if (FD_ISSET(config.nfacctd_templates_sock, &read_descs)) {
1301 	    ret = recvfrom(config.nfacctd_templates_sock, (unsigned char *)netflow_packet, NETFLOW_MSG_SIZE, 0, (struct sockaddr *) &client, &clen);
1302 	    FD_CLR(config.nfacctd_templates_sock, &read_descs);
1303 	    num_descs--;
1304 
1305 	    templates_sock = TRUE;
1306 	    collector_port = config.nfacctd_templates_port;
1307 	  }
1308 	}
1309 	else goto select_func_again;
1310       }
1311     }
1312 
1313     /* we have no data or not not enough data to decode the version */
1314     if (!netflow_packet || ret < 2) continue;
1315     pptrs.v4.f_len = ret;
1316 
1317     ipv4_mapped_to_ipv4(&client);
1318 
1319     /* check if Hosts Allow Table is loaded; if it is, we will enforce rules */
1320     if (allow.num) allowed = check_allow(&allow, (struct sockaddr *)&client);
1321     if (!allowed) continue;
1322 
1323     if (reload_map) {
1324       bta_map_caching = TRUE;
1325       sampling_map_caching = TRUE;
1326       req.key_value_table = NULL;
1327 
1328       if (config.nfacctd_allow_file) load_allow_file(config.nfacctd_allow_file, &allow);
1329 
1330       load_networks(config.networks_file, &nt, &nc);
1331 
1332       if (config.bgp_daemon && config.bgp_daemon_peer_as_src_map)
1333         load_id_file(MAP_BGP_PEER_AS_SRC, config.bgp_daemon_peer_as_src_map, &bpas_table, &req, &bpas_map_allocated);
1334       if (config.bgp_daemon && config.bgp_daemon_src_local_pref_map)
1335         load_id_file(MAP_BGP_SRC_LOCAL_PREF, config.bgp_daemon_src_local_pref_map, &blp_table, &req, &blp_map_allocated);
1336       if (config.bgp_daemon && config.bgp_daemon_src_med_map)
1337         load_id_file(MAP_BGP_SRC_MED, config.bgp_daemon_src_med_map, &bmed_table, &req, &bmed_map_allocated);
1338       if (config.bgp_daemon && config.bgp_daemon_to_xflow_agent_map)
1339         load_id_file(MAP_BGP_TO_XFLOW_AGENT, config.bgp_daemon_to_xflow_agent_map, &bta_table, &req, &bta_map_allocated);
1340       if (config.nfacctd_flow_to_rd_map)
1341         load_id_file(MAP_FLOW_TO_RD, config.nfacctd_flow_to_rd_map, &bitr_table, &req, &bitr_map_allocated);
1342       if (config.sampling_map) {
1343         load_id_file(MAP_SAMPLING, config.sampling_map, &sampling_table, &req, &sampling_map_allocated);
1344         set_sampling_table(&pptrs, (u_char *) &sampling_table);
1345       }
1346 
1347       reload_map = FALSE;
1348       gettimeofday(&reload_map_tstamp, NULL);
1349     }
1350 
1351     if (reload_log) {
1352       reload_logs();
1353       reload_log = FALSE;
1354     }
1355 
1356     if (print_stats) {
1357       time_t now = time(NULL);
1358 
1359       print_status_table(now, XFLOW_STATUS_TABLE_SZ);
1360       print_stats = FALSE;
1361     }
1362 
1363     if (data_plugins) {
1364       int has_templates = 0;
1365       u_int16_t nfv;
1366 
1367       /* We will change byte ordering in order to avoid a bunch of ntohs() calls */
1368       nfv = ((struct struct_header_v5 *)netflow_packet)->version = ntohs(((struct struct_header_v5 *)netflow_packet)->version);
1369 
1370       reset_tag_label_status(&pptrs);
1371       reset_shadow_status(&pptrs);
1372 
1373       switch (nfv) {
1374       case 5:
1375 	process_v5_packet(netflow_packet, ret, &pptrs.v4, &req, nfv, NULL);
1376 	break;
1377       /* NetFlow v9 + IPFIX */
1378       case 9:
1379       case 10:
1380 	process_v9_packet(netflow_packet, ret, &pptrs, &req, nfv, NULL, &has_templates);
1381 
1382 	/* Let's replicate templates only if not received on
1383 	   nfacctd_templates_port in order to prevent infinite
1384 	   looping */
1385 	if (config.nfacctd_templates_receiver && has_templates && !templates_sock) {
1386 	  int netflow_templates_len = 0;
1387 	  struct pkt_msg tee_msg;
1388 
1389 	  memset(&tee_msg, 0, sizeof(tee_msg));
1390 	  memcpy(&tee_msg.agent, &client, sizeof(client));
1391 
1392 	  /* fix version before sending */
1393 	  ((struct struct_header_v5 *)netflow_packet)->version = ntohs(((struct struct_header_v5 *)netflow_packet)->version);
1394 
1395 	  Tee_select_templates(netflow_packet, ret, nfv, netflow_templates_packet, &netflow_templates_len);
1396 
1397 	  tee_msg.payload = netflow_templates_packet;
1398 	  tee_msg.len = netflow_templates_len;
1399 
1400 	  if (tee_msg.len) {
1401 	    Tee_send(&tee_msg, (struct sockaddr *) &tee_templates.dest, tee_templates.fd, TRUE);
1402 	  }
1403 	}
1404 
1405 	break;
1406       default:
1407         if (!config.nfacctd_disable_checks) {
1408 	  notify_malf_packet(LOG_INFO, "INFO", "discarding unknown packet", (struct sockaddr *) pptrs.v4.f_agent, 0);
1409 	  xflow_tot_bad_datagrams++;
1410         }
1411 	break;
1412       }
1413     }
1414     else if (tee_plugins) {
1415       if (req.ptm_c.exec_ptm_dissect) {
1416 	reset_tag_label_status(&pptrs);
1417 
1418 	/* We will change byte ordering in order to avoid a bunch of ntohs() calls */
1419 	((struct struct_header_v5 *)netflow_packet)->version = ntohs(((struct struct_header_v5 *)netflow_packet)->version);
1420       }
1421 
1422       process_raw_packet(netflow_packet, ret, &pptrs, &req);
1423     }
1424 
1425     if (num_descs > 0) goto select_read_again;
1426 
1427     sigprocmask(SIG_UNBLOCK, &signal_set, NULL);
1428   }
1429 }
1430 
process_v5_packet(unsigned char * pkt,u_int16_t len,struct packet_ptrs * pptrs,struct plugin_requests * req,u_int16_t version,struct NF_dissect * tee_dissect)1431 void process_v5_packet(unsigned char *pkt, u_int16_t len, struct packet_ptrs *pptrs,
1432 		struct plugin_requests *req, u_int16_t version, struct NF_dissect *tee_dissect)
1433 {
1434   struct struct_header_v5 *hdr_v5 = (struct struct_header_v5 *)pkt;
1435   struct struct_export_v5 *exp_v5;
1436   unsigned short int count = ntohs(hdr_v5->count);
1437 
1438   if (len < NfHdrV5Sz) {
1439     notify_malf_packet(LOG_INFO, "INFO", "discarding short NetFlow v5 packet", (struct sockaddr *) pptrs->f_agent, 0);
1440     xflow_tot_bad_datagrams++;
1441     return;
1442   }
1443   pptrs->f_header = pkt;
1444   pkt += NfHdrV5Sz;
1445   exp_v5 = (struct struct_export_v5 *)pkt;
1446   pptrs->f_status = (u_char *) nfv5_check_status(pptrs);
1447   pptrs->f_status_g = NULL;
1448 
1449   reset_mac(pptrs);
1450   pptrs->flow_type = NF9_FTYPE_TRAFFIC;
1451 
1452   if (tee_dissect) {
1453     tee_dissect->hdrVersion = version;
1454     tee_dissect->hdrCount = 1;
1455     tee_dissect->hdrBasePtr = (u_char *) hdr_v5;
1456     tee_dissect->hdrEndPtr = (u_char *) (hdr_v5 + NfHdrV5Sz);
1457     tee_dissect->hdrLen = NfHdrV5Sz;
1458 
1459     /* no flowset in NetFlow v5 */
1460     tee_dissect->flowSetBasePtr = NULL;
1461     tee_dissect->flowSetEndPtr = NULL;
1462     tee_dissect->flowSetLen = 0;
1463   }
1464 
1465   if ((count <= V5_MAXFLOWS) && ((count*NfDataV5Sz)+NfHdrV5Sz == len)) {
1466     if (config.debug) {
1467       sa_to_addr((struct sockaddr *)pptrs->f_agent, &debug_a, &debug_agent_port);
1468       addr_to_str(debug_agent_addr, &debug_a);
1469 
1470       Log(LOG_DEBUG, "DEBUG ( %s/core ): Received NetFlow packet from [%s:%u] version [%u] seqno [%u]\n",
1471 	  config.name, debug_agent_addr, debug_agent_port, version, ntohl(hdr_v5->flow_sequence));
1472     }
1473 
1474     while (count) {
1475       reset_net_status(pptrs);
1476       pptrs->f_data = (unsigned char *) exp_v5;
1477 
1478       if (tee_dissect) {
1479 	tee_dissect->elemBasePtr = pptrs->f_data;
1480 	tee_dissect->elemEndPtr = (u_char *) (pptrs->f_data + NfDataV5Sz);
1481 	tee_dissect->elemLen = NfDataV5Sz;
1482 	pptrs->tee_dissect_bcast = FALSE;
1483 
1484 	exec_plugins(pptrs, req);
1485 
1486 	goto finalize_record;
1487       }
1488 
1489       if (req->bpf_filter) {
1490         Assign32(((struct pm_iphdr *)pptrs->iph_ptr)->ip_src.s_addr, exp_v5->srcaddr.s_addr);
1491         Assign32(((struct pm_iphdr *)pptrs->iph_ptr)->ip_dst.s_addr, exp_v5->dstaddr.s_addr);
1492         Assign8(((struct pm_iphdr *)pptrs->iph_ptr)->ip_p, exp_v5->prot);
1493         Assign8(((struct pm_iphdr *)pptrs->iph_ptr)->ip_tos, exp_v5->tos);
1494         Assign16(((struct pm_tlhdr *)pptrs->tlh_ptr)->src_port, exp_v5->srcport);
1495         Assign16(((struct pm_tlhdr *)pptrs->tlh_ptr)->dst_port, exp_v5->dstport);
1496 	Assign8(((struct pm_tcphdr *)pptrs->tlh_ptr)->th_flags, exp_v5->tcp_flags);
1497       }
1498 
1499       pptrs->lm_mask_src = exp_v5->src_mask;
1500       pptrs->lm_mask_dst = exp_v5->dst_mask;
1501       pptrs->lm_method_src = NF_NET_KEEP;
1502       pptrs->lm_method_dst = NF_NET_KEEP;
1503 
1504       /* Let's copy some relevant field */
1505       pptrs->l4_proto = exp_v5->prot;
1506 
1507       /* IP header's id field is unused; we will use it to transport our id */
1508       if (config.nfacctd_isis) isis_srcdst_lookup(pptrs);
1509       if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, pptrs, &pptrs->bta, &pptrs->bta2);
1510       if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, pptrs, &pptrs->bitr, NULL);
1511       if (config.bgp_daemon) bgp_srcdst_lookup(pptrs, FUNC_TYPE_BGP);
1512       if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, pptrs, &pptrs->bpas, NULL);
1513       if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, pptrs, &pptrs->blp, NULL);
1514       if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, pptrs, &pptrs->bmed, NULL);
1515       if (config.bmp_daemon) bmp_srcdst_lookup(pptrs);
1516       exec_plugins(pptrs, req);
1517 
1518       finalize_record:
1519       exp_v5++;
1520       count--;
1521     }
1522   }
1523   else {
1524     notify_malf_packet(LOG_INFO, "INFO", "discarding malformed NetFlow v5 packet", (struct sockaddr *) pptrs->f_agent, 0);
1525     xflow_tot_bad_datagrams++;
1526     return;
1527   }
1528 }
1529 
process_v9_packet(unsigned char * pkt,u_int16_t len,struct packet_ptrs_vector * pptrsv,struct plugin_requests * req,u_int16_t version,struct NF_dissect * tee_dissect,int * has_templates)1530 void process_v9_packet(unsigned char *pkt, u_int16_t len, struct packet_ptrs_vector *pptrsv,
1531 		struct plugin_requests *req, u_int16_t version, struct NF_dissect *tee_dissect,
1532 		int *has_templates)
1533 {
1534   struct struct_header_v9 *hdr_v9 = (struct struct_header_v9 *)pkt;
1535   struct struct_header_ipfix *hdr_v10 = (struct struct_header_ipfix *)pkt;
1536   struct template_hdr_v9 *template_hdr = NULL;
1537   struct options_template_hdr_v9 *opt_template_hdr = NULL;
1538   struct template_cache_entry *tpl = NULL;
1539   struct data_hdr_v9 *data_hdr = NULL;
1540   struct packet_ptrs *pptrs = &pptrsv->v4;
1541   u_int16_t fid, off = 0, flowoff = 0, flowsetlen = 0, flowsetNo = 0;
1542   u_int16_t flowsetCount = 0, direction = 0, FlowSeqInc = 0;
1543   u_int32_t HdrSz = 0, SourceId = 0, FlowSeq = 0;
1544   u_char *dummy_packet_ptr = NULL;
1545 
1546   if (version == 9) {
1547     HdrSz = NfHdrV9Sz;
1548     if (len >= HdrSz) {
1549       SourceId = ntohl(hdr_v9->source_id);
1550       FlowSeq = ntohl(hdr_v9->flow_sequence);
1551       flowsetNo = htons(hdr_v9->count);
1552       flowsetCount = 0;
1553     }
1554   }
1555   else if (version == 10) {
1556     HdrSz = IpFixHdrSz;
1557     if (len >= HdrSz) {
1558       SourceId = ntohl(hdr_v10->source_id);
1559       FlowSeq = ntohl(hdr_v10->flow_sequence);
1560     }
1561     flowsetNo = 0;
1562     flowsetCount = 0;
1563   }
1564 
1565   if (tee_dissect) {
1566     tee_dissect->hdrVersion = version;
1567     if (version == 9) tee_dissect->hdrCount = flowsetNo; /* imprecise .. */
1568     else if (version == 10) tee_dissect->hdrCount = 0;
1569     tee_dissect->hdrBasePtr = pkt;
1570     tee_dissect->hdrEndPtr = (u_char *) (pkt + HdrSz);
1571     tee_dissect->hdrLen = HdrSz;
1572   }
1573 
1574   if (config.debug) {
1575     sa_to_addr((struct sockaddr *)pptrs->f_agent, &debug_a, &debug_agent_port);
1576     addr_to_str(debug_agent_addr, &debug_a);
1577 
1578     Log(LOG_DEBUG, "DEBUG ( %s/core ): Received NetFlow/IPFIX packet from [%s:%u] version [%u] seqno [%u]\n",
1579 			config.name, debug_agent_addr, debug_agent_port, version, FlowSeq);
1580   }
1581 
1582   if (len < HdrSz) {
1583     notify_malf_packet(LOG_INFO, "INFO", "discarding short NetFlow v9/IPFIX packet", (struct sockaddr *) pptrsv->v4.f_agent, 0);
1584     xflow_tot_bad_datagrams++;
1585     return;
1586   }
1587   pptrs->f_header = pkt;
1588   pkt += HdrSz;
1589   off += HdrSz;
1590   pptrsv->v4.f_status = (u_char *) nfv9_check_status(pptrs, SourceId, 0, FlowSeq, TRUE);
1591   set_vector_f_status(pptrsv);
1592   pptrsv->v4.f_status_g = (u_char *) nfv9_check_status(pptrs, 0, NF9_OPT_SCOPE_SYSTEM, 0, FALSE);
1593   set_vector_f_status_g(pptrsv);
1594 
1595   process_flowset:
1596   if (off+NfDataHdrV9Sz >= len) {
1597     notify_malf_packet(LOG_INFO, "INFO", "unable to read next Flowset (incomplete NetFlow v9/IPFIX packet)",
1598 			(struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1599     xflow_tot_bad_datagrams++;
1600     return;
1601   }
1602 
1603   data_hdr = (struct data_hdr_v9 *)pkt;
1604 
1605   if (data_hdr->flow_len == 0) {
1606     notify_malf_packet(LOG_INFO, "INFO", "unable to read next Flowset (NetFlow v9/IPFIX packet claiming flow_len 0!)",
1607 			(struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1608     xflow_tot_bad_datagrams++;
1609     return;
1610   }
1611 
1612   fid = ntohs(data_hdr->flow_id);
1613   flowsetlen = ntohs(data_hdr->flow_len);
1614   if (flowsetlen < NfDataHdrV9Sz) {
1615     notify_malf_packet(LOG_INFO, "INFO", "unable to read next Flowset (NetFlow v9/IPFIX packet (flowsetlen < NfDataHdrV9Sz)",
1616                         (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1617     xflow_tot_bad_datagrams++;
1618     return;
1619   }
1620 
1621   if (tee_dissect) {
1622     tee_dissect->flowSetBasePtr = pkt;
1623     tee_dissect->flowSetEndPtr = (u_char *) (pkt + NfDataHdrV9Sz);
1624     tee_dissect->flowSetLen = NfDataHdrV9Sz; /* updated later */
1625   }
1626 
1627   if (fid == 0 || fid == 2) { /* template: 0 NetFlow v9, 2 IPFIX */
1628     unsigned char *tpl_ptr = pkt;
1629     u_int16_t pens = 0;
1630 
1631     if (has_templates) (*has_templates) = TRUE;
1632     flowoff = 0;
1633     tpl_ptr += NfDataHdrV9Sz;
1634     flowoff += NfDataHdrV9Sz;
1635 
1636     /* broadcast the whole flowset over */
1637     if (tee_dissect) {
1638       tee_dissect->elemBasePtr = NULL;
1639       tee_dissect->elemEndPtr = NULL;
1640       tee_dissect->elemLen = 0;
1641 
1642       tee_dissect->flowSetEndPtr = (u_char *) (tee_dissect->flowSetBasePtr + ntohs(data_hdr->flow_len));
1643       tee_dissect->flowSetLen = ntohs(data_hdr->flow_len);
1644       pptrs->tee_dissect_bcast = TRUE;
1645 
1646       exec_plugins(pptrs, req);
1647     }
1648 
1649     while (flowoff < flowsetlen) {
1650       u_int32_t tpl_len = 0;
1651 
1652       template_hdr = (struct template_hdr_v9 *) tpl_ptr;
1653       if (off+flowsetlen > len) {
1654         notify_malf_packet(LOG_INFO, "INFO", "unable to read next Template Flowset (incomplete NetFlow v9/IPFIX packet)",
1655 		        (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1656         xflow_tot_bad_datagrams++;
1657         return;
1658       }
1659 
1660       tpl = handle_template(template_hdr, pptrs, fid, SourceId, &pens, flowsetlen-flowoff, FlowSeq);
1661       if (!tpl) return;
1662 
1663       tpl_len = sizeof(struct template_hdr_v9)+(ntohs(template_hdr->num)*sizeof(struct template_field_v9))+(pens*sizeof(u_int32_t));
1664       tpl_ptr += tpl_len;
1665       flowoff += tpl_len;
1666     }
1667 
1668     pkt += flowsetlen;
1669     off += flowsetlen;
1670   }
1671   else if (fid == 1 || fid == 3) { /* options template: 1 NetFlow v9, 3 IPFIX */
1672     unsigned char *tpl_ptr = pkt;
1673     u_int16_t pens = 0;
1674 
1675     if (has_templates) (*has_templates) = TRUE;
1676     flowoff = 0;
1677     tpl_ptr += NfDataHdrV9Sz;
1678     flowoff += NfDataHdrV9Sz;
1679 
1680     /* broadcast the whole flowset over */
1681     if (tee_dissect) {
1682       tee_dissect->elemBasePtr = NULL;
1683       tee_dissect->elemEndPtr = NULL;
1684       tee_dissect->elemLen = 0;
1685 
1686       tee_dissect->flowSetEndPtr = (u_char *) (tee_dissect->flowSetBasePtr + ntohs(data_hdr->flow_len));
1687       tee_dissect->flowSetLen = ntohs(data_hdr->flow_len);
1688 
1689       exec_plugins(pptrs, req);
1690     }
1691 
1692     while (flowoff < flowsetlen) {
1693       u_int32_t tpl_len = 0;
1694 
1695       opt_template_hdr = (struct options_template_hdr_v9 *) tpl_ptr;
1696       if (off+flowsetlen > len) {
1697         notify_malf_packet(LOG_INFO, "INFO", "unable to read next Options Template Flowset (incomplete NetFlow v9/IPFIX packet)",
1698                         (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1699         xflow_tot_bad_datagrams++;
1700         return;
1701       }
1702 
1703       tpl = handle_template((struct template_hdr_v9 *)opt_template_hdr, pptrs, fid, SourceId, &pens, flowsetlen-flowoff, FlowSeq);
1704       if (!tpl) return;
1705 
1706       /* Increment is not precise for NetFlow v9 but will work */
1707       tpl_len = sizeof(struct options_template_hdr_v9) +
1708 		(((ntohs(opt_template_hdr->scope_len) + ntohs(opt_template_hdr->option_len)) * sizeof(struct template_field_v9)) +
1709 		(pens * sizeof(u_int32_t)));
1710 
1711       tpl_ptr += tpl_len;
1712       flowoff += tpl_len;
1713     }
1714 
1715     pkt += flowsetlen;
1716     off += flowsetlen;
1717   }
1718   else if (fid >= 256) { /* data */
1719     if (off+flowsetlen > len) {
1720       notify_malf_packet(LOG_INFO, "INFO", "unable to read next Data Flowset (incomplete NetFlow v9/IPFIX packet)",
1721 		      (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1722       xflow_tot_bad_datagrams++;
1723       return;
1724     }
1725 
1726     flowoff = 0;
1727     pkt += NfDataHdrV9Sz;
1728     flowoff += NfDataHdrV9Sz;
1729 
1730     tpl = find_template(data_hdr->flow_id, (struct sockaddr *) pptrs->f_agent, fid, SourceId);
1731     if (!tpl) {
1732       sa_to_addr((struct sockaddr *)pptrs->f_agent, &debug_a, &debug_agent_port);
1733       addr_to_str(debug_agent_addr, &debug_a);
1734 
1735       Log(LOG_DEBUG, "DEBUG ( %s/core ): Discarded NetFlow v9/IPFIX packet (R: unknown template %u [%s:%u])\n",
1736 		config.name, fid, debug_agent_addr, SourceId);
1737       pkt += (flowsetlen-NfDataHdrV9Sz);
1738       off += flowsetlen;
1739     }
1740     else if (tpl->template_type == 1) { /* Options coming */
1741       struct xflow_status_entry *entry;
1742       struct xflow_status_entry_sampling *sentry, *ssaved;
1743       struct xflow_status_entry_class *centry, *csaved;
1744 
1745       /* broadcast the whole flowset over */
1746       if (tee_dissect) {
1747         tee_dissect->elemBasePtr = NULL;
1748         tee_dissect->elemEndPtr = NULL;
1749         tee_dissect->elemLen = 0;
1750 
1751 	tee_dissect->flowSetEndPtr = (u_char *) (tee_dissect->flowSetBasePtr + ntohs(data_hdr->flow_len));
1752 	tee_dissect->flowSetLen = ntohs(data_hdr->flow_len);
1753         pptrs->tee_dissect_bcast = TRUE;
1754 
1755 	exec_plugins(pptrs, req);
1756 	/* goto finalize_opt_record later */
1757       }
1758 
1759       while (flowoff+tpl->len <= flowsetlen) {
1760 	entry = (struct xflow_status_entry *) pptrs->f_status;
1761 	sentry = NULL, ssaved = NULL;
1762 	centry = NULL, csaved = NULL;
1763 
1764 	if (tee_dissect) goto finalize_opt_record;
1765 
1766 	/* Is this option about sampling? */
1767 	if (tpl->tpl[NF9_FLOW_SAMPLER_ID].len || tpl->tpl[NF9_SAMPLING_INTERVAL].len == 4 || tpl->tpl[NF9_SAMPLING_PKT_INTERVAL].len == 4) {
1768 	  u_int8_t t8 = 0;
1769 	  u_int16_t t16 = 0;
1770 	  u_int32_t sampler_id = 0, t32 = 0, t32_2 = 0;
1771 	  u_int64_t t64 = 0;
1772 
1773 	  /* Handling the global option scoping case */
1774 	  if (!config.nfacctd_disable_opt_scope_check) {
1775 	    if (tpl->tpl[NF9_OPT_SCOPE_SYSTEM].len) entry = (struct xflow_status_entry *) pptrs->f_status_g;
1776 	    else {
1777 	      if (version == 10) {
1778 		if (tpl->tpl[IPFIX_SCOPE_TEMPLATE_ID].len) {
1779 		  entry = (struct xflow_status_entry *) pptrs->f_status;
1780 		  memcpy(&t16, pkt+tpl->tpl[IPFIX_SCOPE_TEMPLATE_ID].off, 2);
1781 		  sampler_id = ntohs(t16);
1782 		}
1783 	      }
1784 	    }
1785 	  }
1786 	  else entry = (struct xflow_status_entry *) pptrs->f_status_g;
1787 
1788 	  if (tpl->tpl[NF9_FLOW_SAMPLER_ID].len == 1) {
1789 	    memcpy(&t8, pkt+tpl->tpl[NF9_FLOW_SAMPLER_ID].off, 1);
1790 	    sampler_id = t8;
1791 	  }
1792 	  else if (tpl->tpl[NF9_FLOW_SAMPLER_ID].len == 2) {
1793 	    memcpy(&t16, pkt+tpl->tpl[NF9_FLOW_SAMPLER_ID].off, 2);
1794 	    sampler_id = ntohs(t16);
1795 	  }
1796           else if (tpl->tpl[NF9_FLOW_SAMPLER_ID].len == 4) {
1797             memcpy(&t32, pkt+tpl->tpl[NF9_FLOW_SAMPLER_ID].off, 4);
1798             sampler_id = ntohl(t32);
1799           }
1800           else if (tpl->tpl[NF9_SELECTOR_ID].len == 8) {
1801             memcpy(&t64, pkt+tpl->tpl[NF9_SELECTOR_ID].off, 8);
1802             sampler_id = pm_ntohll(t64); /* XXX: sampler_id to be moved to 64 bit */
1803           }
1804 
1805 	  if (entry) sentry = search_smp_id_status_table(entry->sampling, sampler_id, FALSE);
1806 	  if (!sentry) sentry = create_smp_entry_status_table(entry);
1807 	  else ssaved = sentry->next;
1808 
1809 	  if (sentry) {
1810 	    memset(sentry, 0, sizeof(struct xflow_status_entry_sampling));
1811 	    if (tpl->tpl[NF9_SAMPLING_INTERVAL].len == 1) {
1812 	      memcpy(&t8, pkt+tpl->tpl[NF9_SAMPLING_INTERVAL].off, 1);
1813 	      sentry->sample_pool = t8;
1814 	    }
1815 	    if (tpl->tpl[NF9_SAMPLING_INTERVAL].len == 2) {
1816 	      memcpy(&t16, pkt+tpl->tpl[NF9_SAMPLING_INTERVAL].off, 2);
1817 	      sentry->sample_pool = ntohs(t16);
1818 	    }
1819 	    if (tpl->tpl[NF9_SAMPLING_INTERVAL].len == 4) {
1820 	      memcpy(&t32, pkt+tpl->tpl[NF9_SAMPLING_INTERVAL].off, 4);
1821 	      sentry->sample_pool = ntohl(t32);
1822 	    }
1823 	    if (tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].len == 1) {
1824 	      memcpy(&t8, pkt+tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].off, 1);
1825 	      sentry->sample_pool = t8;
1826 	    }
1827 	    else if (tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].len == 2) {
1828 	      memcpy(&t16, pkt+tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].off, 2);
1829 	      sentry->sample_pool = ntohs(t16);
1830 	    }
1831 	    else if (tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].len == 4) {
1832 	      memcpy(&t32, pkt+tpl->tpl[NF9_FLOW_SAMPLER_INTERVAL].off, 4);
1833 	      sentry->sample_pool = ntohl(t32);
1834 	    }
1835             else if (tpl->tpl[NF9_SAMPLING_PKT_INTERVAL].len == 4 && tpl->tpl[NF9_SAMPLING_PKT_SPACE].len == 4) {
1836 	      u_int32_t pkt_interval = 0, pkt_space = 0;
1837 
1838               memcpy(&t32, pkt+tpl->tpl[NF9_SAMPLING_PKT_INTERVAL].off, 4);
1839               memcpy(&t32_2, pkt+tpl->tpl[NF9_SAMPLING_PKT_SPACE].off, 4);
1840 	      pkt_interval = ntohl(t32);
1841 	      pkt_space = ntohl(t32_2);
1842 
1843               if (pkt_interval) sentry->sample_pool = ((pkt_interval + pkt_space) / pkt_interval);
1844             }
1845 
1846 	    sentry->sampler_id = sampler_id;
1847 	    if (ssaved) sentry->next = ssaved;
1848 	  }
1849 	}
1850 
1851 	if (tpl->tpl[NF9_APPLICATION_ID].len == 4 && tpl->tpl[NF9_APPLICATION_NAME].len > 0) {
1852 	  struct pkt_classifier css;
1853 	  pm_class_t class_id = 0, class_int_id = 0;
1854 
1855 	  /* Handling the global option scoping case */
1856 	  if (!config.nfacctd_disable_opt_scope_check) {
1857 	    if (tpl->tpl[NF9_OPT_SCOPE_SYSTEM].len) entry = (struct xflow_status_entry *) pptrs->f_status_g;
1858 	  }
1859 	  else entry = (struct xflow_status_entry *) pptrs->f_status_g;
1860 
1861 	  memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
1862 
1863           if (entry) centry = search_class_id_status_table(entry->class, class_id);
1864           if (!centry) {
1865 	    centry = create_class_entry_status_table(entry);
1866 	    class_int_id = pmct_find_first_free();
1867 	  }
1868           else {
1869 	    csaved = centry->next;
1870 	    class_int_id = centry->class_int_id;
1871 	    pmct_unregister(centry->class_int_id);
1872 	  }
1873 
1874           if (centry) {
1875             memset(centry, 0, sizeof(struct xflow_status_entry_class));
1876 	    memset(&css, 0, sizeof(struct pkt_classifier));
1877 	    memcpy(&centry->class_name, pkt+tpl->tpl[NF9_APPLICATION_NAME].off, MIN((MAX_PROTOCOL_LEN-1), tpl->tpl[NF9_APPLICATION_NAME].len));
1878             centry->class_id = class_id;
1879 	    centry->class_int_id = class_int_id;
1880             if (csaved) centry->next = csaved;
1881 
1882 	    css.id = centry->class_int_id;
1883 	    strlcpy(css.protocol, centry->class_name, MAX_PROTOCOL_LEN);
1884 	    pmct_register(&css);
1885           }
1886 	}
1887 
1888 	if (tpl->tpl[NF9_EXPORTER_IPV4_ADDRESS].len == 4 || tpl->tpl[NF9_EXPORTER_IPV6_ADDRESS].len == 16) {
1889           /* Handling the global option scoping case */
1890           if (!config.nfacctd_disable_opt_scope_check) {
1891             if (tpl->tpl[NF9_OPT_SCOPE_SYSTEM].len) entry = (struct xflow_status_entry *) pptrs->f_status_g;
1892           }
1893           else entry = (struct xflow_status_entry *) pptrs->f_status_g;
1894 
1895 	  if (entry) {
1896 	    if (tpl->tpl[NF9_EXPORTER_IPV4_ADDRESS].len) {
1897 	      raw_to_addr(&entry->exp_addr, pkt+tpl->tpl[NF9_EXPORTER_IPV4_ADDRESS].off, AF_INET);
1898 	      raw_to_sa(&entry->exp_sa, pkt+tpl->tpl[NF9_EXPORTER_IPV4_ADDRESS].off, 0, AF_INET);
1899 	    }
1900 	    else if (tpl->tpl[NF9_EXPORTER_IPV6_ADDRESS].len) {
1901 	      raw_to_addr(&entry->exp_addr, pkt+tpl->tpl[NF9_EXPORTER_IPV6_ADDRESS].off, AF_INET6);
1902 	      raw_to_sa(&entry->exp_sa, pkt+tpl->tpl[NF9_EXPORTER_IPV6_ADDRESS].off, 0, AF_INET6);
1903 	    }
1904 	  }
1905 	}
1906 
1907 	if (config.nfacctd_account_options) {
1908 	  pptrs->f_data = pkt;
1909 	  pptrs->f_tpl = (u_char *) tpl;
1910 	  reset_net_status_v(pptrsv);
1911 	  pptrs->flow_type = NF_evaluate_flow_type(tpl, pptrs);
1912 
1913 	  exec_plugins(pptrs, req);
1914 	}
1915 
1916 	finalize_opt_record:
1917         pkt += tpl->len;
1918         flowoff += tpl->len;
1919 
1920         FlowSeqInc++;
1921       }
1922 
1923       /* last pre-flight check for the subsequent subtraction */
1924       if (flowoff > flowsetlen) {
1925         notify_malf_packet(LOG_INFO, "INFO", "aborting malformed Options Data element (incomplete NetFlow v9/IPFIX packet)",
1926                       (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
1927         xflow_tot_bad_datagrams++;
1928         return;
1929       }
1930 
1931       pkt += (flowsetlen-flowoff); /* handling padding */
1932       off += flowsetlen;
1933     }
1934     else {
1935       while (flowoff+tpl->len <= flowsetlen) {
1936         /* Let's bake offsets and lengths if we have variable-length fields */
1937         if (tpl->vlen) {
1938 	  int ret;
1939 
1940 	  ret = resolve_vlen_template(pkt, (flowsetlen - flowoff), tpl);
1941 	  if (ret == ERR) break;
1942 	}
1943 
1944         pptrs->f_data = pkt;
1945 	pptrs->f_tpl = (u_char *) tpl;
1946 	reset_net_status_v(pptrsv);
1947 
1948 	if (tee_dissect) {
1949 	  tee_dissect->elemBasePtr = pkt;
1950 	  tee_dissect->elemEndPtr = (u_char *) (pkt + tpl->len);
1951 	  tee_dissect->elemLen = tpl->len;
1952           pptrs->tee_dissect_bcast = FALSE;
1953 
1954 	  exec_plugins(pptrs, req);
1955 
1956 	  goto finalize_record;
1957 	}
1958 
1959 	pptrs->flow_type = NF_evaluate_flow_type(tpl, pptrs);
1960 	direction = NF_evaluate_direction(tpl, pptrs);
1961 
1962 	/* we need to understand the IP protocol version in order to build the fake packet */
1963 	switch (pptrs->flow_type) {
1964 	case NF9_FTYPE_IPV4:
1965 	  if (req->bpf_filter) {
1966 	    reset_mac(pptrs);
1967 	    reset_ip4(pptrs);
1968 
1969 	    if (direction == DIRECTION_IN) {
1970               memcpy(pptrs->mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
1971               memcpy(pptrs->mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
1972 	    }
1973 	    else if (direction == DIRECTION_OUT) {
1974               memcpy(pptrs->mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
1975               memcpy(pptrs->mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
1976 	    }
1977 	    ((struct pm_iphdr *)pptrs->iph_ptr)->ip_vhl = 0x45;
1978             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_src, pkt+tpl->tpl[NF9_IPV4_SRC_ADDR].off, tpl->tpl[NF9_IPV4_SRC_ADDR].len);
1979             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_dst, pkt+tpl->tpl[NF9_IPV4_DST_ADDR].off, tpl->tpl[NF9_IPV4_DST_ADDR].len);
1980             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_p, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
1981             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_tos, pkt+tpl->tpl[NF9_SRC_TOS].off, tpl->tpl[NF9_SRC_TOS].len);
1982             memcpy(&((struct pm_tlhdr *)pptrs->tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
1983             memcpy(&((struct pm_tlhdr *)pptrs->tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
1984             memcpy(&((struct pm_tcphdr *)pptrs->tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
1985 	  }
1986 
1987 	  memcpy(&pptrs->lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
1988 	  memcpy(&pptrs->lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
1989 	  pptrs->lm_method_src = NF_NET_KEEP;
1990 	  pptrs->lm_method_dst = NF_NET_KEEP;
1991 
1992 	  /* Let's copy some relevant field */
1993 	  pptrs->l4_proto = 0;
1994 	  memcpy(&pptrs->l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
1995 
1996 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
1997 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
1998 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
1999 	    pm_class_t class_id = 0;
2000 
2001 	    memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2002 	    if (entry) pptrs->class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2003 	  }
2004 	  if (config.nfacctd_isis) isis_srcdst_lookup(pptrs);
2005 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, pptrs, &pptrs->bta, &pptrs->bta2);
2006 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, pptrs, &pptrs->bitr, NULL);
2007 	  if (config.bgp_daemon) bgp_srcdst_lookup(pptrs, FUNC_TYPE_BGP);
2008 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, pptrs, &pptrs->bpas, NULL);
2009 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, pptrs, &pptrs->blp, NULL);
2010 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, pptrs, &pptrs->bmed, NULL);
2011           if (config.bmp_daemon) bmp_srcdst_lookup(pptrs);
2012           exec_plugins(pptrs, req);
2013 	  break;
2014 	case NF9_FTYPE_IPV6:
2015 	  pptrsv->v6.f_header = pptrs->f_header;
2016 	  pptrsv->v6.f_data = pptrs->f_data;
2017 	  pptrsv->v6.f_tpl = pptrs->f_tpl;
2018 	  pptrsv->v6.flow_type = pptrs->flow_type;
2019 
2020 	  if (req->bpf_filter) {
2021 	    reset_mac(&pptrsv->v6);
2022 	    reset_ip6(&pptrsv->v6);
2023 
2024 	    if (direction == DIRECTION_IN) {
2025 	      memcpy(pptrsv->v6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2026 	      memcpy(pptrsv->v6.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2027 	    }
2028 	    else if (direction == DIRECTION_OUT) {
2029 	      memcpy(pptrsv->v6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2030 	      memcpy(pptrsv->v6.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2031 	    }
2032 	    ((struct ip6_hdr *)pptrsv->v6.iph_ptr)->ip6_ctlun.ip6_un2_vfc = 0x60;
2033             memcpy(&((struct ip6_hdr *)pptrsv->v6.iph_ptr)->ip6_src, pkt+tpl->tpl[NF9_IPV6_SRC_ADDR].off, tpl->tpl[NF9_IPV6_SRC_ADDR].len);
2034             memcpy(&((struct ip6_hdr *)pptrsv->v6.iph_ptr)->ip6_dst, pkt+tpl->tpl[NF9_IPV6_DST_ADDR].off, tpl->tpl[NF9_IPV6_DST_ADDR].len);
2035             memcpy(&((struct ip6_hdr *)pptrsv->v6.iph_ptr)->ip6_nxt, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2036 	    /* XXX: class ID ? */
2037             memcpy(&((struct pm_tlhdr *)pptrsv->v6.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2038             memcpy(&((struct pm_tlhdr *)pptrsv->v6.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2039             memcpy(&((struct pm_tcphdr *)pptrsv->v6.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2040 	  }
2041 
2042           memcpy(&pptrsv->v6.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2043           memcpy(&pptrsv->v6.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2044           pptrsv->v6.lm_method_src = NF_NET_KEEP;
2045           pptrsv->v6.lm_method_dst = NF_NET_KEEP;
2046 
2047 	  /* Let's copy some relevant field */
2048 	  pptrsv->v6.l4_proto = 0;
2049 	  memcpy(&pptrsv->v6.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2050 
2051 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2052 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2053 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2054 	    pm_class_t class_id = 0;
2055 
2056 	    memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2057 	    if (entry) pptrsv->v6.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2058 	  }
2059 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->v6);
2060 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->v6, &pptrsv->v6.bta, &pptrsv->v6.bta2);
2061 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->v6, &pptrsv->v6.bitr, NULL);
2062 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->v6, FUNC_TYPE_BGP);
2063 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->v6, &pptrsv->v6.bpas, NULL);
2064 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->v6, &pptrsv->v6.blp, NULL);
2065 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->v6, &pptrsv->v6.bmed, NULL);
2066           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->v6);
2067           exec_plugins(&pptrsv->v6, req);
2068 	  break;
2069 	case NF9_FTYPE_VLAN_IPV4:
2070 	  pptrsv->vlan4.f_header = pptrs->f_header;
2071 	  pptrsv->vlan4.f_data = pptrs->f_data;
2072 	  pptrsv->vlan4.f_tpl = pptrs->f_tpl;
2073 	  pptrsv->vlan4.flow_type = pptrs->flow_type;
2074 
2075 	  if (req->bpf_filter) {
2076 	    reset_mac_vlan(&pptrsv->vlan4);
2077 	    reset_ip4(&pptrsv->vlan4);
2078 
2079 	    if (direction == DIRECTION_IN) {
2080 	      memcpy(pptrsv->vlan4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2081 	      memcpy(pptrsv->vlan4.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2082 	      memcpy(pptrsv->vlan4.vlan_ptr, pkt+tpl->tpl[NF9_IN_VLAN].off, tpl->tpl[NF9_IN_VLAN].len);
2083 	    }
2084 	    else if (direction == DIRECTION_OUT) {
2085 	      memcpy(pptrsv->vlan4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2086 	      memcpy(pptrsv->vlan4.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2087 	      memcpy(pptrsv->vlan4.vlan_ptr, pkt+tpl->tpl[NF9_OUT_VLAN].off, tpl->tpl[NF9_OUT_VLAN].len);
2088 	    }
2089 	    ((struct pm_iphdr *)pptrsv->vlan4.iph_ptr)->ip_vhl = 0x45;
2090 	    memcpy(&((struct pm_iphdr *)pptrsv->vlan4.iph_ptr)->ip_src, pkt+tpl->tpl[NF9_IPV4_SRC_ADDR].off, tpl->tpl[NF9_IPV4_SRC_ADDR].len);
2091 	    memcpy(&((struct pm_iphdr *)pptrsv->vlan4.iph_ptr)->ip_dst, pkt+tpl->tpl[NF9_IPV4_DST_ADDR].off, tpl->tpl[NF9_IPV4_DST_ADDR].len);
2092 	    memcpy(&((struct pm_iphdr *)pptrsv->vlan4.iph_ptr)->ip_p, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2093 	    memcpy(&((struct pm_iphdr *)pptrsv->vlan4.iph_ptr)->ip_tos, pkt+tpl->tpl[NF9_SRC_TOS].off, tpl->tpl[NF9_SRC_TOS].len);
2094 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlan4.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2095 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlan4.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2096             memcpy(&((struct pm_tcphdr *)pptrsv->vlan4.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2097 	  }
2098 
2099           memcpy(&pptrsv->vlan4.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2100           memcpy(&pptrsv->vlan4.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2101           pptrsv->vlan4.lm_method_src = NF_NET_KEEP;
2102           pptrsv->vlan4.lm_method_dst = NF_NET_KEEP;
2103 
2104 	  /* Let's copy some relevant field */
2105 	  pptrsv->vlan4.l4_proto = 0;
2106 	  memcpy(&pptrsv->vlan4.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2107 
2108 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2109 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2110 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2111             pm_class_t class_id = 0;
2112 
2113             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2114 	    if (entry) pptrsv->vlan4.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2115 	  }
2116 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->vlan4);
2117 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->vlan4, &pptrsv->vlan4.bta, &pptrsv->vlan4.bta2);
2118 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->vlan4, &pptrsv->vlan4.bitr, NULL);
2119 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->vlan4, FUNC_TYPE_BGP);
2120 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->vlan4, &pptrsv->vlan4.bpas, NULL);
2121 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->vlan4, &pptrsv->vlan4.blp, NULL);
2122 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->vlan4, &pptrsv->vlan4.bmed, NULL);
2123           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->vlan4);
2124 	  exec_plugins(&pptrsv->vlan4, req);
2125 	  break;
2126 	case NF9_FTYPE_VLAN_IPV6:
2127 	  pptrsv->vlan6.f_header = pptrs->f_header;
2128 	  pptrsv->vlan6.f_data = pptrs->f_data;
2129 	  pptrsv->vlan6.f_tpl = pptrs->f_tpl;
2130 	  pptrsv->vlan6.flow_type = pptrs->flow_type;
2131 
2132 	  if (req->bpf_filter) {
2133 	    reset_mac_vlan(&pptrsv->vlan6);
2134 	    reset_ip6(&pptrsv->vlan6);
2135 
2136 	    if (direction == DIRECTION_IN) {
2137 	      memcpy(pptrsv->vlan6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2138 	      memcpy(pptrsv->vlan6.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2139 	      memcpy(pptrsv->vlan6.vlan_ptr, pkt+tpl->tpl[NF9_IN_VLAN].off, tpl->tpl[NF9_IN_VLAN].len);
2140 	    }
2141             else if (direction == DIRECTION_OUT) {
2142 	      memcpy(pptrsv->vlan6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2143 	      memcpy(pptrsv->vlan6.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2144 	      memcpy(pptrsv->vlan6.vlan_ptr, pkt+tpl->tpl[NF9_OUT_VLAN].off, tpl->tpl[NF9_OUT_VLAN].len);
2145 	    }
2146 	    ((struct ip6_hdr *)pptrsv->vlan6.iph_ptr)->ip6_ctlun.ip6_un2_vfc = 0x60;
2147 	    memcpy(&((struct ip6_hdr *)pptrsv->vlan6.iph_ptr)->ip6_src, pkt+tpl->tpl[NF9_IPV6_SRC_ADDR].off, tpl->tpl[NF9_IPV6_SRC_ADDR].len);
2148 	    memcpy(&((struct ip6_hdr *)pptrsv->vlan6.iph_ptr)->ip6_dst, pkt+tpl->tpl[NF9_IPV6_DST_ADDR].off, tpl->tpl[NF9_IPV6_DST_ADDR].len);
2149 	    memcpy(&((struct ip6_hdr *)pptrsv->vlan6.iph_ptr)->ip6_nxt, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2150 	    /* XXX: class ID ? */
2151 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlan6.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2152 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlan6.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2153             memcpy(&((struct pm_tcphdr *)pptrsv->vlan6.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2154 	  }
2155 
2156           memcpy(&pptrsv->vlan6.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2157           memcpy(&pptrsv->vlan6.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2158           pptrsv->vlan6.lm_method_src = NF_NET_KEEP;
2159           pptrsv->vlan6.lm_method_dst = NF_NET_KEEP;
2160 
2161 	  /* Let's copy some relevant field */
2162 	  pptrsv->vlan6.l4_proto = 0;
2163 	  memcpy(&pptrsv->vlan6.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2164 
2165 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2166 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2167 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2168             pm_class_t class_id = 0;
2169 
2170             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2171 	    if (entry) pptrsv->vlan6.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2172 	  }
2173 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->vlan6);
2174 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->vlan6, &pptrsv->vlan6.bta, &pptrsv->vlan6.bta2);
2175 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->vlan6, &pptrsv->vlan6.bitr, NULL);
2176 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->vlan6, FUNC_TYPE_BGP);
2177 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->vlan6, &pptrsv->vlan6.bpas, NULL);
2178 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->vlan6, &pptrsv->vlan6.blp, NULL);
2179 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->vlan6, &pptrsv->vlan6.bmed, NULL);
2180           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->vlan6);
2181 	  exec_plugins(&pptrsv->vlan6, req);
2182 	  break;
2183         case NF9_FTYPE_MPLS_IPV4:
2184           pptrsv->mpls4.f_header = pptrs->f_header;
2185           pptrsv->mpls4.f_data = pptrs->f_data;
2186           pptrsv->mpls4.f_tpl = pptrs->f_tpl;
2187 	  pptrsv->mpls4.flow_type = pptrs->flow_type;
2188 
2189           if (req->bpf_filter) {
2190 	    u_char *ptr = pptrsv->mpls4.mpls_ptr;
2191 	    u_int32_t idx;
2192 
2193             /* XXX: fix caplen */
2194             reset_mac(&pptrsv->mpls4);
2195 	    if (direction == DIRECTION_IN) {
2196               memcpy(pptrsv->mpls4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2197               memcpy(pptrsv->mpls4.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2198 	    }
2199 	    else if (direction == DIRECTION_OUT) {
2200               memcpy(pptrsv->mpls4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2201               memcpy(pptrsv->mpls4.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2202 	    }
2203 
2204 	    for (idx = NF9_MPLS_LABEL_1; idx <= NF9_MPLS_LABEL_10 && tpl->tpl[idx].len; idx++, ptr += 4) {
2205 	      memset(ptr, 0, 4);
2206 	      memcpy(ptr, pkt+tpl->tpl[idx].off, tpl->tpl[idx].len);
2207 	    }
2208 	    stick_bosbit(ptr-4);
2209 	    pptrsv->mpls4.iph_ptr = ptr;
2210 	    pptrsv->mpls4.tlh_ptr = ptr + IP4HdrSz;
2211             reset_ip4(&pptrsv->mpls4);
2212 
2213 	    ((struct pm_iphdr *)pptrsv->mpls4.iph_ptr)->ip_vhl = 0x45;
2214             memcpy(&((struct pm_iphdr *)pptrsv->mpls4.iph_ptr)->ip_src, pkt+tpl->tpl[NF9_IPV4_SRC_ADDR].off, tpl->tpl[NF9_IPV4_SRC_ADDR].len);
2215             memcpy(&((struct pm_iphdr *)pptrsv->mpls4.iph_ptr)->ip_dst, pkt+tpl->tpl[NF9_IPV4_DST_ADDR].off, tpl->tpl[NF9_IPV4_DST_ADDR].len);
2216             memcpy(&((struct pm_iphdr *)pptrsv->mpls4.iph_ptr)->ip_p, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2217             memcpy(&((struct pm_iphdr *)pptrsv->mpls4.iph_ptr)->ip_tos, pkt+tpl->tpl[NF9_SRC_TOS].off, tpl->tpl[NF9_SRC_TOS].len);
2218             memcpy(&((struct pm_tlhdr *)pptrsv->mpls4.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2219             memcpy(&((struct pm_tlhdr *)pptrsv->mpls4.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2220             memcpy(&((struct pm_tcphdr *)pptrsv->mpls4.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2221 	  }
2222 
2223           memcpy(&pptrsv->mpls4.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2224           memcpy(&pptrsv->mpls4.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2225           pptrsv->mpls4.lm_method_src = NF_NET_KEEP;
2226           pptrsv->mpls4.lm_method_dst = NF_NET_KEEP;
2227 
2228 	  /* Let's copy some relevant field */
2229 	  pptrsv->mpls4.l4_proto = 0;
2230 	  memcpy(&pptrsv->mpls4.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2231 
2232 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2233 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2234 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2235             pm_class_t class_id = 0;
2236 
2237             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2238 	    if (entry) pptrsv->mpls4.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2239 	  }
2240 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->mpls4);
2241 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->mpls4, &pptrsv->mpls4.bta, &pptrsv->mpls4.bta2);
2242 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->mpls4, &pptrsv->mpls4.bitr, NULL);
2243 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->mpls4, FUNC_TYPE_BGP);
2244 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->mpls4, &pptrsv->mpls4.bpas, NULL);
2245 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->mpls4, &pptrsv->mpls4.blp, NULL);
2246 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->mpls4, &pptrsv->mpls4.bmed, NULL);
2247           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->mpls4);
2248           exec_plugins(&pptrsv->mpls4, req);
2249           break;
2250 	case NF9_FTYPE_MPLS_IPV6:
2251 	  pptrsv->mpls6.f_header = pptrs->f_header;
2252 	  pptrsv->mpls6.f_data = pptrs->f_data;
2253 	  pptrsv->mpls6.f_tpl = pptrs->f_tpl;
2254 	  pptrsv->mpls6.flow_type = pptrs->flow_type;
2255 
2256 	  if (req->bpf_filter) {
2257 	    u_char *ptr = pptrsv->mpls6.mpls_ptr;
2258 	    u_int32_t idx;
2259 
2260 	    /* XXX: fix caplen */
2261 	    reset_mac(&pptrsv->mpls6);
2262 	    if (direction == DIRECTION_IN) {
2263 	      memcpy(pptrsv->mpls6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2264 	      memcpy(pptrsv->mpls6.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2265 	    }
2266 	    else if (direction == DIRECTION_OUT) {
2267 	      memcpy(pptrsv->mpls6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2268 	      memcpy(pptrsv->mpls6.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2269 	    }
2270             for (idx = NF9_MPLS_LABEL_1; idx <= NF9_MPLS_LABEL_10 && tpl->tpl[idx].len; idx++, ptr += 4) {
2271 	      memset(ptr, 0, 4);
2272 	      memcpy(ptr, pkt+tpl->tpl[idx].off, tpl->tpl[idx].len);
2273 	    }
2274 	    stick_bosbit(ptr-4);
2275 	    pptrsv->mpls6.iph_ptr = ptr;
2276 	    pptrsv->mpls6.tlh_ptr = ptr + IP6HdrSz;
2277 	    reset_ip6(&pptrsv->mpls6);
2278 
2279 	    ((struct ip6_hdr *)pptrsv->mpls6.iph_ptr)->ip6_ctlun.ip6_un2_vfc = 0x60;
2280 	    memcpy(&((struct ip6_hdr *)pptrsv->mpls6.iph_ptr)->ip6_src, pkt+tpl->tpl[NF9_IPV6_SRC_ADDR].off, tpl->tpl[NF9_IPV6_SRC_ADDR].len);
2281 	    memcpy(&((struct ip6_hdr *)pptrsv->mpls6.iph_ptr)->ip6_dst, pkt+tpl->tpl[NF9_IPV6_DST_ADDR].off, tpl->tpl[NF9_IPV6_DST_ADDR].len);
2282 	    memcpy(&((struct ip6_hdr *)pptrsv->mpls6.iph_ptr)->ip6_nxt, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2283 	    /* XXX: class ID ? */
2284 	    memcpy(&((struct pm_tlhdr *)pptrsv->mpls6.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2285 	    memcpy(&((struct pm_tlhdr *)pptrsv->mpls6.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2286             memcpy(&((struct pm_tcphdr *)pptrsv->mpls6.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2287 	  }
2288 
2289           memcpy(&pptrsv->mpls6.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2290           memcpy(&pptrsv->mpls6.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2291           pptrsv->mpls6.lm_method_src = NF_NET_KEEP;
2292           pptrsv->mpls6.lm_method_dst = NF_NET_KEEP;
2293 
2294 	  /* Let's copy some relevant field */
2295 	  pptrsv->mpls6.l4_proto = 0;
2296 	  memcpy(&pptrsv->mpls6.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2297 
2298 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2299 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2300 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2301             pm_class_t class_id = 0;
2302 
2303             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2304 	    if (entry) pptrsv->mpls6.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2305 	  }
2306 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->mpls6);
2307 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->mpls6, &pptrsv->mpls6.bta, &pptrsv->mpls6.bta2);
2308 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->mpls6, &pptrsv->mpls6.bitr, NULL);
2309 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->mpls6, FUNC_TYPE_BGP);
2310 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->mpls6, &pptrsv->mpls6.bpas, NULL);
2311 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->mpls6, &pptrsv->mpls6.blp, NULL);
2312 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->mpls6, &pptrsv->mpls6.bmed, NULL);
2313           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->mpls6);
2314 	  exec_plugins(&pptrsv->mpls6, req);
2315 	  break;
2316         case NF9_FTYPE_VLAN_MPLS_IPV4:
2317 	  pptrsv->vlanmpls4.f_header = pptrs->f_header;
2318 	  pptrsv->vlanmpls4.f_data = pptrs->f_data;
2319 	  pptrsv->vlanmpls4.f_tpl = pptrs->f_tpl;
2320 	  pptrsv->vlanmpls4.flow_type = pptrs->flow_type;
2321 
2322           if (req->bpf_filter) {
2323             u_char *ptr = pptrsv->vlanmpls4.mpls_ptr;
2324 	    u_int32_t idx;
2325 
2326 	    /* XXX: fix caplen */
2327 	    reset_mac_vlan(&pptrsv->vlanmpls4);
2328 	    if (direction == DIRECTION_IN) {
2329 	      memcpy(pptrsv->vlanmpls4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2330 	      memcpy(pptrsv->vlanmpls4.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2331 	      memcpy(pptrsv->vlanmpls4.vlan_ptr, pkt+tpl->tpl[NF9_IN_VLAN].off, tpl->tpl[NF9_IN_VLAN].len);
2332 	    }
2333 	    else if (direction == DIRECTION_OUT) {
2334 	      memcpy(pptrsv->vlanmpls4.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2335 	      memcpy(pptrsv->vlanmpls4.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2336 	      memcpy(pptrsv->vlanmpls4.vlan_ptr, pkt+tpl->tpl[NF9_OUT_VLAN].off, tpl->tpl[NF9_OUT_VLAN].len);
2337 	    }
2338 
2339 	    for (idx = NF9_MPLS_LABEL_1; idx <= NF9_MPLS_LABEL_10 && tpl->tpl[idx].len; idx++, ptr += 4) {
2340 	      memset(ptr, 0, 4);
2341 	      memcpy(ptr, pkt+tpl->tpl[idx].off, tpl->tpl[idx].len);
2342 	    }
2343 	    stick_bosbit(ptr-4);
2344 	    pptrsv->vlanmpls4.iph_ptr = ptr;
2345 	    pptrsv->vlanmpls4.tlh_ptr = ptr + IP4HdrSz;
2346             reset_ip4(&pptrsv->vlanmpls4);
2347 
2348 	    ((struct pm_iphdr *)pptrsv->vlanmpls4.iph_ptr)->ip_vhl = 0x45;
2349             memcpy(&((struct pm_iphdr *)pptrsv->vlanmpls4.iph_ptr)->ip_src, pkt+tpl->tpl[NF9_IPV4_SRC_ADDR].off, tpl->tpl[NF9_IPV4_SRC_ADDR].len);
2350 	    memcpy(&((struct pm_iphdr *)pptrsv->vlanmpls4.iph_ptr)->ip_dst, pkt+tpl->tpl[NF9_IPV4_DST_ADDR].off, tpl->tpl[NF9_IPV4_DST_ADDR].len);
2351 	    memcpy(&((struct pm_iphdr *)pptrsv->vlanmpls4.iph_ptr)->ip_p, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2352 	    memcpy(&((struct pm_iphdr *)pptrsv->vlanmpls4.iph_ptr)->ip_tos, pkt+tpl->tpl[NF9_SRC_TOS].off, tpl->tpl[NF9_SRC_TOS].len);
2353 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlanmpls4.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2354 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlanmpls4.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2355             memcpy(&((struct pm_tcphdr *)pptrsv->vlanmpls4.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2356 	  }
2357 
2358           memcpy(&pptrsv->vlanmpls4.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2359           memcpy(&pptrsv->vlanmpls4.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2360           pptrsv->vlanmpls4.lm_method_src = NF_NET_KEEP;
2361           pptrsv->vlanmpls4.lm_method_dst = NF_NET_KEEP;
2362 
2363 	  /* Let's copy some relevant field */
2364 	  pptrsv->vlanmpls4.l4_proto = 0;
2365 	  memcpy(&pptrsv->vlanmpls4.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2366 
2367 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2368 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2369 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2370             pm_class_t class_id = 0;
2371 
2372             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2373 	    if (entry) pptrsv->vlanmpls4.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2374 	  }
2375 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->vlanmpls4);
2376 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->vlanmpls4, &pptrsv->vlanmpls4.bta, &pptrsv->vlanmpls4.bta2);
2377 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->vlanmpls4, &pptrsv->vlanmpls4.bitr, NULL);
2378 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->vlanmpls4, FUNC_TYPE_BGP);
2379 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->vlanmpls4, &pptrsv->vlanmpls4.bpas, NULL);
2380 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->vlanmpls4, &pptrsv->vlanmpls4.blp, NULL);
2381 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->vlanmpls4, &pptrsv->vlanmpls4.bmed, NULL);
2382           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->vlanmpls4);
2383 	  exec_plugins(&pptrsv->vlanmpls4, req);
2384 	  break;
2385         case NF9_FTYPE_VLAN_MPLS_IPV6:
2386 	  pptrsv->vlanmpls6.f_header = pptrs->f_header;
2387 	  pptrsv->vlanmpls6.f_data = pptrs->f_data;
2388 	  pptrsv->vlanmpls6.f_tpl = pptrs->f_tpl;
2389 	  pptrsv->vlanmpls6.flow_type = pptrs->flow_type;
2390 
2391           if (req->bpf_filter) {
2392             u_char *ptr = pptrsv->vlanmpls6.mpls_ptr;
2393             u_int32_t idx;
2394 
2395             /* XXX: fix caplen */
2396 	    reset_mac_vlan(&pptrsv->vlanmpls6);
2397 	    if (direction == DIRECTION_IN) {
2398 	      memcpy(pptrsv->vlanmpls6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2399 	      memcpy(pptrsv->vlanmpls6.mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2400 	      memcpy(pptrsv->vlanmpls6.vlan_ptr, pkt+tpl->tpl[NF9_IN_VLAN].off, tpl->tpl[NF9_IN_VLAN].len);
2401 	    }
2402 	    else if (direction == DIRECTION_OUT) {
2403 	      memcpy(pptrsv->vlanmpls6.mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2404 	      memcpy(pptrsv->vlanmpls6.mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2405 	      memcpy(pptrsv->vlanmpls6.vlan_ptr, pkt+tpl->tpl[NF9_OUT_VLAN].off, tpl->tpl[NF9_OUT_VLAN].len);
2406 	    }
2407 	    for (idx = NF9_MPLS_LABEL_1; idx <= NF9_MPLS_LABEL_10 && tpl->tpl[idx].len; idx++, ptr += 4) {
2408 	      memset(ptr, 0, 4);
2409 	      memcpy(ptr, pkt+tpl->tpl[idx].off, tpl->tpl[idx].len);
2410 	    }
2411 	    stick_bosbit(ptr-4);
2412 	    pptrsv->vlanmpls6.iph_ptr = ptr;
2413 	    pptrsv->vlanmpls6.tlh_ptr = ptr + IP6HdrSz;
2414 	    reset_ip6(&pptrsv->vlanmpls6);
2415 
2416 	    ((struct ip6_hdr *)pptrsv->vlanmpls6.iph_ptr)->ip6_ctlun.ip6_un2_vfc = 0x60;
2417 	    memcpy(&((struct ip6_hdr *)pptrsv->vlanmpls6.iph_ptr)->ip6_src, pkt+tpl->tpl[NF9_IPV6_SRC_ADDR].off, tpl->tpl[NF9_IPV6_SRC_ADDR].len);
2418 	    memcpy(&((struct ip6_hdr *)pptrsv->vlanmpls6.iph_ptr)->ip6_dst, pkt+tpl->tpl[NF9_IPV6_DST_ADDR].off, tpl->tpl[NF9_IPV6_DST_ADDR].len);
2419 	    memcpy(&((struct ip6_hdr *)pptrsv->vlanmpls6.iph_ptr)->ip6_nxt, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2420 	    /* XXX: class ID ? */
2421 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlanmpls6.tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2422 	    memcpy(&((struct pm_tlhdr *)pptrsv->vlanmpls6.tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2423             memcpy(&((struct pm_tcphdr *)pptrsv->vlanmpls6.tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2424 	  }
2425 
2426           memcpy(&pptrsv->vlanmpls6.lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2427           memcpy(&pptrsv->vlanmpls6.lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2428           pptrsv->vlanmpls6.lm_method_src = NF_NET_KEEP;
2429           pptrsv->vlanmpls6.lm_method_dst = NF_NET_KEEP;
2430 
2431 	  /* Let's copy some relevant field */
2432 	  pptrsv->vlanmpls6.l4_proto = 0;
2433 	  memcpy(&pptrsv->vlanmpls6.l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2434 
2435 	  if (tpl->tpl[NF9_APPLICATION_ID].len == 4) {
2436 	    struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2437 	    struct xflow_status_entry *gentry = (struct xflow_status_entry *) pptrs->f_status_g;
2438             pm_class_t class_id = 0;
2439 
2440             memcpy(&class_id, pkt+tpl->tpl[NF9_APPLICATION_ID].off, 4);
2441 	    if (entry) pptrsv->vlanmpls6.class = NF_evaluate_classifiers(entry->class, &class_id, gentry);
2442 	  }
2443 	  if (config.nfacctd_isis) isis_srcdst_lookup(&pptrsv->vlanmpls6);
2444 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, &pptrsv->vlanmpls6, &pptrsv->vlanmpls6.bta, &pptrsv->vlanmpls6.bta2);
2445 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, &pptrsv->vlanmpls6, &pptrsv->vlanmpls6.bitr, NULL);
2446 	  if (config.bgp_daemon) bgp_srcdst_lookup(&pptrsv->vlanmpls6, FUNC_TYPE_BGP);
2447 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, &pptrsv->vlanmpls6, &pptrsv->vlanmpls6.bpas, NULL);
2448 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, &pptrsv->vlanmpls6, &pptrsv->vlanmpls6.blp, NULL);
2449 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, &pptrsv->vlanmpls6, &pptrsv->vlanmpls6.bmed, NULL);
2450           if (config.bmp_daemon) bmp_srcdst_lookup(&pptrsv->vlanmpls6);
2451 	  exec_plugins(&pptrsv->vlanmpls6, req);
2452 	  break;
2453 	case NF9_FTYPE_NAT_EVENT:
2454 	  /* XXX: aggregate_filter & NAT64 case */
2455 	  if (req->bpf_filter) {
2456 	    reset_mac(pptrs);
2457 	    reset_ip4(pptrs);
2458 
2459 	    if (direction == DIRECTION_IN) {
2460               memcpy(pptrs->mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_IN_SRC_MAC].off, tpl->tpl[NF9_IN_SRC_MAC].len);
2461               memcpy(pptrs->mac_ptr, pkt+tpl->tpl[NF9_IN_DST_MAC].off, tpl->tpl[NF9_IN_DST_MAC].len);
2462 	    }
2463 	    else if (direction == DIRECTION_OUT) {
2464               memcpy(pptrs->mac_ptr+ETH_ADDR_LEN, pkt+tpl->tpl[NF9_OUT_SRC_MAC].off, tpl->tpl[NF9_OUT_SRC_MAC].len);
2465               memcpy(pptrs->mac_ptr, pkt+tpl->tpl[NF9_OUT_DST_MAC].off, tpl->tpl[NF9_OUT_DST_MAC].len);
2466 	    }
2467 	    ((struct pm_iphdr *)pptrs->iph_ptr)->ip_vhl = 0x45;
2468             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_src, pkt+tpl->tpl[NF9_IPV4_SRC_ADDR].off, tpl->tpl[NF9_IPV4_SRC_ADDR].len);
2469             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_dst, pkt+tpl->tpl[NF9_IPV4_DST_ADDR].off, tpl->tpl[NF9_IPV4_DST_ADDR].len);
2470             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_p, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2471             memcpy(&((struct pm_iphdr *)pptrs->iph_ptr)->ip_tos, pkt+tpl->tpl[NF9_SRC_TOS].off, tpl->tpl[NF9_SRC_TOS].len);
2472             memcpy(&((struct pm_tlhdr *)pptrs->tlh_ptr)->src_port, pkt+tpl->tpl[NF9_L4_SRC_PORT].off, tpl->tpl[NF9_L4_SRC_PORT].len);
2473             memcpy(&((struct pm_tlhdr *)pptrs->tlh_ptr)->dst_port, pkt+tpl->tpl[NF9_L4_DST_PORT].off, tpl->tpl[NF9_L4_DST_PORT].len);
2474             memcpy(&((struct pm_tcphdr *)pptrs->tlh_ptr)->th_flags, pkt+tpl->tpl[NF9_TCP_FLAGS].off, tpl->tpl[NF9_TCP_FLAGS].len);
2475 	  }
2476 
2477 	  memcpy(&pptrs->lm_mask_src, pkt+tpl->tpl[NF9_SRC_MASK].off, tpl->tpl[NF9_SRC_MASK].len);
2478 	  memcpy(&pptrs->lm_mask_dst, pkt+tpl->tpl[NF9_DST_MASK].off, tpl->tpl[NF9_DST_MASK].len);
2479 	  pptrs->lm_method_src = NF_NET_KEEP;
2480 	  pptrs->lm_method_dst = NF_NET_KEEP;
2481 
2482 	  /* Let's copy some relevant field */
2483 	  pptrs->l4_proto = 0;
2484 	  memcpy(&pptrs->l4_proto, pkt+tpl->tpl[NF9_L4_PROTOCOL].off, tpl->tpl[NF9_L4_PROTOCOL].len);
2485 
2486 	  if (config.nfacctd_isis) isis_srcdst_lookup(pptrs);
2487 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, pptrs, &pptrs->bta, &pptrs->bta2);
2488 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, pptrs, &pptrs->bitr, NULL);
2489 	  if (config.bgp_daemon) bgp_srcdst_lookup(pptrs, FUNC_TYPE_BGP);
2490 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, pptrs, &pptrs->bpas, NULL);
2491 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, pptrs, &pptrs->blp, NULL);
2492 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, pptrs, &pptrs->bmed, NULL);
2493 	  if (config.bmp_daemon) bmp_srcdst_lookup(pptrs);
2494 
2495           exec_plugins(pptrs, req);
2496 	  break;
2497 	case NF9_FTYPE_DLFS:
2498 	  dummy_packet_ptr = pptrs->packet_ptr;
2499 	  nfv9_datalink_frame_section_handler(pptrs);
2500 
2501 	  if (config.nfacctd_isis) isis_srcdst_lookup(pptrs);
2502 	  if (config.bgp_daemon_to_xflow_agent_map) BTA_find_id((struct id_table *)pptrs->bta_table, pptrs, &pptrs->bta, &pptrs->bta2);
2503 	  if (config.nfacctd_flow_to_rd_map) NF_find_id((struct id_table *)pptrs->bitr_table, pptrs, &pptrs->bitr, NULL);
2504 	  if (config.bgp_daemon) bgp_srcdst_lookup(pptrs, FUNC_TYPE_BGP);
2505 	  if (config.bgp_daemon_peer_as_src_map) NF_find_id((struct id_table *)pptrs->bpas_table, pptrs, &pptrs->bpas, NULL);
2506 	  if (config.bgp_daemon_src_local_pref_map) NF_find_id((struct id_table *)pptrs->blp_table, pptrs, &pptrs->blp, NULL);
2507 	  if (config.bgp_daemon_src_med_map) NF_find_id((struct id_table *)pptrs->bmed_table, pptrs, &pptrs->bmed, NULL);
2508           if (config.bmp_daemon) bmp_srcdst_lookup(pptrs);
2509 
2510           exec_plugins(pptrs, req);
2511 	  reset_dummy_v4(pptrs, dummy_packet_ptr);
2512 	  break;
2513 	default:
2514 	  break;
2515         }
2516 
2517         finalize_record:
2518         pkt += tpl->len;
2519         flowoff += tpl->len;
2520 
2521 	/* we have to reset; let's not do to zero to short-circuit
2522 	   the case of the last record in a flowset; we can save a
2523 	   round through resolve_vlen_template() */
2524 	if (tpl->vlen) tpl->len = 1;
2525 	FlowSeqInc++;
2526       }
2527 
2528       /* last pre-flight check for the subsequent subtraction */
2529       if (flowoff > flowsetlen) {
2530         notify_malf_packet(LOG_INFO, "INFO", "aborting malformed Data element (incomplete NetFlow v9/IPFIX packet)",
2531                       (struct sockaddr *) pptrsv->v4.f_agent, FlowSeq);
2532         xflow_tot_bad_datagrams++;
2533         return;
2534       }
2535 
2536       pkt += (flowsetlen-flowoff); /* handling padding */
2537       off += flowsetlen;
2538     }
2539   }
2540   else { /* unsupported flowset */
2541     if (off+flowsetlen > len) {
2542       Log(LOG_DEBUG, "DEBUG ( %s/core ): unable to read unsupported Flowset (ID: '%u').\n", config.name, fid);
2543       return;
2544     }
2545     pkt += flowsetlen;
2546     off += flowsetlen;
2547   }
2548 
2549   if ((version == 9 && (flowsetCount + 1) < flowsetNo && off < len) ||
2550       (version == 10 && off < len)) {
2551     flowsetCount++;
2552     goto process_flowset;
2553   }
2554 
2555   /* Set IPFIX Sequence number increment */
2556   if (version == 10) {
2557     struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrsv->v4.f_status;
2558 
2559     if (entry) entry->inc = FlowSeqInc;
2560   }
2561 }
2562 
process_raw_packet(unsigned char * pkt,u_int16_t len,struct packet_ptrs_vector * pptrsv,struct plugin_requests * req)2563 void process_raw_packet(unsigned char *pkt, u_int16_t len, struct packet_ptrs_vector *pptrsv,
2564                 struct plugin_requests *req)
2565 {
2566   struct packet_ptrs *pptrs = &pptrsv->v4;
2567   u_int16_t nfv;
2568 
2569   /* basic length check against longest NetFlow header */
2570   if (len < NfHdrV5Sz) {
2571     notify_malf_packet(LOG_INFO, "INFO", "discarding short NetFlow packet", (struct sockaddr *) pptrs->f_agent, 0);
2572     xflow_tot_bad_datagrams++;
2573     return;
2574   }
2575 
2576   if (req->ptm_c.exec_ptm_dissect) nfv = ((struct struct_header_v5 *)pkt)->version;
2577   else nfv = ntohs(((struct struct_header_v5 *)pkt)->version);
2578 
2579   if (nfv != 5 && nfv != 9 && nfv != 10) {
2580     if (!config.nfacctd_disable_checks) {
2581       notify_malf_packet(LOG_INFO, "INFO", "discarding unknown NetFlow packet", (struct sockaddr *) pptrs->f_agent, 0);
2582       xflow_tot_bad_datagrams++;
2583     }
2584     return;
2585   }
2586 
2587   pptrs->f_header = pkt;
2588 
2589   switch (nfv) {
2590   case 5:
2591     pptrs->seqno = ntohl(((struct struct_header_v5 *)pkt)->flow_sequence);
2592     if (!req->ptm_c.exec_ptm_dissect) nfv5_check_status(pptrs); /* stats collection */
2593     break;
2594   case 9:
2595     pptrs->seqno = ntohl(((struct struct_header_v9 *)pkt)->flow_sequence);
2596     if (!req->ptm_c.exec_ptm_dissect) {
2597       u_int32_t SourceId = ntohl(((struct struct_header_v9 *)pkt)->source_id);
2598       nfv9_check_status(pptrs, SourceId, 0, pptrs->seqno, TRUE); /* stats collection */
2599     }
2600     break;
2601   case 10:
2602     pptrs->seqno = ntohl(((struct struct_header_ipfix *)pkt)->flow_sequence);
2603     if (!req->ptm_c.exec_ptm_dissect) {
2604       u_int32_t SourceId = ntohl(((struct struct_header_ipfix *)pkt)->source_id);
2605       nfv9_check_status(pptrs, SourceId, 0, pptrs->seqno, TRUE); /* stats collection */
2606     }
2607     break;
2608   default:
2609     pptrs->seqno = 0;
2610     break;
2611   }
2612 
2613   if (!req->ptm_c.exec_ptm_dissect && config.debug) {
2614     sa_to_addr((struct sockaddr *)pptrs->f_agent, &debug_a, &debug_agent_port);
2615     addr_to_str(debug_agent_addr, &debug_a);
2616 
2617     Log(LOG_DEBUG, "DEBUG ( %s/core ): Received NetFlow/IPFIX packet from [%s:%u] version [%u] seqno [%u]\n",
2618 	config.name, debug_agent_addr, debug_agent_port, nfv, pptrs->seqno);
2619   }
2620 
2621   if (req->ptm_c.exec_ptm_dissect) {
2622     struct NF_dissect tee_dissect;
2623 
2624     memset(&tee_dissect, 0, sizeof(tee_dissect));
2625     pptrsv->v4.tee_dissect = (char *) &tee_dissect;
2626     req->ptm_c.exec_ptm_res = TRUE;
2627 
2628     switch(nfv) {
2629     case 5:
2630       process_v5_packet(pkt, len, &pptrsv->v4, req, nfv, &tee_dissect);
2631       break;
2632     /* NetFlow v9 + IPFIX */
2633     case 9:
2634     case 10:
2635       process_v9_packet(pkt, len, pptrsv, req, nfv, &tee_dissect, NULL);
2636       break;
2637     default:
2638       break;
2639     }
2640   }
2641 
2642   /* If dissecting, we also send the full original packet */
2643   if (req->ptm_c.exec_ptm_dissect)
2644     ((struct struct_header_v5 *)pkt)->version = htons(((struct struct_header_v5 *)pkt)->version);
2645 
2646   pptrs->tee_dissect = NULL;
2647   pptrs->f_data = NULL;
2648   pptrs->f_tpl = NULL;
2649   req->ptm_c.exec_ptm_res = FALSE;
2650 
2651   exec_plugins(pptrs, req);
2652 }
2653 
NF_compute_once()2654 void NF_compute_once()
2655 {
2656   struct pkt_data dummy;
2657 
2658   CounterSz = sizeof(dummy.pkt_len);
2659   PdataSz = sizeof(struct pkt_data);
2660   PpayloadSz = sizeof(struct pkt_payload);
2661   PmsgSz = sizeof(struct pkt_msg);
2662   PextrasSz = sizeof(struct pkt_extras);
2663   PbgpSz = sizeof(struct pkt_bgp_primitives);
2664   PlbgpSz = sizeof(struct pkt_legacy_bgp_primitives);
2665   PnatSz = sizeof(struct pkt_nat_primitives);
2666   PmplsSz = sizeof(struct pkt_mpls_primitives);
2667   PtunSz = sizeof(struct pkt_tunnel_primitives);
2668   PvhdrSz = sizeof(struct pkt_vlen_hdr_primitives);
2669   PmLabelTSz = sizeof(pm_label_t);
2670   PtLabelTSz = sizeof(pt_label_t);
2671   ChBufHdrSz = sizeof(struct ch_buf_hdr);
2672   CharPtrSz = sizeof(char *);
2673   NfHdrV5Sz = sizeof(struct struct_header_v5);
2674   NfHdrV9Sz = sizeof(struct struct_header_v9);
2675   NfDataHdrV9Sz = sizeof(struct data_hdr_v9);
2676   NfTplHdrV9Sz = sizeof(struct template_hdr_v9);
2677   NfTplFieldV9Sz = sizeof(struct template_field_v9);
2678   NfOptTplHdrV9Sz = sizeof(struct options_template_hdr_v9);
2679   NfDataV5Sz = sizeof(struct struct_export_v5);
2680   IP4HdrSz = sizeof(struct pm_iphdr);
2681   IP4TlSz = sizeof(struct pm_iphdr)+sizeof(struct pm_tlhdr);
2682   PptrsSz = sizeof(struct packet_ptrs);
2683   CSSz = sizeof(struct class_st);
2684   HostAddrSz = sizeof(struct host_addr);
2685   UDPHdrSz = sizeof(struct pm_udphdr);
2686   IpFixHdrSz = sizeof(struct struct_header_ipfix);
2687 
2688   IP6HdrSz = sizeof(struct ip6_hdr);
2689   IP6AddrSz = sizeof(struct in6_addr);
2690   IP6TlSz = sizeof(struct ip6_hdr)+sizeof(struct pm_tlhdr);
2691 }
2692 
NF_evaluate_flow_type(struct template_cache_entry * tpl,struct packet_ptrs * pptrs)2693 u_int8_t NF_evaluate_flow_type(struct template_cache_entry *tpl, struct packet_ptrs *pptrs)
2694 {
2695   u_int8_t ret = NF9_FTYPE_TRAFFIC;
2696   u_int8_t have_ip_proto = FALSE;
2697 
2698   /* first round: event vs traffic */
2699   if (!tpl->tpl[NF9_IN_BYTES].len && !tpl->tpl[NF9_OUT_BYTES].len && !tpl->tpl[NF9_FLOW_BYTES].len &&
2700       !tpl->tpl[NF9_INITIATOR_OCTETS].len && !tpl->tpl[NF9_RESPONDER_OCTETS].len && /* packets? && */
2701       !tpl->tpl[NF9_DATALINK_FRAME_SECTION].len) {
2702     ret = NF9_FTYPE_EVENT;
2703   }
2704   else {
2705     if ((tpl->tpl[NF9_IN_VLAN].len && *(pptrs->f_data+tpl->tpl[NF9_IN_VLAN].off) > 0) ||
2706         (tpl->tpl[NF9_OUT_VLAN].len && *(pptrs->f_data+tpl->tpl[NF9_OUT_VLAN].off) > 0)) ret += NF9_FTYPE_VLAN;
2707     if (tpl->tpl[NF9_MPLS_LABEL_1].len /* check: value > 0 ? */) ret += NF9_FTYPE_MPLS;
2708 
2709     /* Explicit IP protocol definition first; a bit of heuristics as fallback */
2710     if (tpl->tpl[NF9_IP_PROTOCOL_VERSION].len) {
2711       if (*(pptrs->f_data+tpl->tpl[NF9_IP_PROTOCOL_VERSION].off) == 4) {
2712 	have_ip_proto = TRUE;
2713       }
2714       else if (*(pptrs->f_data+tpl->tpl[NF9_IP_PROTOCOL_VERSION].off) == 6) {
2715 	ret += NF9_FTYPE_TRAFFIC_IPV6;
2716 	have_ip_proto = TRUE;
2717       }
2718     }
2719 
2720     if (!have_ip_proto) {
2721       if (tpl->tpl[NF9_IPV4_SRC_ADDR].len) {
2722 	have_ip_proto = TRUE;
2723       }
2724       else if (tpl->tpl[NF9_IPV6_SRC_ADDR].len) {
2725 	ret += NF9_FTYPE_TRAFFIC_IPV6;
2726 	have_ip_proto = TRUE;
2727       }
2728     }
2729 
2730     if (tpl->tpl[NF9_DATALINK_FRAME_SECTION].len) ret = NF9_FTYPE_DLFS;
2731   }
2732 
2733   /* second round: overrides */
2734 
2735   /* NetFlow Event Logging (NEL): generic NAT event support */
2736   if (tpl->tpl[NF9_NAT_EVENT].len) ret = NF9_FTYPE_NAT_EVENT;
2737 
2738   /* NetFlow/IPFIX option final override */
2739   if (tpl->template_type == 1) ret = NF9_FTYPE_OPTION;
2740 
2741   return ret;
2742 }
2743 
NF_evaluate_direction(struct template_cache_entry * tpl,struct packet_ptrs * pptrs)2744 u_int16_t NF_evaluate_direction(struct template_cache_entry *tpl, struct packet_ptrs *pptrs)
2745 {
2746   u_int16_t ret = DIRECTION_IN;
2747 
2748   if (tpl->tpl[NF9_DIRECTION].len && *(pptrs->f_data+tpl->tpl[NF9_DIRECTION].off) == 1) ret = DIRECTION_OUT;
2749 
2750   return ret;
2751 }
2752 
reset_mac(struct packet_ptrs * pptrs)2753 void reset_mac(struct packet_ptrs *pptrs)
2754 {
2755   memset(pptrs->mac_ptr, 0, 2*ETH_ADDR_LEN);
2756 }
2757 
reset_mac_vlan(struct packet_ptrs * pptrs)2758 void reset_mac_vlan(struct packet_ptrs *pptrs)
2759 {
2760   memset(pptrs->mac_ptr, 0, 2*ETH_ADDR_LEN);
2761   memset(pptrs->vlan_ptr, 0, 2);
2762 }
2763 
reset_ip4(struct packet_ptrs * pptrs)2764 void reset_ip4(struct packet_ptrs *pptrs)
2765 {
2766   memset(pptrs->iph_ptr, 0, IP4TlSz);
2767   Assign8(((struct pm_iphdr *)pptrs->iph_ptr)->ip_vhl, 5);
2768 }
2769 
reset_ip6(struct packet_ptrs * pptrs)2770 void reset_ip6(struct packet_ptrs *pptrs)
2771 {
2772   memset(pptrs->iph_ptr, 0, IP6TlSz);
2773   Assign16(((struct ip6_hdr *)pptrs->iph_ptr)->ip6_plen, htons(100));
2774   ((struct ip6_hdr *)pptrs->iph_ptr)->ip6_hlim = 64;
2775 }
2776 
reset_dummy_v4(struct packet_ptrs * pptrs,u_char * dummy_packet)2777 void reset_dummy_v4(struct packet_ptrs *pptrs, u_char *dummy_packet)
2778 {
2779   pptrs->packet_ptr = dummy_packet;
2780   /* pptrs->pkthdr = dummy_pkthdr; */
2781   Assign16(((struct eth_header *)pptrs->packet_ptr)->ether_type, htons(ETHERTYPE_IP));
2782   pptrs->mac_ptr = (u_char *)((struct eth_header *)pptrs->packet_ptr)->ether_dhost;
2783   pptrs->iph_ptr = pptrs->packet_ptr + ETHER_HDRLEN;
2784   pptrs->tlh_ptr = pptrs->packet_ptr + ETHER_HDRLEN + sizeof(struct pm_iphdr);
2785   Assign8(((struct pm_iphdr *)pptrs->iph_ptr)->ip_vhl, 5);
2786   pptrs->pkthdr->caplen = 55;
2787   pptrs->pkthdr->len = 100;
2788   pptrs->l3_proto = ETHERTYPE_IP;
2789 }
2790 
notify_malf_packet(short int severity,char * severity_str,char * ostr,struct sockaddr * sa,u_int32_t seq)2791 void notify_malf_packet(short int severity, char *severity_str, char *ostr, struct sockaddr *sa, u_int32_t seq)
2792 {
2793   struct host_addr a;
2794   char errstr[SRVBUFLEN];
2795   char agent_addr[50] /* able to fit an IPv6 string aswell */, any[] = "0.0.0.0";
2796   u_int16_t agent_port;
2797 
2798   sa_to_addr((struct sockaddr *)sa, &a, &agent_port);
2799   addr_to_str(agent_addr, &a);
2800 
2801   if (seq) snprintf(errstr, SRVBUFLEN, "%s ( %s/core ): %s: nfacctd=%s:%u agent=%s:%u seq=%u\n",
2802 		severity_str, config.name, ostr, ((config.nfacctd_ip) ? config.nfacctd_ip : any),
2803 		collector_port, agent_addr, agent_port, seq);
2804   else snprintf(errstr, SRVBUFLEN, "%s ( %s/core ): %s: nfacctd=%s:%u agent=%s:%u\n",
2805 		severity_str, config.name, ostr, ((config.nfacctd_ip) ? config.nfacctd_ip : any),
2806 		collector_port, agent_addr, agent_port);
2807 
2808   Log(severity, "%s", errstr);
2809 }
2810 
NF_find_id(struct id_table * t,struct packet_ptrs * pptrs,pm_id_t * tag,pm_id_t * tag2)2811 int NF_find_id(struct id_table *t, struct packet_ptrs *pptrs, pm_id_t *tag, pm_id_t *tag2)
2812 {
2813   struct xflow_status_entry *entry = (struct xflow_status_entry *) pptrs->f_status;
2814   struct sockaddr *sa = NULL;
2815   u_char *saved_f_agent = NULL;
2816   int x, begin = 0, end = 0;
2817   pm_id_t ret = 0;
2818 
2819   if (!t) return 0;
2820 
2821   /* if NF9_EXPORTER_IPV[46]_ADDRESS from NetFlow v9/IPFIX options, use it */
2822   if (entry && entry->exp_sa.sa_family) {
2823     saved_f_agent = pptrs->f_agent;
2824     pptrs->f_agent = (u_char *) &entry->exp_sa;
2825   }
2826 
2827   sa = (struct sockaddr *) pptrs->f_agent;
2828 
2829   /* The id_table is shared between by IPv4 and IPv6 NetFlow agents.
2830      IPv4 ones are in the lower part (0..x), IPv6 ones are in the upper
2831      part (x+1..end)
2832   */
2833 
2834   pretag_init_vars(pptrs, t);
2835   if (tag) *tag = 0;
2836   if (tag2) *tag2 = 0;
2837   if (pptrs) {
2838     pptrs->have_tag = FALSE;
2839     pptrs->have_tag2 = FALSE;
2840   }
2841 
2842   /* Giving a first try with index(es) */
2843   if (config.maps_index && pretag_index_have_one(t)) {
2844     struct id_entry *index_results[ID_TABLE_INDEX_RESULTS];
2845     u_int32_t iterator, num_results;
2846 
2847     num_results = pretag_index_lookup(t, pptrs, index_results, ID_TABLE_INDEX_RESULTS);
2848 
2849     for (iterator = 0; index_results[iterator] && iterator < num_results; iterator++) {
2850       ret = pretag_entry_process(index_results[iterator], pptrs, tag, tag2);
2851       if (!(ret & PRETAG_MAP_RCODE_JEQ)) goto exit_lane;
2852     }
2853 
2854     /* if we have at least one index we trust we did a good job */
2855     goto exit_lane;
2856   }
2857 
2858   if (sa->sa_family == AF_INET) {
2859     begin = 0;
2860     end = t->ipv4_num;
2861   }
2862   else if (sa->sa_family == AF_INET6) {
2863     begin = t->num-t->ipv6_num;
2864     end = t->num;
2865   }
2866 
2867   for (x = begin; x < end; x++) {
2868     if (host_addr_mask_sa_cmp(&t->e[x].key.agent_ip.a, &t->e[x].key.agent_mask, sa) == 0) {
2869       ret = pretag_entry_process(&t->e[x], pptrs, tag, tag2);
2870 
2871       if (!ret || ret > TRUE) {
2872         if (ret & PRETAG_MAP_RCODE_JEQ) {
2873           x = t->e[x].jeq.ptr->pos;
2874           x--; // yes, it will be automagically incremented by the for() cycle
2875         }
2876         else break;
2877       }
2878     }
2879   }
2880 
2881   exit_lane:
2882   if (entry && entry->exp_sa.sa_family) pptrs->f_agent = saved_f_agent;
2883 
2884   return ret;
2885 }
2886 
nfv5_check_status(struct packet_ptrs * pptrs)2887 struct xflow_status_entry *nfv5_check_status(struct packet_ptrs *pptrs)
2888 {
2889   struct struct_header_v5 *hdr = (struct struct_header_v5 *) pptrs->f_header;
2890   struct sockaddr *sa = (struct sockaddr *) pptrs->f_agent;
2891   u_int32_t aux1 = (hdr->engine_id << 8 | hdr->engine_type);
2892   int hash = hash_status_table(aux1, sa, XFLOW_STATUS_TABLE_SZ);
2893   struct xflow_status_entry *entry = NULL;
2894 
2895   if (hash >= 0) {
2896     entry = search_status_table(sa, aux1, 0, hash, XFLOW_STATUS_TABLE_MAX_ENTRIES);
2897     if (entry) {
2898       update_status_table(entry, ntohl(hdr->flow_sequence), pptrs->f_len);
2899       entry->inc = ntohs(hdr->count);
2900     }
2901   }
2902 
2903   return entry;
2904 }
2905 
nfv9_check_status(struct packet_ptrs * pptrs,u_int32_t sid,u_int32_t flags,u_int32_t seq,u_int8_t update)2906 struct xflow_status_entry *nfv9_check_status(struct packet_ptrs *pptrs, u_int32_t sid, u_int32_t flags, u_int32_t seq, u_int8_t update)
2907 {
2908   struct sockaddr *sa = (struct sockaddr *) pptrs->f_agent;
2909   int hash = hash_status_table(sid, sa, XFLOW_STATUS_TABLE_SZ);
2910   struct xflow_status_entry *entry = NULL;
2911 
2912   if (hash >= 0) {
2913     entry = search_status_table(sa, sid, flags, hash, XFLOW_STATUS_TABLE_MAX_ENTRIES);
2914     if (entry && update) {
2915       update_status_table(entry, seq, pptrs->f_len);
2916       entry->inc = 1;
2917     }
2918   }
2919 
2920   return entry;
2921 }
2922 
NF_evaluate_classifiers(struct xflow_status_entry_class * entry,pm_class_t * class_id,struct xflow_status_entry * gentry)2923 pm_class_t NF_evaluate_classifiers(struct xflow_status_entry_class *entry, pm_class_t *class_id, struct xflow_status_entry *gentry)
2924 {
2925   struct xflow_status_entry_class *centry;
2926 
2927   /* Try #1: let's see if we have a matching class for the given SourceId/ObservedDomainId */
2928   centry = search_class_id_status_table(entry, *class_id);
2929   if (centry) {
2930     return centry->class_int_id;
2931   }
2932 
2933   /* Try #2: let's chance if we have a global option */
2934   if (gentry) {
2935     centry = search_class_id_status_table(gentry->class, *class_id);
2936     if (centry) {
2937       return centry->class_int_id;
2938     }
2939   }
2940 
2941   return 0;
2942 }
2943 
nfv9_datalink_frame_section_handler(struct packet_ptrs * pptrs)2944 void nfv9_datalink_frame_section_handler(struct packet_ptrs *pptrs)
2945 {
2946   struct template_cache_entry *tpl = (struct template_cache_entry *) pptrs->f_tpl;
2947   struct utpl_field *utpl = NULL;
2948   u_int16_t frame_type = NF9_DL_F_TYPE_UNKNOWN, t16;
2949 
2950   /* cleanups */
2951   reset_index_pkt_ptrs(pptrs);
2952   pptrs->packet_ptr = pptrs->mac_ptr = pptrs->vlan_ptr = pptrs->mpls_ptr = NULL;
2953   pptrs->iph_ptr = pptrs->tlh_ptr = pptrs->payload_ptr = NULL;
2954   pptrs->l3_proto = pptrs->l4_proto = FALSE;
2955 #if defined (WITH_NDPI)
2956   memset(&pptrs->ndpi_class, 0, sizeof(pm_class2_t));
2957 #endif
2958 
2959   if ((utpl = (*get_ext_db_ie_by_type)(tpl, 0, NF9_DATALINK_FRAME_TYPE, FALSE))) {
2960     memcpy(&t16, pptrs->f_data+utpl->off, MIN(utpl->len, 2));
2961     frame_type = ntohs(t16);
2962   }
2963   /* XXX: in case of no NF9_DATALINK_FRAME_TYPE, let's assume Ethernet */
2964   else frame_type = NF9_DL_F_TYPE_ETHERNET;
2965 
2966   if (tpl->tpl[NF9_DATALINK_FRAME_SECTION].len) {
2967     pptrs->pkthdr->caplen = tpl->tpl[NF9_DATALINK_FRAME_SECTION].len;
2968     pptrs->packet_ptr = (u_char *) pptrs->f_data+tpl->tpl[NF9_DATALINK_FRAME_SECTION].off;
2969 
2970     if (frame_type == NF9_DL_F_TYPE_ETHERNET) {
2971       eth_handler(pptrs->pkthdr, pptrs);
2972       if (pptrs->iph_ptr) {
2973 	if ((*pptrs->l3_handler)(pptrs)) {
2974 #if defined (WITH_NDPI)
2975 	  if (config.classifier_ndpi && pm_ndpi_wfl) {
2976 	    pptrs->ndpi_class = pm_ndpi_workflow_process_packet(pm_ndpi_wfl, pptrs);
2977 	  }
2978 #endif
2979 	  set_index_pkt_ptrs(pptrs);
2980 	}
2981       }
2982     }
2983   }
2984 }
2985 
2986 #ifdef WITH_KAFKA
NF_init_kafka_host(void * kh)2987 void NF_init_kafka_host(void *kh)
2988 {
2989   struct p_kafka_host *kafka_host = kh;
2990 
2991   p_kafka_init_host(kafka_host, config.nfacctd_kafka_config_file);
2992   p_kafka_connect_to_consume(kafka_host);
2993   p_kafka_set_broker(kafka_host, config.nfacctd_kafka_broker_host, config.nfacctd_kafka_broker_port);
2994   p_kafka_set_topic(kafka_host, config.nfacctd_kafka_topic);
2995   p_kafka_set_content_type(kafka_host, PM_KAFKA_CNT_TYPE_BIN);
2996   p_kafka_manage_consumer(kafka_host, TRUE);
2997 }
2998 #endif
2999 
3000 #ifdef WITH_ZMQ
NF_init_zmq_host(void * zh,int * pipe_fd)3001 void NF_init_zmq_host(void *zh, int *pipe_fd)
3002 {
3003   struct p_zmq_host *zmq_host = zh;
3004   char log_id[SHORTBUFLEN];
3005 
3006   p_zmq_init_pull(zmq_host);
3007 
3008   snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
3009   p_zmq_set_log_id(zmq_host, log_id);
3010 
3011   p_zmq_set_address(zmq_host, config.nfacctd_zmq_address);
3012   p_zmq_set_hwm(zmq_host, PM_ZMQ_DEFAULT_FLOW_HWM);
3013   p_zmq_pull_setup(zmq_host);
3014   p_zmq_set_retry_timeout(zmq_host, PM_ZMQ_DEFAULT_RETRY);
3015 
3016   if (pipe_fd) (*pipe_fd) = p_zmq_get_fd(zmq_host);
3017 }
3018 #endif
3019