1 /*
2 * aprsc
3 *
4 * (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5 *
6 * This program is licensed under the BSD license, which can be found
7 * in the file LICENSE.
8 *
9 */
10
11 /*
12 * dupecheck.c: the dupe-checking thread
13 */
14
15 #include <stdio.h>
16 #include <errno.h>
17 #include <string.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <stdlib.h>
21 #include <pthread.h>
22
23 #ifdef USE_EVENTFD
24 #include <sys/eventfd.h>
25 #endif
26
27 #include "dupecheck.h"
28 #include "config.h"
29 #include "hlog.h"
30 #include "hmalloc.h"
31 #include "cellmalloc.h"
32 #include "keyhash.h"
33 #include "filter.h"
34 #include "historydb.h"
35 #include "http.h"
36 #include "accept.h"
37
38 int dupecheck_shutting_down;
39 int dupecheck_running;
40 pthread_t dupecheck_th;
41 long dupecheck_cellgauge;
42
43 int pbuf_global_count;
44 int pbuf_global_dupe_count;
45
46 int pbuf_global_count_limit = 5000; /* Real criteria is expirer.. */
47 int pbuf_global_dupe_count_limit = 100; /* .. but we set some minimum packet counts
48 into the global pbuf queue anyway. */
49
50 long long dupecheck_outcount; /* 64 bit counters for statistics */
51 long long dupecheck_dupecount;
52 long long dupecheck_dupetypes[DTYPE_MAX+1];
53
54 #define DUPECHECK_DB_SIZE 8192 /* Hash index table size */
55 struct dupe_record_t *dupecheck_db[DUPECHECK_DB_SIZE]; /* Hash index table */
56
57 #ifndef _FOR_VALGRIND_
58 struct dupe_record_t *dupecheck_free;
59 cellarena_t *dupecheck_cells;
60 #endif
61
62
63 volatile uint32_t dupecheck_seqnum = -2000; // Explicit early wrap-around..
64 volatile uint32_t dupecheck_dupe_seqnum = -2000; // Explicit early wrap-around..
65
66 #ifdef USE_EVENTFD
67 int dupecheck_eventfd = -1;
68 struct pollfd dupecheck_eventfd_poll;
69 #endif
70
/*
 *	Tell how many sequence numbers pbuf_seq is behind seqnum.
 *
 *	The subtraction is done in unsigned 32-bit arithmetic and the
 *	result is then read as a signed value. This handles the wrap-around
 *	of the sequence number space, so the calculation is not limited to
 *	the first 4 billion packets, as long as there are less than
 *	2 billion packets in the in-core value space.
 *
 *	A pbuf_seq of 0 marks a worker without data. That situation is
 *	presumed to be short-lived, but since it can happen, it is flagged
 *	with a lag of 2000000000, a value so high that global_pbuf_purger()
 *	will temporarily not purge anything out of the global queue.
 */
static int pbuf_seqnum_lag(const uint32_t seqnum, const uint32_t pbuf_seq)
{
	if (pbuf_seq == 0)	// Worker without data.
		return 2000000000;

	return (int32_t)(seqnum - pbuf_seq);
}
88
89
90 /*
91 * Global pbuf purger cleans out pbufs that are too old..
92 */
global_pbuf_purger(const int all,int pbuf_lag,int pbuf_dupe_lag)93 static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
94 {
95 struct pbuf_t *pb, *pb2;
96 struct pbuf_t *freeset[2002];
97 int n, n1, n2, lag;
98 time_t lastage1 = 0, lastage2 = 0;
99
100 time_t expire2 = tick - pbuf_global_dupe_expiration;
101 time_t expire1 = tick - pbuf_global_expiration;
102
103 if (all) {
104 pbuf_global_count_limit = 0;
105 pbuf_global_dupe_count_limit = 0;
106 expire1 = expire2 = tick+10;
107 }
108
109 pb = pbuf_global;
110 if (pb)
111 lastage1 = pb->t;
112 n = 0;
113 n1 = 0;
114 while ( pbuf_global_count > pbuf_global_count_limit && pb ) {
115
116 lastage1 = pb->t;
117 if (pb->t >= expire1)
118 break; // stop at newer than expire1
119
120 lag = pbuf_seqnum_lag(dupecheck_seqnum, pb->seqnum);
121 if (pbuf_lag >= lag) {
122 hlog(LOG_DEBUG, "global_pbuf_purger: stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
123 break; // some output-worker is lagging behind this item!
124 }
125
126 freeset[n++] = pb;
127 ++n1;
128 --pbuf_global_count;
129 // dissociate the pbuf from the chain
130 pb2 = pb->next; pb->next = NULL; pb = pb2;
131 if (n >= 2000) {
132 pbuf_free_many(freeset, n);
133 n = 0;
134 }
135 }
136 pbuf_global = pb;
137 if (n > 0) {
138 pbuf_free_many(freeset, n);
139 }
140
141 pb = pbuf_global_dupe;
142 if (pb)
143 lastage2 = pb->t;
144 n = 0;
145 n2 = 0;
146 while ( pbuf_global_dupe_count > pbuf_global_dupe_count_limit && pb ) {
147
148 lastage2 = pb->t;
149 if (pb->t >= expire2)
150 break; // stop at newer than expire2
151 lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum);
152 if (pbuf_dupe_lag >= lag) {
153 hlog(LOG_DEBUG, "global_pbuf_purger: dupe stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
154 break; // some output-worker is lagging behind this item!
155 }
156
157 freeset[n++] = pb;
158 ++n2;
159 --pbuf_global_dupe_count;
160 // dissociate the pbuf from the chain
161 pb2 = pb->next; pb->next = NULL; pb = pb2;
162 if (n >= 2000) {
163 pbuf_free_many(freeset, n);
164 n = 0;
165 }
166 }
167 pbuf_global_dupe = pb;
168 if (n > 0) {
169 pbuf_free_many(freeset, n);
170 }
171
172 // debug printout time... map "undefined" lag values to zero.
173
174 //if (pbuf_lag == 2000000000) pbuf_lag = 0;
175 //if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0;
176
177 if (lastage1 == 0) lastage1 = tick+2; // makes printout of "-2" (or "-1")
178 if (lastage2 == 0) lastage2 = tick+2;
179
180 /*
181
182 static int show_zeros = 1;
183
184 if (show_zeros || n1 || n2 || pbuf_lag || pbuf_dupe_lag) {
185 // report only when there is something to report...
186 hlog( LOG_DEBUG,
187 "global_pbuf_purger() freed %d/%d main pbufs, %d/%d dupe bufs, lags: %d/%d Ages: %d/%d",
188 n1, pbuf_global_count, n2, pbuf_global_dupe_count,
189 pbuf_lag, pbuf_dupe_lag,
190 (int)(tick-lastage1), (int)(tick-lastage2) );
191
192 if (!(n1 || n2 || pbuf_lag || pbuf_dupe_lag))
193 show_zeros = 0;
194 else
195 show_zeros = 1;
196 }
197 */
198 }
199
200
201
202 /*
203 * The cellmalloc does not need internal MUTEX, it is being used in single thread..
204 */
205
dupecheck_init(void)206 void dupecheck_init(void)
207 {
208 #ifndef _FOR_VALGRIND_
209 dupecheck_cells = cellinit( "dupecheck",
210 sizeof(struct dupe_record_t),
211 __alignof__(struct dupe_record_t),
212 CELLMALLOC_POLICY_LIFO | CELLMALLOC_POLICY_NOMUTEX,
213 2048 /* 2 MB at the time */,
214 0 /* minfree */);
215 #endif
216
217 #ifdef USE_EVENTFD
218 dupecheck_eventfd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
219 if (dupecheck_eventfd < 0) {
220 hlog(LOG_ERR, "dupecheck: eventfd init failed: %s", strerror(errno));
221 exit(1);
222 }
223 dupecheck_eventfd_poll.fd = dupecheck_eventfd;
224 dupecheck_eventfd_poll.events = POLLIN;
225 hlog(LOG_DEBUG, "dupecheck: eventfd initialized on fd %d", dupecheck_eventfd);
226 #endif
227
228 }
229
dupecheck_db_alloc(int len)230 static struct dupe_record_t *dupecheck_db_alloc(int len)
231 {
232 struct dupe_record_t *dp;
233 #ifndef _FOR_VALGRIND_
234 if (dupecheck_free) { /* pick from free chain */
235 dp = dupecheck_free;
236 dupecheck_free = dp->next;
237 } else
238 dp = cellmalloc(dupecheck_cells);
239 if (!dp) {
240 hlog(LOG_ERR, "dupecheck: cellmalloc failed");
241 return NULL;
242 }
243 #else
244 dp = hmalloc(len + sizeof(*dp));
245 #endif
246 memset(dp, 0, sizeof(*dp));
247 dp->len = len;
248 dp->packet = dp->packetbuf;
249 if (len > sizeof(dp->packetbuf))
250 dp->packet = hmalloc(len+1);
251
252 ++dupecheck_cellgauge;
253
254 return dp;
255 }
256
dupecheck_db_free(struct dupe_record_t * dp)257 static void dupecheck_db_free(struct dupe_record_t *dp)
258 {
259 #ifndef _FOR_VALGRIND_
260 if (dp->packet != dp->packetbuf)
261 hfree(dp->packet);
262 dp->next = dupecheck_free;
263 dupecheck_free = dp;
264 // cellfree(dupecheck_cells, dp);
265 #else
266 hfree(dp);
267 #endif
268 --dupecheck_cellgauge;
269 }
270
271 /* The dupecheck_cleanup() is for regular database cleanups,
272 * Call this about once a minute.
273 *
274 * Note: entry validity is possibly shorter time than the cleanup
275 * invocation interval!
276 */
dupecheck_cleanup(void)277 static void dupecheck_cleanup(void)
278 {
279 struct dupe_record_t *dp, **dpp;
280 time_t expiretime = tick - dupefilter_storetime;
281 time_t futuretime = tick + dupefilter_storetime;
282 int cleancount = 0, i;
283
284 for (i = 0; i < DUPECHECK_DB_SIZE; ++i) {
285 dpp = & dupecheck_db[i];
286 while (( dp = *dpp )) {
287 if (dp->t < expiretime || dp->t > futuretime) {
288 /* Old... or too far in the future, discard. */
289 *dpp = dp->next;
290 dp->next = NULL;
291 dupecheck_db_free(dp);
292 ++cleancount;
293 continue;
294 }
295 /* No expiry, just advance the pointer */
296 dpp = &dp->next;
297 }
298 }
299 // hlog( LOG_DEBUG, "dupecheck_cleanup() removed %d entries, count now %ld",
300 // cleancount, dupecheck_cellgauge );
301 }
302
303 /*
304 * Append a dupecheck record in a leaf list of the hash
305 */
306
dupecheck_append(struct dupe_record_t ** dpp,uint32_t hash,int addrlen,const char * addr,int datalen,const char * data)307 static int dupecheck_append(struct dupe_record_t **dpp, uint32_t hash, int addrlen, const char *addr, int datalen, const char *data)
308 {
309 struct dupe_record_t *dp;
310
311 dp = dupecheck_db_alloc(addrlen + datalen);
312 if (!dp)
313 return -1; // alloc error!
314
315 *dpp = dp;
316 memcpy(dp->packet, addr, addrlen);
317 memcpy(dp->packet + addrlen, data, datalen);
318 //hlog(LOG_DEBUG, "dupecheck_append '%.*s'", addrlen+datalen, dp->packet);
319 dp->hash = hash;
320 dp->t = tick; /* Use the current timestamp instead of the arrival time.
321 If our incoming worker, or dupecheck, is lagging for
322 reason or another (for example, a huge incoming burst
323 of traffic), using the arrival time instead of current
324 time could make the dupecheck db entry expire too early.
325 In an extreme trouble case, we could expire dupecheck db
326 entries very soon after the packet has gone out from us,
327 which would make loops more likely and possibly increase
328 the traffic and make us lag even more.
329 This timestamp should be closer to the *outgoing* time
330 than the *incoming* time, and current timestamp is a
331 good middle ground. Simulator is not important.
332 */
333 return 0;
334 }
335
dupecheck_add_buf(const char * s,int len,int dtype)336 static int dupecheck_add_buf(const char *s, int len, int dtype)
337 {
338 uint32_t hash, idx;
339 struct dupe_record_t **dpp, *dp;
340
341 //hlog(LOG_DEBUG, "dupecheck_add_buf '%.*s'", len, s);
342
343 hash = keyhash(s, len, 0);
344 idx = hash;
345
346 idx ^= (idx >> 13); /* fold the hash bits.. */
347 idx ^= (idx >> 26); /* fold the hash bits.. */
348 idx = idx % DUPECHECK_DB_SIZE;
349 dpp = &dupecheck_db[idx];
350
351 while (*dpp) {
352 dp = *dpp;
353 if (dp->hash == hash) {
354 // HASH match!
355 if (dp->len == len &&
356 memcmp(s, dp->packet, len) == 0) {
357 // PACKET MATCH!
358 //hlog(LOG_DEBUG, "dupecheck_add_buf got it already: %.*s", len, s);
359 dp->t = tick;
360 return 0; /* no need to add, we have it */
361 }
362 // no packet match.. check next
363 }
364 dpp = &dp->next;
365 }
366 // dpp points to pointer at the tail of the chain
367
368 dp = dupecheck_db_alloc(len);
369 if (!dp)
370 return -1; // alloc error!
371
372 *dpp = dp;
373 memcpy(dp->packet, s, len);
374 //hlog(LOG_DEBUG, "dupecheck_add_buf appended '%.*s'", len, s);
375 dp->hash = hash;
376 dp->t = tick;
377 dp->dtype = dtype;
378
379 return 0;
380 }
381
382 /*
383 * mangle packet in common ways and store mangled versions
384 * in dupecheck db, so that the mangled versions will be dropped
385 */
386
dupecheck_mangle_store(const char * addr,int addrlen,const char * data,int datalen)387 static int dupecheck_mangle_store(const char *addr, int addrlen, const char *data, int datalen)
388 {
389 char ib[PACKETLEN_MAX];
390 char tb1[PACKETLEN_MAX];
391 char tb2[PACKETLEN_MAX];
392 char tb3[PACKETLEN_MAX];
393 int ilen;
394 int tlen1, tlen2, tlen3;
395 int i;
396 char c;
397
398 /* TODO: dupecheck_mangle_store: Check for the necessity to do
399 * any futher packet scans when doing the initial scan
400 * (check for 8-bit / low data => optimize /shortcut)
401 */
402
403 ilen = addrlen + datalen;
404
405 if (ilen > PACKETLEN_MAX)
406 return -1;
407
408 /* create a copy of normal packet data */
409 memcpy(ib, addr, addrlen);
410 memcpy(ib + addrlen, data, datalen);
411
412 //hlog(LOG_DEBUG, "dupecheck_mangle_store ib: '%.*s'", ilen, ib);
413
414 /********************************************/
415 /* remove spaces from the end of the packet */
416 memcpy(tb1, ib, ilen);
417 tlen1 = ilen;
418 while (tlen1 > 0 && tb1[tlen1-1] == ' ')
419 --tlen1;
420
421 if (tlen1 != ilen) {
422 //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d spaces: '%.*s'", ilen-tlen1, tlen1, tb1);
423 dupecheck_add_buf(tb1, tlen1, DTYPE_SPACE_TRIM);
424 }
425
426 /*************************/
427 /* tb1: 8th bit data deleted
428 * tb2: 8th bit is cleared
429 * tb3: 8th bit replaced with a space
430 */
431 tlen1 = tlen2 = tlen3 = 0;
432 for (i = 0; i < ilen; i++) {
433 c = ib[i] & 0x7F;
434 tb2[tlen2++] = c;
435 if (ib[i] != c) {
436 /* high bit is on */
437 tb3[tlen3++] = ' ';
438 } else {
439 /* 7-bit char */
440 tb1[tlen1++] = c;
441 tb3[tlen3++] = c;
442 }
443 }
444
445 if (tlen1 != ilen) {
446 //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d 8-bit chars: '%.*s'", ilen-tlen1, tlen1, tb1);
447 //hlog(LOG_DEBUG, "dupecheck_mangle_store: ANDed %d 8-bit chars: '%.*s'", ilen-tlen1, tlen2, tb2);
448 //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d 8-bit chars: '%.*s'", ilen-tlen1, tlen3, tb3);
449 dupecheck_add_buf(tb1, tlen1, DTYPE_STRIP_8BIT);
450 dupecheck_add_buf(tb2, tlen2, DTYPE_CLEAR_8BIT);
451 dupecheck_add_buf(tb3, tlen3, DTYPE_SPACED_8BIT);
452 }
453
454 /**********************************************
455 * tb1: Low data (0 <= x < 0x20 deleted
456 * tb2: Low data replaced with spaces
457 */
458 tlen1 = tlen2 = 0;
459 for (i = 0; i < ilen; i++) {
460 c = ib[i];
461 if (c < 0x20 && c > 0) {
462 /* low data, tb2 gets a space and tb1 gets nothing */
463 tb2[tlen2++] = ' ';
464 } else {
465 /* regular stuff */
466 tb1[tlen1++] = c;
467 tb2[tlen2++] = c;
468 }
469 }
470
471 if (tlen1 != ilen) {
472 /* if there was low data, store it */
473 //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
474 //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
475 dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP);
476 dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED);
477 }
478
479 /**********************************************
480 * tb1: Del characters (0x7f) deleted
481 * tb2: Del characters replaced with spaces
482 */
483 tlen1 = tlen2 = 0;
484 for (i = 0; i < ilen; i++) {
485 c = ib[i];
486 if (c == 0x7f) {
487 /* low data, tb2 gets a space and tb1 gets nothing */
488 tb2[tlen2++] = ' ';
489 } else {
490 /* regular stuff */
491 tb1[tlen1++] = c;
492 tb2[tlen2++] = c;
493 }
494 }
495
496 if (tlen1 != ilen) {
497 /* if there was low data, store it */
498 //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
499 //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
500 dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP);
501 dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED);
502 }
503
504 return 0;
505 }
506
507 /*
508 * check a single packet for duplicates
509 */
510
dupecheck(struct pbuf_t * pb)511 static int dupecheck(struct pbuf_t *pb)
512 {
513 /* check a single packet */
514 // pb->flags |= F_DUPE; /* this is a duplicate! */
515
516 int i;
517 int addrlen; // length of the address part
518 int datalen; // length of the payload
519 uint32_t hash, idx;
520 const char *addr;
521 const char *data;
522 struct dupe_record_t **dpp, *dp;
523 time_t expiretime = tick - dupefilter_storetime;
524
525 // 1) collect canonic rep of the packet
526 addr = pb->data;
527 addrlen = pb->dstcall_end_or_ssid - addr;
528
529 data = pb->info_start;
530 datalen = pb->packet_len - (data - pb->data) - 2; // ignore CRLF: -2
531
532 /* TODO:
533 * Do duplicate checking on an unmodified packet
534 * (no space trimming or anything), but do store
535 * both trimmed and untrimmed version (if they differ)
536 * separately to the db.
537 * This way a space-trimmed second packet will not
538 * pass (mangled packet), but a non-trimmed second
539 * packet will pass if the mangled version
540 * came in first.
541 */
542
543 // there are no 3rd-party frames in APRS-IS ...
544
545 // 2) calculate checksum (from disjoint memory areas)
546
547 hash = keyhash(addr, addrlen, 0);
548 hash = keyhash(data, datalen, hash);
549 idx = hash;
550
551 // 3) lookup if same checksum is in some hash bucket chain
552 // 3b) compare packet...
553 // 3b1) flag as F_DUPE if so
554 idx ^= (idx >> 13); /* fold the hash bits.. */
555 idx ^= (idx >> 26); /* fold the hash bits.. */
556 i = idx % DUPECHECK_DB_SIZE;
557 dpp = &dupecheck_db[i];
558 while (*dpp) {
559 dp = *dpp;
560 if (dp->hash == hash &&
561 dp->t >= expiretime) {
562 // HASH match! And not too old!
563 if (dp->len == addrlen + datalen &&
564 memcmp(addr, dp->packet, addrlen) == 0 &&
565 memcmp(data, dp->packet + addrlen, datalen) == 0) {
566 // PACKET MATCH!
567 //hlog(LOG_DEBUG, "Dupe: %.*s", pb->packet_len - 2, pb->data);
568 //hlog(LOG_DEBUG, "Orig: %.*s %.*s", addrlen, dp->addresses, datalen, dp->packet);
569 pb->flags |= F_DUPE;
570 filter_postprocess_dupefilter(pb);
571 if (dp->dtype >= 0 && dp->dtype < DTYPE_MAX)
572 dupecheck_dupetypes[dp->dtype]++;
573 return F_DUPE;
574 }
575 // no packet match.. check next
576 }
577 dpp = &dp->next;
578 }
579 // dpp points to pointer at the tail of the chain
580
581 // 4) Add comparison copy of non-dupe into dupe-db
582 if (dupecheck_append(dpp, hash, addrlen, addr, datalen, data) == -1)
583 return -1;
584
585 // 5) mangle packet in a few common ways, and store to dupe-db
586 dupecheck_mangle_store(addr, addrlen, data, datalen);
587
588 return 0;
589 }
590
591 /*
592 * Worker asks for info on outgoing lag to adjust its main-loop delays
593 * and priorities
594 */
595
outgoing_lag_report(struct worker_t * self,int * lag,int * dupelag)596 int outgoing_lag_report(struct worker_t *self, int *lag, int *dupelag)
597 {
598 int lag1 = pbuf_seqnum_lag(dupecheck_seqnum, self->last_pbuf_seqnum);
599 int lag2 = pbuf_seqnum_lag(dupecheck_dupe_seqnum, self->last_pbuf_dupe_seqnum);
600
601 if (lag) *lag = lag1;
602 if (dupelag) *dupelag = lag2;
603
604 if (lag1 == 2000000000) lag1 = 0;
605 if (lag2 == 2000000000) lag2 = 0;
606
607 if (lag1 < lag2) lag1 = lag2;
608
609 return lag1; // Higher of the two..
610 }
611
dupecheck_drain_worker(struct worker_t * w,struct pbuf_t *** pb_out_prevp,struct pbuf_t ** pb_out_last,struct pbuf_t *** pb_out_dupe_prevp,struct pbuf_t ** pb_out_dupe_last,int * pb_out_count,int * pb_out_dupe_count)612 static int dupecheck_drain_worker(struct worker_t *w,
613 struct pbuf_t ***pb_out_prevp, struct pbuf_t **pb_out_last,
614 struct pbuf_t ***pb_out_dupe_prevp, struct pbuf_t **pb_out_dupe_last,
615 int *pb_out_count, int *pb_out_dupe_count)
616 {
617 struct pbuf_t *pb_list;
618 struct pbuf_t *pb, *pbnext;
619 int n = 0;
620 int me;
621
622 /* grab worker's list of packets */
623 if ((me = pthread_mutex_lock(&w->pbuf_incoming_mutex))) {
624 hlog(LOG_ERR, "dupecheck_drain_worker: could not lock pbuf_incoming_mutex: %s", strerror(me));
625 return 0;
626 }
627 pb_list = w->pbuf_incoming;
628 w->pbuf_incoming = NULL;
629 w->pbuf_incoming_last = &w->pbuf_incoming;
630 //int c = w->pbuf_incoming_count;
631 w->pbuf_incoming_count = 0;
632 if ((me = pthread_mutex_unlock(&w->pbuf_incoming_mutex))) {
633 hlog(LOG_ERR, "dupecheck_drain_worker: could not unlock pbuf_incoming_mutex: %s", strerror(me));
634 }
635
636 //hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d",
637 // c, w->id, dupecheck_seqnum);
638
639 for (pb = pb_list; (pb); pb = pbnext) {
640 if (pb->t > tick + 1) {
641 hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s",
642 pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data);
643 } else if (tick - pb->t > 10) {
644 hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s",
645 pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data);
646 }
647
648 int rc = dupecheck(pb);
649 pbnext = pb->next; // it may get modified below..
650
651 if (rc == 0) {
652 /* put non-duplicate packet in history database
653 * and let filter module do it's thing, if historydb
654 * is enabled (disabled if no filtered listeners
655 * configured, for memory savings)
656 */
657 if (have_filtered_listeners) {
658 historydb_insert(pb);
659 filter_postprocess_dupefilter(pb);
660 }
661
662 // Not duplicate
663 **pb_out_prevp = pb;
664 *pb_out_prevp = &pb->next;
665 *pb_out_last = pb;
666 pb->seqnum = ++dupecheck_seqnum;
667 *pb_out_count = *pb_out_count + 1;
668 } else {
669 // Duplicate
670 **pb_out_dupe_prevp = pb;
671 *pb_out_dupe_prevp = &pb->next;
672 *pb_out_dupe_last = pb;
673 pb->seqnum = ++dupecheck_dupe_seqnum;
674 *pb_out_dupe_count = *pb_out_dupe_count + 1;
675 //hlog(LOG_DEBUG, "is duplicate");
676 }
677 n++;
678 }
679
680 return n;
681 }
682
683 /*
684 * Dupecheck thread
685 */
686
dupecheck_thread(void)687 static void dupecheck_thread(void)
688 {
689 sigset_t sigs_to_block;
690 struct worker_t *w;
691 struct pbuf_t *pb_out, **pb_out_prevp, *pb_out_last;
692 struct pbuf_t *pb_out_dupe, **pb_out_dupe_prevp, *pb_out_dupe_last;
693 int n;
694 int e;
695 int c, d;
696 int pb_out_count, pb_out_dupe_count;
697 time_t cleanup_tick = tick;
698
699 #ifndef USE_EVENTFD
700 struct timespec sleepspec;
701 sleepspec.tv_sec = 0;
702 sleepspec.tv_nsec = 30 * 1000 * 1000;
703 #endif
704
705 pthreads_profiling_reset("dupecheck");
706
707 sigemptyset(&sigs_to_block);
708 sigaddset(&sigs_to_block, SIGALRM);
709 sigaddset(&sigs_to_block, SIGINT);
710 sigaddset(&sigs_to_block, SIGTERM);
711 sigaddset(&sigs_to_block, SIGQUIT);
712 sigaddset(&sigs_to_block, SIGHUP);
713 sigaddset(&sigs_to_block, SIGURG);
714 sigaddset(&sigs_to_block, SIGPIPE);
715 sigaddset(&sigs_to_block, SIGUSR1);
716 sigaddset(&sigs_to_block, SIGUSR2);
717 pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
718
719 hlog(LOG_INFO, "Dupecheck thread ready.");
720
721 while (!dupecheck_shutting_down) {
722 n = d = 0;
723 pb_out = NULL;
724 pb_out_prevp = &pb_out;
725 pb_out_dupe = NULL;
726 pb_out_dupe_prevp = &pb_out_dupe;
727 pb_out_count = pb_out_dupe_count = 0;
728 pb_out_last = pb_out_dupe_last = NULL;
729
730 /* walk through worker threads */
731 for (w = worker_threads; (w); w = w->next) {
732 /* if there are items in the worker's pbuf_incoming, grab them and process */
733 if (!w->pbuf_incoming)
734 continue;
735
736 n += dupecheck_drain_worker(w,
737 &pb_out_prevp, &pb_out_last,
738 &pb_out_dupe_prevp, &pb_out_dupe_last,
739 &pb_out_count, &pb_out_dupe_count);
740 }
741
742 if ((http_worker) && http_worker->pbuf_incoming) {
743 n += dupecheck_drain_worker(http_worker,
744 &pb_out_prevp, &pb_out_last,
745 &pb_out_dupe_prevp, &pb_out_dupe_last,
746 &pb_out_count, &pb_out_dupe_count);
747 }
748
749 if ((udp_worker) && udp_worker->pbuf_incoming) {
750 n += dupecheck_drain_worker(udp_worker,
751 &pb_out_prevp, &pb_out_last,
752 &pb_out_dupe_prevp, &pb_out_dupe_last,
753 &pb_out_count, &pb_out_dupe_count);
754 }
755
756 // terminate those out-chains in every case..
757 *pb_out_prevp = NULL;
758 *pb_out_dupe_prevp = NULL;
759
760 /* put packets in the global buffer */
761 if (pb_out || pb_out_dupe) {
762 if ((e = rwl_wrlock(&pbuf_global_rwlock))) {
763 hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!");
764 exit(1);
765 }
766
767 if (pb_out) {
768 *pbuf_global_prevp = pb_out;
769 pbuf_global_prevp = pb_out_prevp;
770 pbuf_global_count += pb_out_count;
771 }
772
773 if (pb_out_dupe) {
774 *pbuf_global_dupe_prevp = pb_out_dupe;
775 pbuf_global_dupe_prevp = pb_out_dupe_prevp;
776 pbuf_global_dupe_count += pb_out_dupe_count;
777 }
778
779 if ((e = rwl_wrunlock(&pbuf_global_rwlock))) {
780 hlog(LOG_CRIT, "dupecheck: Failed to wrunlock pbuf_global_rwlock!");
781 exit(1);
782 }
783 }
784
785 dupecheck_outcount += pb_out_count;
786 dupecheck_dupecount += pb_out_dupe_count;
787
788 if (cleanup_tick <= tick) { // once in a (simulated) minute or so..
789 cleanup_tick = tick + 10;
790
791 /*
792 if ((e = rwl_wrlock(&pbuf_global_rwlock))) {
793 hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!");
794 exit(1);
795 }
796 */
797
798 /* walk through worker threads */
799 int worker_pbuf_lag;
800 int worker_pbuf_dupe_lag;
801 worker_pbuf_lag = worker_pbuf_dupe_lag = -1;
802 for (w = worker_threads; (w); w = w->next) {
803 /* Find the highest worker lag count after we have appended
804 * the packets in the buffer.
805 */
806 c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum);
807 if (w->last_pbuf_seqnum == 0)
808 c = 2000000000;
809 if (c > worker_pbuf_lag)
810 worker_pbuf_lag = c;
811 c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum);
812 if (c > worker_pbuf_dupe_lag)
813 worker_pbuf_dupe_lag = c;
814 }
815
816 global_pbuf_purger(0, worker_pbuf_lag, worker_pbuf_dupe_lag);
817
818 /*
819 if ((e = rwl_wrunlock(&pbuf_global_rwlock))) {
820 hlog(LOG_CRIT, "dupecheck: Failed to wrunlock pbuf_global_rwlock!");
821 exit(1);
822 }
823 */
824
825 dupecheck_cleanup();
826 }
827
828 // if (n > 0)
829 // hlog(LOG_DEBUG, "Dupecheck did analyze %d packets, found %d duplicates", n, pb_out_dupe_count);
830 /* sleep a little */
831 #ifdef USE_EVENTFD
832 int p = poll(&dupecheck_eventfd_poll, 1, 1000);
833 //hlog(LOG_DEBUG, "dupecheck: poll returned %d", p);
834 if (p > 0) {
835 uint64_t u;
836 p = read(dupecheck_eventfd, &u, sizeof(uint64_t));
837 //hlog(LOG_DEBUG, "dupecheck: eventfd read %d: %lu", p, u);
838 }
839 #else
840 nanosleep(&sleepspec, NULL);
841 #endif
842 }
843
844 hlog( LOG_INFO, "Dupecheck thread shut down; seqnum=%u/%u",
845 pbuf_seqnum_lag(dupecheck_seqnum,(uint32_t)-2000), // initial bias..
846 pbuf_seqnum_lag(dupecheck_dupe_seqnum,(uint32_t)-2000));
847
848 dupecheck_running = 0;
849 }
850
851 /*
852 * Start / stop dupecheck
853 */
854
dupecheck_start(void)855 void dupecheck_start(void)
856 {
857 if (dupecheck_running)
858 return;
859
860 dupecheck_shutting_down = 0;
861
862 if (pthread_create(&dupecheck_th, &pthr_attrs, (void *)dupecheck_thread, NULL))
863 perror("pthread_create failed for dupecheck_thread");
864
865 dupecheck_running = 1;
866 }
867
dupecheck_stop(void)868 void dupecheck_stop(void)
869 {
870 int e;
871
872 if (!dupecheck_running)
873 return;
874
875 dupecheck_shutting_down = 1;
876
877 #ifdef USE_EVENTFD
878 /* wake up dupecheck from sleep */
879 uint64_t u = 1;
880 int i = write(dupecheck_eventfd, &u, sizeof(uint64_t));
881 if (i != sizeof(uint64_t)) {
882 hlog(LOG_ERR, "incoming_stop() failed to write to dupecheck_eventfd: %s", strerror(errno));
883 }
884 #endif
885
886 if ((e = pthread_join(dupecheck_th, NULL)))
887 hlog(LOG_ERR, "Could not pthread_join dupecheck_th: %s", strerror(e));
888 else
889 hlog(LOG_INFO, "Dupecheck thread has terminated.");
890 }
891
892 /* The dupecheck_atend() is primarily for valgrind() to clean up dupecache.
893 */
dupecheck_atend(void)894 void dupecheck_atend(void)
895 {
896 int i;
897 struct dupe_record_t *dp, *dp2;
898
899 for (i = 0; i < DUPECHECK_DB_SIZE; ++i) {
900 dp = dupecheck_db[i];
901 while (dp) {
902 dp2 = dp->next;
903 dupecheck_db_free(dp);
904 dp = dp2;
905 }
906 dupecheck_db[i] = NULL;
907 }
908 #if 0 /* Well, not really... valgrind did hfree() the dupecells,
909 and without valgrind we really are not interested of freeup of
910 the free chain... */
911 dp = dupecheck_free;
912 for ( ; dp ; dp = dp2 ) {
913 dp2 = dp->next;
914 cellfree(dupecheck_cells, dp);
915 }
916 #endif
917 global_pbuf_purger(1, -1, -1); // purge everything..
918 }
919
920 /*
921 * cellmalloc status
922 */
923 #ifndef _FOR_VALGRIND_
dupecheck_cell_stats(struct cellstatus_t * cellst)924 void dupecheck_cell_stats(struct cellstatus_t *cellst)
925 {
926 // TODO: this is not quite thread safe, but may be OK
927 cellstatus(dupecheck_cells, cellst);
928 }
929 #endif
930
931
932