1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
/* Lock profiling counters. They exist only when PROFILE_LOCKING is defined
 * and are thread_local, so every thread has its own copy. There is one
 * contention / wait_ticks / cnt triple per lock kind (mutex, spin, rww, rwr).
 * Nothing in this part of the file updates them.
 * NOTE(review): presumably maintained by the lock wrappers declared in
 * threads.h, with rww/rwr being the write/read side of rwlocks -- confirm. */
#ifdef PROFILE_LOCKING
/* mutex */
thread_local uint64_t mutex_lock_contention;
thread_local uint64_t mutex_lock_wait_ticks;
thread_local uint64_t mutex_lock_cnt;

/* spinlock */
thread_local uint64_t spin_lock_contention;
thread_local uint64_t spin_lock_wait_ticks;
thread_local uint64_t spin_lock_cnt;

/* rww */
thread_local uint64_t rww_lock_contention;
thread_local uint64_t rww_lock_wait_ticks;
thread_local uint64_t rww_lock_cnt;

/* rwr */
thread_local uint64_t rwr_lock_contention;
thread_local uint64_t rwr_lock_wait_ticks;
thread_local uint64_t rwr_lock_cnt;
#endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
/* forward declarations of file-local helpers */
static int SetCPUAffinity(uint16_t cpu);
static void TmThreadDeinitMC(ThreadVars *tv);

/* root of the threadvars list: one list head per index below TVT_MAX, the
 * entries of each list are chained through ThreadVars::next */
ThreadVars *tv_root[TVT_MAX] = { NULL };

/* lock to protect tv_root; the list walkers in this file hold it for the
 * whole traversal */
SCMutex tv_root_lock = SCMUTEX_INITIALIZER;
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
TmThreadsCheckFlag(ThreadVars * tv,uint32_t flag)89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91     return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
96  */
TmThreadsSetFlag(ThreadVars * tv,uint32_t flag)97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99     SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
104  */
TmThreadsUnsetFlag(ThreadVars * tv,uint32_t flag)105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107     SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109 
/**
 * \brief Separate run function so we can call it recursively.
 *
 * Runs packet \a p through \a slot and every slot chained after it. After
 * each slot function returns, all packets found in tv->decode_pq are
 * dequeued, run through the remaining slots (by recursing on the next slot)
 * and then handed to tv->tmqh_out.
 *
 * \param tv   Thread the slots belong to.
 * \param p    Packet to process; it is NOT passed to tv->tmqh_out here.
 * \param slot First slot to run; NULL means nothing is run.
 *
 * \retval TM_ECODE_OK     all slots ran without reporting a failure.
 * \retval TM_ECODE_FAILED a slot function (or a recursive run) returned
 *                         TM_ECODE_FAILED; processing stops immediately.
 */
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
    for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
        PACKET_PROFILING_TMM_START(p, s->tm_id);
        TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
        PACKET_PROFILING_TMM_END(p, s->tm_id);

        /* handle error */
        if (unlikely(r == TM_ECODE_FAILED)) {
            /* Encountered error.  Return packets to packetpool and return */
            /* no extra packet to hand over here, hence the NULL */
            TmThreadsSlotProcessPktFail(tv, s, NULL);
            return TM_ECODE_FAILED;
        }

        /* handle new packets */
        /* packets that were queued in tv->decode_pq while the slot ran */
        while (tv->decode_pq.top != NULL) {
            Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
            if (unlikely(extra_p == NULL))
                continue;

            /* see if we need to process the packet */
            /* the extra packet only passes through the slots after this one */
            if (s->slot_next != NULL) {
                r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
                if (unlikely(r == TM_ECODE_FAILED)) {
                    /* hand the failed extra packet to the failure handler */
                    TmThreadsSlotProcessPktFail(tv, s, extra_p);
                    return TM_ECODE_FAILED;
                }
            }
            /* extra packets are output here, the caller only outputs p */
            tv->tmqh_out(tv, extra_p);
        }
    }

    return TM_ECODE_OK;
}
147 
148 /** \internal
149  *
150  *  \brief Process flow timeout packets
151  *
152  *  Process flow timeout pseudo packets. During shutdown this loop
153  *  is run until the flow engine kills the thread and the queue is
154  *  empty.
155  */
TmThreadTimeoutLoop(ThreadVars * tv,TmSlot * s)156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158     TmSlot *fw_slot = tv->tm_flowworker;
159     int r = TM_ECODE_OK;
160 
161     if (tv->stream_pq == NULL || fw_slot == NULL) {
162         SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163         return r;
164     }
165 
166     SCLogDebug("flow end loop starting");
167     while (1) {
168         SCMutexLock(&tv->stream_pq->mutex_q);
169         uint32_t len = tv->stream_pq->len;
170         SCMutexUnlock(&tv->stream_pq->mutex_q);
171         if (len > 0) {
172             while (len--) {
173                 SCMutexLock(&tv->stream_pq->mutex_q);
174                 Packet *p = PacketDequeue(tv->stream_pq);
175                 SCMutexUnlock(&tv->stream_pq->mutex_q);
176                 if (likely(p)) {
177                     if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178                         if (r == TM_ECODE_FAILED)
179                             break;
180                     }
181                 }
182             }
183         } else {
184             if (TmThreadsCheckFlag(tv, THV_KILL)) {
185                 break;
186             }
187             SleepUsec(1);
188         }
189     }
190     SCLogDebug("flow end loop complete");
191     StatsSyncCounters(tv);
192 
193     return r;
194 }
195 
196 /*
197 
198     pcap/nfq
199 
200     pkt read
201         callback
202             process_pkt
203 
204     pfring
205 
206     pkt read
207         process_pkt
208 
209     slot:
210         setup
211 
212         pkt_ack_loop(tv, slot_data)
213 
214         deinit
215 
216     process_pkt:
217         while(s)
218             run s;
219         queue;
220 
221  */
222 
/**
 * \brief Thread function for threads whose first slot runs a packet
 *        acquisition loop (PktAcqLoop).
 *
 * Sequence: name the thread, apply setup options, drop capabilities, init
 * the packet pool, run SlotThreadInit for every slot, set THV_INIT_DONE,
 * run the first slot's PktAcqLoop until it fails/finishes or the thread is
 * told to stop, drain flow timeout packets, set THV_RUNNING_DONE, wait for
 * THV_DEINIT and finally run the per-slot exit stats and deinit callbacks.
 *
 * Every path ends in pthread_exit(); the exit value is 0 on a clean
 * shutdown and -1 on setup, init or deinit errors.
 *
 * \param td ThreadVars of this thread, passed as void pointer.
 */
static void *TmThreadsSlotPktAcqLoop(void *td)
{
    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = tv->tm_slots;
    char run = 1;
    TmEcode r = TM_ECODE_OK;
    TmSlot *slot = NULL;

    /* Set the thread name */
    if (SCSetThreadName(tv->name) < 0) {
        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
    }

    if (tv->thread_setup_flags != 0)
        TmThreadSetupOptions(tv);

    /* Drop the capabilities for this thread */
    SCDropCaps(tv);

    PacketPoolInit();

    /* check if we are setup properly */
    if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
        SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
                                 " PktAcqLoop=%p, tmqh_in=%p,"
                                 " tmqh_out=%p",
                   s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
        pthread_exit((void *) -1);
        return NULL;
    }

    /* initialize every slot and locate the flow worker slot, if any */
    for (slot = s; slot != NULL; slot = slot->slot_next) {
        if (slot->SlotThreadInit != NULL) {
            void *slot_data = NULL;
            r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
            if (r != TM_ECODE_OK) {
                if (r == TM_ECODE_DONE) {
                    /* the init function reported it is already done: this
                     * additionally sets THV_INIT_DONE, unlike the plain
                     * failure case below */
                    EngineDone();
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
                    goto error;
                } else {
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                    goto error;
                }
            }
            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
        }

        /* if the flowworker module is the first, get the threads input queue */
        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
            tv->stream_pq = tv->inq->pq;
            tv->tm_flowworker = slot;
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        /* setup a queue */
        /* flowworker is not the first slot: give it a thread-local queue */
        } else if (slot->tm_id == TMM_FLOWWORKER) {
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
            if (tv->stream_pq_local == NULL)
                FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
            tv->stream_pq = tv->stream_pq_local;
            tv->tm_flowworker = slot;
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        }
    }

    StatsSetupPrivate(tv);

    /* signal that initialization of all slots succeeded */
    TmThreadsSetFlag(tv, THV_INIT_DONE);

    while(run) {
        /* honor a pause request: THV_PAUSED is set only while waiting */
        if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
            TmThreadsSetFlag(tv, THV_PAUSED);
            TmThreadTestThreadUnPaused(tv);
            TmThreadsUnsetFlag(tv, THV_PAUSED);
        }

        /* only the first slot's acquisition loop is called here; the slot
         * itself is passed along as third argument */
        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);

        if (r == TM_ECODE_FAILED) {
            TmThreadsSetFlag(tv, THV_FAILED);
            run = 0;
        }
        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
            run = 0;
        }
        if (r == TM_ECODE_DONE) {
            run = 0;
        }
    }
    StatsSyncCounters(tv);

    TmThreadsSetFlag(tv, THV_FLOW_LOOP);

    /* process all pseudo packets the flow timeout may throw at us */
    TmThreadTimeoutLoop(tv, s);

    /* announce we are done running, then wait until deinit is requested */
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
    TmThreadWaitForFlag(tv, THV_DEINIT);

    PacketPoolDestroy();

    /* per slot: print exit stats first, then deinit; a failing deinit
     * aborts the remaining slots */
    for (slot = s; slot != NULL; slot = slot->slot_next) {
        if (slot->SlotThreadExitPrintStats != NULL) {
            slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
        }

        if (slot->SlotThreadDeinit != NULL) {
            r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
            if (r != TM_ECODE_OK) {
                TmThreadsSetFlag(tv, THV_CLOSED);
                goto error;
            }
        }
    }

    tv->stream_pq = NULL;
    SCLogDebug("%s ending", tv->name);
    TmThreadsSetFlag(tv, THV_CLOSED);
    pthread_exit((void *) 0);
    return NULL;

error:
    /* callers of this label have already set the thread flags */
    tv->stream_pq = NULL;
    pthread_exit((void *) -1);
    return NULL;
}
362 
/**
 * \brief Thread function for threads that take packets from an input queue
 *        handler and run them through a variable number of slots.
 *
 * Sequence: init the packet pool, name the thread, apply setup options,
 * drop capabilities, run SlotThreadInit for every slot, set THV_INIT_DONE,
 * then loop: fetch a packet with tv->tmqh_in, run it through all slots,
 * pass it to tv->tmqh_out and handle injected packets. The loop ends on
 * THV_KILL or when the slot run fails. Afterwards THV_RUNNING_DONE is set,
 * the thread waits for THV_DEINIT and runs exit stats and deinit per slot.
 *
 * Every path ends in pthread_exit(); the exit value is 0 on a clean
 * shutdown and -1 on setup, init or deinit errors.
 *
 * \param td ThreadVars of this thread, passed as void pointer.
 */
static void *TmThreadsSlotVar(void *td)
{
    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = (TmSlot *)tv->tm_slots;
    Packet *p = NULL;
    char run = 1;
    TmEcode r = TM_ECODE_OK;

    PacketPoolInit();//Empty();

    /* Set the thread name */
    if (SCSetThreadName(tv->name) < 0) {
        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
    }

    if (tv->thread_setup_flags != 0)
        TmThreadSetupOptions(tv);

    /* Drop the capabilities for this thread */
    SCDropCaps(tv);

    /* check if we are setup properly */
    if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
        pthread_exit((void *) -1);
        return NULL;
    }

    /* initialize every slot; s walks the list and is reset afterwards */
    for (; s != NULL; s = s->slot_next) {
        if (s->SlotThreadInit != NULL) {
            void *slot_data = NULL;
            r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
            if (r != TM_ECODE_OK) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                goto error;
            }
            (void)SC_ATOMIC_SET(s->slot_data, slot_data);
        }

        /* special case: we need to access the stream queue
         * from the flow timeout code */

        /* if the flowworker module is the first, get the threads input queue */
        if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
            tv->stream_pq = tv->inq->pq;
            tv->tm_flowworker = s;
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        /* setup a queue */
        /* flowworker is not the first slot: give it a thread-local queue */
        } else if (s->tm_id == TMM_FLOWWORKER) {
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
            if (tv->stream_pq_local == NULL)
                FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
            tv->stream_pq = tv->stream_pq_local;
            tv->tm_flowworker = s;
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        }
    }

    StatsSetupPrivate(tv);

    /* signal that initialization of all slots succeeded */
    TmThreadsSetFlag(tv, THV_INIT_DONE);

    /* the init loop left s at NULL: point it at the first slot again */
    s = (TmSlot *)tv->tm_slots;

    while (run) {
        /* honor a pause request: THV_PAUSED is set only while waiting */
        if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
            TmThreadsSetFlag(tv, THV_PAUSED);
            TmThreadTestThreadUnPaused(tv);
            TmThreadsUnsetFlag(tv, THV_PAUSED);
        }

        /* input a packet */
        p = tv->tmqh_in(tv);

        /* if we didn't get a packet see if we need to do some housekeeping */
        if (unlikely(p == NULL)) {
            /* the flow queue has work: create a pseudo packet so the slots
             * run even though no real packet arrived */
            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
                p = PacketGetFromQueueOrAlloc();
                if (p != NULL) {
                    p->flags |= PKT_PSEUDO_STREAM_END;
                    PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
                }
            }
        }

        if (p != NULL) {
            /* run the thread module(s) */
            r = TmThreadsSlotVarRun(tv, p, s);
            if (r == TM_ECODE_FAILED) {
                /* give the packet back and leave the loop right away */
                TmqhOutputPacketpool(tv, p);
                TmThreadsSetFlag(tv, THV_FAILED);
                break;
            }

            /* output the packet */
            tv->tmqh_out(tv, p);

            /* now handle the stream pq packets */
            TmThreadsHandleInjectedPackets(tv);
        }

        if (TmThreadsCheckFlag(tv, THV_KILL)) {
            run = 0;
        }
    } /* while (run) */
    StatsSyncCounters(tv);

    /* announce we are done running, then wait until deinit is requested */
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
    TmThreadWaitForFlag(tv, THV_DEINIT);

    PacketPoolDestroy();

    s = (TmSlot *)tv->tm_slots;

    /* per slot: print exit stats first, then deinit; a failing deinit
     * aborts the remaining slots */
    for ( ; s != NULL; s = s->slot_next) {
        if (s->SlotThreadExitPrintStats != NULL) {
            s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
        }

        if (s->SlotThreadDeinit != NULL) {
            r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
            if (r != TM_ECODE_OK) {
                TmThreadsSetFlag(tv, THV_CLOSED);
                goto error;
            }
        }
    }

    SCLogDebug("%s ending", tv->name);
    tv->stream_pq = NULL;
    TmThreadsSetFlag(tv, THV_CLOSED);
    pthread_exit((void *) 0);
    return NULL;

error:
    /* callers of this label have already set the thread flags */
    tv->stream_pq = NULL;
    pthread_exit((void *) -1);
    return NULL;
}
515 
/**
 * \brief Thread function for management threads.
 *
 * Only the first slot of the thread is used: its SlotThreadInit is run,
 * THV_INIT_DONE is set, then its Management function is called once. After
 * it returns, THV_RUNNING_DONE is set, the thread waits for THV_DEINIT and
 * runs the slot's exit stats and deinit callbacks.
 *
 * Every path ends in pthread_exit(); the exit value is 0 on a clean
 * shutdown and -1 when init or deinit fails.
 *
 * \param td ThreadVars of this thread, passed as void pointer.
 */
static void *TmThreadsManagement(void *td)
{
    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = (TmSlot *)tv->tm_slots;
    TmEcode r = TM_ECODE_OK;

    /* a management thread without a slot is a programming error */
    BUG_ON(s == NULL);

    /* Set the thread name */
    if (SCSetThreadName(tv->name) < 0) {
        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
    }

    if (tv->thread_setup_flags != 0)
        TmThreadSetupOptions(tv);

    /* Drop the capabilities for this thread */
    SCDropCaps(tv);

    SCLogDebug("%s starting", tv->name);

    if (s->SlotThreadInit != NULL) {
        void *slot_data = NULL;
        r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
        if (r != TM_ECODE_OK) {
            TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
            pthread_exit((void *) -1);
            return NULL;
        }
        (void)SC_ATOMIC_SET(s->slot_data, slot_data);
    }

    StatsSetupPrivate(tv);

    /* signal that initialization succeeded */
    TmThreadsSetFlag(tv, THV_INIT_DONE);

    /* single call: the management function is not re-entered here */
    r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
    /* handle error */
    if (r == TM_ECODE_FAILED) {
        TmThreadsSetFlag(tv, THV_FAILED);
    }

    /* counters are only synced when the thread was asked to stop */
    if (TmThreadsCheckFlag(tv, THV_KILL)) {
        StatsSyncCounters(tv);
    }

    /* announce we are done running, then wait until deinit is requested */
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
    TmThreadWaitForFlag(tv, THV_DEINIT);

    if (s->SlotThreadExitPrintStats != NULL) {
        s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
    }

    if (s->SlotThreadDeinit != NULL) {
        r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
        if (r != TM_ECODE_OK) {
            TmThreadsSetFlag(tv, THV_CLOSED);
            pthread_exit((void *) -1);
            return NULL;
        }
    }

    TmThreadsSetFlag(tv, THV_CLOSED);
    pthread_exit((void *) 0);
    return NULL;
}
582 
583 /**
584  * \brief We set the slot functions.
585  *
586  * \param tv   Pointer to the TV to set the slot function for.
587  * \param name Name of the slot variant.
588  * \param fn_p Pointer to a custom slot function.  Used only if slot variant
589  *             "name" is "custom".
590  *
591  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
592  */
TmThreadSetSlots(ThreadVars * tv,const char * name,void * (* fn_p)(void *))593 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
594 {
595     if (name == NULL) {
596         if (fn_p == NULL) {
597             printf("Both slot name and function pointer can't be NULL inside "
598                    "TmThreadSetSlots\n");
599             goto error;
600         } else {
601             name = "custom";
602         }
603     }
604 
605     if (strcmp(name, "varslot") == 0) {
606         tv->tm_func = TmThreadsSlotVar;
607     } else if (strcmp(name, "pktacqloop") == 0) {
608         tv->tm_func = TmThreadsSlotPktAcqLoop;
609     } else if (strcmp(name, "management") == 0) {
610         tv->tm_func = TmThreadsManagement;
611     } else if (strcmp(name, "command") == 0) {
612         tv->tm_func = TmThreadsManagement;
613     } else if (strcmp(name, "custom") == 0) {
614         if (fn_p == NULL)
615             goto error;
616         tv->tm_func = fn_p;
617     } else {
618         printf("Error: Slot \"%s\" not supported\n", name);
619         goto error;
620     }
621 
622     return TM_ECODE_OK;
623 
624 error:
625     return TM_ECODE_FAILED;
626 }
627 
TmThreadsGetTVContainingSlot(TmSlot * tm_slot)628 ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
629 {
630     SCMutexLock(&tv_root_lock);
631     for (int i = 0; i < TVT_MAX; i++) {
632         ThreadVars *tv = tv_root[i];
633         while (tv) {
634             TmSlot *slots = tv->tm_slots;
635             while (slots != NULL) {
636                 if (slots == tm_slot) {
637                     SCMutexUnlock(&tv_root_lock);
638                     return tv;
639                 }
640                 slots = slots->slot_next;
641             }
642             tv = tv->next;
643         }
644     }
645     SCMutexUnlock(&tv_root_lock);
646     return NULL;
647 }
648 
649 /**
650  * \brief Appends a new entry to the slots.
651  *
652  * \param tv   TV the slot is attached to.
653  * \param tm   TM to append.
654  * \param data Data to be passed on to the slot init function.
655  *
656  * \retval The allocated TmSlot or NULL if there is an error
657  */
TmSlotSetFuncAppend(ThreadVars * tv,TmModule * tm,const void * data)658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660     TmSlot *slot = SCMalloc(sizeof(TmSlot));
661     if (unlikely(slot == NULL))
662         return;
663     memset(slot, 0, sizeof(TmSlot));
664     SC_ATOMIC_INITPTR(slot->slot_data);
665     slot->SlotThreadInit = tm->ThreadInit;
666     slot->slot_initdata = data;
667     if (tm->Func) {
668         slot->SlotFunc = tm->Func;
669     } else if (tm->PktAcqLoop) {
670         slot->PktAcqLoop = tm->PktAcqLoop;
671         if (tm->PktAcqBreakLoop) {
672             tv->break_loop = true;
673         }
674     } else if (tm->Management) {
675         slot->Management = tm->Management;
676     }
677     slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
678     slot->SlotThreadDeinit = tm->ThreadDeinit;
679     /* we don't have to check for the return value "-1".  We wouldn't have
680      * received a TM as arg, if it didn't exist */
681     slot->tm_id = TmModuleGetIDForTM(tm);
682 
683     tv->tmm_flags |= tm->flags;
684     tv->cap_flags |= tm->cap_flags;
685 
686     if (tv->tm_slots == NULL) {
687         tv->tm_slots = slot;
688     } else {
689         TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690 
691         /* get the last slot */
692         for ( ; a != NULL; a = a->slot_next) {
693              b = a;
694         }
695         /* append the new slot */
696         if (b != NULL) {
697             b->slot_next = slot;
698         }
699     }
700     return;
701 }
702 
703 /**
704  * \brief Returns the slot holding a TM with the particular tm_id.
705  *
706  * \param tm_id TM id of the TM whose slot has to be returned.
707  *
708  * \retval slots Pointer to the slot.
709  */
TmSlotGetSlotForTM(int tm_id)710 TmSlot *TmSlotGetSlotForTM(int tm_id)
711 {
712     SCMutexLock(&tv_root_lock);
713     for (int i = 0; i < TVT_MAX; i++) {
714         ThreadVars *tv = tv_root[i];
715         while (tv) {
716             TmSlot *slots = tv->tm_slots;
717             while (slots != NULL) {
718                 if (slots->tm_id == tm_id) {
719                     SCMutexUnlock(&tv_root_lock);
720                     return slots;
721                 }
722                 slots = slots->slot_next;
723             }
724             tv = tv->next;
725         }
726     }
727     SCMutexUnlock(&tv_root_lock);
728     return NULL;
729 }
730 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun && !defined __DragonFly__
/**
 * \brief Apply a cpu set as affinity of the calling thread.
 *
 * Only compiled on platforms that are not excluded by the guard above. The
 * platform call differs per OS: cpuset_setaffinity() on FreeBSD,
 * thread_policy_set() on Darwin and sched_setaffinity() on the thread id
 * obtained through the gettid syscall everywhere else.
 *
 * \param cs cpu set to apply.
 *
 * \retval 0  the platform call returned 0.
 * \retval -1 the platform call returned non-zero; a warning is printed.
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
#if defined OS_FREEBSD
    int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
                               SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
#elif OS_DARWIN
    int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
                              (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    /* use the kernel thread id so only this thread is affected */
    pid_t tid = syscall(SYS_gettid);
    int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (r != 0) {
        /* the message names sched_setaffinity on every platform */
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
               strerror(errno));
        return -1;
    }

    return 0;
}
#endif
754 
755 
756 /**
757  * \brief Set the thread affinity on the calling thread.
758  *
759  * \param cpuid Id of the core/cpu to setup the affinity.
760  *
761  * \retval 0 If all goes well; -1 if something is wrong.
762  */
SetCPUAffinity(uint16_t cpuid)763 static int SetCPUAffinity(uint16_t cpuid)
764 {
765 #if defined __OpenBSD__ || defined sun || defined __DragonFly__
766     return 0;
767 #else
768     int cpu = (int)cpuid;
769 
770 #if defined OS_WIN32 || defined __CYGWIN__
771     DWORD cs = 1 << cpu;
772 
773     int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
774     if (r != 0) {
775         printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
776                strerror(errno));
777         return -1;
778     }
779     SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
780                SCGetThreadIdLong(), cpu);
781 
782     return 0;
783 
784 #else
785     cpu_set_t cs;
786 
787     CPU_ZERO(&cs);
788     CPU_SET(cpu, &cs);
789     return SetCPUAffinitySet(&cs);
790 #endif /* windows */
791 #endif /* not supported */
792 }
793 
794 
795 /**
796  * \brief Set the thread options (thread priority).
797  *
798  * \param tv Pointer to the ThreadVars to setup the thread priority.
799  *
800  * \retval TM_ECODE_OK.
801  */
TmThreadSetThreadPriority(ThreadVars * tv,int prio)802 TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
803 {
804     tv->thread_setup_flags |= THREAD_SET_PRIORITY;
805     tv->thread_priority = prio;
806 
807     return TM_ECODE_OK;
808 }
809 
810 /**
811  * \brief Adjusting nice value for threads.
812  */
TmThreadSetPrio(ThreadVars * tv)813 void TmThreadSetPrio(ThreadVars *tv)
814 {
815     SCEnter();
816 #ifndef __CYGWIN__
817 #ifdef OS_WIN32
818 	if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
819         SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
820                    "thread %s: %s", tv->name, strerror(errno));
821     } else {
822         SCLogDebug("Priority set to %"PRId32" for thread %s",
823                    tv->thread_priority, tv->name);
824     }
825 #else
826     int ret = nice(tv->thread_priority);
827     if (ret == -1) {
828         SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
829                    "for thread %s: %s", tv->thread_priority, tv->name,
830                    strerror(errno));
831     } else {
832         SCLogDebug("Nice value set to %"PRId32" for thread %s",
833                    tv->thread_priority, tv->name);
834     }
835 #endif /* OS_WIN32 */
836 #endif
837     SCReturn;
838 }
839 
840 
841 /**
842  * \brief Set the thread options (cpu affinity).
843  *
844  * \param tv pointer to the ThreadVars to setup the affinity.
845  * \param cpu cpu on which affinity is set.
846  *
847  * \retval TM_ECODE_OK
848  */
TmThreadSetCPUAffinity(ThreadVars * tv,uint16_t cpu)849 TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
850 {
851     tv->thread_setup_flags |= THREAD_SET_AFFINITY;
852     tv->cpu_affinity = cpu;
853 
854     return TM_ECODE_OK;
855 }
856 
857 
TmThreadSetCPU(ThreadVars * tv,uint8_t type)858 TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
859 {
860     if (!threading_set_cpu_affinity)
861         return TM_ECODE_OK;
862 
863     if (type > MAX_CPU_SET) {
864         SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
865         return TM_ECODE_FAILED;
866     }
867 
868     tv->thread_setup_flags |= THREAD_SET_AFFTYPE;
869     tv->cpu_affinity = type;
870 
871     return TM_ECODE_OK;
872 }
873 
TmThreadGetNbThreads(uint8_t type)874 int TmThreadGetNbThreads(uint8_t type)
875 {
876     if (type >= MAX_CPU_SET) {
877         SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
878         return 0;
879     }
880 
881     return thread_affinity[type].nb_threads;
882 }
883 
/**
 * \brief Apply the affinity and priority options stored in a TV to the
 *        calling thread.
 *
 * With THREAD_SET_AFFINITY the thread is bound to tv->cpu_affinity. On
 * platforms not excluded by the guard below, THREAD_SET_PRIORITY applies
 * tv->thread_priority, and THREAD_SET_AFFTYPE treats tv->cpu_affinity as
 * index into thread_affinity[], derives affinity and priority from that
 * entry and applies both. Results of the affinity calls are not checked.
 *
 * \param tv pointer to the ThreadVars of the calling thread.
 *
 * \retval TM_ECODE_OK always.
 */
TmEcode TmThreadSetupOptions(ThreadVars *tv)
{
    if (tv->thread_setup_flags & THREAD_SET_AFFINITY) {
        SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
                  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
                  SCGetThreadIdLong());
        SetCPUAffinity(tv->cpu_affinity);
    }

#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun && !defined __DragonFly__
    if (tv->thread_setup_flags & THREAD_SET_PRIORITY)
        TmThreadSetPrio(tv);
    if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) {
        /* here cpu_affinity holds a cpu set type, not a cpu number */
        ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity];
        if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
            /* exclusive mode: bind to the single cpu handed out for this
             * thread */
            int cpu = AffinityGetNextCPU(taf);
            SetCPUAffinity(cpu);
            /* If CPU is in a set overwrite the default thread prio */
            if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
                tv->thread_priority = PRIO_LOW;
            } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
                tv->thread_priority = PRIO_MEDIUM;
            } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
                tv->thread_priority = PRIO_HIGH;
            } else {
                tv->thread_priority = taf->prio;
            }
            SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
                      "%d, thread id %lu", tv->thread_priority,
                      tv->name, cpu, SCGetThreadIdLong());
        } else {
            /* otherwise the thread may run on every cpu of the set */
            SetCPUAffinitySet(&taf->cpu_set);
            tv->thread_priority = taf->prio;
            SCLogPerf("Setting prio %d for thread \"%s\", "
                      "thread id %lu", tv->thread_priority,
                      tv->name, SCGetThreadIdLong());
        }
        /* apply the priority chosen above */
        TmThreadSetPrio(tv);
    }
#endif

    return TM_ECODE_OK;
}
933 
934 /**
935  * \brief Creates and returns the TV instance for a new thread.
936  *
937  * \param name       Name of this TV instance
938  * \param inq_name   Incoming queue name
939  * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
940  * \param outq_name  Outgoing queue name
941  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
942  * \param slots      String representation for the slot function to be used
943  * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
944  * \param mucond     Flag to indicate whether to initialize the condition
945  *                   and the mutex variables for this newly created TV.
946  *
947  * \retval the newly created TV instance, or NULL on error
948  */
TmThreadCreate(const char * name,const char * inq_name,const char * inqh_name,const char * outq_name,const char * outqh_name,const char * slots,void * (* fn_p)(void *),int mucond)949 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
950                            const char *outq_name, const char *outqh_name, const char *slots,
951                            void * (*fn_p)(void *), int mucond)
952 {
953     ThreadVars *tv = NULL;
954     Tmq *tmq = NULL;
955     Tmqh *tmqh = NULL;
956 
957     SCLogDebug("creating thread \"%s\"...", name);
958 
959     /* XXX create separate function for this: allocate a thread container */
960     tv = SCMalloc(sizeof(ThreadVars));
961     if (unlikely(tv == NULL))
962         goto error;
963     memset(tv, 0, sizeof(ThreadVars));
964 
965     SC_ATOMIC_INIT(tv->flags);
966     SCMutexInit(&tv->perf_public_ctx.m, NULL);
967 
968     strlcpy(tv->name, name, sizeof(tv->name));
969 
970     /* default state for every newly created thread */
971     TmThreadsSetFlag(tv, THV_PAUSE);
972     TmThreadsSetFlag(tv, THV_USE);
973 
974     /* set the incoming queue */
975     if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
976         SCLogDebug("inq_name \"%s\"", inq_name);
977 
978         tmq = TmqGetQueueByName(inq_name);
979         if (tmq == NULL) {
980             tmq = TmqCreateQueue(inq_name);
981             if (tmq == NULL)
982                 goto error;
983         }
984         SCLogDebug("tmq %p", tmq);
985 
986         tv->inq = tmq;
987         tv->inq->reader_cnt++;
988         SCLogDebug("tv->inq %p", tv->inq);
989     }
990     if (inqh_name != NULL) {
991         SCLogDebug("inqh_name \"%s\"", inqh_name);
992 
993         int id = TmqhNameToID(inqh_name);
994         if (id <= 0) {
995             goto error;
996         }
997         tmqh = TmqhGetQueueHandlerByName(inqh_name);
998         if (tmqh == NULL)
999             goto error;
1000 
1001         tv->tmqh_in = tmqh->InHandler;
1002         tv->inq_id = (uint8_t)id;
1003         SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1004     }
1005 
1006     /* set the outgoing queue */
1007     if (outqh_name != NULL) {
1008         SCLogDebug("outqh_name \"%s\"", outqh_name);
1009 
1010         int id = TmqhNameToID(outqh_name);
1011         if (id <= 0) {
1012             goto error;
1013         }
1014 
1015         tmqh = TmqhGetQueueHandlerByName(outqh_name);
1016         if (tmqh == NULL)
1017             goto error;
1018 
1019         tv->tmqh_out = tmqh->OutHandler;
1020         tv->outq_id = (uint8_t)id;
1021 
1022         if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1023             SCLogDebug("outq_name \"%s\"", outq_name);
1024 
1025             if (tmqh->OutHandlerCtxSetup != NULL) {
1026                 tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1027                 if (tv->outctx == NULL)
1028                     goto error;
1029                 tv->outq = NULL;
1030             } else {
1031                 tmq = TmqGetQueueByName(outq_name);
1032                 if (tmq == NULL) {
1033                     tmq = TmqCreateQueue(outq_name);
1034                     if (tmq == NULL)
1035                         goto error;
1036                 }
1037                 SCLogDebug("tmq %p", tmq);
1038 
1039                 tv->outq = tmq;
1040                 tv->outctx = NULL;
1041                 tv->outq->writer_cnt++;
1042             }
1043         }
1044     }
1045 
1046     if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1047         goto error;
1048     }
1049 
1050     if (mucond != 0)
1051         TmThreadInitMC(tv);
1052 
1053     return tv;
1054 
1055 error:
1056     SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1057 
1058     if (tv != NULL)
1059         SCFree(tv);
1060     return NULL;
1061 }
1062 
1063 /**
1064  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1065  *        This function doesn't support custom slots, and hence shouldn't be
1066  *        supplied \"custom\" as its slot type.  All PPT threads are created
1067  *        with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1068  *        conditional variables are not used to kill the thread.
1069  *
1070  * \param name       Name of this TV instance
1071  * \param inq_name   Incoming queue name
1072  * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
1073  * \param outq_name  Outgoing queue name
1074  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1075  * \param slots      String representation for the slot function to be used
1076  *
1077  * \retval the newly created TV instance, or NULL on error
1078  */
TmThreadCreatePacketHandler(const char * name,const char * inq_name,const char * inqh_name,const char * outq_name,const char * outqh_name,const char * slots)1079 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1080                                         const char *inqh_name, const char *outq_name,
1081                                         const char *outqh_name, const char *slots)
1082 {
1083     ThreadVars *tv = NULL;
1084 
1085     tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1086                         slots, NULL, 0);
1087 
1088     if (tv != NULL) {
1089         tv->type = TVT_PPT;
1090         tv->id = TmThreadsRegisterThread(tv, tv->type);
1091     }
1092 
1093 
1094     return tv;
1095 }
1096 
1097 /**
1098  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1099  *        This function supports only custom slot functions and hence a
1100  *        function pointer should be sent as an argument.
1101  *
1102  * \param name       Name of this TV instance
1103  * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
1104  * \param mucond     Flag to indicate whether to initialize the condition
1105  *                   and the mutex variables for this newly created TV.
1106  *
1107  * \retval the newly created TV instance, or NULL on error
1108  */
TmThreadCreateMgmtThread(const char * name,void * (fn_p)(void *),int mucond)1109 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1110                                      int mucond)
1111 {
1112     ThreadVars *tv = NULL;
1113 
1114     tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1115 
1116     if (tv != NULL) {
1117         tv->type = TVT_MGMT;
1118         tv->id = TmThreadsRegisterThread(tv, tv->type);
1119         TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1120     }
1121 
1122     return tv;
1123 }
1124 
1125 /**
1126  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1127  *        This function supports only custom slot functions and hence a
1128  *        function pointer should be sent as an argument.
1129  *
1130  * \param name       Name of this TV instance
1131  * \param module     Name of TmModule with MANAGEMENT flag set.
1132  * \param mucond     Flag to indicate whether to initialize the condition
1133  *                   and the mutex variables for this newly created TV.
1134  *
1135  * \retval the newly created TV instance, or NULL on error
1136  */
TmThreadCreateMgmtThreadByName(const char * name,const char * module,int mucond)1137 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1138                                      int mucond)
1139 {
1140     ThreadVars *tv = NULL;
1141 
1142     tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1143 
1144     if (tv != NULL) {
1145         tv->type = TVT_MGMT;
1146         tv->id = TmThreadsRegisterThread(tv, tv->type);
1147         TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1148 
1149         TmModule *m = TmModuleGetByName(module);
1150         if (m) {
1151             TmSlotSetFuncAppend(tv, m, NULL);
1152         }
1153     }
1154 
1155     return tv;
1156 }
1157 
1158 /**
1159  * \brief Creates and returns the TV instance for a Command thread (CMD).
1160  *        This function supports only custom slot functions and hence a
1161  *        function pointer should be sent as an argument.
1162  *
1163  * \param name       Name of this TV instance
1164  * \param module     Name of TmModule with COMMAND flag set.
1165  * \param mucond     Flag to indicate whether to initialize the condition
1166  *                   and the mutex variables for this newly created TV.
1167  *
1168  * \retval the newly created TV instance, or NULL on error
1169  */
TmThreadCreateCmdThreadByName(const char * name,const char * module,int mucond)1170 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1171                                      int mucond)
1172 {
1173     ThreadVars *tv = NULL;
1174 
1175     tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1176 
1177     if (tv != NULL) {
1178         tv->type = TVT_CMD;
1179         tv->id = TmThreadsRegisterThread(tv, tv->type);
1180         TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1181 
1182         TmModule *m = TmModuleGetByName(module);
1183         if (m) {
1184             TmSlotSetFuncAppend(tv, m, NULL);
1185         }
1186     }
1187 
1188     return tv;
1189 }
1190 
1191 /**
1192  * \brief Appends this TV to tv_root based on its type
1193  *
1194  * \param type holds the type this TV belongs to.
1195  */
TmThreadAppend(ThreadVars * tv,int type)1196 void TmThreadAppend(ThreadVars *tv, int type)
1197 {
1198     SCMutexLock(&tv_root_lock);
1199 
1200     if (tv_root[type] == NULL) {
1201         tv_root[type] = tv;
1202         tv->next = NULL;
1203 
1204         SCMutexUnlock(&tv_root_lock);
1205 
1206         return;
1207     }
1208 
1209     ThreadVars *t = tv_root[type];
1210 
1211     while (t) {
1212         if (t->next == NULL) {
1213             t->next = tv;
1214             tv->next = NULL;
1215             break;
1216         }
1217 
1218         t = t->next;
1219     }
1220 
1221     SCMutexUnlock(&tv_root_lock);
1222 
1223     return;
1224 }
1225 
/** \internal
 *
 *  \brief Check if a thread still has packets waiting in its incoming queue
 *         or in its stream packet queue.
 *
 *  The incoming queue is only looked at when it is not flagged as packet
 *  pool. Each queue length is read while holding that queue's mutex.
 *
 *  \param tv thread to inspect
 *
 *  \retval true  at least one of the queues is not empty
 *  \retval false both queues are empty or not present
 */
static bool ThreadStillHasPackets(ThreadVars *tv)
{
    uint32_t pending;

    if (tv->inq != NULL && !tv->inq->is_packet_pool) {
        PacketQueue *inq = tv->inq->pq;

        SCMutexLock(&inq->mutex_q);
        pending = inq->len;
        SCMutexUnlock(&inq->mutex_q);

        if (pending != 0)
            return true;
    }

    if (tv->stream_pq == NULL)
        return false;

    SCMutexLock(&tv->stream_pq->mutex_q);
    pending = tv->stream_pq->len;
    SCMutexUnlock(&tv->stream_pq->mutex_q);

    return (pending != 0);
}
1252 
1253 /**
1254  * \brief Kill a thread.
1255  *
1256  * \param tv A ThreadVars instance corresponding to the thread that has to be
1257  *           killed.
1258  *
1259  * \retval r 1 killed succesfully
1260  *           0 not yet ready, needs another look
1261  */
TmThreadKillThread(ThreadVars * tv)1262 static int TmThreadKillThread(ThreadVars *tv)
1263 {
1264     BUG_ON(tv == NULL);
1265 
1266     /* kill only once :) */
1267     if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1268         return 1;
1269     }
1270 
1271     /* set the thread flag informing the thread that it needs to be
1272      * terminated */
1273     TmThreadsSetFlag(tv, THV_KILL);
1274     TmThreadsSetFlag(tv, THV_DEINIT);
1275 
1276     /* to be sure, signal more */
1277     if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1278         if (tv->inq_id != TMQH_NOT_SET) {
1279             Tmqh *qh = TmqhGetQueueHandlerByID(tv->inq_id);
1280             if (qh != NULL && qh->InShutdownHandler != NULL) {
1281                 qh->InShutdownHandler(tv);
1282             }
1283         }
1284         if (tv->inq != NULL) {
1285             for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1286                 SCCondSignal(&tv->inq->pq->cond_q);
1287             }
1288             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1289         }
1290 
1291         if (tv->ctrl_cond != NULL ) {
1292             pthread_cond_broadcast(tv->ctrl_cond);
1293         }
1294         return 0;
1295     }
1296 
1297     if (tv->outctx != NULL) {
1298         if (tv->outq_id != TMQH_NOT_SET) {
1299             Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
1300             if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1301                 qh->OutHandlerCtxFree(tv->outctx);
1302                 tv->outctx = NULL;
1303             }
1304         }
1305     }
1306 
1307     /* join it and flag it as dead */
1308     pthread_join(tv->t, NULL);
1309     SCLogDebug("thread %s stopped", tv->name);
1310     TmThreadsSetFlag(tv, THV_DEAD);
1311     return 1;
1312 }
1313 
1314 /** \internal
1315  *
1316  *  \brief make sure that all packet threads are done processing their
1317  *         in-flight packets, including 'injected' flow packets.
1318  */
TmThreadDrainPacketThreads(void)1319 static void TmThreadDrainPacketThreads(void)
1320 {
1321     ThreadVars *tv = NULL;
1322     struct timeval start_ts;
1323     struct timeval cur_ts;
1324     gettimeofday(&start_ts, NULL);
1325 
1326 again:
1327     gettimeofday(&cur_ts, NULL);
1328     if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1329         SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1330                 "to process their packets in time");
1331         return;
1332     }
1333 
1334     SCMutexLock(&tv_root_lock);
1335 
1336     /* all receive threads are part of packet processing threads */
1337     tv = tv_root[TVT_PPT];
1338     while (tv) {
1339         if (ThreadStillHasPackets(tv)) {
1340             /* we wait till we dry out all the inq packets, before we
1341              * kill this thread.  Do note that you should have disabled
1342              * packet acquire by now using TmThreadDisableReceiveThreads()*/
1343             SCMutexUnlock(&tv_root_lock);
1344 
1345             /* sleep outside lock */
1346             SleepMsec(1);
1347             goto again;
1348         }
1349         if (tv->flow_queue) {
1350             FQLOCK_LOCK(tv->flow_queue);
1351             bool fq_done = (tv->flow_queue->qlen == 0);
1352             FQLOCK_UNLOCK(tv->flow_queue);
1353             if (!fq_done) {
1354                 SCMutexUnlock(&tv_root_lock);
1355 
1356                 Packet *p = PacketGetFromAlloc();
1357                 if (p != NULL) {
1358                     p->flags |= PKT_PSEUDO_STREAM_END;
1359                     PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
1360                     PacketQueue *q = tv->stream_pq;
1361                     SCMutexLock(&q->mutex_q);
1362                     PacketEnqueue(q, p);
1363                     SCCondSignal(&q->cond_q);
1364                     SCMutexUnlock(&q->mutex_q);
1365                 }
1366 
1367                 /* don't sleep while holding a lock */
1368                 SleepMsec(1);
1369                 goto again;
1370             }
1371         }
1372         tv = tv->next;
1373     }
1374 
1375     SCMutexUnlock(&tv_root_lock);
1376     return;
1377 }
1378 
1379 /**
1380  *  \brief Disable all threads having the specified TMs.
1381  *
1382  *  Breaks out of the packet acquisition loop, and bumps
1383  *  into the 'flow loop', where it will process packets
1384  *  from the flow engine's shutdown handling.
1385  */
TmThreadDisableReceiveThreads(void)1386 void TmThreadDisableReceiveThreads(void)
1387 {
1388     ThreadVars *tv = NULL;
1389     struct timeval start_ts;
1390     struct timeval cur_ts;
1391     gettimeofday(&start_ts, NULL);
1392 
1393 again:
1394     gettimeofday(&cur_ts, NULL);
1395     if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1396         FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1397                 "thread - \"%s\". Killing engine", tv->name);
1398     }
1399 
1400     SCMutexLock(&tv_root_lock);
1401 
1402     /* all receive threads are part of packet processing threads */
1403     tv = tv_root[TVT_PPT];
1404 
1405     /* we do have to keep in mind that TVs are arranged in the order
1406      * right from receive to log.  The moment we fail to find a
1407      * receive TM amongst the slots in a tv, it indicates we are done
1408      * with all receive threads */
1409     while (tv) {
1410         int disable = 0;
1411         TmModule *tm = NULL;
1412         /* obtain the slots for this TV */
1413         TmSlot *slots = tv->tm_slots;
1414         while (slots != NULL) {
1415             tm = TmModuleGetById(slots->tm_id);
1416 
1417             if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418                 disable = 1;
1419                 break;
1420             }
1421 
1422             slots = slots->slot_next;
1423             continue;
1424         }
1425 
1426         if (disable) {
1427             if (ThreadStillHasPackets(tv)) {
1428                 /* we wait till we dry out all the inq packets, before we
1429                  * kill this thread.  Do note that you should have disabled
1430                  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1431                 SCMutexUnlock(&tv_root_lock);
1432                 /* don't sleep while holding a lock */
1433                 SleepMsec(1);
1434                 goto again;
1435             }
1436 
1437             if (tv->flow_queue) {
1438                 FQLOCK_LOCK(tv->flow_queue);
1439                 bool fq_done = (tv->flow_queue->qlen == 0);
1440                 FQLOCK_UNLOCK(tv->flow_queue);
1441                 if (!fq_done) {
1442                     SCMutexUnlock(&tv_root_lock);
1443 
1444                     Packet *p = PacketGetFromAlloc();
1445                     if (p != NULL) {
1446                         p->flags |= PKT_PSEUDO_STREAM_END;
1447                         PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
1448                         PacketQueue *q = tv->stream_pq;
1449                         SCMutexLock(&q->mutex_q);
1450                         PacketEnqueue(q, p);
1451                         SCCondSignal(&q->cond_q);
1452                         SCMutexUnlock(&q->mutex_q);
1453                     }
1454 
1455                     /* don't sleep while holding a lock */
1456                     SleepMsec(1);
1457                     goto again;
1458                 }
1459             }
1460 
1461             /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1462             if (tm && tm->PktAcqBreakLoop != NULL) {
1463                 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1464             }
1465             TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
1466 
1467             if (tv->inq != NULL) {
1468                 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1469                     SCCondSignal(&tv->inq->pq->cond_q);
1470                 }
1471                 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1472             }
1473 
1474             /* wait for it to enter the 'flow loop' stage */
1475             while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1476                 SCMutexUnlock(&tv_root_lock);
1477 
1478                 SleepMsec(1);
1479                 goto again;
1480             }
1481         }
1482 
1483         tv = tv->next;
1484     }
1485 
1486     SCMutexUnlock(&tv_root_lock);
1487 
1488     /* finally wait for all packet threads to have
1489      * processed all of their 'live' packets so we
1490      * don't process the last live packets together
1491      * with FFR packets */
1492     TmThreadDrainPacketThreads();
1493     return;
1494 }
1495 
/** \internal
 *
 *  \brief Debug validation: abort if any packet thread still has packets.
 *
 *  Only active when built with DEBUG_VALIDATION; otherwise a no-op. On the
 *  first thread with packets left, the thread list is dumped through
 *  TmThreadDumpThreads() and abort() is called.
 */
static void TmThreadDebugValidateNoMorePackets(void)
{
#ifdef DEBUG_VALIDATION
    SCMutexLock(&tv_root_lock);

    ThreadVars *tv = tv_root[TVT_PPT];
    while (tv != NULL) {
        if (ThreadStillHasPackets(tv)) {
            /* release the list lock before dumping the threads */
            SCMutexUnlock(&tv_root_lock);
            TmThreadDumpThreads();
            abort();
        }
        tv = tv->next;
    }

    SCMutexUnlock(&tv_root_lock);
#endif
}
1510 
1511 /**
1512  * \brief Disable all packet threads
1513  */
TmThreadDisablePacketThreads(void)1514 void TmThreadDisablePacketThreads(void)
1515 {
1516     struct timeval start_ts;
1517     struct timeval cur_ts;
1518 
1519     /* first drain all packet threads of their packets */
1520     TmThreadDrainPacketThreads();
1521 
1522     /* since all the threads possibly able to produce more packets
1523      * are now gone or inactive, we should see no packets anywhere
1524      * anymore. */
1525     TmThreadDebugValidateNoMorePackets();
1526 
1527     gettimeofday(&start_ts, NULL);
1528 again:
1529     gettimeofday(&cur_ts, NULL);
1530     if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1531         FatalError(SC_ERR_FATAL, "Engine unable to disable packet  "
1532                 "threads. Killing engine");
1533     }
1534 
1535     /* loop through the packet threads and kill them */
1536     SCMutexLock(&tv_root_lock);
1537     for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1538         TmThreadsSetFlag(tv, THV_KILL);
1539 
1540         /* separate worker threads (autofp) will still wait at their
1541          * input queues. So nudge them here so they will observe the
1542          * THV_KILL flag. */
1543         if (tv->inq != NULL) {
1544             for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1545                 SCCondSignal(&tv->inq->pq->cond_q);
1546             }
1547             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1548         }
1549 
1550         while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1551             SCMutexUnlock(&tv_root_lock);
1552 
1553             SleepMsec(1);
1554             goto again;
1555         }
1556     }
1557     SCMutexUnlock(&tv_root_lock);
1558     return;
1559 }
1560 
TmThreadGetFirstTmSlotForPartialPattern(const char * tm_name)1561 TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
1562 {
1563     ThreadVars *tv = NULL;
1564     TmSlot *slots = NULL;
1565 
1566     SCMutexLock(&tv_root_lock);
1567 
1568     /* all receive threads are part of packet processing threads */
1569     tv = tv_root[TVT_PPT];
1570 
1571     while (tv) {
1572         slots = tv->tm_slots;
1573 
1574         while (slots != NULL) {
1575             TmModule *tm = TmModuleGetById(slots->tm_id);
1576 
1577             char *found = strstr(tm->name, tm_name);
1578             if (found != NULL)
1579                 goto end;
1580 
1581             slots = slots->slot_next;
1582         }
1583 
1584         tv = tv->next;
1585     }
1586 
1587  end:
1588     SCMutexUnlock(&tv_root_lock);
1589     return slots;
1590 }
1591 
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
TmThreadKillThreadsFamily(int family)1594 void TmThreadKillThreadsFamily(int family)
1595 {
1596     ThreadVars *tv = NULL;
1597     unsigned int sleep_usec = MIN_WAIT_TIME;
1598 
1599     BUG_ON((family < 0) || (family >= TVT_MAX));
1600 
1601 again:
1602     SCMutexLock(&tv_root_lock);
1603     tv = tv_root[family];
1604 
1605     while (tv) {
1606         int r = TmThreadKillThread(tv);
1607         if (r == 0) {
1608             SCMutexUnlock(&tv_root_lock);
1609             SleepUsec(sleep_usec);
1610             sleep_usec *= 2; /* slowly back off */
1611             sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612             goto again;
1613         }
1614         sleep_usec = MIN_WAIT_TIME; /* reset */
1615 
1616         tv = tv->next;
1617     }
1618     SCMutexUnlock(&tv_root_lock);
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622 
TmThreadKillThreads(void)1623 void TmThreadKillThreads(void)
1624 {
1625     int i = 0;
1626 
1627     for (i = 0; i < TVT_MAX; i++) {
1628         TmThreadKillThreadsFamily(i);
1629     }
1630 
1631     return;
1632 }
1633 
/** \internal
 *
 *  \brief Release a TV and everything this function knows it to own.
 *
 *  Frees the flow queue, stats, control mutex/condition, the name strings,
 *  the local stream packet queue and the slot list, unregisters the thread
 *  id and finally frees the TV itself. The flow queue and the local stream
 *  packet queue are required to be empty (BUG_ON otherwise).
 *
 *  \param tv TV to free; NULL is accepted and ignored
 */
static void TmThreadFree(ThreadVars *tv)
{
    if (tv == NULL)
        return;

    SCLogDebug("Freeing thread '%s'.", tv->name);

    if (tv->flow_queue) {
        BUG_ON(tv->flow_queue->qlen != 0);
        SCFree(tv->flow_queue);
    }

    StatsThreadCleanup(tv);

    TmThreadDeinitMC(tv);

    if (tv->thread_group_name) {
        SCFree(tv->thread_group_name);
    }

    if (tv->printable_name) {
        SCFree(tv->printable_name);
    }

    if (tv->stream_pq_local) {
        BUG_ON(tv->stream_pq_local->len);
        SCMutexDestroy(&tv->stream_pq_local->mutex_q);
        SCFree(tv->stream_pq_local);
    }

    /* free the slot list; grab the successor before freeing a node */
    TmSlot *slot = (TmSlot *)tv->tm_slots;
    while (slot != NULL) {
        TmSlot *following = slot->slot_next;
        SCFree(slot);
        slot = following;
    }

    TmThreadsUnregisterThread(tv->id);
    SCFree(tv);
}
1676 
/**
 * \brief Set the thread group name of a TV.
 *
 * The name is duplicated; the TV owns the copy. A name that was set before
 * is freed once the new copy has been made, so calling this repeatedly does
 * not leak. On allocation failure an error is logged and the current name
 * is left untouched.
 *
 * \param tv   TV to update; nothing is done if NULL
 * \param name new group name; nothing is done if NULL
 */
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
{
    if (name == NULL)
        return;

    if (tv == NULL)
        return;

    /* duplicate first: 'name' may be the string currently stored in tv */
    char *thread_group_name = SCStrdup(name);
    if (unlikely(thread_group_name == NULL)) {
        SCLogError(SC_ERR_RUNMODE, "error allocating memory");
        return;
    }

    if (tv->thread_group_name != NULL) {
        SCFree(tv->thread_group_name);
    }
    tv->thread_group_name = thread_group_name;
}
1694 
TmThreadClearThreadsFamily(int family)1695 void TmThreadClearThreadsFamily(int family)
1696 {
1697     ThreadVars *tv = NULL;
1698     ThreadVars *ptv = NULL;
1699 
1700     if ((family < 0) || (family >= TVT_MAX))
1701         return;
1702 
1703     SCMutexLock(&tv_root_lock);
1704     tv = tv_root[family];
1705 
1706     while (tv) {
1707         ptv = tv;
1708         tv = tv->next;
1709         TmThreadFree(ptv);
1710     }
1711     tv_root[family] = NULL;
1712     SCMutexUnlock(&tv_root_lock);
1713 }
1714 
1715 /**
1716  * \brief Spawns a thread associated with the ThreadVars instance tv
1717  *
1718  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1719  */
TmThreadSpawn(ThreadVars * tv)1720 TmEcode TmThreadSpawn(ThreadVars *tv)
1721 {
1722     pthread_attr_t attr;
1723     if (tv->tm_func == NULL) {
1724         printf("ERROR: no thread function set\n");
1725         return TM_ECODE_FAILED;
1726     }
1727 
1728     /* Initialize and set thread detached attribute */
1729     pthread_attr_init(&attr);
1730 
1731     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1732 
1733     int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1734     if (rc) {
1735         printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1736         return TM_ECODE_FAILED;
1737     }
1738 
1739     TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
1740 
1741     TmThreadAppend(tv, tv->type);
1742     return TM_ECODE_OK;
1743 }
1744 
1745 /**
1746  * \brief Initializes the mutex and condition variables for this TV
1747  *
1748  * It can be used by a thread to control a wait loop that can also be
1749  * influenced by other threads.
1750  *
1751  * \param tv Pointer to a TV instance
1752  */
TmThreadInitMC(ThreadVars * tv)1753 void TmThreadInitMC(ThreadVars *tv)
1754 {
1755     if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1756         FatalError(SC_ERR_FATAL,
1757                    "Fatal error encountered in TmThreadInitMC.  "
1758                    "Exiting...");
1759     }
1760 
1761     if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1762         printf("Error initializing the tv->m mutex\n");
1763         exit(EXIT_FAILURE);
1764     }
1765 
1766     if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1767         FatalError(SC_ERR_FATAL,
1768                    "Fatal error encountered in TmThreadInitMC.  "
1769                    "Exiting...");
1770     }
1771 
1772     if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1773         FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1774                    "variable");
1775     }
1776 
1777     return;
1778 }
1779 
/** \internal
 *
 *  \brief Destroys and frees the control mutex and the control condition
 *         variable of this TV, for each of the two that is set.
 *
 *  The pointers in the TV are not reset.
 *
 *  \param tv Pointer to a TV instance
 */
static void TmThreadDeinitMC(ThreadVars *tv)
{
    if (tv->ctrl_mutex != NULL) {
        SCCtrlMutexDestroy(tv->ctrl_mutex);
        SCFree(tv->ctrl_mutex);
    }

    if (tv->ctrl_cond != NULL) {
        SCCtrlCondDestroy(tv->ctrl_cond);
        SCFree(tv->ctrl_cond);
    }
}
1792 
1793 /**
1794  * \brief Tests if the thread represented in the arg has been unpaused or not.
1795  *
1796  *        The function would return if the thread tv has been unpaused or if the
1797  *        kill flag for the thread has been set.
1798  *
1799  * \param tv Pointer to the TV instance.
1800  */
TmThreadTestThreadUnPaused(ThreadVars * tv)1801 void TmThreadTestThreadUnPaused(ThreadVars *tv)
1802 {
1803     while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1804         SleepUsec(100);
1805 
1806         if (TmThreadsCheckFlag(tv, THV_KILL))
1807             break;
1808     }
1809 
1810     return;
1811 }
1812 
1813 /**
1814  * \brief Waits till the specified flag(s) is(are) set.  We don't bother if
1815  *        the kill flag has been set or not on the thread.
1816  *
1817  * \param tv Pointer to the TV instance.
1818  */
TmThreadWaitForFlag(ThreadVars * tv,uint32_t flags)1819 void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
1820 {
1821     while (!TmThreadsCheckFlag(tv, flags)) {
1822         SleepUsec(100);
1823     }
1824 
1825     return;
1826 }
1827 
1828 /**
1829  * \brief Unpauses a thread
1830  *
1831  * \param tv Pointer to a TV instance that has to be unpaused
1832  */
TmThreadContinue(ThreadVars * tv)1833 void TmThreadContinue(ThreadVars *tv)
1834 {
1835     TmThreadsUnsetFlag(tv, THV_PAUSE);
1836 
1837     return;
1838 }
1839 
1840 /**
1841  * \brief Unpauses all threads present in tv_root
1842  */
TmThreadContinueThreads()1843 void TmThreadContinueThreads()
1844 {
1845     SCMutexLock(&tv_root_lock);
1846     for (int i = 0; i < TVT_MAX; i++) {
1847         ThreadVars *tv = tv_root[i];
1848         while (tv != NULL) {
1849             TmThreadContinue(tv);
1850             tv = tv->next;
1851         }
1852     }
1853     SCMutexUnlock(&tv_root_lock);
1854     return;
1855 }
1856 
1857 /**
1858  * \brief Pauses a thread
1859  *
1860  * \param tv Pointer to a TV instance that has to be paused
1861  */
TmThreadPause(ThreadVars * tv)1862 void TmThreadPause(ThreadVars *tv)
1863 {
1864     TmThreadsSetFlag(tv, THV_PAUSE);
1865     return;
1866 }
1867 
1868 /**
1869  * \brief Pauses all threads present in tv_root
1870  */
TmThreadPauseThreads()1871 void TmThreadPauseThreads()
1872 {
1873     TmThreadsListThreads();
1874 
1875     SCMutexLock(&tv_root_lock);
1876     for (int i = 0; i < TVT_MAX; i++) {
1877         ThreadVars *tv = tv_root[i];
1878         while (tv != NULL) {
1879             TmThreadPause(tv);
1880             tv = tv->next;
1881         }
1882     }
1883     SCMutexUnlock(&tv_root_lock);
1884 }
1885 
1886 /**
1887  * \brief Used to check the thread for certain conditions of failure.
1888  */
TmThreadCheckThreadState(void)1889 void TmThreadCheckThreadState(void)
1890 {
1891     SCMutexLock(&tv_root_lock);
1892     for (int i = 0; i < TVT_MAX; i++) {
1893         ThreadVars *tv = tv_root[i];
1894         while (tv) {
1895             if (TmThreadsCheckFlag(tv, THV_FAILED)) {
1896                 FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1897             }
1898             tv = tv->next;
1899         }
1900     }
1901     SCMutexUnlock(&tv_root_lock);
1902     return;
1903 }
1904 
1905 /**
1906  *  \brief Used to check if all threads have finished their initialization.  On
1907  *         finding an un-initialized thread, it waits till that thread completes
1908  *         its initialization, before proceeding to the next thread.
1909  *
1910  *  \retval TM_ECODE_OK all initialized properly
1911  *  \retval TM_ECODE_FAILED failure
1912  */
TmThreadWaitOnThreadInit(void)1913 TmEcode TmThreadWaitOnThreadInit(void)
1914 {
1915     uint16_t mgt_num = 0;
1916     uint16_t ppt_num = 0;
1917 
1918     struct timeval start_ts;
1919     struct timeval cur_ts;
1920     gettimeofday(&start_ts, NULL);
1921 
1922 again:
1923     SCMutexLock(&tv_root_lock);
1924     for (int i = 0; i < TVT_MAX; i++) {
1925         ThreadVars *tv = tv_root[i];
1926         while (tv != NULL) {
1927             if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
1928                 SCMutexUnlock(&tv_root_lock);
1929 
1930                 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1931                         "initialize: flags %04x", tv->name,
1932                         SC_ATOMIC_GET(tv->flags));
1933                 return TM_ECODE_FAILED;
1934             }
1935 
1936             if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1937                 SCMutexUnlock(&tv_root_lock);
1938 
1939                 gettimeofday(&cur_ts, NULL);
1940                 if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1941                     SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1942                             "initialize in time: flags %04x", tv->name,
1943                             SC_ATOMIC_GET(tv->flags));
1944                     return TM_ECODE_FAILED;
1945                 }
1946 
1947                 /* sleep a little to give the thread some
1948                  * time to finish initialization */
1949                 SleepUsec(100);
1950                 goto again;
1951             }
1952 
1953             if (TmThreadsCheckFlag(tv, THV_FAILED)) {
1954                 SCMutexUnlock(&tv_root_lock);
1955                 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1956                         "initialize.", tv->name);
1957                 return TM_ECODE_FAILED;
1958             }
1959             if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
1960                 SCMutexUnlock(&tv_root_lock);
1961                 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1962                         "initialization.", tv->name);
1963                 return TM_ECODE_FAILED;
1964             }
1965 
1966             if (i == TVT_MGMT)
1967                 mgt_num++;
1968             else if (i == TVT_PPT)
1969                 ppt_num++;
1970 
1971             tv = tv->next;
1972         }
1973     }
1974     SCMutexUnlock(&tv_root_lock);
1975 
1976     SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
1977               "threads initialized, engine started.", ppt_num, mgt_num);
1978 
1979     return TM_ECODE_OK;
1980 }
1981 
1982 /**
1983  * \brief Returns the TV for the calling thread.
1984  *
1985  * \retval tv Pointer to the ThreadVars instance for the calling thread;
1986  *            NULL on no match
1987  */
TmThreadsGetCallingThread(void)1988 ThreadVars *TmThreadsGetCallingThread(void)
1989 {
1990     pthread_t self = pthread_self();
1991 
1992     SCMutexLock(&tv_root_lock);
1993     for (int i = 0; i < TVT_MAX; i++) {
1994         ThreadVars *tv = tv_root[i];
1995         while (tv) {
1996             if (pthread_equal(self, tv->t)) {
1997                 SCMutexUnlock(&tv_root_lock);
1998                 return tv;
1999             }
2000             tv = tv->next;
2001         }
2002     }
2003     SCMutexUnlock(&tv_root_lock);
2004     return NULL;
2005 }
2006 
2007 /**
2008  * \brief returns a count of all the threads that match the flag
2009  */
TmThreadCountThreadsByTmmFlags(uint8_t flags)2010 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2011 {
2012     uint32_t cnt = 0;
2013     SCMutexLock(&tv_root_lock);
2014     for (int i = 0; i < TVT_MAX; i++) {
2015         ThreadVars *tv = tv_root[i];
2016         while (tv != NULL) {
2017             if ((tv->tmm_flags & flags) == flags)
2018                 cnt++;
2019 
2020             tv = tv->next;
2021         }
2022     }
2023     SCMutexUnlock(&tv_root_lock);
2024     return cnt;
2025 }
2026 
TmThreadDoDumpSlots(const ThreadVars * tv)2027 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2028 {
2029     for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2030         TmModule *m = TmModuleGetById(s->tm_id);
2031         SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2032             tv, s, s->tm_id, m->name);
2033     }
2034 }
2035 
TmThreadDumpThreads(void)2036 void TmThreadDumpThreads(void)
2037 {
2038     SCMutexLock(&tv_root_lock);
2039     for (int i = 0; i < TVT_MAX; i++) {
2040         ThreadVars *tv = tv_root[i];
2041         while (tv != NULL) {
2042             const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2043             SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2044                     tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2045             if (tv->inq && tv->stream_pq == tv->inq->pq) {
2046                 SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2047             } else if (tv->stream_pq_local != NULL) {
2048                 for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2049                     SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2050                             tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2051                 }
2052             }
2053             for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2054                 SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2055                         tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2056             }
2057             TmThreadDoDumpSlots(tv);
2058             tv = tv->next;
2059         }
2060     }
2061     SCMutexUnlock(&tv_root_lock);
2062     TmThreadsListThreads();
2063 }
2064 
/** One record of the thread store: a registered thread and its time info. */
typedef struct Thread_ {
    ThreadVars *tv;     /**< threadvars structure */
    const char *name;   /**< set to tv->name when the thread is registered */
    int type;           /**< type value passed in at registration */
    int in_use;         /**< bool to indicate this is in use */

    struct timeval pktts;   /**< current packet time of this thread
                             *   (offline mode) */
    uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
                             *   time when the pktts was last updated. */
} Thread;
2076 
/** Container for the array of Thread records. */
typedef struct Threads_ {
    Thread *threads;        /**< array of records, NULL when nothing is allocated */
    size_t threads_size;    /**< number of records allocated in threads */
    int threads_cnt;        /**< NOTE(review): only reset to 0 in the code
                             *   visible here, never incremented -- confirm
                             *   whether it is still needed */
} Threads;
2082 
/** the thread store; starts out empty and is allocated on first registration */
static Threads thread_store = { NULL, 0, 0 };
/** taken by the register, unregister, list and timestamp functions around
 *  their access to thread_store */
static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2085 
TmThreadsListThreads(void)2086 void TmThreadsListThreads(void)
2087 {
2088     SCMutexLock(&thread_store_lock);
2089     for (size_t s = 0; s < thread_store.threads_size; s++) {
2090         Thread *t = &thread_store.threads[s];
2091         if (t == NULL || t->in_use == 0)
2092             continue;
2093 
2094         SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2095                 (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2096         if (t->tv) {
2097             ThreadVars *tv = t->tv;
2098             const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2099             SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2100                     tv, tv->type, tv->name, tv->tmm_flags, flags);
2101         }
2102     }
2103     SCMutexUnlock(&thread_store_lock);
2104 }
2105 
2106 #define STEP 32
2107 /**
2108  *  \retval id thread id, or 0 if not found
2109  */
TmThreadsRegisterThread(ThreadVars * tv,const int type)2110 int TmThreadsRegisterThread(ThreadVars *tv, const int type)
2111 {
2112     SCMutexLock(&thread_store_lock);
2113     if (thread_store.threads == NULL) {
2114         thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2115         BUG_ON(thread_store.threads == NULL);
2116         thread_store.threads_size = STEP;
2117     }
2118 
2119     size_t s;
2120     for (s = 0; s < thread_store.threads_size; s++) {
2121         if (thread_store.threads[s].in_use == 0) {
2122             Thread *t = &thread_store.threads[s];
2123             t->name = tv->name;
2124             t->type = type;
2125             t->tv = tv;
2126             t->in_use = 1;
2127 
2128             SCMutexUnlock(&thread_store_lock);
2129             return (int)(s+1);
2130         }
2131     }
2132 
2133     /* if we get here the array is completely filled */
2134     void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2135     BUG_ON(newmem == NULL);
2136     thread_store.threads = newmem;
2137     memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2138 
2139     Thread *t = &thread_store.threads[thread_store.threads_size];
2140     t->name = tv->name;
2141     t->type = type;
2142     t->tv = tv;
2143     t->in_use = 1;
2144 
2145     s = thread_store.threads_size;
2146     thread_store.threads_size += STEP;
2147 
2148     SCMutexUnlock(&thread_store_lock);
2149     return (int)(s+1);
2150 }
2151 #undef STEP
2152 
TmThreadsUnregisterThread(const int id)2153 void TmThreadsUnregisterThread(const int id)
2154 {
2155     SCMutexLock(&thread_store_lock);
2156     if (id <= 0 || id > (int)thread_store.threads_size) {
2157         SCMutexUnlock(&thread_store_lock);
2158         return;
2159     }
2160 
2161     /* id is one higher than index */
2162     int idx = id - 1;
2163 
2164     /* reset thread_id, which serves as clearing the record */
2165     thread_store.threads[idx].in_use = 0;
2166 
2167     /* check if we have at least one registered thread left */
2168     size_t s;
2169     for (s = 0; s < thread_store.threads_size; s++) {
2170         Thread *t = &thread_store.threads[s];
2171         if (t->in_use == 1) {
2172             goto end;
2173         }
2174     }
2175 
2176     /* if we get here no threads are registered */
2177     SCFree(thread_store.threads);
2178     thread_store.threads = NULL;
2179     thread_store.threads_size = 0;
2180     thread_store.threads_cnt = 0;
2181 
2182 end:
2183     SCMutexUnlock(&thread_store_lock);
2184 }
2185 
TmThreadsSetThreadTimestamp(const int id,const struct timeval * ts)2186 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2187 {
2188     SCMutexLock(&thread_store_lock);
2189     if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2190         SCMutexUnlock(&thread_store_lock);
2191         return;
2192     }
2193 
2194     int idx = id - 1;
2195     Thread *t = &thread_store.threads[idx];
2196     t->pktts = *ts;
2197     struct timeval systs;
2198     gettimeofday(&systs, NULL);
2199     t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2200     SCMutexUnlock(&thread_store_lock);
2201 }
2202 
TmThreadsTimeSubsysIsReady(void)2203 bool TmThreadsTimeSubsysIsReady(void)
2204 {
2205     bool ready = true;
2206     SCMutexLock(&thread_store_lock);
2207     for (size_t s = 0; s < thread_store.threads_size; s++) {
2208         Thread *t = &thread_store.threads[s];
2209         if (!t->in_use)
2210             break;
2211         if (t->sys_sec_stamp == 0) {
2212             ready = false;
2213             break;
2214         }
2215     }
2216     SCMutexUnlock(&thread_store_lock);
2217     return ready;
2218 }
2219 
TmThreadsInitThreadsTimestamp(const struct timeval * ts)2220 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2221 {
2222     struct timeval systs;
2223     gettimeofday(&systs, NULL);
2224     SCMutexLock(&thread_store_lock);
2225     for (size_t s = 0; s < thread_store.threads_size; s++) {
2226         Thread *t = &thread_store.threads[s];
2227         if (!t->in_use)
2228             break;
2229         t->pktts = *ts;
2230         t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2231     }
2232     SCMutexUnlock(&thread_store_lock);
2233 }
2234 
TmThreadsGetMinimalTimestamp(struct timeval * ts)2235 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2236 {
2237     struct timeval local, nullts;
2238     memset(&local, 0, sizeof(local));
2239     memset(&nullts, 0, sizeof(nullts));
2240     int set = 0;
2241     size_t s;
2242     struct timeval systs;
2243     gettimeofday(&systs, NULL);
2244 
2245     SCMutexLock(&thread_store_lock);
2246     for (s = 0; s < thread_store.threads_size; s++) {
2247         Thread *t = &thread_store.threads[s];
2248         if (t->in_use == 0)
2249             break;
2250         if (!(timercmp(&t->pktts, &nullts, ==))) {
2251             /* ignore sleeping threads */
2252             if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2253                 continue;
2254 
2255             if (!set) {
2256                 local = t->pktts;
2257                 set = 1;
2258             } else {
2259                 if (timercmp(&t->pktts, &local, <)) {
2260                     local = t->pktts;
2261                 }
2262             }
2263         }
2264     }
2265     SCMutexUnlock(&thread_store_lock);
2266     *ts = local;
2267     SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2268 }
2269 
TmThreadsGetWorkerThreadMax()2270 uint16_t TmThreadsGetWorkerThreadMax()
2271 {
2272     uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2273     int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2274     /* always create at least one thread */
2275     if (thread_max == 0)
2276         thread_max = ncpus * threading_detect_ratio;
2277     if (thread_max < 1)
2278         thread_max = 1;
2279     if (thread_max > 1024) {
2280         SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2281         thread_max = 1024;
2282     }
2283     return thread_max;
2284 }
2285 
/**
 * \brief Asks the capture module in the thread's first slot to break out of
 *        its packet acquisition loop.
 *
 * Nothing is done unless both the thread and the module carry
 * TM_FLAG_RECEIVE_TM and the module provides PktAcqLoop and PktAcqBreakLoop.
 *
 * \param tv thread to interrupt
 */
static inline void ThreadBreakLoop(ThreadVars *tv)
{
    if (tv->tmm_flags & TM_FLAG_RECEIVE_TM) {
        /* the module of the first slot is the one that gets inspected */
        TmSlot *slot = tv->tm_slots;
        TmModule *mod = TmModuleGetById(slot->tm_id);

        /* if the method supports it, BreakLoop. Otherwise we rely on
         * the capture method's recv timeout */
        if ((mod->flags & TM_FLAG_RECEIVE_TM) &&
                mod->PktAcqLoop && mod->PktAcqBreakLoop) {
            mod->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slot->slot_data));
        }
    }
}
2302 
2303 /**
2304  *  \retval r 1 if packet was accepted, 0 otherwise
2305  *  \note if packet was not accepted, it's still the responsibility
2306  *        of the caller.
2307  */
TmThreadsInjectPacketsById(Packet ** packets,const int id)2308 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2309 {
2310     if (id <= 0 || id > (int)thread_store.threads_size)
2311         return 0;
2312 
2313     int idx = id - 1;
2314 
2315     Thread *t = &thread_store.threads[idx];
2316     ThreadVars *tv = t->tv;
2317 
2318     if (tv == NULL || tv->stream_pq == NULL)
2319         return 0;
2320 
2321     SCMutexLock(&tv->stream_pq->mutex_q);
2322     while (*packets != NULL) {
2323         PacketEnqueue(tv->stream_pq, *packets);
2324         packets++;
2325     }
2326     SCMutexUnlock(&tv->stream_pq->mutex_q);
2327 
2328     /* wake up listening thread(s) if necessary */
2329     if (tv->inq != NULL) {
2330         SCCondSignal(&tv->inq->pq->cond_q);
2331     } else if (tv->break_loop) {
2332         ThreadBreakLoop(tv);
2333     }
2334     return 1;
2335 }
2336 
2337 /** \brief inject a flow into a threads flow queue
2338  */
TmThreadsInjectFlowById(Flow * f,const int id)2339 void TmThreadsInjectFlowById(Flow *f, const int id)
2340 {
2341     BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2342 
2343     int idx = id - 1;
2344 
2345     Thread *t = &thread_store.threads[idx];
2346     ThreadVars *tv = t->tv;
2347 
2348     BUG_ON(tv == NULL || tv->flow_queue == NULL);
2349 
2350     FlowEnqueue(tv->flow_queue, f);
2351 
2352     /* wake up listening thread(s) if necessary */
2353     if (tv->inq != NULL) {
2354         SCCondSignal(&tv->inq->pq->cond_q);
2355     } else if (tv->break_loop) {
2356         ThreadBreakLoop(tv);
2357     }
2358 }
2359