1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  *  \file
20  *
21  *  \author Victor Julien <victor@inliniac.net>
22  *
23  *  Flow implementation.
24  */
25 
26 #include "suricata-common.h"
27 #include "suricata.h"
28 #include "decode.h"
29 #include "conf.h"
30 #include "threadvars.h"
31 #include "tm-threads.h"
32 #include "runmodes.h"
33 
34 #include "util-random.h"
35 #include "util-time.h"
36 
37 #include "flow.h"
38 #include "flow-queue.h"
39 #include "flow-hash.h"
40 #include "flow-util.h"
41 #include "flow-var.h"
42 #include "flow-private.h"
43 #include "flow-timeout.h"
44 #include "flow-manager.h"
45 #include "flow-storage.h"
46 #include "flow-bypass.h"
47 #include "flow-spare-pool.h"
48 
49 #include "stream-tcp-private.h"
50 #include "stream-tcp-reassemble.h"
51 #include "stream-tcp.h"
52 
53 #include "util-unittest.h"
54 #include "util-unittest-helper.h"
55 #include "util-byte.h"
56 #include "util-misc.h"
57 
58 #include "util-debug.h"
59 #include "util-privs.h"
60 #include "util-validate.h"
61 
62 #include "detect.h"
63 #include "detect-engine-state.h"
64 #include "stream.h"
65 
66 #include "app-layer-parser.h"
67 #include "app-layer-expectation.h"
68 
69 #define FLOW_DEFAULT_EMERGENCY_RECOVERY 30
70 
71 //#define FLOW_DEFAULT_HASHSIZE    262144
72 #define FLOW_DEFAULT_HASHSIZE    65536
73 //#define FLOW_DEFAULT_MEMCAP      128 * 1024 * 1024 /* 128 MB */
74 #define FLOW_DEFAULT_MEMCAP      (32 * 1024 * 1024) /* 32 MB */
75 
76 #define FLOW_DEFAULT_PREALLOC    10000
77 
78 SC_ATOMIC_DECLARE(FlowProtoTimeoutPtr, flow_timeouts);
79 
80 /** atomic int that is used when freeing a flow from the hash. In this
81  *  case we walk the hash to find a flow to free. This var records where
82  *  we left off in the hash. Without this only the top rows of the hash
83  *  are freed. This isn't just about fairness. Under severe presure, the
84  *  hash rows on top would be all freed and the time to find a flow to
85  *  free increased with every run. */
86 SC_ATOMIC_DECLARE(unsigned int, flow_prune_idx);
87 
88 /** atomic flags */
89 SC_ATOMIC_DECLARE(unsigned int, flow_flags);
90 
91 /** FlowProto specific timeouts and free/state functions */
92 
93 FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX];
94 FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
95 FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
96 FlowProtoFreeFunc flow_freefuncs[FLOW_PROTO_MAX];
97 
98 FlowConfig flow_config;
99 
100 /** flow memuse counter (atomic), for enforcing memcap limit */
101 SC_ATOMIC_DECLARE(uint64_t, flow_memuse);
102 
103 void FlowRegisterTests(void);
104 void FlowInitFlowProto(void);
105 int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
106 
107 /* Run mode selected at suricata.c */
108 extern int run_mode;
109 
110 /**
111  *  \brief Update memcap value
112  *
113  *  \param size new memcap value
114  */
FlowSetMemcap(uint64_t size)115 int FlowSetMemcap(uint64_t size)
116 {
117     if ((uint64_t)SC_ATOMIC_GET(flow_memuse) < size) {
118         SC_ATOMIC_SET(flow_config.memcap, size);
119         return 1;
120     }
121 
122     return 0;
123 }
124 
125 /**
126  *  \brief Return memcap value
127  *
128  *  \retval memcap value
129  */
FlowGetMemcap(void)130 uint64_t FlowGetMemcap(void)
131 {
132     uint64_t memcapcopy = SC_ATOMIC_GET(flow_config.memcap);
133     return memcapcopy;
134 }
135 
FlowGetMemuse(void)136 uint64_t FlowGetMemuse(void)
137 {
138     uint64_t memusecopy = SC_ATOMIC_GET(flow_memuse);
139     return memusecopy;
140 }
141 
/**
 *  \brief Release the app-layer state attached to a flow
 *
 *  Hands Flow::alstate and Flow::alparser to AppLayerParserStateCleanup()
 *  and clears both pointers. Nothing is done for a NULL flow or a flow
 *  whose proto is 0.
 *
 *  \param f flow to clean up, may be NULL
 */
void FlowCleanupAppLayer(Flow *f)
{
    if (f != NULL && f->proto != 0) {
        AppLayerParserStateCleanup(f, f->alstate, f->alparser);
        f->alstate = NULL;
        f->alparser = NULL;
    }
}
152 
153 /** \brief Set the IPOnly scanned flag for 'direction'.
154   *
155   * \param f Flow to set the flag in
156   * \param direction direction to set the flag in
157   */
FlowSetIPOnlyFlag(Flow * f,int direction)158 void FlowSetIPOnlyFlag(Flow *f, int direction)
159 {
160     direction ? (f->flags |= FLOW_TOSERVER_IPONLY_SET) :
161         (f->flags |= FLOW_TOCLIENT_IPONLY_SET);
162     return;
163 }
164 
165 /** \brief Set flag to indicate that flow has alerts
166  *
167  * \param f flow
168  */
FlowSetHasAlertsFlag(Flow * f)169 void FlowSetHasAlertsFlag(Flow *f)
170 {
171     f->flags |= FLOW_HAS_ALERTS;
172 }
173 
174 /** \brief Check if flow has alerts
175  *
176  * \param f flow
177  * \retval 1 has alerts
178  * \retval 0 has not alerts
179  */
FlowHasAlerts(const Flow * f)180 int FlowHasAlerts(const Flow *f)
181 {
182     if (f->flags & FLOW_HAS_ALERTS) {
183         return 1;
184     }
185 
186     return 0;
187 }
188 
189 /** \brief Set flag to indicate to change proto for the flow
190  *
191  * \param f flow
192  */
FlowSetChangeProtoFlag(Flow * f)193 void FlowSetChangeProtoFlag(Flow *f)
194 {
195     f->flags |= FLOW_CHANGE_PROTO;
196 }
197 
198 /** \brief Unset flag to indicate to change proto for the flow
199  *
200  * \param f flow
201  */
FlowUnsetChangeProtoFlag(Flow * f)202 void FlowUnsetChangeProtoFlag(Flow *f)
203 {
204     f->flags &= ~FLOW_CHANGE_PROTO;
205 }
206 
207 /** \brief Check if change proto flag is set for flow
208  * \param f flow
209  * \retval 1 change proto flag is set
210  * \retval 0 change proto flag is not set
211  */
FlowChangeProto(Flow * f)212 int FlowChangeProto(Flow *f)
213 {
214     if (f->flags & FLOW_CHANGE_PROTO) {
215         return 1;
216     }
217 
218     return 0;
219 }
220 
/** \brief Exchange the direction specific bits in Flow::flags
 *
 *  Each SWAP_FLAGS() call handles one toserver/toclient flag pair.
 *
 *  \param f flow whose direction is being swapped
 */
static inline void FlowSwapFlags(Flow *f)
{
    /* which directions have been seen, IPOnly and sgh state */
    SWAP_FLAGS(f->flags, FLOW_TO_SRC_SEEN, FLOW_TO_DST_SEEN);
    SWAP_FLAGS(f->flags, FLOW_TOSERVER_IPONLY_SET, FLOW_TOCLIENT_IPONLY_SET);
    SWAP_FLAGS(f->flags, FLOW_SGH_TOSERVER, FLOW_SGH_TOCLIENT);

    /* drop logging and app-layer protocol detection progress */
    SWAP_FLAGS(f->flags, FLOW_TOSERVER_DROP_LOGGED, FLOW_TOCLIENT_DROP_LOGGED);
    SWAP_FLAGS(f->flags, FLOW_TS_PM_ALPROTO_DETECT_DONE,
            FLOW_TC_PM_ALPROTO_DETECT_DONE);
    SWAP_FLAGS(f->flags, FLOW_TS_PP_ALPROTO_DETECT_DONE,
            FLOW_TC_PP_ALPROTO_DETECT_DONE);
    SWAP_FLAGS(f->flags, FLOW_TS_PE_ALPROTO_DETECT_DONE,
            FLOW_TC_PE_ALPROTO_DETECT_DONE);

    /* pending "proto detect done" notifications */
    SWAP_FLAGS(f->flags, FLOW_PROTO_DETECT_TS_DONE, FLOW_PROTO_DETECT_TC_DONE);
}
234 
/** \brief Exchange the direction specific bits in Flow::file_flags
 *
 *  Every toserver/toclient file flag pair is swapped exactly once.
 *  Swapping the same pair an even number of times would leave the flags
 *  as they were, and pairs that are not listed here would keep pointing
 *  at the old direction after a FlowSwap().
 *
 *  NOTE(review): flag names are the FLOWFILE_NO_*_TS/_TC pairs expected
 *  from flow.h; confirm the list is complete for this tree.
 *
 *  \param f flow whose direction is being swapped
 */
static inline void FlowSwapFileFlags(Flow *f)
{
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_MAGIC_TS, FLOWFILE_NO_MAGIC_TC);
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_STORE_TS, FLOWFILE_NO_STORE_TC);
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_MD5_TS, FLOWFILE_NO_MD5_TC);
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_SHA1_TS, FLOWFILE_NO_SHA1_TC);
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_SHA256_TS, FLOWFILE_NO_SHA256_TC);
    SWAP_FLAGS(f->file_flags, FLOWFILE_NO_SIZE_TS, FLOWFILE_NO_SIZE_TC);
}
242 
/** \brief Swap the TCP session state of a flow between directions
 *
 *  Exchanges the server and client streams and flips
 *  TcpSession::data_first_seen_dir: a value with STREAM_TOSERVER set
 *  becomes STREAM_TOCLIENT, otherwise a value with STREAM_TOCLIENT set
 *  becomes STREAM_TOSERVER, anything else is left untouched.
 *
 *  \param f TCP flow; a flow without a session (NULL Flow::protoctx) is
 *           left untouched
 */
static inline void TcpStreamFlowSwap(Flow *f)
{
    TcpSession *ssn = f->protoctx;
    /* a TCP flow is not guaranteed to have a session attached */
    if (ssn == NULL) {
        return;
    }

    SWAP_VARS(TcpStream, ssn->server, ssn->client);
    if (ssn->data_first_seen_dir & STREAM_TOSERVER) {
        ssn->data_first_seen_dir = STREAM_TOCLIENT;
    } else if (ssn->data_first_seen_dir & STREAM_TOCLIENT) {
        ssn->data_first_seen_dir = STREAM_TOSERVER;
    }
}
253 
254 /** \brief swap the flow's direction
255  *  \note leaves the 'header' untouched. Interpret that based
256  *        on FLOW_DIR_REVERSED flag.
257  *  \warning: only valid before applayer parsing started. This
258  *            function doesn't swap anything in Flow::alparser,
259  *            Flow::alstate
260  */
FlowSwap(Flow * f)261 void FlowSwap(Flow *f)
262 {
263     f->flags |= FLOW_DIR_REVERSED;
264 
265     SWAP_VARS(uint32_t, f->probing_parser_toserver_alproto_masks,
266                    f->probing_parser_toclient_alproto_masks);
267 
268     FlowSwapFlags(f);
269     FlowSwapFileFlags(f);
270 
271     if (f->proto == IPPROTO_TCP) {
272         TcpStreamFlowSwap(f);
273     }
274 
275     SWAP_VARS(AppProto, f->alproto_ts, f->alproto_tc);
276     SWAP_VARS(uint8_t, f->min_ttl_toserver, f->max_ttl_toserver);
277     SWAP_VARS(uint8_t, f->min_ttl_toclient, f->max_ttl_toclient);
278 
279     /* not touching Flow::alparser and Flow::alstate */
280 
281     SWAP_VARS(const void *, f->sgh_toclient, f->sgh_toserver);
282 
283     SWAP_VARS(uint32_t, f->todstpktcnt, f->tosrcpktcnt);
284     SWAP_VARS(uint64_t, f->todstbytecnt, f->tosrcbytecnt);
285 }
286 
287 /**
288  *  \brief determine the direction of the packet compared to the flow
289  *  \retval 0 to_server
290  *  \retval 1 to_client
291  */
FlowGetPacketDirection(const Flow * f,const Packet * p)292 int FlowGetPacketDirection(const Flow *f, const Packet *p)
293 {
294     const int reverse = (f->flags & FLOW_DIR_REVERSED) != 0;
295 
296     if (p->proto == IPPROTO_TCP || p->proto == IPPROTO_UDP || p->proto == IPPROTO_SCTP) {
297         if (!(CMP_PORT(p->sp,p->dp))) {
298             /* update flags and counters */
299             if (CMP_PORT(f->sp,p->sp)) {
300                 return TOSERVER ^ reverse;
301             } else {
302                 return TOCLIENT ^ reverse;
303             }
304         } else {
305             if (CMP_ADDR(&f->src,&p->src)) {
306                 return TOSERVER ^ reverse;
307             } else {
308                 return TOCLIENT ^ reverse;
309             }
310         }
311     } else if (p->proto == IPPROTO_ICMP || p->proto == IPPROTO_ICMPV6) {
312         if (CMP_ADDR(&f->src,&p->src)) {
313             return TOSERVER  ^ reverse;
314         } else {
315             return TOCLIENT ^ reverse;
316         }
317     }
318 
319     /* default to toserver */
320     return TOSERVER;
321 }
322 
323 /**
324  *  \brief Check to update "seen" flags
325  *
326  *  \param p packet
327  *
328  *  \retval 1 true
329  *  \retval 0 false
330  */
FlowUpdateSeenFlag(const Packet * p)331 static inline int FlowUpdateSeenFlag(const Packet *p)
332 {
333     if (PKT_IS_ICMPV4(p)) {
334         if (ICMPV4_IS_ERROR_MSG(p)) {
335             return 0;
336         }
337     }
338 
339     return 1;
340 }
341 
/**
 *  \brief Track the lowest and highest TTL seen in the packet's direction
 *
 *  A stored minimum of 0 is treated as "not set yet" and is replaced by
 *  the packet's value unconditionally.
 *
 *  \param f flow
 *  \param p packet, used to determine the direction
 *  \param ttl TTL / hop limit value taken from the packet
 */
static inline void FlowUpdateTTL(Flow *f, Packet *p, uint8_t ttl)
{
    const int toserver = (FlowGetPacketDirection(f, p) == TOSERVER);

    if (toserver) {
        if (f->min_ttl_toserver == 0 || ttl < f->min_ttl_toserver) {
            f->min_ttl_toserver = ttl;
        }
        if (ttl > f->max_ttl_toserver) {
            f->max_ttl_toserver = ttl;
        }
        return;
    }

    if (f->min_ttl_toclient == 0 || ttl < f->min_ttl_toclient) {
        f->min_ttl_toclient = ttl;
    }
    if (ttl > f->max_ttl_toclient) {
        f->max_ttl_toclient = ttl;
    }
}
360 
/**
 *  \brief Add the packet's ethernet addresses to the flow's MacSet
 *
 *  Does nothing when the packet has no ethernet header, when MacSet flow
 *  storage is disabled, or when the flow has no MacSet stored. For a
 *  toclient packet the address and counter arguments are passed to
 *  MacSetAddWithCtr() in swapped order.
 *
 *  \param tv thread vars, passed on to MacSetAddWithCtr()
 *  \param dtv decode thread vars holding the max mac address counters
 *  \param f flow
 *  \param ethh ethernet header of the packet, may be NULL
 *  \param toserver true if the packet goes to the server
 */
static inline void FlowUpdateEthernet(ThreadVars *tv, DecodeThreadVars *dtv,
                                      Flow *f, EthernetHdr *ethh, bool toserver)
{
    if (ethh == NULL || !MacSetFlowStorageEnabled()) {
        return;
    }

    MacSet *ms = FlowGetStorageById(f, MacSetGetFlowStorageID());
    if (ms == NULL) {
        return;
    }

    if (toserver) {
        MacSetAddWithCtr(ms, ethh->eth_src, ethh->eth_dst, tv,
                         dtv->counter_max_mac_addrs_src,
                         dtv->counter_max_mac_addrs_dst);
        return;
    }

    MacSetAddWithCtr(ms, ethh->eth_dst, ethh->eth_src, tv,
                     dtv->counter_max_mac_addrs_dst,
                     dtv->counter_max_mac_addrs_src);
}
379 
380 /** \brief Update Packet and Flow
381  *
382  *  Updates packet and flow based on the new packet.
383  *
384  *  \param f locked flow
385  *  \param p packet
386  *
387  *  \note overwrites p::flowflags
388  */
FlowHandlePacketUpdate(Flow * f,Packet * p,ThreadVars * tv,DecodeThreadVars * dtv)389 void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars *dtv)
390 {
391     SCLogDebug("packet %"PRIu64" -- flow %p", p->pcap_cnt, f);
392 
393 #ifdef CAPTURE_OFFLOAD
394     int state = f->flow_state;
395 
396     if (state != FLOW_STATE_CAPTURE_BYPASSED) {
397 #endif
398         /* update the last seen timestamp of this flow */
399         if (timercmp(&p->ts, &f->lastts, >)) {
400             COPY_TIMESTAMP(&p->ts, &f->lastts);
401             const uint32_t timeout_at = (uint32_t)f->lastts.tv_sec + f->timeout_policy;
402             if (timeout_at != f->timeout_at) {
403                 f->timeout_at = timeout_at;
404             }
405         }
406 #ifdef CAPTURE_OFFLOAD
407     } else {
408         /* still seeing packet, we downgrade to local bypass */
409         if (p->ts.tv_sec - f->lastts.tv_sec > FLOW_BYPASSED_TIMEOUT / 2) {
410             SCLogDebug("Downgrading flow to local bypass");
411             COPY_TIMESTAMP(&p->ts, &f->lastts);
412             FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);
413         } else {
414             /* In IPS mode the packet could come from the other interface so it would
415              * need to be bypassed */
416             if (EngineModeIsIPS()) {
417                 BypassedFlowUpdate(f, p);
418             }
419         }
420     }
421 #endif
422     /* update flags and counters */
423     if (FlowGetPacketDirection(f, p) == TOSERVER) {
424         f->todstpktcnt++;
425         f->todstbytecnt += GET_PKT_LEN(p);
426         p->flowflags = FLOW_PKT_TOSERVER;
427         if (!(f->flags & FLOW_TO_DST_SEEN)) {
428             if (FlowUpdateSeenFlag(p)) {
429                 f->flags |= FLOW_TO_DST_SEEN;
430                 p->flowflags |= FLOW_PKT_TOSERVER_FIRST;
431             }
432         }
433         /* xfer proto detect ts flag to first packet in ts dir */
434         if (f->flags & FLOW_PROTO_DETECT_TS_DONE) {
435             f->flags &= ~FLOW_PROTO_DETECT_TS_DONE;
436             p->flags |= PKT_PROTO_DETECT_TS_DONE;
437         }
438         FlowUpdateEthernet(tv, dtv, f, p->ethh, true);
439     } else {
440         f->tosrcpktcnt++;
441         f->tosrcbytecnt += GET_PKT_LEN(p);
442         p->flowflags = FLOW_PKT_TOCLIENT;
443         if (!(f->flags & FLOW_TO_SRC_SEEN)) {
444             if (FlowUpdateSeenFlag(p)) {
445                 f->flags |= FLOW_TO_SRC_SEEN;
446                 p->flowflags |= FLOW_PKT_TOCLIENT_FIRST;
447             }
448         }
449         /* xfer proto detect tc flag to first packet in tc dir */
450         if (f->flags & FLOW_PROTO_DETECT_TC_DONE) {
451             f->flags &= ~FLOW_PROTO_DETECT_TC_DONE;
452             p->flags |= PKT_PROTO_DETECT_TC_DONE;
453         }
454         FlowUpdateEthernet(tv, dtv, f, p->ethh, false);
455     }
456 
457     if (f->flow_state == FLOW_STATE_ESTABLISHED) {
458         SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
459         p->flowflags |= FLOW_PKT_ESTABLISHED;
460 
461     } else if (f->proto == IPPROTO_TCP) {
462         TcpSession *ssn = (TcpSession *)f->protoctx;
463         if (ssn != NULL && ssn->state >= TCP_ESTABLISHED) {
464             p->flowflags |= FLOW_PKT_ESTABLISHED;
465         }
466     } else if ((f->flags & (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) ==
467             (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) {
468         SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
469         p->flowflags |= FLOW_PKT_ESTABLISHED;
470 
471         FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
472     }
473 
474     /*set the detection bypass flags*/
475     if (f->flags & FLOW_NOPACKET_INSPECTION) {
476         SCLogDebug("setting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
477         DecodeSetNoPacketInspectionFlag(p);
478     }
479     if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {
480         SCLogDebug("setting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);
481         DecodeSetNoPayloadInspectionFlag(p);
482     }
483 
484     /* update flow's ttl fields if needed */
485     if (PKT_IS_IPV4(p)) {
486         FlowUpdateTTL(f, p, IPV4_GET_IPTTL(p));
487     } else if (PKT_IS_IPV6(p)) {
488         FlowUpdateTTL(f, p, IPV6_GET_HLIM(p));
489     }
490 }
491 
492 /** \brief Entry point for packet flow handling
493  *
494  * This is called for every packet.
495  *
496  *  \param tv threadvars
497  *  \param dtv decode thread vars (for flow output api thread data)
498  *  \param p packet to handle flow for
499  */
FlowHandlePacket(ThreadVars * tv,FlowLookupStruct * fls,Packet * p)500 void FlowHandlePacket(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
501 {
502     /* Get this packet's flow from the hash. FlowHandlePacket() will setup
503      * a new flow if nescesary. If we get NULL, we're out of flow memory.
504      * The returned flow is locked. */
505     Flow *f = FlowGetFlowFromHash(tv, fls, p, &p->flow);
506     if (f == NULL)
507         return;
508 
509     /* set the flow in the packet */
510     p->flags |= PKT_HAS_FLOW;
511     return;
512 }
513 
514 /** \brief initialize the configuration
515  *  \warning Not thread safe */
FlowInitConfig(char quiet)516 void FlowInitConfig(char quiet)
517 {
518     SCLogDebug("initializing flow engine...");
519 
520     memset(&flow_config,  0, sizeof(flow_config));
521     SC_ATOMIC_INIT(flow_flags);
522     SC_ATOMIC_INIT(flow_memuse);
523     SC_ATOMIC_INIT(flow_prune_idx);
524     SC_ATOMIC_INIT(flow_config.memcap);
525     FlowQueueInit(&flow_recycle_q);
526 
527     /* set defaults */
528     flow_config.hash_rand   = (uint32_t)RandomGet();
529     flow_config.hash_size   = FLOW_DEFAULT_HASHSIZE;
530     flow_config.prealloc    = FLOW_DEFAULT_PREALLOC;
531     SC_ATOMIC_SET(flow_config.memcap, FLOW_DEFAULT_MEMCAP);
532 
533     /* If we have specific config, overwrite the defaults with them,
534      * otherwise, leave the default values */
535     intmax_t val = 0;
536     if (ConfGetInt("flow.emergency-recovery", &val) == 1) {
537         if (val <= 100 && val >= 1) {
538             flow_config.emergency_recovery = (uint8_t)val;
539         } else {
540             SCLogError(SC_ERR_INVALID_VALUE, "flow.emergency-recovery must be in the range of "
541                                              "1 and 100 (as percentage)");
542             flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY;
543         }
544     } else {
545         SCLogDebug("flow.emergency-recovery, using default value");
546         flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY;
547     }
548 
549     /* Check if we have memcap and hash_size defined at config */
550     const char *conf_val;
551     uint32_t configval = 0;
552 
553     /** set config values for memcap, prealloc and hash_size */
554     uint64_t flow_memcap_copy = 0;
555     if ((ConfGet("flow.memcap", &conf_val)) == 1)
556     {
557         if (conf_val == NULL) {
558             FatalError(SC_ERR_FATAL, "Invalid value for flow.memcap: NULL");
559         }
560 
561         if (ParseSizeStringU64(conf_val, &flow_memcap_copy) < 0) {
562             SCLogError(SC_ERR_SIZE_PARSE, "Error parsing flow.memcap "
563                        "from conf file - %s.  Killing engine",
564                        conf_val);
565             exit(EXIT_FAILURE);
566         } else {
567             SC_ATOMIC_SET(flow_config.memcap, flow_memcap_copy);
568         }
569     }
570     if ((ConfGet("flow.hash-size", &conf_val)) == 1)
571     {
572         if (conf_val == NULL) {
573             FatalError(SC_ERR_FATAL, "Invalid value for flow.hash-size: NULL");
574         }
575 
576         if (StringParseUint32(&configval, 10, strlen(conf_val),
577                                     conf_val) > 0) {
578             flow_config.hash_size = configval;
579         }
580     }
581     if ((ConfGet("flow.prealloc", &conf_val)) == 1)
582     {
583         if (conf_val == NULL) {
584             FatalError(SC_ERR_FATAL, "Invalid value for flow.prealloc: NULL");
585         }
586 
587         if (StringParseUint32(&configval, 10, strlen(conf_val),
588                                     conf_val) > 0) {
589             flow_config.prealloc = configval;
590         }
591     }
592     SCLogDebug("Flow config from suricata.yaml: memcap: %"PRIu64", hash-size: "
593                "%"PRIu32", prealloc: %"PRIu32, SC_ATOMIC_GET(flow_config.memcap),
594                flow_config.hash_size, flow_config.prealloc);
595 
596     /* alloc hash memory */
597     uint64_t hash_size = flow_config.hash_size * sizeof(FlowBucket);
598     if (!(FLOW_CHECK_MEMCAP(hash_size))) {
599         SCLogError(SC_ERR_FLOW_INIT, "allocating flow hash failed: "
600                 "max flow memcap is smaller than projected hash size. "
601                 "Memcap: %"PRIu64", Hash table size %"PRIu64". Calculate "
602                 "total hash size by multiplying \"flow.hash-size\" with %"PRIuMAX", "
603                 "which is the hash bucket size.", SC_ATOMIC_GET(flow_config.memcap), hash_size,
604                 (uintmax_t)sizeof(FlowBucket));
605         exit(EXIT_FAILURE);
606     }
607     flow_hash = SCMallocAligned(flow_config.hash_size * sizeof(FlowBucket), CLS);
608     if (unlikely(flow_hash == NULL)) {
609         FatalError(SC_ERR_FATAL,
610                    "Fatal error encountered in FlowInitConfig. Exiting...");
611     }
612     memset(flow_hash, 0, flow_config.hash_size * sizeof(FlowBucket));
613 
614     uint32_t i = 0;
615     for (i = 0; i < flow_config.hash_size; i++) {
616         FBLOCK_INIT(&flow_hash[i]);
617         SC_ATOMIC_INIT(flow_hash[i].next_ts);
618     }
619     (void) SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
620 
621     if (quiet == FALSE) {
622         SCLogConfig("allocated %"PRIu64" bytes of memory for the flow hash... "
623                   "%" PRIu32 " buckets of size %" PRIuMAX "",
624                   SC_ATOMIC_GET(flow_memuse), flow_config.hash_size,
625                   (uintmax_t)sizeof(FlowBucket));
626     }
627     FlowSparePoolInit();
628     if (quiet == FALSE) {
629         SCLogConfig("flow memory usage: %"PRIu64" bytes, maximum: %"PRIu64,
630                 SC_ATOMIC_GET(flow_memuse), SC_ATOMIC_GET(flow_config.memcap));
631     }
632 
633     FlowInitFlowProto();
634 
635     uint32_t sz = sizeof(Flow) + FlowStorageSize();
636     SCLogConfig("flow size %u, memcap allows for %" PRIu64 " flows. Per hash row in perfect "
637                 "conditions %" PRIu64,
638             sz, flow_memcap_copy / sz, (flow_memcap_copy / sz) / flow_config.hash_size);
639     return;
640 }
641 
642 /** \brief shutdown the flow engine
643  *  \warning Not thread safe */
FlowShutdown(void)644 void FlowShutdown(void)
645 {
646     Flow *f;
647     while ((f = FlowDequeue(&flow_recycle_q))) {
648         FlowFree(f);
649     }
650 
651     /* clear and free the hash */
652     if (flow_hash != NULL) {
653         /* clean up flow mutexes */
654         for (uint32_t u = 0; u < flow_config.hash_size; u++) {
655             f = flow_hash[u].head;
656             while (f) {
657                 DEBUG_VALIDATE_BUG_ON(f->use_cnt != 0);
658                 Flow *n = f->next;
659                 uint8_t proto_map = FlowGetProtoMapping(f->proto);
660                 FlowClearMemory(f, proto_map);
661                 FlowFree(f);
662                 f = n;
663             }
664             f = flow_hash[u].evicted;
665             while (f) {
666                 DEBUG_VALIDATE_BUG_ON(f->use_cnt != 0);
667                 Flow *n = f->next;
668                 uint8_t proto_map = FlowGetProtoMapping(f->proto);
669                 FlowClearMemory(f, proto_map);
670                 FlowFree(f);
671                 f = n;
672             }
673 
674             FBLOCK_DESTROY(&flow_hash[u]);
675         }
676         SCFreeAligned(flow_hash);
677         flow_hash = NULL;
678     }
679     (void) SC_ATOMIC_SUB(flow_memuse, flow_config.hash_size * sizeof(FlowBucket));
680     FlowQueueDestroy(&flow_recycle_q);
681     FlowSparePoolDestroy();
682     return;
683 }
684 
685 /**
686  *  \brief  Function to set the default timeout, free function and flow state
687  *          function for all supported flow_proto.
688  */
689 
FlowInitFlowProto(void)690 void FlowInitFlowProto(void)
691 {
692     FlowTimeoutsInit();
693 
694 #define SET_DEFAULTS(p, n, e, c, b, ne, ee, ce, be)     \
695     flow_timeouts_normal[(p)].new_timeout = (n);     \
696     flow_timeouts_normal[(p)].est_timeout = (e);     \
697     flow_timeouts_normal[(p)].closed_timeout = (c);  \
698     flow_timeouts_normal[(p)].bypassed_timeout = (b); \
699     flow_timeouts_emerg[(p)].new_timeout = (ne);     \
700     flow_timeouts_emerg[(p)].est_timeout = (ee);     \
701     flow_timeouts_emerg[(p)].closed_timeout = (ce); \
702     flow_timeouts_emerg[(p)].bypassed_timeout = (be); \
703 
704     SET_DEFAULTS(FLOW_PROTO_DEFAULT,
705                 FLOW_DEFAULT_NEW_TIMEOUT, FLOW_DEFAULT_EST_TIMEOUT,
706                     0, FLOW_DEFAULT_BYPASSED_TIMEOUT,
707                 FLOW_DEFAULT_EMERG_NEW_TIMEOUT, FLOW_DEFAULT_EMERG_EST_TIMEOUT,
708                     0, FLOW_DEFAULT_EMERG_BYPASSED_TIMEOUT);
709     SET_DEFAULTS(FLOW_PROTO_TCP,
710                 FLOW_IPPROTO_TCP_NEW_TIMEOUT, FLOW_IPPROTO_TCP_EST_TIMEOUT,
711                     FLOW_IPPROTO_TCP_CLOSED_TIMEOUT, FLOW_IPPROTO_TCP_BYPASSED_TIMEOUT,
712                 FLOW_IPPROTO_TCP_EMERG_NEW_TIMEOUT, FLOW_IPPROTO_TCP_EMERG_EST_TIMEOUT,
713                     FLOW_IPPROTO_TCP_EMERG_CLOSED_TIMEOUT, FLOW_DEFAULT_EMERG_BYPASSED_TIMEOUT);
714     SET_DEFAULTS(FLOW_PROTO_UDP,
715                 FLOW_IPPROTO_UDP_NEW_TIMEOUT, FLOW_IPPROTO_UDP_EST_TIMEOUT,
716                     0, FLOW_IPPROTO_UDP_BYPASSED_TIMEOUT,
717                 FLOW_IPPROTO_UDP_EMERG_NEW_TIMEOUT, FLOW_IPPROTO_UDP_EMERG_EST_TIMEOUT,
718                     0, FLOW_DEFAULT_EMERG_BYPASSED_TIMEOUT);
719     SET_DEFAULTS(FLOW_PROTO_ICMP,
720                 FLOW_IPPROTO_ICMP_NEW_TIMEOUT, FLOW_IPPROTO_ICMP_EST_TIMEOUT,
721                     0, FLOW_IPPROTO_ICMP_BYPASSED_TIMEOUT,
722                 FLOW_IPPROTO_ICMP_EMERG_NEW_TIMEOUT, FLOW_IPPROTO_ICMP_EMERG_EST_TIMEOUT,
723                     0, FLOW_DEFAULT_EMERG_BYPASSED_TIMEOUT);
724 
725     flow_freefuncs[FLOW_PROTO_DEFAULT].Freefunc = NULL;
726     flow_freefuncs[FLOW_PROTO_TCP].Freefunc = NULL;
727     flow_freefuncs[FLOW_PROTO_UDP].Freefunc = NULL;
728     flow_freefuncs[FLOW_PROTO_ICMP].Freefunc = NULL;
729 
730     /* Let's see if we have custom timeouts defined from config */
731     const char *new = NULL;
732     const char *established = NULL;
733     const char *closed = NULL;
734     const char *bypassed = NULL;
735     const char *emergency_new = NULL;
736     const char *emergency_established = NULL;
737     const char *emergency_closed = NULL;
738     const char *emergency_bypassed = NULL;
739 
740     ConfNode *flow_timeouts = ConfGetNode("flow-timeouts");
741     if (flow_timeouts != NULL) {
742         ConfNode *proto = NULL;
743         uint32_t configval = 0;
744 
745         /* Defaults. */
746         proto = ConfNodeLookupChild(flow_timeouts, "default");
747         if (proto != NULL) {
748             new = ConfNodeLookupChildValue(proto, "new");
749             established = ConfNodeLookupChildValue(proto, "established");
750             closed = ConfNodeLookupChildValue(proto, "closed");
751             bypassed = ConfNodeLookupChildValue(proto, "bypassed");
752             emergency_new = ConfNodeLookupChildValue(proto, "emergency-new");
753             emergency_established = ConfNodeLookupChildValue(proto,
754                 "emergency-established");
755             emergency_closed = ConfNodeLookupChildValue(proto,
756                 "emergency-closed");
757             emergency_bypassed = ConfNodeLookupChildValue(proto,
758                 "emergency-bypassed");
759 
760             if (new != NULL &&
761                 StringParseUint32(&configval, 10, strlen(new), new) > 0) {
762 
763                     flow_timeouts_normal[FLOW_PROTO_DEFAULT].new_timeout = configval;
764             }
765             if (established != NULL &&
766                 StringParseUint32(&configval, 10, strlen(established),
767                                         established) > 0) {
768 
769                 flow_timeouts_normal[FLOW_PROTO_DEFAULT].est_timeout = configval;
770             }
771             if (closed != NULL &&
772                 StringParseUint32(&configval, 10, strlen(closed),
773                                         closed) > 0) {
774 
775                 flow_timeouts_normal[FLOW_PROTO_DEFAULT].closed_timeout = configval;
776             }
777             if (bypassed != NULL &&
778                     StringParseUint32(&configval, 10,
779                                             strlen(bypassed),
780                                             bypassed) > 0) {
781 
782                 flow_timeouts_normal[FLOW_PROTO_DEFAULT].bypassed_timeout = configval;
783             }
784             if (emergency_new != NULL &&
785                 StringParseUint32(&configval, 10, strlen(emergency_new),
786                                         emergency_new) > 0) {
787 
788                 flow_timeouts_emerg[FLOW_PROTO_DEFAULT].new_timeout = configval;
789             }
790             if (emergency_established != NULL &&
791                     StringParseUint32(&configval, 10,
792                                             strlen(emergency_established),
793                                             emergency_established) > 0) {
794 
795                 flow_timeouts_emerg[FLOW_PROTO_DEFAULT].est_timeout= configval;
796             }
797             if (emergency_closed != NULL &&
798                     StringParseUint32(&configval, 10,
799                                             strlen(emergency_closed),
800                                             emergency_closed) > 0) {
801 
802                 flow_timeouts_emerg[FLOW_PROTO_DEFAULT].closed_timeout = configval;
803             }
804             if (emergency_bypassed != NULL &&
805                     StringParseUint32(&configval, 10,
806                                             strlen(emergency_bypassed),
807                                             emergency_bypassed) > 0) {
808 
809                 flow_timeouts_emerg[FLOW_PROTO_DEFAULT].bypassed_timeout = configval;
810             }
811         }
812 
813         /* TCP. */
814         proto = ConfNodeLookupChild(flow_timeouts, "tcp");
815         if (proto != NULL) {
816             new = ConfNodeLookupChildValue(proto, "new");
817             established = ConfNodeLookupChildValue(proto, "established");
818             closed = ConfNodeLookupChildValue(proto, "closed");
819             bypassed = ConfNodeLookupChildValue(proto, "bypassed");
820             emergency_new = ConfNodeLookupChildValue(proto, "emergency-new");
821             emergency_established = ConfNodeLookupChildValue(proto,
822                 "emergency-established");
823             emergency_closed = ConfNodeLookupChildValue(proto,
824                 "emergency-closed");
825             emergency_bypassed = ConfNodeLookupChildValue(proto,
826                 "emergency-bypassed");
827 
828             if (new != NULL &&
829                 StringParseUint32(&configval, 10, strlen(new), new) > 0) {
830 
831                 flow_timeouts_normal[FLOW_PROTO_TCP].new_timeout = configval;
832             }
833             if (established != NULL &&
834                 StringParseUint32(&configval, 10, strlen(established),
835                                         established) > 0) {
836 
837                 flow_timeouts_normal[FLOW_PROTO_TCP].est_timeout = configval;
838             }
839             if (closed != NULL &&
840                 StringParseUint32(&configval, 10, strlen(closed),
841                                         closed) > 0) {
842 
843                 flow_timeouts_normal[FLOW_PROTO_TCP].closed_timeout = configval;
844             }
845             if (bypassed != NULL &&
846                     StringParseUint32(&configval, 10,
847                                             strlen(bypassed),
848                                             bypassed) > 0) {
849 
850                 flow_timeouts_normal[FLOW_PROTO_TCP].bypassed_timeout = configval;
851             }
852             if (emergency_new != NULL &&
853                 StringParseUint32(&configval, 10, strlen(emergency_new),
854                                         emergency_new) > 0) {
855 
856                 flow_timeouts_emerg[FLOW_PROTO_TCP].new_timeout = configval;
857             }
858             if (emergency_established != NULL &&
859                 StringParseUint32(&configval, 10,
860                                         strlen(emergency_established),
861                                         emergency_established) > 0) {
862 
863                 flow_timeouts_emerg[FLOW_PROTO_TCP].est_timeout = configval;
864             }
865             if (emergency_closed != NULL &&
866                 StringParseUint32(&configval, 10,
867                                         strlen(emergency_closed),
868                                         emergency_closed) > 0) {
869 
870                 flow_timeouts_emerg[FLOW_PROTO_TCP].closed_timeout = configval;
871             }
872             if (emergency_bypassed != NULL &&
873                     StringParseUint32(&configval, 10,
874                                             strlen(emergency_bypassed),
875                                             emergency_bypassed) > 0) {
876 
877                 flow_timeouts_emerg[FLOW_PROTO_TCP].bypassed_timeout = configval;
878             }
879         }
880 
881         /* UDP. */
882         proto = ConfNodeLookupChild(flow_timeouts, "udp");
883         if (proto != NULL) {
884             new = ConfNodeLookupChildValue(proto, "new");
885             established = ConfNodeLookupChildValue(proto, "established");
886             bypassed = ConfNodeLookupChildValue(proto, "bypassed");
887             emergency_new = ConfNodeLookupChildValue(proto, "emergency-new");
888             emergency_established = ConfNodeLookupChildValue(proto,
889                 "emergency-established");
890             emergency_bypassed = ConfNodeLookupChildValue(proto,
891                 "emergency-bypassed");
892 
893             if (new != NULL &&
894                 StringParseUint32(&configval, 10, strlen(new), new) > 0) {
895 
896                 flow_timeouts_normal[FLOW_PROTO_UDP].new_timeout = configval;
897             }
898             if (established != NULL &&
899                 StringParseUint32(&configval, 10, strlen(established),
900                                         established) > 0) {
901 
902                 flow_timeouts_normal[FLOW_PROTO_UDP].est_timeout = configval;
903             }
904             if (bypassed != NULL &&
905                     StringParseUint32(&configval, 10,
906                                             strlen(bypassed),
907                                             bypassed) > 0) {
908 
909                 flow_timeouts_normal[FLOW_PROTO_UDP].bypassed_timeout = configval;
910             }
911             if (emergency_new != NULL &&
912                 StringParseUint32(&configval, 10, strlen(emergency_new),
913                                         emergency_new) > 0) {
914 
915                 flow_timeouts_emerg[FLOW_PROTO_UDP].new_timeout = configval;
916             }
917             if (emergency_established != NULL &&
918                 StringParseUint32(&configval, 10,
919                                         strlen(emergency_established),
920                                         emergency_established) > 0) {
921 
922                 flow_timeouts_emerg[FLOW_PROTO_UDP].est_timeout = configval;
923             }
924             if (emergency_bypassed != NULL &&
925                     StringParseUint32(&configval, 10,
926                                             strlen(emergency_bypassed),
927                                             emergency_bypassed) > 0) {
928 
929                 flow_timeouts_emerg[FLOW_PROTO_UDP].bypassed_timeout = configval;
930             }
931         }
932 
933         /* ICMP. */
934         proto = ConfNodeLookupChild(flow_timeouts, "icmp");
935         if (proto != NULL) {
936             new = ConfNodeLookupChildValue(proto, "new");
937             established = ConfNodeLookupChildValue(proto, "established");
938             bypassed = ConfNodeLookupChildValue(proto, "bypassed");
939             emergency_new = ConfNodeLookupChildValue(proto, "emergency-new");
940             emergency_established = ConfNodeLookupChildValue(proto,
941                 "emergency-established");
942             emergency_bypassed = ConfNodeLookupChildValue(proto,
943                 "emergency-bypassed");
944 
945             if (new != NULL &&
946                 StringParseUint32(&configval, 10, strlen(new), new) > 0) {
947 
948                 flow_timeouts_normal[FLOW_PROTO_ICMP].new_timeout = configval;
949             }
950             if (established != NULL &&
951                 StringParseUint32(&configval, 10, strlen(established),
952                                         established) > 0) {
953 
954                 flow_timeouts_normal[FLOW_PROTO_ICMP].est_timeout = configval;
955             }
956             if (bypassed != NULL &&
957                     StringParseUint32(&configval, 10,
958                                             strlen(bypassed),
959                                             bypassed) > 0) {
960 
961                 flow_timeouts_normal[FLOW_PROTO_ICMP].bypassed_timeout = configval;
962             }
963             if (emergency_new != NULL &&
964                 StringParseUint32(&configval, 10, strlen(emergency_new),
965                                         emergency_new) > 0) {
966 
967                 flow_timeouts_emerg[FLOW_PROTO_ICMP].new_timeout = configval;
968             }
969             if (emergency_established != NULL &&
970                 StringParseUint32(&configval, 10,
971                                         strlen(emergency_established),
972                                         emergency_established) > 0) {
973 
974                 flow_timeouts_emerg[FLOW_PROTO_ICMP].est_timeout = configval;
975             }
976             if (emergency_bypassed != NULL &&
977                     StringParseUint32(&configval, 10,
978                                             strlen(emergency_bypassed),
979                                             emergency_bypassed) > 0) {
980 
981                 flow_timeouts_emerg[FLOW_PROTO_ICMP].bypassed_timeout = configval;
982             }
983         }
984     }
985 
986     /* validate and if needed update emergency timeout values */
987     for (int i = 0; i < FLOW_PROTO_MAX; i++) {
988         const FlowProtoTimeout *n = &flow_timeouts_normal[i];
989         FlowProtoTimeout *e = &flow_timeouts_emerg[i];
990 
991         if (e->est_timeout > n->est_timeout) {
992             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value %u for \'established\' "
993                     "must be below regular value %u", e->est_timeout, n->est_timeout);
994             e->est_timeout = n->est_timeout / 10;
995         }
996 
997         if (e->new_timeout > n->new_timeout) {
998             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value %u for \'new\' must be "
999                     "below regular value %u", e->new_timeout, n->new_timeout);
1000             e->new_timeout = n->new_timeout / 10;
1001         }
1002 
1003         if (e->closed_timeout > n->closed_timeout) {
1004             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value %u for \'closed\' must "
1005                     "be below regular value %u", e->closed_timeout, n->closed_timeout);
1006             e->closed_timeout = n->closed_timeout / 10;
1007         }
1008 
1009         if (e->bypassed_timeout > n->bypassed_timeout) {
1010             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value %u for \'bypassed\' "
1011                     "must be below regular value %u", e->bypassed_timeout, n->bypassed_timeout);
1012             e->bypassed_timeout = n->bypassed_timeout / 10;
1013         }
1014     }
1015 
1016     for (int i = 0; i < FLOW_PROTO_MAX; i++) {
1017         FlowProtoTimeout *n = &flow_timeouts_normal[i];
1018         FlowProtoTimeout *e = &flow_timeouts_emerg[i];
1019         FlowProtoTimeout *d = &flow_timeouts_delta[i];
1020 
1021         if (e->est_timeout > n->est_timeout) {
1022             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value for \'established\' must be below normal value");
1023             e->est_timeout = n->est_timeout / 10;
1024         }
1025         d->est_timeout = n->est_timeout - e->est_timeout;
1026 
1027         if (e->new_timeout > n->new_timeout) {
1028             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value for \'new\' must be below normal value");
1029             e->new_timeout = n->new_timeout / 10;
1030         }
1031         d->new_timeout = n->new_timeout - e->new_timeout;
1032 
1033         if (e->closed_timeout > n->closed_timeout) {
1034             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value for \'closed\' must be below normal value");
1035             e->closed_timeout = n->closed_timeout / 10;
1036         }
1037         d->closed_timeout = n->closed_timeout - e->closed_timeout;
1038 
1039         if (e->bypassed_timeout > n->bypassed_timeout) {
1040             SCLogWarning(SC_WARN_FLOW_EMERGENCY, "emergency timeout value for \'bypassed\' must be below normal value");
1041             e->bypassed_timeout = n->bypassed_timeout / 10;
1042         }
1043         d->bypassed_timeout = n->bypassed_timeout - e->bypassed_timeout;
1044 
1045         SCLogDebug("deltas: new: -%u est: -%u closed: -%u bypassed: -%u",
1046                 d->new_timeout, d->est_timeout, d->closed_timeout, d->bypassed_timeout);
1047     }
1048 
1049     return;
1050 }
1051 
1052 /**
1053  *  \brief  Function clear the flow memory before queueing it to spare flow
1054  *          queue.
1055  *
1056  *  \param  f           pointer to the flow needed to be cleared.
1057  *  \param  proto_map   mapped value of the protocol to FLOW_PROTO's.
1058  */
1059 
FlowClearMemory(Flow * f,uint8_t proto_map)1060 int FlowClearMemory(Flow* f, uint8_t proto_map)
1061 {
1062     SCEnter();
1063 
1064     if (unlikely(f->flags & FLOW_HAS_EXPECTATION)) {
1065         AppLayerExpectationClean(f);
1066     }
1067 
1068     /* call the protocol specific free function if we have one */
1069     if (flow_freefuncs[proto_map].Freefunc != NULL) {
1070         flow_freefuncs[proto_map].Freefunc(f->protoctx);
1071     }
1072 
1073     FlowFreeStorage(f);
1074 
1075     FLOW_RECYCLE(f);
1076 
1077     SCReturnInt(1);
1078 }
1079 
1080 /**
1081  *  \brief  Function to set the function to get protocol specific flow state.
1082  *
1083  *  \param   proto  protocol of which function is needed to be set.
1084  *  \param   Free   Function pointer which will be called to free the protocol
1085  *                  specific memory.
1086  */
1087 
FlowSetProtoFreeFunc(uint8_t proto,void (* Free)(void *))1088 int FlowSetProtoFreeFunc (uint8_t proto, void (*Free)(void *))
1089 {
1090     uint8_t proto_map;
1091     proto_map = FlowGetProtoMapping(proto);
1092 
1093     flow_freefuncs[proto_map].Freefunc = Free;
1094     return 1;
1095 }
1096 
FlowGetAppProtocol(const Flow * f)1097 AppProto FlowGetAppProtocol(const Flow *f)
1098 {
1099     return f->alproto;
1100 }
1101 
FlowGetAppState(const Flow * f)1102 void *FlowGetAppState(const Flow *f)
1103 {
1104     return f->alstate;
1105 }
1106 
1107 /**
1108  *  \brief get 'disruption' flags: GAP/DEPTH/PASS
1109  *  \param f locked flow
1110  *  \param flags existing flags to be ammended
1111  *  \retval flags original flags + disrupt flags (if any)
1112  *  \TODO handle UDP
1113  */
FlowGetDisruptionFlags(const Flow * f,uint8_t flags)1114 uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
1115 {
1116     if (f->proto != IPPROTO_TCP) {
1117         return flags;
1118     }
1119     if (f->protoctx == NULL) {
1120         return flags;
1121     }
1122 
1123     uint8_t newflags = flags;
1124     TcpSession *ssn = f->protoctx;
1125     TcpStream *stream = flags & STREAM_TOSERVER ? &ssn->client : &ssn->server;
1126 
1127     if (stream->flags & STREAMTCP_STREAM_FLAG_DEPTH_REACHED) {
1128         newflags |= STREAM_DEPTH;
1129     }
1130     /* todo: handle pass case (also for UDP!) */
1131 
1132     return newflags;
1133 }
1134 
/**
 *  \brief  Set the state of a flow and refresh its timeout bookkeeping.
 *
 *  When \a s differs from the flow's current state, the new state is
 *  stored and the timeout policy is queried again. If the policy changed,
 *  it is stored and timeout_at is set to the seconds of the flow's last
 *  timestamp plus the policy.
 *
 *  Whether or not the state changed, next_ts of the flow's hash bucket is
 *  reset to 0. Only UNITTESTS builds guard against a flow that has no
 *  bucket; other builds dereference f->fb unconditionally.
 *
 *  \param f flow to update
 *  \param s state to move the flow to
 */
void FlowUpdateState(Flow *f, const enum FlowState s)
{
    if (f->flow_state != s) {
        f->flow_state = s;

        /* queried after the state change has been stored */
        const uint32_t policy = FlowGetTimeoutPolicy(f);
        if (f->timeout_policy != policy) {
            f->timeout_policy = policy;

            /* only write timeout_at when the value really differs */
            const uint32_t deadline = (uint32_t)f->lastts.tv_sec + policy;
            if (f->timeout_at != deadline) {
                f->timeout_at = deadline;
            }
        }
    }

#ifdef UNITTESTS
    if (f->fb == NULL) {
        return;
    }
#endif
    /* reset the flow bucket next_ts value so that the flow manager
     * has to revisit this row */
    SC_ATOMIC_SET(f->fb->next_ts, 0);
}
1160 
1161 /**
1162  * \brief Get flow last time as individual values.
1163  *
1164  * Instead of returning a pointer to the timeval copy the timeval
1165  * parts into output pointers to make it simpler to call from Rust
1166  * over FFI using only basic data types.
1167  */
FlowGetLastTimeAsParts(Flow * flow,uint64_t * secs,uint64_t * usecs)1168 void FlowGetLastTimeAsParts(Flow *flow, uint64_t *secs, uint64_t *usecs)
1169 {
1170     *secs = (uint64_t)flow->lastts.tv_sec;
1171     *usecs = (uint64_t)flow->lastts.tv_usec;
1172 }
1173 
1174 /************************************Unittests*******************************/
1175 
1176 #ifdef UNITTESTS
1177 #include "threads.h"
1178 
1179 /**
1180  *  \test   Test the setting of the per protocol timeouts.
1181  *
1182  *  \retval On success it returns 1 and on failure 0.
1183  */
1184 
FlowTest01(void)1185 static int FlowTest01 (void)
1186 {
1187     uint8_t proto_map;
1188 
1189     FlowInitFlowProto();
1190     proto_map = FlowGetProtoMapping(IPPROTO_TCP);
1191     FAIL_IF(flow_timeouts_normal[proto_map].new_timeout != FLOW_IPPROTO_TCP_NEW_TIMEOUT);
1192     FAIL_IF(flow_timeouts_normal[proto_map].est_timeout != FLOW_IPPROTO_TCP_EST_TIMEOUT);
1193     FAIL_IF(flow_timeouts_emerg[proto_map].new_timeout != FLOW_IPPROTO_TCP_EMERG_NEW_TIMEOUT);
1194     FAIL_IF(flow_timeouts_emerg[proto_map].est_timeout != FLOW_IPPROTO_TCP_EMERG_EST_TIMEOUT);
1195 
1196     proto_map = FlowGetProtoMapping(IPPROTO_UDP);
1197     FAIL_IF(flow_timeouts_normal[proto_map].new_timeout != FLOW_IPPROTO_UDP_NEW_TIMEOUT);
1198     FAIL_IF(flow_timeouts_normal[proto_map].est_timeout != FLOW_IPPROTO_UDP_EST_TIMEOUT);
1199     FAIL_IF(flow_timeouts_emerg[proto_map].new_timeout != FLOW_IPPROTO_UDP_EMERG_NEW_TIMEOUT);
1200     FAIL_IF(flow_timeouts_emerg[proto_map].est_timeout != FLOW_IPPROTO_UDP_EMERG_EST_TIMEOUT);
1201 
1202     proto_map = FlowGetProtoMapping(IPPROTO_ICMP);
1203     FAIL_IF(flow_timeouts_normal[proto_map].new_timeout != FLOW_IPPROTO_ICMP_NEW_TIMEOUT);
1204     FAIL_IF(flow_timeouts_normal[proto_map].est_timeout != FLOW_IPPROTO_ICMP_EST_TIMEOUT);
1205     FAIL_IF(flow_timeouts_emerg[proto_map].new_timeout != FLOW_IPPROTO_ICMP_EMERG_NEW_TIMEOUT);
1206     FAIL_IF(flow_timeouts_emerg[proto_map].est_timeout != FLOW_IPPROTO_ICMP_EMERG_EST_TIMEOUT);
1207 
1208     proto_map = FlowGetProtoMapping(IPPROTO_DCCP);
1209     FAIL_IF(flow_timeouts_normal[proto_map].new_timeout != FLOW_DEFAULT_NEW_TIMEOUT);
1210     FAIL_IF(flow_timeouts_normal[proto_map].est_timeout != FLOW_DEFAULT_EST_TIMEOUT);
1211     FAIL_IF(flow_timeouts_emerg[proto_map].new_timeout != FLOW_DEFAULT_EMERG_NEW_TIMEOUT);
1212     FAIL_IF(flow_timeouts_emerg[proto_map].est_timeout != FLOW_DEFAULT_EMERG_EST_TIMEOUT);
1213 
1214     PASS;
1215 }
1216 
/* No-op stand-in that FlowTest02 registers as the protocol free function */

static void test(void *f)
{
    (void)f;
}
1220 
1221 /**
1222  *  \test   Test the setting of the per protocol free function to free the
1223  *          protocol specific memory.
1224  *
1225  *  \retval On success it returns 1 and on failure 0.
1226  */
1227 
FlowTest02(void)1228 static int FlowTest02 (void)
1229 {
1230     FlowSetProtoFreeFunc(IPPROTO_DCCP, test);
1231     FlowSetProtoFreeFunc(IPPROTO_TCP, test);
1232     FlowSetProtoFreeFunc(IPPROTO_UDP, test);
1233     FlowSetProtoFreeFunc(IPPROTO_ICMP, test);
1234 
1235     FAIL_IF(flow_freefuncs[FLOW_PROTO_DEFAULT].Freefunc != test);
1236     FAIL_IF(flow_freefuncs[FLOW_PROTO_TCP].Freefunc != test);
1237     FAIL_IF(flow_freefuncs[FLOW_PROTO_UDP].Freefunc != test);
1238     FAIL_IF(flow_freefuncs[FLOW_PROTO_ICMP].Freefunc != test);
1239 
1240     PASS;
1241 }
1242 
1243 /**
1244  *  \test   Test flow allocations when it reach memcap
1245  *
1246  *
1247  *  \retval On success it returns 1 and on failure 0.
1248  */
1249 
FlowTest07(void)1250 static int FlowTest07 (void)
1251 {
1252     int result = 0;
1253     FlowInitConfig(FLOW_QUIET);
1254     FlowConfig backup;
1255     memcpy(&backup, &flow_config, sizeof(FlowConfig));
1256 
1257     uint32_t ini = 0;
1258     uint32_t end = FlowSpareGetPoolSize();
1259     SC_ATOMIC_SET(flow_config.memcap, 10000);
1260     flow_config.prealloc = 100;
1261 
1262     /* Let's get the flow spare pool empty */
1263     UTHBuildPacketOfFlows(ini, end, 0);
1264 
1265     /* And now let's try to reach the memcap val */
1266     while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1267         ini = end + 1;
1268         end = end + 2;
1269         UTHBuildPacketOfFlows(ini, end, 0);
1270     }
1271 
1272     /* should time out normal */
1273     TimeSetIncrementTime(2000);
1274     ini = end + 1;
1275     end = end + 2;
1276     UTHBuildPacketOfFlows(ini, end, 0);
1277 
1278     /* This means that the engine entered emerg mode: should happen as easy
1279      * with flow mgr activated */
1280     if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
1281         result = 1;
1282 
1283     FlowShutdown();
1284     memcpy(&flow_config, &backup, sizeof(FlowConfig));
1285 
1286     return result;
1287 }
1288 
1289 /**
1290  *  \test   Test flow allocations when it reach memcap
1291  *
1292  *
1293  *  \retval On success it returns 1 and on failure 0.
1294  */
1295 
FlowTest08(void)1296 static int FlowTest08 (void)
1297 {
1298     int result = 0;
1299 
1300     FlowInitConfig(FLOW_QUIET);
1301     FlowConfig backup;
1302     memcpy(&backup, &flow_config, sizeof(FlowConfig));
1303 
1304     uint32_t ini = 0;
1305     uint32_t end = FlowSpareGetPoolSize();
1306     SC_ATOMIC_SET(flow_config.memcap, 10000);
1307     flow_config.prealloc = 100;
1308 
1309     /* Let's get the flow spare pool empty */
1310     UTHBuildPacketOfFlows(ini, end, 0);
1311 
1312     /* And now let's try to reach the memcap val */
1313     while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1314         ini = end + 1;
1315         end = end + 2;
1316         UTHBuildPacketOfFlows(ini, end, 0);
1317     }
1318 
1319     /* By default we use 30  for timing out new flows. This means
1320      * that the Emergency mode should be set */
1321     TimeSetIncrementTime(20);
1322     ini = end + 1;
1323     end = end + 2;
1324     UTHBuildPacketOfFlows(ini, end, 0);
1325 
1326     /* This means that the engine released 5 flows by emergency timeout */
1327     if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
1328         result = 1;
1329 
1330     memcpy(&flow_config, &backup, sizeof(FlowConfig));
1331     FlowShutdown();
1332 
1333     return result;
1334 }
1335 
1336 /**
1337  *  \test   Test flow allocations when it reach memcap
1338  *
1339  *
1340  *  \retval On success it returns 1 and on failure 0.
1341  */
1342 
FlowTest09(void)1343 static int FlowTest09 (void)
1344 {
1345     int result = 0;
1346 
1347     FlowInitConfig(FLOW_QUIET);
1348     FlowConfig backup;
1349     memcpy(&backup, &flow_config, sizeof(FlowConfig));
1350 
1351     uint32_t ini = 0;
1352     uint32_t end = FlowSpareGetPoolSize();
1353     SC_ATOMIC_SET(flow_config.memcap, 10000);
1354     flow_config.prealloc = 100;
1355 
1356     /* Let's get the flow spare pool empty */
1357     UTHBuildPacketOfFlows(ini, end, 0);
1358 
1359     /* And now let's try to reach the memcap val */
1360     while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1361         ini = end + 1;
1362         end = end + 2;
1363         UTHBuildPacketOfFlows(ini, end, 0);
1364     }
1365 
1366     /* No timeout will work */
1367     TimeSetIncrementTime(5);
1368     ini = end + 1;
1369     end = end + 2;
1370     UTHBuildPacketOfFlows(ini, end, 0);
1371 
1372     /* engine in emerg mode */
1373     if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
1374         result = 1;
1375 
1376     memcpy(&flow_config, &backup, sizeof(FlowConfig));
1377     FlowShutdown();
1378 
1379     return result;
1380 }
1381 
1382 #endif /* UNITTESTS */
1383 
/**
 *  \brief   Function to register the Flow unittests.
 *
 *  Registers the FlowTest* functions of this file in listed order and then
 *  calls RegisterFlowStorageTests(). Does nothing when built without
 *  UNITTESTS.
 */
void FlowRegisterTests (void)
{
#ifdef UNITTESTS
    static const struct {
        const char *name;
        int (*fn)(void);
    } flow_tests[] = {
        { "FlowTest01 -- Protocol Specific Timeouts", FlowTest01 },
        { "FlowTest02 -- Setting Protocol Specific Free Function", FlowTest02 },
        { "FlowTest07 -- Test flow Allocations when it reach memcap", FlowTest07 },
        { "FlowTest08 -- Test flow Allocations when it reach memcap", FlowTest08 },
        { "FlowTest09 -- Test flow Allocations when it reach memcap", FlowTest09 },
    };

    for (size_t i = 0; i < sizeof(flow_tests) / sizeof(flow_tests[0]); i++) {
        UtRegisterTest(flow_tests[i].name, flow_tests[i].fn);
    }

    RegisterFlowStorageTests();
#endif /* UNITTESTS */
}
1403