1 /* Copyright (C) 2016-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Flow Workers are single thread modules taking care of (almost)
24  * everything related to packets with flows:
25  *
26  * - Lookup/creation
27  * - Stream tracking, reassembly
28  * - Applayer update
29  * - Detection
30  *
31  * This all while holding the flow lock.
32  */
33 
34 #include "suricata-common.h"
35 #include "suricata.h"
36 
37 #include "decode.h"
38 #include "detect.h"
39 #include "stream-tcp.h"
40 #include "app-layer.h"
41 #include "detect-engine.h"
42 #include "output.h"
43 #include "app-layer-parser.h"
44 
45 #include "util-validate.h"
46 
47 #include "flow-util.h"
48 #include "flow-manager.h"
49 #include "flow-timeout.h"
50 #include "flow-spare-pool.h"
51 
/** pointer typedef so the detect ctx pointer fits SC_ATOMIC_DECLARE below */
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;

/** counters gathered while working through a queue of evicted flows */
typedef struct FlowTimeoutCounters {
    uint32_t flows_aside_needs_work;    /**< flows that still needed forced reassembly */
    uint32_t flows_aside_pkt_inject;    /**< pseudo packets injected for those flows */
} FlowTimeoutCounters;
58 
/** per-thread data of the FlowWorker thread module */
typedef struct FlowWorkerThreadData_ {
    DecodeThreadVars *dtv;

    union {
        StreamTcpThread *stream_thread;
        void *stream_thread_ptr;    /**< alias for (de)init APIs that take void ** */
    };

    /* detect ctx is accessed atomically so it can be swapped at runtime,
     * see FlowWorkerReplaceDetectCtx() */
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

    void *output_thread; /* Output loggers thread data. */
    void *output_thread_flow; /* Output flow log API thread data. */

    /* stats counter ids for bypassed packets/bytes */
    uint16_t local_bypass_pkts;
    uint16_t local_bypass_bytes;
    uint16_t both_bypass_pkts;
    uint16_t both_bypass_bytes;

    PacketQueueNoLock pq;       /**< queue for stream end/pseudo packets */
    FlowLookupStruct fls;       /**< flow lookup state, incl. local work/spare queues */

    /* stats counter ids for evicted/injected flow handling */
    struct {
        uint16_t flows_injected;
        uint16_t flows_removed;
        uint16_t flows_aside_needs_work;
        uint16_t flows_aside_pkt_inject;
    } cnt;

} FlowWorkerThreadData;
88 
/* forward declaration: used by FlowFinish()/CheckWorkQueue() above its definition */
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, void *detect_thread);
/* external prototype: builds a forced-reassembly pseudo packet for a direction */
Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, TcpSession *ssn);
91 
92 /**
93  * \internal
94  * \brief Forces reassembly for flow if it needs it.
95  *
96  *        The function requires flow to be locked beforehand.
97  *
98  * \param f Pointer to the flow.
99  *
100  * \retval cnt number of packets injected
101  */
FlowFinish(ThreadVars * tv,Flow * f,FlowWorkerThreadData * fw,void * detect_thread)102 static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
103 {
104     Packet *p1 = NULL, *p2 = NULL;
105     const int server = f->ffr_tc;
106     const int client = f->ffr_ts;
107 
108     /* Get the tcp session for the flow */
109     TcpSession *ssn = (TcpSession *)f->protoctx;
110 
111     /* The packets we use are based on what segments in what direction are
112      * unprocessed.
113      * p1 if we have client segments for reassembly purpose only.  If we
114      * have no server segments p2 can be a toserver packet with dummy
115      * seq/ack, and if we have server segments p2 has to carry out reassembly
116      * for server segment as well, in which case we will also need a p3 in the
117      * toclient which is now dummy since all we need it for is detection */
118 
119     /* insert a pseudo packet in the toserver direction */
120     if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
121         p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn);
122         if (p1 == NULL) {
123             return 0;
124         }
125         PKT_SET_SRC(p1, PKT_SRC_FFR);
126 
127         if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
128             p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
129             if (p2 == NULL) {
130                 FlowDeReference(&p1->flow);
131                 TmqhOutputPacketpool(NULL, p1);
132                 return 0;
133             }
134             PKT_SET_SRC(p2, PKT_SRC_FFR);
135             p2->flowflags |= FLOW_PKT_LAST_PSEUDO;
136         } else {
137             p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
138         }
139     } else {
140         if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
141             p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
142             if (p1 == NULL) {
143                 return 0;
144             }
145             PKT_SET_SRC(p1, PKT_SRC_FFR);
146             p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
147         } else {
148             /* impossible */
149             BUG_ON(1);
150         }
151     }
152     f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
153 
154     FlowWorkerFlowTimeout(tv, p1, fw, detect_thread);
155     PacketPoolReturnPacket(p1);
156     if (p2) {
157         FlowWorkerFlowTimeout(tv, p2, fw, detect_thread);
158         PacketPoolReturnPacket(p2);
159         return 2;
160     }
161     return 1;
162 }
163 
/** \internal
 *  \brief process a private queue of evicted flows
 *
 *  For each flow: force TCP reassembly if still needed, run the flow
 *  logger, clear the flow's memory and then either return it to the
 *  global spare pool or keep it on our local spare queue.
 */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
        void *detect_thread, // TODO proper type?
        FlowTimeoutCounters *counters,
        FlowQueuePrivate *fq)
{
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
        FLOWLOCK_WRLOCK(f);
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg

        const FlowStateType state = f->flow_state;
        if (f->proto == IPPROTO_TCP) {
            /* inject pseudo packet(s) when the flow wasn't bypassed and
             * still has unprocessed reassembly work */
            if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
                    state != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
                    state != FLOW_STATE_LOCAL_BYPASSED &&
                    FlowForceReassemblyNeedReassembly(f) == 1 &&
                    f->ffr != 0)
            {
                int cnt = FlowFinish(tv, f, fw, detect_thread);
                counters->flows_aside_pkt_inject += cnt;
                counters->flows_aside_needs_work++;
            }
        }

        /* this should not be possible */
        BUG_ON(f->use_cnt > 0);

        /* no one is referring to this flow, use_cnt 0, removed from hash
         * so we can unlock it and pass it to the flow recycler */

        if (fw->output_thread_flow != NULL)
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);

        FlowClearMemory (f, f->protomap);
        FLOWLOCK_UNLOCK(f);
        if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
            FlowSparePoolReturnFlow(f);
        } else {
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
        }
    }
}
208 
209 /** \brief handle flow for packet
210  *
211  *  Handle flow creation/lookup
212  */
FlowUpdate(ThreadVars * tv,FlowWorkerThreadData * fw,Packet * p)213 static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
214 {
215     FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
216 
217     int state = p->flow->flow_state;
218     switch (state) {
219 #ifdef CAPTURE_OFFLOAD
220         case FLOW_STATE_CAPTURE_BYPASSED: {
221             StatsAddUI64(tv, fw->both_bypass_pkts, 1);
222             StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p));
223             Flow *f = p->flow;
224             FlowDeReference(&p->flow);
225             FLOWLOCK_UNLOCK(f);
226             return TM_ECODE_DONE;
227         }
228 #endif
229         case FLOW_STATE_LOCAL_BYPASSED: {
230             StatsAddUI64(tv, fw->local_bypass_pkts, 1);
231             StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p));
232             Flow *f = p->flow;
233             FlowDeReference(&p->flow);
234             FLOWLOCK_UNLOCK(f);
235             return TM_ECODE_DONE;
236         }
237         default:
238             return TM_ECODE_OK;
239     }
240 }
241 
242 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
243 
/** \brief initialize per-thread flow worker state
 *
 *  Sets up decode vars, the TCP stream thread ctx, the detect thread ctx
 *  (when detection is enabled) and the output/flow log thread data.
 *  On any failure the partial state is torn down via
 *  FlowWorkerThreadDeinit() and TM_ECODE_FAILED is returned.
 */
static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
    if (fw == NULL)
        return TM_ECODE_FAILED;

    SC_ATOMIC_INITPTR(fw->detect_thread);
    SC_ATOMIC_SET(fw->detect_thread, NULL);

    /* register bypass stats counters */
    fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv);
    fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv);
    fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv);
    fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv);

    /* register flow eviction/injection stats counters */
    fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv);
    fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv);
    fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv);
    fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv);

    fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
    if (fw->dtv == NULL) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    /* setup TCP */
    if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    if (DetectEngineEnabled()) {
        /* setup DETECT */
        void *detect_thread = NULL;
        if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
            FlowWorkerThreadDeinit(tv, fw);
            return TM_ECODE_FAILED;
        }
        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
    }

    /* Setup outputs for this thread. */
    if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }
    if (OutputFlowLogThreadInit(tv, NULL, &fw->output_thread_flow) != TM_ECODE_OK) {
        SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    DecodeRegisterPerfCounters(fw->dtv, tv);
    AppLayerRegisterThreadCounters(tv);

    /* setup pq for stream end pkts */
    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
    *data = fw;
    return TM_ECODE_OK;
}
304 
/** \brief tear down per-thread flow worker state
 *
 *  NOTE(review): also reached from failed FlowWorkerThreadInit() calls,
 *  so members may be NULL/partially set up; assumes the called deinit
 *  functions tolerate that — confirm.
 */
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
{
    FlowWorkerThreadData *fw = data;

    DecodeThreadVarsFree(tv, fw->dtv);

    /* free TCP */
    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);

    /* free DETECT */
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
    if (detect_thread != NULL) {
        DetectEngineThreadCtxDeinit(tv, detect_thread);
        SC_ATOMIC_SET(fw->detect_thread, NULL);
    }

    /* Free output. */
    OutputLoggerThreadDeinit(tv, fw->output_thread);
    OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);

    /* free pq; must be empty by the time we get here */
    BUG_ON(fw->pq.len);

    /* free the flows kept on the local spare queue */
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
        FlowFree(f);
    }

    SCFree(fw);
    return TM_ECODE_OK;
}
336 
337 TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
338 TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
339 
/** \brief fold local flow timeout counters into the thread's stats */
static inline void UpdateCounters(ThreadVars *tv,
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
{
    const uint32_t needs_work = counters->flows_aside_needs_work;
    const uint32_t pkt_inject = counters->flows_aside_pkt_inject;

    if (needs_work != 0) {
        StatsAddUI64(tv, fw->cnt.flows_aside_needs_work, (uint64_t)needs_work);
    }
    if (pkt_inject != 0) {
        StatsAddUI64(tv, fw->cnt.flows_aside_pkt_inject, (uint64_t)pkt_inject);
    }
}
352 
/** \brief prune the file container of the packet's flow
 *
 *  No-op unless the packet has a flow with app-layer state. The packet
 *  direction selects which side's file container is pruned.
 */
static void FlowPruneFiles(Packet *p)
{
    Flow *f = p->flow;
    if (f == NULL || f->alstate == NULL)
        return;

    FileContainer *fc = AppLayerParserGetFiles(f,
            PKT_IS_TOSERVER(p) ? STREAM_TOSERVER : STREAM_TOCLIENT);
    if (fc != NULL) {
        FilePrune(fc);
    }
}
364 
/** \brief update stream engine
 *
 *  We can be called from both the flow timeout path as well as from the
 *  "real" traffic path. If in the timeout path any additional packets we
 *  forge for flushing pipelines should not leave our scope. If the original
 *  packet is real (or related to a real packet) we need to push the packets
 *  on, so IPS logic stays valid.
 *
 *  \param timeout true when called from the flow timeout path: extra
 *         packets are returned to the pool instead of being queued on
 */
static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
        void *detect_thread, const bool timeout)
{
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
    StreamTcp(tv, p, fw->stream_thread, &fw->pq);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);

    /* flow is changing protocol (e.g. upgrade): flush detect/log state and
     * mark the app-layer parser EOF in both directions */
    if (FlowChangeProto(p->flow)) {
        StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
        AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
        AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
    }

    /* Packets here can safely access p->flow as it's locked */
    SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
    Packet *x;
    while ((x = PacketDequeueNoLock(&fw->pq))) {
        SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);

        if (detect_thread != NULL) {
            FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
            Detect(tv, x, detect_thread);
            FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
        }

        OutputLoggerLog(tv, x, fw->output_thread);

        if (timeout) {
            PacketPoolReturnPacket(x);
        } else {
            /* put these packets in the decode_pq queue so that they are
             * processed by the other thread modules before packet 'p'. */
            PacketEnqueueNoLock(&tv->decode_pq, x);
        }
    }
}
409 
/** \internal
 *  \brief run a flow timeout pseudo packet through stream, detect,
 *         outputs, file pruning and tx cleanup
 *
 *  Called with the flow lock held; the packet's flow reference is
 *  released here but the flow lock itself stays held for the caller.
 */
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw,
        void *detect_thread)
{
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);

    SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PKT_IS_TCP(p)));
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);

    /* handle TCP and app layer */
    FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, true);

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
    if (detect_thread != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, detect_thread);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /* Prune any stored files. */
    FlowPruneFiles(p);

    /*  Release tcp segments. Done here after alerting can use them. */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
            STREAM_TOSERVER : STREAM_TOCLIENT);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);

    /* run tx cleanup last */
    AppLayerParserTransactionsCleanup(p->flow);

    FlowDeReference(&p->flow);
    /* flow lock is still held here; it is released by the caller
     * (see CheckWorkQueue()) */
}
450 
451 /** \internal
452  *  \brief process flows injected into our queue by other threads
453  */
/** \internal
 *  \brief process flows injected into our queue by other threads
 *
 *  Drains the thread's shared flow queue (if marked non-empty) into a
 *  private queue and runs the eviction work on it.
 */
static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
        FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
{
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
    /* cheap atomic check first; only grab the shared queue when flagged */
    if (SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
        FlowQueuePrivate injected = FlowQueueExtractPrivate(tv->flow_queue);
        if (injected.len > 0) {
            StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);

            FlowTimeoutCounters counters = { 0, 0, };
            CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
            UpdateCounters(tv, fw, &counters);
        }
    }
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
}
471 
472 /** \internal
473  *  \brief process flows set aside locally during flow lookup
474  */
/** \internal
 *  \brief process flows set aside locally during flow lookup
 */
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
        FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
{
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
    const uint32_t evicted = fw->fls.work_queue.len;
    if (evicted != 0) {
        StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)evicted);

        FlowTimeoutCounters counters = { 0, 0, };
        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
        UpdateCounters(tv, fw, &counters);
    }
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
}
488 
/** \brief flow worker thread module function
 *
 *  Per packet: flow lookup/update, TCP stream tracking and app-layer,
 *  detection, outputs, pruning and tx cleanup, then processing of
 *  injected and locally evicted flows. Runs with the flow lock held
 *  while the packet has a flow.
 */
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
{
    FlowWorkerThreadData *fw = data;
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);

    DEBUG_VALIDATE_BUG_ON(p == NULL);
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);

    SCLogDebug("packet %"PRIu64, p->pcap_cnt);

    /* update time */
    if (!(PKT_IS_PSEUDOPKT(p))) {
        TimeSetByThread(tv->id, &p->ts);
    }

    /* handle Flow */
    if (p->flags & PKT_WANTS_FLOW) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

        FlowHandlePacket(tv, &fw->fls, p);
        if (likely(p->flow != NULL)) {
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
            /* TM_ECODE_DONE: flow was bypassed, FlowUpdate() already
             * unlocked it and dropped our reference */
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
                return TM_ECODE_OK;
            }
        }
        /* if p->flow is set here, it is locked */

        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
     * pseudo packet created by the flow manager. */
    } else if (p->flags & PKT_HAS_FLOW) {
        FLOWLOCK_WRLOCK(p->flow);
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
    }

    SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");

    /* handle TCP and app layer */
    if (p->flow && PKT_IS_TCP(p)) {
        SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);

        /* if detect is disabled, we need to apply file flags to the flow
         * here on the first packet. */
        if (detect_thread == NULL &&
                ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
                 (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST))))
        {
            DisableDetectFlowFileFlags(p->flow);
        }

        FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);

        /* handle the app layer part of the UDP packet payload */
    } else if (p->flow && p->proto == IPPROTO_UDP) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
        AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
    }

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
    if (detect_thread != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, detect_thread);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /* Prune any stored files. */
    FlowPruneFiles(p);

    /*  Release tcp segments. Done here after alerting can use them. */
    if (p->flow != NULL) {
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);

        if (p->proto == IPPROTO_TCP) {
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
            StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
                    STREAM_TOSERVER : STREAM_TOCLIENT);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
        }

        /* run tx cleanup last */
        AppLayerParserTransactionsCleanup(p->flow);

        /* drop our flow reference and release the lock */
        Flow *f = p->flow;
        FlowDeReference(&p->flow);
        FLOWLOCK_UNLOCK(f);
    }

    /* take injected flows and process them */
    FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);

    /* process local work queue */
    FlowWorkerProcessLocalFlows(tv, fw, p, detect_thread);

    return TM_ECODE_OK;
}
595 
FlowWorkerReplaceDetectCtx(void * flow_worker,void * detect_ctx)596 void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
597 {
598     FlowWorkerThreadData *fw = flow_worker;
599 
600     SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
601 }
602 
FlowWorkerGetDetectCtxPtr(void * flow_worker)603 void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
604 {
605     FlowWorkerThreadData *fw = flow_worker;
606 
607     return SC_ATOMIC_GET(fw->detect_thread);
608 }
609 
ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)610 const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
611 {
612     switch (fwi) {
613         case PROFILE_FLOWWORKER_FLOW:
614             return "flow";
615         case PROFILE_FLOWWORKER_STREAM:
616             return "stream";
617         case PROFILE_FLOWWORKER_APPLAYERUDP:
618             return "app-layer";
619         case PROFILE_FLOWWORKER_DETECT:
620             return "detect";
621         case PROFILE_FLOWWORKER_TCPPRUNE:
622             return "tcp-prune";
623         case PROFILE_FLOWWORKER_FLOW_INJECTED:
624             return "flow-inject";
625         case PROFILE_FLOWWORKER_FLOW_EVICTED:
626             return "flow-evict";
627         case PROFILE_FLOWWORKER_SIZE:
628             return "size";
629     }
630     return "error";
631 }
632 
/** \brief print the output loggers' exit stats for this thread */
static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
{
    FlowWorkerThreadData *fw = (FlowWorkerThreadData *)data;
    OutputLoggerExitPrintStats(tv, fw->output_thread);
}
638 
/** \brief register the FlowWorker thread module with the engine */
void TmModuleFlowWorkerRegister (void)
{
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
    tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
    /* acts as both the stream and the detect module in the pipeline */
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
}
649