1 /*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License Version 2 as
4 * published by the Free Software Foundation. You may not use, modify or
5 * distribute this program under any other version of the GNU General
6 * Public License.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 *
17 * Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
18 * Copyright (C) 2012-2013 Sourcefire, Inc.
19 *
20 * Author: Michael Altizer <maltizer@sourcefire.com>
21 *
22 */
23
24 #ifdef SIDE_CHANNEL
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #ifdef HAVE_GETTID
31 #define _GNU_SOURCE
32 #endif
33
34 #include <signal.h>
35
36 #include "dmq.h"
37 #include "rbmq.h"
38 #include "sidechannel.h"
39 #include "sscm_logger.h"
40
41 #define DEFAULT_RX_QUEUE_DEPTH 1024
42 #define DEFAULT_RX_QUEUE_DATA_SIZE 10485760
43 #define DEFAULT_TX_QUEUE_DEPTH 1024
44 #define DEFAULT_TX_QUEUE_DATA_SIZE 10485760
45
46 #define CONF_SEPARATORS " \t\n\r,"
47 #define CONF_RX_QUEUE_DATA_SIZE "rx-queue-data-size"
48 #define CONF_RX_QUEUE_DEPTH "rx-queue-depth"
49 #define CONF_TX_QUEUE_DATA_SIZE "tx-queue-data-size"
50 #define CONF_TX_QUEUE_DEPTH "tx-queue-depth"
51 #define CONF_DISABLE_TX_THREAD "disable-tx-thread"
52
53 #ifdef SC_USE_DMQ
54 #define RBMQ_Ptr DMQ_Ptr
55 #define RBMQ_Alloc DMQ_Alloc
56 #define RBMQ_ReserveMsg DMQ_ReserveMsg
57 #define RBMQ_CommitReservedMsg DMQ_CommitReservedMsg
58 #define RBMQ_DiscardReservedMsg DMQ_DiscardReservedMsg
59 #define RBMQ_CommitExternalMsg DMQ_CommitExternalMsg
60 #define RBMQ_ReadMsg DMQ_ReadMsg
61 #define RBMQ_AckMsg DMQ_AckMsg
62 #define RBMQ_IsEmpty DMQ_IsEmpty
63 #define RBMQ_Stats DMQ_Stats
64 #endif
65
66 enum ConfState
67 {
68 STATE_START,
69 STATE_RX_QUEUE_DATA_SIZE,
70 STATE_RX_QUEUE_DEPTH,
71 STATE_TX_QUEUE_DATA_SIZE,
72 STATE_TX_QUEUE_DEPTH
73 };
74
75 typedef struct _SC_CONFIG
76 {
77 uint32_t rx_queue_max_data_size;
78 uint32_t rx_queue_max_depth;
79 uint32_t tx_queue_max_data_size;
80 uint32_t tx_queue_max_depth;
81 bool disable_tx_thread;
82 bool enabled;
83 } SCConfig;
84
85 typedef struct _SC_MODULE
86 {
87 struct _SC_MODULE *next;
88 char *keyword;
89 SCMFunctionBundle funcs;
90 bool enabled;
91 } SCModule;
92
93 typedef struct _SC_HANDLER
94 {
95 struct _SC_HANDLER *next;
96 uint16_t type;
97 SCMProcessMsgFunc processMsgFunc;
98 void *data;
99 } SCHandler;
100
101 typedef struct _SC_MESSAGE_QUEUE
102 {
103 RBMQ_Ptr queue;
104 pthread_mutex_t mutex;
105 pthread_cond_t cond;
106 uint32_t max_data_size;
107 uint32_t max_depth;
108 } SCMessageQueue;
109
110 static struct {
111 uint64_t rx_messages_total;
112 uint64_t rx_messages_processed_ib;
113 uint64_t rx_messages_processed_oob;
114 uint64_t tx_messages_total;
115 uint64_t tx_messages_processed;
116 } Side_Channel_Stats;
117
118 static volatile int stop_processing = 0;
119 static volatile int tx_thread_running = 0;
120
121 static pid_t tx_thread_pid;
122 static pthread_t tx_thread_id;
123 static pthread_t *p_tx_thread_id;
124
125 static SCConfig sc_config;
126
127 static SCMessageQueue rx_queue;
128 static SCMessageQueue tx_queue;
129
130 static SCModule *modules;
131 static SCHandler *rx_handlers;
132 static SCHandler *tx_handlers;
133
134 #ifdef PERF_PROFILING
135 PreprocStats sideChannelRxPerfStats;
136 #endif
137
RegisterSideChannelModules(void)138 void RegisterSideChannelModules(void)
139 {
140 if (!ScSideChannelEnabled())
141 return;
142
143 SetupLoggerSCM();
144 }
145
RegisterSideChannelModule(const char * keyword,SCMFunctionBundle * funcs)146 void RegisterSideChannelModule(const char *keyword, SCMFunctionBundle *funcs)
147 {
148 SCModule *module, *tmp, *last = NULL;
149
150 if (!ScSideChannelEnabled())
151 return;
152
153 if (!keyword)
154 FatalError("No keyword given while registering a side channel module!\n");
155
156 if (!funcs)
157 FatalError("No function bundle given while registering side channel '%s'!\n", keyword);
158
159 for (tmp = modules; tmp; tmp = tmp->next)
160 {
161 if (strcasecmp(tmp->keyword, keyword) == 0)
162 FatalError("Duplicate side channel keyword: %s\n", keyword);
163 last = tmp;
164 }
165 module = SnortAlloc(sizeof(SCModule));
166
167 module->next = NULL;
168 module->keyword = SnortStrdup(keyword);
169 module->funcs = *funcs;
170 module->enabled = 0;
171
172 LogMessage("Register SCM '%s' with configFunc=%p, initFunc=%p, postInitFunc=%p, idleFunc=%p, statsFunc=%p, shutdownFunc=%p\n",
173 keyword, module->funcs.configFunc, module->funcs.initFunc, module->funcs.postInitFunc,
174 module->funcs.idleFunc, module->funcs.statsFunc, module->funcs.shutdownFunc);
175
176 if (last)
177 last->next = module;
178 else
179 modules = module;
180 }
181
ConfigureSideChannelModule(const char * keyword,char * opts)182 int ConfigureSideChannelModule(const char *keyword, char *opts)
183 {
184 SCModule *module;
185
186 for (module = modules; module; module = module->next)
187 {
188 if (strcasecmp(module->keyword, keyword) == 0)
189 break;
190 }
191 if (!module)
192 return -ENOENT;
193
194 module->funcs.configFunc(opts);
195 module->enabled = 1;
196
197 return 0;
198 }
199
SCRegisterHandler(SCHandler ** handlers,uint16_t type,SCMProcessMsgFunc processMsgFunc,void * data)200 static int SCRegisterHandler(SCHandler **handlers, uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
201 {
202 SCHandler *handler;
203
204 if (!ScSideChannelEnabled())
205 return 0;
206
207 handler = SnortAlloc(sizeof(SCHandler));
208
209 handler->next = NULL;
210 handler->type = type;
211 handler->processMsgFunc = processMsgFunc;
212 handler->data = data;
213
214 handler->next = *handlers;
215 *handlers = handler;
216
217 return 0;
218 }
219
SideChannelRegisterRXHandler(uint16_t type,SCMProcessMsgFunc processMsgFunc,void * data)220 int SideChannelRegisterRXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
221 {
222 return SCRegisterHandler(&rx_handlers, type, processMsgFunc, data);
223 }
224
SideChannelRegisterTXHandler(uint16_t type,SCMProcessMsgFunc processMsgFunc,void * data)225 int SideChannelRegisterTXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
226 {
227 return SCRegisterHandler(&tx_handlers, type, processMsgFunc, data);
228 }
229
SCUnregisterHandler(SCHandler ** handlers,uint16_t type,SCMProcessMsgFunc processMsgFunc)230 static void SCUnregisterHandler(SCHandler **handlers, uint16_t type, SCMProcessMsgFunc processMsgFunc)
231 {
232 SCHandler *handler, *prev;
233
234 if (!ScSideChannelEnabled())
235 return;
236
237 for (prev = NULL, handler = *handlers; handler; prev = handler, handler = handler->next)
238 {
239 if (handler->type == type && handler->processMsgFunc == processMsgFunc)
240 break;
241 }
242
243 if (handler)
244 {
245 if (!prev)
246 *handlers = handler->next;
247 else
248 prev->next = handler->next;
249
250 free(handler);
251 }
252 }
253
SideChannelUnregisterRXHandler(uint16_t type,SCMProcessMsgFunc processMsgFunc)254 void SideChannelUnregisterRXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc)
255 {
256 SCUnregisterHandler(&rx_handlers, type, processMsgFunc);
257 }
258
SideChannelUnregisterTXHandler(uint16_t type,SCMProcessMsgFunc processMsgFunc)259 void SideChannelUnregisterTXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc)
260 {
261 SCUnregisterHandler(&tx_handlers, type, processMsgFunc);
262 }
263
SCPreallocMessage(SCMessageQueue * mq,uint32_t length,SCMsgHdr ** hdr_ptr,uint8_t ** msg_ptr,void ** msg_handle)264 static int SCPreallocMessage(SCMessageQueue *mq, uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
265 {
266 int rval;
267
268 pthread_mutex_lock(&mq->mutex);
269 rval = RBMQ_ReserveMsg(mq->queue, length, (void **) hdr_ptr, msg_ptr, msg_handle);
270 pthread_mutex_unlock(&mq->mutex);
271
272 return rval;
273 }
274
SideChannelPreallocMessageRX(uint32_t length,SCMsgHdr ** hdr_ptr,uint8_t ** msg_ptr,void ** msg_handle)275 int SideChannelPreallocMessageRX(uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
276 {
277 return SCPreallocMessage(&rx_queue, length, hdr_ptr, msg_ptr, msg_handle);
278 }
279
SideChannelPreallocMessageTX(uint32_t length,SCMsgHdr ** hdr_ptr,uint8_t ** msg_ptr,void ** msg_handle)280 int SideChannelPreallocMessageTX(uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
281 {
282 return SCPreallocMessage(&tx_queue, length, hdr_ptr, msg_ptr, msg_handle);
283 }
284
SCDiscardMessage(SCMessageQueue * mq,void * msg_handle)285 static int SCDiscardMessage(SCMessageQueue *mq, void *msg_handle)
286 {
287 int rval;
288
289 pthread_mutex_lock(&mq->mutex);
290 rval = RBMQ_DiscardReservedMsg(mq->queue, msg_handle);
291 pthread_mutex_unlock(&mq->mutex);
292
293 return rval;
294 }
295
SideChannelDiscardMessageRX(void * msg_handle)296 int SideChannelDiscardMessageRX(void *msg_handle)
297 {
298 return SCDiscardMessage(&rx_queue, msg_handle);
299 }
300
SideChannelDiscardMessageTX(void * msg_handle)301 int SideChannelDiscardMessageTX(void *msg_handle)
302 {
303 return SCDiscardMessage(&tx_queue, msg_handle);
304 }
305
SCEnqueueMessage(SCMessageQueue * mq,SCMsgHdr * hdr,const uint8_t * msg,uint32_t length,void * msg_handle,SCMQMsgFreeFunc msgFreeFunc)306 static int SCEnqueueMessage(SCMessageQueue *mq, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
307 {
308 int rval;
309
310 if (!msg_handle)
311 {
312 SCMsgHdr *hdr_ptr;
313 uint8_t *msg_ptr;
314
315 rval = RBMQ_ReserveMsg(mq->queue, length, (void **) &hdr_ptr, &msg_ptr, &msg_handle);
316 if (rval != 0)
317 {
318 ErrorMessage("%s: Could not reserve message: %d\n", __FUNCTION__, rval);
319 return rval;
320 }
321 memcpy(msg_ptr, msg, length);
322 memcpy(hdr_ptr, hdr, sizeof(SCMsgHdr));
323 rval = RBMQ_CommitReservedMsg(mq->queue, msg_handle, length, msgFreeFunc);
324 if (rval != 0)
325 {
326 ErrorMessage("%s: Could not commit reserved message: %d\n", __FUNCTION__, rval);
327 return rval;
328 }
329 }
330 else
331 rval = RBMQ_CommitReservedMsg(mq->queue, msg_handle, length, msgFreeFunc);
332
333 return rval;
334 }
335
SCProcessMessage(SCHandler * handlers,SCMsgHdr * hdr,const uint8_t * msg,uint32_t length)336 static inline void SCProcessMessage(SCHandler *handlers, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length)
337 {
338 SCHandler *handler;
339
340 for (handler = handlers; handler; handler = handler->next)
341 {
342 if (hdr->type == handler->type || handler->type == SC_MSG_TYPE_ANY)
343 handler->processMsgFunc(hdr, msg, length);
344 }
345 }
346
SCDrainAndProcess(SCMessageQueue * mq,SCHandler * handlers)347 static int SCDrainAndProcess(SCMessageQueue *mq, SCHandler *handlers)
348 {
349 SCMsgHdr *hdr;
350 uint32_t length;
351 const uint8_t *msg;
352 void *msg_handle;
353 int rval;
354
355 /* Read a message from the queue. */
356 pthread_mutex_lock(&mq->mutex);
357 rval = RBMQ_ReadMsg(mq->queue, (const void **) &hdr, &msg, &length, &msg_handle);
358 pthread_mutex_unlock(&mq->mutex);
359 if (rval != 0)
360 return 1;
361
362 /* Handle it. */
363 SCProcessMessage(handlers, hdr, msg, length);
364
365 /* And, finally, acknowledge it. */
366 pthread_mutex_lock(&mq->mutex);
367 rval = RBMQ_AckMsg(mq->queue, msg_handle);
368 pthread_mutex_unlock(&mq->mutex);
369 if (rval != 0)
370 WarningMessage("Error ACK'ing message %p!\n", msg_handle);
371
372 return 0;
373 }
374
375 /* Called by an out-of-band thread (probably a Side Channel Module). */
SideChannelEnqueueMessageRX(SCMsgHdr * hdr,const uint8_t * msg,uint32_t length,void * msg_handle,SCMQMsgFreeFunc msgFreeFunc)376 int SideChannelEnqueueMessageRX(SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
377 {
378 int rval;
379
380 /*
381 * During snort exit, any messages need not to be processed. So checking
382 * stop_processing before processing the message.
383 */
384 if (stop_processing) {
385 return -1;
386 }
387 /*
388 * Because the Snort main thread relinquishes control to DAQ_Acquire for up to a second,
389 * we potentially need to preempt it and process RX messages as they are being enqueued
390 * to avoid backups and overruns.
391 * This should be safe since the main thread holds the snort_process_lock mutex while it
392 * is not in DAQ_Acquire().
393 */
394 while (pthread_mutex_trylock(&snort_process_lock) == 0)
395 {
396 /* If there are no more messages in the RX queue, process the new message without enqueuing it and return. */
397 if (SCDrainAndProcess(&rx_queue, rx_handlers) != 0)
398 {
399 SCProcessMessage(rx_handlers, hdr, msg, length);
400 if (msgFreeFunc)
401 msgFreeFunc((uint8_t *) msg);
402 if (msg_handle)
403 {
404 pthread_mutex_lock(&rx_queue.mutex);
405 RBMQ_DiscardReservedMsg(rx_queue.queue, msg_handle);
406 pthread_mutex_unlock(&rx_queue.mutex);
407 }
408 Side_Channel_Stats.rx_messages_total++;
409 Side_Channel_Stats.rx_messages_processed_oob++;
410
411 pthread_mutex_unlock(&snort_process_lock);
412 return 0;
413 }
414 else
415 Side_Channel_Stats.rx_messages_processed_oob++;
416
417 pthread_mutex_unlock(&snort_process_lock);
418 }
419
420 /* Finally, enqueue the message if we really have to. */
421 pthread_mutex_lock(&rx_queue.mutex);
422 rval = SCEnqueueMessage(&rx_queue, hdr, msg, length, msg_handle, msgFreeFunc);
423 /* TODO: Error check the above call. */
424 Side_Channel_Stats.rx_messages_total++;
425 pthread_mutex_unlock(&rx_queue.mutex);
426
427 return rval;
428 }
429
430 /* Called in the Snort main thread. */
SideChannelEnqueueMessageTX(SCMsgHdr * hdr,const uint8_t * msg,uint32_t length,void * msg_handle,SCMQMsgFreeFunc msgFreeFunc)431 int SideChannelEnqueueMessageTX(SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
432 {
433 int rval, empty;
434
435 /* Only bother queuing if the TX thread is running, otherwise just immediately process. */
436 if (tx_thread_running)
437 {
438 pthread_mutex_lock(&tx_queue.mutex);
439 empty = RBMQ_IsEmpty(tx_queue.queue);
440 rval = SCEnqueueMessage(&tx_queue, hdr, msg, length, msg_handle, msgFreeFunc);
441 /* TODO: Error check the above call. */
442 Side_Channel_Stats.tx_messages_total++;
443 /* If the queue was empty, signal any waiters. */
444 if (empty)
445 pthread_cond_signal(&tx_queue.cond);
446 pthread_mutex_unlock(&tx_queue.mutex);
447 }
448 else
449 {
450 SCProcessMessage(tx_handlers, hdr, msg, length);
451 Side_Channel_Stats.tx_messages_total++;
452 Side_Channel_Stats.tx_messages_processed++;
453 if (msgFreeFunc)
454 msgFreeFunc((uint8_t *) msg);
455 if (msg_handle)
456 {
457 pthread_mutex_lock(&tx_queue.mutex);
458 RBMQ_DiscardReservedMsg(tx_queue.queue, msg_handle);
459 pthread_mutex_unlock(&tx_queue.mutex);
460 }
461 rval = 0;
462 }
463
464 return rval;
465 }
466
SCEnqueueData(SCMessageQueue * mq,SCMsgHdr * hdr,uint8_t * msg,uint32_t length,SCMQMsgFreeFunc msgFreeFunc)467 static int SCEnqueueData(SCMessageQueue *mq, SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
468 {
469 return RBMQ_CommitExternalMsg(mq->queue, hdr, msg, length, msgFreeFunc);
470 }
471
472 /* Called by an out-of-band thread (probably a Side Channel Module). */
SideChannelEnqueueDataRX(SCMsgHdr * hdr,uint8_t * msg,uint32_t length,SCMQMsgFreeFunc msgFreeFunc)473 int SideChannelEnqueueDataRX(SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
474 {
475 int rval;
476
477 pthread_mutex_lock(&rx_queue.mutex);
478 rval = SCEnqueueData(&rx_queue, hdr, msg, length, msgFreeFunc);
479 /* TODO: Error check the above call. */
480 Side_Channel_Stats.rx_messages_total++;
481 pthread_mutex_unlock(&rx_queue.mutex);
482
483 return rval;
484 }
485
486 /* Called in the Snort main thread. */
SideChannelEnqueueDataTX(SCMsgHdr * hdr,uint8_t * msg,uint32_t length,SCMQMsgFreeFunc msgFreeFunc)487 int SideChannelEnqueueDataTX(SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
488 {
489 int rval, empty;
490
491 /* Only bother queuing if the TX thread is running, otherwise just immediately process. */
492 if (tx_thread_running)
493 {
494 pthread_mutex_lock(&tx_queue.mutex);
495 empty = RBMQ_IsEmpty(tx_queue.queue);
496 rval = SCEnqueueData(&tx_queue, hdr, msg, length, msgFreeFunc);
497 /* TODO: Error check the above call. */
498 Side_Channel_Stats.tx_messages_total++;
499 /* If the queue was empty, signal any waiters. */
500 if (empty)
501 pthread_cond_signal(&tx_queue.cond);
502 pthread_mutex_unlock(&tx_queue.mutex);
503 }
504 else
505 {
506 SCProcessMessage(tx_handlers, hdr, msg, length);
507 Side_Channel_Stats.tx_messages_total++;
508 Side_Channel_Stats.tx_messages_processed++;
509 msgFreeFunc(msg);
510 rval = 0;
511 }
512
513 return rval;
514 }
515
516 /* Called in the Snort main thread. */
SideChannelDrainRX(unsigned max_msgs)517 uint32_t SideChannelDrainRX(unsigned max_msgs)
518 {
519 uint32_t processed = 0;
520
521 if (!ScSideChannelEnabled())
522 return 0;
523
524 if (RBMQ_IsEmpty(rx_queue.queue))
525 return 0;
526
527 while (!max_msgs || processed < max_msgs)
528 {
529 if (stop_processing || SCDrainAndProcess(&rx_queue, rx_handlers) != 0)
530 break;
531
532 Side_Channel_Stats.rx_messages_processed_ib++;
533 processed++;
534 }
535
536 return processed;
537 }
538
SideChannelThread(void * arg)539 static void *SideChannelThread(void *arg)
540 {
541 struct timespec ts;
542 struct timeval tv;
543 SCHandler *handler;
544 SCModule *module;
545 SCMsgHdr *hdr;
546 uint32_t length;
547 const uint8_t *msg;
548 void *msg_handle;
549 int rval;
550
551 tx_thread_pid = gettid();
552 tx_thread_running = 1;
553
554 pthread_mutex_lock(&tx_queue.mutex);
555 while (!stop_processing)
556 {
557 /* If the message queue is empty, we will stop without unlocking it so we can immediately start a timed wait. */
558 while ((rval = RBMQ_ReadMsg(tx_queue.queue, (const void **) &hdr, &msg, &length, &msg_handle)) == 0)
559 {
560 pthread_mutex_unlock(&tx_queue.mutex);
561
562 for (handler = tx_handlers; handler; handler = handler->next)
563 {
564 if (hdr->type == handler->type || handler->type == SC_MSG_TYPE_ANY)
565 handler->processMsgFunc(hdr, msg, length);
566 }
567
568 pthread_mutex_lock(&tx_queue.mutex);
569 rval = RBMQ_AckMsg(tx_queue.queue, msg_handle);
570 if (rval != 0)
571 WarningMessage("Error ACK'ing message %p!\n", msg_handle);
572 /* Again, not unlocking so that we're already locked for the three places we can go
573 from here, which are all expecting it (dequeue, timed wait, or done). */
574
575 Side_Channel_Stats.tx_messages_processed++;
576 #ifndef REG_TEST
577 if (stop_processing)
578 goto done;
579 #endif
580 }
581 if (stop_processing)
582 goto done;
583 gettimeofday(&tv, NULL);
584 ts.tv_sec = tv.tv_sec + 10;
585 ts.tv_nsec = tv.tv_usec * 1000;
586 rval = pthread_cond_timedwait(&tx_queue.cond, &tx_queue.mutex, &ts);
587 /* If we timed out waiting for new output messages to process, run the registered idle routines. */
588 if (rval == ETIMEDOUT && !stop_processing)
589 {
590 for (module = modules; module; module = module->next)
591 {
592 if (module->enabled && module->funcs.idleFunc)
593 module->funcs.idleFunc();
594 }
595 }
596 }
597 done:
598 pthread_mutex_unlock(&tx_queue.mutex);
599 tx_thread_running = 0;
600
601 LogMessage("Side Channel thread exiting...\n");
602
603 return NULL;
604 }
605
SCParseConfiguration(SnortConfig * sc,SCConfig * config)606 static void SCParseConfiguration(SnortConfig *sc, SCConfig *config)
607 {
608 long int value;
609 char *token, *argcpy, *endptr;
610 enum ConfState confState = STATE_START;
611
612 memset(config, 0, sizeof(SCConfig));
613
614 config->enabled = sc->side_channel_config.enabled;
615 if (!config->enabled)
616 return;
617
618 config->rx_queue_max_data_size = DEFAULT_RX_QUEUE_DATA_SIZE;
619 config->rx_queue_max_depth = DEFAULT_RX_QUEUE_DEPTH;
620 config->tx_queue_max_data_size = DEFAULT_TX_QUEUE_DATA_SIZE;
621 config->tx_queue_max_depth = DEFAULT_TX_QUEUE_DEPTH;
622 config->disable_tx_thread = false;
623
624 if (!sc->side_channel_config.opts)
625 return;
626
627 argcpy = sc->side_channel_config.opts;
628 for (token = strtok(argcpy, CONF_SEPARATORS); token; token = strtok(NULL, CONF_SEPARATORS))
629 {
630 switch (confState)
631 {
632 case STATE_START:
633 if (strcmp(token, CONF_RX_QUEUE_DATA_SIZE) == 0)
634 confState = STATE_RX_QUEUE_DATA_SIZE;
635 else if (strcmp(token, CONF_RX_QUEUE_DEPTH) == 0)
636 confState = STATE_RX_QUEUE_DEPTH;
637 else if (strcmp(token, CONF_TX_QUEUE_DATA_SIZE) == 0)
638 confState = STATE_TX_QUEUE_DATA_SIZE;
639 else if (strcmp(token, CONF_TX_QUEUE_DEPTH) == 0)
640 confState = STATE_TX_QUEUE_DEPTH;
641 else if (strcmp(token, CONF_DISABLE_TX_THREAD) == 0)
642 config->disable_tx_thread = true;
643 else
644 FatalError("Invalid side channel configuration token: '%s'\n", token);
645 break;
646 case STATE_RX_QUEUE_DATA_SIZE:
647 confState = STATE_START;
648 value = SnortStrtoul(token, &endptr, 0);
649 if (errno != 0 || *endptr != '\0')
650 FatalError("Invalid argument for side channel RX queue data size: '%s'\n", token);
651 config->rx_queue_max_data_size = value;
652 break;
653 case STATE_RX_QUEUE_DEPTH:
654 confState = STATE_START;
655 value = SnortStrtoul(token, &endptr, 0);
656 if (errno != 0 || *endptr != '\0')
657 FatalError("Invalid argument for side channel RX queue depth: '%s'\n", token);
658 config->rx_queue_max_depth = value;
659 break;
660 case STATE_TX_QUEUE_DATA_SIZE:
661 confState = STATE_START;
662 value = SnortStrtoul(token, &endptr, 0);
663 if (errno != 0 || *endptr != '\0')
664 FatalError("Invalid argument for side channel TX queue data size: '%s'\n", token);
665 config->tx_queue_max_data_size = value;
666 break;
667 case STATE_TX_QUEUE_DEPTH:
668 confState = STATE_START;
669 value = SnortStrtoul(token, &endptr, 0);
670 if (errno != 0 || *endptr != '\0')
671 FatalError("Invalid argument for side channel TX queue depth: '%s'\n", token);
672 config->tx_queue_max_depth = value;
673 break;
674 default:
675 break;
676 }
677 }
678 }
679
680 #ifdef SNORT_RELOAD
SideChannelVerifyConfig(SnortConfig * sc)681 int SideChannelVerifyConfig(SnortConfig *sc)
682 {
683 SCConfig config;
684
685 SCParseConfiguration(sc, &config);
686
687 return memcmp(&config, &sc_config, sizeof(SCConfig));
688 }
689 #endif
690
SideChannelConfigure(SnortConfig * sc)691 void SideChannelConfigure(SnortConfig *sc)
692 {
693 if (!sc->side_channel_config.enabled)
694 return;
695
696 SCParseConfiguration(sc, &sc_config);
697
698 rx_queue.max_data_size = sc_config.rx_queue_max_data_size;
699 rx_queue.max_depth = sc_config.rx_queue_max_depth;
700 tx_queue.max_data_size = sc_config.tx_queue_max_data_size;
701 tx_queue.max_depth = sc_config.tx_queue_max_depth;
702
703 LogMessage("Side Channel config:\n");
704 LogMessage(" RX Queue Max Data Size: %u\n", sc_config.rx_queue_max_data_size);
705 LogMessage(" RX Queue Max Depth: %u\n", sc_config.rx_queue_max_depth);
706 LogMessage(" TX Queue Max Data Size: %u\n", sc_config.tx_queue_max_data_size);
707 LogMessage(" RX Queue Max Depth: %u\n", sc_config.tx_queue_max_depth);
708 }
709
SideChannelInit(void)710 void SideChannelInit(void)
711 {
712 SCModule *module;
713
714 if (!ScSideChannelEnabled())
715 return;
716
717 pthread_mutex_init(&rx_queue.mutex, NULL);
718 pthread_cond_init(&rx_queue.cond, NULL);
719 rx_queue.queue = RBMQ_Alloc(rx_queue.max_depth, sizeof(SCMsgHdr), rx_queue.max_data_size);
720
721 pthread_cond_init(&tx_queue.cond, NULL);
722 pthread_mutex_init(&tx_queue.mutex, NULL);
723 tx_queue.queue = RBMQ_Alloc(tx_queue.max_depth, sizeof(SCMsgHdr), tx_queue.max_data_size);
724
725 for (module = modules; module; module = module->next)
726 {
727 if (module->enabled && module->funcs.initFunc)
728 module->funcs.initFunc();
729 }
730 }
731
SideChannelStartTXThread(void)732 void SideChannelStartTXThread(void)
733 {
734 const struct timespec thread_sleep = { 0, 100 };
735 SCModule *module;
736 sigset_t mask;
737 int found, rval;
738
739 if (!ScSideChannelEnabled())
740 return;
741
742 if (sc_config.disable_tx_thread)
743 return;
744
745 /* Avoid starting the TX thread if there are no TX handlers or TX idle tasks registered. */
746 found = 0;
747 for (module = modules; module; module = module->next)
748 {
749 if (module->enabled && module->funcs.idleFunc)
750 {
751 found = 1;
752 break;
753 }
754 }
755 if (!found && !tx_handlers)
756 {
757 LogMessage("Not starting unnecessary Side Channel TX thread.\n");
758 return;
759 }
760
761 /* Spin off the Side Channel handler thread. */
762 sigemptyset(&mask);
763 sigaddset(&mask, SIGTERM);
764 sigaddset(&mask, SIGQUIT);
765 sigaddset(&mask, SIGPIPE);
766 sigaddset(&mask, SIGINT);
767 sigaddset(&mask, SIGNAL_SNORT_RELOAD);
768 sigaddset(&mask, SIGNAL_SNORT_DUMP_STATS);
769 sigaddset(&mask, SIGUSR1);
770 sigaddset(&mask, SIGUSR2);
771 sigaddset(&mask, SIGNAL_SNORT_ROTATE_STATS);
772 sigaddset(&mask, SIGNAL_SNORT_CHILD_READY);
773 #ifdef TARGET_BASED
774 sigaddset(&mask, SIGNAL_SNORT_READ_ATTR_TBL);
775 sigaddset(&mask, SIGVTALRM);
776 #endif
777 pthread_sigmask(SIG_SETMASK, &mask, NULL);
778
779 if ((rval = pthread_create(&tx_thread_id, NULL, &SideChannelThread, NULL)) != 0)
780 {
781 sigemptyset(&mask);
782 pthread_sigmask(SIG_SETMASK, &mask, NULL);
783 FatalError("Side Channel: Unable to create thread: %s\n", strerror(rval));
784 }
785 while (!tx_thread_running)
786 nanosleep(&thread_sleep, NULL);
787
788 p_tx_thread_id = &tx_thread_id;
789 sigemptyset(&mask);
790 pthread_sigmask(SIG_SETMASK, &mask, NULL);
791 LogMessage("Side Channel TX thread started tid=%p (pid=%u)\n", (void *) tx_thread_id, tx_thread_pid);
792 }
793
SideChannelStopTXThread(void)794 void SideChannelStopTXThread(void)
795 {
796 int rval;
797
798 if (!ScSideChannelEnabled())
799 return;
800
801 if (p_tx_thread_id != NULL)
802 {
803 stop_processing = 1;
804 pthread_mutex_lock(&tx_queue.mutex);
805 pthread_cond_signal(&tx_queue.cond);
806 pthread_mutex_unlock(&tx_queue.mutex);
807 if ((rval = pthread_join(*p_tx_thread_id, NULL)) != 0)
808 WarningMessage("Side channel TX thread termination returned an error: %s\n", strerror(rval));
809 }
810 }
811
SideChannelPostInit(void)812 int SideChannelPostInit(void)
813 {
814 SCModule *module;
815
816 if (!ScSideChannelEnabled())
817 return 0;
818
819 for (module = modules; module; module = module->next)
820 {
821 if (module->enabled && module->funcs.postInitFunc)
822 module->funcs.postInitFunc();
823 }
824
825 return 0;
826 }
827
SideChannelStats(int exiting,const char * separator)828 void SideChannelStats(int exiting, const char *separator)
829 {
830 SCModule *module;
831
832 if (!ScSideChannelEnabled())
833 return;
834
835 LogMessage("%s\n", separator);
836 LogMessage("Side Channel:\n");
837 LogMessage(" RX Messages Total: %"PRIu64"\n", Side_Channel_Stats.rx_messages_total);
838 LogMessage(" RX Messages Processed (IB): %"PRIu64"\n", Side_Channel_Stats.rx_messages_processed_ib);
839 LogMessage(" RX Messages Processed (OOB): %"PRIu64"\n", Side_Channel_Stats.rx_messages_processed_oob);
840 LogMessage(" TX Messages Total: %"PRIu64"\n", Side_Channel_Stats.tx_messages_total);
841 LogMessage(" TX Messages Processed: %"PRIu64"\n", Side_Channel_Stats.tx_messages_processed);
842
843 for (module = modules; module; module = module->next)
844 {
845 if (module->enabled && module->funcs.statsFunc)
846 {
847 LogMessage("%s\n", separator);
848 module->funcs.statsFunc(exiting);
849 }
850 }
851
852 LogMessage(" RX Queue Stats:\n");
853 RBMQ_Stats(rx_queue.queue, " ");
854
855 LogMessage(" TX Queue Stats:\n");
856 RBMQ_Stats(tx_queue.queue, " ");
857 }
858
SideChannelCleanUp(void)859 void SideChannelCleanUp(void)
860 {
861 SCModule *module;
862
863 if (!ScSideChannelEnabled())
864 return;
865
866 while ((module = modules))
867 {
868 if (module->enabled)
869 {
870 if (module->funcs.statsFunc)
871 module->funcs.statsFunc(1);
872
873 if (module->funcs.shutdownFunc)
874 module->funcs.shutdownFunc();
875 }
876 modules = module->next;
877 free(module->keyword);
878 free(module);
879 }
880 pthread_cond_destroy(&tx_queue.cond);
881 pthread_mutex_destroy(&tx_queue.mutex);
882 pthread_cond_destroy(&rx_queue.cond);
883 pthread_mutex_destroy(&rx_queue.mutex);
884 }
885
886 /*
887 * WARNING: Messages are being written in and read assuming host byte order.
888 */
889
Write(int fd,const void * buf,size_t count)890 static inline ssize_t Write(int fd, const void *buf, size_t count)
891 {
892 ssize_t n;
893 errno = 0;
894
895 while ((n = write(fd, buf, count)) <= (ssize_t) count)
896 {
897 if (n == (ssize_t) count)
898 return 0;
899
900 if (n > 0)
901 count -= n;
902 else if (errno != EINTR)
903 break;
904 }
905
906 return -1;
907 }
908
SideChannelWriteMsgToFile(int fd,SCMsgHdr * hdr,const uint8_t * msg,uint32_t length)909 int SideChannelWriteMsgToFile(int fd, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length)
910 {
911 if (Write(fd, &hdr->type, sizeof(hdr->type)) != 0)
912 return -1;
913
914 if (Write(fd, &hdr->timestamp, sizeof(hdr->timestamp)) != 0)
915 return -1;
916
917 if (Write(fd, &length, sizeof(length)) != 0)
918 return -1;
919
920 if (Write(fd, msg, length) != 0)
921 return -1;
922
923 return 0;
924 }
925
Read(int fd,void * buf,size_t count)926 static inline ssize_t Read(int fd, void *buf, size_t count)
927 {
928 ssize_t n;
929 errno = 0;
930
931 while ((n = read(fd, buf, count)) <= (ssize_t) count)
932 {
933 if (n == (ssize_t) count)
934 return 0;
935
936 if (n > 0)
937 {
938 count -= n;
939 buf = (uint8_t *) buf + n;
940 }
941 else if (n == 0)
942 break;
943 else if (errno != EINTR)
944 {
945 ErrorMessage("Error reading Logger SCM log file: %s (%d)\n", strerror(errno), errno);
946 break;
947 }
948 }
949 return -1;
950 }
951
SideChannelReadMsgFromFile(int fd,SCMsgHdr * hdr,uint8_t ** msg_ptr,uint32_t * length_ptr)952 int SideChannelReadMsgFromFile(int fd, SCMsgHdr *hdr, uint8_t **msg_ptr, uint32_t *length_ptr)
953 {
954 uint64_t timestamp;
955 uint32_t length;
956 uint16_t type;
957 uint8_t *msg;
958
959 if (Read(fd, &type, sizeof(type)) != 0)
960 return -1;
961
962 if (Read(fd, ×tamp, sizeof(timestamp)) != 0)
963 return -1;
964
965 if (Read(fd, &length, sizeof(length)) != 0)
966 return -1;
967
968 if (length > 0)
969 {
970 msg = SnortAlloc(length);
971 if (Read(fd, msg, length) != 0)
972 {
973 free(msg);
974 return -1;
975 }
976 }
977 else
978 msg = NULL;
979
980 hdr->type = type;
981 hdr->timestamp = timestamp;
982 *length_ptr = length;
983 *msg_ptr = msg;
984
985 return 0;
986 }
987
988 #endif /* SIDE_CHANNEL */
989