1 /*
2 
3     Copyright 2008--2013, Centre for Advanced Internet Architectures,
4     Swinburne University of Technology, http://caia.swin.edu.au
5 
6     Author: Amiel Heyde, amiel@swin.edu.au
7 
8     This program is free software; you can redistribute it and/or modify
9     it under the terms of the GNU General Public License version 2 as
10     published by the Free Software Foundation.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 
21     $Id: instance.c 176 2015-05-21 00:35:27Z szander $
22 
23     Note to self - Changes to 0.3.1:
24     Used explicit casting to enable compilation on 64bit: *(struct in_addr *)&
25     See http://www.gidforums.com/t-7865.html for details (in_addr_t vs in_addr)
26 
27 */
28 
29 #include <pcap.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 
34 #include <sys/queue.h>
35 
36 
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <arpa/inet.h>
41 #include <netinet/if_ether.h>
42 #include <netinet/tcp.h>
43 #include <pthread.h>
44 #include <unistd.h>
45 
46 #include "spptool.h"
47 #include "crc32.h"
48 #include "crc64.h"
49 #include "timeval.h"
50 #include "instance.h"
51 #include "master.h"
52 #include "slave.h"
53 
54 //######## VARIABLES #########//
55 PUBLIC monitor_point_t mp[MP_COUNT];
56 
57 PRIVATE char errbuf[PCAP_ERRBUF_SIZE];
58 PUBLIC in_addr_t addr[2];
59 PUBLIC in_addr_t nat_addr[2];
60 
61 extern int hash_fields;
62 extern HASH_FUNCTION hash_function;
63 extern int finished;
64 extern int options;
65 extern int delta_t_max;
66 extern int verbosity;
67 extern unsigned int sec_offset;
68 
69 //######## PROTOTYPES ########//
70 
71 PUBLIC void mpoint_load(monitor_point_t * mpoint, const mp_type_t type, const char * name, mp_id_t id);
72 PUBLIC void instance_unload(mp_id_t id);
73 
74 PRIVATE instance_t* assembleInstance(const struct pcap_pkthdr *pcap_hdr, const struct ip *ip_hdr, unsigned short ip_caplen,
75                                      direction_t direction);
76 PUBLIC void createInstance(u_char *args, const struct pcap_pkthdr *pcap_hdr, const u_char *pkt);
77 PUBLIC void * createInstances(void * args);
78 PRIVATE void setPcapFilter(monitor_point_t * mpoint, char * filter_string);
79 PUBLIC uint64_t getHash(const struct ip *ip_hdr, unsigned short ip_caplen);
80 
81 //######## FUNCTIONS #########//
82 
83 /*
84  * Initialises the monitor point struct. Called once for MON and once for REF.
85  * */
/*
 * mpoint_load - initialise a monitor point and open its packet source.
 *
 *   mpoint  monitor point structure to fill in
 *   type    live   -> open 'name' as a capture interface (promiscuous)
 *           file   -> open 'name' as a pcap savefile
 *           remote -> hand 'name' to loadMaster()
 *   name    interface name, file name, or the string given to loadMaster()
 *   id      identifier stored in mpoint->id
 *
 * Also resets both instance queues and initialises every mutex/condition
 * variable held in the monitor point. The process exits if a live or file
 * source cannot be opened.
 */
PUBLIC void mpoint_load(monitor_point_t * mpoint, const mp_type_t type, const char * name, mp_id_t id)
{
  mpoint->id = id;

  mpoint->q_size[IN] = 0;
  mpoint->q_size[OUT] = 0;
  mpoint->type = type;

  // createInstance() reads this flag for every packet, so give it a defined
  // value for all source types; only savefiles can override it below.
  mpoint->byte_order_swapped = 0;

  switch(type) {
    case live:
                if((mpoint->dev = pcap_open_live(name, BUFSIZ, 1, 500, errbuf)) == NULL) {
                  printf("PCAP error: %s\n", errbuf);
                  exit(-1);
                }
                break;
    case file:
                // Open PCAP file for reading
                if((mpoint->dev = pcap_open_offline(name, errbuf)) == NULL) {
                  printf("Error opening file: %s\n", errbuf);
                  exit(-2);
                }
                mpoint->byte_order_swapped = pcap_is_swapped(mpoint->dev);
                break;
    case remote:
                loadMaster(mpoint, name);
                break;
    default:
                break;
  }

  TAILQ_INIT(&mpoint->instance_q[IN]);
  TAILQ_INIT(&mpoint->instance_q[OUT]);

  pthread_mutex_init(&mpoint->thresh_mutex, NULL);
  pthread_cond_init(&mpoint->thresh_cond, NULL);

  pthread_mutex_init(&mpoint->q_mutex[IN], NULL);
  pthread_mutex_init(&mpoint->q_mutex[OUT], NULL);

  pthread_mutex_init(&mpoint->q_size_mutex[OUT], NULL);
  pthread_mutex_init(&mpoint->q_size_mutex[IN], NULL);

  mpoint->finished = 0;

  // Only the two pcap-backed types opened a handle above; never query the
  // datalink type through a handle that was not opened here.
  if(type == live || type == file) {
    mpoint->datalink_type = pcap_datalink(mpoint->dev);
    if(verbosity & 32) printf("Monitor point id %d has datalink type: %d\n", mpoint->id, mpoint->datalink_type);
  }
}
141 
142 
143 /*
144  * Constructs the PCAP filter string.
145  * Spawns the input reader threads
146  * */
mpoint_start(monitor_point_t * mpoint)147 PUBLIC void mpoint_start(monitor_point_t * mpoint) {
148 
149  if(options & use_pcap_filter) {
150    if(mpoint->type != remote) {
151       char filter_string[90];
152       char buf[8];
153       strcpy(filter_string, "host ");
154       strcat(filter_string, inet_ntoa(*(struct in_addr *)&addr[REF]));
155       if (nat_addr[0] == 0 && nat_addr[1] == 0)
156         strcat(filter_string, " and host ");
157       else //one of the ends has NAT
158         strcat(filter_string, " or host ");
159       strcat(filter_string, (char*)inet_ntoa(*(struct in_addr *)&addr[MON]));
160       strcat(filter_string, " and !icmp[icmptype]==3 and !port ");
161       sprintf(buf, "%d", PORT);
162       strcat(filter_string, buf);
163           if(verbosity & 32) printf("PCAP filter string: %s\n", filter_string);
164       setPcapFilter(mpoint, filter_string);
165     }
166  }
167   switch(mpoint->type) {
168     case file:
169     case live:
170                 pthread_create(&mpoint->thread, NULL, &createInstances, (void*)mpoint);
171                 if(verbosity & 2) printf("INFO: Thread started\n");
172                 break;
173     case remote:
174                 pthread_create(&mpoint->thread, NULL, &runMaster, (void*)mpoint);
175                 break;
176     default:
177                 break;
178   }
179 
180 
181 
182 }
183 
184 /*
185  * Shuts down pcap, frees up the TAILQ lists
186  * */
/*
 * mpoint_unload - release the resources held by a monitor point.
 *
 * Closes the pcap handle of live/file monitor points, then frees every
 * instance still waiting in the IN and OUT queues. Each queue is left in a
 * valid, empty state with its size counter reset, so nothing that looks at
 * the monitor point afterwards follows a pointer to freed memory.
 */
PUBLIC void mpoint_unload(monitor_point_t * mpoint) {
  direction_t direction;
  instance_t *ins, *ins_next;

  switch(mpoint->type) {
    case live:
    case file:
                // Close pcap device
                if(mpoint->dev != NULL) {
                  pcap_close(mpoint->dev);
                  mpoint->dev = NULL;
                }
                break;
    case remote:
                break;
    default:
                break;
  }

  //CLEAN BOTH INSTANCE QUEUES
  for(direction = IN; direction <= OUT; direction++) {
    pthread_mutex_lock(&mpoint->q_mutex[direction]);
    ins = TAILQ_FIRST(&mpoint->instance_q[direction]);
    while (ins != NULL) {
      ins_next = TAILQ_NEXT(ins, entries);
      free(ins);
      ins = ins_next;
    }
    // Every entry is gone: re-initialise the head so it no longer refers to
    // the freed elements.
    TAILQ_INIT(&mpoint->instance_q[direction]);
    pthread_mutex_unlock(&mpoint->q_mutex[direction]);

    // The size counter has its own mutex; it is taken after the queue mutex
    // is released, so the two are never held together.
    pthread_mutex_lock(&mpoint->q_size_mutex[direction]);
    mpoint->q_size[direction] = 0;
    pthread_mutex_unlock(&mpoint->q_size_mutex[direction]);
  }
}
217 
/*
 * assembleInstance - allocate a queue entry describing one captured packet.
 *
 *   pcap_hdr   pcap header of the packet; supplies the timestamp
 *   ip_hdr     start of the packet's IP header
 *   ip_caplen  number of captured bytes starting at ip_hdr
 *   direction  not used here (the caller picks the queue)
 *
 * Returns a heap-allocated instance holding the packet id (from getHash())
 * and the capture timestamp. Ownership passes to the caller, who must
 * eventually free() it. Never returns NULL: the process exits if memory is
 * exhausted, because callers queue the result without checking it.
 */
PRIVATE instance_t* assembleInstance(const struct pcap_pkthdr *pcap_hdr, const struct ip *ip_hdr, unsigned short ip_caplen,
                                     direction_t direction){

  // calloc so that fields not assigned below start from a defined value
  instance_t * ins = calloc(1, sizeof *ins);

  (void)direction;

  if(ins == NULL) {
    fprintf(stderr, "Error: out of memory allocating packet instance\n");
    exit(EXIT_FAILURE);
  }

  ins->pkt_id = getHash(ip_hdr, ip_caplen);
  ins->ts = pcap_hdr->ts;                 // Store instance timestamp
  return ins;
}
228 
229 /*
230  * Creates hash of a packet that is used for the pkt_id field.
231  *
232  * The pkt_id is crucial for determining whether a packet seen at REF
233  * has also been seen at MON. We rely on hashing together one or more fields
234  * that are expected to be vary from packet to packet but also be invariant along
235  * the path between REF and MON. The global 'hash_fields' holds a bitmask
236  * indicating which fields should be included in the hash calculation.
237  *
238  * In SPP <= 0.3.6 we assumed IP.ID (Identification) field would be non-zero and
239  * unique per packet emitted by a given source (at least for time periods longer than
240  * a handful of RTTs). However, RFC 6864 (Feb 2013) officially deprecated this use of
241  * IP.ID field, and it is now a largely unreliable mechanism for disambiguating packets
242  * that might otherwise look the same based on other header fields.
243  *
244  * Some other fields may or may not be invariant along a path. Certain middleboxes have
245  * been observed in the wild actually twiddling with the raw TCP sequence numbers,
246  * ensuring they wont match between REF and MON.
247  *
248  * */
getHash(const struct ip * ip_hdr,unsigned short ip_caplen)249 PUBLIC uint64_t getHash(const struct ip *ip_hdr, unsigned short ip_caplen) {
250 
251   unsigned int hash_offset = 0;
252   unsigned char hash_data[HASH_DATA_LENGTH + 1];
253   u_char * transport_hdr = ((u_char *)ip_hdr + (ip_hdr->ip_hl * 4));
254   struct tcphdr *tcp_hdr = (struct tcphdr *)transport_hdr;
255 
256   //printf("assembling\n");
257   if (nat_addr[0] == 0 && nat_addr[1] == 0) {	// Do not include IP addresses in hash when running through NAT (This would cause no hashes to match)
258     if(hash_fields & 1) {                 // Add Source address field
259       memcpy((void*)(hash_data + hash_offset), (void*)&ip_hdr->ip_src, sizeof(ip_hdr->ip_src));
260       hash_offset += sizeof(ip_hdr->ip_src);
261     }
262     if(hash_fields & 2) {                 // Add Destination address field
263       memcpy((void*)(hash_data + hash_offset), (void*)&ip_hdr->ip_dst, sizeof(ip_hdr->ip_dst));
264       hash_offset += sizeof(ip_hdr->ip_dst);
265     }
266   }
267   if(hash_fields & 4) {                   // Add Protocol
268     memcpy((void*)(hash_data + hash_offset), (void*)&ip_hdr->ip_p, sizeof(ip_hdr->ip_p));
269     hash_offset += sizeof(ip_hdr->ip_p);
270   }
271   if(hash_fields & 8) {                   // Add Identification field
272     memcpy((void*)(hash_data + hash_offset), (void*)&ip_hdr->ip_id, sizeof(ip_hdr->ip_id));
273     hash_offset += sizeof(ip_hdr->ip_id);
274   }
275 
276 
277   if(ip_hdr->ip_p == 6 || ip_hdr->ip_p == 17) {  //We have a TCP or UDP packet
278     if(hash_fields & 16) {                // Add TCP/UDP src
279       memcpy((void*)(hash_data + hash_offset), (void*)transport_hdr, 2);
280       hash_offset += 2;
281     }
282     if(hash_fields & 32) {                // Add TCP/UDP dst
283       memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 2), 2);
284      hash_offset += 2;
285     }
286 
287     if(ip_hdr->ip_p == 6) {  //We have a TCP packet
288       if(hash_fields & 64) {              // Add TCP Seq No
289         memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 4), 4);
290         hash_offset += 4;
291       }
292       if(hash_fields & 128) {             // Add TCP Ack No
293         memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 8), 4);
294         hash_offset += 4;
295       }
296       if(hash_fields & 256) {             // Add TCP data offset, flags, window size
297 	memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 12), 4);
298 	hash_offset += 4;
299       }
300       if(hash_fields & 512) {             // Add TCP Checksum, urgent pointer
301         memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 16), 4);
302         hash_offset += 4;
303       }
304       if(hash_fields & 8192) {            // Add up to first hash_bytes bytes of TCP payload
305 	unsigned short hash_bytes = 12;
306 	unsigned short tcp_data_start = tcp_hdr->th_off * 4;
307 	int tcp_data_len = ip_caplen - (ip_hdr->ip_hl * 4) - tcp_data_start;
308         if (tcp_data_len > 0) {
309           if (tcp_data_len < hash_bytes) {
310 	    hash_bytes = tcp_data_len;
311 	  }
312 	  memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + tcp_data_start), hash_bytes);
313 	  hash_offset += hash_bytes;
314         }
315       }
316       if(hash_fields & 16384) {           // Hash across all TCP options bytes if present
317 	unsigned short hash_bytes = tcp_hdr->th_off * 4 - 20; // Number of bytes of TCP options
318         int tcp_optdata_len = ip_caplen - (ip_hdr->ip_hl * 4) - 20;
319         if (tcp_optdata_len > 0) {
320           if (tcp_optdata_len < hash_bytes) {
321             hash_bytes = tcp_optdata_len;
322           }
323 	  memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 20), hash_bytes);
324 	  hash_offset += hash_bytes;
325 	}
326       }
327 
328     }
329     else {  //Must be UDP
330       if(hash_fields & 1024) {             // Add UDP length, checksum
331         memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 4), 4);
332         hash_offset += 4;
333       }
334       if(hash_fields & 2048) {             // Add UDP payload (up to 12 bytes)
335         unsigned short hash_bytes = 12;
336 	int data_len = ip_caplen - 8;
337         if (data_len > 0) {
338 	  if (data_len < hash_bytes) {
339 	    hash_bytes = data_len;
340 	  }
341 	  memcpy((void*)(hash_data + hash_offset), (void*)(transport_hdr + 8), hash_bytes);
342 	  hash_offset += hash_bytes;
343         }
344       }
345 
346     }
347   } else {
348 	if(hash_fields & 4096) {
349 		// If not TCP or UDP add up to first 20 bytes past IP header
350                 unsigned short hash_bytes = 20;
351 		int ip_data_len = ip_caplen - ip_hdr->ip_hl * 4;
352                 if (ip_data_len > 0) {
353 		  if (ip_data_len < hash_bytes) {
354 		    hash_bytes = ip_data_len;
355 		  }
356 		  memcpy((void*)(hash_data + hash_offset), (void*)transport_hdr, hash_bytes);
357 		  hash_offset += hash_bytes;
358                 }
359 	}
360   }
361 
362   return hash_function(0, hash_data, hash_offset);
363 }
364 
365 /*
366  * This is spawned inside its own thread. It is the "read the input stream"
367  * thread. One is created for each input stream (ie, one for each of REF
368  * and MON)
369  * */
/*
 * createInstances - thread entry point that reads one input stream.
 *
 *   args  the monitor_point_t to read from (passed as void * by
 *         pthread_create)
 *
 * Runs pcap_loop() with createInstance() as the per-packet callback until
 * the source is exhausted, the loop is broken or an error occurs, then marks
 * the monitor point as finished. Always terminates the thread with a NULL
 * result.
 */
PUBLIC void * createInstances(void * args) {
    monitor_point_t * mpoint = (monitor_point_t *)args;
    int rc;

    rc = pcap_loop(mpoint->dev, -1, createInstance, (u_char*)mpoint);
    if(rc == -1) {
      // -1 is pcap_loop()'s error return; report it rather than ending the
      // stream as though it had been read to completion.
      fprintf(stderr, "PCAP error: %s\n", pcap_geterr(mpoint->dev));
    }

    sleep(1);         // Leave some time for all calculations to finish before telling the program to quit.
    mpoint->finished = 1;

    pthread_exit(NULL);
    return NULL;      // not reached; keeps compilers that do not know pthread_exit() never returns quiet
}
379 
380 /*
381  * Reads a single packet in and adds it to the incoming queue to be processed
382  * by a different thread.
383  *
384  * */
/*
 * Determines how many link-layer bytes precede the IP header.
 *
 * Returns the link header length, or -1 when the frame must be skipped
 * (not IPv4, or too short to tell). Exits the process for datalink types
 * that are not supported.
 */
static int linkHeaderLength(int datalink_type, const u_char *pkt, unsigned int caplen)
{
  switch(datalink_type) {
    case DLT_EN10MB: {
      const struct ether_header * eth_hdr = (const struct ether_header *)pkt;
      // ether_type is carried in network byte order
      if(caplen < sizeof(struct ether_header) || ntohs(eth_hdr->ether_type) != ETHERTYPE_IP) {
        if(verbosity & 32) printf("Skipping Ethernet frame not containing IPv4\n");
        return -1;
      }
      return (int)sizeof(struct ether_header);
    }
    case DLT_LOOP:
    case DLT_NULL: {
      uint32_t family = 0;
      // memcpy avoids an unaligned 32-bit load from the packet buffer
      if(caplen >= sizeof family) memcpy(&family, pkt, sizeof family);
      if(family != 2) {
        if(verbosity & 32) printf("Skipping Null/Loopback frame not containing IPv4\n");
        return -1;
      }
      return 4;
    }
    case DLT_LINUX_SLL:
      //Changed by David Hayes to reflect the pcap man page
      return 16;
    case DLT_PPP:
      // Support PPP-encapsulated frames with/without HDLC encaps
      if(caplen < 2) return -1;
      return (pkt[0] == 0xFF && pkt[1] == 0x03) ? 4 : 2;
    default:
      printf("DataLink type not supported\n");
      exit(EXIT_FAILURE);
  }
  return -1;
}

/*
 * createInstance - pcap_loop() callback: classify one captured packet and
 * queue it for the processing thread.
 *
 *   args      the monitor_point_t the packet was captured on
 *   pcap_hdr  pcap header (timestamp and captured length)
 *   pkt       captured bytes, starting at the link-layer header
 *
 * Packets that are not IPv4, are truncated before the end of the basic IP
 * header, or do not travel between the two configured hosts (or their NAT
 * addresses) are discarded. In slave mode the packet is passed to
 * sendHashes(); otherwise an instance is appended to the IN or OUT queue of
 * the monitor point and the queue size is incremented.
 *
 * Terminates the calling thread when the global 'finished' flag is set.
 */
PUBLIC void createInstance(u_char *args, const struct pcap_pkthdr *pcap_hdr, const u_char *pkt)
{
  monitor_point_t * mpoint = (monitor_point_t *)args;
  direction_t direction = -1;                         // No direction by default (-1)
  struct ip * ip_hdr;
  in_addr_t src_addr, dst_addr;
  instance_t * ins;
  int pkt_err = 0;
  int link_hdr_len;
  unsigned int ip_bytes;
  unsigned short ip_caplen;

  if(finished)pthread_exit(NULL);                     // Shut this thread down if we have been told to finish

  link_hdr_len = linkHeaderLength(mpoint->datalink_type, pkt, pcap_hdr->caplen);
  if(link_hdr_len < 0)
    return;

  // Everything below reads the basic IP header, so it must have been captured.
  if(pcap_hdr->caplen < (unsigned int)link_hdr_len + sizeof(struct ip)) {
    if(verbosity & 32) printf("INFO: Skipping Packet: truncated IPv4 header\n");
    return;
  }

  ip_hdr = (struct ip *)(pkt + link_hdr_len);

  if(ip_hdr->ip_v != 4){
    if(verbosity & 32) printf("INFO: Skipping Packet: not IPv4\n");
    pkt_err = 1;
  }

  // ip_caplen is 16 bits wide; clamp rather than wrap on very large captures.
  ip_bytes = pcap_hdr->caplen - (unsigned int)link_hdr_len;
  ip_caplen = (ip_bytes > 0xFFFF) ? 0xFFFF : (unsigned short)ip_bytes;

  if(ip_hdr->ip_len == 0){
    // Some instances of captured TSO'ed frames have been seen with ip_len=0,
    // so re-construct a fake lower-bound IP packet length based on how many
    // bytes were actually captured
    ip_hdr->ip_len = htons(ip_caplen);
  }

  if(mpoint->byte_order_swapped){
    src_addr = ntohl(ip_hdr->ip_src.s_addr);   //TEST THIS?
    dst_addr = ntohl(ip_hdr->ip_dst.s_addr);   //TEST THIS?
  } else {
    src_addr = ip_hdr->ip_src.s_addr;
    dst_addr = ip_hdr->ip_dst.s_addr;
  }

  if(verbosity & 32) {
    char addr_string[2][16];
    strncpy(addr_string[0], inet_ntoa(*(struct in_addr *)&src_addr), 16); // inet_ntoa() not always thread-safe?
    strncpy(addr_string[1], inet_ntoa(*(struct in_addr *)&dst_addr), 16); // inet_ntoa() not always thread-safe?
    printf("INFO: Next packet from monitor point %u: src %s, dst %s\n",
           mpoint->id, addr_string[0], addr_string[1]);
  }

  if((src_addr == addr[0]     && dst_addr == addr[1]) ||          // Determine if we have found an OUT packet
     (src_addr == nat_addr[0] && dst_addr == addr[1]) ||
     (src_addr == addr[0]     && dst_addr == nat_addr[1]))
    direction = OUT;
  else if((src_addr == addr[1]     && dst_addr == addr[0]) ||     // Determine if we have found an IN packet
          (src_addr == addr[1]     && dst_addr == nat_addr[0]) ||
          (src_addr == nat_addr[1] && dst_addr == addr[0]))
    direction = IN;
  else
    pkt_err = 1;

  if(pkt_err != 1) {

    if(options & run_slave) {
      sendHashes(pcap_hdr, ip_hdr, ip_caplen, direction);
    }
    else {
      unsigned long long pkt_id;
      struct timeval ts;

      ins = assembleInstance(pcap_hdr, ip_hdr, ip_caplen, direction);

      // Copy what the log line needs now: once the instance is on the queue
      // another thread may remove and free it at any moment.
      pkt_id = ins->pkt_id;
      ts = ins->ts;

      pthread_mutex_lock(&mpoint->q_mutex[direction]);
      TAILQ_INSERT_TAIL(&mpoint->instance_q[direction], ins, entries);       // Insert instance into appropriate queue
      pthread_mutex_unlock(&mpoint->q_mutex[direction]);

      pthread_mutex_lock(&mpoint->q_size_mutex[direction]);
      mpoint->q_size[direction]++;                         // Increment queue length
      pthread_mutex_unlock(&mpoint->q_size_mutex[direction]);

      if(verbosity & 16) {
        printf("INFO: Added %llu to mpoint %u instance_q[%u] - timestamp: %llu.%06llu\n", pkt_id, mpoint->id, direction,
               (unsigned long long)ts.tv_sec, (unsigned long long)ts.tv_usec);
      }
    }
  }
  else if(verbosity & 32) {
    char addr_string[2][16];
    printf("Packet discarded - (Not matching src and dst address)\n");
    strncpy(addr_string[0], inet_ntoa(*(struct in_addr *)&addr[0]), 16);
    strncpy(addr_string[1], inet_ntoa(*(struct in_addr *)&addr[1]), 16);
    printf("Src and dst should be %s or %s\n", addr_string[0], addr_string[1]);
  }

  // sza: there is deliberately no throttling here when the queues grow while
  // reading from files. Sleeping would slow down the analysis; we rather use
  // a bit more CPU to finish as quickly as possible.
  if(finished)pthread_exit(NULL);

}
547 
548 /*
549  * Remove a certain packet pair from the yet-to-be-processed queue
550  * */
removeInstance(instance_t * instance,monitor_point_t * mpoint,direction_t direction)551 PUBLIC void removeInstance(instance_t * instance, monitor_point_t * mpoint, direction_t direction) {
552     //printf("))) About to free ptr: %u\n", instance);
553     pthread_mutex_lock(&mpoint->q_mutex[direction]);
554     TAILQ_REMOVE(&mpoint->instance_q[direction], instance, entries);
555     //printf("%%% removeInstance: removed from Q\n");
556     pthread_mutex_unlock(&mpoint->q_mutex[direction]);
557       free(instance);
558 
559     //printf("%%% removeInstance: freed\n");
560     pthread_mutex_lock(&mpoint->q_size_mutex[direction]);
561     mpoint->q_size[direction]--;
562     pthread_mutex_unlock(&mpoint->q_size_mutex[direction]);
563     //printf("%%% removeInstance: done\n");
564     //displayQueueSize();
565 }
566 
567 /*
568  * Prune packets older than delta_t_max from the yet-to-be-processed queue.
569  * Calls removeInstance() on each of the pruned packets
570  * */
removeOldInstances(monitor_point_t * mpoint,direction_t direction,struct timeval * cur_time)571 PUBLIC void removeOldInstances(monitor_point_t * mpoint, direction_t direction, struct timeval * cur_time) {
572   int delta_t;
573   instance_t *ins, *ins_tmp;
574   pthread_mutex_lock(&mpoint->q_mutex[direction]);
575 
576   TAILQ_FOREACH_SAFE(ins, &mpoint->instance_q[direction], entries, ins_tmp) {
577     pthread_mutex_unlock(&mpoint->q_mutex[direction]);
578      delta_t = (cur_time->tv_sec - sec_offset) - ins->ts.tv_sec;
579 
580     if(delta_t  > (delta_t_max + 1)) {
581       if(verbosity & 8) {
582         printf("INFO: Removing old instance %lu\n", ins->pkt_id);
583       }
584       removeInstance(ins, mpoint, direction);
585     }
586     else {
587       return;                    // No point searching any more as items are in chronological order
588     }
589     pthread_mutex_lock(&mpoint->q_mutex[direction]);
590   }
591   pthread_mutex_unlock(&mpoint->q_mutex[direction]);
592 
593 }
594 
595 /*
596  * Applies the filter string, called from mpoint_start
597  * */
/*
 * setPcapFilter - compile a filter expression and attach it to the monitor
 * point's pcap handle.
 *
 *   mpoint         monitor point whose handle (mpoint->dev) receives the filter
 *   filter_string  pcap filter expression
 *
 * Failures are reported on stdout and leave the handle without a new filter.
 * Capture then simply proceeds unfiltered.
 */
PRIVATE void setPcapFilter(monitor_point_t * mpoint, char * filter_string) {
  struct bpf_program bpf_prog;

  /* Note: Using this filter implicitly limits us to DLT_EN10MB frames.
   * (https://www.tcpdump.org/manpages/pcap-filter.7.html)
   */

  if(pcap_compile(mpoint->dev, &bpf_prog, filter_string,0,1) == -1) {
    printf("Error compiling BPF program: %s\n", pcap_geterr(mpoint->dev));
    // bpf_prog was never filled in: it must not be installed or freed
    return;
  }

  if(pcap_setfilter(mpoint->dev, &bpf_prog) == -1)
    printf("Error setting BPF filter: %s\n", pcap_geterr(mpoint->dev));

  pcap_freecode(&bpf_prog);
}
613