1 /*
2  *	aprsc
3  *
4  *	(c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5  *
6  *	This program is licensed under the BSD license, which can be found
7  *	in the file LICENSE.
8  *
9  */
10 
11 /*
12  *	dupecheck.c: the dupe-checking thread
13  */
14 
15 #include <stdio.h>
16 #include <errno.h>
17 #include <string.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <stdlib.h>
21 #include <pthread.h>
22 
23 #ifdef USE_EVENTFD
24 #include <sys/eventfd.h>
25 #endif
26 
27 #include "dupecheck.h"
28 #include "config.h"
29 #include "hlog.h"
30 #include "hmalloc.h"
31 #include "cellmalloc.h"
32 #include "keyhash.h"
33 #include "filter.h"
34 #include "historydb.h"
35 #include "http.h"
36 #include "accept.h"
37 
/* Thread state: dupecheck_start() creates dupecheck_th and sets
 * dupecheck_running; dupecheck_stop() sets dupecheck_shutting_down, and
 * the thread clears dupecheck_running when its main loop exits. */
int dupecheck_shutting_down;
int dupecheck_running;
pthread_t dupecheck_th;
/* Number of dupe records currently allocated (incremented by
 * dupecheck_db_alloc(), decremented by dupecheck_db_free()). */
long dupecheck_cellgauge;

/* Number of pbufs currently linked on the global main / duplicate chains. */
int pbuf_global_count;
int pbuf_global_dupe_count;

/* The real purge criteria are pbuf age and output-worker lag (see
 * global_pbuf_purger()), but the purger always leaves at least this many
 * pbufs on each global chain.  global_pbuf_purger(1, ...) sets both to 0
 * in order to purge everything. */
int pbuf_global_count_limit      =   5000;
int pbuf_global_dupe_count_limit =   100;

/* 64-bit statistics counters: unique packets passed on, duplicates caught,
 * and duplicates caught broken down by the dtype of the matching dupe
 * record (index 0 = exact copy of a packet; other indexes = the DTYPE_*
 * tag of the mangled variant that matched). */
long long dupecheck_outcount;
long long dupecheck_dupecount;
long long dupecheck_dupetypes[DTYPE_MAX+1];

#define DUPECHECK_DB_SIZE 8192		/* Hash index table size (number of buckets) */
/* Hash buckets; each one is a singly linked chain of dupe records. */
struct dupe_record_t *dupecheck_db[DUPECHECK_DB_SIZE];

#ifndef _FOR_VALGRIND_
/* Released dupe records, kept for reuse by dupecheck_db_alloc(). */
struct dupe_record_t *dupecheck_free;
/* Cell arena the dupe records are carved from. */
cellarena_t *dupecheck_cells;
#endif


/* Sequence numbers handed to pbufs appended to the main / duplicate global
 * chains.  They start just below the 32-bit wrap point so that the
 * wrap-around handling in pbuf_seqnum_lag() is exercised early.
 * NOTE(review): after 2000 packets the counter reaches 0, which is also the
 * value pbuf_seqnum_lag() treats as "worker has no data yet". */
volatile uint32_t  dupecheck_seqnum      = -2000;
volatile uint32_t  dupecheck_dupe_seqnum = -2000;

#ifdef USE_EVENTFD
/* eventfd written by dupecheck_stop() to wake the dupecheck thread's poll(). */
int dupecheck_eventfd = -1;
struct pollfd dupecheck_eventfd_poll;
#endif
70 
pbuf_seqnum_lag(const uint32_t seqnum,const uint32_t pbuf_seq)71 static int pbuf_seqnum_lag(const uint32_t seqnum, const uint32_t pbuf_seq)
72 {
73 	// The lag calculation method takes care of the value space
74 	// wrap-around, thus this is not limited on on first 4 billion
75 	// packets, or whatever smallish..  As long as there are less
76 	// than 2 billion packets in the in-core value spaces.
77 
78 	int lag = (int32_t)(seqnum - pbuf_seq);
79 
80 	if (pbuf_seq == 0)	// Worker without data.
81 		lag = 2000000000;
82 	// Presumption is that above mentioned situation has very short
83 	// existence, but as it can happen, we flag it with such a high
84 	// value that temporarily the global_pbuf_purger() will not
85 	// purge any items out of the global queue.
86 	return lag;
87 }
88 
89 
90 /*
91  *	Global pbuf purger cleans out pbufs that are too old..
92  */
global_pbuf_purger(const int all,int pbuf_lag,int pbuf_dupe_lag)93 static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
94 {
95 	struct pbuf_t *pb, *pb2;
96 	struct pbuf_t *freeset[2002];
97 	int n, n1, n2, lag;
98 	time_t lastage1 = 0, lastage2 = 0;
99 
100 	time_t expire2 = tick - pbuf_global_dupe_expiration;
101 	time_t expire1 = tick - pbuf_global_expiration;
102 
103 	if (all) {
104 		pbuf_global_count_limit       = 0;
105 		pbuf_global_dupe_count_limit  = 0;
106 		expire1  = expire2       = tick+10;
107 	}
108 
109 	pb = pbuf_global;
110 	if (pb)
111 		lastage1 = pb->t;
112 	n = 0;
113 	n1 = 0;
114 	while ( pbuf_global_count > pbuf_global_count_limit && pb ) {
115 
116 		lastage1 = pb->t;
117 		if (pb->t >= expire1)
118 			break; // stop at newer than expire1
119 
120 		lag = pbuf_seqnum_lag(dupecheck_seqnum, pb->seqnum);
121 		if (pbuf_lag >= lag) {
122 			hlog(LOG_DEBUG, "global_pbuf_purger: stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
123 			break; // some output-worker is lagging behind this item!
124 		}
125 
126 		freeset[n++] = pb;
127 		++n1;
128 		--pbuf_global_count;
129 		// dissociate the pbuf from the chain
130 		pb2 = pb->next; pb->next = NULL; pb = pb2;
131 		if (n >= 2000) {
132 			pbuf_free_many(freeset, n);
133 			n = 0;
134 		}
135 	}
136 	pbuf_global = pb;
137 	if (n > 0) {
138 		pbuf_free_many(freeset, n);
139 	}
140 
141 	pb = pbuf_global_dupe;
142 	if (pb)
143 		lastage2 = pb->t;
144 	n = 0;
145 	n2 = 0;
146 	while ( pbuf_global_dupe_count > pbuf_global_dupe_count_limit && pb ) {
147 
148 		lastage2 = pb->t;
149 		if (pb->t >= expire2)
150 			break; // stop at newer than expire2
151 		lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum);
152 		if (pbuf_dupe_lag >= lag) {
153 			hlog(LOG_DEBUG, "global_pbuf_purger: dupe stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
154 			break; // some output-worker is lagging behind this item!
155 		}
156 
157 		freeset[n++] = pb;
158 		++n2;
159 		--pbuf_global_dupe_count;
160 		// dissociate the pbuf from the chain
161 		pb2 = pb->next; pb->next = NULL; pb = pb2;
162 		if (n >= 2000) {
163 			pbuf_free_many(freeset, n);
164 			n = 0;
165 		}
166 	}
167 	pbuf_global_dupe = pb;
168 	if (n > 0) {
169 		pbuf_free_many(freeset, n);
170 	}
171 
172 	// debug printout time...  map "undefined" lag values to zero.
173 
174 	//if (pbuf_lag      == 2000000000) pbuf_lag      = 0;
175 	//if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0;
176 
177 	if (lastage1 == 0) lastage1 = tick+2; // makes printout of "-2" (or "-1")
178 	if (lastage2 == 0) lastage2 = tick+2;
179 
180 	/*
181 
182 	static int show_zeros = 1;
183 
184 	if (show_zeros || n1 || n2 || pbuf_lag  || pbuf_dupe_lag) {
185 		// report only when there is something to report...
186 		hlog( LOG_DEBUG,
187 		      "global_pbuf_purger()  freed %d/%d main pbufs, %d/%d dupe bufs, lags: %d/%d  Ages: %d/%d",
188 		      n1, pbuf_global_count, n2, pbuf_global_dupe_count,
189 		      pbuf_lag, pbuf_dupe_lag,
190 		      (int)(tick-lastage1), (int)(tick-lastage2) );
191 
192 		if (!(n1 || n2 || pbuf_lag || pbuf_dupe_lag))
193 			show_zeros = 0;
194 		else
195 			show_zeros = 1;
196 	}
197 	*/
198 }
199 
200 
201 
202 /*
203  *	The cellmalloc does not need internal MUTEX, it is being used in single thread..
204  */
205 
dupecheck_init(void)206 void dupecheck_init(void)
207 {
208 #ifndef _FOR_VALGRIND_
209 	dupecheck_cells = cellinit( "dupecheck",
210 				    sizeof(struct dupe_record_t),
211 				    __alignof__(struct dupe_record_t),
212 				    CELLMALLOC_POLICY_LIFO | CELLMALLOC_POLICY_NOMUTEX,
213 				    2048 /* 2 MB at the time */,
214 				    0 /* minfree */);
215 #endif
216 
217 #ifdef USE_EVENTFD
218 	dupecheck_eventfd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
219 	if (dupecheck_eventfd < 0) {
220 		hlog(LOG_ERR, "dupecheck: eventfd init failed: %s", strerror(errno));
221 		exit(1);
222 	}
223 	dupecheck_eventfd_poll.fd = dupecheck_eventfd;
224 	dupecheck_eventfd_poll.events = POLLIN;
225 	hlog(LOG_DEBUG, "dupecheck: eventfd initialized on fd %d", dupecheck_eventfd);
226 #endif
227 
228 }
229 
dupecheck_db_alloc(int len)230 static struct dupe_record_t *dupecheck_db_alloc(int len)
231 {
232 	struct dupe_record_t *dp;
233 #ifndef _FOR_VALGRIND_
234 	if (dupecheck_free) { /* pick from free chain */
235 		dp = dupecheck_free;
236 		dupecheck_free = dp->next;
237 	} else
238 		dp = cellmalloc(dupecheck_cells);
239 	if (!dp) {
240 		hlog(LOG_ERR, "dupecheck: cellmalloc failed");
241 		return NULL;
242 	}
243 #else
244 	dp = hmalloc(len + sizeof(*dp));
245 #endif
246 	memset(dp, 0, sizeof(*dp));
247 	dp->len = len;
248 	dp->packet = dp->packetbuf;
249 	if (len > sizeof(dp->packetbuf))
250 		dp->packet = hmalloc(len+1);
251 
252 	++dupecheck_cellgauge;
253 
254 	return dp;
255 }
256 
dupecheck_db_free(struct dupe_record_t * dp)257 static void dupecheck_db_free(struct dupe_record_t *dp)
258 {
259 #ifndef _FOR_VALGRIND_
260 	if (dp->packet != dp->packetbuf)
261 		hfree(dp->packet);
262 	dp->next = dupecheck_free;
263 	dupecheck_free = dp;
264 	// cellfree(dupecheck_cells, dp);
265 #else
266 	hfree(dp);
267 #endif
268 	--dupecheck_cellgauge;
269 }
270 
271 /*	The  dupecheck_cleanup() is for regular database cleanups,
272  *	Call this about once a minute.
273  *
274  *	Note: entry validity is possibly shorter time than the cleanup
275  *	invocation interval!
276  */
dupecheck_cleanup(void)277 static void dupecheck_cleanup(void)
278 {
279 	struct dupe_record_t *dp, **dpp;
280 	time_t expiretime = tick - dupefilter_storetime;
281 	time_t futuretime = tick + dupefilter_storetime;
282 	int cleancount = 0, i;
283 
284 	for (i = 0; i < DUPECHECK_DB_SIZE; ++i) {
285 		dpp = & dupecheck_db[i];
286 		while (( dp = *dpp )) {
287 			if (dp->t < expiretime || dp->t > futuretime) {
288 				/* Old... or too far in the future, discard. */
289 				*dpp = dp->next;
290 				dp->next = NULL;
291 				dupecheck_db_free(dp);
292 				++cleancount;
293 				continue;
294 			}
295 			/* No expiry, just advance the pointer */
296 			dpp = &dp->next;
297 		}
298 	}
299 	// hlog( LOG_DEBUG, "dupecheck_cleanup() removed %d entries, count now %ld",
300 	//       cleancount, dupecheck_cellgauge );
301 }
302 
303 /*
304  *	Append a dupecheck record in a leaf list of the hash
305  */
306 
dupecheck_append(struct dupe_record_t ** dpp,uint32_t hash,int addrlen,const char * addr,int datalen,const char * data)307 static int dupecheck_append(struct dupe_record_t **dpp, uint32_t hash, int addrlen, const char *addr, int datalen, const char *data)
308 {
309 	struct dupe_record_t *dp;
310 
311 	dp = dupecheck_db_alloc(addrlen + datalen);
312 	if (!dp)
313 		return -1; // alloc error!
314 
315 	*dpp = dp;
316 	memcpy(dp->packet, addr, addrlen);
317 	memcpy(dp->packet + addrlen, data, datalen);
318 	//hlog(LOG_DEBUG, "dupecheck_append '%.*s'", addrlen+datalen, dp->packet);
319 	dp->hash = hash;
320 	dp->t   = tick; /* Use the current timestamp instead of the arrival time.
321 			  If our incoming worker, or dupecheck, is lagging for
322 			  reason or another (for example, a huge incoming burst
323 			  of traffic), using the arrival time instead of current
324 			  time could make the dupecheck db entry expire too early.
325 			  In an extreme trouble case, we could expire dupecheck db
326 			  entries very soon after the packet has gone out from us,
327 			  which would make loops more likely and possibly increase
328 			  the traffic and make us lag even more.
329 			  This timestamp should be closer to the *outgoing* time
330 			  than the *incoming* time, and current timestamp is a
331 			  good middle ground. Simulator is not important.
332 			*/
333 	return 0;
334 }
335 
dupecheck_add_buf(const char * s,int len,int dtype)336 static int dupecheck_add_buf(const char *s, int len, int dtype)
337 {
338 	uint32_t hash, idx;
339 	struct dupe_record_t **dpp, *dp;
340 
341 	//hlog(LOG_DEBUG, "dupecheck_add_buf '%.*s'", len, s);
342 
343 	hash = keyhash(s, len, 0);
344 	idx  = hash;
345 
346 	idx ^= (idx >> 13); /* fold the hash bits.. */
347 	idx ^= (idx >> 26); /* fold the hash bits.. */
348 	idx = idx % DUPECHECK_DB_SIZE;
349 	dpp = &dupecheck_db[idx];
350 
351 	while (*dpp) {
352 		dp = *dpp;
353 		if (dp->hash == hash) {
354 			// HASH match!
355 			if (dp->len == len &&
356 			    memcmp(s, dp->packet, len) == 0) {
357 				// PACKET MATCH!
358 				//hlog(LOG_DEBUG, "dupecheck_add_buf got it already: %.*s", len, s);
359 				dp->t = tick;
360 				return 0; /* no need to add, we have it */
361 			}
362 			// no packet match.. check next
363 		}
364 		dpp = &dp->next;
365 	}
366 	// dpp points to pointer at the tail of the chain
367 
368 	dp = dupecheck_db_alloc(len);
369 	if (!dp)
370 		return -1; // alloc error!
371 
372 	*dpp = dp;
373 	memcpy(dp->packet, s, len);
374 	//hlog(LOG_DEBUG, "dupecheck_add_buf appended '%.*s'", len, s);
375 	dp->hash = hash;
376 	dp->t = tick;
377 	dp->dtype = dtype;
378 
379 	return 0;
380 }
381 
382 /*
383  *	mangle packet in common ways and store mangled versions
384  *	in dupecheck db, so that the mangled versions will be dropped
385  */
386 
dupecheck_mangle_store(const char * addr,int addrlen,const char * data,int datalen)387 static int dupecheck_mangle_store(const char *addr, int addrlen, const char *data, int datalen)
388 {
389 	char ib[PACKETLEN_MAX];
390 	char tb1[PACKETLEN_MAX];
391 	char tb2[PACKETLEN_MAX];
392 	char tb3[PACKETLEN_MAX];
393 	int ilen;
394 	int tlen1, tlen2, tlen3;
395 	int i;
396 	char c;
397 
398 	/* TODO: dupecheck_mangle_store: Check for the necessity to do
399 	 * any futher packet scans when doing the initial scan
400 	 * (check for 8-bit / low data => optimize /shortcut)
401 	 */
402 
403 	ilen = addrlen + datalen;
404 
405 	if (ilen > PACKETLEN_MAX)
406 		return -1;
407 
408 	/* create a copy of normal packet data */
409 	memcpy(ib, addr, addrlen);
410 	memcpy(ib + addrlen, data, datalen);
411 
412 	//hlog(LOG_DEBUG, "dupecheck_mangle_store ib: '%.*s'", ilen, ib);
413 
414 	/********************************************/
415 	/* remove spaces from the end of the packet */
416 	memcpy(tb1, ib, ilen);
417 	tlen1 = ilen;
418 	while (tlen1 > 0 && tb1[tlen1-1] == ' ')
419 		--tlen1;
420 
421 	if (tlen1 != ilen) {
422 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d spaces: '%.*s'", ilen-tlen1, tlen1, tb1);
423 		dupecheck_add_buf(tb1, tlen1, DTYPE_SPACE_TRIM);
424 	}
425 
426 	/*************************/
427 	/* tb1: 8th bit data deleted
428 	 * tb2: 8th bit is cleared
429 	 * tb3: 8th bit replaced with a space
430 	 */
431 	tlen1 = tlen2 = tlen3 = 0;
432 	for (i = 0; i < ilen; i++) {
433 		c = ib[i] & 0x7F;
434 		tb2[tlen2++] = c;
435 		if (ib[i] != c) {
436 			/* high bit is on */
437 			tb3[tlen3++] = ' ';
438 		} else {
439 			/* 7-bit char */
440 			tb1[tlen1++] = c;
441 			tb3[tlen3++] = c;
442 		}
443 	}
444 
445 	if (tlen1 != ilen) {
446 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed  %d 8-bit chars: '%.*s'", ilen-tlen1, tlen1, tb1);
447 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: ANDed    %d 8-bit chars: '%.*s'", ilen-tlen1, tlen2, tb2);
448 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d 8-bit chars: '%.*s'", ilen-tlen1, tlen3, tb3);
449 		dupecheck_add_buf(tb1, tlen1, DTYPE_STRIP_8BIT);
450 		dupecheck_add_buf(tb2, tlen2, DTYPE_CLEAR_8BIT);
451 		dupecheck_add_buf(tb3, tlen3, DTYPE_SPACED_8BIT);
452 	}
453 
454 	/**********************************************
455 	 * tb1: Low data (0 <= x < 0x20 deleted
456 	 * tb2: Low data replaced with spaces
457 	 */
458 	tlen1 = tlen2 = 0;
459 	for (i = 0; i < ilen; i++) {
460 		c = ib[i];
461 		if (c < 0x20 && c > 0) {
462 			/* low data, tb2 gets a space and tb1 gets nothing */
463 			tb2[tlen2++] = ' ';
464 		} else {
465 			/* regular stuff */
466 			tb1[tlen1++] = c;
467 			tb2[tlen2++] = c;
468 		}
469 	}
470 
471 	if (tlen1 != ilen) {
472 		/* if there was low data, store it */
473 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed  %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
474 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
475 		dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP);
476 		dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED);
477 	}
478 
479 	/**********************************************
480 	 * tb1: Del characters (0x7f) deleted
481 	 * tb2: Del characters replaced with spaces
482 	 */
483 	tlen1 = tlen2 = 0;
484 	for (i = 0; i < ilen; i++) {
485 		c = ib[i];
486 		if (c == 0x7f) {
487 			/* low data, tb2 gets a space and tb1 gets nothing */
488 			tb2[tlen2++] = ' ';
489 		} else {
490 			/* regular stuff */
491 			tb1[tlen1++] = c;
492 			tb2[tlen2++] = c;
493 		}
494 	}
495 
496 	if (tlen1 != ilen) {
497 		/* if there was low data, store it */
498 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed  %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
499 		//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
500 		dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP);
501 		dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED);
502 	}
503 
504 	return 0;
505 }
506 
507 /*
508  *	check a single packet for duplicates
509  */
510 
dupecheck(struct pbuf_t * pb)511 static int dupecheck(struct pbuf_t *pb)
512 {
513 	/* check a single packet */
514 	// pb->flags |= F_DUPE; /* this is a duplicate! */
515 
516 	int i;
517 	int addrlen;  // length of the address part
518 	int datalen;  // length of the payload
519 	uint32_t hash, idx;
520 	const char *addr;
521 	const char *data;
522 	struct dupe_record_t **dpp, *dp;
523 	time_t expiretime = tick -  dupefilter_storetime;
524 
525 	// 1) collect canonic rep of the packet
526 	addr    = pb->data;
527 	addrlen = pb->dstcall_end_or_ssid - addr;
528 
529 	data    = pb->info_start;
530 	datalen = pb->packet_len - (data - pb->data) - 2; // ignore CRLF: -2
531 
532 	/* TODO:
533 	 * Do duplicate checking on an unmodified packet
534 	 * (no space trimming or anything), but do store
535 	 * both trimmed and untrimmed version (if they differ)
536 	 * separately to the db.
537 	 * This way a space-trimmed second packet will not
538 	 * pass (mangled packet), but a non-trimmed second
539 	 * packet will pass if the mangled version
540 	 * came in first.
541 	 */
542 
543 	// there are no 3rd-party frames in APRS-IS ...
544 
545 	// 2) calculate checksum (from disjoint memory areas)
546 
547 	hash = keyhash(addr, addrlen, 0);
548 	hash = keyhash(data, datalen, hash);
549 	idx  = hash;
550 
551 	// 3) lookup if same checksum is in some hash bucket chain
552 	//  3b) compare packet...
553 	//    3b1) flag as F_DUPE if so
554 	idx ^= (idx >> 13); /* fold the hash bits.. */
555 	idx ^= (idx >> 26); /* fold the hash bits.. */
556 	i = idx % DUPECHECK_DB_SIZE;
557 	dpp = &dupecheck_db[i];
558 	while (*dpp) {
559 		dp = *dpp;
560 		if (dp->hash == hash &&
561 		    dp->t >= expiretime) {
562 			// HASH match!  And not too old!
563 			if (dp->len == addrlen + datalen &&
564 			    memcmp(addr, dp->packet, addrlen) == 0 &&
565 			    memcmp(data, dp->packet + addrlen, datalen) == 0) {
566 				// PACKET MATCH!
567 				//hlog(LOG_DEBUG, "Dupe: %.*s", pb->packet_len - 2, pb->data);
568 				//hlog(LOG_DEBUG, "Orig: %.*s %.*s", addrlen, dp->addresses, datalen, dp->packet);
569 				pb->flags |= F_DUPE;
570 				filter_postprocess_dupefilter(pb);
571 				if (dp->dtype >= 0 && dp->dtype < DTYPE_MAX)
572 					dupecheck_dupetypes[dp->dtype]++;
573 				return F_DUPE;
574 			}
575 			// no packet match.. check next
576 		}
577 		dpp = &dp->next;
578 	}
579 	// dpp points to pointer at the tail of the chain
580 
581 	// 4) Add comparison copy of non-dupe into dupe-db
582 	if (dupecheck_append(dpp, hash, addrlen, addr, datalen, data) == -1)
583 		return -1;
584 
585 	// 5) mangle packet in a few common ways, and store to dupe-db
586 	dupecheck_mangle_store(addr, addrlen, data, datalen);
587 
588 	return 0;
589 }
590 
591 /*
592  *	Worker asks for info on outgoing lag to adjust its main-loop delays
593  *	and priorities
594  */
595 
outgoing_lag_report(struct worker_t * self,int * lag,int * dupelag)596 int  outgoing_lag_report(struct worker_t *self, int *lag, int *dupelag)
597 {
598 	int lag1 = pbuf_seqnum_lag(dupecheck_seqnum, self->last_pbuf_seqnum);
599 	int lag2 = pbuf_seqnum_lag(dupecheck_dupe_seqnum, self->last_pbuf_dupe_seqnum);
600 
601 	if (lag)     *lag     = lag1;
602 	if (dupelag) *dupelag = lag2;
603 
604 	if (lag1 == 2000000000) lag1 = 0;
605 	if (lag2 == 2000000000) lag2 = 0;
606 
607 	if (lag1 < lag2) lag1 = lag2;
608 
609 	return lag1; // Higher of the two..
610 }
611 
dupecheck_drain_worker(struct worker_t * w,struct pbuf_t *** pb_out_prevp,struct pbuf_t ** pb_out_last,struct pbuf_t *** pb_out_dupe_prevp,struct pbuf_t ** pb_out_dupe_last,int * pb_out_count,int * pb_out_dupe_count)612 static int dupecheck_drain_worker(struct worker_t *w,
613 	struct pbuf_t ***pb_out_prevp, struct pbuf_t **pb_out_last,
614 	struct pbuf_t ***pb_out_dupe_prevp, struct pbuf_t **pb_out_dupe_last,
615 	int *pb_out_count, int *pb_out_dupe_count)
616 {
617 	struct pbuf_t *pb_list;
618 	struct pbuf_t *pb, *pbnext;
619 	int n = 0;
620 	int me;
621 
622 	/* grab worker's list of packets */
623 	if ((me = pthread_mutex_lock(&w->pbuf_incoming_mutex))) {
624 		hlog(LOG_ERR, "dupecheck_drain_worker: could not lock pbuf_incoming_mutex: %s", strerror(me));
625 		return 0;
626 	}
627 	pb_list = w->pbuf_incoming;
628 	w->pbuf_incoming = NULL;
629 	w->pbuf_incoming_last = &w->pbuf_incoming;
630 	//int c = w->pbuf_incoming_count;
631 	w->pbuf_incoming_count = 0;
632 	if ((me = pthread_mutex_unlock(&w->pbuf_incoming_mutex))) {
633 		hlog(LOG_ERR, "dupecheck_drain_worker: could not unlock pbuf_incoming_mutex: %s", strerror(me));
634 	}
635 
636 	//hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d",
637 	//     c, w->id, dupecheck_seqnum);
638 
639 	for (pb = pb_list; (pb); pb = pbnext) {
640 		if (pb->t > tick + 1) {
641 			hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s",
642 				pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data);
643 		} else if (tick - pb->t > 10) {
644 			hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s",
645 				pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data);
646 		}
647 
648 		int rc = dupecheck(pb);
649 		pbnext = pb->next; // it may get modified below..
650 
651 		if (rc == 0) {
652 			/* put non-duplicate packet in history database
653 			 * and let filter module do it's thing, if historydb
654 			 * is enabled (disabled if no filtered listeners
655 			 * configured, for memory savings)
656 			 */
657 			if (have_filtered_listeners) {
658 				historydb_insert(pb);
659 				filter_postprocess_dupefilter(pb);
660 			}
661 
662 			// Not duplicate
663 			**pb_out_prevp = pb;
664 			*pb_out_prevp = &pb->next;
665 			*pb_out_last  = pb;
666 			pb->seqnum = ++dupecheck_seqnum;
667 			*pb_out_count = *pb_out_count + 1;
668 		} else {
669 			// Duplicate
670 			**pb_out_dupe_prevp = pb;
671 			*pb_out_dupe_prevp = &pb->next;
672 			*pb_out_dupe_last  = pb;
673 			pb->seqnum = ++dupecheck_dupe_seqnum;
674 			*pb_out_dupe_count = *pb_out_dupe_count + 1;
675 			//hlog(LOG_DEBUG, "is duplicate");
676 		}
677 		n++;
678 	}
679 
680 	return n;
681 }
682 
683 /*
684  *	Dupecheck thread
685  */
686 
dupecheck_thread(void)687 static void dupecheck_thread(void)
688 {
689 	sigset_t sigs_to_block;
690 	struct worker_t *w;
691 	struct pbuf_t *pb_out, **pb_out_prevp, *pb_out_last;
692 	struct pbuf_t *pb_out_dupe, **pb_out_dupe_prevp, *pb_out_dupe_last;
693 	int n;
694 	int e;
695 	int c, d;
696 	int pb_out_count, pb_out_dupe_count;
697 	time_t cleanup_tick = tick;
698 
699 #ifndef USE_EVENTFD
700 	struct timespec sleepspec;
701 	sleepspec.tv_sec = 0;
702 	sleepspec.tv_nsec = 30 * 1000 * 1000;
703 #endif
704 
705 	pthreads_profiling_reset("dupecheck");
706 
707 	sigemptyset(&sigs_to_block);
708 	sigaddset(&sigs_to_block, SIGALRM);
709 	sigaddset(&sigs_to_block, SIGINT);
710 	sigaddset(&sigs_to_block, SIGTERM);
711 	sigaddset(&sigs_to_block, SIGQUIT);
712 	sigaddset(&sigs_to_block, SIGHUP);
713 	sigaddset(&sigs_to_block, SIGURG);
714 	sigaddset(&sigs_to_block, SIGPIPE);
715 	sigaddset(&sigs_to_block, SIGUSR1);
716 	sigaddset(&sigs_to_block, SIGUSR2);
717 	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
718 
719 	hlog(LOG_INFO, "Dupecheck thread ready.");
720 
721 	while (!dupecheck_shutting_down) {
722 		n = d = 0;
723 		pb_out       = NULL;
724 		pb_out_prevp = &pb_out;
725 		pb_out_dupe  = NULL;
726 		pb_out_dupe_prevp = &pb_out_dupe;
727 		pb_out_count      = pb_out_dupe_count    = 0;
728 		pb_out_last       = pb_out_dupe_last     = NULL;
729 
730 		/* walk through worker threads */
731 		for (w = worker_threads; (w); w = w->next) {
732 			/* if there are items in the worker's pbuf_incoming, grab them and process */
733 			if (!w->pbuf_incoming)
734 				continue;
735 
736 			n += dupecheck_drain_worker(w,
737 				&pb_out_prevp, &pb_out_last,
738 				&pb_out_dupe_prevp, &pb_out_dupe_last,
739 				&pb_out_count, &pb_out_dupe_count);
740 		}
741 
742 		if ((http_worker) && http_worker->pbuf_incoming) {
743 			n += dupecheck_drain_worker(http_worker,
744 				&pb_out_prevp, &pb_out_last,
745 				&pb_out_dupe_prevp, &pb_out_dupe_last,
746 				&pb_out_count, &pb_out_dupe_count);
747 		}
748 
749 		if ((udp_worker) && udp_worker->pbuf_incoming) {
750 			n += dupecheck_drain_worker(udp_worker,
751 				&pb_out_prevp, &pb_out_last,
752 				&pb_out_dupe_prevp, &pb_out_dupe_last,
753 				&pb_out_count, &pb_out_dupe_count);
754 		}
755 
756 		// terminate those out-chains in every case..
757 		*pb_out_prevp = NULL;
758 		*pb_out_dupe_prevp = NULL;
759 
760 		/* put packets in the global buffer */
761 		if (pb_out || pb_out_dupe) {
762 			if ((e = rwl_wrlock(&pbuf_global_rwlock))) {
763 				hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!");
764 				exit(1);
765 			}
766 
767 			if (pb_out) {
768 				*pbuf_global_prevp = pb_out;
769 				pbuf_global_prevp  = pb_out_prevp;
770 				pbuf_global_count += pb_out_count;
771 			}
772 
773 			if (pb_out_dupe) {
774 				*pbuf_global_dupe_prevp = pb_out_dupe;
775 				pbuf_global_dupe_prevp  = pb_out_dupe_prevp;
776 				pbuf_global_dupe_count += pb_out_dupe_count;
777 			}
778 
779 			if ((e = rwl_wrunlock(&pbuf_global_rwlock))) {
780 				hlog(LOG_CRIT, "dupecheck: Failed to wrunlock pbuf_global_rwlock!");
781 				exit(1);
782 			}
783 		}
784 
785 		dupecheck_outcount  += pb_out_count;
786 		dupecheck_dupecount += pb_out_dupe_count;
787 
788 		if (cleanup_tick <= tick) { // once in a (simulated) minute or so..
789 			cleanup_tick = tick + 10;
790 
791 			/*
792 			if ((e = rwl_wrlock(&pbuf_global_rwlock))) {
793 				hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!");
794 				exit(1);
795 			}
796 			*/
797 
798 			/* walk through worker threads */
799 			int worker_pbuf_lag;
800 			int worker_pbuf_dupe_lag;
801 			worker_pbuf_lag = worker_pbuf_dupe_lag = -1;
802 			for (w = worker_threads; (w); w = w->next) {
803 				/* Find the highest worker lag count after we have appended
804 				 * the packets in the buffer.
805 				 */
806 				c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum);
807 				if (w->last_pbuf_seqnum == 0)
808 					c = 2000000000;
809 				if (c > worker_pbuf_lag)
810 					worker_pbuf_lag = c;
811 				c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum);
812 				if (c > worker_pbuf_dupe_lag)
813 					worker_pbuf_dupe_lag = c;
814 			}
815 
816 			global_pbuf_purger(0, worker_pbuf_lag, worker_pbuf_dupe_lag);
817 
818 			/*
819 			if ((e = rwl_wrunlock(&pbuf_global_rwlock))) {
820 				hlog(LOG_CRIT, "dupecheck: Failed to wrunlock pbuf_global_rwlock!");
821 				exit(1);
822 			}
823 			*/
824 
825 			dupecheck_cleanup();
826 		}
827 
828 		// if (n > 0)
829 		//    hlog(LOG_DEBUG, "Dupecheck did analyze %d packets, found %d duplicates", n, pb_out_dupe_count);
830 		/* sleep a little */
831 #ifdef USE_EVENTFD
832 		int p = poll(&dupecheck_eventfd_poll, 1, 1000);
833 		//hlog(LOG_DEBUG, "dupecheck: poll returned %d", p);
834 		if (p > 0) {
835 			uint64_t u;
836 			p = read(dupecheck_eventfd, &u, sizeof(uint64_t));
837 			//hlog(LOG_DEBUG, "dupecheck: eventfd read %d: %lu", p, u);
838 		}
839 #else
840 		nanosleep(&sleepspec, NULL);
841 #endif
842 	}
843 
844 	hlog( LOG_INFO, "Dupecheck thread shut down; seqnum=%u/%u",
845 	      pbuf_seqnum_lag(dupecheck_seqnum,(uint32_t)-2000),     // initial bias..
846 	      pbuf_seqnum_lag(dupecheck_dupe_seqnum,(uint32_t)-2000));
847 
848 	dupecheck_running = 0;
849 }
850 
851 /*
852  *	Start / stop dupecheck
853  */
854 
dupecheck_start(void)855 void dupecheck_start(void)
856 {
857 	if (dupecheck_running)
858 		return;
859 
860 	dupecheck_shutting_down = 0;
861 
862 	if (pthread_create(&dupecheck_th, &pthr_attrs, (void *)dupecheck_thread, NULL))
863 		perror("pthread_create failed for dupecheck_thread");
864 
865 	dupecheck_running = 1;
866 }
867 
dupecheck_stop(void)868 void dupecheck_stop(void)
869 {
870 	int e;
871 
872 	if (!dupecheck_running)
873 		return;
874 
875 	dupecheck_shutting_down = 1;
876 
877 #ifdef USE_EVENTFD
878 	/* wake up dupecheck from sleep */
879 	uint64_t u = 1;
880 	int i = write(dupecheck_eventfd, &u, sizeof(uint64_t));
881 	if (i != sizeof(uint64_t)) {
882 		hlog(LOG_ERR, "incoming_stop() failed to write to dupecheck_eventfd: %s", strerror(errno));
883 	}
884 #endif
885 
886 	if ((e = pthread_join(dupecheck_th, NULL)))
887 		hlog(LOG_ERR, "Could not pthread_join dupecheck_th: %s", strerror(e));
888 	else
889 		hlog(LOG_INFO, "Dupecheck thread has terminated.");
890 }
891 
892 /*	The  dupecheck_atend() is primarily for valgrind() to clean up dupecache.
893  */
dupecheck_atend(void)894 void dupecheck_atend(void)
895 {
896 	int i;
897 	struct dupe_record_t *dp, *dp2;
898 
899 	for (i = 0; i < DUPECHECK_DB_SIZE; ++i) {
900 		dp = dupecheck_db[i];
901 		while (dp) {
902 			dp2 = dp->next;
903 			dupecheck_db_free(dp);
904 			dp = dp2;
905 		}
906 		dupecheck_db[i] = NULL;
907 	}
908 #if 0 /* Well, not really...  valgrind did hfree() the dupecells,
909 	 and without valgrind we really are not interested of freeup of
910 	 the free chain... */
911 	dp = dupecheck_free;
912 	for ( ; dp ; dp = dp2 ) {
913 		dp2 = dp->next;
914 		cellfree(dupecheck_cells, dp);
915 	}
916 #endif
917 	global_pbuf_purger(1, -1, -1); // purge everything..
918 }
919 
920 /*
921  *	cellmalloc status
922  */
923 #ifndef _FOR_VALGRIND_
dupecheck_cell_stats(struct cellstatus_t * cellst)924 void dupecheck_cell_stats(struct cellstatus_t *cellst)
925 {
926 	// TODO: this is not quite thread safe, but may be OK
927 	cellstatus(dupecheck_cells, cellst);
928 }
929 #endif
930 
931 
932