1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kind of q handlers.  Have
26  * a look at "autofp-scheduler" conf to further undertsand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 
37 #include "tm-queuehandlers.h"
38 
39 #include "conf.h"
40 #include "util-unittest.h"
41 
42 Packet *TmqhInputFlow(ThreadVars *t);
43 void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
44 void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
45 void *TmqhOutputFlowSetupCtx(const char *queue_str);
46 void TmqhOutputFlowFreeCtx(void *ctx);
47 void TmqhFlowRegisterTests(void);
48 
TmqhFlowRegister(void)49 void TmqhFlowRegister(void)
50 {
51     tmqh_table[TMQH_FLOW].name = "flow";
52     tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
53     tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
54     tmqh_table[TMQH_FLOW].OutHandlerCtxFree = TmqhOutputFlowFreeCtx;
55     tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
56 
57     const char *scheduler = NULL;
58     if (ConfGet("autofp-scheduler", &scheduler) == 1) {
59         if (strcasecmp(scheduler, "round-robin") == 0) {
60             SCLogNotice("using flow hash instead of round robin");
61             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
62         } else if (strcasecmp(scheduler, "active-packets") == 0) {
63             SCLogNotice("using flow hash instead of active packets");
64             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
65         } else if (strcasecmp(scheduler, "hash") == 0) {
66             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
67         } else if (strcasecmp(scheduler, "ippair") == 0) {
68             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
69         } else {
70             SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
71                        "for autofp-scheduler in conf.  Killing engine.",
72                        scheduler);
73             exit(EXIT_FAILURE);
74         }
75     } else {
76         tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
77     }
78 
79     return;
80 }
81 
TmqhFlowPrintAutofpHandler(void)82 void TmqhFlowPrintAutofpHandler(void)
83 {
84 #define PRINT_IF_FUNC(f, msg)                       \
85     if (tmqh_table[TMQH_FLOW].OutHandler == (f))    \
86         SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
87 
88     PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
89     PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
90 
91 #undef PRINT_IF_FUNC
92 }
93 
94 /* same as 'simple' */
TmqhInputFlow(ThreadVars * tv)95 Packet *TmqhInputFlow(ThreadVars *tv)
96 {
97     PacketQueue *q = tv->inq->pq;
98 
99     StatsSyncCountersIfSignalled(tv);
100 
101     SCMutexLock(&q->mutex_q);
102     if (q->len == 0) {
103         /* if we have no packets in queue, wait... */
104         SCCondWait(&q->cond_q, &q->mutex_q);
105     }
106 
107     if (q->len > 0) {
108         Packet *p = PacketDequeue(q);
109         SCMutexUnlock(&q->mutex_q);
110         return p;
111     } else {
112         /* return NULL if we have no pkt. Should only happen on signals. */
113         SCMutexUnlock(&q->mutex_q);
114         return NULL;
115     }
116 }
117 
StoreQueueId(TmqhFlowCtx * ctx,char * name)118 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
119 {
120     void *ptmp;
121     Tmq *tmq = TmqGetQueueByName(name);
122     if (tmq == NULL) {
123         tmq = TmqCreateQueue(name);
124         if (tmq == NULL)
125             return -1;
126     }
127     tmq->writer_cnt++;
128 
129     if (ctx->queues == NULL) {
130         ctx->size = 1;
131         ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
132         if (ctx->queues == NULL) {
133             return -1;
134         }
135         memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
136     } else {
137         ctx->size++;
138         ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
139         if (ptmp == NULL) {
140             SCFree(ctx->queues);
141             ctx->queues = NULL;
142             return -1;
143         }
144         ctx->queues = ptmp;
145 
146         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
147     }
148     ctx->queues[ctx->size - 1].q = tmq->pq;
149 
150     return 0;
151 }
152 
153 /**
154  * \brief setup the queue handlers ctx
155  *
156  * Parses a comma separated string "queuename1,queuename2,etc"
157  * and sets the ctx up to devide flows over these queue's.
158  *
159  * \param queue_str comma separated string with output queue names
160  *
161  * \retval ctx queues handlers ctx or NULL in error
162  */
TmqhOutputFlowSetupCtx(const char * queue_str)163 void *TmqhOutputFlowSetupCtx(const char *queue_str)
164 {
165     if (queue_str == NULL || strlen(queue_str) == 0)
166         return NULL;
167 
168     SCLogDebug("queue_str %s", queue_str);
169 
170     TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
171     if (unlikely(ctx == NULL))
172         return NULL;
173     memset(ctx,0x00,sizeof(TmqhFlowCtx));
174 
175     char *str = SCStrdup(queue_str);
176     if (unlikely(str == NULL)) {
177         goto error;
178     }
179     char *tstr = str;
180 
181     /* parse the comma separated string */
182     do {
183         char *comma = strchr(tstr,',');
184         if (comma != NULL) {
185             *comma = '\0';
186             char *qname = tstr;
187             int r = StoreQueueId(ctx,qname);
188             if (r < 0)
189                 goto error;
190         } else {
191             char *qname = tstr;
192             int r = StoreQueueId(ctx,qname);
193             if (r < 0)
194                 goto error;
195         }
196         tstr = comma ? (comma + 1) : comma;
197     } while (tstr != NULL);
198 
199     SCFree(str);
200     return (void *)ctx;
201 
202 error:
203     SCFree(ctx);
204     if (str != NULL)
205         SCFree(str);
206     return NULL;
207 }
208 
TmqhOutputFlowFreeCtx(void * ctx)209 void TmqhOutputFlowFreeCtx(void *ctx)
210 {
211     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
212 
213     SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
214               fctx->size);
215     SCFree(fctx->queues);
216     SCFree(fctx);
217 
218     return;
219 }
220 
TmqhOutputFlowHash(ThreadVars * tv,Packet * p)221 void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
222 {
223     uint32_t qid;
224     TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
225 
226     if (p->flags & PKT_WANTS_FLOW) {
227         uint32_t hash = p->flow_hash;
228         qid = hash % ctx->size;
229     } else {
230         qid = ctx->last++;
231 
232         if (ctx->last == ctx->size)
233             ctx->last = 0;
234     }
235 
236     PacketQueue *q = ctx->queues[qid].q;
237     SCMutexLock(&q->mutex_q);
238     PacketEnqueue(q, p);
239     SCCondSignal(&q->cond_q);
240     SCMutexUnlock(&q->mutex_q);
241 
242     return;
243 }
244 
245 /**
246  * \brief select the queue to output based on IP address pair.
247  *
248  * \param tv thread vars.
249  * \param p packet.
250  */
TmqhOutputFlowIPPair(ThreadVars * tv,Packet * p)251 void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
252 {
253     uint32_t addr_hash = 0;
254 
255     TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
256 
257     if (p->src.family == AF_INET6) {
258         for (int i = 0; i < 4; i++) {
259             addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
260         }
261     } else {
262         addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
263     }
264 
265     uint32_t qid = addr_hash % ctx->size;
266     PacketQueue *q = ctx->queues[qid].q;
267     SCMutexLock(&q->mutex_q);
268     PacketEnqueue(q, p);
269     SCCondSignal(&q->cond_q);
270     SCMutexUnlock(&q->mutex_q);
271 
272     return;
273 }
274 
275 #ifdef UNITTESTS
276 
TmqhOutputFlowSetupCtxTest01(void)277 static int TmqhOutputFlowSetupCtxTest01(void)
278 {
279     TmqResetQueues();
280 
281     Tmq *tmq1 = TmqCreateQueue("queue1");
282     FAIL_IF_NULL(tmq1);
283     Tmq *tmq2 = TmqCreateQueue("queue2");
284     FAIL_IF_NULL(tmq2);
285     Tmq *tmq3 = TmqCreateQueue("another");
286     FAIL_IF_NULL(tmq3);
287     Tmq *tmq4 = TmqCreateQueue("yetanother");
288     FAIL_IF_NULL(tmq4);
289 
290     const char *str = "queue1,queue2,another,yetanother";
291     void *ctx = TmqhOutputFlowSetupCtx(str);
292     FAIL_IF_NULL(ctx);
293 
294     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
295 
296     FAIL_IF_NOT(fctx->size == 4);
297 
298     FAIL_IF_NULL(fctx->queues);
299 
300     FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
301     FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
302     FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
303     FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
304 
305     TmqhOutputFlowFreeCtx(fctx);
306     TmqResetQueues();
307     PASS;
308 }
309 
TmqhOutputFlowSetupCtxTest02(void)310 static int TmqhOutputFlowSetupCtxTest02(void)
311 {
312     TmqResetQueues();
313 
314     Tmq *tmq1 = TmqCreateQueue("queue1");
315     FAIL_IF_NULL(tmq1);
316     Tmq *tmq2 = TmqCreateQueue("queue2");
317     FAIL_IF_NULL(tmq2);
318     Tmq *tmq3 = TmqCreateQueue("another");
319     FAIL_IF_NULL(tmq3);
320     Tmq *tmq4 = TmqCreateQueue("yetanother");
321     FAIL_IF_NULL(tmq4);
322 
323     const char *str = "queue1";
324     void *ctx = TmqhOutputFlowSetupCtx(str);
325     FAIL_IF_NULL(ctx);
326 
327     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
328 
329     FAIL_IF_NOT(fctx->size == 1);
330 
331     FAIL_IF_NULL(fctx->queues);
332 
333     FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
334     TmqhOutputFlowFreeCtx(fctx);
335     TmqResetQueues();
336 
337     PASS;
338 }
339 
TmqhOutputFlowSetupCtxTest03(void)340 static int TmqhOutputFlowSetupCtxTest03(void)
341 {
342     TmqResetQueues();
343 
344     const char *str = "queue1,queue2,another,yetanother";
345     void *ctx = TmqhOutputFlowSetupCtx(str);
346     FAIL_IF_NULL(ctx);
347 
348     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349 
350     FAIL_IF_NOT(fctx->size == 4);
351 
352     FAIL_IF_NULL(fctx->queues);
353 
354     Tmq *tmq1 = TmqGetQueueByName("queue1");
355     FAIL_IF_NULL(tmq1);
356     Tmq *tmq2 = TmqGetQueueByName("queue2");
357     FAIL_IF_NULL(tmq2);
358     Tmq *tmq3 = TmqGetQueueByName("another");
359     FAIL_IF_NULL(tmq3);
360     Tmq *tmq4 = TmqGetQueueByName("yetanother");
361     FAIL_IF_NULL(tmq4);
362 
363     FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
364     FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
365     FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
366     FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
367 
368     TmqhOutputFlowFreeCtx(fctx);
369     TmqResetQueues();
370     PASS;
371 }
372 
373 #endif /* UNITTESTS */
374 
TmqhFlowRegisterTests(void)375 void TmqhFlowRegisterTests(void)
376 {
377 #ifdef UNITTESTS
378     UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
379                    TmqhOutputFlowSetupCtxTest01);
380     UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
381                    TmqhOutputFlowSetupCtxTest02);
382     UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
383                    TmqhOutputFlowSetupCtxTest03);
384 #endif
385 
386     return;
387 }
388