1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 /**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 * \author Eric Leblond <eric@regit.org>
24 *
25 * Thread management functions.
26 */
27
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45
46 #ifdef PROFILE_LOCKING
47 thread_local uint64_t mutex_lock_contention;
48 thread_local uint64_t mutex_lock_wait_ticks;
49 thread_local uint64_t mutex_lock_cnt;
50
51 thread_local uint64_t spin_lock_contention;
52 thread_local uint64_t spin_lock_wait_ticks;
53 thread_local uint64_t spin_lock_cnt;
54
55 thread_local uint64_t rww_lock_contention;
56 thread_local uint64_t rww_lock_wait_ticks;
57 thread_local uint64_t rww_lock_cnt;
58
59 thread_local uint64_t rwr_lock_contention;
60 thread_local uint64_t rwr_lock_wait_ticks;
61 thread_local uint64_t rwr_lock_cnt;
62 #endif
63
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79
80 /* lock to protect tv_root */
81 SCMutex tv_root_lock = SCMUTEX_INITIALIZER;
82
83 /**
84 * \brief Check if a thread flag is set.
85 *
86 * \retval 1 flag is set.
87 * \retval 0 flag is not set.
88 */
TmThreadsCheckFlag(ThreadVars * tv,uint32_t flag)89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91 return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93
94 /**
95 * \brief Set a thread flag.
96 */
TmThreadsSetFlag(ThreadVars * tv,uint32_t flag)97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99 SC_ATOMIC_OR(tv->flags, flag);
100 }
101
102 /**
103 * \brief Unset a thread flag.
104 */
TmThreadsUnsetFlag(ThreadVars * tv,uint32_t flag)105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107 SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109
110 /**
111 * \brief Separate run function so we can call it recursively.
112 */
TmThreadsSlotVarRun(ThreadVars * tv,Packet * p,TmSlot * slot)113 TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
114 {
115 for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
116 PACKET_PROFILING_TMM_START(p, s->tm_id);
117 TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
118 PACKET_PROFILING_TMM_END(p, s->tm_id);
119
120 /* handle error */
121 if (unlikely(r == TM_ECODE_FAILED)) {
122 /* Encountered error. Return packets to packetpool and return */
123 TmThreadsSlotProcessPktFail(tv, s, NULL);
124 return TM_ECODE_FAILED;
125 }
126
127 /* handle new packets */
128 while (tv->decode_pq.top != NULL) {
129 Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
130 if (unlikely(extra_p == NULL))
131 continue;
132
133 /* see if we need to process the packet */
134 if (s->slot_next != NULL) {
135 r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
136 if (unlikely(r == TM_ECODE_FAILED)) {
137 TmThreadsSlotProcessPktFail(tv, s, extra_p);
138 return TM_ECODE_FAILED;
139 }
140 }
141 tv->tmqh_out(tv, extra_p);
142 }
143 }
144
145 return TM_ECODE_OK;
146 }
147
148 /** \internal
149 *
150 * \brief Process flow timeout packets
151 *
152 * Process flow timeout pseudo packets. During shutdown this loop
153 * is run until the flow engine kills the thread and the queue is
154 * empty.
155 */
TmThreadTimeoutLoop(ThreadVars * tv,TmSlot * s)156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158 TmSlot *fw_slot = tv->tm_flowworker;
159 int r = TM_ECODE_OK;
160
161 if (tv->stream_pq == NULL || fw_slot == NULL) {
162 SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163 return r;
164 }
165
166 SCLogDebug("flow end loop starting");
167 while (1) {
168 SCMutexLock(&tv->stream_pq->mutex_q);
169 uint32_t len = tv->stream_pq->len;
170 SCMutexUnlock(&tv->stream_pq->mutex_q);
171 if (len > 0) {
172 while (len--) {
173 SCMutexLock(&tv->stream_pq->mutex_q);
174 Packet *p = PacketDequeue(tv->stream_pq);
175 SCMutexUnlock(&tv->stream_pq->mutex_q);
176 if (likely(p)) {
177 if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178 if (r == TM_ECODE_FAILED)
179 break;
180 }
181 }
182 }
183 } else {
184 if (TmThreadsCheckFlag(tv, THV_KILL)) {
185 break;
186 }
187 SleepUsec(1);
188 }
189 }
190 SCLogDebug("flow end loop complete");
191 StatsSyncCounters(tv);
192
193 return r;
194 }
195
196 /*
197
198 pcap/nfq
199
200 pkt read
201 callback
202 process_pkt
203
204 pfring
205
206 pkt read
207 process_pkt
208
209 slot:
210 setup
211
212 pkt_ack_loop(tv, slot_data)
213
214 deinit
215
216 process_pkt:
217 while(s)
218 run s;
219 queue;
220
221 */
222
TmThreadsSlotPktAcqLoop(void * td)223 static void *TmThreadsSlotPktAcqLoop(void *td)
224 {
225 ThreadVars *tv = (ThreadVars *)td;
226 TmSlot *s = tv->tm_slots;
227 char run = 1;
228 TmEcode r = TM_ECODE_OK;
229 TmSlot *slot = NULL;
230
231 /* Set the thread name */
232 if (SCSetThreadName(tv->name) < 0) {
233 SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
234 }
235
236 if (tv->thread_setup_flags != 0)
237 TmThreadSetupOptions(tv);
238
239 /* Drop the capabilities for this thread */
240 SCDropCaps(tv);
241
242 PacketPoolInit();
243
244 /* check if we are setup properly */
245 if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
246 SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
247 " PktAcqLoop=%p, tmqh_in=%p,"
248 " tmqh_out=%p",
249 s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
250 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
251 pthread_exit((void *) -1);
252 return NULL;
253 }
254
255 for (slot = s; slot != NULL; slot = slot->slot_next) {
256 if (slot->SlotThreadInit != NULL) {
257 void *slot_data = NULL;
258 r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
259 if (r != TM_ECODE_OK) {
260 if (r == TM_ECODE_DONE) {
261 EngineDone();
262 TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
263 goto error;
264 } else {
265 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
266 goto error;
267 }
268 }
269 (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
270 }
271
272 /* if the flowworker module is the first, get the threads input queue */
273 if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
274 tv->stream_pq = tv->inq->pq;
275 tv->tm_flowworker = slot;
276 SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
277 tv->flow_queue = FlowQueueNew();
278 if (tv->flow_queue == NULL) {
279 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
280 pthread_exit((void *) -1);
281 return NULL;
282 }
283 /* setup a queue */
284 } else if (slot->tm_id == TMM_FLOWWORKER) {
285 tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
286 if (tv->stream_pq_local == NULL)
287 FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
288 SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
289 tv->stream_pq = tv->stream_pq_local;
290 tv->tm_flowworker = slot;
291 SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
292 tv->flow_queue = FlowQueueNew();
293 if (tv->flow_queue == NULL) {
294 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
295 pthread_exit((void *) -1);
296 return NULL;
297 }
298 }
299 }
300
301 StatsSetupPrivate(tv);
302
303 TmThreadsSetFlag(tv, THV_INIT_DONE);
304
305 while(run) {
306 if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
307 TmThreadsSetFlag(tv, THV_PAUSED);
308 TmThreadTestThreadUnPaused(tv);
309 TmThreadsUnsetFlag(tv, THV_PAUSED);
310 }
311
312 r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
313
314 if (r == TM_ECODE_FAILED) {
315 TmThreadsSetFlag(tv, THV_FAILED);
316 run = 0;
317 }
318 if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
319 run = 0;
320 }
321 if (r == TM_ECODE_DONE) {
322 run = 0;
323 }
324 }
325 StatsSyncCounters(tv);
326
327 TmThreadsSetFlag(tv, THV_FLOW_LOOP);
328
329 /* process all pseudo packets the flow timeout may throw at us */
330 TmThreadTimeoutLoop(tv, s);
331
332 TmThreadsSetFlag(tv, THV_RUNNING_DONE);
333 TmThreadWaitForFlag(tv, THV_DEINIT);
334
335 PacketPoolDestroy();
336
337 for (slot = s; slot != NULL; slot = slot->slot_next) {
338 if (slot->SlotThreadExitPrintStats != NULL) {
339 slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
340 }
341
342 if (slot->SlotThreadDeinit != NULL) {
343 r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
344 if (r != TM_ECODE_OK) {
345 TmThreadsSetFlag(tv, THV_CLOSED);
346 goto error;
347 }
348 }
349 }
350
351 tv->stream_pq = NULL;
352 SCLogDebug("%s ending", tv->name);
353 TmThreadsSetFlag(tv, THV_CLOSED);
354 pthread_exit((void *) 0);
355 return NULL;
356
357 error:
358 tv->stream_pq = NULL;
359 pthread_exit((void *) -1);
360 return NULL;
361 }
362
TmThreadsSlotVar(void * td)363 static void *TmThreadsSlotVar(void *td)
364 {
365 ThreadVars *tv = (ThreadVars *)td;
366 TmSlot *s = (TmSlot *)tv->tm_slots;
367 Packet *p = NULL;
368 char run = 1;
369 TmEcode r = TM_ECODE_OK;
370
371 PacketPoolInit();//Empty();
372
373 /* Set the thread name */
374 if (SCSetThreadName(tv->name) < 0) {
375 SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
376 }
377
378 if (tv->thread_setup_flags != 0)
379 TmThreadSetupOptions(tv);
380
381 /* Drop the capabilities for this thread */
382 SCDropCaps(tv);
383
384 /* check if we are setup properly */
385 if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
386 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
387 pthread_exit((void *) -1);
388 return NULL;
389 }
390
391 for (; s != NULL; s = s->slot_next) {
392 if (s->SlotThreadInit != NULL) {
393 void *slot_data = NULL;
394 r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
395 if (r != TM_ECODE_OK) {
396 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
397 goto error;
398 }
399 (void)SC_ATOMIC_SET(s->slot_data, slot_data);
400 }
401
402 /* special case: we need to access the stream queue
403 * from the flow timeout code */
404
405 /* if the flowworker module is the first, get the threads input queue */
406 if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
407 tv->stream_pq = tv->inq->pq;
408 tv->tm_flowworker = s;
409 SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
410 tv->flow_queue = FlowQueueNew();
411 if (tv->flow_queue == NULL) {
412 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
413 pthread_exit((void *) -1);
414 return NULL;
415 }
416 /* setup a queue */
417 } else if (s->tm_id == TMM_FLOWWORKER) {
418 tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
419 if (tv->stream_pq_local == NULL)
420 FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
421 SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
422 tv->stream_pq = tv->stream_pq_local;
423 tv->tm_flowworker = s;
424 SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
425 tv->flow_queue = FlowQueueNew();
426 if (tv->flow_queue == NULL) {
427 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
428 pthread_exit((void *) -1);
429 return NULL;
430 }
431 }
432 }
433
434 StatsSetupPrivate(tv);
435
436 TmThreadsSetFlag(tv, THV_INIT_DONE);
437
438 s = (TmSlot *)tv->tm_slots;
439
440 while (run) {
441 if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
442 TmThreadsSetFlag(tv, THV_PAUSED);
443 TmThreadTestThreadUnPaused(tv);
444 TmThreadsUnsetFlag(tv, THV_PAUSED);
445 }
446
447 /* input a packet */
448 p = tv->tmqh_in(tv);
449
450 /* if we didn't get a packet see if we need to do some housekeeping */
451 if (unlikely(p == NULL)) {
452 if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
453 p = PacketGetFromQueueOrAlloc();
454 if (p != NULL) {
455 p->flags |= PKT_PSEUDO_STREAM_END;
456 PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
457 }
458 }
459 }
460
461 if (p != NULL) {
462 /* run the thread module(s) */
463 r = TmThreadsSlotVarRun(tv, p, s);
464 if (r == TM_ECODE_FAILED) {
465 TmqhOutputPacketpool(tv, p);
466 TmThreadsSetFlag(tv, THV_FAILED);
467 break;
468 }
469
470 /* output the packet */
471 tv->tmqh_out(tv, p);
472
473 /* now handle the stream pq packets */
474 TmThreadsHandleInjectedPackets(tv);
475 }
476
477 if (TmThreadsCheckFlag(tv, THV_KILL)) {
478 run = 0;
479 }
480 } /* while (run) */
481 StatsSyncCounters(tv);
482
483 TmThreadsSetFlag(tv, THV_RUNNING_DONE);
484 TmThreadWaitForFlag(tv, THV_DEINIT);
485
486 PacketPoolDestroy();
487
488 s = (TmSlot *)tv->tm_slots;
489
490 for ( ; s != NULL; s = s->slot_next) {
491 if (s->SlotThreadExitPrintStats != NULL) {
492 s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
493 }
494
495 if (s->SlotThreadDeinit != NULL) {
496 r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
497 if (r != TM_ECODE_OK) {
498 TmThreadsSetFlag(tv, THV_CLOSED);
499 goto error;
500 }
501 }
502 }
503
504 SCLogDebug("%s ending", tv->name);
505 tv->stream_pq = NULL;
506 TmThreadsSetFlag(tv, THV_CLOSED);
507 pthread_exit((void *) 0);
508 return NULL;
509
510 error:
511 tv->stream_pq = NULL;
512 pthread_exit((void *) -1);
513 return NULL;
514 }
515
TmThreadsManagement(void * td)516 static void *TmThreadsManagement(void *td)
517 {
518 ThreadVars *tv = (ThreadVars *)td;
519 TmSlot *s = (TmSlot *)tv->tm_slots;
520 TmEcode r = TM_ECODE_OK;
521
522 BUG_ON(s == NULL);
523
524 /* Set the thread name */
525 if (SCSetThreadName(tv->name) < 0) {
526 SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
527 }
528
529 if (tv->thread_setup_flags != 0)
530 TmThreadSetupOptions(tv);
531
532 /* Drop the capabilities for this thread */
533 SCDropCaps(tv);
534
535 SCLogDebug("%s starting", tv->name);
536
537 if (s->SlotThreadInit != NULL) {
538 void *slot_data = NULL;
539 r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
540 if (r != TM_ECODE_OK) {
541 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
542 pthread_exit((void *) -1);
543 return NULL;
544 }
545 (void)SC_ATOMIC_SET(s->slot_data, slot_data);
546 }
547
548 StatsSetupPrivate(tv);
549
550 TmThreadsSetFlag(tv, THV_INIT_DONE);
551
552 r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
553 /* handle error */
554 if (r == TM_ECODE_FAILED) {
555 TmThreadsSetFlag(tv, THV_FAILED);
556 }
557
558 if (TmThreadsCheckFlag(tv, THV_KILL)) {
559 StatsSyncCounters(tv);
560 }
561
562 TmThreadsSetFlag(tv, THV_RUNNING_DONE);
563 TmThreadWaitForFlag(tv, THV_DEINIT);
564
565 if (s->SlotThreadExitPrintStats != NULL) {
566 s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
567 }
568
569 if (s->SlotThreadDeinit != NULL) {
570 r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
571 if (r != TM_ECODE_OK) {
572 TmThreadsSetFlag(tv, THV_CLOSED);
573 pthread_exit((void *) -1);
574 return NULL;
575 }
576 }
577
578 TmThreadsSetFlag(tv, THV_CLOSED);
579 pthread_exit((void *) 0);
580 return NULL;
581 }
582
583 /**
584 * \brief We set the slot functions.
585 *
586 * \param tv Pointer to the TV to set the slot function for.
587 * \param name Name of the slot variant.
588 * \param fn_p Pointer to a custom slot function. Used only if slot variant
589 * "name" is "custom".
590 *
591 * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
592 */
TmThreadSetSlots(ThreadVars * tv,const char * name,void * (* fn_p)(void *))593 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
594 {
595 if (name == NULL) {
596 if (fn_p == NULL) {
597 printf("Both slot name and function pointer can't be NULL inside "
598 "TmThreadSetSlots\n");
599 goto error;
600 } else {
601 name = "custom";
602 }
603 }
604
605 if (strcmp(name, "varslot") == 0) {
606 tv->tm_func = TmThreadsSlotVar;
607 } else if (strcmp(name, "pktacqloop") == 0) {
608 tv->tm_func = TmThreadsSlotPktAcqLoop;
609 } else if (strcmp(name, "management") == 0) {
610 tv->tm_func = TmThreadsManagement;
611 } else if (strcmp(name, "command") == 0) {
612 tv->tm_func = TmThreadsManagement;
613 } else if (strcmp(name, "custom") == 0) {
614 if (fn_p == NULL)
615 goto error;
616 tv->tm_func = fn_p;
617 } else {
618 printf("Error: Slot \"%s\" not supported\n", name);
619 goto error;
620 }
621
622 return TM_ECODE_OK;
623
624 error:
625 return TM_ECODE_FAILED;
626 }
627
TmThreadsGetTVContainingSlot(TmSlot * tm_slot)628 ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
629 {
630 SCMutexLock(&tv_root_lock);
631 for (int i = 0; i < TVT_MAX; i++) {
632 ThreadVars *tv = tv_root[i];
633 while (tv) {
634 TmSlot *slots = tv->tm_slots;
635 while (slots != NULL) {
636 if (slots == tm_slot) {
637 SCMutexUnlock(&tv_root_lock);
638 return tv;
639 }
640 slots = slots->slot_next;
641 }
642 tv = tv->next;
643 }
644 }
645 SCMutexUnlock(&tv_root_lock);
646 return NULL;
647 }
648
649 /**
650 * \brief Appends a new entry to the slots.
651 *
652 * \param tv TV the slot is attached to.
653 * \param tm TM to append.
654 * \param data Data to be passed on to the slot init function.
655 *
656 * \retval The allocated TmSlot or NULL if there is an error
657 */
TmSlotSetFuncAppend(ThreadVars * tv,TmModule * tm,const void * data)658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660 TmSlot *slot = SCMalloc(sizeof(TmSlot));
661 if (unlikely(slot == NULL))
662 return;
663 memset(slot, 0, sizeof(TmSlot));
664 SC_ATOMIC_INITPTR(slot->slot_data);
665 slot->SlotThreadInit = tm->ThreadInit;
666 slot->slot_initdata = data;
667 if (tm->Func) {
668 slot->SlotFunc = tm->Func;
669 } else if (tm->PktAcqLoop) {
670 slot->PktAcqLoop = tm->PktAcqLoop;
671 if (tm->PktAcqBreakLoop) {
672 tv->break_loop = true;
673 }
674 } else if (tm->Management) {
675 slot->Management = tm->Management;
676 }
677 slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
678 slot->SlotThreadDeinit = tm->ThreadDeinit;
679 /* we don't have to check for the return value "-1". We wouldn't have
680 * received a TM as arg, if it didn't exist */
681 slot->tm_id = TmModuleGetIDForTM(tm);
682
683 tv->tmm_flags |= tm->flags;
684 tv->cap_flags |= tm->cap_flags;
685
686 if (tv->tm_slots == NULL) {
687 tv->tm_slots = slot;
688 } else {
689 TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690
691 /* get the last slot */
692 for ( ; a != NULL; a = a->slot_next) {
693 b = a;
694 }
695 /* append the new slot */
696 if (b != NULL) {
697 b->slot_next = slot;
698 }
699 }
700 return;
701 }
702
703 /**
704 * \brief Returns the slot holding a TM with the particular tm_id.
705 *
706 * \param tm_id TM id of the TM whose slot has to be returned.
707 *
708 * \retval slots Pointer to the slot.
709 */
TmSlotGetSlotForTM(int tm_id)710 TmSlot *TmSlotGetSlotForTM(int tm_id)
711 {
712 SCMutexLock(&tv_root_lock);
713 for (int i = 0; i < TVT_MAX; i++) {
714 ThreadVars *tv = tv_root[i];
715 while (tv) {
716 TmSlot *slots = tv->tm_slots;
717 while (slots != NULL) {
718 if (slots->tm_id == tm_id) {
719 SCMutexUnlock(&tv_root_lock);
720 return slots;
721 }
722 slots = slots->slot_next;
723 }
724 tv = tv->next;
725 }
726 }
727 SCMutexUnlock(&tv_root_lock);
728 return NULL;
729 }
730
731 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun && !defined __DragonFly__
SetCPUAffinitySet(cpu_set_t * cs)732 static int SetCPUAffinitySet(cpu_set_t *cs)
733 {
734 #if defined OS_FREEBSD
735 int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
736 SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
737 #elif OS_DARWIN
738 int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
739 (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
740 #else
741 pid_t tid = syscall(SYS_gettid);
742 int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
743 #endif /* OS_FREEBSD */
744
745 if (r != 0) {
746 printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
747 strerror(errno));
748 return -1;
749 }
750
751 return 0;
752 }
753 #endif
754
755
756 /**
757 * \brief Set the thread affinity on the calling thread.
758 *
759 * \param cpuid Id of the core/cpu to setup the affinity.
760 *
761 * \retval 0 If all goes well; -1 if something is wrong.
762 */
SetCPUAffinity(uint16_t cpuid)763 static int SetCPUAffinity(uint16_t cpuid)
764 {
765 #if defined __OpenBSD__ || defined sun || defined __DragonFly__
766 return 0;
767 #else
768 int cpu = (int)cpuid;
769
770 #if defined OS_WIN32 || defined __CYGWIN__
771 DWORD cs = 1 << cpu;
772
773 int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
774 if (r != 0) {
775 printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
776 strerror(errno));
777 return -1;
778 }
779 SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
780 SCGetThreadIdLong(), cpu);
781
782 return 0;
783
784 #else
785 cpu_set_t cs;
786
787 CPU_ZERO(&cs);
788 CPU_SET(cpu, &cs);
789 return SetCPUAffinitySet(&cs);
790 #endif /* windows */
791 #endif /* not supported */
792 }
793
794
795 /**
796 * \brief Set the thread options (thread priority).
797 *
798 * \param tv Pointer to the ThreadVars to setup the thread priority.
799 *
800 * \retval TM_ECODE_OK.
801 */
TmThreadSetThreadPriority(ThreadVars * tv,int prio)802 TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
803 {
804 tv->thread_setup_flags |= THREAD_SET_PRIORITY;
805 tv->thread_priority = prio;
806
807 return TM_ECODE_OK;
808 }
809
810 /**
811 * \brief Adjusting nice value for threads.
812 */
TmThreadSetPrio(ThreadVars * tv)813 void TmThreadSetPrio(ThreadVars *tv)
814 {
815 SCEnter();
816 #ifndef __CYGWIN__
817 #ifdef OS_WIN32
818 if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
819 SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
820 "thread %s: %s", tv->name, strerror(errno));
821 } else {
822 SCLogDebug("Priority set to %"PRId32" for thread %s",
823 tv->thread_priority, tv->name);
824 }
825 #else
826 int ret = nice(tv->thread_priority);
827 if (ret == -1) {
828 SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
829 "for thread %s: %s", tv->thread_priority, tv->name,
830 strerror(errno));
831 } else {
832 SCLogDebug("Nice value set to %"PRId32" for thread %s",
833 tv->thread_priority, tv->name);
834 }
835 #endif /* OS_WIN32 */
836 #endif
837 SCReturn;
838 }
839
840
841 /**
842 * \brief Set the thread options (cpu affinity).
843 *
844 * \param tv pointer to the ThreadVars to setup the affinity.
845 * \param cpu cpu on which affinity is set.
846 *
847 * \retval TM_ECODE_OK
848 */
TmThreadSetCPUAffinity(ThreadVars * tv,uint16_t cpu)849 TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
850 {
851 tv->thread_setup_flags |= THREAD_SET_AFFINITY;
852 tv->cpu_affinity = cpu;
853
854 return TM_ECODE_OK;
855 }
856
857
TmThreadSetCPU(ThreadVars * tv,uint8_t type)858 TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
859 {
860 if (!threading_set_cpu_affinity)
861 return TM_ECODE_OK;
862
863 if (type > MAX_CPU_SET) {
864 SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
865 return TM_ECODE_FAILED;
866 }
867
868 tv->thread_setup_flags |= THREAD_SET_AFFTYPE;
869 tv->cpu_affinity = type;
870
871 return TM_ECODE_OK;
872 }
873
TmThreadGetNbThreads(uint8_t type)874 int TmThreadGetNbThreads(uint8_t type)
875 {
876 if (type >= MAX_CPU_SET) {
877 SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
878 return 0;
879 }
880
881 return thread_affinity[type].nb_threads;
882 }
883
884 /**
885 * \brief Set the thread options (cpu affinitythread).
886 * Priority should be already set by pthread_create.
887 *
888 * \param tv pointer to the ThreadVars of the calling thread.
889 */
TmThreadSetupOptions(ThreadVars * tv)890 TmEcode TmThreadSetupOptions(ThreadVars *tv)
891 {
892 if (tv->thread_setup_flags & THREAD_SET_AFFINITY) {
893 SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
894 "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
895 SCGetThreadIdLong());
896 SetCPUAffinity(tv->cpu_affinity);
897 }
898
899 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun && !defined __DragonFly__
900 if (tv->thread_setup_flags & THREAD_SET_PRIORITY)
901 TmThreadSetPrio(tv);
902 if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) {
903 ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity];
904 if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
905 int cpu = AffinityGetNextCPU(taf);
906 SetCPUAffinity(cpu);
907 /* If CPU is in a set overwrite the default thread prio */
908 if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
909 tv->thread_priority = PRIO_LOW;
910 } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
911 tv->thread_priority = PRIO_MEDIUM;
912 } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
913 tv->thread_priority = PRIO_HIGH;
914 } else {
915 tv->thread_priority = taf->prio;
916 }
917 SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
918 "%d, thread id %lu", tv->thread_priority,
919 tv->name, cpu, SCGetThreadIdLong());
920 } else {
921 SetCPUAffinitySet(&taf->cpu_set);
922 tv->thread_priority = taf->prio;
923 SCLogPerf("Setting prio %d for thread \"%s\", "
924 "thread id %lu", tv->thread_priority,
925 tv->name, SCGetThreadIdLong());
926 }
927 TmThreadSetPrio(tv);
928 }
929 #endif
930
931 return TM_ECODE_OK;
932 }
933
934 /**
935 * \brief Creates and returns the TV instance for a new thread.
936 *
937 * \param name Name of this TV instance
938 * \param inq_name Incoming queue name
939 * \param inqh_name Incoming queue handler name as set by TmqhSetup()
940 * \param outq_name Outgoing queue name
941 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
942 * \param slots String representation for the slot function to be used
943 * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
944 * \param mucond Flag to indicate whether to initialize the condition
945 * and the mutex variables for this newly created TV.
946 *
947 * \retval the newly created TV instance, or NULL on error
948 */
TmThreadCreate(const char * name,const char * inq_name,const char * inqh_name,const char * outq_name,const char * outqh_name,const char * slots,void * (* fn_p)(void *),int mucond)949 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
950 const char *outq_name, const char *outqh_name, const char *slots,
951 void * (*fn_p)(void *), int mucond)
952 {
953 ThreadVars *tv = NULL;
954 Tmq *tmq = NULL;
955 Tmqh *tmqh = NULL;
956
957 SCLogDebug("creating thread \"%s\"...", name);
958
959 /* XXX create separate function for this: allocate a thread container */
960 tv = SCMalloc(sizeof(ThreadVars));
961 if (unlikely(tv == NULL))
962 goto error;
963 memset(tv, 0, sizeof(ThreadVars));
964
965 SC_ATOMIC_INIT(tv->flags);
966 SCMutexInit(&tv->perf_public_ctx.m, NULL);
967
968 strlcpy(tv->name, name, sizeof(tv->name));
969
970 /* default state for every newly created thread */
971 TmThreadsSetFlag(tv, THV_PAUSE);
972 TmThreadsSetFlag(tv, THV_USE);
973
974 /* set the incoming queue */
975 if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
976 SCLogDebug("inq_name \"%s\"", inq_name);
977
978 tmq = TmqGetQueueByName(inq_name);
979 if (tmq == NULL) {
980 tmq = TmqCreateQueue(inq_name);
981 if (tmq == NULL)
982 goto error;
983 }
984 SCLogDebug("tmq %p", tmq);
985
986 tv->inq = tmq;
987 tv->inq->reader_cnt++;
988 SCLogDebug("tv->inq %p", tv->inq);
989 }
990 if (inqh_name != NULL) {
991 SCLogDebug("inqh_name \"%s\"", inqh_name);
992
993 int id = TmqhNameToID(inqh_name);
994 if (id <= 0) {
995 goto error;
996 }
997 tmqh = TmqhGetQueueHandlerByName(inqh_name);
998 if (tmqh == NULL)
999 goto error;
1000
1001 tv->tmqh_in = tmqh->InHandler;
1002 tv->inq_id = (uint8_t)id;
1003 SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1004 }
1005
1006 /* set the outgoing queue */
1007 if (outqh_name != NULL) {
1008 SCLogDebug("outqh_name \"%s\"", outqh_name);
1009
1010 int id = TmqhNameToID(outqh_name);
1011 if (id <= 0) {
1012 goto error;
1013 }
1014
1015 tmqh = TmqhGetQueueHandlerByName(outqh_name);
1016 if (tmqh == NULL)
1017 goto error;
1018
1019 tv->tmqh_out = tmqh->OutHandler;
1020 tv->outq_id = (uint8_t)id;
1021
1022 if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1023 SCLogDebug("outq_name \"%s\"", outq_name);
1024
1025 if (tmqh->OutHandlerCtxSetup != NULL) {
1026 tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1027 if (tv->outctx == NULL)
1028 goto error;
1029 tv->outq = NULL;
1030 } else {
1031 tmq = TmqGetQueueByName(outq_name);
1032 if (tmq == NULL) {
1033 tmq = TmqCreateQueue(outq_name);
1034 if (tmq == NULL)
1035 goto error;
1036 }
1037 SCLogDebug("tmq %p", tmq);
1038
1039 tv->outq = tmq;
1040 tv->outctx = NULL;
1041 tv->outq->writer_cnt++;
1042 }
1043 }
1044 }
1045
1046 if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1047 goto error;
1048 }
1049
1050 if (mucond != 0)
1051 TmThreadInitMC(tv);
1052
1053 return tv;
1054
1055 error:
1056 SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1057
1058 if (tv != NULL)
1059 SCFree(tv);
1060 return NULL;
1061 }
1062
1063 /**
1064 * \brief Creates and returns a TV instance for a Packet Processing Thread.
1065 * This function doesn't support custom slots, and hence shouldn't be
1066 * supplied \"custom\" as its slot type. All PPT threads are created
1067 * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1068 * conditional variables are not used to kill the thread.
1069 *
1070 * \param name Name of this TV instance
1071 * \param inq_name Incoming queue name
1072 * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1073 * \param outq_name Outgoing queue name
1074 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1075 * \param slots String representation for the slot function to be used
1076 *
1077 * \retval the newly created TV instance, or NULL on error
1078 */
TmThreadCreatePacketHandler(const char * name,const char * inq_name,const char * inqh_name,const char * outq_name,const char * outqh_name,const char * slots)1079 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1080 const char *inqh_name, const char *outq_name,
1081 const char *outqh_name, const char *slots)
1082 {
1083 ThreadVars *tv = NULL;
1084
1085 tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1086 slots, NULL, 0);
1087
1088 if (tv != NULL) {
1089 tv->type = TVT_PPT;
1090 tv->id = TmThreadsRegisterThread(tv, tv->type);
1091 }
1092
1093
1094 return tv;
1095 }
1096
1097 /**
1098 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1099 * This function supports only custom slot functions and hence a
1100 * function pointer should be sent as an argument.
1101 *
1102 * \param name Name of this TV instance
1103 * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1104 * \param mucond Flag to indicate whether to initialize the condition
1105 * and the mutex variables for this newly created TV.
1106 *
1107 * \retval the newly created TV instance, or NULL on error
1108 */
TmThreadCreateMgmtThread(const char * name,void * (fn_p)(void *),int mucond)1109 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1110 int mucond)
1111 {
1112 ThreadVars *tv = NULL;
1113
1114 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1115
1116 if (tv != NULL) {
1117 tv->type = TVT_MGMT;
1118 tv->id = TmThreadsRegisterThread(tv, tv->type);
1119 TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1120 }
1121
1122 return tv;
1123 }
1124
1125 /**
1126 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1127 * This function supports only custom slot functions and hence a
1128 * function pointer should be sent as an argument.
1129 *
1130 * \param name Name of this TV instance
1131 * \param module Name of TmModule with MANAGEMENT flag set.
1132 * \param mucond Flag to indicate whether to initialize the condition
1133 * and the mutex variables for this newly created TV.
1134 *
1135 * \retval the newly created TV instance, or NULL on error
1136 */
TmThreadCreateMgmtThreadByName(const char * name,const char * module,int mucond)1137 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1138 int mucond)
1139 {
1140 ThreadVars *tv = NULL;
1141
1142 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1143
1144 if (tv != NULL) {
1145 tv->type = TVT_MGMT;
1146 tv->id = TmThreadsRegisterThread(tv, tv->type);
1147 TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1148
1149 TmModule *m = TmModuleGetByName(module);
1150 if (m) {
1151 TmSlotSetFuncAppend(tv, m, NULL);
1152 }
1153 }
1154
1155 return tv;
1156 }
1157
1158 /**
1159 * \brief Creates and returns the TV instance for a Command thread (CMD).
1160 * This function supports only custom slot functions and hence a
1161 * function pointer should be sent as an argument.
1162 *
1163 * \param name Name of this TV instance
1164 * \param module Name of TmModule with COMMAND flag set.
1165 * \param mucond Flag to indicate whether to initialize the condition
1166 * and the mutex variables for this newly created TV.
1167 *
1168 * \retval the newly created TV instance, or NULL on error
1169 */
TmThreadCreateCmdThreadByName(const char * name,const char * module,int mucond)1170 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1171 int mucond)
1172 {
1173 ThreadVars *tv = NULL;
1174
1175 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1176
1177 if (tv != NULL) {
1178 tv->type = TVT_CMD;
1179 tv->id = TmThreadsRegisterThread(tv, tv->type);
1180 TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
1181
1182 TmModule *m = TmModuleGetByName(module);
1183 if (m) {
1184 TmSlotSetFuncAppend(tv, m, NULL);
1185 }
1186 }
1187
1188 return tv;
1189 }
1190
1191 /**
1192 * \brief Appends this TV to tv_root based on its type
1193 *
1194 * \param type holds the type this TV belongs to.
1195 */
TmThreadAppend(ThreadVars * tv,int type)1196 void TmThreadAppend(ThreadVars *tv, int type)
1197 {
1198 SCMutexLock(&tv_root_lock);
1199
1200 if (tv_root[type] == NULL) {
1201 tv_root[type] = tv;
1202 tv->next = NULL;
1203
1204 SCMutexUnlock(&tv_root_lock);
1205
1206 return;
1207 }
1208
1209 ThreadVars *t = tv_root[type];
1210
1211 while (t) {
1212 if (t->next == NULL) {
1213 t->next = tv;
1214 tv->next = NULL;
1215 break;
1216 }
1217
1218 t = t->next;
1219 }
1220
1221 SCMutexUnlock(&tv_root_lock);
1222
1223 return;
1224 }
1225
ThreadStillHasPackets(ThreadVars * tv)1226 static bool ThreadStillHasPackets(ThreadVars *tv)
1227 {
1228 if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1229 /* we wait till we dry out all the inq packets, before we
1230 * kill this thread. Do note that you should have disabled
1231 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1232 PacketQueue *q = tv->inq->pq;
1233 SCMutexLock(&q->mutex_q);
1234 uint32_t len = q->len;
1235 SCMutexUnlock(&q->mutex_q);
1236 if (len != 0) {
1237 return true;
1238 }
1239 }
1240
1241 if (tv->stream_pq != NULL) {
1242 SCMutexLock(&tv->stream_pq->mutex_q);
1243 uint32_t len = tv->stream_pq->len;
1244 SCMutexUnlock(&tv->stream_pq->mutex_q);
1245
1246 if (len != 0) {
1247 return true;
1248 }
1249 }
1250 return false;
1251 }
1252
1253 /**
1254 * \brief Kill a thread.
1255 *
1256 * \param tv A ThreadVars instance corresponding to the thread that has to be
1257 * killed.
1258 *
1259 * \retval r 1 killed succesfully
1260 * 0 not yet ready, needs another look
1261 */
TmThreadKillThread(ThreadVars * tv)1262 static int TmThreadKillThread(ThreadVars *tv)
1263 {
1264 BUG_ON(tv == NULL);
1265
1266 /* kill only once :) */
1267 if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1268 return 1;
1269 }
1270
1271 /* set the thread flag informing the thread that it needs to be
1272 * terminated */
1273 TmThreadsSetFlag(tv, THV_KILL);
1274 TmThreadsSetFlag(tv, THV_DEINIT);
1275
1276 /* to be sure, signal more */
1277 if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1278 if (tv->inq_id != TMQH_NOT_SET) {
1279 Tmqh *qh = TmqhGetQueueHandlerByID(tv->inq_id);
1280 if (qh != NULL && qh->InShutdownHandler != NULL) {
1281 qh->InShutdownHandler(tv);
1282 }
1283 }
1284 if (tv->inq != NULL) {
1285 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1286 SCCondSignal(&tv->inq->pq->cond_q);
1287 }
1288 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1289 }
1290
1291 if (tv->ctrl_cond != NULL ) {
1292 pthread_cond_broadcast(tv->ctrl_cond);
1293 }
1294 return 0;
1295 }
1296
1297 if (tv->outctx != NULL) {
1298 if (tv->outq_id != TMQH_NOT_SET) {
1299 Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
1300 if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1301 qh->OutHandlerCtxFree(tv->outctx);
1302 tv->outctx = NULL;
1303 }
1304 }
1305 }
1306
1307 /* join it and flag it as dead */
1308 pthread_join(tv->t, NULL);
1309 SCLogDebug("thread %s stopped", tv->name);
1310 TmThreadsSetFlag(tv, THV_DEAD);
1311 return 1;
1312 }
1313
1314 /** \internal
1315 *
1316 * \brief make sure that all packet threads are done processing their
1317 * in-flight packets, including 'injected' flow packets.
1318 */
TmThreadDrainPacketThreads(void)1319 static void TmThreadDrainPacketThreads(void)
1320 {
1321 ThreadVars *tv = NULL;
1322 struct timeval start_ts;
1323 struct timeval cur_ts;
1324 gettimeofday(&start_ts, NULL);
1325
1326 again:
1327 gettimeofday(&cur_ts, NULL);
1328 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1329 SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1330 "to process their packets in time");
1331 return;
1332 }
1333
1334 SCMutexLock(&tv_root_lock);
1335
1336 /* all receive threads are part of packet processing threads */
1337 tv = tv_root[TVT_PPT];
1338 while (tv) {
1339 if (ThreadStillHasPackets(tv)) {
1340 /* we wait till we dry out all the inq packets, before we
1341 * kill this thread. Do note that you should have disabled
1342 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1343 SCMutexUnlock(&tv_root_lock);
1344
1345 /* sleep outside lock */
1346 SleepMsec(1);
1347 goto again;
1348 }
1349 if (tv->flow_queue) {
1350 FQLOCK_LOCK(tv->flow_queue);
1351 bool fq_done = (tv->flow_queue->qlen == 0);
1352 FQLOCK_UNLOCK(tv->flow_queue);
1353 if (!fq_done) {
1354 SCMutexUnlock(&tv_root_lock);
1355
1356 Packet *p = PacketGetFromAlloc();
1357 if (p != NULL) {
1358 p->flags |= PKT_PSEUDO_STREAM_END;
1359 PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
1360 PacketQueue *q = tv->stream_pq;
1361 SCMutexLock(&q->mutex_q);
1362 PacketEnqueue(q, p);
1363 SCCondSignal(&q->cond_q);
1364 SCMutexUnlock(&q->mutex_q);
1365 }
1366
1367 /* don't sleep while holding a lock */
1368 SleepMsec(1);
1369 goto again;
1370 }
1371 }
1372 tv = tv->next;
1373 }
1374
1375 SCMutexUnlock(&tv_root_lock);
1376 return;
1377 }
1378
1379 /**
1380 * \brief Disable all threads having the specified TMs.
1381 *
1382 * Breaks out of the packet acquisition loop, and bumps
1383 * into the 'flow loop', where it will process packets
1384 * from the flow engine's shutdown handling.
1385 */
TmThreadDisableReceiveThreads(void)1386 void TmThreadDisableReceiveThreads(void)
1387 {
1388 ThreadVars *tv = NULL;
1389 struct timeval start_ts;
1390 struct timeval cur_ts;
1391 gettimeofday(&start_ts, NULL);
1392
1393 again:
1394 gettimeofday(&cur_ts, NULL);
1395 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1396 FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1397 "thread - \"%s\". Killing engine", tv->name);
1398 }
1399
1400 SCMutexLock(&tv_root_lock);
1401
1402 /* all receive threads are part of packet processing threads */
1403 tv = tv_root[TVT_PPT];
1404
1405 /* we do have to keep in mind that TVs are arranged in the order
1406 * right from receive to log. The moment we fail to find a
1407 * receive TM amongst the slots in a tv, it indicates we are done
1408 * with all receive threads */
1409 while (tv) {
1410 int disable = 0;
1411 TmModule *tm = NULL;
1412 /* obtain the slots for this TV */
1413 TmSlot *slots = tv->tm_slots;
1414 while (slots != NULL) {
1415 tm = TmModuleGetById(slots->tm_id);
1416
1417 if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418 disable = 1;
1419 break;
1420 }
1421
1422 slots = slots->slot_next;
1423 continue;
1424 }
1425
1426 if (disable) {
1427 if (ThreadStillHasPackets(tv)) {
1428 /* we wait till we dry out all the inq packets, before we
1429 * kill this thread. Do note that you should have disabled
1430 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1431 SCMutexUnlock(&tv_root_lock);
1432 /* don't sleep while holding a lock */
1433 SleepMsec(1);
1434 goto again;
1435 }
1436
1437 if (tv->flow_queue) {
1438 FQLOCK_LOCK(tv->flow_queue);
1439 bool fq_done = (tv->flow_queue->qlen == 0);
1440 FQLOCK_UNLOCK(tv->flow_queue);
1441 if (!fq_done) {
1442 SCMutexUnlock(&tv_root_lock);
1443
1444 Packet *p = PacketGetFromAlloc();
1445 if (p != NULL) {
1446 p->flags |= PKT_PSEUDO_STREAM_END;
1447 PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
1448 PacketQueue *q = tv->stream_pq;
1449 SCMutexLock(&q->mutex_q);
1450 PacketEnqueue(q, p);
1451 SCCondSignal(&q->cond_q);
1452 SCMutexUnlock(&q->mutex_q);
1453 }
1454
1455 /* don't sleep while holding a lock */
1456 SleepMsec(1);
1457 goto again;
1458 }
1459 }
1460
1461 /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1462 if (tm && tm->PktAcqBreakLoop != NULL) {
1463 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1464 }
1465 TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
1466
1467 if (tv->inq != NULL) {
1468 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1469 SCCondSignal(&tv->inq->pq->cond_q);
1470 }
1471 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1472 }
1473
1474 /* wait for it to enter the 'flow loop' stage */
1475 while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1476 SCMutexUnlock(&tv_root_lock);
1477
1478 SleepMsec(1);
1479 goto again;
1480 }
1481 }
1482
1483 tv = tv->next;
1484 }
1485
1486 SCMutexUnlock(&tv_root_lock);
1487
1488 /* finally wait for all packet threads to have
1489 * processed all of their 'live' packets so we
1490 * don't process the last live packets together
1491 * with FFR packets */
1492 TmThreadDrainPacketThreads();
1493 return;
1494 }
1495
TmThreadDebugValidateNoMorePackets(void)1496 static void TmThreadDebugValidateNoMorePackets(void)
1497 {
1498 #ifdef DEBUG_VALIDATION
1499 SCMutexLock(&tv_root_lock);
1500 for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501 if (ThreadStillHasPackets(tv)) {
1502 SCMutexUnlock(&tv_root_lock);
1503 TmThreadDumpThreads();
1504 abort();
1505 }
1506 }
1507 SCMutexUnlock(&tv_root_lock);
1508 #endif
1509 }
1510
1511 /**
1512 * \brief Disable all packet threads
1513 */
TmThreadDisablePacketThreads(void)1514 void TmThreadDisablePacketThreads(void)
1515 {
1516 struct timeval start_ts;
1517 struct timeval cur_ts;
1518
1519 /* first drain all packet threads of their packets */
1520 TmThreadDrainPacketThreads();
1521
1522 /* since all the threads possibly able to produce more packets
1523 * are now gone or inactive, we should see no packets anywhere
1524 * anymore. */
1525 TmThreadDebugValidateNoMorePackets();
1526
1527 gettimeofday(&start_ts, NULL);
1528 again:
1529 gettimeofday(&cur_ts, NULL);
1530 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1531 FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1532 "threads. Killing engine");
1533 }
1534
1535 /* loop through the packet threads and kill them */
1536 SCMutexLock(&tv_root_lock);
1537 for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1538 TmThreadsSetFlag(tv, THV_KILL);
1539
1540 /* separate worker threads (autofp) will still wait at their
1541 * input queues. So nudge them here so they will observe the
1542 * THV_KILL flag. */
1543 if (tv->inq != NULL) {
1544 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1545 SCCondSignal(&tv->inq->pq->cond_q);
1546 }
1547 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1548 }
1549
1550 while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1551 SCMutexUnlock(&tv_root_lock);
1552
1553 SleepMsec(1);
1554 goto again;
1555 }
1556 }
1557 SCMutexUnlock(&tv_root_lock);
1558 return;
1559 }
1560
TmThreadGetFirstTmSlotForPartialPattern(const char * tm_name)1561 TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
1562 {
1563 ThreadVars *tv = NULL;
1564 TmSlot *slots = NULL;
1565
1566 SCMutexLock(&tv_root_lock);
1567
1568 /* all receive threads are part of packet processing threads */
1569 tv = tv_root[TVT_PPT];
1570
1571 while (tv) {
1572 slots = tv->tm_slots;
1573
1574 while (slots != NULL) {
1575 TmModule *tm = TmModuleGetById(slots->tm_id);
1576
1577 char *found = strstr(tm->name, tm_name);
1578 if (found != NULL)
1579 goto end;
1580
1581 slots = slots->slot_next;
1582 }
1583
1584 tv = tv->next;
1585 }
1586
1587 end:
1588 SCMutexUnlock(&tv_root_lock);
1589 return slots;
1590 }
1591
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
TmThreadKillThreadsFamily(int family)1594 void TmThreadKillThreadsFamily(int family)
1595 {
1596 ThreadVars *tv = NULL;
1597 unsigned int sleep_usec = MIN_WAIT_TIME;
1598
1599 BUG_ON((family < 0) || (family >= TVT_MAX));
1600
1601 again:
1602 SCMutexLock(&tv_root_lock);
1603 tv = tv_root[family];
1604
1605 while (tv) {
1606 int r = TmThreadKillThread(tv);
1607 if (r == 0) {
1608 SCMutexUnlock(&tv_root_lock);
1609 SleepUsec(sleep_usec);
1610 sleep_usec *= 2; /* slowly back off */
1611 sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612 goto again;
1613 }
1614 sleep_usec = MIN_WAIT_TIME; /* reset */
1615
1616 tv = tv->next;
1617 }
1618 SCMutexUnlock(&tv_root_lock);
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622
TmThreadKillThreads(void)1623 void TmThreadKillThreads(void)
1624 {
1625 int i = 0;
1626
1627 for (i = 0; i < TVT_MAX; i++) {
1628 TmThreadKillThreadsFamily(i);
1629 }
1630
1631 return;
1632 }
1633
TmThreadFree(ThreadVars * tv)1634 static void TmThreadFree(ThreadVars *tv)
1635 {
1636 TmSlot *s;
1637 TmSlot *ps;
1638 if (tv == NULL)
1639 return;
1640
1641 SCLogDebug("Freeing thread '%s'.", tv->name);
1642
1643 if (tv->flow_queue) {
1644 BUG_ON(tv->flow_queue->qlen != 0);
1645 SCFree(tv->flow_queue);
1646 }
1647
1648 StatsThreadCleanup(tv);
1649
1650 TmThreadDeinitMC(tv);
1651
1652 if (tv->thread_group_name) {
1653 SCFree(tv->thread_group_name);
1654 }
1655
1656 if (tv->printable_name) {
1657 SCFree(tv->printable_name);
1658 }
1659
1660 if (tv->stream_pq_local) {
1661 BUG_ON(tv->stream_pq_local->len);
1662 SCMutexDestroy(&tv->stream_pq_local->mutex_q);
1663 SCFree(tv->stream_pq_local);
1664 }
1665
1666 s = (TmSlot *)tv->tm_slots;
1667 while (s) {
1668 ps = s;
1669 s = s->slot_next;
1670 SCFree(ps);
1671 }
1672
1673 TmThreadsUnregisterThread(tv->id);
1674 SCFree(tv);
1675 }
1676
TmThreadSetGroupName(ThreadVars * tv,const char * name)1677 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1678 {
1679 char *thread_group_name = NULL;
1680
1681 if (name == NULL)
1682 return;
1683
1684 if (tv == NULL)
1685 return;
1686
1687 thread_group_name = SCStrdup(name);
1688 if (unlikely(thread_group_name == NULL)) {
1689 SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1690 return;
1691 }
1692 tv->thread_group_name = thread_group_name;
1693 }
1694
TmThreadClearThreadsFamily(int family)1695 void TmThreadClearThreadsFamily(int family)
1696 {
1697 ThreadVars *tv = NULL;
1698 ThreadVars *ptv = NULL;
1699
1700 if ((family < 0) || (family >= TVT_MAX))
1701 return;
1702
1703 SCMutexLock(&tv_root_lock);
1704 tv = tv_root[family];
1705
1706 while (tv) {
1707 ptv = tv;
1708 tv = tv->next;
1709 TmThreadFree(ptv);
1710 }
1711 tv_root[family] = NULL;
1712 SCMutexUnlock(&tv_root_lock);
1713 }
1714
1715 /**
1716 * \brief Spawns a thread associated with the ThreadVars instance tv
1717 *
1718 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1719 */
TmThreadSpawn(ThreadVars * tv)1720 TmEcode TmThreadSpawn(ThreadVars *tv)
1721 {
1722 pthread_attr_t attr;
1723 if (tv->tm_func == NULL) {
1724 printf("ERROR: no thread function set\n");
1725 return TM_ECODE_FAILED;
1726 }
1727
1728 /* Initialize and set thread detached attribute */
1729 pthread_attr_init(&attr);
1730
1731 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1732
1733 int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1734 if (rc) {
1735 printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1736 return TM_ECODE_FAILED;
1737 }
1738
1739 TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
1740
1741 TmThreadAppend(tv, tv->type);
1742 return TM_ECODE_OK;
1743 }
1744
1745 /**
1746 * \brief Initializes the mutex and condition variables for this TV
1747 *
1748 * It can be used by a thread to control a wait loop that can also be
1749 * influenced by other threads.
1750 *
1751 * \param tv Pointer to a TV instance
1752 */
TmThreadInitMC(ThreadVars * tv)1753 void TmThreadInitMC(ThreadVars *tv)
1754 {
1755 if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1756 FatalError(SC_ERR_FATAL,
1757 "Fatal error encountered in TmThreadInitMC. "
1758 "Exiting...");
1759 }
1760
1761 if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1762 printf("Error initializing the tv->m mutex\n");
1763 exit(EXIT_FAILURE);
1764 }
1765
1766 if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1767 FatalError(SC_ERR_FATAL,
1768 "Fatal error encountered in TmThreadInitMC. "
1769 "Exiting...");
1770 }
1771
1772 if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1773 FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1774 "variable");
1775 }
1776
1777 return;
1778 }
1779
TmThreadDeinitMC(ThreadVars * tv)1780 static void TmThreadDeinitMC(ThreadVars *tv)
1781 {
1782 if (tv->ctrl_mutex) {
1783 SCCtrlMutexDestroy(tv->ctrl_mutex);
1784 SCFree(tv->ctrl_mutex);
1785 }
1786 if (tv->ctrl_cond) {
1787 SCCtrlCondDestroy(tv->ctrl_cond);
1788 SCFree(tv->ctrl_cond);
1789 }
1790 return;
1791 }
1792
1793 /**
1794 * \brief Tests if the thread represented in the arg has been unpaused or not.
1795 *
1796 * The function would return if the thread tv has been unpaused or if the
1797 * kill flag for the thread has been set.
1798 *
1799 * \param tv Pointer to the TV instance.
1800 */
TmThreadTestThreadUnPaused(ThreadVars * tv)1801 void TmThreadTestThreadUnPaused(ThreadVars *tv)
1802 {
1803 while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1804 SleepUsec(100);
1805
1806 if (TmThreadsCheckFlag(tv, THV_KILL))
1807 break;
1808 }
1809
1810 return;
1811 }
1812
1813 /**
1814 * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1815 * the kill flag has been set or not on the thread.
1816 *
1817 * \param tv Pointer to the TV instance.
1818 */
TmThreadWaitForFlag(ThreadVars * tv,uint32_t flags)1819 void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
1820 {
1821 while (!TmThreadsCheckFlag(tv, flags)) {
1822 SleepUsec(100);
1823 }
1824
1825 return;
1826 }
1827
1828 /**
1829 * \brief Unpauses a thread
1830 *
1831 * \param tv Pointer to a TV instance that has to be unpaused
1832 */
TmThreadContinue(ThreadVars * tv)1833 void TmThreadContinue(ThreadVars *tv)
1834 {
1835 TmThreadsUnsetFlag(tv, THV_PAUSE);
1836
1837 return;
1838 }
1839
1840 /**
1841 * \brief Unpauses all threads present in tv_root
1842 */
TmThreadContinueThreads()1843 void TmThreadContinueThreads()
1844 {
1845 SCMutexLock(&tv_root_lock);
1846 for (int i = 0; i < TVT_MAX; i++) {
1847 ThreadVars *tv = tv_root[i];
1848 while (tv != NULL) {
1849 TmThreadContinue(tv);
1850 tv = tv->next;
1851 }
1852 }
1853 SCMutexUnlock(&tv_root_lock);
1854 return;
1855 }
1856
1857 /**
1858 * \brief Pauses a thread
1859 *
1860 * \param tv Pointer to a TV instance that has to be paused
1861 */
TmThreadPause(ThreadVars * tv)1862 void TmThreadPause(ThreadVars *tv)
1863 {
1864 TmThreadsSetFlag(tv, THV_PAUSE);
1865 return;
1866 }
1867
1868 /**
1869 * \brief Pauses all threads present in tv_root
1870 */
TmThreadPauseThreads()1871 void TmThreadPauseThreads()
1872 {
1873 TmThreadsListThreads();
1874
1875 SCMutexLock(&tv_root_lock);
1876 for (int i = 0; i < TVT_MAX; i++) {
1877 ThreadVars *tv = tv_root[i];
1878 while (tv != NULL) {
1879 TmThreadPause(tv);
1880 tv = tv->next;
1881 }
1882 }
1883 SCMutexUnlock(&tv_root_lock);
1884 }
1885
1886 /**
1887 * \brief Used to check the thread for certain conditions of failure.
1888 */
TmThreadCheckThreadState(void)1889 void TmThreadCheckThreadState(void)
1890 {
1891 SCMutexLock(&tv_root_lock);
1892 for (int i = 0; i < TVT_MAX; i++) {
1893 ThreadVars *tv = tv_root[i];
1894 while (tv) {
1895 if (TmThreadsCheckFlag(tv, THV_FAILED)) {
1896 FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1897 }
1898 tv = tv->next;
1899 }
1900 }
1901 SCMutexUnlock(&tv_root_lock);
1902 return;
1903 }
1904
1905 /**
1906 * \brief Used to check if all threads have finished their initialization. On
1907 * finding an un-initialized thread, it waits till that thread completes
1908 * its initialization, before proceeding to the next thread.
1909 *
1910 * \retval TM_ECODE_OK all initialized properly
1911 * \retval TM_ECODE_FAILED failure
1912 */
TmThreadWaitOnThreadInit(void)1913 TmEcode TmThreadWaitOnThreadInit(void)
1914 {
1915 uint16_t mgt_num = 0;
1916 uint16_t ppt_num = 0;
1917
1918 struct timeval start_ts;
1919 struct timeval cur_ts;
1920 gettimeofday(&start_ts, NULL);
1921
1922 again:
1923 SCMutexLock(&tv_root_lock);
1924 for (int i = 0; i < TVT_MAX; i++) {
1925 ThreadVars *tv = tv_root[i];
1926 while (tv != NULL) {
1927 if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
1928 SCMutexUnlock(&tv_root_lock);
1929
1930 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1931 "initialize: flags %04x", tv->name,
1932 SC_ATOMIC_GET(tv->flags));
1933 return TM_ECODE_FAILED;
1934 }
1935
1936 if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1937 SCMutexUnlock(&tv_root_lock);
1938
1939 gettimeofday(&cur_ts, NULL);
1940 if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1941 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1942 "initialize in time: flags %04x", tv->name,
1943 SC_ATOMIC_GET(tv->flags));
1944 return TM_ECODE_FAILED;
1945 }
1946
1947 /* sleep a little to give the thread some
1948 * time to finish initialization */
1949 SleepUsec(100);
1950 goto again;
1951 }
1952
1953 if (TmThreadsCheckFlag(tv, THV_FAILED)) {
1954 SCMutexUnlock(&tv_root_lock);
1955 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1956 "initialize.", tv->name);
1957 return TM_ECODE_FAILED;
1958 }
1959 if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
1960 SCMutexUnlock(&tv_root_lock);
1961 SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1962 "initialization.", tv->name);
1963 return TM_ECODE_FAILED;
1964 }
1965
1966 if (i == TVT_MGMT)
1967 mgt_num++;
1968 else if (i == TVT_PPT)
1969 ppt_num++;
1970
1971 tv = tv->next;
1972 }
1973 }
1974 SCMutexUnlock(&tv_root_lock);
1975
1976 SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
1977 "threads initialized, engine started.", ppt_num, mgt_num);
1978
1979 return TM_ECODE_OK;
1980 }
1981
1982 /**
1983 * \brief Returns the TV for the calling thread.
1984 *
1985 * \retval tv Pointer to the ThreadVars instance for the calling thread;
1986 * NULL on no match
1987 */
TmThreadsGetCallingThread(void)1988 ThreadVars *TmThreadsGetCallingThread(void)
1989 {
1990 pthread_t self = pthread_self();
1991
1992 SCMutexLock(&tv_root_lock);
1993 for (int i = 0; i < TVT_MAX; i++) {
1994 ThreadVars *tv = tv_root[i];
1995 while (tv) {
1996 if (pthread_equal(self, tv->t)) {
1997 SCMutexUnlock(&tv_root_lock);
1998 return tv;
1999 }
2000 tv = tv->next;
2001 }
2002 }
2003 SCMutexUnlock(&tv_root_lock);
2004 return NULL;
2005 }
2006
2007 /**
2008 * \brief returns a count of all the threads that match the flag
2009 */
TmThreadCountThreadsByTmmFlags(uint8_t flags)2010 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2011 {
2012 uint32_t cnt = 0;
2013 SCMutexLock(&tv_root_lock);
2014 for (int i = 0; i < TVT_MAX; i++) {
2015 ThreadVars *tv = tv_root[i];
2016 while (tv != NULL) {
2017 if ((tv->tmm_flags & flags) == flags)
2018 cnt++;
2019
2020 tv = tv->next;
2021 }
2022 }
2023 SCMutexUnlock(&tv_root_lock);
2024 return cnt;
2025 }
2026
TmThreadDoDumpSlots(const ThreadVars * tv)2027 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2028 {
2029 for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2030 TmModule *m = TmModuleGetById(s->tm_id);
2031 SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2032 tv, s, s->tm_id, m->name);
2033 }
2034 }
2035
TmThreadDumpThreads(void)2036 void TmThreadDumpThreads(void)
2037 {
2038 SCMutexLock(&tv_root_lock);
2039 for (int i = 0; i < TVT_MAX; i++) {
2040 ThreadVars *tv = tv_root[i];
2041 while (tv != NULL) {
2042 const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2043 SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2044 tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2045 if (tv->inq && tv->stream_pq == tv->inq->pq) {
2046 SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2047 } else if (tv->stream_pq_local != NULL) {
2048 for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2049 SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2050 tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2051 }
2052 }
2053 for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2054 SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2055 tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2056 }
2057 TmThreadDoDumpSlots(tv);
2058 tv = tv->next;
2059 }
2060 }
2061 SCMutexUnlock(&tv_root_lock);
2062 TmThreadsListThreads();
2063 }
2064
2065 typedef struct Thread_ {
2066 ThreadVars *tv; /**< threadvars structure */
2067 const char *name;
2068 int type;
2069 int in_use; /**< bool to indicate this is in use */
2070
2071 struct timeval pktts; /**< current packet time of this thread
2072 * (offline mode) */
2073 uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2074 * time when the pktts was last updated. */
2075 } Thread;
2076
2077 typedef struct Threads_ {
2078 Thread *threads;
2079 size_t threads_size;
2080 int threads_cnt;
2081 } Threads;
2082
2083 static Threads thread_store = { NULL, 0, 0 };
2084 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2085
TmThreadsListThreads(void)2086 void TmThreadsListThreads(void)
2087 {
2088 SCMutexLock(&thread_store_lock);
2089 for (size_t s = 0; s < thread_store.threads_size; s++) {
2090 Thread *t = &thread_store.threads[s];
2091 if (t == NULL || t->in_use == 0)
2092 continue;
2093
2094 SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2095 (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2096 if (t->tv) {
2097 ThreadVars *tv = t->tv;
2098 const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2099 SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2100 tv, tv->type, tv->name, tv->tmm_flags, flags);
2101 }
2102 }
2103 SCMutexUnlock(&thread_store_lock);
2104 }
2105
2106 #define STEP 32
2107 /**
2108 * \retval id thread id, or 0 if not found
2109 */
TmThreadsRegisterThread(ThreadVars * tv,const int type)2110 int TmThreadsRegisterThread(ThreadVars *tv, const int type)
2111 {
2112 SCMutexLock(&thread_store_lock);
2113 if (thread_store.threads == NULL) {
2114 thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2115 BUG_ON(thread_store.threads == NULL);
2116 thread_store.threads_size = STEP;
2117 }
2118
2119 size_t s;
2120 for (s = 0; s < thread_store.threads_size; s++) {
2121 if (thread_store.threads[s].in_use == 0) {
2122 Thread *t = &thread_store.threads[s];
2123 t->name = tv->name;
2124 t->type = type;
2125 t->tv = tv;
2126 t->in_use = 1;
2127
2128 SCMutexUnlock(&thread_store_lock);
2129 return (int)(s+1);
2130 }
2131 }
2132
2133 /* if we get here the array is completely filled */
2134 void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2135 BUG_ON(newmem == NULL);
2136 thread_store.threads = newmem;
2137 memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2138
2139 Thread *t = &thread_store.threads[thread_store.threads_size];
2140 t->name = tv->name;
2141 t->type = type;
2142 t->tv = tv;
2143 t->in_use = 1;
2144
2145 s = thread_store.threads_size;
2146 thread_store.threads_size += STEP;
2147
2148 SCMutexUnlock(&thread_store_lock);
2149 return (int)(s+1);
2150 }
2151 #undef STEP
2152
TmThreadsUnregisterThread(const int id)2153 void TmThreadsUnregisterThread(const int id)
2154 {
2155 SCMutexLock(&thread_store_lock);
2156 if (id <= 0 || id > (int)thread_store.threads_size) {
2157 SCMutexUnlock(&thread_store_lock);
2158 return;
2159 }
2160
2161 /* id is one higher than index */
2162 int idx = id - 1;
2163
2164 /* reset thread_id, which serves as clearing the record */
2165 thread_store.threads[idx].in_use = 0;
2166
2167 /* check if we have at least one registered thread left */
2168 size_t s;
2169 for (s = 0; s < thread_store.threads_size; s++) {
2170 Thread *t = &thread_store.threads[s];
2171 if (t->in_use == 1) {
2172 goto end;
2173 }
2174 }
2175
2176 /* if we get here no threads are registered */
2177 SCFree(thread_store.threads);
2178 thread_store.threads = NULL;
2179 thread_store.threads_size = 0;
2180 thread_store.threads_cnt = 0;
2181
2182 end:
2183 SCMutexUnlock(&thread_store_lock);
2184 }
2185
TmThreadsSetThreadTimestamp(const int id,const struct timeval * ts)2186 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2187 {
2188 SCMutexLock(&thread_store_lock);
2189 if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2190 SCMutexUnlock(&thread_store_lock);
2191 return;
2192 }
2193
2194 int idx = id - 1;
2195 Thread *t = &thread_store.threads[idx];
2196 t->pktts = *ts;
2197 struct timeval systs;
2198 gettimeofday(&systs, NULL);
2199 t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2200 SCMutexUnlock(&thread_store_lock);
2201 }
2202
TmThreadsTimeSubsysIsReady(void)2203 bool TmThreadsTimeSubsysIsReady(void)
2204 {
2205 bool ready = true;
2206 SCMutexLock(&thread_store_lock);
2207 for (size_t s = 0; s < thread_store.threads_size; s++) {
2208 Thread *t = &thread_store.threads[s];
2209 if (!t->in_use)
2210 break;
2211 if (t->sys_sec_stamp == 0) {
2212 ready = false;
2213 break;
2214 }
2215 }
2216 SCMutexUnlock(&thread_store_lock);
2217 return ready;
2218 }
2219
TmThreadsInitThreadsTimestamp(const struct timeval * ts)2220 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2221 {
2222 struct timeval systs;
2223 gettimeofday(&systs, NULL);
2224 SCMutexLock(&thread_store_lock);
2225 for (size_t s = 0; s < thread_store.threads_size; s++) {
2226 Thread *t = &thread_store.threads[s];
2227 if (!t->in_use)
2228 break;
2229 t->pktts = *ts;
2230 t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2231 }
2232 SCMutexUnlock(&thread_store_lock);
2233 }
2234
TmThreadsGetMinimalTimestamp(struct timeval * ts)2235 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2236 {
2237 struct timeval local, nullts;
2238 memset(&local, 0, sizeof(local));
2239 memset(&nullts, 0, sizeof(nullts));
2240 int set = 0;
2241 size_t s;
2242 struct timeval systs;
2243 gettimeofday(&systs, NULL);
2244
2245 SCMutexLock(&thread_store_lock);
2246 for (s = 0; s < thread_store.threads_size; s++) {
2247 Thread *t = &thread_store.threads[s];
2248 if (t->in_use == 0)
2249 break;
2250 if (!(timercmp(&t->pktts, &nullts, ==))) {
2251 /* ignore sleeping threads */
2252 if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2253 continue;
2254
2255 if (!set) {
2256 local = t->pktts;
2257 set = 1;
2258 } else {
2259 if (timercmp(&t->pktts, &local, <)) {
2260 local = t->pktts;
2261 }
2262 }
2263 }
2264 }
2265 SCMutexUnlock(&thread_store_lock);
2266 *ts = local;
2267 SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2268 }
2269
TmThreadsGetWorkerThreadMax()2270 uint16_t TmThreadsGetWorkerThreadMax()
2271 {
2272 uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2273 int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2274 /* always create at least one thread */
2275 if (thread_max == 0)
2276 thread_max = ncpus * threading_detect_ratio;
2277 if (thread_max < 1)
2278 thread_max = 1;
2279 if (thread_max > 1024) {
2280 SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2281 thread_max = 1024;
2282 }
2283 return thread_max;
2284 }
2285
ThreadBreakLoop(ThreadVars * tv)2286 static inline void ThreadBreakLoop(ThreadVars *tv)
2287 {
2288 if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
2289 return;
2290 }
2291 /* find the correct slot */
2292 TmSlot *s = tv->tm_slots;
2293 TmModule *tm = TmModuleGetById(s->tm_id);
2294 if (tm->flags & TM_FLAG_RECEIVE_TM) {
2295 /* if the method supports it, BreakLoop. Otherwise we rely on
2296 * the capture method's recv timeout */
2297 if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
2298 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
2299 }
2300 }
2301 }
2302
2303 /**
2304 * \retval r 1 if packet was accepted, 0 otherwise
2305 * \note if packet was not accepted, it's still the responsibility
2306 * of the caller.
2307 */
TmThreadsInjectPacketsById(Packet ** packets,const int id)2308 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2309 {
2310 if (id <= 0 || id > (int)thread_store.threads_size)
2311 return 0;
2312
2313 int idx = id - 1;
2314
2315 Thread *t = &thread_store.threads[idx];
2316 ThreadVars *tv = t->tv;
2317
2318 if (tv == NULL || tv->stream_pq == NULL)
2319 return 0;
2320
2321 SCMutexLock(&tv->stream_pq->mutex_q);
2322 while (*packets != NULL) {
2323 PacketEnqueue(tv->stream_pq, *packets);
2324 packets++;
2325 }
2326 SCMutexUnlock(&tv->stream_pq->mutex_q);
2327
2328 /* wake up listening thread(s) if necessary */
2329 if (tv->inq != NULL) {
2330 SCCondSignal(&tv->inq->pq->cond_q);
2331 } else if (tv->break_loop) {
2332 ThreadBreakLoop(tv);
2333 }
2334 return 1;
2335 }
2336
2337 /** \brief inject a flow into a threads flow queue
2338 */
TmThreadsInjectFlowById(Flow * f,const int id)2339 void TmThreadsInjectFlowById(Flow *f, const int id)
2340 {
2341 BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2342
2343 int idx = id - 1;
2344
2345 Thread *t = &thread_store.threads[idx];
2346 ThreadVars *tv = t->tv;
2347
2348 BUG_ON(tv == NULL || tv->flow_queue == NULL);
2349
2350 FlowEnqueue(tv->flow_queue, f);
2351
2352 /* wake up listening thread(s) if necessary */
2353 if (tv->inq != NULL) {
2354 SCCondSignal(&tv->inq->pq->cond_q);
2355 } else if (tv->break_loop) {
2356 ThreadBreakLoop(tv);
2357 }
2358 }
2359