1 /* Copyright (C) 2016-2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 /**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 *
23 * Flow Workers are single thread modules taking care of (almost)
24 * everything related to packets with flows:
25 *
26 * - Lookup/creation
27 * - Stream tracking, reassembly
28 * - Applayer update
29 * - Detection
30 *
31 * This all while holding the flow lock.
32 */
33
34 #include "suricata-common.h"
35 #include "suricata.h"
36
37 #include "decode.h"
38 #include "detect.h"
39 #include "stream-tcp.h"
40 #include "app-layer.h"
41 #include "detect-engine.h"
42 #include "output.h"
43 #include "app-layer-parser.h"
44
45 #include "util-validate.h"
46
47 #include "flow-util.h"
48 #include "flow-manager.h"
49 #include "flow-timeout.h"
50 #include "flow-spare-pool.h"
51
/* Pointer alias for the detect thread context. It is used as the type
 * argument of SC_ATOMIC_DECLARE() for FlowWorkerThreadData::detect_thread. */
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;
53
/** \brief Tallies gathered by one CheckWorkQueue() run and afterwards pushed
 *         to the stats API by UpdateCounters(). */
typedef struct FlowTimeoutCounters {
    uint32_t flows_aside_needs_work; /**< number of flows FlowFinish() was run for */
    uint32_t flows_aside_pkt_inject; /**< sum of the values returned by FlowFinish() */
} FlowTimeoutCounters;
58
/** \brief Per thread state of the FlowWorker module.
 *
 *  Allocated in FlowWorkerThreadInit() and released in
 *  FlowWorkerThreadDeinit(). */
typedef struct FlowWorkerThreadData_ {
    DecodeThreadVars *dtv; /**< from DecodeThreadVarsAlloc(); also stored in fls.dtv */

    /* Same pointer, two views: the void pointer member exists so its address
     * can be handed to StreamTcpThreadInit(). */
    union {
        StreamTcpThread *stream_thread;
        void *stream_thread_ptr;
    };

    /* Detect thread ctx. Accessed only through SC_ATOMIC_GET/SET; can be
     * swapped through FlowWorkerReplaceDetectCtx(). NULL if detection is
     * not enabled. */
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

    void *output_thread; /* Output thread data (OutputLoggerThreadInit). */
    void *output_thread_flow; /* Flow log thread data (OutputFlowLogThreadInit). */

    /* stats counter ids returned by StatsRegisterCounter() */
    uint16_t local_bypass_pkts;
    uint16_t local_bypass_bytes;
    uint16_t both_bypass_pkts;
    uint16_t both_bypass_bytes;

    PacketQueueNoLock pq; /**< queue passed to StreamTcp() to collect extra packets */
    FlowLookupStruct fls; /**< passed to FlowHandlePacket(); its spare_queue and
                           *   work_queue members are used by this module */

    /* stats counter ids returned by StatsRegisterCounter() */
    struct {
        uint16_t flows_injected;
        uint16_t flows_removed;
        uint16_t flows_aside_needs_work;
        uint16_t flows_aside_pkt_inject;
    } cnt;

} FlowWorkerThreadData;
88
/* Defined further down in this file; FlowFinish() calls it before that. */
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, void *detect_thread);
/* Not defined in this file. NOTE(review): presumably exported by the flow
 * timeout code; a declaration in a shared header would be preferable -
 * confirm. */
Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, TcpSession *ssn);
91
92 /**
93 * \internal
94 * \brief Forces reassembly for flow if it needs it.
95 *
96 * The function requires flow to be locked beforehand.
97 *
98 * \param f Pointer to the flow.
99 *
100 * \retval cnt number of packets injected
101 */
FlowFinish(ThreadVars * tv,Flow * f,FlowWorkerThreadData * fw,void * detect_thread)102 static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
103 {
104 Packet *p1 = NULL, *p2 = NULL;
105 const int server = f->ffr_tc;
106 const int client = f->ffr_ts;
107
108 /* Get the tcp session for the flow */
109 TcpSession *ssn = (TcpSession *)f->protoctx;
110
111 /* The packets we use are based on what segments in what direction are
112 * unprocessed.
113 * p1 if we have client segments for reassembly purpose only. If we
114 * have no server segments p2 can be a toserver packet with dummy
115 * seq/ack, and if we have server segments p2 has to carry out reassembly
116 * for server segment as well, in which case we will also need a p3 in the
117 * toclient which is now dummy since all we need it for is detection */
118
119 /* insert a pseudo packet in the toserver direction */
120 if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
121 p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn);
122 if (p1 == NULL) {
123 return 0;
124 }
125 PKT_SET_SRC(p1, PKT_SRC_FFR);
126
127 if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
128 p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
129 if (p2 == NULL) {
130 FlowDeReference(&p1->flow);
131 TmqhOutputPacketpool(NULL, p1);
132 return 0;
133 }
134 PKT_SET_SRC(p2, PKT_SRC_FFR);
135 p2->flowflags |= FLOW_PKT_LAST_PSEUDO;
136 } else {
137 p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
138 }
139 } else {
140 if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
141 p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
142 if (p1 == NULL) {
143 return 0;
144 }
145 PKT_SET_SRC(p1, PKT_SRC_FFR);
146 p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
147 } else {
148 /* impossible */
149 BUG_ON(1);
150 }
151 }
152 f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
153
154 FlowWorkerFlowTimeout(tv, p1, fw, detect_thread);
155 PacketPoolReturnPacket(p1);
156 if (p2) {
157 FlowWorkerFlowTimeout(tv, p2, fw, detect_thread);
158 PacketPoolReturnPacket(p2);
159 return 2;
160 }
161 return 1;
162 }
163
/**
 *  \internal
 *  \brief Drain a private flow queue and retire every flow on it.
 *
 *  For each flow: take the write lock, mark it as timed out, run
 *  FlowFinish() if it is a TCP flow that still needs it, log the flow if a
 *  flow logger is set up, clear the flow and hand it to either the thread
 *  local spare queue or the global spare pool.
 *
 *  \param tv thread vars
 *  \param fw flow worker thread data
 *  \param detect_thread detect thread ctx, passed on to FlowFinish()
 *  \param counters tallies updated for flows that went through FlowFinish()
 *  \param fq queue to drain
 */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
        void *detect_thread, // TODO proper type?
        FlowTimeoutCounters *counters,
        FlowQueuePrivate *fq)
{
    /* TODO match to API? 200 = 2 * block size */
    const uint32_t spare_queue_max = 200;

    for (Flow *f = FlowQueuePrivateGetFromTop(fq); f != NULL;
            f = FlowQueuePrivateGetFromTop(fq)) {
        FLOWLOCK_WRLOCK(f);
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg

        const FlowStateType state = f->flow_state;
        bool bypassed = (state == FLOW_STATE_LOCAL_BYPASSED);
#ifdef CAPTURE_OFFLOAD
        if (state == FLOW_STATE_CAPTURE_BYPASSED) {
            bypassed = true;
        }
#endif

        /* FlowForceReassemblyNeedReassembly() has to stay behind the cheap
         * checks and in front of the f->ffr test, as in the && chain. */
        if (f->proto == IPPROTO_TCP &&
                !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
                !bypassed &&
                FlowForceReassemblyNeedReassembly(f) == 1 &&
                f->ffr != 0)
        {
            const int injected = FlowFinish(tv, f, fw, detect_thread);
            counters->flows_aside_pkt_inject += injected;
            counters->flows_aside_needs_work++;
        }

        /* this should not be possible */
        BUG_ON(f->use_cnt > 0);

        /* no one is referring to this flow, use_cnt 0, removed from hash
         * so we can unlock it and pass it to the flow recycler */

        if (fw->output_thread_flow != NULL) {
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);
        }

        FlowClearMemory(f, f->protomap);
        FLOWLOCK_UNLOCK(f);

        /* keep the flow local while the local spare queue is below the
         * limit, otherwise give it to the spare pool */
        if (fw->fls.spare_queue.len < spare_queue_max) {
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
        } else {
            FlowSparePoolReturnFlow(f);
        }
    }
}
208
209 /** \brief handle flow for packet
210 *
211 * Handle flow creation/lookup
212 */
FlowUpdate(ThreadVars * tv,FlowWorkerThreadData * fw,Packet * p)213 static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
214 {
215 FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
216
217 int state = p->flow->flow_state;
218 switch (state) {
219 #ifdef CAPTURE_OFFLOAD
220 case FLOW_STATE_CAPTURE_BYPASSED: {
221 StatsAddUI64(tv, fw->both_bypass_pkts, 1);
222 StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p));
223 Flow *f = p->flow;
224 FlowDeReference(&p->flow);
225 FLOWLOCK_UNLOCK(f);
226 return TM_ECODE_DONE;
227 }
228 #endif
229 case FLOW_STATE_LOCAL_BYPASSED: {
230 StatsAddUI64(tv, fw->local_bypass_pkts, 1);
231 StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p));
232 Flow *f = p->flow;
233 FlowDeReference(&p->flow);
234 FLOWLOCK_UNLOCK(f);
235 return TM_ECODE_DONE;
236 }
237 default:
238 return TM_ECODE_OK;
239 }
240 }
241
/* Forward declaration: FlowWorkerThreadInit() calls this on its failure paths. */
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
243
FlowWorkerThreadInit(ThreadVars * tv,const void * initdata,void ** data)244 static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
245 {
246 FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
247 if (fw == NULL)
248 return TM_ECODE_FAILED;
249
250 SC_ATOMIC_INITPTR(fw->detect_thread);
251 SC_ATOMIC_SET(fw->detect_thread, NULL);
252
253 fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv);
254 fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv);
255 fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv);
256 fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv);
257
258 fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv);
259 fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv);
260 fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv);
261 fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv);
262
263 fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
264 if (fw->dtv == NULL) {
265 FlowWorkerThreadDeinit(tv, fw);
266 return TM_ECODE_FAILED;
267 }
268
269 /* setup TCP */
270 if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
271 FlowWorkerThreadDeinit(tv, fw);
272 return TM_ECODE_FAILED;
273 }
274
275 if (DetectEngineEnabled()) {
276 /* setup DETECT */
277 void *detect_thread = NULL;
278 if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
279 FlowWorkerThreadDeinit(tv, fw);
280 return TM_ECODE_FAILED;
281 }
282 SC_ATOMIC_SET(fw->detect_thread, detect_thread);
283 }
284
285 /* Setup outputs for this thread. */
286 if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
287 FlowWorkerThreadDeinit(tv, fw);
288 return TM_ECODE_FAILED;
289 }
290 if (OutputFlowLogThreadInit(tv, NULL, &fw->output_thread_flow) != TM_ECODE_OK) {
291 SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
292 FlowWorkerThreadDeinit(tv, fw);
293 return TM_ECODE_FAILED;
294 }
295
296 DecodeRegisterPerfCounters(fw->dtv, tv);
297 AppLayerRegisterThreadCounters(tv);
298
299 /* setup pq for stream end pkts */
300 memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
301 *data = fw;
302 return TM_ECODE_OK;
303 }
304
FlowWorkerThreadDeinit(ThreadVars * tv,void * data)305 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
306 {
307 FlowWorkerThreadData *fw = data;
308
309 DecodeThreadVarsFree(tv, fw->dtv);
310
311 /* free TCP */
312 StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
313
314 /* free DETECT */
315 void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
316 if (detect_thread != NULL) {
317 DetectEngineThreadCtxDeinit(tv, detect_thread);
318 SC_ATOMIC_SET(fw->detect_thread, NULL);
319 }
320
321 /* Free output. */
322 OutputLoggerThreadDeinit(tv, fw->output_thread);
323 OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);
324
325 /* free pq */
326 BUG_ON(fw->pq.len);
327
328 Flow *f;
329 while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
330 FlowFree(f);
331 }
332
333 SCFree(fw);
334 return TM_ECODE_OK;
335 }
336
/* Entry points of the detect and stream engines as called by this module.
 * Neither is defined in this file. NOTE(review): presumably also declared
 * in detect.h / stream-tcp.h, which would make these redundant - confirm. */
TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
339
/** \internal
 *  \brief add the tallies of a CheckWorkQueue() run to the thread's stats
 *
 *  Zero tallies are skipped, so the stats API is only called when there is
 *  something to add.
 *
 *  \param tv thread vars
 *  \param fw flow worker thread data holding the counter ids
 *  \param counters tallies to add
 */
static inline void UpdateCounters(ThreadVars *tv,
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
{
    const uint64_t needs_work = counters->flows_aside_needs_work;
    const uint64_t pkt_inject = counters->flows_aside_pkt_inject;

    if (needs_work != 0) {
        StatsAddUI64(tv, fw->cnt.flows_aside_needs_work, needs_work);
    }
    if (pkt_inject != 0) {
        StatsAddUI64(tv, fw->cnt.flows_aside_pkt_inject, pkt_inject);
    }
}
352
/** \internal
 *  \brief prune the file container for the packet's direction
 *
 *  Does nothing if the packet has no flow, the flow has no app layer state
 *  or there is no file container for the direction.
 *
 *  \param p packet that selects the flow and the direction
 */
static void FlowPruneFiles(Packet *p)
{
    Flow *f = p->flow;
    if (f == NULL || f->alstate == NULL) {
        return;
    }

    FileContainer *files = PKT_IS_TOSERVER(p)
            ? AppLayerParserGetFiles(f, STREAM_TOSERVER)
            : AppLayerParserGetFiles(f, STREAM_TOCLIENT);
    if (files == NULL) {
        return;
    }
    FilePrune(files);
}
364
365 /** \brief update stream engine
366 *
367 * We can be called from both the flow timeout path as well as from the
368 * "real" traffic path. If in the timeout path any additional packets we
369 * forge for flushing pipelines should not leave our scope. If the original
370 * packet is real (or related to a real packet) we need to push the packets
371 * on, so IPS logic stays valid.
372 */
FlowWorkerStreamTCPUpdate(ThreadVars * tv,FlowWorkerThreadData * fw,Packet * p,void * detect_thread,const bool timeout)373 static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
374 void *detect_thread, const bool timeout)
375 {
376 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
377 StreamTcp(tv, p, fw->stream_thread, &fw->pq);
378 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);
379
380 if (FlowChangeProto(p->flow)) {
381 StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
382 AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
383 AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
384 }
385
386 /* Packets here can safely access p->flow as it's locked */
387 SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
388 Packet *x;
389 while ((x = PacketDequeueNoLock(&fw->pq))) {
390 SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
391
392 if (detect_thread != NULL) {
393 FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
394 Detect(tv, x, detect_thread);
395 FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
396 }
397
398 OutputLoggerLog(tv, x, fw->output_thread);
399
400 if (timeout) {
401 PacketPoolReturnPacket(x);
402 } else {
403 /* put these packets in the preq queue so that they are
404 * by the other thread modules before packet 'p'. */
405 PacketEnqueueNoLock(&tv->decode_pq, x);
406 }
407 }
408 }
409
/** \internal
 *  \brief run one flow timeout pseudo packet through the worker pipeline
 *
 *  Steps, in order: stream/app-layer update (timeout mode), engine event
 *  counters, detection (if a detect ctx is given), output logging, file
 *  pruning, TCP segment pruning, transaction cleanup. Finally the packet's
 *  flow reference is dropped.
 *
 *  The flow is expected to be locked by the caller and is still locked on
 *  return. The packet itself is not released here.
 *
 *  \param tv thread vars
 *  \param p pseudo packet; expected to have pkt_src PKT_SRC_FFR, a flow and
 *           to be TCP (validated in debug validation builds only)
 *  \param fw flow worker thread data
 *  \param detect_thread detect thread ctx, NULL if detection is not active
 */
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw,
        void *detect_thread)
{
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);

    SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PKT_IS_TCP(p)));
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);

    /* handle TCP and app layer */
    FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, true);

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
    if (detect_thread != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, detect_thread);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /* Prune any stored files. */
    FlowPruneFiles(p);

    /* Release tcp segments. Done here after alerting can use them. */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
            STREAM_TOSERVER : STREAM_TOCLIENT);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);

    /* run tx cleanup last */
    AppLayerParserTransactionsCleanup(p->flow);

    FlowDeReference(&p->flow);
    /* the flow stays locked here; it is unlocked by CheckWorkQueue() after
     * FlowFinish() has returned */
}
450
451 /** \internal
452 * \brief process flows injected into our queue by other threads
453 */
FlowWorkerProcessInjectedFlows(ThreadVars * tv,FlowWorkerThreadData * fw,Packet * p,void * detect_thread)454 static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
455 FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
456 {
457 /* take injected flows and append to our work queue */
458 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
459 FlowQueuePrivate injected = { NULL, NULL, 0 };
460 if (SC_ATOMIC_GET(tv->flow_queue->non_empty) == true)
461 injected = FlowQueueExtractPrivate(tv->flow_queue);
462 if (injected.len > 0) {
463 StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);
464
465 FlowTimeoutCounters counters = { 0, 0, };
466 CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
467 UpdateCounters(tv, fw, &counters);
468 }
469 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
470 }
471
472 /** \internal
473 * \brief process flows set aside locally during flow lookup
474 */
FlowWorkerProcessLocalFlows(ThreadVars * tv,FlowWorkerThreadData * fw,Packet * p,void * detect_thread)475 static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
476 FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
477 {
478 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
479 if (fw->fls.work_queue.len) {
480 StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
481
482 FlowTimeoutCounters counters = { 0, 0, };
483 CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
484 UpdateCounters(tv, fw, &counters);
485 }
486 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
487 }
488
FlowWorker(ThreadVars * tv,Packet * p,void * data)489 static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
490 {
491 FlowWorkerThreadData *fw = data;
492 void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
493
494 DEBUG_VALIDATE_BUG_ON(p == NULL);
495 DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);
496
497 SCLogDebug("packet %"PRIu64, p->pcap_cnt);
498
499 /* update time */
500 if (!(PKT_IS_PSEUDOPKT(p))) {
501 TimeSetByThread(tv->id, &p->ts);
502 }
503
504 /* handle Flow */
505 if (p->flags & PKT_WANTS_FLOW) {
506 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
507
508 FlowHandlePacket(tv, &fw->fls, p);
509 if (likely(p->flow != NULL)) {
510 DEBUG_ASSERT_FLOW_LOCKED(p->flow);
511 if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
512 return TM_ECODE_OK;
513 }
514 }
515 /* Flow is now LOCKED */
516
517 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);
518
519 /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
520 * pseudo packet created by the flow manager. */
521 } else if (p->flags & PKT_HAS_FLOW) {
522 FLOWLOCK_WRLOCK(p->flow);
523 DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
524 }
525
526 SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
527
528 /* handle TCP and app layer */
529 if (p->flow && PKT_IS_TCP(p)) {
530 SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
531 DEBUG_ASSERT_FLOW_LOCKED(p->flow);
532
533 /* if detect is disabled, we need to apply file flags to the flow
534 * here on the first packet. */
535 if (detect_thread == NULL &&
536 ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
537 (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST))))
538 {
539 DisableDetectFlowFileFlags(p->flow);
540 }
541
542 FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);
543
544 /* handle the app layer part of the UDP packet payload */
545 } else if (p->flow && p->proto == IPPROTO_UDP) {
546 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
547 AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
548 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
549 }
550
551 PacketUpdateEngineEventCounters(tv, fw->dtv, p);
552
553 /* handle Detect */
554 DEBUG_ASSERT_FLOW_LOCKED(p->flow);
555 SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
556 if (detect_thread != NULL) {
557 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
558 Detect(tv, p, detect_thread);
559 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
560 }
561
562 // Outputs.
563 OutputLoggerLog(tv, p, fw->output_thread);
564
565 /* Prune any stored files. */
566 FlowPruneFiles(p);
567
568 /* Release tcp segments. Done here after alerting can use them. */
569 if (p->flow != NULL) {
570 DEBUG_ASSERT_FLOW_LOCKED(p->flow);
571
572 if (p->proto == IPPROTO_TCP) {
573 FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
574 StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
575 STREAM_TOSERVER : STREAM_TOCLIENT);
576 FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
577 }
578
579 /* run tx cleanup last */
580 AppLayerParserTransactionsCleanup(p->flow);
581
582 Flow *f = p->flow;
583 FlowDeReference(&p->flow);
584 FLOWLOCK_UNLOCK(f);
585 }
586
587 /* take injected flows and process them */
588 FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
589
590 /* process local work queue */
591 FlowWorkerProcessLocalFlows(tv, fw, p, detect_thread);
592
593 return TM_ECODE_OK;
594 }
595
FlowWorkerReplaceDetectCtx(void * flow_worker,void * detect_ctx)596 void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
597 {
598 FlowWorkerThreadData *fw = flow_worker;
599
600 SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
601 }
602
FlowWorkerGetDetectCtxPtr(void * flow_worker)603 void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
604 {
605 FlowWorkerThreadData *fw = flow_worker;
606
607 return SC_ATOMIC_GET(fw->detect_thread);
608 }
609
ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)610 const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
611 {
612 switch (fwi) {
613 case PROFILE_FLOWWORKER_FLOW:
614 return "flow";
615 case PROFILE_FLOWWORKER_STREAM:
616 return "stream";
617 case PROFILE_FLOWWORKER_APPLAYERUDP:
618 return "app-layer";
619 case PROFILE_FLOWWORKER_DETECT:
620 return "detect";
621 case PROFILE_FLOWWORKER_TCPPRUNE:
622 return "tcp-prune";
623 case PROFILE_FLOWWORKER_FLOW_INJECTED:
624 return "flow-inject";
625 case PROFILE_FLOWWORKER_FLOW_EVICTED:
626 return "flow-evict";
627 case PROFILE_FLOWWORKER_SIZE:
628 return "size";
629 }
630 return "error";
631 }
632
/** \internal
 *  \brief exit stats callback: forwards to the output loggers
 *
 *  \param tv thread vars
 *  \param data the FlowWorkerThreadData of this thread
 */
static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
{
    const FlowWorkerThreadData *worker = data;
    void *logger_ctx = worker->output_thread;

    OutputLoggerExitPrintStats(tv, logger_ctx);
}
638
/**
 *  \brief Register the FlowWorker module in the thread module table.
 *
 *  Fills the TMM_FLOWWORKER slot of tmm_modules with the module name, the
 *  callbacks defined in this file and the module flags.
 */
void TmModuleFlowWorkerRegister (void)
{
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
    tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
    /* no capability flags are set for this module */
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
    /* flagged as both a stream and a detect module, matching the work done
     * in FlowWorker() */
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
}
649