1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 /**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 *
24 * Simple output queue handler that makes sure all packets of the same flow
25 * are sent to the same queue. We support different kind of q handlers. Have
26 * a look at "autofp-scheduler" conf to further undertsand the various q
27 * handlers we provide.
28 */
29
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36
37 #include "tm-queuehandlers.h"
38
39 #include "conf.h"
40 #include "util-unittest.h"
41
42 Packet *TmqhInputFlow(ThreadVars *t);
43 void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
44 void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
45 void *TmqhOutputFlowSetupCtx(const char *queue_str);
46 void TmqhOutputFlowFreeCtx(void *ctx);
47 void TmqhFlowRegisterTests(void);
48
TmqhFlowRegister(void)49 void TmqhFlowRegister(void)
50 {
51 tmqh_table[TMQH_FLOW].name = "flow";
52 tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
53 tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
54 tmqh_table[TMQH_FLOW].OutHandlerCtxFree = TmqhOutputFlowFreeCtx;
55 tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
56
57 const char *scheduler = NULL;
58 if (ConfGet("autofp-scheduler", &scheduler) == 1) {
59 if (strcasecmp(scheduler, "round-robin") == 0) {
60 SCLogNotice("using flow hash instead of round robin");
61 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
62 } else if (strcasecmp(scheduler, "active-packets") == 0) {
63 SCLogNotice("using flow hash instead of active packets");
64 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
65 } else if (strcasecmp(scheduler, "hash") == 0) {
66 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
67 } else if (strcasecmp(scheduler, "ippair") == 0) {
68 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
69 } else {
70 SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
71 "for autofp-scheduler in conf. Killing engine.",
72 scheduler);
73 exit(EXIT_FAILURE);
74 }
75 } else {
76 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
77 }
78
79 return;
80 }
81
TmqhFlowPrintAutofpHandler(void)82 void TmqhFlowPrintAutofpHandler(void)
83 {
84 #define PRINT_IF_FUNC(f, msg) \
85 if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
86 SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
87
88 PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
89 PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
90
91 #undef PRINT_IF_FUNC
92 }
93
94 /* same as 'simple' */
TmqhInputFlow(ThreadVars * tv)95 Packet *TmqhInputFlow(ThreadVars *tv)
96 {
97 PacketQueue *q = tv->inq->pq;
98
99 StatsSyncCountersIfSignalled(tv);
100
101 SCMutexLock(&q->mutex_q);
102 if (q->len == 0) {
103 /* if we have no packets in queue, wait... */
104 SCCondWait(&q->cond_q, &q->mutex_q);
105 }
106
107 if (q->len > 0) {
108 Packet *p = PacketDequeue(q);
109 SCMutexUnlock(&q->mutex_q);
110 return p;
111 } else {
112 /* return NULL if we have no pkt. Should only happen on signals. */
113 SCMutexUnlock(&q->mutex_q);
114 return NULL;
115 }
116 }
117
StoreQueueId(TmqhFlowCtx * ctx,char * name)118 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
119 {
120 void *ptmp;
121 Tmq *tmq = TmqGetQueueByName(name);
122 if (tmq == NULL) {
123 tmq = TmqCreateQueue(name);
124 if (tmq == NULL)
125 return -1;
126 }
127 tmq->writer_cnt++;
128
129 if (ctx->queues == NULL) {
130 ctx->size = 1;
131 ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
132 if (ctx->queues == NULL) {
133 return -1;
134 }
135 memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
136 } else {
137 ctx->size++;
138 ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
139 if (ptmp == NULL) {
140 SCFree(ctx->queues);
141 ctx->queues = NULL;
142 return -1;
143 }
144 ctx->queues = ptmp;
145
146 memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
147 }
148 ctx->queues[ctx->size - 1].q = tmq->pq;
149
150 return 0;
151 }
152
153 /**
154 * \brief setup the queue handlers ctx
155 *
156 * Parses a comma separated string "queuename1,queuename2,etc"
157 * and sets the ctx up to devide flows over these queue's.
158 *
159 * \param queue_str comma separated string with output queue names
160 *
161 * \retval ctx queues handlers ctx or NULL in error
162 */
TmqhOutputFlowSetupCtx(const char * queue_str)163 void *TmqhOutputFlowSetupCtx(const char *queue_str)
164 {
165 if (queue_str == NULL || strlen(queue_str) == 0)
166 return NULL;
167
168 SCLogDebug("queue_str %s", queue_str);
169
170 TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
171 if (unlikely(ctx == NULL))
172 return NULL;
173 memset(ctx,0x00,sizeof(TmqhFlowCtx));
174
175 char *str = SCStrdup(queue_str);
176 if (unlikely(str == NULL)) {
177 goto error;
178 }
179 char *tstr = str;
180
181 /* parse the comma separated string */
182 do {
183 char *comma = strchr(tstr,',');
184 if (comma != NULL) {
185 *comma = '\0';
186 char *qname = tstr;
187 int r = StoreQueueId(ctx,qname);
188 if (r < 0)
189 goto error;
190 } else {
191 char *qname = tstr;
192 int r = StoreQueueId(ctx,qname);
193 if (r < 0)
194 goto error;
195 }
196 tstr = comma ? (comma + 1) : comma;
197 } while (tstr != NULL);
198
199 SCFree(str);
200 return (void *)ctx;
201
202 error:
203 SCFree(ctx);
204 if (str != NULL)
205 SCFree(str);
206 return NULL;
207 }
208
TmqhOutputFlowFreeCtx(void * ctx)209 void TmqhOutputFlowFreeCtx(void *ctx)
210 {
211 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
212
213 SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
214 fctx->size);
215 SCFree(fctx->queues);
216 SCFree(fctx);
217
218 return;
219 }
220
TmqhOutputFlowHash(ThreadVars * tv,Packet * p)221 void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
222 {
223 uint32_t qid;
224 TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
225
226 if (p->flags & PKT_WANTS_FLOW) {
227 uint32_t hash = p->flow_hash;
228 qid = hash % ctx->size;
229 } else {
230 qid = ctx->last++;
231
232 if (ctx->last == ctx->size)
233 ctx->last = 0;
234 }
235
236 PacketQueue *q = ctx->queues[qid].q;
237 SCMutexLock(&q->mutex_q);
238 PacketEnqueue(q, p);
239 SCCondSignal(&q->cond_q);
240 SCMutexUnlock(&q->mutex_q);
241
242 return;
243 }
244
245 /**
246 * \brief select the queue to output based on IP address pair.
247 *
248 * \param tv thread vars.
249 * \param p packet.
250 */
TmqhOutputFlowIPPair(ThreadVars * tv,Packet * p)251 void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
252 {
253 uint32_t addr_hash = 0;
254
255 TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
256
257 if (p->src.family == AF_INET6) {
258 for (int i = 0; i < 4; i++) {
259 addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
260 }
261 } else {
262 addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
263 }
264
265 uint32_t qid = addr_hash % ctx->size;
266 PacketQueue *q = ctx->queues[qid].q;
267 SCMutexLock(&q->mutex_q);
268 PacketEnqueue(q, p);
269 SCCondSignal(&q->cond_q);
270 SCMutexUnlock(&q->mutex_q);
271
272 return;
273 }
274
275 #ifdef UNITTESTS
276
TmqhOutputFlowSetupCtxTest01(void)277 static int TmqhOutputFlowSetupCtxTest01(void)
278 {
279 TmqResetQueues();
280
281 Tmq *tmq1 = TmqCreateQueue("queue1");
282 FAIL_IF_NULL(tmq1);
283 Tmq *tmq2 = TmqCreateQueue("queue2");
284 FAIL_IF_NULL(tmq2);
285 Tmq *tmq3 = TmqCreateQueue("another");
286 FAIL_IF_NULL(tmq3);
287 Tmq *tmq4 = TmqCreateQueue("yetanother");
288 FAIL_IF_NULL(tmq4);
289
290 const char *str = "queue1,queue2,another,yetanother";
291 void *ctx = TmqhOutputFlowSetupCtx(str);
292 FAIL_IF_NULL(ctx);
293
294 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
295
296 FAIL_IF_NOT(fctx->size == 4);
297
298 FAIL_IF_NULL(fctx->queues);
299
300 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
301 FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
302 FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
303 FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
304
305 TmqhOutputFlowFreeCtx(fctx);
306 TmqResetQueues();
307 PASS;
308 }
309
TmqhOutputFlowSetupCtxTest02(void)310 static int TmqhOutputFlowSetupCtxTest02(void)
311 {
312 TmqResetQueues();
313
314 Tmq *tmq1 = TmqCreateQueue("queue1");
315 FAIL_IF_NULL(tmq1);
316 Tmq *tmq2 = TmqCreateQueue("queue2");
317 FAIL_IF_NULL(tmq2);
318 Tmq *tmq3 = TmqCreateQueue("another");
319 FAIL_IF_NULL(tmq3);
320 Tmq *tmq4 = TmqCreateQueue("yetanother");
321 FAIL_IF_NULL(tmq4);
322
323 const char *str = "queue1";
324 void *ctx = TmqhOutputFlowSetupCtx(str);
325 FAIL_IF_NULL(ctx);
326
327 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
328
329 FAIL_IF_NOT(fctx->size == 1);
330
331 FAIL_IF_NULL(fctx->queues);
332
333 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
334 TmqhOutputFlowFreeCtx(fctx);
335 TmqResetQueues();
336
337 PASS;
338 }
339
TmqhOutputFlowSetupCtxTest03(void)340 static int TmqhOutputFlowSetupCtxTest03(void)
341 {
342 TmqResetQueues();
343
344 const char *str = "queue1,queue2,another,yetanother";
345 void *ctx = TmqhOutputFlowSetupCtx(str);
346 FAIL_IF_NULL(ctx);
347
348 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349
350 FAIL_IF_NOT(fctx->size == 4);
351
352 FAIL_IF_NULL(fctx->queues);
353
354 Tmq *tmq1 = TmqGetQueueByName("queue1");
355 FAIL_IF_NULL(tmq1);
356 Tmq *tmq2 = TmqGetQueueByName("queue2");
357 FAIL_IF_NULL(tmq2);
358 Tmq *tmq3 = TmqGetQueueByName("another");
359 FAIL_IF_NULL(tmq3);
360 Tmq *tmq4 = TmqGetQueueByName("yetanother");
361 FAIL_IF_NULL(tmq4);
362
363 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
364 FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
365 FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
366 FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
367
368 TmqhOutputFlowFreeCtx(fctx);
369 TmqResetQueues();
370 PASS;
371 }
372
373 #endif /* UNITTESTS */
374
TmqhFlowRegisterTests(void)375 void TmqhFlowRegisterTests(void)
376 {
377 #ifdef UNITTESTS
378 UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
379 TmqhOutputFlowSetupCtxTest01);
380 UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
381 TmqhOutputFlowSetupCtxTest02);
382 UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
383 TmqhOutputFlowSetupCtxTest03);
384 #endif
385
386 return;
387 }
388