1 /* $Id$ */
2 /****************************************************************************
3  *
4  * Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
5  * Copyright (C) 2012-2013 Sourcefire, Inc.
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License Version 2 as
9  * published by the Free Software Foundation.  You may not use, modify or
10  * distribute this program under any other version of the GNU General
11  * Public License.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21  *
22  *****************************************************************************/
23 
24 /**************************************************************************
25  *
26  * stream5_ha.c
27  *
28  * Authors: Michael Altizer <maltizer@sourcefire.com>, Russ Combs <rcombs@sourcefire.com>
29  *
30  * Description:
31  *
32  * Stream high availability support.
33  *
34  **************************************************************************/
35 
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <assert.h>
41 
42 #include "active.h"
43 #include "mstring.h"
44 #include "packet_time.h"
45 #include "parser.h"
46 #include "sfcontrol_funcs.h"
47 #ifdef SIDE_CHANNEL
48 #include "sidechannel.h"
49 #endif
50 #include "stream5_ha.h"
51 #include "util.h"
52 
53 /*
54  * Stream HA messages will have the following format:
55  *
56  * <message>  ::= <header> <has-rec> <psd-rec>
57  * <header>   ::= <event> version <message length> <key>
58  * <event>    ::= HA_EVENT_UPDATE | HA_EVENT_DELETE
59  * <key>      ::= <ipv4-key> | <ipv6-key>
60  * <ipv4-key> ::= HA_TYPE_KEY sizeof(ipv4-key) ipv4-key
61  * <ipv6-key> ::= HA_TYPE_KEY sizeof(ipv6-key) ipv6-key
62  * <has-rec>  ::= HA_TYPE_HAS sizeof(has-rec) has-rec | (null)
63  * <psd-rec>  ::= HA_TYPE_PSD sizeof(psd-rec) psd-preprocid psd-subcode psd-rec <psd-rec> | (null)
64  */
65 
/*
 * One registered producer/consumer pair for preprocessor-specific HA data.
 * Instances are allocated by RegisterSessionHAFuncs() and stored in the
 * stream_ha_funcs[] table.
 */
66 typedef struct _StreamHAFuncsNode
67 {
68     uint16_t id;                    /* slot index in stream_ha_funcs[] */
69     uint16_t mask;                  /* (1 << id) */
70     uint8_t preproc_id;             /* ID of the registering preprocessor */
71     uint8_t subcode;                /* (preproc_id, subcode) must be unique across registrations */
72     uint8_t size;                   /* size value supplied at registration */
73     StreamHAProducerFunc produce;   /* never NULL: registration rejects a NULL producer */
74     StreamHAConsumerFunc consume;   /* never NULL: registration rejects a NULL consumer */
75     uint32_t produced;              /* counter reported as "produced" by SessionPrintHAStats() */
76     uint32_t consumed;              /* counter reported as "consumed" by SessionPrintHAStats() */
77 } StreamHAFuncsNode;
78 
/*
 * Type tags for the key and records of an HA message (see the message
 * grammar at the top of this file).
 */
79 typedef enum
80 {
81     HA_TYPE_KEY,    // Lightweight Session Key
82     HA_TYPE_HAS,    // Lightweight Session Data
83     HA_TYPE_PSD,    // Preprocessor-specific Data
84     HA_TYPE_MAX     // One past the last defined type
85 } HA_Type;
86 
/*
 * Leading header of an HA message, matching the <header> production of the
 * message grammar: <event> version <message length> followed by the key's
 * type and size.
 */
87 typedef struct _MsgHeader
88 {
89     uint8_t event;          /* HA_EVENT_UPDATE or HA_EVENT_DELETE */
90     uint8_t version;        /* message format version (presumably HA_MESSAGE_VERSION - confirm in producer) */
91     uint16_t total_length;  /* <message length> */
92     uint8_t key_type;       /* HA_TYPE_KEY per the grammar */
93     uint8_t key_size;       /* size of the session key that follows */
94 } MsgHeader;
95 
/*
 * Header that precedes each <has-rec> / <psd-rec> in an HA message: a type
 * tag followed by the record's size.
 */
96 typedef struct _RecordHeader
97 {
98     uint8_t type;       /* HA_TYPE_HAS or HA_TYPE_PSD per the grammar */
99     uint8_t length;     /* sizeof(record) per the grammar */
100 } RecordHeader;
101 
/*
 * Extra header carried by a preprocessor-specific (<psd-rec>) record; it names
 * the (preproc_id, subcode) pair the data belongs to.
 */
102 typedef struct _PreprocDataHeader
103 {
104     uint8_t preproc_id;     /* psd-preprocid */
105     uint8_t subcode;        /* psd-subcode */
106 } PreprocDataHeader;
107 
/*
 * Lightweight session data carried in an HA message.  The flags tell the
 * receiver how to map the low/high halves of the session key back onto
 * client/server and which address family the key holds.
 */
108 /* Something more will probably be added to this structure in the future... */
109 #define HA_SESSION_FLAG_LOW     0x01     // client address / port is low in key
110 #define HA_SESSION_FLAG_IP6     0x02     // key addresses are ip6
111 typedef struct
112 {
113     StreamHAState ha_state;
114     uint8_t flags;          /* HA_SESSION_FLAG_* bits */
115 } StreamHASession;
116 
/*
 * Module-wide HA message counters.  Printed by SessionPrintHAStats() and
 * zeroed by SessionResetHAStats().
 */
117 typedef struct
118 {
119     uint32_t update_messages_received;              /* "Updates Received" */
120     uint32_t update_messages_received_no_session;   /* "Updates Received (No Session)" */
121     uint32_t delete_messages_received;              /* "Deletions Received" */
122     uint32_t update_messages_sent_immediately;      /* "Updates Sent Immediately" */
123     uint32_t update_messages_sent_normally;         /* "Updates Sent Normally" */
124     uint32_t delete_messages_sent;                  /* "Deletions Sent" */
125     uint32_t delete_messages_not_sent;              /* "Deletions Not Sent" */
126 } StreamHAStats;
127 
/*
 * Filter that selects which sessions get runtime HA debugging.  Filled in by
 * StreamHADebugParse() and evaluated by StreamHADebugCheck(); a zero port or
 * protocol, or a clear address flag, matches anything.
 */
128 typedef struct _HADebugSessionConstraints
129 {
130     int sip_flag;           /* non-zero when sip holds an address to match */
131     struct in6_addr sip;
132     int dip_flag;           /* non-zero when dip holds an address to match */
133     struct in6_addr dip;
134     uint16_t sport;         /* compared directly against the key's port_l / port_h */
135     uint16_t dport;
136     uint8_t protocol;       /* compared against the key's protocol */
137 } HADebugSessionConstraints;
138 
139 #define MAX_STREAM_HA_FUNCS 8  // depends on sizeof(SessionControlBlock.ha_pending_mask)
140 #define HA_MESSAGE_VERSION  0x82
/* Registration table; a slot is NULL after UnregisterSessionHAFuncs() frees it. */
141 static StreamHAFuncsNode *stream_ha_funcs[MAX_STREAM_HA_FUNCS];
/*
 * Number of leading slots of stream_ha_funcs[] that are in play: grows in
 * RegisterSessionHAFuncs() and shrinks only when the last slot is unregistered.
 */
142 static int n_stream_ha_funcs = 0;
143 static int runtime_output_fd = -1;
144 #ifdef REG_TEST
/* Handed to the DAQ through pkthdr->priv_ptr in SessionHAQueryDAQState(). */
145 static int  runtime_input_fd = -1;
146 #endif
147 static uint8_t file_io_buffer[UINT16_MAX];
/* Counters printed by SessionPrintHAStats() and cleared by SessionResetHAStats(). */
148 static StreamHAStats s5ha_stats;
149 
150 /* Runtime debugging stuff. */
151 #define HA_DEBUG_SESSION_ID_SIZE    (39+1+5+5+39+1+5+1+3+1) /* "<IPv6 address>:<port> <-> <IPv6 address>:<port> <ipproto>\0" */
/* Constraints and enable flag written by the StreamDebugHA() control-socket handler. */
152 static HADebugSessionConstraints s5_ha_debug_info;
153 static volatile int s5_ha_debug_flag = 0;
154 static char s5_ha_debug_session[HA_DEBUG_SESSION_ID_SIZE];
/* Not static.  SessionHAInit() clears SSNFLAG_RESET from this mask when use_daq is configured. */
155 uint32_t HA_CRITICAL_SESSION_FLAGS = ( SSNFLAG_DROP_CLIENT | SSNFLAG_DROP_SERVER | SSNFLAG_RESET );
156 
157 #define IP6_SESSION_KEY_SIZE sizeof(SessionKey)
/* 24 bytes smaller than the IPv6 form - presumably two addresses of 4 instead of 16 bytes; TODO confirm against SessionKey. */
158 #define IP4_SESSION_KEY_SIZE (IP6_SESSION_KEY_SIZE - 24)
159 
160 #ifdef PERF_PROFILING
161 PreprocStats sessionHAPerfStats;
162 PreprocStats sessionHAConsumePerfStats;
163 PreprocStats sessionHAProducePerfStats;
164 #endif
165 
/* Forward declaration: called by the DAQ handlers below, defined elsewhere in this file. */
166 static int ConsumeHAMessage(const uint8_t *msg, uint32_t msglen);
167 
168 //--------------------------------------------------------------------
169 //  Runtime debugging support.
170 //--------------------------------------------------------------------
/*
 * Decide whether a session key is selected by the runtime debug constraints
 * and, if so, render the key as text.
 *
 *   key               - session key to test
 *   debug_flag        - non-zero when debugging is enabled
 *   info              - constraints; a zero port/protocol or a clear address
 *                       flag matches anything
 *   debug_session     - receives "<ip_l>:<port_l> <-> <ip_h>:<port_h> <proto>"
 *   debug_session_len - size of debug_session in bytes
 *
 * Returns true (with debug_session filled in) on a match, false otherwise.
 * The source/destination constraints are tried in both orientations: source
 * against the low half of the key, and source against the high half.
 * When REG_TEST is defined the filtering is compiled out and every key matches.
 */
static inline bool StreamHADebugCheck(const SessionKey *key, volatile int debug_flag,
                                      HADebugSessionConstraints *info, char *debug_session, size_t debug_session_len)
{
    char low_text[INET6_ADDRSTRLEN];
    char high_text[INET6_ADDRSTRLEN];
    bool both_v4_mapped;
#ifndef REG_TEST
    bool src_is_low;
    bool src_is_high;

    if (!debug_flag)
        return false;

    if (info->protocol && info->protocol != key->protocol)
        return false;

    /* Orientation 1: source constraint against the low half, destination against the high half. */
    src_is_low = (!info->sport || info->sport == key->port_l) &&
                 (!info->sip_flag || memcmp(&info->sip, key->ip_l, sizeof(info->sip)) == 0) &&
                 (!info->dport || info->dport == key->port_h) &&
                 (!info->dip_flag || memcmp(&info->dip, key->ip_h, sizeof(info->dip)) == 0);

    /* Orientation 2: the same constraints with the halves swapped. */
    src_is_high = (!info->sport || info->sport == key->port_h) &&
                  (!info->sip_flag || memcmp(&info->sip, key->ip_h, sizeof(info->sip)) == 0) &&
                  (!info->dport || info->dport == key->port_l) &&
                  (!info->dip_flag || memcmp(&info->dip, key->ip_l, sizeof(info->dip)) == 0);

    if (!src_is_low && !src_is_high)
        return false;
#endif

    low_text[0] = '\0';
    high_text[0] = '\0';

    /* Print in IPv4 form only when BOTH addresses have the ::ffff:a.b.c.d layout. */
    both_v4_mapped = !key->ip_l[0] && !key->ip_l[1] && key->ip_l[2] == htonl(0xFFFF) &&
                     !key->ip_h[0] && !key->ip_h[1] && key->ip_h[2] == htonl(0xFFFF);

    if (both_v4_mapped)
    {
        inet_ntop(AF_INET, &key->ip_l[3], low_text, sizeof(low_text));
        inet_ntop(AF_INET, &key->ip_h[3], high_text, sizeof(high_text));
    }
    else
    {
        inet_ntop(AF_INET6, key->ip_l, low_text, sizeof(low_text));
        inet_ntop(AF_INET6, key->ip_h, high_text, sizeof(high_text));
    }

    snprintf(debug_session, debug_session_len, "%s:%hu <-> %s:%hu %hhu",
             low_text, key->port_l, high_text, key->port_h, key->protocol);
    return true;
}
214 
/*
 * Helper for StreamHADebugParse(): interpret one 16-byte address field.
 *
 *   - first 4 bytes zero              -> no constraint ("any"), returns 0
 *   - first 4 non-zero, last 12 zero  -> the first 4 bytes are an IPv4 address;
 *                                        stored as ::ffff:a.b.c.d, returns 1
 *   - otherwise                       -> a full IPv6 address; copied, returns 1
 *
 * NOTE(review): as in the original logic, an address whose first 4 bytes are
 * zero (e.g. ::1, or IPv4 already given in mapped form) is treated as "any".
 */
static int StreamHADebugParseAddr(const uint8_t *field, struct in6_addr *addr)
{
    static const uint8_t zero[sizeof(struct in6_addr)] = { 0 };

    if (memcmp(field, zero, 4) == 0)
        return 0;

    if (memcmp(field + 4, zero, 12) == 0)
    {
        /* Build the IPv4-mapped form byte-wise; no alignment or endianness assumptions. */
        memset(addr->s6_addr, 0, 10);
        addr->s6_addr[10] = 0xFF;
        addr->s6_addr[11] = 0xFF;
        memcpy(&addr->s6_addr[12], field, 4);
    }
    else
    {
        memcpy(addr, field, sizeof(*addr));
    }

    return 1;
}

/*
 * Parse a debug-constraint payload and enable or disable session debugging.
 *
 *   desc       - label used in the log messages
 *   data       - payload laid out as: protocol (1 byte), source address (16),
 *                source port (2), destination address (16), destination
 *                port (2); parsing stops at the first field that does not fit
 *   length     - number of bytes available at data
 *   debug_flag - set to 1 if at least one constraint was supplied, else 0
 *   info       - cleared, then filled with the parsed constraints
 *
 * Fix: a supplied source address now sets info->sip_flag.  Previously it set
 * dip_flag, so the source address was never enforced and an empty destination
 * constraint was enforced in its place.  Multi-byte fields are read with
 * memcpy so an unaligned payload is safe.
 */
static void StreamHADebugParse(const char *desc, const uint8_t *data, uint32_t length,
                               volatile int *debug_flag, HADebugSessionConstraints *info)
{
    *debug_flag = 0;
    memset(info, 0, sizeof(*info));

    do
    {
        if (length < sizeof(info->protocol))
            break;
        info->protocol = data[0];
        length -= sizeof(info->protocol);
        data += sizeof(info->protocol);

        if (length < sizeof(info->sip))
            break;
        info->sip_flag = StreamHADebugParseAddr(data, &info->sip);
        length -= sizeof(info->sip);
        data += sizeof(info->sip);

        if (length < sizeof(info->sport))
            break;
        memcpy(&info->sport, data, sizeof(info->sport));
        length -= sizeof(info->sport);
        data += sizeof(info->sport);

        if (length < sizeof(info->dip))
            break;
        info->dip_flag = StreamHADebugParseAddr(data, &info->dip);
        length -= sizeof(info->dip);
        data += sizeof(info->dip);

        if (length < sizeof(info->dport))
            break;
        memcpy(&info->dport, data, sizeof(info->dport));
        length -= sizeof(info->dport);
        data += sizeof(info->dport);
    } while (0);

    if (info->protocol || info->sip_flag || info->sport || info->dip_flag || info->dport)
    {
        char sipstr[INET6_ADDRSTRLEN];
        char dipstr[INET6_ADDRSTRLEN];

        sipstr[0] = '\0';
        if (info->sip_flag)
            inet_ntop(AF_INET6, &info->sip, sipstr, sizeof(sipstr));
        else
            snprintf(sipstr, sizeof(sipstr), "any");

        dipstr[0] = '\0';
        if (info->dip_flag)
            inet_ntop(AF_INET6, &info->dip, dipstr, sizeof(dipstr));
        else
            snprintf(dipstr, sizeof(dipstr), "any");

        LogMessage("Debugging %s with %s-%hu and %s-%hu %hhu\n", desc,
                    sipstr, info->sport, dipstr, info->dport, info->protocol);
        *debug_flag = 1;
    }
    else
        LogMessage("Debugging %s disabled\n", desc);
}
320 
StreamDebugHA(uint16_t type,const uint8_t * data,uint32_t length,void ** new_context,char * statusBuf,int statusBuf_len)321 static int StreamDebugHA(uint16_t type, const uint8_t *data, uint32_t length, void **new_context, char *statusBuf, int statusBuf_len)
322 {
323     StreamHADebugParse("S5HA", data, length, &s5_ha_debug_flag, &s5_ha_debug_info);
324 
325     return 0;
326 }
327 
328 //--------------------------------------------------------------------
329 // Protocol-specific HA API
330 // could use an array here (and an enum instead of IPPROTO_*)
331 //--------------------------------------------------------------------
332 
/* Per-protocol HA callbacks: installed by ha_set_api(), looked up by ha_get_api(). */
333 static const HA_Api *s_tcp = NULL;
334 static const HA_Api *s_udp = NULL;
335 static const HA_Api *s_ip = NULL;     /* ha_get_api() also returns this for ICMP and every other protocol */
336 
ha_set_api(unsigned proto,const HA_Api * api)337 int ha_set_api(unsigned proto, const HA_Api *api)
338 {
339     switch (proto)
340     {
341         case IPPROTO_TCP:
342             s_tcp = api;
343             break;
344         case IPPROTO_UDP:
345             s_udp = api;
346             break;
347         case IPPROTO_IP:
348             s_ip = api;
349             break;
350         default:
351             return -1;
352     }
353     return 0;
354 }
355 
ha_get_api(unsigned proto)356 static inline const HA_Api *ha_get_api(unsigned proto)
357 {
358     switch (proto)
359     {
360         case IPPROTO_TCP:
361             return s_tcp;
362         case IPPROTO_UDP:
363             return s_udp;
364         case IPPROTO_ICMP:
365         case IPPROTO_IP:
366         default:
367             return s_ip;
368     }
369     return NULL;
370 }
371 
/*
 * Debug aid: log one line describing an HA message for the given key.
 * Compiles to nothing unless STREAM_DEBUG_ENABLED is non-zero.
 *
 *   key   - session key the message refers to
 *   rx    - non-zero for a received message, zero for one being sent
 *   event - HA_EVENT_DELETE logs "deletion"; any other value logs "update"
 *
 * The key is printed as IPv6 when any of words 1..3 of either address is
 * non-zero, otherwise as IPv4 taken from word 0.
 * NOTE(review): StreamHADebugCheck() detects IPv4 differently (::ffff: mapped,
 * address in word 3) - confirm which layout SessionKey really uses.
 */
static inline void ha_print_key(const SessionKey *key, int rx, uint8_t event)
{
#if STREAM_DEBUG_ENABLED
    char ipA[INET6_ADDRSTRLEN], ipB[INET6_ADDRSTRLEN];
    int family = AF_INET;

    if (key->ip_l[1] || key->ip_l[2] || key->ip_l[3] || key->ip_h[1] || key->ip_h[2] || key->ip_h[3])
        family = AF_INET6;

    sfip_raw_ntop(family, key->ip_l, ipA, sizeof(ipA));
    sfip_raw_ntop(family, key->ip_h, ipB, sizeof(ipB));

    LogMessage("%s flow %s message for %s:%hu <-> %s:%hu, Protocol %hhu\n",
               rx ? "Receiving" : "Sending", (event == HA_EVENT_DELETE) ? "deletion" : "update",
               ipA, key->port_l, ipB, key->port_h, key->protocol);
#endif
}
392 
/*
 * Register a producer/consumer pair for preprocessor-specific HA data.
 *
 *   preproc_id - preprocessor ID; must fit in 8 bits
 *   subcode    - distinguishes several registrations of one preprocessor
 *   size       - size value recorded with the registration
 *   produce    - producer callback (required)
 *   consume    - consumer callback (required)
 *
 * Returns the slot index assigned to the registration; this is the value
 * SessionSetHAPendingBit() expects as its bit argument.
 *
 * Calls FatalError() when either callback is NULL, preproc_id is above
 * UINT8_MAX, the (preproc_id, subcode) pair is already registered, or all
 * MAX_STREAM_HA_FUNCS slots are taken.
 */
int RegisterSessionHAFuncs(uint32_t preproc_id, uint8_t subcode, uint8_t size,
                            StreamHAProducerFunc produce, StreamHAConsumerFunc consume)
{
    StreamHAFuncsNode *entry;
    int slot;
    int i;

    if (produce == NULL || consume == NULL)
    {
        FatalError("One must be both a producer and a consumer to participate in Stream HA!\n");
    }

    if (preproc_id > UINT8_MAX)
    {
        FatalError("Preprocessor ID must be between 0 and %d to participate in Stream HA!\n", UINT8_MAX);
    }

    /* Reject duplicates and remember the first hole; default to appending. */
    slot = n_stream_ha_funcs;
    for (i = 0; i < n_stream_ha_funcs; i++)
    {
        entry = stream_ha_funcs[i];
        if (entry == NULL)
        {
            if (slot == n_stream_ha_funcs)
                slot = i;
            continue;
        }

        if (preproc_id == entry->preproc_id && subcode == entry->subcode)
        {
            FatalError("Duplicate Stream HA registration attempt for preprocessor %hu with subcode %hu\n",
                       (unsigned short)entry->preproc_id, (unsigned short)entry->subcode);
        }
    }

    if (slot == MAX_STREAM_HA_FUNCS)
    {
        FatalError("Attempted to register more than %d Stream HA types!\n", MAX_STREAM_HA_FUNCS);
    }

    /* Appending (rather than reusing a hole) extends the in-use range. */
    if (slot == n_stream_ha_funcs)
        n_stream_ha_funcs++;

    entry = (StreamHAFuncsNode *) SnortAlloc(sizeof(StreamHAFuncsNode));
    entry->id = slot;
    entry->mask = (1 << slot);
    entry->preproc_id = (uint8_t) preproc_id;
    entry->subcode = subcode;
    entry->size = size;
    entry->produce = produce;
    entry->consume = consume;

    stream_ha_funcs[slot] = entry;

    LogMessage("StreamHA: Registered node %hu for preprocessor ID %hhu with subcode %hhu (size %hhu)\n",
                entry->id, entry->preproc_id, entry->subcode, entry->size);

    return slot;
}
449 
UnregisterSessionHAFuncs(uint32_t preproc_id,uint8_t subcode)450 void UnregisterSessionHAFuncs(uint32_t preproc_id, uint8_t subcode)
451 {
452     StreamHAFuncsNode *node;
453     int i;
454 
455     for (i = 0; i < n_stream_ha_funcs; i++)
456     {
457         node = stream_ha_funcs[i];
458         if (node && preproc_id == node->preproc_id && subcode == node->subcode)
459         {
460             stream_ha_funcs[i] = NULL;
461             free(node);
462             break;
463         }
464     }
465 
466     if ((i + 1) == n_stream_ha_funcs)
467         n_stream_ha_funcs--;
468 }
469 
SessionSetHAPendingBit(void * ssnptr,int bit)470 void SessionSetHAPendingBit(void *ssnptr, int bit)
471 {
472     SessionControlBlock *scb = (SessionControlBlock*) ssnptr;
473 
474     if (!scb)
475         return;
476 
477     if (bit >= n_stream_ha_funcs || !stream_ha_funcs[bit])
478     {
479         FatalError("Attempted to set illegal HA pending bit %d!\n", bit);
480     }
481 
482     scb->ha_pending_mask |= (1 << bit);
483 }
484 
StreamParseHAArgs(SnortConfig * sc,SessionHAConfig * config,char * args)485 static void StreamParseHAArgs(SnortConfig *sc, SessionHAConfig *config, char *args)
486 {
487     char **toks;
488     int num_toks;
489     int i;
490     char **stoks = NULL;
491     int s_toks;
492     char *endPtr = NULL;
493     unsigned long int value;
494 
495     if (config == NULL)
496         return;
497 
498     if ((args == NULL) || (strlen(args) == 0))
499         return;
500 
501     toks = mSplit(args, ",", 0, &num_toks, 0);
502 
503     for (i = 0; i < num_toks; i++)
504     {
505         stoks = mSplit(toks[i], " ", 2, &s_toks, 0);
506 
507         if (s_toks == 0)
508         {
509             FatalError("%s(%d) => Missing parameter in Stream HA config.\n",
510                     file_name, file_line);
511         }
512 
513         if (!strcmp(stoks[0], "min_session_lifetime"))
514         {
515             if (stoks[1])
516                 value = strtoul(stoks[1], &endPtr, 10);
517             else
518                 value = 0;
519 
520             if (!stoks[1] || (endPtr == &stoks[1][0]) || *endPtr)
521             {
522                 FatalError("%s(%d) => Invalid '%s' in config file. Requires integer parameter.\n",
523                            file_name, file_line, stoks[0]);
524             }
525 
526             if (value > UINT16_MAX)
527             {
528                 FatalError("%s(%d) => '%s %lu' invalid: value must be between 0 and %d milliseconds.\n",
529                            file_name, file_line, stoks[0], value, UINT16_MAX);
530             }
531 
532             config->min_session_lifetime.tv_sec = 0;
533             while (value >= 1000)
534             {
535                 config->min_session_lifetime.tv_sec++;
536                 value -= 1000;
537             }
538             config->min_session_lifetime.tv_usec = value * 1000;
539         }
540         else if (!strcmp(stoks[0], "min_sync_interval"))
541         {
542             if (stoks[1])
543                 value = strtoul(stoks[1], &endPtr, 10);
544             else
545                 value = 0;
546 
547             if (!stoks[1] || (endPtr == &stoks[1][0]) || *endPtr)
548             {
549                 FatalError("%s(%d) => Invalid '%s' in config file. Requires integer parameter.\n",
550                            file_name, file_line, stoks[0]);
551             }
552 
553             if (value > UINT16_MAX)
554             {
555                 FatalError("%s(%d) => '%s %lu' invalid: value must be between 0 and %d milliseconds.\n",
556                            file_name, file_line, stoks[0], value, UINT16_MAX);
557             }
558 
559             config->min_sync_interval.tv_sec = 0;
560             while (value >= 1000)
561             {
562                 config->min_sync_interval.tv_sec++;
563                 value -= 1000;
564             }
565             config->min_sync_interval.tv_usec = value * 1000;
566         }
567         else if (!strcmp(stoks[0], "startup_input_file"))
568         {
569             if (!stoks[1])
570             {
571                 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
572             }
573             if (config->startup_input_file)
574             {
575                 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
576             }
577             config->startup_input_file = SnortStrdup(stoks[1]);
578         }
579         else if (!strcmp(stoks[0], "runtime_output_file"))
580         {
581             if (!stoks[1])
582             {
583                 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
584             }
585             if (config->runtime_output_file)
586             {
587                 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
588             }
589             config->runtime_output_file = SnortStrdup(stoks[1]);
590         }
591         else if (!strcmp(stoks[0], "shutdown_output_file"))
592         {
593             if (!stoks[1])
594             {
595                 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
596             }
597             if (config->shutdown_output_file)
598             {
599                 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
600             }
601             config->shutdown_output_file = SnortStrdup(stoks[1]);
602         }
603         else if (!strcmp(stoks[0], "use_side_channel"))
604         {
605 #ifdef SIDE_CHANNEL
606             if (!sc->side_channel_config.enabled)
607             {
608                 FatalError("%s(%d) => '%s' cannot be specified without enabling the Snort side channel.\n",
609                             file_name, file_line, stoks[0]);
610             }
611             config->use_side_channel = 1;
612 #else
613             FatalError("%s(%d) => Snort has been compiled without Side Channel support.\n", file_name, file_line);
614 #endif
615         }
616         else if (!strcmp(stoks[0], "use_daq"))
617         {
618 #ifdef HAVE_DAQ_EXT_MODFLOW
619             config->use_daq = 1;
620 #else
621             FatalError("%s(%d) => Snort has been compiled against a LibDAQ version without extended flow modifier support.\n",
622                         file_name, file_line);
623 #endif
624         }
625         else
626         {
627             FatalError("%s(%d) => Invalid Stream HA config option '%s'\n",
628                     file_name, file_line, stoks[0]);
629         }
630 
631         mSplitFree(&stoks, s_toks);
632     }
633 
634     mSplitFree(&toks, num_toks);
635 #ifdef REG_TEST
636     if(sc->ha_out)
637     {
638         if(config->runtime_output_file)
639             free(config->runtime_output_file);
640         config->runtime_output_file = SnortStrdup(sc->ha_out);
641 	}
642     if(sc->ha_in)
643     {
644         if(config->startup_input_file)
645             free(config->startup_input_file);
646         config->startup_input_file = SnortStrdup(sc->ha_in);
647     }
648    if(sc->ha_pdts_in)
649    {
650        if(config->runtime_input_file)
651            free(config->runtime_input_file);
652        config->runtime_input_file = SnortStrdup(sc->ha_pdts_in);
653    }
654 #endif
655 
656 }
657 
/*
 * Log the Stream HA configuration.  Does nothing when config is NULL.
 * File names are listed only when set; the side-channel line appears only in
 * SIDE_CHANNEL builds and the data-size line only in REG_TEST builds.
 *
 * The two intervals are converted to unsigned long before printing so the
 * argument matches the %lu conversion even where time_t / suseconds_t are not
 * the same width as long.
 */
static void StreamPrintHAConfig(SessionHAConfig *config)
{
    unsigned long lifetime_ms;
    unsigned long interval_ms;

    if (config == NULL)
        return;

    lifetime_ms = (unsigned long)(config->min_session_lifetime.tv_sec * 1000 +
                                  config->min_session_lifetime.tv_usec / 1000);
    interval_ms = (unsigned long)(config->min_sync_interval.tv_sec * 1000 +
                                  config->min_sync_interval.tv_usec / 1000);

    LogMessage("Stream HA config:\n");
    LogMessage("    Minimum Session Lifetime: %lu milliseconds\n", lifetime_ms);
    LogMessage("    Minimum Sync Interval: %lu milliseconds\n", interval_ms);
    if (config->startup_input_file)
        LogMessage("    Startup Input File:    %s\n", config->startup_input_file);
    if (config->runtime_output_file)
        LogMessage("    Runtime Output File:   %s\n", config->runtime_output_file);
    if (config->shutdown_output_file)
        LogMessage("    Shutdown Output File:  %s\n", config->shutdown_output_file);
#ifdef SIDE_CHANNEL
    LogMessage("    Using Side Channel:    %s\n", config->use_side_channel ? "Yes" : "No");
#endif
    LogMessage("    Using DAQ:             %s\n", config->use_daq ? "Yes" : "No");
#ifdef REG_TEST
    LogMessage("    Stream LWS HA Data Size: %zu\n", sizeof(StreamHASession));
#endif
}
682 
SessionPrintHAStats(void)683 void SessionPrintHAStats(void)
684 {
685     StreamHAFuncsNode *node;
686     int i;
687 
688     LogMessage("  High Availability\n");
689     LogMessage("          Updates Received: %u\n", s5ha_stats.update_messages_received);
690     LogMessage("Updates Received (No Session): %u\n", s5ha_stats.update_messages_received_no_session);
691     LogMessage("        Deletions Received: %u\n", s5ha_stats.delete_messages_received);
692     LogMessage("  Updates Sent Immediately: %u\n", s5ha_stats.update_messages_sent_immediately);
693     LogMessage("     Updates Sent Normally: %u\n", s5ha_stats.update_messages_sent_normally);
694     LogMessage("            Deletions Sent: %u\n", s5ha_stats.delete_messages_sent);
695     LogMessage("        Deletions Not Sent: %u\n", s5ha_stats.delete_messages_not_sent);
696     for (i = 0; i < n_stream_ha_funcs; i++)
697     {
698         node = stream_ha_funcs[i];
699         if (!node)
700             continue;
701         LogMessage("        Node %hhu/%hhu: %u produced, %u consumed\n",
702                     node->preproc_id, node->subcode, node->produced, node->consumed);
703     }
704 }
705 
SessionResetHAStats(void)706 void SessionResetHAStats(void)
707 {
708     memset(&s5ha_stats, 0, sizeof(s5ha_stats));
709 }
710 
711 #ifdef HAVE_DAQ_EXT_MODFLOW
SessionHAMetaEval(int type,const uint8_t * data)712 static void SessionHAMetaEval(int type, const uint8_t *data)
713 {
714     DAQ_HA_State_Data_t *daqState;
715 
716     if (type != DAQ_METAHDR_TYPE_HA_STATE)
717         return;
718 
719     daqState = (DAQ_HA_State_Data_t *) data;
720 
721     if (daqState->length == 0 || daqState->length >= UINT16_MAX || !daqState->data)
722         return;
723 
724     /* Ignore the return value from consuming the message - it will print out
725         errors and there's nothing we can do about it. */
726     ConsumeHAMessage(daqState->data, daqState->length);
727 }
728 #endif
729 
730 #ifdef HAVE_DAQ_QUERYFLOW
731 #ifdef REG_TEST
SessionHAQueryDAQState(DAQ_PktHdr_t * pkthdr)732 int SessionHAQueryDAQState( DAQ_PktHdr_t *pkthdr)
733 #else
734 int SessionHAQueryDAQState(const DAQ_PktHdr_t *pkthdr)
735 #endif
736 {
737     DAQ_QueryFlow_t query;
738     DAQ_HA_State_Data_t daqHAState;
739     int rval;
740 
741     query.type = DAQ_QUERYFLOW_TYPE_HA_STATE;
742     query.length = sizeof(daqHAState);
743     query.value = &daqHAState;
744 
745 #ifdef REG_TEST
746     pkthdr->priv_ptr = &runtime_input_fd;
747 #endif
748 
749     if ((rval = DAQ_QueryFlow(pkthdr, &query)) == DAQ_SUCCESS)
750     {
751         if (daqHAState.length == 0 || daqHAState.length >= UINT16_MAX || !daqHAState.data)
752             return -EINVAL;
753         rval = ConsumeHAMessage(daqHAState.data, daqHAState.length);
754     }
755 
756     return rval;
757 }
758 #endif
759 
SessionHAInit(struct _SnortConfig * sc,char * args)760 void SessionHAInit( struct _SnortConfig *sc, char *args )
761 {
762     if( session_configuration == NULL )
763         FatalError("Tried to config Session HA policy without core Session config!\n");
764 
765     if( session_configuration->ha_config != NULL )
766         FatalError("%s(%d) ==> Cannot duplicate Sesssion HA configuration\n", file_name, file_line);
767 
768     // if HA not enabled for session, then we are out of here...
769     if( !session_configuration->enable_ha )
770         return;
771 
772     session_configuration->ha_config = ( SessionHAConfig * ) SnortAlloc( sizeof( SessionHAConfig ) );
773     StreamParseHAArgs(sc, session_configuration->ha_config, args);
774 
775 #ifdef PERF_PROFILING
776     RegisterPreprocessorProfile("sessionHAProduce", &sessionHAProducePerfStats, 2, &sessionHAPerfStats, NULL);
777     RegisterPreprocessorProfile("sessionHAConsume", &sessionHAConsumePerfStats, 0, &totalPerfStats, NULL);
778 #endif
779 
780     ControlSocketRegisterHandler(CS_TYPE_DEBUG_STREAM_HA, StreamDebugHA, NULL, NULL);
781     LogMessage("Registered StreamHA Debug control socket message type (0x%x)\n", CS_TYPE_DEBUG_STREAM_HA);
782 
783 #ifdef HAVE_DAQ_EXT_MODFLOW
784     if (session_configuration->ha_config->use_daq)
785     {
786         AddFuncToPreprocMetaEvalList(sc, SessionHAMetaEval, PP_SESSION_PRIORITY, PP_SESSION);
787         //Do not send delete notification on RST when there is a underlying data plane.
788         HA_CRITICAL_SESSION_FLAGS &= ~SSNFLAG_RESET;
789     }
790 #endif
791 
792     StreamPrintHAConfig(session_configuration->ha_config);
793 }
794 
795 #if defined(SNORT_RELOAD)
SessionHAReload(struct _SnortConfig * sc,char * args,void ** new_config)796 void SessionHAReload(struct _SnortConfig *sc, char *args, void **new_config)
797 {
798     SessionHAConfig *session_ha_config = ( SessionHAConfig * ) *new_config;
799 
800     // session ha config is only in default policy... just return if not default
801     if( getParserPolicy( sc ) != getDefaultPolicy( ) )
802         return;
803 
804     // if HA not enabled for session, then we are out of here...
805     if( !session_configuration->enable_ha )
806         return;
807 
808     if ( session_ha_config == NULL )
809     {
810         session_ha_config = ( SessionHAConfig * ) SnortAlloc( sizeof( SessionHAConfig ) );
811         if ( session_ha_config == NULL )
812             FatalError("Failed to allocate storage for Session HA configuration.\n");
813 
814         StreamParseHAArgs( sc, session_ha_config, args );
815         StreamPrintHAConfig( session_ha_config );
816        *new_config = session_ha_config;
817 
818     }
819     else
820     {
821         FatalError("%s(%d) ==> Cannot duplicate Sesssion HA configuration\n", file_name, file_line);
822     }
823 }
824 #endif
825 
/*
 * Reload verification hook: a pending HA configuration is acceptable as long
 * as it exists.
 *
 *   sc          - unused
 *   swap_config - configuration produced by the reload, or NULL
 *
 * Returns 0 when swap_config is non-NULL, -1 otherwise.
 */
int SessionVerifyHAConfig(struct _SnortConfig *sc, void *swap_config)
{
    (void)sc;

    return (swap_config != NULL) ? 0 : -1;
}
833 
SessionHASwapReload(struct _SnortConfig * sc,void * data)834 void *SessionHASwapReload( struct _SnortConfig *sc, void *data )
835 {
836     session_configuration->ha_config = ( SessionHAConfig * ) data;
837     return NULL;
838 }
839 
840 
SessionHAConfigFree(void * data)841 void SessionHAConfigFree( void *data )
842 {
843     SessionHAConfig *config = ( SessionHAConfig * ) data;
844 
845     if (config == NULL)
846         return;
847 
848     if (config->startup_input_file)
849         free(config->startup_input_file);
850 
851     if (config->runtime_output_file)
852         free(config->runtime_output_file);
853 
854     if (config->shutdown_output_file)
855         free(config->shutdown_output_file);
856 #ifdef REG_TEST
857     if (config->runtime_input_file)
858         free(config->runtime_input_file);
859 #endif
860 
861     free(config);
862 }
863 
864 // This MUST have the exact same logic as createSessionKeyFromPktHeader()
IsClientLower(const sfaddr_t * cltIP,uint16_t cltPort,const sfaddr_t * srvIP,uint16_t srvPort,char proto)865 static inline bool IsClientLower(const sfaddr_t *cltIP, uint16_t cltPort,
866                                  const sfaddr_t *srvIP, uint16_t srvPort, char proto)
867 {
868     if (sfip_fast_lt6(cltIP, srvIP))
869         return true;
870 
871     if (sfip_fast_lt6(srvIP, cltIP))
872         return false;
873 
874     switch (proto)
875     {
876         case IPPROTO_TCP:
877         case IPPROTO_UDP:
878             if (cltPort < srvPort)
879                 return true;
880     }
881     return false;
882 }
883 
/*
 * Applies a received HA session record to a session control block.
 *
 * key - session key taken from the HA message
 * has - HA session record taken from the HA message
 * scb - existing session for the key, or NULL if there is none
 *
 * When scb is NULL a session is created through the protocol's HA API,
 * HA_FLAG_NEW is cleared and HA_FLAG_STANDBY is set on it, and its
 * client/server addresses and ports are filled in from the key:
 * HA_SESSION_FLAG_LOW means the server is the low side of the key, and
 * HA_SESSION_FLAG_IP6 selects AF_INET6 over AF_INET.  In both cases the HA
 * state from the record is copied into the session, which is then returned.
 *
 * NOTE(review): neither the result of ha_get_api() nor the result of
 * create_session() is checked for NULL here -- confirm callers guarantee both.
 */
static SessionControlBlock *DeserializeHASession(const SessionKey *key,
                                                 const StreamHASession *has,
                                                 SessionControlBlock *scb)
{
    SessionControlBlock *session = scb;

    if (session == NULL)
    {
        const HA_Api *proto_api = ha_get_api(key->protocol);
        int addr_family;

        session = proto_api->create_session(key);

        session->ha_flags &= ~HA_FLAG_NEW;
        session->ha_flags |= HA_FLAG_STANDBY;

        addr_family = (has->flags & HA_SESSION_FLAG_IP6) ? AF_INET6 : AF_INET;
        if (has->flags & HA_SESSION_FLAG_LOW)
        {
            /* The server is the low side of the key. */
            sfip_set_raw(&session->server_ip, session->key->ip_l, addr_family);
            sfip_set_raw(&session->client_ip, session->key->ip_h, addr_family);
            session->server_port = session->key->port_l;
            session->client_port = session->key->port_h;
        }
        else
        {
            /* The client is the low side of the key. */
            sfip_set_raw(&session->client_ip, session->key->ip_l, addr_family);
            sfip_set_raw(&session->server_ip, session->key->ip_h, addr_family);
            session->client_port = session->key->port_l;
            session->server_port = session->key->port_h;
        }
    }

    session->ha_state = has->ha_state;

    return session;
}
924 
/*
 * Hands a preprocessor data record to the HA handler registered for it.
 *
 * The registered handlers in stream_ha_funcs[] are searched for one whose
 * preprocessor ID and subcode both match.  The record is rejected when it is
 * longer than the handler's declared size; otherwise the handler's consumed
 * counter is bumped and its consume() callback is invoked.  event is not
 * used.
 *
 * Returns the consume() callback's result, or -1 when the record is too long
 * or no handler matches.
 */
static inline int DeserializePreprocData(uint8_t event, SessionControlBlock *scb, uint8_t preproc_id,
                                         uint8_t subcode, const uint8_t *data, uint8_t length)
{
    int idx;

    for (idx = 0; idx < n_stream_ha_funcs; idx++)
    {
        StreamHAFuncsNode *handler = stream_ha_funcs[idx];

        if (handler == NULL)
            continue;
        if (handler->preproc_id != preproc_id || handler->subcode != subcode)
            continue;

        if (length > handler->size)
        {
            ErrorMessage("Stream HA preprocessor data record's length exceeds expected size! (%u vs %u)\n",
                    length, handler->size);
            return -1;
        }

        handler->consumed++;
        return handler->consume(scb, data, length);
    }

    ErrorMessage("Stream HA preprocessor data record received with unrecognized preprocessor ID/subcode! (%hhu:%hhu)\n",
            preproc_id, subcode);
    return -1;
}
951 
ConsumeHAMessage(const uint8_t * msg,uint32_t msglen)952 static int ConsumeHAMessage(const uint8_t *msg, uint32_t msglen)
953 {
954     const HA_Api *api;
955     StreamHASession *has;
956     SessionControlBlock *scb;
957     SessionKey key;
958     MsgHeader *msg_hdr;
959     RecordHeader *rec_hdr;
960     PreprocDataHeader *psd_hdr;
961     uint32_t offset;
962     bool debug_flag;
963     int rval = 1;
964     PROFILE_VARS;
965 
966     PREPROC_PROFILE_START(sessionHAConsumePerfStats);
967 
968     /* Read the message header */
969     if (msglen < sizeof(*msg_hdr))
970     {
971         ErrorMessage("Stream HA message length shorter than header length! (%u)\n", msglen);
972         goto consume_exit;
973     }
974     msg_hdr = (MsgHeader *) msg;
975     offset = sizeof(*msg_hdr);
976 
977     if (msg_hdr->total_length != msglen)
978     {
979         ErrorMessage("Stream HA message header's total length does not match actual length! (%u vs %u)\n",
980                 msg_hdr->total_length, msglen);
981         goto consume_exit;
982     }
983 
984     if (msg_hdr->version != HA_MESSAGE_VERSION)
985     {
986         ErrorMessage("Stream HA message has unrecognized version: %hhu!\n", msg_hdr->version);
987         goto consume_exit;
988     }
989 
990     if (msg_hdr->event != HA_EVENT_UPDATE && msg_hdr->event != HA_EVENT_DELETE)
991     {
992         ErrorMessage("Stream HA message has unknown event type: %hhu!\n", msg_hdr->event);
993         goto consume_exit;
994     }
995 
996     /* Read the key */
997     if (msg_hdr->key_size == IP4_SESSION_KEY_SIZE) /* IPv4, miniature key */
998     {
999         /* Lower IPv4 address */
1000         memcpy(&key.ip_l[3], msg + offset, 4);
1001         key.ip_l[0] = key.ip_l[1] = 0;
1002         key.ip_l[2] = htonl(0xFFFF);
1003         offset += 4;
1004         /* Higher IPv4 address */
1005         memcpy(&key.ip_h[3], msg + offset, 4);
1006         key.ip_h[0] = key.ip_h[1] = 0;
1007         key.ip_h[2] = htonl(0xFFFF);
1008         offset += 4;
1009         /* The remainder of the key */
1010         memcpy(((uint8_t *) &key) + 32, msg + offset, IP4_SESSION_KEY_SIZE - 8);
1011         offset += IP4_SESSION_KEY_SIZE - 8;
1012     }
1013     else if (msg_hdr->key_size == IP6_SESSION_KEY_SIZE) /* IPv6, full-size key */
1014     {
1015         memcpy(&key, msg + offset, IP6_SESSION_KEY_SIZE);
1016         offset += IP6_SESSION_KEY_SIZE;
1017     }
1018     else
1019     {
1020         ErrorMessage("Stream HA message has unrecognized key size: %hhu!\n", msg_hdr->key_size);
1021         goto consume_exit;
1022     }
1023 
1024     debug_flag = StreamHADebugCheck(&key, s5_ha_debug_flag, &s5_ha_debug_info, s5_ha_debug_session, sizeof(s5_ha_debug_session));
1025 
1026     api = ha_get_api(key.protocol);
1027     if (!api)
1028     {
1029         ErrorMessage("Stream HA message has unhandled protocol: %u!\n", key.protocol);
1030         goto consume_exit;
1031     }
1032 
1033     if (msg_hdr->event == HA_EVENT_DELETE)
1034     {
1035         if (debug_flag)
1036             LogMessage("S5HADbg Consuming deletion message for %s\n", s5_ha_debug_session);
1037         if (offset != msglen)
1038         {
1039             ErrorMessage("Stream HA deletion message contains extraneous data! (%u bytes)\n", msglen - offset);
1040             goto consume_exit;
1041         }
1042         s5ha_stats.delete_messages_received++;
1043         rval = api->delete_session(&key);
1044         if (debug_flag)
1045         {
1046             if (!rval)
1047                 LogMessage("S5HADbg Deleted LWSession for %s\n", s5_ha_debug_session);
1048             else
1049                 LogMessage("S5HADbg Could not delete LWSession for %s\n", s5_ha_debug_session);
1050         }
1051         goto consume_exit;
1052     }
1053 
1054     if (debug_flag)
1055         LogMessage("S5HADbg Consuming update message for %s\n", s5_ha_debug_session);
1056 
1057     if (pkt_trace_enabled)
1058         addPktTraceData(VERDICT_REASON_NO_BLOCK, snprintf(trace_line, MAX_TRACE_LINE,
1059                      "Recovered session \n"));
1060 
1061     scb = api->get_lws(&key);
1062 
1063     /* Read any/all records. */
1064     while (offset < msglen)
1065     {
1066         if (sizeof(*rec_hdr) > (msglen - offset))
1067         {
1068             ErrorMessage("Stream HA message contains a truncated record header! (%zu vs %u)\n",
1069                     sizeof(*rec_hdr), msglen - offset);
1070             goto consume_exit;
1071         }
1072         rec_hdr = (RecordHeader *) (msg + offset);
1073         offset += sizeof(*rec_hdr);
1074 
1075         switch (rec_hdr->type)
1076         {
1077             case HA_TYPE_HAS:
1078                 if (rec_hdr->length != sizeof(*has))
1079                 {
1080                     ErrorMessage("Stream HA message contains incorrectly size HA Session record! (%u vs %zu)\n",
1081                             rec_hdr->length, sizeof(*has));
1082                     goto consume_exit;
1083                 }
1084                 if (rec_hdr->length > (msglen - offset))
1085                 {
1086                     ErrorMessage("Stream HA message contains truncated HA Session record data! (%u vs %u)\n",
1087                             rec_hdr->length, msglen - offset);
1088                     goto consume_exit;
1089                 }
1090                 has = (StreamHASession *) (msg + offset);
1091                 offset += rec_hdr->length;
1092                 if (debug_flag)
1093                 {
1094 #ifdef TARGET_BASED
1095                     LogMessage("S5HADbg %s Session for %s - SF=0x%x IPP=0x%hx AP=0x%hx DIR=%hhu IDIR=%hhu\n",
1096                                 (scb) ? "Updating" : "Creating", s5_ha_debug_session, has->ha_state.session_flags,
1097                                 has->ha_state.ipprotocol, has->ha_state.application_protocol,
1098                                 has->ha_state.direction, has->ha_state.ignore_direction);
1099 #else
1100                     LogMessage("S5HADbg %s LWSession for %s - SF=0x%x DIR=%hhu IDIR=%hhu\n",
1101                                 (lwssn) ? "Updating" : "Creating", s5_ha_debug_session, has->ha_state.session_flags,
1102                                 has->ha_state.direction, has->ha_state.ignore_direction);
1103 #endif
1104                 }
1105                 if(!scb)
1106                 {
1107                     if (has->ha_state.session_flags & SSNFLAG_COUNTED_CLOSING)
1108                         sfBase.iSessionsClosing++;
1109                     else if (has->ha_state.session_flags & SSNFLAG_COUNTED_ESTABLISH)
1110                         sfBase.iSessionsEstablished++;
1111                     else if (has->ha_state.session_flags & SSNFLAG_COUNTED_INITIALIZE)
1112                         sfBase.iSessionsInitializing++;
1113                 }
1114                 scb = DeserializeHASession(&key, has, scb);
1115                 scb->session_established = true;
1116                 break;
1117 
1118             case HA_TYPE_PSD:
1119                 if (!scb)
1120                 {
1121                     //ErrorMessage("Stream HA message with preprocessor data record received for non-existent session!\n");
1122                     s5ha_stats.update_messages_received_no_session++;
1123                     goto consume_exit;
1124                 }
1125                 if (sizeof(*psd_hdr) > (msglen - offset))
1126                 {
1127                     ErrorMessage("Stream HA message contains a truncated preprocessor data record header! (%zu vs %u)\n",
1128                             sizeof(*psd_hdr), msglen - offset);
1129                     goto consume_exit;
1130                 }
1131                 psd_hdr = (PreprocDataHeader *) (msg + offset);
1132                 offset += sizeof(*psd_hdr);
1133                 if (rec_hdr->length > (msglen - offset))
1134                 {
1135                     ErrorMessage("Stream HA message contains truncated preprocessor data record data! (%u vs %u)\n",
1136                             rec_hdr->length, msglen - offset);
1137                     goto consume_exit;
1138                 }
1139                 if (debug_flag)
1140                 {
1141                     LogMessage("S5HADbg Consuming %hhu byte preprocessor data record for %s with PPID=%hhu and SC=%hhu\n",
1142                                 rec_hdr->length, s5_ha_debug_session, psd_hdr->preproc_id, psd_hdr->subcode);
1143                 }
1144                 if (DeserializePreprocData(msg_hdr->event, scb, psd_hdr->preproc_id, psd_hdr->subcode,
1145                                             msg + offset, rec_hdr->length) != 0)
1146                 {
1147                     ErrorMessage("Stream HA message contained invalid preprocessor data record!\n");
1148                     goto consume_exit;
1149                 }
1150                 offset += rec_hdr->length;
1151                 break;
1152 
1153             default:
1154                 ErrorMessage("Stream HA message contains unrecognized record type: %hhu!\n", rec_hdr->type);
1155                 goto consume_exit;
1156         }
1157     }
1158     /* Mark the session as being in standby mode since we just received an update. */
1159     if (scb && !(scb->ha_flags & HA_FLAG_STANDBY))
1160     {
1161         if (api->deactivate_session)
1162             api->deactivate_session(scb);
1163         scb->ha_flags |= HA_FLAG_STANDBY;
1164     }
1165 
1166     s5ha_stats.update_messages_received++;
1167     rval = 0;
1168 
1169 consume_exit:
1170     PREPROC_PROFILE_END(sessionHAConsumePerfStats);
1171     return rval;
1172 }
1173 
1174 /*
1175  * File I/O
1176  */
/*
 * Reads exactly count bytes from fd into buf, continuing after short reads
 * and after reads interrupted by a signal (EINTR).
 *
 * Returns 0 once all count bytes have been read.  Returns -1 if end of file
 * is reached first or if read() fails with any error other than EINTR; the
 * latter case is also reported through ErrorMessage().
 */
static inline ssize_t Read(int fd, void *buf, size_t count)
{
    uint8_t *cursor = (uint8_t *) buf;
    size_t remaining = count;
    ssize_t got;

    errno = 0;

    for (;;)
    {
        got = read(fd, cursor, remaining);
        if (got > (ssize_t) remaining)
            break;

        if (got == (ssize_t) remaining)
            return 0;

        if (got > 0)
        {
            /* Short read: keep going from where this one stopped. */
            cursor += got;
            remaining -= got;
            continue;
        }

        if (got == 0)
            break;  /* End of file before the request was satisfied. */

        if (errno != EINTR)
        {
            ErrorMessage("Error reading from Stream HA message file: %s (%d)\n", strerror(errno), errno);
            break;
        }
    }

    return -1;
}
1202 
ReadHAMessagesFromFile(const char * filename)1203 static int ReadHAMessagesFromFile(const char *filename)
1204 {
1205     MsgHeader *msg_header;
1206     uint8_t *msg;
1207     int rval, fd;
1208 
1209     fd = open(filename, O_RDONLY, 0664);
1210     if (fd < 0)
1211     {
1212         FatalError("Could not open %s for reading HA messages from: %s (%d)\n", filename, strerror(errno), errno);
1213     }
1214 
1215     LogMessage("Reading Stream HA messages from '%s'...\n", filename);
1216     msg = file_io_buffer;
1217     while ((rval = Read(fd, msg, sizeof(*msg_header))) == 0)
1218     {
1219         msg_header = (MsgHeader *) msg;
1220         if (msg_header->total_length < sizeof(*msg_header))
1221         {
1222             ErrorMessage("Stream HA Message total length (%hu) is way too short!\n", msg_header->total_length);
1223             close(fd);
1224             return -1;
1225         }
1226         else if (msg_header->total_length > (UINT16_MAX - sizeof(*msg_header)))
1227         {
1228             ErrorMessage("Stream HA Message total length (%hu) is too long!\n", msg_header->total_length);
1229             close(fd);
1230             return -1;
1231         }
1232         else if (msg_header->total_length > sizeof(*msg_header))
1233         {
1234         	if ((rval = Read(fd, msg + sizeof(*msg_header), msg_header->total_length - sizeof(*msg_header))) != 0)
1235         	{
1236             	ErrorMessage("Error reading the remaining %zu bytes of an HA message from file: %s (%d)\n",
1237                     msg_header->total_length - sizeof(*msg_header), strerror(errno), errno);
1238             	close(fd);
1239             	return rval;
1240             }
1241         }
1242 
1243         if ((rval = ConsumeHAMessage(msg, msg_header->total_length)) != 0)
1244         {
1245             close(fd);
1246             return rval;
1247         }
1248     }
1249     close(fd);
1250 
1251     return 0;
1252 }
1253 
/*
 * Writes exactly count bytes from buf to fd, continuing after partial writes
 * and after writes interrupted by a signal (EINTR).
 *
 * Returns 0 once all count bytes have been written.  Returns -1 if write()
 * makes no progress for any reason other than EINTR; that case is also
 * reported through ErrorMessage().
 */
static inline ssize_t Write(int fd, const void *buf, size_t count)
{
    const uint8_t *cursor = (const uint8_t *) buf;
    ssize_t n;

    errno = 0;

    while ((n = write(fd, cursor, count)) <= (ssize_t) count)
    {
        if (n == (ssize_t) count)
            return 0;

        if (n > 0)
        {
            /* Partial write: resume right after the bytes already written
               instead of re-sending the start of the buffer. */
            cursor += n;
            count -= n;
        }
        else if (errno != EINTR)
        {
            ErrorMessage("Error writing to Stream HA message file: %s (%d)\n", strerror(errno), errno);
            break;
        }
    }

    return -1;
}
1275 
WriteHAMessageHeader(uint8_t event,uint16_t msglen,const SessionKey * key,uint8_t * msg)1276 static uint32_t WriteHAMessageHeader(uint8_t event, uint16_t msglen, const SessionKey *key, uint8_t *msg)
1277 {
1278     MsgHeader *msg_hdr;
1279     uint32_t offset;
1280 
1281     msg_hdr = (MsgHeader *) msg;
1282     offset = sizeof(*msg_hdr);
1283     msg_hdr->event = event;
1284     msg_hdr->version = HA_MESSAGE_VERSION;
1285     msg_hdr->total_length = msglen;
1286     msg_hdr->key_type = HA_TYPE_KEY;
1287 
1288     if (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) ||
1289         key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF))
1290     {
1291         msg_hdr->key_size = IP6_SESSION_KEY_SIZE;
1292         memcpy(msg + offset, key, IP6_SESSION_KEY_SIZE);
1293         offset += IP6_SESSION_KEY_SIZE;
1294     }
1295     else
1296     {
1297         msg_hdr->key_size = IP4_SESSION_KEY_SIZE;
1298         memcpy(msg + offset, &key->ip_l[3], sizeof(key->ip_l[3]));
1299         offset += sizeof(key->ip_l[3]);
1300         memcpy(msg + offset, &key->ip_h[3], sizeof(key->ip_h[3]));
1301         offset += sizeof(key->ip_h[3]);
1302         memcpy(msg + offset, ((uint8_t *) key) + 32, IP4_SESSION_KEY_SIZE - 8);
1303         offset += IP4_SESSION_KEY_SIZE - 8;
1304     }
1305     return offset;
1306 }
1307 
UpdateHAMessageHeaderLength(uint8_t * msg,uint16_t msglen)1308 static void UpdateHAMessageHeaderLength(uint8_t *msg, uint16_t msglen)
1309 {
1310     MsgHeader *msg_hdr;
1311 
1312     msg_hdr = (MsgHeader *) msg;
1313     msg_hdr->total_length = msglen;
1314 }
1315 
/*
 * Writes an HA session record (record header plus StreamHASession) at msg.
 *
 * scb - session whose HA state is serialized
 * msg - destination buffer
 *
 * The record carries a copy of scb->ha_state and a flags byte.
 * HA_SESSION_FLAG_LOW is set when the client is NOT the lower endpoint as
 * determined by IsClientLower(); HA_SESSION_FLAG_IP6 is always set.
 *
 * Returns the number of bytes written.
 */
static uint32_t WriteHASession(SessionControlBlock *scb, uint8_t *msg)
{
    StreamHASession *has;
    RecordHeader *rec_hdr;
    uint32_t offset;

    rec_hdr = (RecordHeader *) msg;
    offset = sizeof(*rec_hdr);
    rec_hdr->type = HA_TYPE_HAS;
    rec_hdr->length = sizeof(*has);

    has = (StreamHASession *) (msg + offset);
    offset += sizeof(*has);
    has->ha_state = scb->ha_state;

    /* Start from a clean flags field.  Output buffers such as file_io_buffer
       are reused between messages, so OR-ing into whatever was left behind
       could carry a stale HA_SESSION_FLAG_LOW into this record. */
    has->flags = 0;

    if (!IsClientLower(&scb->client_ip, scb->client_port, &scb->server_ip, scb->server_port, scb->key->protocol))
        has->flags |= HA_SESSION_FLAG_LOW;

    has->flags |= HA_SESSION_FLAG_IP6;

    return offset;
}
1338 
/*
 * Writes one preprocessor data record at msg: a record header, a
 * preprocessor data header, and the payload produced by node->produce().
 *
 * scb    - session handed to the producer callback
 * node   - registered HA handler that produces the payload
 * msg    - destination buffer
 * forced - true when the record is being generated although the
 *          preprocessor did not flag pending HA state
 *
 * Returns the number of bytes the record occupies.  A forced record with an
 * empty payload returns 0 so that the caller does not spend space on the
 * headers alone; in that case the handler's produced counter is not bumped.
 */
static uint32_t WritePreprocDataRecord(SessionControlBlock *scb, StreamHAFuncsNode *node, uint8_t *msg, bool forced)
{
    RecordHeader *record = (RecordHeader *) msg;
    PreprocDataHeader *preproc = (PreprocDataHeader *) (msg + sizeof(*record));
    uint8_t *payload = msg + sizeof(*record) + sizeof(*preproc);

    record->type = HA_TYPE_PSD;
    preproc->preproc_id = node->preproc_id;
    preproc->subcode = node->subcode;

    record->length = node->produce(scb, payload);
    if (forced && record->length == 0)
        return 0;

    node->produced++;

    return (uint32_t) (sizeof(*record) + sizeof(*preproc) + record->length);
}
1365 
/*
 * Computes the size to reserve for an HA message about scb.
 *
 * event - HA_EVENT_UPDATE adds room for the records; any other event counts
 *         only the header and key
 * scb   - session the message describes
 *
 * The size covers the message header and the key (miniature form when both
 * key addresses are IPv4-mapped, full form otherwise).  For updates it also
 * covers one HA session record, which is always counted, plus one
 * preprocessor data record of the handler's declared size for every handler
 * whose bit is set in scb->ha_pending_mask.
 */
static uint32_t CalculateHAMessageSize(uint8_t event, SessionControlBlock *scb)
{
    const SessionKey *key = scb->key;
    uint32_t total = sizeof(MsgHeader);
    int slot;

    /* Header (including the key).  IPv4 keys are miniaturized. */
    if (!key->ip_l[0] && !key->ip_l[1] && key->ip_l[2] == htonl(0xFFFF) &&
        !key->ip_h[0] && !key->ip_h[1] && key->ip_h[2] == htonl(0xFFFF))
        total += IP4_SESSION_KEY_SIZE;
    else
        total += IP6_SESSION_KEY_SIZE;

    if (event == HA_EVENT_UPDATE)
    {
        /* HA Session record */
        total += sizeof(RecordHeader) + sizeof(StreamHASession);

        /* Preprocessor data records */
        for (slot = 0; slot < n_stream_ha_funcs; slot++)
        {
            const StreamHAFuncsNode *handler;

            if (!(scb->ha_pending_mask & (1 << slot)))
                continue;

            handler = stream_ha_funcs[slot];
            if (handler)
                total += sizeof(RecordHeader) + sizeof(PreprocDataHeader) + handler->size;
        }
    }

#ifdef DEBUG_HA_PRINT
    printf("Calculated msg_size = %u (%zu, %zu, %zu, %zu, %zu, %zu, %zu)\n", total, IP4_SESSION_KEY_SIZE, IP6_SESSION_KEY_SIZE,
            sizeof(MsgHeader), sizeof(RecordHeader), sizeof(StreamHASession), sizeof(StreamHAState), sizeof(bool));
#endif
    return total;
}
1408 
/*
 * Builds an HA deletion message for scb at msg.
 *
 * A deletion message consists of the message header and key only; msg_size
 * is stored as the header's total length.  Returns the number of bytes
 * written.  The work is accounted to sessionHAProducePerfStats.
 */
static uint32_t GenerateHADeletionMessage(uint8_t *msg, uint32_t msg_size, SessionControlBlock *scb)
{
    uint32_t bytes_written;
    PROFILE_VARS;

    PREPROC_PROFILE_START(sessionHAProducePerfStats);
    bytes_written = WriteHAMessageHeader(HA_EVENT_DELETE, msg_size, scb->key, msg);
    PREPROC_PROFILE_END(sessionHAProducePerfStats);

    return bytes_written;
}
1422 
1423 #ifdef SIDE_CHANNEL
/*
 * Sends an HA deletion message for scb over the side channel.
 *
 * msg_size bytes are preallocated for transmission; if that fails nothing is
 * sent.  Otherwise the deletion message is generated into the preallocated
 * space, the side channel header is stamped with the flow-state-tracking
 * type and the current packet time, and the message is enqueued.
 */
static void SendSCDeletionMessage(SessionControlBlock *scb, uint32_t msg_size)
{
    SCMsgHdr *hdr;
    void *handle;
    uint8_t *payload;
    uint32_t length;

    /* Allocate space for the message. */
    if (SideChannelPreallocMessageTX(msg_size, &hdr, &payload, &handle) != 0)
    {
        /* TODO: Error stuff goes here. */
        return;
    }

    /* Generate the message. */
    length = GenerateHADeletionMessage(payload, msg_size, scb);

    /* Send the message. */
    hdr->type = SC_MSG_TYPE_FLOW_STATE_TRACKING;
    hdr->timestamp = packet_time();
    SideChannelEnqueueMessageTX(hdr, payload, length, handle, NULL);
}
1446 #endif
1447 
/*
 * Announces the deletion of a session to the HA peer(s).
 *
 * Nothing happens when scb is NULL or HA is not enabled.  No notice is
 * produced (and delete_messages_not_sent is counted) when the session has
 * never been synchronized, is in standby, or has already had a deletion
 * notice sent.  Otherwise the deletion message is written to the runtime
 * output file when one is open.  When the DAQ is in use the function then
 * stops, counting the message as not sent; if not, the message is also sent
 * over the side channel when that is configured, delete_messages_sent is
 * counted, and the session is flagged HA_FLAG_DELETED.
 */
void SessionHANotifyDeletion(SessionControlBlock *scb)
{
    uint32_t msg_size;
    bool debug_flag;
    PROFILE_VARS;

    PREPROC_PROFILE_START(sessionHAPerfStats);

    if ( scb == NULL || !session_configuration->enable_ha )
        goto notify_done;

    /* Don't send a deletion notice if we've never sent an update for the flow, it is in standby, or we've already sent one. */
    if ( scb->ha_flags & ( HA_FLAG_NEW | HA_FLAG_STANDBY | HA_FLAG_DELETED ) )
    {
        s5ha_stats.delete_messages_not_sent++;
        goto notify_done;
    }

#ifdef SIDE_CHANNEL
    if (session_configuration->ha_config->use_side_channel)
    {
        debug_flag = StreamHADebugCheck(scb->key, s5_ha_debug_flag, &s5_ha_debug_info,
                                     s5_ha_debug_session, sizeof(s5_ha_debug_session));
        if (debug_flag)
            LogMessage("S5HADbg Producing deletion message for %s\n",
                       s5_ha_debug_session);
    }
#endif

    /* Calculate the size of the deletion message. */
    msg_size = CalculateHAMessageSize(HA_EVENT_DELETE, scb);

    /* Record the deletion in the runtime output file, if one is open. */
    if (runtime_output_fd >= 0)
    {
        msg_size = GenerateHADeletionMessage(file_io_buffer, msg_size, scb);
        if (Write(runtime_output_fd, file_io_buffer, msg_size) == -1)
        {
            /* TODO: Error stuff here. */
        }
    }

    if (session_configuration->ha_config->use_daq)
    {
        s5ha_stats.delete_messages_not_sent++;
        goto notify_done;
    }

#ifdef SIDE_CHANNEL
    if (session_configuration->ha_config->use_side_channel)
    {
        SendSCDeletionMessage(scb, msg_size);
    }
#endif

    s5ha_stats.delete_messages_sent++;
    scb->ha_flags |= HA_FLAG_DELETED;

notify_done:
    PREPROC_PROFILE_END(sessionHAPerfStats);
}
1518 
/*
 * Builds an HA update message for scb at msg.
 *
 * msg  - destination buffer
 * scb  - session being synchronized
 * full - when true, every registered handler is asked for a record, not
 *        only those flagged in scb->ha_pending_mask
 *
 * The message is the header (first written with a length of 0), the HA
 * session record, and one preprocessor data record per selected handler.
 * Handlers that are selected only because of full are written as "forced"
 * records.  The header's length field is patched with the final size at the
 * end.  Returns the total message size in bytes.
 */
static uint32_t GenerateHAUpdateMessage(uint8_t *msg, SessionControlBlock *scb, bool full)
{
    StreamHAFuncsNode *producer;
    uint32_t length;
    int slot;
    bool pending;
    PROFILE_VARS;

    PREPROC_PROFILE_START(sessionHAProducePerfStats);

    /* Write HA message header with a length of 0.  It will be updated at the end. */
    length = WriteHAMessageHeader(HA_EVENT_UPDATE, 0, scb->key, msg);
    length += WriteHASession(scb, msg + length);

    for (slot = 0; slot < n_stream_ha_funcs; slot++)
    {
        pending = (scb->ha_pending_mask & (1 << slot)) != 0;

        /* If this is the generation of a full message, try to generate state from all registered nodes. */
        if (!pending && !full)
            continue;

        producer = stream_ha_funcs[slot];
        if (producer == NULL)
            continue;

        length += WritePreprocDataRecord(scb, producer, msg + length, !pending);
    }

    /* Update the message header's length field with the final message size. */
    UpdateHAMessageHeaderLength(msg, length);

    PREPROC_PROFILE_END(sessionHAProducePerfStats);

#ifdef DEBUG_HA_PRINT
    printf("Generated msg_size = %u (%zu, %zu, %zu, %zu, %zu, %zu, %zu)\n", length, IP4_SESSION_KEY_SIZE, IP6_SESSION_KEY_SIZE,
            sizeof(MsgHeader), sizeof(RecordHeader), sizeof(StreamHASession), sizeof(StreamHAState), sizeof(bool));
#endif
    return length;
}
1553 
1554 #ifdef SIDE_CHANNEL
/*
 * Sends an incremental HA update message for scb over the side channel.
 *
 * Space is preallocated using the size from CalculateHAMessageSize(); if
 * that fails nothing is sent.  Otherwise the update is generated into the
 * preallocated space, the side channel header is stamped with the
 * flow-state-tracking type and the current packet time, and the message is
 * enqueued with the size actually generated.
 */
static void SendSCUpdateMessage(SessionControlBlock *scb)
{
    SCMsgHdr *hdr;
    void *handle;
    uint8_t *payload;
    uint32_t reserved;
    uint32_t length;

    /* Calculate the maximum size of the update message for preallocation. */
    reserved = CalculateHAMessageSize(HA_EVENT_UPDATE, scb);

    /* Allocate space for the message. */
    if (SideChannelPreallocMessageTX(reserved, &hdr, &payload, &handle) != 0)
    {
        /* TODO: Error stuff goes here. */
        return;
    }

    /* Generate the message. */
    length = GenerateHAUpdateMessage(payload, scb, false);

    /* Send the message. */
    hdr->type = SC_MSG_TYPE_FLOW_STATE_TRACKING;
    hdr->timestamp = packet_time();
    SideChannelEnqueueMessageTX(hdr, payload, length, handle, NULL);
}
1581 #endif
1582 
/*
 * Compares two HA states and returns the HA_FLAG_* bits describing how
 * significant the difference is.
 *
 * key         - session key; only its protocol is examined
 * old_state   - HA state to compare against
 * cur_state   - current HA state
 * new_session - true if the session has just been created
 *
 * A new session is always "modified", and additionally a major change
 * unless it is TCP.  Otherwise:
 *  - a difference in the session flags (outside HA_IGNORED_SESSION_FLAGS) is
 *    a modification; it is major for TCP when it touches
 *    HA_TCP_MAJOR_SESSION_FLAGS and critical when it touches
 *    HA_CRITICAL_SESSION_FLAGS;
 *  - a change of ignore_direction is a modification, and critical when the
 *    new value is SSN_DIR_BOTH;
 *  - a change of direction (or, with TARGET_BASED, of the IP or application
 *    protocol) is a modification.
 * Returns 0 when nothing relevant differs.
 */
static inline uint8_t getHaStateDiff(SessionKey *key, const StreamHAState *old_state, StreamHAState *cur_state, bool new_session)
{
    uint8_t result = 0;
    uint32_t changed_flags;

    /* Session creation for non-TCP sessions is a major change.  TCP sessions
     * hold off until they are established. */
    if (new_session)
    {
        result |= HA_FLAG_MODIFIED;
        if (key->protocol != IPPROTO_TCP)
            result |= HA_FLAG_MAJOR_CHANGE;
        return result;
    }

    changed_flags = ( old_state->session_flags ^ cur_state->session_flags ) & ~HA_IGNORED_SESSION_FLAGS;
    if( changed_flags != 0 )
    {
        result |= HA_FLAG_MODIFIED;
        if( ( key->protocol == IPPROTO_TCP ) && ( changed_flags & HA_TCP_MAJOR_SESSION_FLAGS ) )
            result |= HA_FLAG_MAJOR_CHANGE;
        if( changed_flags & HA_CRITICAL_SESSION_FLAGS )
            result |= HA_FLAG_CRITICAL_CHANGE;
    }

    if( cur_state->ignore_direction != old_state->ignore_direction )
    {
        result |= HA_FLAG_MODIFIED;
        /* Ignoring both directions usually precedes whitelisting the
         * session.  With a DAQ module that fully supports the WHITELIST
         * verdict we probably won't see another packet on the session, so
         * this has to be treated as a critical change. */
        if( cur_state->ignore_direction == SSN_DIR_BOTH )
            result |= HA_FLAG_CRITICAL_CHANGE;
    }

#ifdef TARGET_BASED
    if( ( old_state->ipprotocol != cur_state->ipprotocol ) ||
        ( old_state->application_protocol != cur_state->application_protocol ) ||
        ( old_state->direction != cur_state->direction ) )
        result |= HA_FLAG_MODIFIED;
#else
    if( old_state->direction != cur_state->direction )
        result |= HA_FLAG_MODIFIED;
#endif

    return result;
}
1632 
SessionProcessHA(void * ssnptr,const DAQ_PktHdr_t * pkthdr)1633 void SessionProcessHA(void *ssnptr, const DAQ_PktHdr_t *pkthdr)
1634 {
1635     struct timeval pkt_time;
1636     SessionControlBlock *scb = (SessionControlBlock *) ssnptr;
1637     uint32_t msg_size;
1638     bool debug_flag;
1639     PROFILE_VARS;
1640 
1641     PREPROC_PROFILE_START(sessionHAPerfStats);
1642 
1643     if (!scb || !session_configuration->enable_ha)
1644     {
1645         PREPROC_PROFILE_END(sessionHAPerfStats);
1646         return;
1647     }
1648 
1649     scb->ha_flags |= getHaStateDiff(scb->key, &scb->cached_ha_state, &scb->ha_state, scb->new_session);
1650     /*  Receiving traffic on a session that's in standby is a major change. */
1651     if (scb->ha_flags & HA_FLAG_STANDBY)
1652     {
1653         scb->ha_flags |= HA_FLAG_MODIFIED | HA_FLAG_MAJOR_CHANGE;
1654         scb->ha_flags &= ~HA_FLAG_STANDBY;
1655     }
1656     scb->new_session = false;
1657 
1658     if (!scb->ha_pending_mask && !(scb->ha_flags & HA_FLAG_MODIFIED))
1659     {
1660         PREPROC_PROFILE_END( sessionHAPerfStats );
1661         return;
1662     }
1663 
1664     /*
1665        For now, we are only generating messages for:
1666         (a) major and critical changes or
1667         (b) preprocessor changes on already synchronized sessions.
1668     */
1669     if (!(scb->ha_flags & (HA_FLAG_MAJOR_CHANGE | HA_FLAG_CRITICAL_CHANGE)) &&
1670          (!scb->ha_pending_mask || (scb->ha_flags & HA_FLAG_NEW)))
1671     {
1672         PREPROC_PROFILE_END(sessionHAPerfStats);
1673         return;
1674     }
1675 
1676     /* Ensure that a new flow has lived long enough for anyone to care about it
1677         and that we're not overrunning the synchronization threshold. */
1678     packet_gettimeofday(&pkt_time);
1679     if ((pkt_time.tv_sec < scb->ha_next_update.tv_sec) ||
1680         ((pkt_time.tv_sec == scb->ha_next_update.tv_sec) &&
1681           (pkt_time.tv_usec < scb->ha_next_update.tv_usec)))
1682     {
1683         /* Critical changes will be allowed to bypass the message timing restrictions. */
1684         if (!(scb->ha_flags & HA_FLAG_CRITICAL_CHANGE))
1685         {
1686             PREPROC_PROFILE_END( sessionHAPerfStats );
1687             return;
1688         }
1689         s5ha_stats.update_messages_sent_immediately++;
1690     }
1691     else
1692         s5ha_stats.update_messages_sent_normally++;
1693 
1694     debug_flag = StreamHADebugCheck(scb->key, s5_ha_debug_flag, &s5_ha_debug_info,
1695                                       s5_ha_debug_session, sizeof(s5_ha_debug_session));
1696     if (debug_flag)
1697 #ifdef TARGET_BASED
1698         LogMessage("S5HADbg Producing update message for %s - "
1699                     "SF=0x%x IPP=0x%hx AP=0x%hx DIR=%hhu IDIR=%hhu HPM=0x%hhx HF=0x%hhx\n",
1700                     s5_ha_debug_session, scb->ha_state.session_flags, scb->ha_state.ipprotocol,
1701                     scb->ha_state.application_protocol, scb->ha_state.direction,
1702                     scb->ha_state.ignore_direction, scb->ha_pending_mask, scb->ha_flags);
1703 #else
1704         LogMessage("S5HADbg Producing update message for %s - SF=0x%x DIR=%hhu IDIR=%hhu HPM=0x%hhx HF=0x%hhx\n",
1705                     s5_ha_debug_session, scb->ha_state.session_flags,
1706                     scb->ha_state.direction, scb->ha_state.ignore_direction,
1707                     scb->ha_pending_mask, scb->ha_flags);
1708 #endif
1709 
1710     /* Write out to the runtime output file. */
1711     if (runtime_output_fd >= 0)
1712     {
1713         /* Generate the incremental update message. */
1714         msg_size = GenerateHAUpdateMessage(file_io_buffer, scb, false);
1715         if (Write(runtime_output_fd, file_io_buffer, msg_size) == -1)
1716         {
1717             /* TODO: Error stuff here. */
1718             WarningMessage("(%s)(%d) Error writing HA message not handled\n", __FILE__, __LINE__);
1719         }
1720     }
1721 
1722 #ifdef HAVE_DAQ_EXT_MODFLOW
1723     /*
1724         Store via DAQ module (requires full message generation).
1725         Assume that if we are not setting binding DAQ verdicts because of external encapsulation-induced
1726           confusion that we also cannot safely set the HA state associated with this flow in the DAQ.
1727     */
1728     if (session_configuration->ha_config->use_daq && !Active_GetTunnelBypass())
1729     {
1730         /* Generate the full message. */
1731         msg_size = GenerateHAUpdateMessage(file_io_buffer, scb, true);
1732         DAQ_ModifyFlowHAState(pkthdr, file_io_buffer, msg_size);
1733     }
1734 #endif
1735 
1736 #ifdef SIDE_CHANNEL
1737     /* Send an update message over the side channel. */
1738     if (session_configuration->ha_config->use_side_channel)
1739     {
1740         SendSCUpdateMessage(scb);
1741     }
1742 #endif
1743 
1744     /* Calculate the next update threshold. */
1745     scb->ha_next_update.tv_usec += session_configuration->ha_config->min_session_lifetime.tv_usec;
1746     if (scb->ha_next_update.tv_usec > 1000000)
1747     {
1748         scb->ha_next_update.tv_usec -= 1000000;
1749         scb->ha_next_update.tv_sec++;
1750     }
1751     scb->ha_next_update.tv_sec += session_configuration->ha_config->min_session_lifetime.tv_sec;
1752 
1753     /* Clear the modified/new flags and pending preprocessor updates. */
1754     scb->ha_flags &= ~(HA_FLAG_NEW | HA_FLAG_MODIFIED | HA_FLAG_MAJOR_CHANGE | HA_FLAG_CRITICAL_CHANGE);
1755     scb->ha_pending_mask = 0;
1756 
1757     PREPROC_PROFILE_END(sessionHAPerfStats);
1758 }
1759 
1760 #ifdef SIDE_CHANNEL
/*
 * StreamHASCMsgHandler
 *
 * Side channel message handler for HA traffic: passes the received payload
 * to ConsumeHAMessage() while accounting the time spent to the session HA
 * profiling statistics.
 *
 * Parameters:
 *   hdr    - side channel message header (not used here).
 *   msg    - start of the received message payload.
 *   msglen - payload length in bytes.
 *
 * Returns the value produced by ConsumeHAMessage().
 */
static int StreamHASCMsgHandler( SCMsgHdr *hdr, const uint8_t *msg, uint32_t msglen )
{
    int consume_status;
    PROFILE_VARS;

    PREPROC_PROFILE_START( sessionHAPerfStats );
    consume_status = ConsumeHAMessage( msg, msglen );
    PREPROC_PROFILE_END( sessionHAPerfStats );

    return consume_status;
}
1774 #endif
1775 
SessionHAPostConfigInit(struct _SnortConfig * sc,int unused,void * arg)1776 void SessionHAPostConfigInit( struct _SnortConfig *sc, int unused, void *arg )
1777 {
1778     int rval;
1779 
1780     if( !session_configuration->enable_ha )
1781         return;
1782 
1783     if( session_configuration->ha_config->startup_input_file )
1784     {
1785         rval = ReadHAMessagesFromFile( session_configuration->ha_config->startup_input_file);
1786         if( rval != 0 )
1787         {
1788             ErrorMessage( "Errors were encountered while reading HA messages from file!" );
1789         }
1790     }
1791 
1792     if( session_configuration->ha_config->runtime_output_file )
1793     {
1794         runtime_output_fd = open( session_configuration->ha_config->runtime_output_file,
1795                                  O_WRONLY | O_CREAT | O_TRUNC, 0664 );
1796         if( runtime_output_fd < 0 )
1797         {
1798             FatalError( "Could not open %s for writing HA messages to: %s (%d)\n",
1799                         session_configuration->ha_config->runtime_output_file, strerror( errno ), errno );
1800         }
1801     }
1802 
1803 #ifdef REG_TEST
1804     if( session_configuration->ha_config->runtime_input_file )
1805     {
1806         runtime_input_fd = open( session_configuration->ha_config->runtime_input_file,
1807                                   O_RDONLY, 0664 );
1808         if( runtime_input_fd < 0 )
1809         {
1810             FatalError( "Could not open %s for writing HA messages to: %s (%d)\n",
1811                         session_configuration->ha_config->runtime_input_file, strerror( errno ), errno );
1812         }
1813     }
1814 #endif
1815 
1816 #ifdef SIDE_CHANNEL
1817     if( session_configuration->ha_config->use_side_channel )
1818     {
1819         rval = SideChannelRegisterRXHandler( SC_MSG_TYPE_FLOW_STATE_TRACKING, StreamHASCMsgHandler, NULL );
1820         if( rval != 0 )
1821         {
1822             /* TODO: Fatal error here or something. */
1823             ErrorMessage( "(%s)(%d) Errors were encountered registering Rx Side Channel Handler\n",
1824                     __FILE__, __LINE__ );
1825          }
1826     }
1827 #endif
1828 }
1829 
SessionCleanHA(void)1830 void SessionCleanHA( void )
1831 {
1832     int i;
1833 
1834     for (i = 0; i < n_stream_ha_funcs; i++)
1835     {
1836         if (stream_ha_funcs[i])
1837         {
1838             free(stream_ha_funcs[i]);
1839             stream_ha_funcs[i] = NULL;
1840         }
1841     }
1842     if (runtime_output_fd >= 0)
1843     {
1844         close(runtime_output_fd);
1845         runtime_output_fd = -1;
1846     }
1847 #ifdef REG_TEST
1848     if (runtime_input_fd >= 0)
1849     {
1850         close(runtime_input_fd);
1851         runtime_input_fd = -1;
1852     }
1853 #endif
1854 
1855 }
1856