1 /* Copyright (C) 2007-2011 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 /**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 */
24
25 #ifndef __TM_THREADS_H__
26 #define __TM_THREADS_H__
27
28 #include "tmqh-packetpool.h"
29 #include "tm-threads-common.h"
30 #include "tm-modules.h"
31
#ifdef OS_WIN32
/**
 * \brief Sleep for approximately \a usec microseconds.
 *
 * The value is converted to whole milliseconds (integer division by 1000)
 * before being passed to Sleep(). Requests of 1000 usec or less sleep
 * for 1 msec.
 */
static inline void SleepUsec(uint64_t usec)
{
    const uint64_t msec = (usec > 1000) ? (usec / 1000) : 1;
    Sleep(msec);
}
#define SleepMsec(msec) Sleep((msec))
#else
/* Elsewhere both helpers map onto usleep(); SleepMsec scales its argument
 * from milliseconds to microseconds. */
#define SleepUsec(usec) usleep((usec))
#define SleepMsec(msec) usleep((msec) * 1000)
#endif
46
/* Size limits for queue and thread names.
 * NOTE(review): whether these counts include the terminating NUL is not
 * visible from this header -- confirm at the points of use. */
#define TM_QUEUE_NAME_MAX 16
#define TM_THREAD_NAME_MAX 16
49
/** Signature of a per-packet slot function: takes the thread, a packet and
 *  an opaque pointer, and returns a TmEcode. */
typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);

/** One slot of a thread's pipeline: the function to run plus the
 *  init/deinit hooks and data that go with it. */
typedef struct TmSlot_ {
    /* function pointers; the three members share storage (anonymous union),
     * so a slot carries exactly one of them */
    union {
        TmSlotFunc SlotFunc;
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
        TmEcode (*Management)(ThreadVars *, void *);
    };
    /** linked list of slots, used when a pipeline has multiple slots
     *  in a single thread. */
    struct TmSlot_ *slot_next;

    /* opaque pointer declared through the SC_ATOMIC wrapper; presumably the
     * per-thread data produced by SlotThreadInit -- TODO confirm */
    SC_ATOMIC_DECLARE(void *, slot_data);

    /* optional per-thread lifecycle hooks for this slot */
    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

    /* data storage */
    const void *slot_initdata;
    /* store the thread module id */
    int tm_id;

} TmSlot;
75
/* Array of ThreadVars pointers with TVT_MAX entries; defined outside this
 * header. Presumably one list head per thread type -- TODO confirm. */
extern ThreadVars *tv_root[TVT_MAX];

/* Mutex defined outside this header.
 * NOTE(review): by its name it guards tv_root -- confirm against users. */
extern SCMutex tv_root_lock;
79
/* Slot helpers (declarations only; the definitions are not in this header). */
void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
TmSlot *TmSlotGetSlotForTM(int);

/* Thread creation, spawning, teardown and bookkeeping.
 * Note: `void *(fn_p)(void *)` declares a parameter of function type, which
 * the language adjusts to a pointer to function. */
ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
                           void *(fn_p)(void *), int);
ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
                                        const char *);
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
                                     int mucond);
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
                                     int mucond);
TmEcode TmThreadSpawn(ThreadVars *);
void TmThreadSetFlags(ThreadVars *, uint8_t);
void TmThreadKillThreadsFamily(int family);
void TmThreadKillThreads(void);
void TmThreadClearThreadsFamily(int family);
void TmThreadAppend(ThreadVars *, int);
void TmThreadSetGroupName(ThreadVars *tv, const char *name);
void TmThreadDumpThreads(void);
100
/* CPU affinity, priority and related per-thread options (declarations only). */
TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
TmEcode TmThreadSetupOptions(ThreadVars *);
void TmThreadSetPrio(ThreadVars *);
int TmThreadGetNbThreads(uint8_t type);
107
/* Pause/continue control, state checks and startup synchronisation
 * (declarations only). */
void TmThreadInitMC(ThreadVars *);
void TmThreadTestThreadUnPaused(ThreadVars *);
void TmThreadContinue(ThreadVars *);
void TmThreadContinueThreads(void);
void TmThreadPause(ThreadVars *);
void TmThreadPauseThreads(void);
void TmThreadCheckThreadState(void);
TmEcode TmThreadWaitOnThreadInit(void);
ThreadVars *TmThreadsGetCallingThread(void);

/* Per-thread flag accessors; the inline helpers later in this header use
 * them with THV_FAILED and THV_CAPTURE_INJECT_PKT. */
int TmThreadsCheckFlag(ThreadVars *, uint32_t);
void TmThreadsSetFlag(ThreadVars *, uint32_t);
void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
void TmThreadWaitForFlag(ThreadVars *, uint32_t);
122
/* Runs packet `p` on `slot` for thread `tv`; the inline helpers in this
 * header treat a TM_ECODE_FAILED return as fatal for the thread. */
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);

/* Lookup and shutdown helpers (declarations only). */
ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
void TmThreadDisablePacketThreads(void);
void TmThreadDisableReceiveThreads(void);
TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *);

uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
131
/**
 * \brief Empty a no-lock packet queue, passing every packet to
 *        TmqhOutputPacketpool() with a NULL thread argument.
 *
 * \param pq queue to drain; dequeuing stops at the first NULL result.
 */
static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
{
    for (;;) {
        Packet *pkt = PacketDequeueNoLock(pq);
        if (unlikely(pkt == NULL)) {
            return;
        }
        TmqhOutputPacketpool(NULL, pkt);
    }
}
141
/**
 * \brief Clean up after a slot failed on a packet and mark the thread failed.
 *
 * Releases \a p (when non-NULL) through TmqhOutputPacketpool(), drains the
 * thread's decode queue, releases everything queued on stream_pq_local
 * (under that queue's mutex) and finally sets THV_FAILED on the thread.
 *
 * \param tv thread the failure happened on
 * \param s  slot that failed; not used by this function
 * \param p  packet being processed, may be NULL
 */
static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
{
    if (p != NULL)
        TmqhOutputPacketpool(tv, p);

    TmThreadsCleanDecodePQ(&tv->decode_pq);

    PacketQueue *stream_q = tv->stream_pq_local;
    if (stream_q) {
        SCMutexLock(&stream_q->mutex_q);
        TmqhReleasePacketsToPacketPool(stream_q);
        SCMutexUnlock(&stream_q->mutex_q);
    }

    TmThreadsSetFlag(tv, THV_FAILED);
}
155
156 /**
157 * \brief Handle timeout from the capture layer. Checks
158 * stream_pq which may have been filled by the flow
159 * manager.
160 * \param s pipeline to run on these packets.
161 */
TmThreadsHandleInjectedPackets(ThreadVars * tv)162 static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
163 {
164 PacketQueue *pq = tv->stream_pq_local;
165 if (pq && pq->len > 0) {
166 while (1) {
167 SCMutexLock(&pq->mutex_q);
168 Packet *extra_p = PacketDequeue(pq);
169 SCMutexUnlock(&pq->mutex_q);
170 if (extra_p == NULL)
171 break;
172 TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
173 if (r == TM_ECODE_FAILED) {
174 TmThreadsSlotProcessPktFail(tv, tv->tm_flowworker, extra_p);
175 break;
176 }
177 tv->tmqh_out(tv, extra_p);
178 }
179 return true;
180 } else {
181 return false;
182 }
183 }
184
185 /**
186 * \brief Process the rest of the functions (if any) and queue.
187 */
TmThreadsSlotProcessPkt(ThreadVars * tv,TmSlot * s,Packet * p)188 static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
189 {
190 if (s == NULL) {
191 tv->tmqh_out(tv, p);
192 return TM_ECODE_OK;
193 }
194
195 TmEcode r = TmThreadsSlotVarRun(tv, p, s);
196 if (unlikely(r == TM_ECODE_FAILED)) {
197 TmThreadsSlotProcessPktFail(tv, s, p);
198 return TM_ECODE_FAILED;
199 }
200
201 tv->tmqh_out(tv, p);
202
203 TmThreadsHandleInjectedPackets(tv);
204
205 return TM_ECODE_OK;
206 }
207
208 /** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
209 * Allow caller to supply their own packet
210 *
211 * Meant for detect reload process that interupts an sleeping capture thread
212 * to force a packet through the engine to complete a reload */
TmThreadsCaptureInjectPacket(ThreadVars * tv,Packet * p)213 static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
214 {
215 TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
216 if (p == NULL)
217 p = PacketGetFromQueueOrAlloc();
218 if (p != NULL) {
219 p->flags |= PKT_PSEUDO_STREAM_END;
220 PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
221 if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
222 TmqhOutputPacketpool(tv, p);
223 }
224 }
225 }
226
227 /** \brief handle capture timeout
228 * When a capture method times out we check for house keeping
229 * tasks in the capture thread.
230 *
231 * \param p packet. Capture method may have taken a packet from
232 * the pool prior to the timing out call. We will then
233 * use that packet. Otherwise we can get our own.
234 */
TmThreadsCaptureHandleTimeout(ThreadVars * tv,Packet * p)235 static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
236 {
237 if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
238 TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
239 return;
240
241 } else {
242 if (TmThreadsHandleInjectedPackets(tv) == false) {
243 /* see if we have to do some house keeping */
244 if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
245 TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
246 return;
247 }
248 }
249 }
250
251 /* packet could have been passed to us that we won't use
252 * return it to the pool. */
253 if (p != NULL)
254 tv->tmqh_out(tv, p);
255 }
256
/* Thread registry and by-id injection helpers (declarations only; the
 * definitions are not in this header). */
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);
int TmThreadsInjectPacketsById(Packet **, int id);
void TmThreadsInjectFlowById(Flow *f, const int id);

/* Per-thread timestamp bookkeeping (declarations only). */
void TmThreadsInitThreadsTimestamp(const struct timeval *ts);
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts);
void TmThreadsGetMinimalTimestamp(struct timeval *ts);
uint16_t TmThreadsGetWorkerThreadMax(void);
bool TmThreadsTimeSubsysIsReady(void);
268
269 #endif /* __TM_THREADS_H__ */
270