/* Copyright (C) 2007-2020 Open Information Security Foundation
 *
 * You can copy, redistribute or modify this Program under the terms of
 * the GNU General Public License version 2 as published by the Free
 * Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/**
 * \file
 *
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
 * \author Victor Julien <victor@inliniac.net>
 */

#include "suricata-common.h"
#include "suricata.h"
#include "decode.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"

#include "util-random.h"
#include "util-time.h"

#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "flow-storage.h"
#include "flow-spare-pool.h"

#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"

#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-byte.h"

#include "util-debug.h"
#include "util-privs.h"
#include "util-signal.h"

#include "threads.h"
#include "detect.h"
#include "detect-engine-state.h"
#include "stream.h"

#include "app-layer-parser.h"

#include "host-timeout.h"
#include "defrag-timeout.h"
#include "ippair-timeout.h"

#include "output-flow.h"
#include "util-validate.h"

/* Run mode selected in suricata.c */
extern int run_mode;

/** queue to pass flows to cleanup/log thread(s) */
FlowQueue flow_recycle_q;

/* multi flow manager support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);

/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
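/* non-zero while a recycler thread is busy processing flows, so the
 * shutdown logic can wait for outstanding work to finish */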
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);

void FlowTimeoutsInit(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
}

void FlowTimeoutsEmergency(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

/* 1 second */
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
/* 0.3 seconds */
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000000
#define NEW_FLOW_COUNT_COND 10

typedef struct FlowTimeoutCounters_ {
    uint32_t new;
    uint32_t est;
    uint32_t clo;
    uint32_t byp;

    uint32_t rows_checked;
    uint32_t rows_skipped;
    uint32_t rows_empty;
    uint32_t rows_maxlen;

    uint32_t flows_checked;
    uint32_t flows_notimeout;
    uint32_t flows_timeout;
    uint32_t flows_timeout_inuse;
    uint32_t flows_removed;
    uint32_t flows_aside;
    uint32_t flows_aside_needs_work;

    uint32_t bypassed_count;
    uint64_t bypassed_pkts;
    uint64_t bypassed_bytes;
} FlowTimeoutCounters;

/**
 * \brief Used to disable flow manager thread(s).
 *
 * \todo Kinda hackish since it uses the tv name to identify flow manager
 *       thread. We need an all-weather identification scheme.
 */
void FlowDisableFlowManagerThread(void)
{
    SCMutexLock(&tv_root_lock);
    /* flow manager thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
                "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowmgr_cnt, 0);
    return;
}

/** \internal
 * \brief check if a flow is timed out
 *
 * \param f flow
 * \param ts timestamp
 * \param next_ts [out] earliest timeout of the flows seen so far
 * \param emerg true if emergency mode timeouts apply
 *
 * \retval 0 not timed out
 * \retval 1 timed out
 */
static int FlowManagerFlowTimeout(Flow *f, struct timeval *ts, int32_t *next_ts, const bool emerg)
{
    int32_t flow_times_out_at = f->timeout_at;
    if (emerg) {
        extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
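        /* emergency mode uses shorter timeouts: subtract the delta between
         * the normal and emergency timeout for this state/proto */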
        flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
    }
    if (*next_ts == 0 || flow_times_out_at < *next_ts)
        *next_ts = flow_times_out_at;

    /* do the timeout check */
    if (flow_times_out_at >= ts->tv_sec) {
        return 0;
    }

    return 1;
}

static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
                                      FlowTimeoutCounters *counters)
{
#ifdef CAPTURE_OFFLOAD
    if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
        return 1;
    }

    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
    if (fc && fc->BypassUpdate) {
        /* the flow may have been updated by the capture method */
        uint64_t pkts_tosrc = fc->tosrcpktcnt;
        uint64_t bytes_tosrc = fc->tosrcbytecnt;
        uint64_t pkts_todst = fc->todstpktcnt;
        uint64_t bytes_todst = fc->todstbytecnt;
        bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
        if (update) {
            SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
            pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
            bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
            pkts_todst = fc->todstpktcnt - pkts_todst;
            bytes_todst = fc->todstbytecnt - bytes_todst;
            if (f->livedev) {
                SC_ATOMIC_ADD(f->livedev->bypassed,
                        pkts_tosrc + pkts_todst);
            }
            counters->bypassed_pkts += pkts_tosrc + pkts_todst;
            counters->bypassed_bytes += bytes_tosrc + bytes_todst;
            return 0;
        } else {
            SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
            if (f->livedev) {
                if (FLOW_IS_IPV4(f)) {
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET);
                } else if (FLOW_IS_IPV6(f)) {
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
                }
            }
            counters->bypassed_count++;
            return 1;
        }
    }
#endif /* CAPTURE_OFFLOAD */
    return 1;
}

static inline void FMFlowLock(Flow *f)
{
    FLOWLOCK_WRLOCK(f);
}

typedef struct FlowManagerTimeoutThread {
    /* used to temporarily store flows that have timed out and are
     * removed from the hash */
    FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t cnt = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
                f->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
                f->flow_state != FLOW_STATE_LOCAL_BYPASSED &&
                FlowForceReassemblyNeedReassembly(f) == 1) {
            /* Send the flow to its thread */
            FlowForceReassemblyForFlow(f);
            FLOWLOCK_UNLOCK(f);
            /* flow ownership is passed to the worker thread */

            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

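        /* queue the flow locally; push to the global recycle queue in
         * batches of 100 to limit contention on its lock */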
        FlowQueuePrivateAppendFlow(&recycle, f);
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        }
        cnt++;
    }
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
    }
    return cnt;
}

/**
 * \internal
 *
 * \brief check all flows in a hash row for timing out
 *
 * \param td timeout thread data
 * \param f first flow in the hash row
 * \param ts timestamp
 * \param emergency bool indicating emergency mode
 * \param counters ptr to FlowTimeoutCounters structure
 * \param next_ts [out] earliest timeout of the flows left in the row
 */
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
        Flow *f, struct timeval *ts,
        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
    uint32_t checked = 0;
    Flow *prev_f = NULL;

    do {
        checked++;

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        /* timeout logic goes here */
        if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {

            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        FMFlowLock(f); //FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            if (f->use_cnt > 0) {
                counters->flows_timeout_inuse++;
            }
            f = f->next;
            continue;
        }

        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        RemoveFromHash(f, prev_f);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}

static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,
        Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
{
    do {
        FLOWLOCK_WRLOCK(f);
        Flow *next_flow = f->next;
        f->next = NULL;
        f->fb = NULL;

        DEBUG_VALIDATE_BUG_ON(f->use_cnt > 0);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);
}

/**
 * \brief time out flows from the hash
 *
 * \param td timeout thread data
 * \param ts timestamp
 * \param hash_min min hash index to consider
 * \param hash_max max hash index to consider
 * \param counters ptr to FlowTimeoutCounters structure
 *
 * \retval cnt number of timed out flows
 */
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

#if __WORDSIZE == 64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

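    /* walk the hash in words of BITS buckets: first build a bitmap of the
     * buckets in this word whose next_ts has passed, then only lock and
     * walk the rows that are actually due */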
    for (uint32_t idx = hash_min; idx < hash_max; idx += BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
        }
        if (check_bits == 0)
            continue;

        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        int32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);

                        if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    if (fb->evicted == NULL && fb->head == NULL) {
                        SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    }
                } else {
                    SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* process the evicted list */
                if (evicted) {
                    FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
                }
            } else {
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}

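/** \internal
 * \brief run one chunk of a full hash pass: each call scans a
 *        1/chunks-th slice of the [hash_min, hash_max) range, so a full
 *        pass completes after `chunks` calls
 */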
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
{
    const uint32_t rows = hash_max - hash_min;
    const uint32_t chunk_size = rows / chunks;

    const uint32_t min = iter * chunk_size + hash_min;
    uint32_t max = min + chunk_size;
    /* we start at the beginning of the hash at the next iteration, so
     * check the hash until the end here */
    if (iter + 1 == chunks) {
        max = hash_max;
    }
    const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
    return cnt;
}

/**
 * \internal
 *
 * \brief move all flows out of a hash row
 *
 * \param f first flow in the hash row
 * \param recycle_q queue to store the flows to be recycled
 * \param mode 0 to clear the regular flow list, non-zero for the evicted list
 *
 * \retval cnt number of flows removed
 */
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
    uint32_t cnt = 0;

    do {
        FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* remove from the hash */
        if (mode == 0) {
            RemoveFromHash(f, NULL);
        } else {
            FlowBucket *fb = f->fb;
            fb->evicted = f->next;
            f->next = NULL;
            f->fb = NULL;
        }
        f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;

        /* no one is referring to this flow, use_cnt 0, removed from hash
         * so we can unlock it and move it to the recycle queue. */
        FLOWLOCK_UNLOCK(f);
        FlowQueuePrivateAppendFlow(recycle_q, f);

        cnt++;

        f = next_flow;
    } while (f != NULL);

    return cnt;
}

/**
 * \brief remove all flows from the hash
 *
 * \retval cnt number of flows removed
 */
static uint32_t FlowCleanupHash(void)
{
    FlowQueuePrivate local_queue = { NULL, NULL, 0 };
    uint32_t cnt = 0;

    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
        FlowBucket *fb = &flow_hash[idx];

        FBLOCK_LOCK(fb);

        if (fb->head != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
        }
        if (fb->evicted != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
        }

        FBLOCK_UNLOCK(fb);
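        /* flush to the global recycle queue in batches of 25 to keep the
         * local queue short without hammering the queue lock */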
        if (local_queue.len >= 25) {
            FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
        }
    }
    FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);

    return cnt;
}

static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
{
    FLOWLOCK_WRLOCK(f);

    (void)OutputFlowLog(tv, output_thread_data, f);

    FlowClearMemory(f, f->protomap);
    FLOWLOCK_UNLOCK(f);
    FlowSparePoolReturnFlow(f);
}

typedef struct FlowQueueTimeoutCounters {
    uint32_t flows_removed;
    uint32_t flows_timeout;
} FlowQueueTimeoutCounters;

extern int g_detect_disabled;

typedef struct FlowCounters_ {
    uint16_t flow_mgr_full_pass;
    uint16_t flow_mgr_cnt_clo;
    uint16_t flow_mgr_cnt_new;
    uint16_t flow_mgr_cnt_est;
    uint16_t flow_mgr_cnt_byp;
    uint16_t flow_mgr_spare;
    uint16_t flow_emerg_mode_enter;
    uint16_t flow_emerg_mode_over;

    uint16_t flow_mgr_flows_checked;
    uint16_t flow_mgr_flows_notimeout;
    uint16_t flow_mgr_flows_timeout;
    uint16_t flow_mgr_flows_timeout_inuse;
    uint16_t flow_mgr_flows_aside;
    uint16_t flow_mgr_flows_aside_needs_work;

    uint16_t flow_mgr_rows_maxlen;

    uint16_t flow_bypassed_cnt_clo;
    uint16_t flow_bypassed_pkts;
    uint16_t flow_bypassed_bytes;
} FlowCounters;

typedef struct FlowManagerThreadData_ {
    uint32_t instance;
    uint32_t min;
    uint32_t max;

    FlowCounters cnt;

    FlowManagerTimeoutThread timeout;
} FlowManagerThreadData;

static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
    fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
    fc->flow_mgr_cnt_clo = StatsRegisterCounter("flow.mgr.closed_pruned", t);
    fc->flow_mgr_cnt_new = StatsRegisterCounter("flow.mgr.new_pruned", t);
    fc->flow_mgr_cnt_est = StatsRegisterCounter("flow.mgr.est_pruned", t);
    fc->flow_mgr_cnt_byp = StatsRegisterCounter("flow.mgr.bypassed_pruned", t);
    fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
    fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
    fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);

    fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
    fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
    fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
    fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
    fc->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow.mgr.flows_timeout_inuse", t);
    fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
    fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);

    fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
    fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
    fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
}

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking
     * each thread has its own section of the flow hash */
    uint32_t range = flow_config.hash_size / flowmgr_number;
    if (ftd->instance == 0)
        ftd->max = range;
    else if ((ftd->instance + 1) == flowmgr_number) {
        ftd->min = range * ftd->instance;
        ftd->max = flow_config.hash_size;
    } else {
        ftd->min = range * ftd->instance;
        ftd->max = range * (ftd->instance + 1);
    }
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);

    PacketPoolInit();
    return TM_ECODE_OK;
}

static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
    PacketPoolDestroy();
    SCFree(data);
    return TM_ECODE_OK;
}

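/** \internal
 * \brief smallest configured timeout across all protos and states, used
 *        to size the interval in which a full hash pass must complete
 */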
static uint32_t FlowTimeoutsMin(void)
{
    FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts);
    uint32_t m = UINT32_MAX;
    for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) {
        m = MIN(m, t[i].new_timeout);
        m = MIN(m, t[i].est_timeout);

        if (i == FLOW_PROTO_TCP) {
            m = MIN(m, t[i].closed_timeout);
        }
        if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) {
            m = MIN(m, t[i].bypassed_timeout);
        }
    }
    return m;
}

//#define FM_PROFILE

/** \brief Thread that manages the flow table and times out flows.
 *
 * \param th_v thread vars
 * \param thread_data FlowManagerThreadData
 *
 * Keeps an eye on the spare list, alloc flows if needed...
 */
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
    FlowManagerThreadData *ftd = thread_data;
    struct timeval ts;
    uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
    bool emerg = false;
    bool prev_emerg = false;
    uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
    uint32_t flow_last_sec = 0;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
 * are really low. Might confuse ppl
    uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
    uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
    uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
*/
    memset(&ts, 0, sizeof(ts));
    uint32_t hash_passes = 0;
#ifdef FM_PROFILE
    uint32_t hash_row_checks = 0;
    uint32_t hash_passes_chunks = 0;
#endif
    uint32_t hash_full_passes = 0;

    const uint32_t min_timeout = FlowTimeoutsMin();
    const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;

    /* don't start our activities until time is setup */
    while (!TimeModeIsReady()) {
        if (suricata_ctl_flags != 0)
            return TM_ECODE_OK;
        /* brief sleep to avoid a hot busy-wait loop */
        usleep(10);
    }

    SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
            ftd->instance, min_timeout, pass_in_sec);

#ifdef FM_PROFILE
    struct timeval endts;
    struct timeval active;
    struct timeval paused;
    struct timeval sleeping;
    memset(&endts, 0, sizeof(endts));
    memset(&active, 0, sizeof(active));
    memset(&paused, 0, sizeof(paused));
    memset(&sleeping, 0, sizeof(sleeping));
#endif

    struct timeval startts;
    memset(&startts, 0, sizeof(startts));
    gettimeofday(&startts, NULL);

    uint32_t hash_pass_iter = 0;
    uint32_t emerg_over_cnt = 0;
    uint64_t next_run_ms = 0;

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
#ifdef FM_PROFILE
            struct timeval pause_startts;
            memset(&pause_startts, 0, sizeof(pause_startts));
            gettimeofday(&pause_startts, NULL);
#endif
            TmThreadTestThreadUnPaused(th_v);
#ifdef FM_PROFILE
            struct timeval pause_endts;
            memset(&pause_endts, 0, sizeof(pause_endts));
            gettimeofday(&pause_endts, NULL);
            struct timeval pause_time;
            memset(&pause_time, 0, sizeof(pause_time));
            timersub(&pause_endts, &pause_startts, &pause_time);
            timeradd(&paused, &pause_time, &paused);
#endif
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }

        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
            emerg = true;
        }
#ifdef FM_PROFILE
        struct timeval run_startts;
        memset(&run_startts, 0, sizeof(run_startts));
        gettimeofday(&run_startts, NULL);
#endif
        /* Get the time */
        memset(&ts, 0, sizeof(ts));
        TimeGet(&ts);
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
        const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
        const uint32_t rt = (uint32_t)ts.tv_sec;
        const bool emerge_p = (emerg && !prev_emerg);
        if (emerge_p) {
            next_run_ms = 0;
            prev_emerg = true;
            SCLogNotice("Flow emergency mode entered...");
            StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
        }
        if (ts_ms >= next_run_ms) {
            if (ftd->instance == 0) {
                const uint32_t sq_len = FlowSpareGetPoolSize();
                const uint32_t spare_perc = sq_len * 100 / flow_config.prealloc;
                /* see if we still have enough spare flows */
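                /* resize only when outside a +/-10% band around the
                 * configured prealloc target, to avoid constant churn */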
                if (spare_perc < 90 || spare_perc > 110) {
                    FlowSparePoolUpdate(sq_len);
                }
            }
            const uint32_t secs_passed = rt - flow_last_sec;

            /* try to time out flows */
            FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };

            if (emerg) {
                /* in emergency mode, do a full pass of the hash table */
                FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
                hash_passes++;
                hash_full_passes++;
#ifdef FM_PROFILE
                hash_passes_chunks += 1;
                hash_row_checks += counters.rows_checked;
#endif
                StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
            } else {
                /* non-emergency mode: scan part of the hash */
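                /* each elapsed second is worth one chunk, i.e. a
                 * 1/pass_in_sec slice of this thread's hash range, so a
                 * full pass completes once every pass_in_sec seconds */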
                const uint32_t chunks = MIN(secs_passed, pass_in_sec);
                for (uint32_t i = 0; i < chunks; i++) {
                    FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
                            &counters, hash_pass_iter, pass_in_sec);
                    hash_pass_iter++;
                    if (hash_pass_iter == pass_in_sec) {
                        hash_pass_iter = 0;
                        hash_full_passes++;
                        StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
                    }
                }
                hash_passes++;
#ifdef FM_PROFILE
                hash_row_checks += counters.rows_checked;
                hash_passes_chunks += chunks;
#endif
            }
            flow_last_sec = rt;

            /*
            StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
            uint32_t hosts_active = HostGetActiveCount();
            StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
            uint32_t hosts_spare = HostGetSpareCount();
            StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
            */
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_clo, (uint64_t)counters.clo);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_new, (uint64_t)counters.new);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_est, (uint64_t)counters.est);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_byp, (uint64_t)counters.byp);

            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);

            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
            //StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters.flows_aside);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, (uint64_t)counters.flows_aside_needs_work);

            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters.bypassed_count);
            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters.bypassed_pkts);
            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);

            StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
            // TODO AVG MAXLEN
            // TODO LOOKUP STEPS MAXLEN and AVG LEN
            /* Don't fear, FlowManagerThread is here...
             * clear emergency bit if we have at least xx flows pruned. */
            uint32_t len = FlowSpareGetPoolSize();
            StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
            if (emerg == true) {
                SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
                        " flow_spare_q status: %"PRIu32"%% flows at the queue",
                        len, flow_config.prealloc, len * 100 / flow_config.prealloc);

                /* only if we have pruned this "emergency_recovery" percentage
                 * of flows, we will unset the emergency bit */
                if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
                    emerg_over_cnt++;
                } else {
                    emerg_over_cnt = 0;
                }

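                /* require ~30 consecutive checks above the recovery
                 * threshold before leaving emergency mode, so a brief
                 * dip does not flap the mode */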
                if (emerg_over_cnt >= 30) {
                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
                    FlowTimeoutsReset();

                    emerg = false;
                    prev_emerg = false;
                    emerg_over_cnt = 0;
                    hash_pass_iter = 0;
                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
                            " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
                            "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
                            "%% flows at the queue", (uintmax_t)ts.tv_sec,
                            (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);

                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
                }
            }
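            /* schedule the next pass: roughly 1.5 passes/sec normally,
             * 4 passes/sec while in emergency mode */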
            next_run_ms = ts_ms + 667;
            if (emerg)
                next_run_ms = ts_ms + 250;
        }
        if (flow_last_sec == 0) {
            flow_last_sec = rt;
        }

        if (ftd->instance == 0 &&
                (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
            DefragTimeoutHash(&ts);
            //uint32_t hosts_pruned =
            HostTimeoutHash(&ts);
            IPPairTimeoutHash(&ts);
            other_last_sec = (uint32_t)ts.tv_sec;
        }

#ifdef FM_PROFILE
        struct timeval run_endts;
        memset(&run_endts, 0, sizeof(run_endts));
        gettimeofday(&run_endts, NULL);
        struct timeval run_time;
        memset(&run_time, 0, sizeof(run_time));
        timersub(&run_endts, &run_startts, &run_time);
        timeradd(&active, &run_time, &active);
#endif

        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
            StatsSyncCounters(th_v);
            break;
        }

#ifdef FM_PROFILE
        struct timeval sleep_startts;
        memset(&sleep_startts, 0, sizeof(sleep_startts));
        gettimeofday(&sleep_startts, NULL);
#endif
        usleep(250);

#ifdef FM_PROFILE
        struct timeval sleep_endts;
        memset(&sleep_endts, 0, sizeof(sleep_endts));
        gettimeofday(&sleep_endts, NULL);

        struct timeval sleep_time;
        memset(&sleep_time, 0, sizeof(sleep_time));
        timersub(&sleep_endts, &sleep_startts, &sleep_time);
        timeradd(&sleeping, &sleep_time, &sleeping);
#endif
        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");

        StatsSyncCountersIfSignalled(th_v);
    }
    SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
            "timed out, %"PRIu32" flows in closed state", new_cnt,
            established_cnt, closing_cnt);

#ifdef FM_PROFILE
    SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
            hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
            hash_full_passes, hash_row_checks,
            hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));

    gettimeofday(&endts, NULL);
    struct timeval total_run_time;
    timersub(&endts, &startts, &total_run_time);

    SCLogNotice("FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
            (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
            (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
            (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
            (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
#endif
    return TM_ECODE_OK;
}

/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.managers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError(SC_ERR_INVALID_ARGUMENTS,
                "invalid flow.managers setting %"PRIdMAX, setting);
    }
    flowmgr_number = (uint32_t)setting;

    SCLogConfig("using %u flow manager threads", flowmgr_number);
    StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);

    for (uint32_t u = 0; u < flowmgr_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);

        ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
                "FlowManager", 0);

        if (tv_flowmgr == NULL) {
            FatalError(SC_ERR_FATAL, "flow manager thread creation failed");
        }
        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
            FatalError(SC_ERR_FATAL, "flow manager thread spawn failed");
        }
    }
    return;
}

typedef struct FlowRecyclerThreadData_ {
    void *output_thread_data;
} FlowRecyclerThreadData;

static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;
    if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
        SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
        SCFree(ftd);
        return TM_ECODE_FAILED;
    }
    SCLogDebug("output_thread_data %p", ftd->output_thread_data);

    *data = ftd;
    return TM_ECODE_OK;
}

static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
    if (ftd->output_thread_data != NULL)
        OutputFlowLogThreadDeinit(t, ftd->output_thread_data);

    SCFree(data);
    return TM_ECODE_OK;
}

/** \brief Thread that manages timed out flows.
 *
 * \param th_v thread vars
 * \param thread_data FlowRecyclerThreadData
 */
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
    struct timeval ts;
    uint64_t recycled_cnt = 0;
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
    BUG_ON(ftd == NULL);

    memset(&ts, 0, sizeof(ts));
    uint32_t fr_passes = 0;

#ifdef FM_PROFILE
    struct timeval endts;
    struct timeval active;
    struct timeval paused;
    struct timeval sleeping;
    memset(&endts, 0, sizeof(endts));
    memset(&active, 0, sizeof(active));
    memset(&paused, 0, sizeof(paused));
    memset(&sleeping, 0, sizeof(sleeping));
#endif
    struct timeval startts;
    memset(&startts, 0, sizeof(startts));
    gettimeofday(&startts, NULL);

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
#ifdef FM_PROFILE
            struct timeval pause_startts;
            memset(&pause_startts, 0, sizeof(pause_startts));
            gettimeofday(&pause_startts, NULL);
#endif
            TmThreadTestThreadUnPaused(th_v);

#ifdef FM_PROFILE
            struct timeval pause_endts;
            memset(&pause_endts, 0, sizeof(pause_endts));
            gettimeofday(&pause_endts, NULL);

            struct timeval pause_time;
            memset(&pause_time, 0, sizeof(pause_time));
            timersub(&pause_endts, &pause_startts, &pause_time);
            timeradd(&paused, &pause_time, &paused);
#endif
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }
        fr_passes++;
#ifdef FM_PROFILE
        struct timeval run_startts;
        memset(&run_startts, 0, sizeof(run_startts));
        gettimeofday(&run_startts, NULL);
#endif
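        /* mark this recycler busy: FlowRecyclerReadyToShutdown() stays
         * false while flows are being processed here */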
        SC_ATOMIC_ADD(flowrec_busy, 1);
        FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

        const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));

        /* Get the time */
        memset(&ts, 0, sizeof(ts));
        TimeGet(&ts);
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);

        Flow *f;
        while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
            Recycler(th_v, ftd->output_thread_data, f);
            recycled_cnt++;
        }
        SC_ATOMIC_SUB(flowrec_busy, 1);

#ifdef FM_PROFILE
        struct timeval run_endts;
        memset(&run_endts, 0, sizeof(run_endts));
        gettimeofday(&run_endts, NULL);

        struct timeval run_time;
        memset(&run_time, 0, sizeof(run_time));
        timersub(&run_endts, &run_startts, &run_time);
        timeradd(&active, &run_time, &active);
#endif

        if (bail) {
            break;
        }

#ifdef FM_PROFILE
        struct timeval sleep_startts;
        memset(&sleep_startts, 0, sizeof(sleep_startts));
        gettimeofday(&sleep_startts, NULL);
#endif
        usleep(250);
#ifdef FM_PROFILE
        struct timeval sleep_endts;
        memset(&sleep_endts, 0, sizeof(sleep_endts));
        gettimeofday(&sleep_endts, NULL);
        struct timeval sleep_time;
        memset(&sleep_time, 0, sizeof(sleep_time));
        timersub(&sleep_endts, &sleep_startts, &sleep_time);
        timeradd(&sleeping, &sleep_time, &sleeping);
#endif

        SCLogDebug("woke up...");

        StatsSyncCountersIfSignalled(th_v);
    }
    StatsSyncCounters(th_v);
#ifdef FM_PROFILE
    gettimeofday(&endts, NULL);
    struct timeval total_run_time;
    timersub(&endts, &startts, &total_run_time);
    SCLogNotice("FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
            (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
            (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
            (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
            (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);

    SCLogNotice("FR passes %u passes/s %u", fr_passes,
            (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
#endif
    SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
    return TM_ECODE_OK;
}

static bool FlowRecyclerReadyToShutdown(void)
{
    if (SC_ATOMIC_GET(flowrec_busy) != 0) {
        return false;
    }
    uint32_t len = 0;
    FQLOCK_LOCK(&flow_recycle_q);
    len = flow_recycle_q.qlen;
    FQLOCK_UNLOCK(&flow_recycle_q);

    return (len == 0);
}

/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn()
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.recyclers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError(SC_ERR_INVALID_ARGUMENTS,
                "invalid flow.recyclers setting %"PRIdMAX, setting);
    }
    flowrec_number = (uint32_t)setting;

    SCLogConfig("using %u flow recycler threads", flowrec_number);

    for (uint32_t u = 0; u < flowrec_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);

        ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
                "FlowRecycler", 0);

        if (tv_flowrec == NULL) {
            FatalError(SC_ERR_FATAL, "flow recycler thread creation failed");
        }
        if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
            FatalError(SC_ERR_FATAL, "flow recycler thread spawn failed");
        }
    }
    return;
}

/**
 * \brief Used to disable flow recycler thread(s).
 *
 * \note this should only be called when the flow manager is already gone
 *
 * \todo Kinda hackish since it uses the tv name to identify flow recycler
 *       thread. We need an all-weather identification scheme.
 */
void FlowDisableFlowRecyclerThread(void)
{
    int cnt = 0;

    /* move all flows still in the hash to the recycler queue */
#ifndef DEBUG
    (void)FlowCleanupHash();
#else
    uint32_t flows = FlowCleanupHash();
    SCLogDebug("flows to progress: %u", flows);
#endif

    /* make sure all flows are processed */
    do {
        usleep(10);
    } while (FlowRecyclerReadyToShutdown() == false);

    SCMutexLock(&tv_root_lock);
    /* flow recycler thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
            cnt++;
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
                "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowrec_cnt, 0);
    return;
}

void TmModuleFlowManagerRegister(void)
{
    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);

    SC_ATOMIC_INIT(flowmgr_cnt);
    SC_ATOMIC_INITPTR(flow_timeouts);
}

void TmModuleFlowRecyclerRegister(void)
{
    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);

    SC_ATOMIC_INIT(flowrec_cnt);
    SC_ATOMIC_INIT(flowrec_busy);
}
