/* Copyright (C) 2007-2020 Open Information Security Foundation
 *
 * You can copy, redistribute or modify this Program under the terms of
 * the GNU General Public License version 2 as published by the Free
 * Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/**
 * \file
 *
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
 * \author Victor Julien <victor@inliniac.net>
 */

#include "suricata-common.h"
#include "suricata.h"
#include "decode.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"

#include "util-random.h"
#include "util-time.h"

#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "flow-storage.h"
#include "flow-spare-pool.h"

#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"

#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-byte.h"

#include "util-debug.h"
#include "util-privs.h"
#include "util-signal.h"

#include "threads.h"
#include "detect.h"
#include "detect-engine-state.h"
#include "stream.h"

#include "app-layer-parser.h"

#include "host-timeout.h"
#include "defrag-timeout.h"
#include "ippair-timeout.h"

#include "output-flow.h"
#include "util-validate.h"

/* Run mode selected in suricata.c */
extern int run_mode;

/** queue to pass flows to cleanup/log thread(s) */
FlowQueue flow_recycle_q;

/* multi flow manager support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);

/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);

void FlowTimeoutsInit(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
}

void FlowTimeoutsEmergency(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

/* 1 second */
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
/* 0.3 milliseconds (300000 ns) */
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
#define NEW_FLOW_COUNT_COND 10

typedef struct FlowTimeoutCounters_ {
    uint32_t new;
    uint32_t est;
    uint32_t clo;
    uint32_t byp;

    uint32_t rows_checked;
    uint32_t rows_skipped;
    uint32_t rows_empty;
    uint32_t rows_maxlen;

    uint32_t flows_checked;
    uint32_t flows_notimeout;
    uint32_t flows_timeout;
    uint32_t flows_timeout_inuse;
    uint32_t flows_removed;
    uint32_t flows_aside;
    uint32_t flows_aside_needs_work;

    uint32_t bypassed_count;
    uint64_t bypassed_pkts;
    uint64_t bypassed_bytes;
} FlowTimeoutCounters;

/**
 * \brief Used to disable flow manager thread(s).
 *
 * \todo Kinda hackish since it uses the tv name to identify flow manager
 *       thread.  We need an all weather identification scheme.
 */
void FlowDisableFlowManagerThread(void)
{
    SCMutexLock(&tv_root_lock);
    /* flow manager thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
                "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowmgr_cnt, 0);
    return;
}

/** \internal
 *  \brief check if a flow is timed out
 *
 *  \param f flow
 *  \param ts timestamp
 *
 *  \retval 0 not timed out
 *  \retval 1 timed out
 */
static int FlowManagerFlowTimeout(Flow *f, struct timeval *ts, int32_t *next_ts, const bool emerg)
{
    int32_t flow_times_out_at = f->timeout_at;
    if (emerg) {
        extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
        flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
    }
    if (*next_ts == 0 || flow_times_out_at < *next_ts)
        *next_ts = flow_times_out_at;

    /* do the timeout check */
    if (flow_times_out_at >= ts->tv_sec) {
        return 0;
    }

    return 1;
}
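
/* Note on the emergency path above: f->timeout_at is computed from the
 * normal timeout table when the flow is updated. Instead of recomputing it
 * when emergency mode starts, the check subtracts the per-state/protocol
 * delta between the two tables (flow_timeouts_delta is assumed here to hold
 * normal minus emergency values), which yields the timeout the flow would
 * have had under the emergency table. */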

static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
                                      FlowTimeoutCounters *counters)
{
#ifdef CAPTURE_OFFLOAD
    if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
        return 1;
    }

    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
    if (fc && fc->BypassUpdate) {
        /* flow will possibly be updated */
        uint64_t pkts_tosrc = fc->tosrcpktcnt;
        uint64_t bytes_tosrc = fc->tosrcbytecnt;
        uint64_t pkts_todst = fc->todstpktcnt;
        uint64_t bytes_todst = fc->todstbytecnt;
        bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
        if (update) {
            SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
            pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
            bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
            pkts_todst = fc->todstpktcnt - pkts_todst;
            bytes_todst = fc->todstbytecnt - bytes_todst;
            if (f->livedev) {
                SC_ATOMIC_ADD(f->livedev->bypassed,
                        pkts_tosrc + pkts_todst);
            }
            counters->bypassed_pkts += pkts_tosrc + pkts_todst;
            counters->bypassed_bytes += bytes_tosrc + bytes_todst;
            return 0;
        } else {
            SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
            if (f->livedev) {
                if (FLOW_IS_IPV4(f)) {
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET);
                } else if (FLOW_IS_IPV6(f)) {
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
                }
            }
            counters->bypassed_count++;
            return 1;
        }
    }
#endif /* CAPTURE_OFFLOAD */
    return 1;
}
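
/* Return value contract of FlowBypassedTimeout(): 1 means the flow is not
 * (or no longer) handled by a capture bypass and normal timeout handling
 * applies; 0 means the capture method saw new activity, so the caller keeps
 * the flow alive. Without CAPTURE_OFFLOAD compiled in this is always 1. */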

static inline void FMFlowLock(Flow *f)
{
    FLOWLOCK_WRLOCK(f);
}

typedef struct FlowManagerTimeoutThread {
    /* used to temporarily store flows that have timed out and are
     * removed from the hash */
    FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t cnt = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
                f->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
                f->flow_state != FLOW_STATE_LOCAL_BYPASSED &&
                FlowForceReassemblyNeedReassembly(f) == 1) {
            /* Send the flow to its thread */
            FlowForceReassemblyForFlow(f);
            FLOWLOCK_UNLOCK(f);
            /* flow ownership is passed to the worker thread */

            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

        FlowQueuePrivateAppendFlow(&recycle, f);
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        }
        cnt++;
    }
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
    }
    return cnt;
}
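
/* Flows that leave the aside queue are pushed to the shared flow_recycle_q
 * in batches (of up to 100 above), so the recycle queue lock is taken once
 * per batch rather than once per flow; FlowQueueAppendPrivate() hands off
 * the whole private list, leaving `recycle` empty for the next batch. */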

/**
 *  \internal
 *
 *  \brief check all flows in a hash row for timing out
 *
 *  \param f first flow in the hash row
 *  \param ts timestamp
 *  \param emergency bool indicating emergency mode
 *  \param counters ptr to FlowTimeoutCounters structure
 */
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
        Flow *f, struct timeval *ts,
        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
    uint32_t checked = 0;
    Flow *prev_f = NULL;

    do {
        checked++;

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {

            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        FMFlowLock(f);

        Flow *next_flow = f->next;

        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            if (f->use_cnt > 0) {
                counters->flows_timeout_inuse++;
            }
            f = f->next;
            continue;
        }

        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        RemoveFromHash(f, prev_f);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}
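
/* The hash row is singly linked, so unlinking a flow needs its predecessor:
 * prev_f trails f through the walk above and is passed to RemoveFromHash()
 * so the row doesn't have to be re-walked on removal. */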

static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,
        Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
{
    do {
        FLOWLOCK_WRLOCK(f);
        Flow *next_flow = f->next;
        f->next = NULL;
        f->fb = NULL;

        DEBUG_VALIDATE_BUG_ON(f->use_cnt > 0);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);
}

/**
 *  \brief time out flows from the hash
 *
 *  \param ts timestamp
 *  \param hash_min min hash index to consider
 *  \param hash_max max hash index to consider
 *  \param counters ptr to FlowTimeoutCounters structure
 *
 *  \retval cnt number of timed out flows
 */
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

#if __WORDSIZE == 64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif
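
    /* The loop below scans the hash in batches of BITS rows: a first pass
     * reads each bucket's next_ts with a relaxed atomic load and builds a
     * bitmap of rows that may hold flows to time out; only rows with their
     * bit set are then locked and walked, so rows whose earliest timeout
     * lies in the future are skipped without taking the bucket lock. */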

    for (uint32_t idx = hash_min; idx < hash_max; idx += BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx + i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
        }
        if (check_bits == 0)
            continue;

        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx + i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        int32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);

                        if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    if (fb->evicted == NULL && fb->head == NULL) {
                        SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    }
                } else {
                    SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* process the evicted list */
                if (evicted) {
                    FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
                }
            } else {
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}

static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
{
    const uint32_t rows = hash_max - hash_min;
    const uint32_t chunk_size = rows / chunks;

    const uint32_t min = iter * chunk_size + hash_min;
    uint32_t max = min + chunk_size;
    /* we start at the beginning of the hash at the next iteration, so
     * check the hash till the end */
    if (iter + 1 == chunks) {
        max = hash_max;
    }
    const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
    return cnt;
}
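
/* A minimal sketch of the chunk arithmetic, assuming an example range of
 * 65536 rows split into 32 chunks: chunk_size is 2048, iteration 0 covers
 * rows [0, 2048), and the final iteration runs to hash_max so any remainder
 * rows dropped by the integer division are still covered. */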

/**
 *  \internal
 *
 *  \brief move all flows out of a hash row
 *
 *  \param f first flow in the hash row
 *
 *  \retval cnt number of flows removed
 */
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
    uint32_t cnt = 0;

    do {
        FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* remove from the hash */
        if (mode == 0) {
            RemoveFromHash(f, NULL);
        } else {
            FlowBucket *fb = f->fb;
            fb->evicted = f->next;
            f->next = NULL;
            f->fb = NULL;
        }
        f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;

        /* no one is referring to this flow, use_cnt 0, removed from hash
         * so we can unlock it and move it to the recycle queue. */
        FLOWLOCK_UNLOCK(f);
        FlowQueuePrivateAppendFlow(recycle_q, f);

        cnt++;

        f = next_flow;
    } while (f != NULL);

    return cnt;
}
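
/* mode selects how the row is unlinked: mode 0 walks the regular fb->head
 * list and uses RemoveFromHash() per flow, while mode 1 peels flows off the
 * fb->evicted list by clearing the next/fb pointers directly, as evicted
 * flows already sit outside the regular hash list. */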

/**
 *  \brief remove all flows from the hash
 *
 *  \retval cnt number of flows removed
 */
static uint32_t FlowCleanupHash(void)
{
    FlowQueuePrivate local_queue = { NULL, NULL, 0 };
    uint32_t cnt = 0;

    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
        FlowBucket *fb = &flow_hash[idx];

        FBLOCK_LOCK(fb);

        if (fb->head != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
        }
        if (fb->evicted != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
        }

        FBLOCK_UNLOCK(fb);
        if (local_queue.len >= 25) {
            FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
        }
    }
    FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);

    return cnt;
}

static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
{
    FLOWLOCK_WRLOCK(f);

    (void)OutputFlowLog(tv, output_thread_data, f);

    FlowClearMemory(f, f->protomap);
    FLOWLOCK_UNLOCK(f);
    FlowSparePoolReturnFlow(f);
}

typedef struct FlowQueueTimeoutCounters {
    uint32_t flows_removed;
    uint32_t flows_timeout;
} FlowQueueTimeoutCounters;

extern int g_detect_disabled;

typedef struct FlowCounters_ {
    uint16_t flow_mgr_full_pass;
    uint16_t flow_mgr_cnt_clo;
    uint16_t flow_mgr_cnt_new;
    uint16_t flow_mgr_cnt_est;
    uint16_t flow_mgr_cnt_byp;
    uint16_t flow_mgr_spare;
    uint16_t flow_emerg_mode_enter;
    uint16_t flow_emerg_mode_over;

    uint16_t flow_mgr_flows_checked;
    uint16_t flow_mgr_flows_notimeout;
    uint16_t flow_mgr_flows_timeout;
    uint16_t flow_mgr_flows_timeout_inuse;
    uint16_t flow_mgr_flows_aside;
    uint16_t flow_mgr_flows_aside_needs_work;

    uint16_t flow_mgr_rows_maxlen;

    uint16_t flow_bypassed_cnt_clo;
    uint16_t flow_bypassed_pkts;
    uint16_t flow_bypassed_bytes;
} FlowCounters;

typedef struct FlowManagerThreadData_ {
    uint32_t instance;
    uint32_t min;
    uint32_t max;

    FlowCounters cnt;

    FlowManagerTimeoutThread timeout;
} FlowManagerThreadData;

static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
    fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
    fc->flow_mgr_cnt_clo = StatsRegisterCounter("flow.mgr.closed_pruned", t);
    fc->flow_mgr_cnt_new = StatsRegisterCounter("flow.mgr.new_pruned", t);
    fc->flow_mgr_cnt_est = StatsRegisterCounter("flow.mgr.est_pruned", t);
    fc->flow_mgr_cnt_byp = StatsRegisterCounter("flow.mgr.bypassed_pruned", t);
    fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
    fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
    fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);

    fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
    fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
    fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
    fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
    fc->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow.mgr.flows_timeout_inuse", t);
    fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
    fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);

    fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
    fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
    fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
}

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking;
     * each thread has its own section of the flow hash */
    uint32_t range = flow_config.hash_size / flowmgr_number;
    if (ftd->instance == 0)
        ftd->max = range;
    else if ((ftd->instance + 1) == flowmgr_number) {
        ftd->min = range * ftd->instance;
        ftd->max = flow_config.hash_size;
    } else {
        ftd->min = range * ftd->instance;
        ftd->max = range * (ftd->instance + 1);
    }
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
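
    /* Example of the partitioning above, with an illustrative hash_size of
     * 65536 and two managers: instance 0 walks rows [0, 32768) and
     * instance 1 rows [32768, 65536), so every row has exactly one owner
     * (the walk below treats max as exclusive). */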

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);

    PacketPoolInit();
    return TM_ECODE_OK;
}

static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
    PacketPoolDestroy();
    SCFree(data);
    return TM_ECODE_OK;
}

static uint32_t FlowTimeoutsMin(void)
{
    FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts);
    uint32_t m = UINT32_MAX;
    for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) {
        m = MIN(m, t[i].new_timeout);
        m = MIN(m, t[i].est_timeout);

        if (i == FLOW_PROTO_TCP) {
            m = MIN(m, t[i].closed_timeout);
        }
        if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) {
            m = MIN(m, t[i].bypassed_timeout);
        }
    }
    return m;
}

//#define FM_PROFILE

/** \brief Thread that manages the flow table and times out flows.
 *
 *  \param th_v thread vars
 *  \param thread_data FlowManagerThreadData
 *
 *  Keeps an eye on the spare list, allocating flows if needed...
 */
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
    FlowManagerThreadData *ftd = thread_data;
    struct timeval ts;
    uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
    bool emerg = false;
    bool prev_emerg = false;
    uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
    uint32_t flow_last_sec = 0;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
 * are really low. Might confuse ppl
    uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
    uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
    uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
*/
    memset(&ts, 0, sizeof(ts));
    uint32_t hash_passes = 0;
#ifdef FM_PROFILE
    uint32_t hash_row_checks = 0;
    uint32_t hash_passes_chunks = 0;
#endif
    uint32_t hash_full_passes = 0;

    const uint32_t min_timeout = FlowTimeoutsMin();
    const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
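
    /* Pacing sketch: outside emergency mode the hash is walked one chunk per
     * elapsed second out of pass_in_sec chunks, so a full pass takes
     * pass_in_sec seconds. E.g. a minimum configured timeout of 30s (an
     * example value, not a default) gives 30 * 8 = 240 seconds per full
     * pass; the fallback of 60 guards against a zero minimum timeout. */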

    /* don't start our activities until time is setup */
    while (!TimeModeIsReady()) {
        if (suricata_ctl_flags != 0)
            return TM_ECODE_OK;
    }

    SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
            ftd->instance, min_timeout, pass_in_sec);

#ifdef FM_PROFILE
    struct timeval endts;
    struct timeval active;
    struct timeval paused;
    struct timeval sleeping;
    memset(&endts, 0, sizeof(endts));
    memset(&active, 0, sizeof(active));
    memset(&paused, 0, sizeof(paused));
    memset(&sleeping, 0, sizeof(sleeping));
#endif

    struct timeval startts;
    memset(&startts, 0, sizeof(startts));
    gettimeofday(&startts, NULL);

    uint32_t hash_pass_iter = 0;
    uint32_t emerg_over_cnt = 0;
    uint64_t next_run_ms = 0;

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
#ifdef FM_PROFILE
            struct timeval pause_startts;
            memset(&pause_startts, 0, sizeof(pause_startts));
            gettimeofday(&pause_startts, NULL);
#endif
            TmThreadTestThreadUnPaused(th_v);
#ifdef FM_PROFILE
            struct timeval pause_endts;
            memset(&pause_endts, 0, sizeof(pause_endts));
            gettimeofday(&pause_endts, NULL);
            struct timeval pause_time;
            memset(&pause_time, 0, sizeof(pause_time));
            timersub(&pause_endts, &pause_startts, &pause_time);
            timeradd(&paused, &pause_time, &paused);
#endif
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }

        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
            emerg = true;
        }
#ifdef FM_PROFILE
        struct timeval run_startts;
        memset(&run_startts, 0, sizeof(run_startts));
        gettimeofday(&run_startts, NULL);
#endif
        /* Get the time */
        memset(&ts, 0, sizeof(ts));
        TimeGet(&ts);
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
        const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
        const uint32_t rt = (uint32_t)ts.tv_sec;
        const bool emerge_p = (emerg && !prev_emerg);
        if (emerge_p) {
            next_run_ms = 0;
            prev_emerg = true;
            SCLogNotice("Flow emergency mode entered...");
            StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
        }
        if (ts_ms >= next_run_ms) {
            if (ftd->instance == 0) {
                const uint32_t sq_len = FlowSpareGetPoolSize();
                const uint32_t spare_perc = sq_len * 100 / flow_config.prealloc;
                /* see if we still have enough spare flows */
                if (spare_perc < 90 || spare_perc > 110) {
                    FlowSparePoolUpdate(sq_len);
                }
            }
            const uint32_t secs_passed = rt - flow_last_sec;

            /* try to time out flows */
            FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };

            if (emerg) {
                /* in emergency mode, do a full pass of the hash table */
                FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
                hash_passes++;
                hash_full_passes++;
#ifdef FM_PROFILE
                hash_passes_chunks += 1;
                hash_row_checks += counters.rows_checked;
#endif
                StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
            } else {
                /* non-emergency mode: scan part of the hash */
                const uint32_t chunks = MIN(secs_passed, pass_in_sec);
                for (uint32_t i = 0; i < chunks; i++) {
                    FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
                            &counters, hash_pass_iter, pass_in_sec);
                    hash_pass_iter++;
                    if (hash_pass_iter == pass_in_sec) {
                        hash_pass_iter = 0;
                        hash_full_passes++;
                        StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
                    }
                }
                hash_passes++;
#ifdef FM_PROFILE
                hash_row_checks += counters.rows_checked;
                hash_passes_chunks += chunks;
#endif
            }
            flow_last_sec = rt;

            /*
               StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
               uint32_t hosts_active = HostGetActiveCount();
               StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
               uint32_t hosts_spare = HostGetSpareCount();
               StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
             */
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_clo, (uint64_t)counters.clo);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_new, (uint64_t)counters.new);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_est, (uint64_t)counters.est);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_byp, (uint64_t)counters.byp);

            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);

            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
            //StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters.flows_aside);
            StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, (uint64_t)counters.flows_aside_needs_work);

            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters.bypassed_count);
            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters.bypassed_pkts);
            StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);

            StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
            // TODO AVG MAXLEN
            // TODO LOOKUP STEPS MAXLEN and AVG LEN
            /* Don't fear, FlowManagerThread is here...
             * clear emergency bit if we have at least xx flows pruned. */
            uint32_t len = FlowSpareGetPoolSize();
            StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
            if (emerg == true) {
                SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
                        " flow_spare_q status: %"PRIu32"%% flows at the queue",
                        len, flow_config.prealloc, len * 100 / flow_config.prealloc);

                /* only if we have pruned this "emergency_recovery" percentage
                 * of flows, we will unset the emergency bit */
                if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
                    emerg_over_cnt++;
                } else {
                    emerg_over_cnt = 0;
                }
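
                /* the 30-pass requirement below adds hysteresis: the spare
                 * pool must stay above the recovery percentage for 30
                 * consecutive runs before emergency mode is left, which
                 * avoids flapping between the two modes. */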

                if (emerg_over_cnt >= 30) {
                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
                    FlowTimeoutsReset();

                    emerg = false;
                    prev_emerg = false;
                    emerg_over_cnt = 0;
                    hash_pass_iter = 0;
                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
                              " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
                              "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
                              "%% flows at the queue", (uintmax_t)ts.tv_sec,
                              (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);

                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
                }
            }
            next_run_ms = ts_ms + 667;
            if (emerg)
                next_run_ms = ts_ms + 250;
        }
        if (flow_last_sec == 0) {
            flow_last_sec = rt;
        }

        if (ftd->instance == 0 &&
                (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
            DefragTimeoutHash(&ts);
            //uint32_t hosts_pruned =
            HostTimeoutHash(&ts);
            IPPairTimeoutHash(&ts);
            other_last_sec = (uint32_t)ts.tv_sec;
        }

#ifdef FM_PROFILE
        struct timeval run_endts;
        memset(&run_endts, 0, sizeof(run_endts));
        gettimeofday(&run_endts, NULL);
        struct timeval run_time;
        memset(&run_time, 0, sizeof(run_time));
        timersub(&run_endts, &run_startts, &run_time);
        timeradd(&active, &run_time, &active);
#endif

        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
            StatsSyncCounters(th_v);
            break;
        }

#ifdef FM_PROFILE
        struct timeval sleep_startts;
        memset(&sleep_startts, 0, sizeof(sleep_startts));
        gettimeofday(&sleep_startts, NULL);
#endif
        usleep(250);

#ifdef FM_PROFILE
        struct timeval sleep_endts;
        memset(&sleep_endts, 0, sizeof(sleep_endts));
        gettimeofday(&sleep_endts, NULL);

        struct timeval sleep_time;
        memset(&sleep_time, 0, sizeof(sleep_time));
        timersub(&sleep_endts, &sleep_startts, &sleep_time);
        timeradd(&sleeping, &sleep_time, &sleeping);
#endif
        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");

        StatsSyncCountersIfSignalled(th_v);
    }
    SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
              "timed out, %"PRIu32" flows in closed state", new_cnt,
              established_cnt, closing_cnt);

#ifdef FM_PROFILE
    SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
            hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
            hash_full_passes, hash_row_checks,
            hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));

    gettimeofday(&endts, NULL);
    struct timeval total_run_time;
    timersub(&endts, &startts, &total_run_time);

    SCLogNotice("FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
            (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
            (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
            (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
            (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
#endif
    return TM_ECODE_OK;
}

/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.managers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError(SC_ERR_INVALID_ARGUMENTS,
                "invalid flow.managers setting %"PRIdMAX, setting);
    }
    flowmgr_number = (uint32_t)setting;

    SCLogConfig("using %u flow manager threads", flowmgr_number);
    StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);

    for (uint32_t u = 0; u < flowmgr_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);

        ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
                "FlowManager", 0);
        if (tv_flowmgr == NULL) {
            FatalError(SC_ERR_FATAL, "flow manager thread creation failed");
        }
        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
            FatalError(SC_ERR_FATAL, "flow manager thread spawn failed");
        }
    }
    return;
}

typedef struct FlowRecyclerThreadData_ {
    void *output_thread_data;
} FlowRecyclerThreadData;

static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;
    if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
        SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
        SCFree(ftd);
        return TM_ECODE_FAILED;
    }
    SCLogDebug("output_thread_data %p", ftd->output_thread_data);

    *data = ftd;
    return TM_ECODE_OK;
}

static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
    if (ftd->output_thread_data != NULL)
        OutputFlowLogThreadDeinit(t, ftd->output_thread_data);

    SCFree(data);
    return TM_ECODE_OK;
}

/** \brief Thread that manages timed out flows.
 *
 *  \param th_v thread vars
 *  \param thread_data FlowRecyclerThreadData
 */
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
    struct timeval ts;
    uint64_t recycled_cnt = 0;
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
    BUG_ON(ftd == NULL);

    memset(&ts, 0, sizeof(ts));
    uint32_t fr_passes = 0;

#ifdef FM_PROFILE
    struct timeval endts;
    struct timeval active;
    struct timeval paused;
    struct timeval sleeping;
    memset(&endts, 0, sizeof(endts));
    memset(&active, 0, sizeof(active));
    memset(&paused, 0, sizeof(paused));
    memset(&sleeping, 0, sizeof(sleeping));
#endif
    struct timeval startts;
    memset(&startts, 0, sizeof(startts));
    gettimeofday(&startts, NULL);

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
#ifdef FM_PROFILE
            struct timeval pause_startts;
            memset(&pause_startts, 0, sizeof(pause_startts));
            gettimeofday(&pause_startts, NULL);
#endif
            TmThreadTestThreadUnPaused(th_v);

#ifdef FM_PROFILE
            struct timeval pause_endts;
            memset(&pause_endts, 0, sizeof(pause_endts));
            gettimeofday(&pause_endts, NULL);

            struct timeval pause_time;
            memset(&pause_time, 0, sizeof(pause_time));
            timersub(&pause_endts, &pause_startts, &pause_time);
            timeradd(&paused, &pause_time, &paused);
#endif
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }
        fr_passes++;
#ifdef FM_PROFILE
        struct timeval run_startts;
        memset(&run_startts, 0, sizeof(run_startts));
        gettimeofday(&run_startts, NULL);
#endif
        SC_ATOMIC_ADD(flowrec_busy, 1);
        FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

        const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));

        /* Get the time */
        memset(&ts, 0, sizeof(ts));
        TimeGet(&ts);
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);

        Flow *f;
        while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
            Recycler(th_v, ftd->output_thread_data, f);
            recycled_cnt++;
        }
        SC_ATOMIC_SUB(flowrec_busy, 1);

#ifdef FM_PROFILE
        struct timeval run_endts;
        memset(&run_endts, 0, sizeof(run_endts));
        gettimeofday(&run_endts, NULL);

        struct timeval run_time;
        memset(&run_time, 0, sizeof(run_time));
        timersub(&run_endts, &run_startts, &run_time);
        timeradd(&active, &run_time, &active);
#endif

        if (bail) {
            break;
        }

#ifdef FM_PROFILE
        struct timeval sleep_startts;
        memset(&sleep_startts, 0, sizeof(sleep_startts));
        gettimeofday(&sleep_startts, NULL);
#endif
        usleep(250);
#ifdef FM_PROFILE
        struct timeval sleep_endts;
        memset(&sleep_endts, 0, sizeof(sleep_endts));
        gettimeofday(&sleep_endts, NULL);
        struct timeval sleep_time;
        memset(&sleep_time, 0, sizeof(sleep_time));
        timersub(&sleep_endts, &sleep_startts, &sleep_time);
        timeradd(&sleeping, &sleep_time, &sleeping);
#endif

        SCLogDebug("woke up...");

        StatsSyncCountersIfSignalled(th_v);
    }
    StatsSyncCounters(th_v);
#ifdef FM_PROFILE
    gettimeofday(&endts, NULL);
    struct timeval total_run_time;
    timersub(&endts, &startts, &total_run_time);
    SCLogNotice("FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
            (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
            (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
            (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
            (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);

    SCLogNotice("FR passes %u passes/s %u", fr_passes,
            (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
#endif
    SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
    return TM_ECODE_OK;
}

static bool FlowRecyclerReadyToShutdown(void)
{
    if (SC_ATOMIC_GET(flowrec_busy) != 0) {
        return false;
    }
    uint32_t len = 0;
    FQLOCK_LOCK(&flow_recycle_q);
    len = flow_recycle_q.qlen;
    FQLOCK_UNLOCK(&flow_recycle_q);

    return (len == 0);
}
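
/* Ordering note: FlowRecycler() increments flowrec_busy before it drains
 * flow_recycle_q and decrements it afterwards, so checking the busy flag
 * before the queue length avoids declaring the recyclers idle while one
 * still holds extracted flows it has yet to log and return. */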

/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn()
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.recyclers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError(SC_ERR_INVALID_ARGUMENTS,
                "invalid flow.recyclers setting %"PRIdMAX, setting);
    }
    flowrec_number = (uint32_t)setting;

    SCLogConfig("using %u flow recycler threads", flowrec_number);

    for (uint32_t u = 0; u < flowrec_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);

        ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
                "FlowRecycler", 0);

        if (tv_flowrec == NULL) {
            FatalError(SC_ERR_FATAL, "flow recycler thread creation failed");
        }
        if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
            FatalError(SC_ERR_FATAL, "flow recycler thread spawn failed");
        }
    }
    return;
}

/**
 * \brief Used to disable flow recycler thread(s).
 *
 * \note this should only be called when the flow manager is already gone
 *
 * \todo Kinda hackish since it uses the tv name to identify flow recycler
 *       thread.  We need an all weather identification scheme.
 */
void FlowDisableFlowRecyclerThread(void)
{
    int cnt = 0;

    /* move all flows still in the hash to the recycler queue */
#ifndef DEBUG
    (void)FlowCleanupHash();
#else
    uint32_t flows = FlowCleanupHash();
    SCLogDebug("flows to progress: %u", flows);
#endif

    /* make sure all flows are processed */
    do {
        usleep(10);
    } while (FlowRecyclerReadyToShutdown() == false);

    SCMutexLock(&tv_root_lock);
    /* flow recycler thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
            cnt++;
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
                "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowrec_cnt, 0);
    return;
}

void TmModuleFlowManagerRegister (void)
{
    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);

    SC_ATOMIC_INIT(flowmgr_cnt);
    SC_ATOMIC_INITPTR(flow_timeouts);
}

void TmModuleFlowRecyclerRegister (void)
{
    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);

    SC_ATOMIC_INIT(flowrec_cnt);
    SC_ATOMIC_INIT(flowrec_busy);
}