1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License Version 2 as
4  * published by the Free Software Foundation.  You may not use, modify or
5  * distribute this program under any other version of the GNU General
6  * Public License.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
16  *
17  * Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
18  * Copyright (C) 2012-2013 Sourcefire, Inc.
19  *
20  * Author: Michael Altizer <maltizer@sourcefire.com>
21  *
22  */
23 
24 #ifdef SIDE_CHANNEL
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #ifdef HAVE_GETTID
31 #define _GNU_SOURCE
32 #endif
33 
34 #include <signal.h>
35 
36 #include "dmq.h"
37 #include "rbmq.h"
38 #include "sidechannel.h"
39 #include "sscm_logger.h"
40 
/* Queue limits applied by SCParseConfiguration() when the options do not override them. */
#define DEFAULT_RX_QUEUE_DEPTH      1024
#define DEFAULT_RX_QUEUE_DATA_SIZE  10485760
#define DEFAULT_TX_QUEUE_DEPTH      1024
#define DEFAULT_TX_QUEUE_DATA_SIZE  10485760

/* Token separators and option keywords recognized by SCParseConfiguration(). */
#define CONF_SEPARATORS     " \t\n\r,"
#define CONF_RX_QUEUE_DATA_SIZE "rx-queue-data-size"
#define CONF_RX_QUEUE_DEPTH     "rx-queue-depth"
#define CONF_TX_QUEUE_DATA_SIZE "tx-queue-data-size"
#define CONF_TX_QUEUE_DEPTH     "tx-queue-depth"
#define CONF_DISABLE_TX_THREAD  "disable-tx-thread"
52 
/*
 * When SC_USE_DMQ is defined, every RBMQ_* name used in this file is
 * redirected to the DMQ_* identifier of the same suffix.
 */
#ifdef SC_USE_DMQ
#define RBMQ_Ptr DMQ_Ptr
#define RBMQ_Alloc DMQ_Alloc
#define RBMQ_ReserveMsg DMQ_ReserveMsg
#define RBMQ_CommitReservedMsg DMQ_CommitReservedMsg
#define RBMQ_DiscardReservedMsg DMQ_DiscardReservedMsg
#define RBMQ_CommitExternalMsg DMQ_CommitExternalMsg
#define RBMQ_ReadMsg DMQ_ReadMsg
#define RBMQ_AckMsg DMQ_AckMsg
#define RBMQ_IsEmpty DMQ_IsEmpty
#define RBMQ_Stats DMQ_Stats
#endif
65 
/*
 * Parser states used by SCParseConfiguration().  In STATE_START the next
 * token must be an option keyword; in each of the other states the next
 * token is parsed as the numeric value of the named option.
 */
enum ConfState
{
    STATE_START,
    STATE_RX_QUEUE_DATA_SIZE,
    STATE_RX_QUEUE_DEPTH,
    STATE_TX_QUEUE_DATA_SIZE,
    STATE_TX_QUEUE_DEPTH
};
74 
/* Parsed side channel configuration, filled in by SCParseConfiguration(). */
typedef struct _SC_CONFIG
{
    uint32_t rx_queue_max_data_size;    /* value of "rx-queue-data-size" or its default */
    uint32_t rx_queue_max_depth;        /* value of "rx-queue-depth" or its default */
    uint32_t tx_queue_max_data_size;    /* value of "tx-queue-data-size" or its default */
    uint32_t tx_queue_max_depth;        /* value of "tx-queue-depth" or its default */
    bool disable_tx_thread;             /* true when "disable-tx-thread" was given */
    bool enabled;                       /* copied from sc->side_channel_config.enabled */
} SCConfig;
84 
/* One registered side channel module; entries form a singly linked list. */
typedef struct _SC_MODULE
{
    struct _SC_MODULE *next;    /* next registered module, NULL at the tail */
    char *keyword;              /* private copy of the registration keyword */
    SCMFunctionBundle funcs;    /* copy of the callbacks supplied at registration */
    bool enabled;               /* set once ConfigureSideChannelModule() has matched this module */
} SCModule;
92 
/* One registered message handler; entries form a singly linked list. */
typedef struct _SC_HANDLER
{
    struct _SC_HANDLER *next;           /* next handler in the list */
    uint16_t type;                      /* message type to match, or SC_MSG_TYPE_ANY */
    SCMProcessMsgFunc processMsgFunc;   /* callback invoked as processMsgFunc(hdr, msg, length) */
    void *data;                         /* stored at registration; not passed to the callback by this file */
} SCHandler;
100 
/* A message queue together with the lock and condition variable used around it. */
typedef struct _SC_MESSAGE_QUEUE
{
    RBMQ_Ptr queue;             /* underlying queue, allocated in SideChannelInit() */
    pthread_mutex_t mutex;      /* held around the queue operations in this file */
    pthread_cond_t cond;        /* for the TX queue: signalled when a message is added to an empty queue */
    uint32_t max_data_size;     /* passed to RBMQ_Alloc() as the data size limit */
    uint32_t max_depth;         /* passed to RBMQ_Alloc() as the depth limit */
} SCMessageQueue;
109 
/* Message counters reported by SideChannelStats(). */
static struct {
    uint64_t rx_messages_total;         /* RX messages accepted (queued or processed directly) */
    uint64_t rx_messages_processed_ib;  /* RX messages processed by SideChannelDrainRX() */
    uint64_t rx_messages_processed_oob; /* RX messages processed inside SideChannelEnqueueMessageRX() */
    uint64_t tx_messages_total;         /* TX messages accepted (queued or processed directly) */
    uint64_t tx_messages_processed;     /* TX messages handed to the TX handlers */
} Side_Channel_Stats;
117 
/* Set to 1 by SideChannelStopTXThread(); checked by the TX thread and the RX paths. */
static volatile int stop_processing = 0;
/* Set to 1 by the TX thread when it starts and back to 0 just before it returns. */
static volatile int tx_thread_running = 0;

/* gettid() value recorded by the TX thread; used in the start-up log message. */
static pid_t tx_thread_pid;
static pthread_t tx_thread_id;
/* NULL until SideChannelStartTXThread() has started the thread; then points at tx_thread_id. */
static pthread_t *p_tx_thread_id;

/* Active configuration, written by SideChannelConfigure(). */
static SCConfig sc_config;

static SCMessageQueue rx_queue;
static SCMessageQueue tx_queue;

/* Heads of the registered module and handler lists. */
static SCModule *modules;
static SCHandler *rx_handlers;
static SCHandler *tx_handlers;

#ifdef PERF_PROFILING
PreprocStats sideChannelRxPerfStats;
#endif
137 
/*
 * Registers the built-in side channel modules by calling SetupLoggerSCM().
 * Does nothing when the side channel is disabled.
 */
void RegisterSideChannelModules(void)
{
    if (ScSideChannelEnabled())
        SetupLoggerSCM();
}
145 
/*
 * Appends a new module to the end of the registered module list.
 *
 * keyword: name the module is later configured under (compared
 *          case-insensitively); a private copy is stored.
 * funcs:   callback bundle; copied by value into the module entry.
 *
 * A missing keyword, a missing bundle or a duplicate keyword is reported
 * through FatalError().  Does nothing when the side channel is disabled.
 * The new module starts out disabled.
 */
void RegisterSideChannelModule(const char *keyword, SCMFunctionBundle *funcs)
{
    SCModule **link;
    SCModule *entry;

    if (!ScSideChannelEnabled())
        return;

    if (!keyword)
        FatalError("No keyword given while registering a side channel module!\n");

    if (!funcs)
        FatalError("No function bundle given while registering side channel '%s'!\n", keyword);

    /* Walk to the tail link of the list, rejecting duplicate keywords on the way. */
    for (link = &modules; *link; link = &(*link)->next)
    {
        if (strcasecmp((*link)->keyword, keyword) == 0)
            FatalError("Duplicate side channel keyword: %s\n", keyword);
    }

    entry = SnortAlloc(sizeof(SCModule));

    entry->next = NULL;
    entry->keyword = SnortStrdup(keyword);
    entry->funcs = *funcs;
    entry->enabled = 0;

    LogMessage("Register SCM '%s' with configFunc=%p, initFunc=%p, postInitFunc=%p, idleFunc=%p, statsFunc=%p, shutdownFunc=%p\n",
            keyword, entry->funcs.configFunc, entry->funcs.initFunc, entry->funcs.postInitFunc,
            entry->funcs.idleFunc, entry->funcs.statsFunc, entry->funcs.shutdownFunc);

    /* link addresses either the list head (empty list) or the last entry's next field. */
    *link = entry;
}
181 
ConfigureSideChannelModule(const char * keyword,char * opts)182 int ConfigureSideChannelModule(const char *keyword, char *opts)
183 {
184     SCModule *module;
185 
186     for (module = modules; module; module = module->next)
187     {
188         if (strcasecmp(module->keyword, keyword) == 0)
189             break;
190     }
191     if (!module)
192         return -ENOENT;
193 
194     module->funcs.configFunc(opts);
195     module->enabled = 1;
196 
197     return 0;
198 }
199 
/*
 * Allocates a handler entry and pushes it onto the front of *handlers.
 *
 * handlers:       address of the list head to prepend to.
 * type:           message type the handler is interested in.
 * processMsgFunc: callback to run for matching messages.
 * data:           opaque pointer stored with the entry.
 *
 * Always returns 0; nothing is registered when the side channel is disabled.
 */
static int SCRegisterHandler(SCHandler **handlers, uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
{
    SCHandler *entry;

    if (!ScSideChannelEnabled())
        return 0;

    entry = SnortAlloc(sizeof(SCHandler));

    entry->type = type;
    entry->processMsgFunc = processMsgFunc;
    entry->data = data;

    /* Prepend: the new entry becomes the list head. */
    entry->next = *handlers;
    *handlers = entry;

    return 0;
}
219 
/* Registers processMsgFunc for RX messages of the given type; returns SCRegisterHandler()'s result. */
int SideChannelRegisterRXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
{
    SCHandler **const list = &rx_handlers;

    return SCRegisterHandler(list, type, processMsgFunc, data);
}
224 
/* Registers processMsgFunc for TX messages of the given type; returns SCRegisterHandler()'s result. */
int SideChannelRegisterTXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc, void *data)
{
    SCHandler **const list = &tx_handlers;

    return SCRegisterHandler(list, type, processMsgFunc, data);
}
229 
/*
 * Removes and frees the first entry in *handlers whose type and callback
 * both match.  Does nothing when no entry matches or when the side channel
 * is disabled.
 */
static void SCUnregisterHandler(SCHandler **handlers, uint16_t type, SCMProcessMsgFunc processMsgFunc)
{
    SCHandler **link;
    SCHandler *victim;

    if (!ScSideChannelEnabled())
        return;

    /* link always addresses the pointer that refers to the entry being examined. */
    for (link = handlers; (victim = *link) != NULL; link = &victim->next)
    {
        if (victim->type != type || victim->processMsgFunc != processMsgFunc)
            continue;

        *link = victim->next;
        free(victim);
        return;
    }
}
253 
/* Removes the first RX handler registered with this type and callback, if any. */
void SideChannelUnregisterRXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc)
{
    SCHandler **const list = &rx_handlers;

    SCUnregisterHandler(list, type, processMsgFunc);
}
258 
/* Removes the first TX handler registered with this type and callback, if any. */
void SideChannelUnregisterTXHandler(uint16_t type, SCMProcessMsgFunc processMsgFunc)
{
    SCHandler **const list = &tx_handlers;

    SCUnregisterHandler(list, type, processMsgFunc);
}
263 
/*
 * Reserves a message of the given length in mq while holding the queue
 * mutex.  hdr_ptr, msg_ptr and msg_handle are passed through to
 * RBMQ_ReserveMsg(), whose return value is returned unchanged.
 */
static int SCPreallocMessage(SCMessageQueue *mq, uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
{
    int status;

    pthread_mutex_lock(&mq->mutex);
    status = RBMQ_ReserveMsg(mq->queue, length, (void **) hdr_ptr, msg_ptr, msg_handle);
    pthread_mutex_unlock(&mq->mutex);

    return status;
}
274 
/* Reserves a message of the given length in the RX queue; see SCPreallocMessage(). */
int SideChannelPreallocMessageRX(uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
{
    SCMessageQueue *const mq = &rx_queue;

    return SCPreallocMessage(mq, length, hdr_ptr, msg_ptr, msg_handle);
}
279 
/* Reserves a message of the given length in the TX queue; see SCPreallocMessage(). */
int SideChannelPreallocMessageTX(uint32_t length, SCMsgHdr **hdr_ptr, uint8_t **msg_ptr, void **msg_handle)
{
    SCMessageQueue *const mq = &tx_queue;

    return SCPreallocMessage(mq, length, hdr_ptr, msg_ptr, msg_handle);
}
284 
/*
 * Discards a previously reserved message while holding the queue mutex.
 * Returns whatever RBMQ_DiscardReservedMsg() returns.
 */
static int SCDiscardMessage(SCMessageQueue *mq, void *msg_handle)
{
    int status;

    pthread_mutex_lock(&mq->mutex);
    status = RBMQ_DiscardReservedMsg(mq->queue, msg_handle);
    pthread_mutex_unlock(&mq->mutex);

    return status;
}
295 
SideChannelDiscardMessageRX(void * msg_handle)296 int SideChannelDiscardMessageRX(void *msg_handle)
297 {
298     return SCDiscardMessage(&rx_queue, msg_handle);
299 }
300 
SideChannelDiscardMessageTX(void * msg_handle)301 int SideChannelDiscardMessageTX(void *msg_handle)
302 {
303     return SCDiscardMessage(&tx_queue, msg_handle);
304 }
305 
/*
 * Commits a message to mq.  This function does not take the queue mutex
 * itself; the callers in this file hold it around the call.
 *
 * With a non-NULL msg_handle the message was reserved earlier and is simply
 * committed; hdr and msg are not read.  With a NULL msg_handle a slot is
 * reserved here, hdr and msg are copied into it, and it is then committed.
 *
 * Returns 0 on success, otherwise the non-zero status of the failing
 * reserve or commit call.
 */
static int SCEnqueueMessage(SCMessageQueue *mq, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
{
    SCMsgHdr *reserved_hdr;
    uint8_t *reserved_msg;
    int status;

    /* Already reserved by the caller: only the commit is left to do. */
    if (msg_handle)
        return RBMQ_CommitReservedMsg(mq->queue, msg_handle, length, msgFreeFunc);

    status = RBMQ_ReserveMsg(mq->queue, length, (void **) &reserved_hdr, &reserved_msg, &msg_handle);
    if (status != 0)
    {
        ErrorMessage("%s: Could not reserve message: %d\n", __FUNCTION__, status);
        return status;
    }

    memcpy(reserved_msg, msg, length);
    memcpy(reserved_hdr, hdr, sizeof(SCMsgHdr));

    status = RBMQ_CommitReservedMsg(mq->queue, msg_handle, length, msgFreeFunc);
    if (status != 0)
        ErrorMessage("%s: Could not commit reserved message: %d\n", __FUNCTION__, status);

    return status;
}
335 
/*
 * Runs every handler in the list whose type equals the message type or is
 * SC_MSG_TYPE_ANY, in list order.
 */
static inline void SCProcessMessage(SCHandler *handlers, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length)
{
    SCHandler *cur = handlers;

    while (cur)
    {
        if (cur->type == SC_MSG_TYPE_ANY || hdr->type == cur->type)
            cur->processMsgFunc(hdr, msg, length);
        cur = cur->next;
    }
}
346 
/*
 * Reads one message from mq, runs the matching handlers on it and
 * acknowledges it.  The queue mutex is held only around the read and the
 * acknowledge, not while the handlers run.
 *
 * Returns 1 when no message could be read, 0 otherwise (a failed
 * acknowledge only produces a warning).
 */
static int SCDrainAndProcess(SCMessageQueue *mq, SCHandler *handlers)
{
    SCMsgHdr *hdr;
    const uint8_t *payload;
    uint32_t payload_len;
    void *handle;
    int status;

    /* Step 1: fetch the next message under the queue lock. */
    pthread_mutex_lock(&mq->mutex);
    status = RBMQ_ReadMsg(mq->queue, (const void **) &hdr, &payload, &payload_len, &handle);
    pthread_mutex_unlock(&mq->mutex);
    if (status != 0)
        return 1;

    /* Step 2: dispatch it to the handlers with the lock released. */
    SCProcessMessage(handlers, hdr, payload, payload_len);

    /* Step 3: acknowledge it, again under the queue lock. */
    pthread_mutex_lock(&mq->mutex);
    status = RBMQ_AckMsg(mq->queue, handle);
    pthread_mutex_unlock(&mq->mutex);
    if (status != 0)
        WarningMessage("Error ACK'ing message %p!\n", handle);

    return 0;
}
374 
/*
 * Called by an out-of-band thread (probably a Side Channel Module).
 *
 * Delivers an RX message.  While snort_process_lock can be taken, queued RX
 * messages are drained one per lock acquisition; once the queue is empty the
 * new message is processed directly, without being queued.  If the lock
 * cannot be taken the message is committed to the RX queue instead.
 *
 * msg_handle:  handle of a previously reserved message, or NULL.
 * msgFreeFunc: optional; on the direct path it is called on msg after
 *              processing, on the queued path it is passed to the commit.
 *
 * Returns -1 once stop_processing is set, 0 when the message was processed
 * directly, otherwise the result of SCEnqueueMessage().
 */
int SideChannelEnqueueMessageRX(SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
{
    int rval;

    /*
     * During snort exit, any messages need not to be processed. So checking
     * stop_processing before processing the message.
     */
    if (stop_processing) {
        return -1;
    }
    /*
     * Because the Snort main thread relinquishes control to DAQ_Acquire for up to a second,
     * we potentially need to preempt it and process RX messages as they are being enqueued
     * to avoid backups and overruns.
     * This should be safe since the main thread holds the snort_process_lock mutex while it
     * is not in DAQ_Acquire().
     */
    while (pthread_mutex_trylock(&snort_process_lock) == 0)
    {
        /* If there are no more messages in the RX queue, process the new message without enqueuing it and return. */
        if (SCDrainAndProcess(&rx_queue, rx_handlers) != 0)
        {
            SCProcessMessage(rx_handlers, hdr, msg, length);
            if (msgFreeFunc)
                msgFreeFunc((uint8_t *) msg);
            /* A reservation made for this message is no longer needed, so release it. */
            if (msg_handle)
            {
                pthread_mutex_lock(&rx_queue.mutex);
                RBMQ_DiscardReservedMsg(rx_queue.queue, msg_handle);
                pthread_mutex_unlock(&rx_queue.mutex);
            }
            Side_Channel_Stats.rx_messages_total++;
            Side_Channel_Stats.rx_messages_processed_oob++;

            pthread_mutex_unlock(&snort_process_lock);
            return 0;
        }
        else
            /* A previously queued message was drained; count it and try the lock again. */
            Side_Channel_Stats.rx_messages_processed_oob++;

        pthread_mutex_unlock(&snort_process_lock);
    }

    /* Finally, enqueue the message if we really have to. */
    pthread_mutex_lock(&rx_queue.mutex);
    rval = SCEnqueueMessage(&rx_queue, hdr, msg, length, msg_handle, msgFreeFunc);
    /* TODO: Error check the above call. */
    /* NOTE(review): the total is incremented even when the enqueue failed. */
    Side_Channel_Stats.rx_messages_total++;
    pthread_mutex_unlock(&rx_queue.mutex);

    return rval;
}
429 
430 /* Called in the Snort main thread. */
SideChannelEnqueueMessageTX(SCMsgHdr * hdr,const uint8_t * msg,uint32_t length,void * msg_handle,SCMQMsgFreeFunc msgFreeFunc)431 int SideChannelEnqueueMessageTX(SCMsgHdr *hdr, const uint8_t *msg, uint32_t length, void *msg_handle, SCMQMsgFreeFunc msgFreeFunc)
432 {
433     int rval, empty;
434 
435     /* Only bother queuing if the TX thread is running, otherwise just immediately process. */
436     if (tx_thread_running)
437     {
438         pthread_mutex_lock(&tx_queue.mutex);
439         empty = RBMQ_IsEmpty(tx_queue.queue);
440         rval = SCEnqueueMessage(&tx_queue, hdr, msg, length, msg_handle, msgFreeFunc);
441         /* TODO: Error check the above call. */
442         Side_Channel_Stats.tx_messages_total++;
443         /* If the queue was empty, signal any waiters. */
444         if (empty)
445             pthread_cond_signal(&tx_queue.cond);
446         pthread_mutex_unlock(&tx_queue.mutex);
447     }
448     else
449     {
450         SCProcessMessage(tx_handlers, hdr, msg, length);
451         Side_Channel_Stats.tx_messages_total++;
452         Side_Channel_Stats.tx_messages_processed++;
453         if (msgFreeFunc)
454             msgFreeFunc((uint8_t *) msg);
455         if (msg_handle)
456         {
457             pthread_mutex_lock(&tx_queue.mutex);
458             RBMQ_DiscardReservedMsg(tx_queue.queue, msg_handle);
459             pthread_mutex_unlock(&tx_queue.mutex);
460         }
461         rval = 0;
462     }
463 
464     return rval;
465 }
466 
/*
 * Commits a caller-owned (external) buffer to mq through
 * RBMQ_CommitExternalMsg() and returns its result.  The queue mutex is not
 * taken here.
 */
static int SCEnqueueData(SCMessageQueue *mq, SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
{
    int status = RBMQ_CommitExternalMsg(mq->queue, hdr, msg, length, msgFreeFunc);

    return status;
}
471 
472 /* Called by an out-of-band thread (probably a Side Channel Module). */
SideChannelEnqueueDataRX(SCMsgHdr * hdr,uint8_t * msg,uint32_t length,SCMQMsgFreeFunc msgFreeFunc)473 int SideChannelEnqueueDataRX(SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
474 {
475     int rval;
476 
477     pthread_mutex_lock(&rx_queue.mutex);
478     rval = SCEnqueueData(&rx_queue, hdr, msg, length, msgFreeFunc);
479     /* TODO: Error check the above call. */
480     Side_Channel_Stats.rx_messages_total++;
481     pthread_mutex_unlock(&rx_queue.mutex);
482 
483     return rval;
484 }
485 
486 /* Called in the Snort main thread. */
SideChannelEnqueueDataTX(SCMsgHdr * hdr,uint8_t * msg,uint32_t length,SCMQMsgFreeFunc msgFreeFunc)487 int SideChannelEnqueueDataTX(SCMsgHdr *hdr, uint8_t *msg, uint32_t length, SCMQMsgFreeFunc msgFreeFunc)
488 {
489     int rval, empty;
490 
491     /* Only bother queuing if the TX thread is running, otherwise just immediately process. */
492     if (tx_thread_running)
493     {
494         pthread_mutex_lock(&tx_queue.mutex);
495         empty = RBMQ_IsEmpty(tx_queue.queue);
496         rval = SCEnqueueData(&tx_queue, hdr, msg, length, msgFreeFunc);
497         /* TODO: Error check the above call. */
498         Side_Channel_Stats.tx_messages_total++;
499         /* If the queue was empty, signal any waiters. */
500         if (empty)
501             pthread_cond_signal(&tx_queue.cond);
502         pthread_mutex_unlock(&tx_queue.mutex);
503     }
504     else
505     {
506         SCProcessMessage(tx_handlers, hdr, msg, length);
507         Side_Channel_Stats.tx_messages_total++;
508         Side_Channel_Stats.tx_messages_processed++;
509         msgFreeFunc(msg);
510         rval = 0;
511     }
512 
513     return rval;
514 }
515 
516 /* Called in the Snort main thread. */
SideChannelDrainRX(unsigned max_msgs)517 uint32_t SideChannelDrainRX(unsigned max_msgs)
518 {
519     uint32_t processed = 0;
520 
521     if (!ScSideChannelEnabled())
522         return 0;
523 
524     if (RBMQ_IsEmpty(rx_queue.queue))
525         return 0;
526 
527     while (!max_msgs || processed < max_msgs)
528     {
529         if (stop_processing || SCDrainAndProcess(&rx_queue, rx_handlers) != 0)
530             break;
531 
532         Side_Channel_Stats.rx_messages_processed_ib++;
533         processed++;
534     }
535 
536     return processed;
537 }
538 
/*
 * Entry point of the TX thread (started by SideChannelStartTXThread()).
 *
 * Repeatedly reads messages from the TX queue and runs the matching TX
 * handlers on them.  When the queue has nothing to read it waits on the TX
 * condition variable for up to 10 seconds; on a timeout it runs the idleFunc
 * of every enabled module.  The loop ends once stop_processing is set.
 *
 * arg is unused.  Always returns NULL.
 */
static void *SideChannelThread(void *arg)
{
    struct timespec ts;
    struct timeval tv;
    SCHandler *handler;
    SCModule *module;
    SCMsgHdr *hdr;
    uint32_t length;
    const uint8_t *msg;
    void *msg_handle;
    int rval;

    tx_thread_pid = gettid();
    /* SideChannelStartTXThread() polls this flag to learn that the thread is up. */
    tx_thread_running = 1;

    pthread_mutex_lock(&tx_queue.mutex);
    while (!stop_processing)
    {
        /* If the message queue is empty, we will stop without unlocking it so we can immediately start a timed wait. */
        while ((rval = RBMQ_ReadMsg(tx_queue.queue, (const void **) &hdr, &msg, &length, &msg_handle)) == 0)
        {
            /* Handlers run with the queue unlocked. */
            pthread_mutex_unlock(&tx_queue.mutex);

            for (handler = tx_handlers; handler; handler = handler->next)
            {
                if (hdr->type == handler->type || handler->type == SC_MSG_TYPE_ANY)
                    handler->processMsgFunc(hdr, msg, length);
            }

            pthread_mutex_lock(&tx_queue.mutex);
            rval = RBMQ_AckMsg(tx_queue.queue, msg_handle);
            if (rval != 0)
                WarningMessage("Error ACK'ing message %p!\n", msg_handle);
            /* Again, not unlocking so that we're already locked for the three places we can go
                from here, which are all expecting it (dequeue, timed wait, or done). */

            Side_Channel_Stats.tx_messages_processed++;
            /* Unless built with REG_TEST, stop between messages as soon as a stop is requested. */
#ifndef REG_TEST
            if (stop_processing)
                goto done;
#endif
        }
        if (stop_processing)
            goto done;
        /* Absolute deadline for the timed wait: now + 10 seconds. */
        gettimeofday(&tv, NULL);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = tv.tv_usec * 1000;
        rval = pthread_cond_timedwait(&tx_queue.cond, &tx_queue.mutex, &ts);
        /* If we timed out waiting for new output messages to process, run the registered idle routines. */
        /* NOTE(review): the idle routines run while tx_queue.mutex is still held. */
        if (rval == ETIMEDOUT && !stop_processing)
        {
            for (module = modules; module; module = module->next)
            {
                if (module->enabled && module->funcs.idleFunc)
                    module->funcs.idleFunc();
            }
        }
    }
done:
    /* Every path reaching this label holds tx_queue.mutex. */
    pthread_mutex_unlock(&tx_queue.mutex);
    tx_thread_running = 0;

    LogMessage("Side Channel thread exiting...\n");

    return NULL;
}
605 
/*
 * Parses one numeric option value for SCParseConfiguration().
 *
 * token: text of the value.
 * what:  option description inserted into the error message.
 *
 * Reports a conversion error, trailing garbage, or a value that does not fit
 * in 32 bits through FatalError(); otherwise returns the value.
 */
static uint32_t SCParseQueueLimit(const char *token, const char *what)
{
    unsigned long value;
    char *endptr;

    errno = 0;
    value = SnortStrtoul(token, &endptr, 0);
    if (errno != 0 || *endptr != '\0' || value > UINT32_MAX)
        FatalError("Invalid argument for side channel %s: '%s'\n", what, token);

    return (uint32_t) value;
}

/*
 * Fills *config from the side channel section of sc.
 *
 * config is zeroed first.  When the side channel is disabled only
 * config->enabled (false) is meaningful.  Otherwise the defaults are applied
 * and then overridden by the tokens in sc->side_channel_config.opts, which
 * is a list of "<keyword> <value>" pairs and the bare keyword
 * "disable-tx-thread", separated by any of CONF_SEPARATORS.
 *
 * The option string is tokenized on a private copy, so the string stored in
 * sc is left untouched and can be parsed again later.
 *
 * Unknown keywords, malformed or out-of-range values, and a keyword left
 * without its value are reported through FatalError().
 */
static void SCParseConfiguration(SnortConfig *sc, SCConfig *config)
{
    char *token, *argcpy;
    const char *pending = NULL;     /* keyword still waiting for its value */
    enum ConfState confState = STATE_START;

    memset(config, 0, sizeof(SCConfig));

    config->enabled = sc->side_channel_config.enabled;
    if (!config->enabled)
        return;

    config->rx_queue_max_data_size = DEFAULT_RX_QUEUE_DATA_SIZE;
    config->rx_queue_max_depth = DEFAULT_RX_QUEUE_DEPTH;
    config->tx_queue_max_data_size = DEFAULT_TX_QUEUE_DATA_SIZE;
    config->tx_queue_max_depth = DEFAULT_TX_QUEUE_DEPTH;
    config->disable_tx_thread = false;

    if (!sc->side_channel_config.opts)
        return;

    /* strtok() writes NULs into its input, so work on a copy. */
    argcpy = SnortStrdup(sc->side_channel_config.opts);
    for (token = strtok(argcpy, CONF_SEPARATORS); token; token = strtok(NULL, CONF_SEPARATORS))
    {
        switch (confState)
        {
            case STATE_START:
                pending = token;
                if (strcmp(token, CONF_RX_QUEUE_DATA_SIZE) == 0)
                    confState = STATE_RX_QUEUE_DATA_SIZE;
                else if (strcmp(token, CONF_RX_QUEUE_DEPTH) == 0)
                    confState = STATE_RX_QUEUE_DEPTH;
                else if (strcmp(token, CONF_TX_QUEUE_DATA_SIZE) == 0)
                    confState = STATE_TX_QUEUE_DATA_SIZE;
                else if (strcmp(token, CONF_TX_QUEUE_DEPTH) == 0)
                    confState = STATE_TX_QUEUE_DEPTH;
                else if (strcmp(token, CONF_DISABLE_TX_THREAD) == 0)
                    config->disable_tx_thread = true;
                else
                    FatalError("Invalid side channel configuration token: '%s'\n", token);
                break;
            case STATE_RX_QUEUE_DATA_SIZE:
                confState = STATE_START;
                config->rx_queue_max_data_size = SCParseQueueLimit(token, "RX queue data size");
                break;
            case STATE_RX_QUEUE_DEPTH:
                confState = STATE_START;
                config->rx_queue_max_depth = SCParseQueueLimit(token, "RX queue depth");
                break;
            case STATE_TX_QUEUE_DATA_SIZE:
                confState = STATE_START;
                config->tx_queue_max_data_size = SCParseQueueLimit(token, "TX queue data size");
                break;
            case STATE_TX_QUEUE_DEPTH:
                confState = STATE_START;
                config->tx_queue_max_depth = SCParseQueueLimit(token, "TX queue depth");
                break;
            default:
                break;
        }
    }

    /* The option string ended right after a keyword that requires a value. */
    if (confState != STATE_START)
        FatalError("Missing argument for side channel configuration option: '%s'\n", pending);

    free(argcpy);
}
679 
#ifdef SNORT_RELOAD
/*
 * Parses the side channel section of sc into a temporary SCConfig and
 * compares it bytewise with the active configuration.
 *
 * Returns 0 when the two are identical, otherwise the non-zero memcmp()
 * result.
 */
int SideChannelVerifyConfig(SnortConfig *sc)
{
    SCConfig candidate;

    SCParseConfiguration(sc, &candidate);

    return memcmp(&candidate, &sc_config, sizeof(candidate));
}
#endif
690 
/*
 * Parses the side channel section of sc into the active configuration,
 * copies the queue limits into the RX and TX queue descriptors and logs
 * them.  Does nothing when the side channel is disabled in sc.
 */
void SideChannelConfigure(SnortConfig *sc)
{
    if (!sc->side_channel_config.enabled)
        return;

    SCParseConfiguration(sc, &sc_config);

    rx_queue.max_data_size = sc_config.rx_queue_max_data_size;
    rx_queue.max_depth = sc_config.rx_queue_max_depth;
    tx_queue.max_data_size = sc_config.tx_queue_max_data_size;
    tx_queue.max_depth = sc_config.tx_queue_max_depth;

    LogMessage("Side Channel config:\n");
    LogMessage("  RX Queue Max Data Size: %u\n", sc_config.rx_queue_max_data_size);
    LogMessage("  RX Queue Max Depth: %u\n", sc_config.rx_queue_max_depth);
    LogMessage("  TX Queue Max Data Size: %u\n", sc_config.tx_queue_max_data_size);
    /* This line reports the TX depth; it was previously mislabelled as RX. */
    LogMessage("  TX Queue Max Depth: %u\n", sc_config.tx_queue_max_depth);
}
709 
SideChannelInit(void)710 void SideChannelInit(void)
711 {
712     SCModule *module;
713 
714     if (!ScSideChannelEnabled())
715         return;
716 
717     pthread_mutex_init(&rx_queue.mutex, NULL);
718     pthread_cond_init(&rx_queue.cond, NULL);
719     rx_queue.queue = RBMQ_Alloc(rx_queue.max_depth, sizeof(SCMsgHdr), rx_queue.max_data_size);
720 
721     pthread_cond_init(&tx_queue.cond, NULL);
722     pthread_mutex_init(&tx_queue.mutex, NULL);
723     tx_queue.queue = RBMQ_Alloc(tx_queue.max_depth, sizeof(SCMsgHdr), tx_queue.max_data_size);
724 
725     for (module = modules; module; module = module->next)
726     {
727         if (module->enabled && module->funcs.initFunc)
728             module->funcs.initFunc();
729     }
730 }
731 
/*
 * Starts the TX thread unless the side channel is disabled, the
 * "disable-tx-thread" option was given, or there is nothing for the thread
 * to do (no TX handlers and no enabled module with an idleFunc).
 *
 * A failure to create the thread is reported through FatalError().  On
 * success the function returns only after the new thread has set
 * tx_thread_running.
 */
void SideChannelStartTXThread(void)
{
    /* Polling interval (100 ns) while waiting for the thread to come up. */
    const struct timespec thread_sleep = { 0, 100 };
    SCModule *module;
    sigset_t mask;
    int found, rval;

    if (!ScSideChannelEnabled())
        return;

    if (sc_config.disable_tx_thread)
        return;

    /* Avoid starting the TX thread if there are no TX handlers or TX idle tasks registered. */
    found = 0;
    for (module = modules; module; module = module->next)
    {
        if (module->enabled && module->funcs.idleFunc)
        {
            found = 1;
            break;
        }
    }
    if (!found && !tx_handlers)
    {
        LogMessage("Not starting unnecessary Side Channel TX thread.\n");
        return;
    }

    /* Spin off the Side Channel handler thread. */
    /*
     * The calling thread's signal mask is set to this set for the duration of
     * pthread_create(); per POSIX a new thread inherits its creator's mask.
     */
    sigemptyset(&mask);
    sigaddset(&mask, SIGTERM);
    sigaddset(&mask, SIGQUIT);
    sigaddset(&mask, SIGPIPE);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGNAL_SNORT_RELOAD);
    sigaddset(&mask, SIGNAL_SNORT_DUMP_STATS);
    sigaddset(&mask, SIGUSR1);
    sigaddset(&mask, SIGUSR2);
    sigaddset(&mask, SIGNAL_SNORT_ROTATE_STATS);
    sigaddset(&mask, SIGNAL_SNORT_CHILD_READY);
#ifdef TARGET_BASED
    sigaddset(&mask, SIGNAL_SNORT_READ_ATTR_TBL);
    sigaddset(&mask, SIGVTALRM);
#endif
    pthread_sigmask(SIG_SETMASK, &mask, NULL);

    if ((rval = pthread_create(&tx_thread_id, NULL, &SideChannelThread, NULL)) != 0)
    {
        sigemptyset(&mask);
        pthread_sigmask(SIG_SETMASK, &mask, NULL);
        FatalError("Side Channel: Unable to create thread: %s\n", strerror(rval));
    }
    /* Wait until the new thread reports that it is running. */
    while (!tx_thread_running)
        nanosleep(&thread_sleep, NULL);

    p_tx_thread_id = &tx_thread_id;
    /* NOTE(review): the mask is reset to empty here, not restored to its previous value. */
    sigemptyset(&mask);
    pthread_sigmask(SIG_SETMASK, &mask, NULL);
    LogMessage("Side Channel TX thread started tid=%p (pid=%u)\n", (void *) tx_thread_id, tx_thread_pid);
}
793 
SideChannelStopTXThread(void)794 void SideChannelStopTXThread(void)
795 {
796     int rval;
797 
798     if (!ScSideChannelEnabled())
799         return;
800 
801     if (p_tx_thread_id != NULL)
802     {
803         stop_processing = 1;
804         pthread_mutex_lock(&tx_queue.mutex);
805         pthread_cond_signal(&tx_queue.cond);
806         pthread_mutex_unlock(&tx_queue.mutex);
807         if ((rval = pthread_join(*p_tx_thread_id, NULL)) != 0)
808             WarningMessage("Side channel TX thread termination returned an error: %s\n", strerror(rval));
809     }
810 }
811 
SideChannelPostInit(void)812 int SideChannelPostInit(void)
813 {
814     SCModule *module;
815 
816     if (!ScSideChannelEnabled())
817         return 0;
818 
819     for (module = modules; module; module = module->next)
820     {
821         if (module->enabled && module->funcs.postInitFunc)
822             module->funcs.postInitFunc();
823     }
824 
825     return 0;
826 }
827 
SideChannelStats(int exiting,const char * separator)828 void SideChannelStats(int exiting, const char *separator)
829 {
830     SCModule *module;
831 
832     if (!ScSideChannelEnabled())
833         return;
834 
835     LogMessage("%s\n", separator);
836     LogMessage("Side Channel:\n");
837     LogMessage("  RX Messages Total:            %"PRIu64"\n", Side_Channel_Stats.rx_messages_total);
838     LogMessage("  RX Messages Processed (IB):   %"PRIu64"\n", Side_Channel_Stats.rx_messages_processed_ib);
839     LogMessage("  RX Messages Processed (OOB):  %"PRIu64"\n", Side_Channel_Stats.rx_messages_processed_oob);
840     LogMessage("  TX Messages Total:            %"PRIu64"\n", Side_Channel_Stats.tx_messages_total);
841     LogMessage("  TX Messages Processed:        %"PRIu64"\n", Side_Channel_Stats.tx_messages_processed);
842 
843     for (module = modules; module; module = module->next)
844     {
845         if (module->enabled && module->funcs.statsFunc)
846         {
847             LogMessage("%s\n", separator);
848             module->funcs.statsFunc(exiting);
849         }
850     }
851 
852     LogMessage("  RX Queue Stats:\n");
853     RBMQ_Stats(rx_queue.queue, "  ");
854 
855     LogMessage("  TX Queue Stats:\n");
856     RBMQ_Stats(tx_queue.queue, "  ");
857 }
858 
SideChannelCleanUp(void)859 void SideChannelCleanUp(void)
860 {
861     SCModule *module;
862 
863     if (!ScSideChannelEnabled())
864         return;
865 
866     while ((module = modules))
867     {
868         if (module->enabled)
869         {
870             if (module->funcs.statsFunc)
871                 module->funcs.statsFunc(1);
872 
873             if (module->funcs.shutdownFunc)
874                 module->funcs.shutdownFunc();
875         }
876         modules = module->next;
877         free(module->keyword);
878         free(module);
879     }
880     pthread_cond_destroy(&tx_queue.cond);
881     pthread_mutex_destroy(&tx_queue.mutex);
882     pthread_cond_destroy(&rx_queue.cond);
883     pthread_mutex_destroy(&rx_queue.mutex);
884 }
885 
/*
 * WARNING: Messages are written to and read from files in host byte order.
 */
889 
/*
 * Writes exactly count bytes from buf to fd.
 *
 * A short write is continued from the first byte that has not been written
 * yet, and a write interrupted by a signal (EINTR) is retried.
 *
 * Returns 0 once all bytes are written, -1 on any other error or when
 * write() makes no progress.
 */
static inline ssize_t Write(int fd, const void *buf, size_t count)
{
    const uint8_t *cursor = buf;
    ssize_t n;

    for (;;)
    {
        errno = 0;
        n = write(fd, cursor, count);

        if (n == (ssize_t) count)
            return 0;

        if (n > 0)
        {
            /* Partial write: skip what was sent so it is not written again. */
            cursor += n;
            count -= (size_t) n;
        }
        else if (errno != EINTR)
            break;
    }

    return -1;
}
908 
/*
 * Writes one message record to fd as four consecutive fields: header type,
 * header timestamp, payload length, payload bytes.  Each field is written
 * as it is laid out in memory.
 *
 * Returns 0 on success, -1 as soon as any field fails to be written.
 */
int SideChannelWriteMsgToFile(int fd, SCMsgHdr *hdr, const uint8_t *msg, uint32_t length)
{
    /* Short-circuit evaluation keeps the field order and stops at the first failure. */
    if (Write(fd, &hdr->type, sizeof(hdr->type)) != 0
        || Write(fd, &hdr->timestamp, sizeof(hdr->timestamp)) != 0
        || Write(fd, &length, sizeof(length)) != 0
        || Write(fd, msg, length) != 0)
    {
        return -1;
    }

    return 0;
}
925 
/*
 * Reads exactly count bytes from fd into buf.
 *
 * Short reads are continued where they left off and reads interrupted by a
 * signal (EINTR) are retried.
 *
 * Returns 0 once count bytes have been read; -1 on end of file before that
 * point or on any other read error (which is also logged).
 */
static inline ssize_t Read(int fd, void *buf, size_t count)
{
    uint8_t *cursor = buf;
    size_t remaining = count;
    ssize_t got;

    errno = 0;

    for (;;)
    {
        got = read(fd, cursor, remaining);

        if (got > (ssize_t) remaining)
            break;

        if (got == (ssize_t) remaining)
            return 0;

        /* End of file before the full amount arrived. */
        if (got == 0)
            break;

        if (got < 0)
        {
            if (errno == EINTR)
                continue;

            ErrorMessage("Error reading Logger SCM log file: %s (%d)\n", strerror(errno), errno);
            break;
        }

        /* Partial read: advance past what was received and keep going. */
        remaining -= got;
        cursor += got;
    }

    return -1;
}
951 
/*
 * Reads one message record from fd in the layout produced by
 * SideChannelWriteMsgToFile(): type, timestamp, length, payload.
 *
 * hdr:        receives the type and timestamp.
 * msg_ptr:    receives a buffer allocated with SnortAlloc() holding the
 *             payload, or NULL when the length is 0; the code here does not
 *             free it on success.
 * length_ptr: receives the payload length.
 *
 * Returns 0 on success.  Returns -1 if any field cannot be read completely;
 * in that case none of the outputs is written.
 */
int SideChannelReadMsgFromFile(int fd, SCMsgHdr *hdr, uint8_t **msg_ptr, uint32_t *length_ptr)
{
    uint64_t timestamp;
    uint32_t length;
    uint16_t type;
    uint8_t *payload = NULL;

    if (Read(fd, &type, sizeof(type)) != 0
        || Read(fd, &timestamp, sizeof(timestamp)) != 0
        || Read(fd, &length, sizeof(length)) != 0)
    {
        return -1;
    }

    if (length > 0)
    {
        payload = SnortAlloc(length);
        if (Read(fd, payload, length) != 0)
        {
            free(payload);
            return -1;
        }
    }

    /* Outputs are only written once the whole record has been read. */
    hdr->type = type;
    hdr->timestamp = timestamp;
    *length_ptr = length;
    *msg_ptr = payload;

    return 0;
}
987 
988 #endif /* SIDE_CHANNEL */
989