1 #include <arpa/inet.h>
2 #include <errno.h>
3 #include <netinet/in.h>
4 #include <ndpi_api.h>
5 #include <ndpi_main.h>
6 #include <ndpi_typedefs.h>
7 #include <pcap/pcap.h>
8 #include <pthread.h>
9 #include <signal.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <unistd.h>
14 
15 #define MAX_FLOW_ROOTS_PER_THREAD 2048
16 #define MAX_IDLE_FLOWS_PER_THREAD 64
17 #define TICK_RESOLUTION 1000
18 #define MAX_READER_THREADS 4
19 #define IDLE_SCAN_PERIOD 10000 /* msec */
20 #define MAX_IDLE_TIME 300000 /* msec */
21 #define INITIAL_THREAD_HASH 0x03dd018b
22 
23 #ifndef ETH_P_IP
24 #define ETH_P_IP 0x0800
25 #endif
26 
27 #ifndef ETH_P_IPV6
28 #define ETH_P_IPV6 0x86DD
29 #endif
30 
31 #ifndef ETH_P_ARP
32 #define ETH_P_ARP  0x0806
33 #endif
34 
35 enum nDPI_l3_type {
36   L3_IP, L3_IP6
37 };
38 
39 struct nDPI_flow_info {
40   uint32_t flow_id;
41   unsigned long long int packets_processed;
42   uint64_t first_seen;
43   uint64_t last_seen;
44   uint64_t hashval;
45 
46   enum nDPI_l3_type l3_type;
47   union {
48     struct {
49       uint32_t src;
50       uint32_t dst;
51     } v4;
52     struct {
53       uint64_t src[2];
54       uint64_t dst[2];
55     } v6;
56   } ip_tuple;
57 
58   unsigned long long int total_l4_data_len;
59   uint16_t src_port;
60   uint16_t dst_port;
61 
62   uint8_t is_midstream_flow:1;
63   uint8_t flow_fin_ack_seen:1;
64   uint8_t flow_ack_seen:1;
65   uint8_t detection_completed:1;
66   uint8_t tls_client_hello_seen:1;
67   uint8_t tls_server_hello_seen:1;
68   uint8_t flow_info_printed:1;
69   uint8_t reserved_00:1;
70   uint8_t l4_protocol;
71 
72   struct ndpi_proto detected_l7_protocol;
73   struct ndpi_proto guessed_protocol;
74 
75   struct ndpi_flow_struct * ndpi_flow;
76   struct ndpi_id_struct * ndpi_src;
77   struct ndpi_id_struct * ndpi_dst;
78 };
79 
80 struct nDPI_workflow {
81   pcap_t * pcap_handle;
82 
83   uint8_t error_or_eof:1;
84   uint8_t reserved_00:7;
85   uint8_t reserved_01[3];
86 
87   unsigned long long int packets_captured;
88   unsigned long long int packets_processed;
89   unsigned long long int total_l4_data_len;
90   unsigned long long int detected_flow_protocols;
91 
92   uint64_t last_idle_scan_time;
93   uint64_t last_time;
94 
95   void ** ndpi_flows_active;
96   unsigned long long int max_active_flows;
97   unsigned long long int cur_active_flows;
98   unsigned long long int total_active_flows;
99 
100   void ** ndpi_flows_idle;
101   unsigned long long int max_idle_flows;
102   unsigned long long int cur_idle_flows;
103   unsigned long long int total_idle_flows;
104 
105   struct ndpi_detection_module_struct * ndpi_struct;
106 };
107 
108 struct nDPI_reader_thread {
109   struct nDPI_workflow * workflow;
110   pthread_t thread_id;
111   int array_index;
112 };
113 
114 static struct nDPI_reader_thread reader_threads[MAX_READER_THREADS] = {};
115 static int reader_thread_count = MAX_READER_THREADS;
116 static int main_thread_shutdown = 0;
117 static uint32_t flow_id = 0;
118 
119 static void free_workflow(struct nDPI_workflow ** const workflow);
120 
init_workflow(char const * const file_or_device)121 static struct nDPI_workflow * init_workflow(char const * const file_or_device)
122 {
123   char pcap_error_buffer[PCAP_ERRBUF_SIZE];
124   struct nDPI_workflow * workflow = (struct nDPI_workflow *)ndpi_calloc(1, sizeof(*workflow));
125 
126   if (workflow == NULL) {
127     return NULL;
128   }
129 
130   if (access(file_or_device, R_OK) != 0 && errno == ENOENT) {
131     workflow->pcap_handle = pcap_open_live(file_or_device, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
132   } else {
133     workflow->pcap_handle = pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO,
134 								    pcap_error_buffer);
135   }
136 
137   if (workflow->pcap_handle == NULL) {
138     fprintf(stderr, "pcap_open_live / pcap_open_offline_with_tstamp_precision: %.*s\n",
139 	    (int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
140     free_workflow(&workflow);
141     return NULL;
142   }
143 
144   ndpi_init_prefs init_prefs = ndpi_no_prefs;
145   workflow->ndpi_struct = ndpi_init_detection_module(init_prefs);
146   if (workflow->ndpi_struct == NULL) {
147     free_workflow(&workflow);
148     return NULL;
149   }
150 
151   workflow->total_active_flows = 0;
152   workflow->max_active_flows = MAX_FLOW_ROOTS_PER_THREAD;
153   workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *));
154   if (workflow->ndpi_flows_active == NULL) {
155     free_workflow(&workflow);
156     return NULL;
157   }
158 
159   workflow->total_idle_flows = 0;
160   workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD;
161   workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *));
162   if (workflow->ndpi_flows_idle == NULL) {
163     free_workflow(&workflow);
164     return NULL;
165   }
166 
167   NDPI_PROTOCOL_BITMASK protos;
168   NDPI_BITMASK_SET_ALL(protos);
169   ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
170   ndpi_finalize_initialization(workflow->ndpi_struct);
171 
172   return workflow;
173 }
174 
ndpi_flow_info_freer(void * const node)175 static void ndpi_flow_info_freer(void * const node)
176 {
177   struct nDPI_flow_info * const flow = (struct nDPI_flow_info *)node;
178 
179   ndpi_free(flow->ndpi_dst);
180   ndpi_free(flow->ndpi_src);
181   ndpi_flow_free(flow->ndpi_flow);
182   ndpi_free(flow);
183 }
184 
free_workflow(struct nDPI_workflow ** const workflow)185 static void free_workflow(struct nDPI_workflow ** const workflow)
186 {
187   struct nDPI_workflow * const w = *workflow;
188 
189   if (w == NULL) {
190     return;
191   }
192 
193   if (w->pcap_handle != NULL) {
194     pcap_close(w->pcap_handle);
195     w->pcap_handle = NULL;
196   }
197 
198   if (w->ndpi_struct != NULL) {
199     ndpi_exit_detection_module(w->ndpi_struct);
200   }
201   for(size_t i = 0; i < w->max_active_flows; i++) {
202     ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer);
203   }
204   ndpi_free(w->ndpi_flows_active);
205   ndpi_free(w->ndpi_flows_idle);
206   ndpi_free(w);
207   *workflow = NULL;
208 }
209 
get_default_pcapdev(char * errbuf)210 static char * get_default_pcapdev(char *errbuf)
211 {
212   char * ifname;
213   pcap_if_t * all_devices = NULL;
214 
215   if (pcap_findalldevs(&all_devices, errbuf) != 0)
216     {
217       return NULL;
218     }
219 
220   ifname = strdup(all_devices[0].name);
221   pcap_freealldevs(all_devices);
222 
223   return ifname;
224 }
225 
setup_reader_threads(char const * const file_or_device)226 static int setup_reader_threads(char const * const file_or_device)
227 {
228   char * file_or_default_device;
229   char pcap_error_buffer[PCAP_ERRBUF_SIZE];
230 
231   if (reader_thread_count > MAX_READER_THREADS) {
232     return 1;
233   }
234 
235   if (file_or_device == NULL) {
236     file_or_default_device = get_default_pcapdev(pcap_error_buffer);
237     if (file_or_default_device == NULL) {
238       fprintf(stderr, "pcap_findalldevs: %.*s\n", (int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
239       return 1;
240     }
241   } else {
242     file_or_default_device = strdup(file_or_device);
243     if (file_or_default_device == NULL) {
244       return 1;
245     }
246   }
247 
248   for (int i = 0; i < reader_thread_count; ++i) {
249     reader_threads[i].workflow = init_workflow(file_or_default_device);
250     if (reader_threads[i].workflow == NULL)
251       {
252 	free(file_or_default_device);
253 	return 1;
254       }
255   }
256 
257   free(file_or_default_device);
258   return 0;
259 }
260 
ip_tuple_to_string(struct nDPI_flow_info const * const flow,char * const src_addr_str,size_t src_addr_len,char * const dst_addr_str,size_t dst_addr_len)261 static int ip_tuple_to_string(struct nDPI_flow_info const * const flow,
262                               char * const src_addr_str, size_t src_addr_len,
263                               char * const dst_addr_str, size_t dst_addr_len)
264 {
265   switch (flow->l3_type) {
266   case L3_IP:
267     return inet_ntop(AF_INET, (struct sockaddr_in *)&flow->ip_tuple.v4.src,
268 		     src_addr_str, src_addr_len) != NULL &&
269       inet_ntop(AF_INET, (struct sockaddr_in *)&flow->ip_tuple.v4.dst,
270 		dst_addr_str, dst_addr_len) != NULL;
271   case L3_IP6:
272     return inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src[0],
273 		     src_addr_str, src_addr_len) != NULL &&
274       inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst[0],
275 		dst_addr_str, dst_addr_len) != NULL;
276   }
277 
278   return 0;
279 }
280 
281 #ifdef VERBOSE
print_packet_info(struct nDPI_reader_thread const * const reader_thread,struct pcap_pkthdr const * const header,uint32_t l4_data_len,struct nDPI_flow_info const * const flow)282 static void print_packet_info(struct nDPI_reader_thread const * const reader_thread,
283                               struct pcap_pkthdr const * const header,
284                               uint32_t l4_data_len,
285                               struct nDPI_flow_info const * const flow)
286 {
287   struct nDPI_workflow const * const workflow = reader_thread->workflow;
288   char src_addr_str[INET6_ADDRSTRLEN+1] = {0};
289   char dst_addr_str[INET6_ADDRSTRLEN+1] = {0};
290   char buf[256];
291   int used = 0, ret;
292 
293   ret = snprintf(buf, sizeof(buf), "[%8llu, %d, %4u] %4u bytes: ",
294 		 workflow->packets_captured, reader_thread->array_index,
295 		 flow->flow_id, header->caplen);
296   if (ret > 0) {
297     used += ret;
298   }
299 
300   if (ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)) != 0) {
301     ret = snprintf(buf + used, sizeof(buf) - used, "IP[%s -> %s]", src_addr_str, dst_addr_str);
302   } else {
303     ret = snprintf(buf + used, sizeof(buf) - used, "IP[ERROR]");
304   }
305   if (ret > 0) {
306     used += ret;
307   }
308 
309   switch (flow->l4_protocol) {
310   case IPPROTO_UDP:
311     ret = snprintf(buf + used, sizeof(buf) - used, " -> UDP[%u -> %u, %u bytes]",
312 		   flow->src_port, flow->dst_port, l4_data_len);
313     break;
314   case IPPROTO_TCP:
315     ret = snprintf(buf + used, sizeof(buf) - used, " -> TCP[%u -> %u, %u bytes]",
316 		   flow->src_port, flow->dst_port, l4_data_len);
317     break;
318   case IPPROTO_ICMP:
319     ret = snprintf(buf + used, sizeof(buf) - used, " -> ICMP");
320     break;
321   case IPPROTO_ICMPV6:
322     ret = snprintf(buf + used, sizeof(buf) - used, " -> ICMP6");
323     break;
324   case IPPROTO_HOPOPTS:
325     ret = snprintf(buf + used, sizeof(buf) - used, " -> ICMP6 Hop-By-Hop");
326     break;
327   default:
328     ret = snprintf(buf + used, sizeof(buf) - used, " -> Unknown[0x%X]", flow->l4_protocol);
329     break;
330   }
331   if (ret > 0) {
332     used += ret;
333   }
334 
335   printf("%.*s\n", used, buf);
336 }
337 #endif
338 
ip_tuples_equal(struct nDPI_flow_info const * const A,struct nDPI_flow_info const * const B)339 static int ip_tuples_equal(struct nDPI_flow_info const * const A,
340                            struct nDPI_flow_info const * const B)
341 {
342   if (A->l3_type == L3_IP && B->l3_type == L3_IP6) {
343     return A->ip_tuple.v4.src == B->ip_tuple.v4.src &&
344       A->ip_tuple.v4.dst == B->ip_tuple.v4.dst;
345   } else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) {
346     return A->ip_tuple.v6.src[0] == B->ip_tuple.v6.src[0] &&
347       A->ip_tuple.v6.src[1] == B->ip_tuple.v6.src[1] &&
348       A->ip_tuple.v6.dst[0] == B->ip_tuple.v6.dst[0] &&
349       A->ip_tuple.v6.dst[1] == B->ip_tuple.v6.dst[1];
350   }
351   return 0;
352 }
353 
ip_tuples_compare(struct nDPI_flow_info const * const A,struct nDPI_flow_info const * const B)354 static int ip_tuples_compare(struct nDPI_flow_info const * const A,
355                              struct nDPI_flow_info const * const B)
356 {
357   if (A->l3_type == L3_IP && B->l3_type == L3_IP6) {
358     if (A->ip_tuple.v4.src < B->ip_tuple.v4.src ||
359 	A->ip_tuple.v4.dst < B->ip_tuple.v4.dst)
360       {
361 	return -1;
362       }
363     if (A->ip_tuple.v4.src > B->ip_tuple.v4.src ||
364 	A->ip_tuple.v4.dst > B->ip_tuple.v4.dst)
365       {
366 	return 1;
367       }
368   } else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) {
369     if ((A->ip_tuple.v6.src[0] < B->ip_tuple.v6.src[0] &&
370 	 A->ip_tuple.v6.src[1] < B->ip_tuple.v6.src[1]) ||
371 	(A->ip_tuple.v6.dst[0] < B->ip_tuple.v6.dst[0] &&
372 	 A->ip_tuple.v6.dst[1] < B->ip_tuple.v6.dst[1]))
373       {
374 	return -1;
375       }
376     if ((A->ip_tuple.v6.src[0] > B->ip_tuple.v6.src[0] &&
377 	 A->ip_tuple.v6.src[1] > B->ip_tuple.v6.src[1]) ||
378 	(A->ip_tuple.v6.dst[0] > B->ip_tuple.v6.dst[0] &&
379 	 A->ip_tuple.v6.dst[1] > B->ip_tuple.v6.dst[1]))
380       {
381 	return 1;
382       }
383   }
384   if (A->src_port < B->src_port ||
385       A->dst_port < B->dst_port)
386     {
387       return -1;
388     } else if (A->src_port > B->src_port ||
389                A->dst_port > B->dst_port)
390     {
391       return 1;
392     }
393   return 0;
394 }
395 
ndpi_idle_scan_walker(void const * const A,ndpi_VISIT which,int depth,void * const user_data)396 static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
397 {
398   struct nDPI_workflow * const workflow = (struct nDPI_workflow *)user_data;
399   struct nDPI_flow_info * const flow = *(struct nDPI_flow_info **)A;
400 
401   (void)depth;
402 
403   if (workflow == NULL || flow == NULL) {
404     return;
405   }
406 
407   if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) {
408     return;
409   }
410 
411   if (which == ndpi_preorder || which == ndpi_leaf) {
412     if ((flow->flow_fin_ack_seen == 1 && flow->flow_ack_seen == 1) ||
413 	flow->last_seen + MAX_IDLE_TIME < workflow->last_time)
414       {
415 	char src_addr_str[INET6_ADDRSTRLEN+1];
416 	char dst_addr_str[INET6_ADDRSTRLEN+1];
417 	ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str));
418 	workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
419 	workflow->total_idle_flows++;
420       }
421   }
422 }
423 
ndpi_workflow_node_cmp(void const * const A,void const * const B)424 static int ndpi_workflow_node_cmp(void const * const A, void const * const B) {
425   struct nDPI_flow_info const * const flow_info_a = (struct nDPI_flow_info *)A;
426   struct nDPI_flow_info const * const flow_info_b = (struct nDPI_flow_info *)B;
427 
428   if (flow_info_a->hashval < flow_info_b->hashval) {
429     return(-1);
430   } else if (flow_info_a->hashval > flow_info_b->hashval) {
431     return(1);
432   }
433 
434   /* Flows have the same hash */
435   if (flow_info_a->l4_protocol < flow_info_b->l4_protocol) {
436     return(-1);
437   } else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol) {
438     return(1);
439   }
440 
441   if (ip_tuples_equal(flow_info_a, flow_info_b) != 0 &&
442       flow_info_a->src_port == flow_info_b->src_port &&
443       flow_info_a->dst_port == flow_info_b->dst_port)
444     {
445       return(0);
446     }
447 
448   return ip_tuples_compare(flow_info_a, flow_info_b);
449 }
450 
check_for_idle_flows(struct nDPI_workflow * const workflow)451 static void check_for_idle_flows(struct nDPI_workflow * const workflow)
452 {
453   if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) {
454     for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) {
455       ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
456 
457       while (workflow->cur_idle_flows > 0) {
458 	struct nDPI_flow_info * const f =
459 	  (struct nDPI_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
460 	if (f->flow_fin_ack_seen == 1) {
461 	  printf("Free fin flow with id %u\n", f->flow_id);
462 	} else {
463 	  printf("Free idle flow with id %u\n", f->flow_id);
464 	}
465 	ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index],
466 		     ndpi_workflow_node_cmp);
467 	ndpi_flow_info_freer(f);
468 	workflow->cur_active_flows--;
469       }
470     }
471 
472     workflow->last_idle_scan_time = workflow->last_time;
473   }
474 }
475 
ndpi_process_packet(uint8_t * const args,struct pcap_pkthdr const * const header,uint8_t const * const packet)476 static void ndpi_process_packet(uint8_t * const args,
477                                 struct pcap_pkthdr const * const header,
478                                 uint8_t const * const packet)
479 {
480   struct nDPI_reader_thread * const reader_thread =
481     (struct nDPI_reader_thread *)args;
482   struct nDPI_workflow * workflow;
483   struct nDPI_flow_info flow = {};
484 
485   size_t hashed_index;
486   void * tree_result;
487   struct nDPI_flow_info * flow_to_process;
488 
489   int direction_changed = 0;
490   struct ndpi_id_struct * ndpi_src;
491   struct ndpi_id_struct * ndpi_dst;
492 
493   const struct ndpi_ethhdr * ethernet;
494   const struct ndpi_iphdr * ip;
495   struct ndpi_ipv6hdr * ip6;
496 
497   uint64_t time_ms;
498   const uint16_t eth_offset = 0;
499   uint16_t ip_offset;
500   uint16_t ip_size;
501 
502   const uint8_t * l4_ptr = NULL;
503   uint16_t l4_len = 0;
504 
505   uint16_t type;
506   int thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
507 
508   if (reader_thread == NULL) {
509     return;
510   }
511   workflow = reader_thread->workflow;
512 
513   if (workflow == NULL) {
514     return;
515   }
516 
517   workflow->packets_captured++;
518   time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION);
519   workflow->last_time = time_ms;
520 
521   check_for_idle_flows(workflow);
522 
523   /* process datalink layer */
524   switch (pcap_datalink(workflow->pcap_handle)) {
525   case DLT_NULL:
526     if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002) {
527       type = ETH_P_IP;
528     } else {
529       type = ETH_P_IPV6;
530     }
531     ip_offset = 4 + eth_offset;
532     break;
533   case DLT_EN10MB:
534     if (header->len < sizeof(struct ndpi_ethhdr)) {
535       fprintf(stderr, "[%8llu, %d] Ethernet packet too short - skipping\n",
536 	      workflow->packets_captured, reader_thread->array_index);
537       return;
538     }
539     ethernet = (struct ndpi_ethhdr *) &packet[eth_offset];
540     ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
541     type = ntohs(ethernet->h_proto);
542     switch (type) {
543     case ETH_P_IP: /* IPv4 */
544       if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) {
545 	fprintf(stderr, "[%8llu, %d] IP packet too short - skipping\n",
546 		workflow->packets_captured, reader_thread->array_index);
547 	return;
548       }
549       break;
550     case ETH_P_IPV6: /* IPV6 */
551       if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) {
552 	fprintf(stderr, "[%8llu, %d] IP6 packet too short - skipping\n",
553 		workflow->packets_captured, reader_thread->array_index);
554 	return;
555       }
556       break;
557     case ETH_P_ARP: /* ARP */
558       return;
559     default:
560       fprintf(stderr, "[%8llu, %d] Unknown Ethernet packet with type 0x%X - skipping\n",
561 	      workflow->packets_captured, reader_thread->array_index, type);
562       return;
563     }
564     break;
565   default:
566     fprintf(stderr, "[%8llu, %d] Captured non IP/Ethernet packet with datalink type 0x%X - skipping\n",
567 	    workflow->packets_captured, reader_thread->array_index, pcap_datalink(workflow->pcap_handle));
568     return;
569   }
570 
571   if (type == ETH_P_IP) {
572     ip = (struct ndpi_iphdr *)&packet[ip_offset];
573     ip6 = NULL;
574   } else if (type == ETH_P_IPV6) {
575     ip = NULL;
576     ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset];
577   } else {
578     fprintf(stderr, "[%8llu, %d] Captured non IPv4/IPv6 packet with type 0x%X - skipping\n",
579 	    workflow->packets_captured, reader_thread->array_index, type);
580     return;
581   }
582   ip_size = header->len - ip_offset;
583 
584   if (type == ETH_P_IP && header->len >= ip_offset) {
585     if (header->caplen < header->len) {
586       fprintf(stderr, "[%8llu, %d] Captured packet size is smaller than packet size: %u < %u\n",
587 	      workflow->packets_captured, reader_thread->array_index, header->caplen, header->len);
588     }
589   }
590 
591   /* process layer3 e.g. IPv4 / IPv6 */
592   if (ip != NULL && ip->version == 4) {
593     if (ip_size < sizeof(*ip)) {
594       fprintf(stderr, "[%8llu, %d] Packet smaller than IP4 header length: %u < %zu\n",
595 	      workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(*ip));
596       return;
597     }
598 
599     flow.l3_type = L3_IP;
600     if (ndpi_detection_get_l4((uint8_t*)ip, ip_size, &l4_ptr, &l4_len,
601 			      &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
602       {
603 	fprintf(stderr, "[%8llu, %d] nDPI IPv4/L4 payload detection failed, L4 length: %zu\n",
604 		workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip));
605 	return;
606       }
607 
608     flow.ip_tuple.v4.src = ip->saddr;
609     flow.ip_tuple.v4.dst = ip->daddr;
610     uint32_t min_addr = (flow.ip_tuple.v4.src > flow.ip_tuple.v4.dst ?
611 			 flow.ip_tuple.v4.dst : flow.ip_tuple.v4.src);
612     thread_index = min_addr + ip->protocol;
613   } else if (ip6 != NULL) {
614     if (ip_size < sizeof(ip6->ip6_hdr)) {
615       fprintf(stderr, "[%8llu, %d] Packet smaller than IP6 header length: %u < %zu\n",
616 	      workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(ip6->ip6_hdr));
617       return;
618     }
619 
620     flow.l3_type = L3_IP6;
621     if (ndpi_detection_get_l4((uint8_t*)ip6, ip_size, &l4_ptr, &l4_len,
622 			      &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
623       {
624 	fprintf(stderr, "[%8llu, %d] nDPI IPv6/L4 payload detection failed, L4 length: %zu\n",
625 		workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip6));
626 	return;
627       }
628 
629     flow.ip_tuple.v6.src[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
630     flow.ip_tuple.v6.src[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
631     flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
632     flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
633     uint64_t min_addr[2];
634     if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] &&
635 	flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1])
636       {
637 	min_addr[0] = flow.ip_tuple.v6.dst[0];
638 	min_addr[1] = flow.ip_tuple.v6.dst[0];
639       } else {
640       min_addr[0] = flow.ip_tuple.v6.src[0];
641       min_addr[1] = flow.ip_tuple.v6.src[0];
642     }
643     thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
644   } else {
645     fprintf(stderr, "[%8llu, %d] Non IP/IPv6 protocol detected: 0x%X\n",
646 	    workflow->packets_captured, reader_thread->array_index, type);
647     return;
648   }
649 
650   /* process layer4 e.g. TCP / UDP */
651   if (flow.l4_protocol == IPPROTO_TCP) {
652     const struct ndpi_tcphdr * tcp;
653 
654     if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) {
655       fprintf(stderr, "[%8llu, %d] Malformed TCP packet, packet size smaller than expected: %u < %zu\n",
656 	      workflow->packets_captured, reader_thread->array_index,
657 	      header->len, (l4_ptr - packet) + sizeof(struct ndpi_tcphdr));
658       return;
659     }
660     tcp = (struct ndpi_tcphdr *)l4_ptr;
661     flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
662     flow.flow_fin_ack_seen = (tcp->fin == 1 && tcp->ack == 1 ? 1 : 0);
663     flow.flow_ack_seen = tcp->ack;
664     flow.src_port = ntohs(tcp->source);
665     flow.dst_port = ntohs(tcp->dest);
666   } else if (flow.l4_protocol == IPPROTO_UDP) {
667     const struct ndpi_udphdr * udp;
668 
669     if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) {
670       fprintf(stderr, "[%8llu, %d] Malformed UDP packet, packet size smaller than expected: %u < %zu\n",
671 	      workflow->packets_captured, reader_thread->array_index,
672 	      header->len, (l4_ptr - packet) + sizeof(struct ndpi_udphdr));
673       return;
674     }
675     udp = (struct ndpi_udphdr *)l4_ptr;
676     flow.src_port = ntohs(udp->source);
677     flow.dst_port = ntohs(udp->dest);
678   }
679 
680   /* distribute flows to threads while keeping stability (same flow goes always to same thread) */
681   thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port);
682   thread_index %= reader_thread_count;
683   if (thread_index != reader_thread->array_index) {
684     return;
685   }
686   workflow->packets_processed++;
687   workflow->total_l4_data_len += l4_len;
688 
689 #ifdef VERBOSE
690   print_packet_info(reader_thread, header, l4_data_len, &flow);
691 #endif
692 
693   /* calculate flow hash for btree find, search(insert) */
694   if (flow.l3_type == L3_IP) {
695     if (ndpi_flowv4_flow_hash(flow.l4_protocol, flow.ip_tuple.v4.src, flow.ip_tuple.v4.dst,
696 			      flow.src_port, flow.dst_port, 0, 0,
697 			      (uint8_t *)&flow.hashval, sizeof(flow.hashval)) != 0)
698       {
699 	flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst; // fallback
700       }
701   } else if (flow.l3_type == L3_IP6) {
702     if (ndpi_flowv6_flow_hash(flow.l4_protocol, &ip6->ip6_src, &ip6->ip6_dst,
703 			      flow.src_port, flow.dst_port, 0, 0,
704 			      (uint8_t *)&flow.hashval, sizeof(flow.hashval)) != 0)
705       {
706 	flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1];
707 	flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1];
708       }
709   }
710   flow.hashval += flow.l4_protocol + flow.src_port + flow.dst_port;
711 
712   hashed_index = flow.hashval % workflow->max_active_flows;
713   tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
714   if (tree_result == NULL) {
715     /* flow not found in btree: switch src <-> dst and try to find it again */
716     uint64_t orig_src_ip[2] = { flow.ip_tuple.v6.src[0], flow.ip_tuple.v6.src[1] };
717     uint64_t orig_dst_ip[2] = { flow.ip_tuple.v6.dst[0], flow.ip_tuple.v6.dst[1] };
718     uint16_t orig_src_port = flow.src_port;
719     uint16_t orig_dst_port = flow.dst_port;
720 
721     flow.ip_tuple.v6.src[0] = orig_dst_ip[0];
722     flow.ip_tuple.v6.src[1] = orig_dst_ip[1];
723     flow.ip_tuple.v6.dst[0] = orig_src_ip[0];
724     flow.ip_tuple.v6.dst[1] = orig_src_ip[1];
725     flow.src_port = orig_dst_port;
726     flow.dst_port = orig_src_port;
727 
728     tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
729     if (tree_result != NULL) {
730       direction_changed = 1;
731     }
732 
733     flow.ip_tuple.v6.src[0] = orig_src_ip[0];
734     flow.ip_tuple.v6.src[1] = orig_src_ip[1];
735     flow.ip_tuple.v6.dst[0] = orig_dst_ip[0];
736     flow.ip_tuple.v6.dst[1] = orig_dst_ip[1];
737     flow.src_port = orig_src_port;
738     flow.dst_port = orig_dst_port;
739   }
740 
741   if (tree_result == NULL) {
742     /* flow still not found, must be new */
743     if (workflow->cur_active_flows == workflow->max_active_flows) {
744       fprintf(stderr, "[%8llu, %d] max flows to track reached: %llu, idle: %llu\n",
745 	      workflow->packets_captured, reader_thread->array_index,
746 	      workflow->max_active_flows, workflow->cur_idle_flows);
747       return;
748     }
749 
750     flow_to_process = (struct nDPI_flow_info *)ndpi_malloc(sizeof(*flow_to_process));
751     if (flow_to_process == NULL) {
752       fprintf(stderr, "[%8llu, %d] Not enough memory for flow info\n",
753 	      workflow->packets_captured, reader_thread->array_index);
754       return;
755     }
756 
757     workflow->cur_active_flows++;
758     workflow->total_active_flows++;
759     memcpy(flow_to_process, &flow, sizeof(*flow_to_process));
760     flow_to_process->flow_id = flow_id++;
761 
762     flow_to_process->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
763     if (flow_to_process->ndpi_flow == NULL) {
764       fprintf(stderr, "[%8llu, %d, %4u] Not enough memory for flow struct\n",
765 	      workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
766       return;
767     }
768     memset(flow_to_process->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
769 
770     flow_to_process->ndpi_src = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
771     if (flow_to_process->ndpi_src == NULL) {
772       fprintf(stderr, "[%8llu, %d, %4u] Not enough memory for src id struct\n",
773 	      workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
774       return;
775     }
776 
777     flow_to_process->ndpi_dst = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
778     if (flow_to_process->ndpi_dst == NULL) {
779       fprintf(stderr, "[%8llu, %d, %4u] Not enough memory for dst id struct\n",
780 	      workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
781       return;
782     }
783 
784     printf("[%8llu, %d, %4u] new %sflow\n", workflow->packets_captured, thread_index,
785 	   flow_to_process->flow_id,
786 	   (flow_to_process->is_midstream_flow != 0 ? "midstream-" : ""));
787     if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) {
788       /* Possible Leak, but should not happen as we'd abort earlier. */
789       return;
790     }
791 
792     ndpi_src = flow_to_process->ndpi_src;
793     ndpi_dst = flow_to_process->ndpi_dst;
794   } else {
795     flow_to_process = *(struct nDPI_flow_info **)tree_result;
796 
797     if (direction_changed != 0) {
798       ndpi_src = flow_to_process->ndpi_dst;
799       ndpi_dst = flow_to_process->ndpi_src;
800     } else {
801       ndpi_src = flow_to_process->ndpi_src;
802       ndpi_dst = flow_to_process->ndpi_dst;
803     }
804   }
805 
806   flow_to_process->packets_processed++;
807   flow_to_process->total_l4_data_len += l4_len;
808   /* update timestamps, important for timeout handling */
809   if (flow_to_process->first_seen == 0) {
810     flow_to_process->first_seen = time_ms;
811   }
812   flow_to_process->last_seen = time_ms;
813   /* current packet is an TCP-ACK? */
814   flow_to_process->flow_ack_seen = flow.flow_ack_seen;
815 
816   /* TCP-FIN: indicates that at least one side wants to end the connection */
817   if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0) {
818     flow_to_process->flow_fin_ack_seen = 1;
819     printf("[%8llu, %d, %4u] end of flow\n",  workflow->packets_captured, thread_index,
820 	   flow_to_process->flow_id);
821     return;
822   }
823 
824   /*
825    * This example tries to use maximum supported packets for detection:
826    * for uint8: 0xFF
827    */
828   if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFF) {
829     return;
830   } else if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFE) {
831     /* last chance to guess something, better then nothing */
832     uint8_t protocol_was_guessed = 0;
833     flow_to_process->guessed_protocol =
834       ndpi_detection_giveup(workflow->ndpi_struct,
835 			    flow_to_process->ndpi_flow,
836 			    1, &protocol_was_guessed);
837     if (protocol_was_guessed != 0) {
838       printf("[%8llu, %d, %4d][GUESSED] protocol: %s | app protocol: %s | category: %s\n",
839 	     workflow->packets_captured,
840 	     reader_thread->array_index,
841 	     flow_to_process->flow_id,
842 	     ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.master_protocol),
843 	     ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.app_protocol),
844 	     ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.category));
845     } else {
846       printf("[%8llu, %d, %4d][FLOW NOT CLASSIFIED]\n",
847 	     workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
848     }
849   }
850 
851   flow_to_process->detected_l7_protocol =
852     ndpi_detection_process_packet(workflow->ndpi_struct, flow_to_process->ndpi_flow,
853 				  ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
854 				  ip_size, time_ms, ndpi_src, ndpi_dst);
855 
856   if (ndpi_is_protocol_detected(workflow->ndpi_struct,
857 				flow_to_process->detected_l7_protocol) != 0 &&
858       flow_to_process->detection_completed == 0)
859     {
860       if (flow_to_process->detected_l7_protocol.master_protocol != NDPI_PROTOCOL_UNKNOWN ||
861           flow_to_process->detected_l7_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)
862       {
863         flow_to_process->detection_completed = 1;
864         workflow->detected_flow_protocols++;
865 
866         printf("[%8llu, %d, %4d][DETECTED] protocol: %s | app protocol: %s | category: %s\n",
867 	       workflow->packets_captured,
868 	       reader_thread->array_index,
869 	       flow_to_process->flow_id,
870 	       ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.master_protocol),
871 	       ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.app_protocol),
872 	       ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.category));
873       }
874     }
875 
876   if (flow_to_process->ndpi_flow->num_extra_packets_checked <=
877       flow_to_process->ndpi_flow->max_extra_packets_to_check)
878     {
879       /*
880        * Your business logic starts here.
881        *
882        * This example does print some information about
883        * TLS client and server hellos if available.
884        *
885        * You could also use nDPI's built-in json serialization
886        * and send it to a high-level application for further processing.
887        *
888        * EoE - End of Example
889        */
890 
891       if (flow_to_process->flow_info_printed == 0)
892       {
893         char const * const flow_info = ndpi_get_flow_info(flow_to_process->ndpi_flow, &flow_to_process->detected_l7_protocol);
894         if (flow_info != NULL)
895         {
896           printf("[%8llu, %d, %4d] info: %s\n",
897             workflow->packets_captured,
898             reader_thread->array_index,
899             flow_to_process->flow_id,
900             flow_info);
901           flow_to_process->flow_info_printed = 1;
902         }
903       }
904 
905       if (flow_to_process->detected_l7_protocol.master_protocol == NDPI_PROTOCOL_TLS ||
906 	  flow_to_process->detected_l7_protocol.app_protocol == NDPI_PROTOCOL_TLS)
907         {
908 	  if (flow_to_process->tls_client_hello_seen == 0 &&
909 	      flow_to_process->ndpi_flow->l4.tcp.tls.hello_processed != 0)
910             {
911 	      uint8_t unknown_tls_version = 0;
912 	      printf("[%8llu, %d, %4d][TLS-CLIENT-HELLO] version: %s | sni: %s | alpn: %s\n",
913 		     workflow->packets_captured,
914 		     reader_thread->array_index,
915 		     flow_to_process->flow_id,
916 		     ndpi_ssl_version2str(flow_to_process->ndpi_flow,
917 					  flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.ssl_version,
918 					  &unknown_tls_version),
919 		     flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.client_requested_server_name,
920 		     (flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.alpn != NULL ?
921 		      flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.alpn : "-"));
922 	      flow_to_process->tls_client_hello_seen = 1;
923             }
924 	  if (flow_to_process->tls_server_hello_seen == 0 &&
925 	      flow_to_process->ndpi_flow->l4.tcp.tls.certificate_processed != 0)
926             {
927 	      uint8_t unknown_tls_version = 0;
928 	      printf("[%8llu, %d, %4d][TLS-SERVER-HELLO] version: %s | common-name(s): %.*s | "
929 		     "issuer: %s | subject: %s\n",
930 		     workflow->packets_captured,
931 		     reader_thread->array_index,
932 		     flow_to_process->flow_id,
933 		     ndpi_ssl_version2str(flow_to_process->ndpi_flow,
934 					  flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.ssl_version,
935 					  &unknown_tls_version),
936 		     (flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.server_names_len == 0 ?
937 		      1 : flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.server_names_len),
938 		     (flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.server_names == NULL ?
939 		      "-" : flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.server_names),
940 		     (flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.issuerDN != NULL ?
941 		      flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.issuerDN : "-"),
942 		     (flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.subjectDN != NULL ?
943 		      flow_to_process->ndpi_flow->protos.tls_quic_stun.tls_quic.subjectDN : "-"));
944 	      flow_to_process->tls_server_hello_seen = 1;
945             }
946         }
947     }
948 }
949 
run_pcap_loop(struct nDPI_reader_thread const * const reader_thread)950 static void run_pcap_loop(struct nDPI_reader_thread const * const reader_thread)
951 {
952   if (reader_thread->workflow != NULL &&
953       reader_thread->workflow->pcap_handle != NULL) {
954 
955     if (pcap_loop(reader_thread->workflow->pcap_handle, -1,
956 		  &ndpi_process_packet, (uint8_t *)reader_thread) == PCAP_ERROR) {
957 
958       fprintf(stderr, "Error while reading pcap file: '%s'\n",
959 	      pcap_geterr(reader_thread->workflow->pcap_handle));
960       reader_thread->workflow->error_or_eof = 1;
961     }
962   }
963 }
964 
break_pcap_loop(struct nDPI_reader_thread * const reader_thread)965 static void break_pcap_loop(struct nDPI_reader_thread * const reader_thread)
966 {
967   if (reader_thread->workflow != NULL &&
968       reader_thread->workflow->pcap_handle != NULL)
969     {
970       pcap_breakloop(reader_thread->workflow->pcap_handle);
971     }
972 }
973 
processing_thread(void * const ndpi_thread_arg)974 static void * processing_thread(void * const ndpi_thread_arg)
975 {
976   struct nDPI_reader_thread const * const reader_thread =
977     (struct nDPI_reader_thread *)ndpi_thread_arg;
978 
979   printf("Starting Thread %d\n", reader_thread->array_index);
980   run_pcap_loop(reader_thread);
981   reader_thread->workflow->error_or_eof = 1;
982   return NULL;
983 }
984 
processing_threads_error_or_eof(void)985 static int processing_threads_error_or_eof(void)
986 {
987   for (int i = 0; i < reader_thread_count; ++i) {
988     if (reader_threads[i].workflow->error_or_eof == 0) {
989       return 0;
990     }
991   }
992   return 1;
993 }
994 
start_reader_threads(void)995 static int start_reader_threads(void)
996 {
997   sigset_t thread_signal_set, old_signal_set;
998 
999   sigfillset(&thread_signal_set);
1000   sigdelset(&thread_signal_set, SIGINT);
1001   sigdelset(&thread_signal_set, SIGTERM);
1002   if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) {
1003     fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
1004     return 1;
1005   }
1006 
1007   for (int i = 0; i < reader_thread_count; ++i) {
1008     reader_threads[i].array_index = i;
1009 
1010     if (reader_threads[i].workflow == NULL) {
1011       /* no more threads should be started */
1012       break;
1013     }
1014 
1015     if (pthread_create(&reader_threads[i].thread_id, NULL,
1016 		       processing_thread, &reader_threads[i]) != 0)
1017       {
1018 	fprintf(stderr, "pthread_create: %s\n", strerror(errno));
1019 	return 1;
1020       }
1021   }
1022 
1023   if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0) {
1024     fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
1025     return 1;
1026   }
1027 
1028   return 0;
1029 }
1030 
stop_reader_threads(void)1031 static int stop_reader_threads(void)
1032 {
1033   unsigned long long int total_packets_processed = 0;
1034   unsigned long long int total_l4_data_len = 0;
1035   unsigned long long int total_flows_captured = 0;
1036   unsigned long long int total_flows_idle = 0;
1037   unsigned long long int total_flows_detected = 0;
1038 
1039   for (int i = 0; i < reader_thread_count; ++i) {
1040     break_pcap_loop(&reader_threads[i]);
1041   }
1042 
1043   printf("------------------------------------ Stopping reader threads\n");
1044 
1045   for (int i = 0; i < reader_thread_count; ++i) {
1046     if (reader_threads[i].workflow == NULL) {
1047       continue;
1048     }
1049 
1050     total_packets_processed += reader_threads[i].workflow->packets_processed;
1051     total_l4_data_len += reader_threads[i].workflow->total_l4_data_len;
1052     total_flows_captured += reader_threads[i].workflow->total_active_flows;
1053     total_flows_idle += reader_threads[i].workflow->total_idle_flows;
1054     total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
1055 
1056     printf("Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
1057 	   "idle flows: %8llu, detected flows: %8llu\n",
1058 	   reader_threads[i].array_index, reader_threads[i].workflow->packets_processed,
1059 	   reader_threads[i].workflow->total_l4_data_len, reader_threads[i].workflow->total_active_flows,
1060 	   reader_threads[i].workflow->total_idle_flows, reader_threads[i].workflow->detected_flow_protocols);
1061   }
1062   /* total packets captured: same value for all threads as packet2thread distribution happens later */
1063   printf("Total packets captured.: %llu\n",
1064 	 reader_threads[0].workflow->packets_captured);
1065   printf("Total packets processed: %llu\n", total_packets_processed);
1066   printf("Total layer4 data size.: %llu\n", total_l4_data_len);
1067   printf("Total flows captured...: %llu\n", total_flows_captured);
1068   printf("Total flows timed out..: %llu\n", total_flows_idle);
1069   printf("Total flows detected...: %llu\n", total_flows_detected);
1070 
1071   for (int i = 0; i < reader_thread_count; ++i) {
1072     if (reader_threads[i].workflow == NULL) {
1073       continue;
1074     }
1075 
1076     if (pthread_join(reader_threads[i].thread_id, NULL) != 0) {
1077       fprintf(stderr, "pthread_join: %s\n", strerror(errno));
1078     }
1079 
1080     free_workflow(&reader_threads[i].workflow);
1081   }
1082 
1083   return 0;
1084 }
1085 
sighandler(int signum)1086 static void sighandler(int signum)
1087 {
1088   fprintf(stderr, "Received SIGNAL %d\n", signum);
1089 
1090   if (main_thread_shutdown == 0) {
1091     main_thread_shutdown = 1;
1092     if (stop_reader_threads() != 0) {
1093       fprintf(stderr, "Failed to stop reader threads!\n");
1094       exit(EXIT_FAILURE);
1095     }
1096   } else {
1097     fprintf(stderr, "Reader threads are already shutting down, please be patient.\n");
1098   }
1099 }
1100 
main(int argc,char ** argv)1101 int main(int argc, char ** argv)
1102 {
1103   if (argc == 0) {
1104     return 1;
1105   }
1106 
1107   printf("usage: %s [PCAP-FILE-OR-INTERFACE]\n"
1108 	 "----------------------------------\n"
1109 	 "nDPI version: %s\n"
1110 	 " API version: %u\n"
1111 	 "libgcrypt...: %s\n"
1112 	 "----------------------------------\n",
1113 	 argv[0],
1114 	 ndpi_revision(), ndpi_get_api_version(),
1115 	 (ndpi_get_gcrypt_version() == NULL ? "-" : ndpi_get_gcrypt_version()));
1116 
1117   if (setup_reader_threads((argc >= 2 ? argv[1] : NULL)) != 0) {
1118     fprintf(stderr, "%s: setup_reader_threads failed\n", argv[0]);
1119     return 1;
1120   }
1121 
1122   if (start_reader_threads() != 0) {
1123     fprintf(stderr, "%s: start_reader_threads\n", argv[0]);
1124     return 1;
1125   }
1126 
1127   signal(SIGINT, sighandler);
1128   signal(SIGTERM, sighandler);
1129   while (main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0) {
1130     sleep(1);
1131   }
1132 
1133   if (main_thread_shutdown == 0 && stop_reader_threads() != 0) {
1134     fprintf(stderr, "%s: stop_reader_threads\n", argv[0]);
1135     return 1;
1136   }
1137 
1138   return 0;
1139 }
1140