1 /* Copyright (C) 2007-2011 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  */
24 
25 #ifndef __TM_THREADS_H__
26 #define __TM_THREADS_H__
27 
28 #include "tmqh-packetpool.h"
29 #include "tm-threads-common.h"
30 #include "tm-modules.h"
31 
32 #ifdef OS_WIN32
SleepUsec(uint64_t usec)33 static inline void SleepUsec(uint64_t usec)
34 {
35     uint64_t msec = 1;
36     if (usec > 1000) {
37         msec = usec / 1000;
38     }
39     Sleep(msec);
40 }
41 #define SleepMsec(msec) Sleep((msec))
42 #else
43 #define SleepUsec(usec) usleep((usec))
44 #define SleepMsec(msec) usleep((msec) * 1000)
45 #endif
46 
47 #define TM_QUEUE_NAME_MAX 16
48 #define TM_THREAD_NAME_MAX 16
49 
50 typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);
51 
52 typedef struct TmSlot_ {
53     /* function pointers */
54     union {
55         TmSlotFunc SlotFunc;
56         TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
57         TmEcode (*Management)(ThreadVars *, void *);
58     };
59     /** linked list of slots, used when a pipeline has multiple slots
60      *  in a single thread. */
61     struct TmSlot_ *slot_next;
62 
63     SC_ATOMIC_DECLARE(void *, slot_data);
64 
65     TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
66     void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
67     TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
68 
69     /* data storage */
70     const void *slot_initdata;
71     /* store the thread module id */
72     int tm_id;
73 
74 } TmSlot;
75 
76 extern ThreadVars *tv_root[TVT_MAX];
77 
78 extern SCMutex tv_root_lock;
79 
80 void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
81 TmSlot *TmSlotGetSlotForTM(int);
82 
83 ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
84                            void *(fn_p)(void *), int);
85 ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
86                                         const char *);
87 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
88 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
89                                      int mucond);
90 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
91                                      int mucond);
92 TmEcode TmThreadSpawn(ThreadVars *);
93 void TmThreadSetFlags(ThreadVars *, uint8_t);
94 void TmThreadKillThreadsFamily(int family);
95 void TmThreadKillThreads(void);
96 void TmThreadClearThreadsFamily(int family);
97 void TmThreadAppend(ThreadVars *, int);
98 void TmThreadSetGroupName(ThreadVars *tv, const char *name);
99 void TmThreadDumpThreads(void);
100 
101 TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
102 TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
103 TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
104 TmEcode TmThreadSetupOptions(ThreadVars *);
105 void TmThreadSetPrio(ThreadVars *);
106 int TmThreadGetNbThreads(uint8_t type);
107 
108 void TmThreadInitMC(ThreadVars *);
109 void TmThreadTestThreadUnPaused(ThreadVars *);
110 void TmThreadContinue(ThreadVars *);
111 void TmThreadContinueThreads(void);
112 void TmThreadPause(ThreadVars *);
113 void TmThreadPauseThreads(void);
114 void TmThreadCheckThreadState(void);
115 TmEcode TmThreadWaitOnThreadInit(void);
116 ThreadVars *TmThreadsGetCallingThread(void);
117 
118 int TmThreadsCheckFlag(ThreadVars *, uint32_t);
119 void TmThreadsSetFlag(ThreadVars *, uint32_t);
120 void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
121 void TmThreadWaitForFlag(ThreadVars *, uint32_t);
122 
123 TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
124 
125 ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
126 void TmThreadDisablePacketThreads(void);
127 void TmThreadDisableReceiveThreads(void);
128 TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *);
129 
130 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
131 
TmThreadsCleanDecodePQ(PacketQueueNoLock * pq)132 static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
133 {
134     while (1) {
135         Packet *p = PacketDequeueNoLock(pq);
136         if (unlikely(p == NULL))
137             break;
138         TmqhOutputPacketpool(NULL, p);
139     }
140 }
141 
TmThreadsSlotProcessPktFail(ThreadVars * tv,TmSlot * s,Packet * p)142 static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
143 {
144     if (p != NULL) {
145         TmqhOutputPacketpool(tv, p);
146     }
147     TmThreadsCleanDecodePQ(&tv->decode_pq);
148     if (tv->stream_pq_local) {
149         SCMutexLock(&tv->stream_pq_local->mutex_q);
150         TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
151         SCMutexUnlock(&tv->stream_pq_local->mutex_q);
152     }
153     TmThreadsSetFlag(tv, THV_FAILED);
154 }
155 
156 /**
157  *  \brief Handle timeout from the capture layer. Checks
158  *         stream_pq which may have been filled by the flow
159  *         manager.
160  *  \param s pipeline to run on these packets.
161  */
TmThreadsHandleInjectedPackets(ThreadVars * tv)162 static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
163 {
164     PacketQueue *pq = tv->stream_pq_local;
165     if (pq && pq->len > 0) {
166         while (1) {
167             SCMutexLock(&pq->mutex_q);
168             Packet *extra_p = PacketDequeue(pq);
169             SCMutexUnlock(&pq->mutex_q);
170             if (extra_p == NULL)
171                 break;
172             TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
173             if (r == TM_ECODE_FAILED) {
174                 TmThreadsSlotProcessPktFail(tv, tv->tm_flowworker, extra_p);
175                 break;
176             }
177             tv->tmqh_out(tv, extra_p);
178         }
179         return true;
180     } else {
181         return false;
182     }
183 }
184 
185 /**
186  *  \brief Process the rest of the functions (if any) and queue.
187  */
TmThreadsSlotProcessPkt(ThreadVars * tv,TmSlot * s,Packet * p)188 static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
189 {
190     if (s == NULL) {
191         tv->tmqh_out(tv, p);
192         return TM_ECODE_OK;
193     }
194 
195     TmEcode r = TmThreadsSlotVarRun(tv, p, s);
196     if (unlikely(r == TM_ECODE_FAILED)) {
197         TmThreadsSlotProcessPktFail(tv, s, p);
198         return TM_ECODE_FAILED;
199     }
200 
201     tv->tmqh_out(tv, p);
202 
203     TmThreadsHandleInjectedPackets(tv);
204 
205     return TM_ECODE_OK;
206 }
207 
208 /** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
209  *  Allow caller to supply their own packet
210  *
211  *  Meant for detect reload process that interupts an sleeping capture thread
212  *  to force a packet through the engine to complete a reload */
TmThreadsCaptureInjectPacket(ThreadVars * tv,Packet * p)213 static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
214 {
215     TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
216     if (p == NULL)
217         p = PacketGetFromQueueOrAlloc();
218     if (p != NULL) {
219         p->flags |= PKT_PSEUDO_STREAM_END;
220         PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
221         if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
222             TmqhOutputPacketpool(tv, p);
223         }
224     }
225 }
226 
227 /** \brief handle capture timeout
228  *  When a capture method times out we check for house keeping
229  *  tasks in the capture thread.
230  *
231  *  \param p packet. Capture method may have taken a packet from
232  *           the pool prior to the timing out call. We will then
233  *           use that packet. Otherwise we can get our own.
234  */
TmThreadsCaptureHandleTimeout(ThreadVars * tv,Packet * p)235 static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
236 {
237     if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
238         TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
239         return;
240 
241     } else {
242         if (TmThreadsHandleInjectedPackets(tv) == false) {
243             /* see if we have to do some house keeping */
244             if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
245                 TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
246                 return;
247             }
248         }
249     }
250 
251     /* packet could have been passed to us that we won't use
252      * return it to the pool. */
253     if (p != NULL)
254         tv->tmqh_out(tv, p);
255 }
256 
257 void TmThreadsListThreads(void);
258 int TmThreadsRegisterThread(ThreadVars *tv, const int type);
259 void TmThreadsUnregisterThread(const int id);
260 int TmThreadsInjectPacketsById(Packet **, int id);
261 void TmThreadsInjectFlowById(Flow *f, const int id);
262 
263 void TmThreadsInitThreadsTimestamp(const struct timeval *ts);
264 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts);
265 void TmThreadsGetMinimalTimestamp(struct timeval *ts);
266 uint16_t TmThreadsGetWorkerThreadMax(void);
267 bool TmThreadsTimeSubsysIsReady(void);
268 
269 #endif /* __TM_THREADS_H__ */
270