1 /* $Id$ */
2 /****************************************************************************
3 *
4 * Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
5 * Copyright (C) 2012-2013 Sourcefire, Inc.
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License Version 2 as
9 * published by the Free Software Foundation. You may not use, modify or
10 * distribute this program under any other version of the GNU General
11 * Public License.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 *
22 *****************************************************************************/
23
24 /**************************************************************************
25 *
26 * stream5_ha.c
27 *
28 * Authors: Michael Altizer <maltizer@sourcefire.com>, Russ Combs <rcombs@sourcefire.com>
29 *
30 * Description:
31 *
32 * Stream high availability support.
33 *
34 **************************************************************************/
35
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <assert.h>
41
42 #include "active.h"
43 #include "mstring.h"
44 #include "packet_time.h"
45 #include "parser.h"
46 #include "sfcontrol_funcs.h"
47 #ifdef SIDE_CHANNEL
48 #include "sidechannel.h"
49 #endif
50 #include "stream5_ha.h"
51 #include "util.h"
52
53 /*
54 * Stream HA messages will have the following format:
55 *
56 * <message> ::= <header> <has-rec> <psd-rec>
57 * <header> ::= <event> version <message length> <key>
58 * <event> ::= HA_EVENT_UPDATE | HA_EVENT_DELETE
59 * <key> ::= <ipv4-key> | <ipv6-key>
60 * <ipv4-key> ::= HA_TYPE_KEY sizeof(ipv4-key) ipv4-key
61 * <ipv6-key> ::= HA_TYPE_KEY sizeof(ipv6-key) ipv6-key
62 * <has-rec> ::= HA_TYPE_HAS sizeof(has-rec) has-rec | (null)
63 * <psd-rec> ::= HA_TYPE_PSD sizeof(psd-rec) psd-preprocid psd-subcode psd-rec <psd-rec> | (null)
64 */
65
66 typedef struct _StreamHAFuncsNode
67 {
68 uint16_t id;
69 uint16_t mask;
70 uint8_t preproc_id;
71 uint8_t subcode;
72 uint8_t size;
73 StreamHAProducerFunc produce;
74 StreamHAConsumerFunc consume;
75 uint32_t produced;
76 uint32_t consumed;
77 } StreamHAFuncsNode;
78
79 typedef enum
80 {
81 HA_TYPE_KEY, // Lightweight Session Key
82 HA_TYPE_HAS, // Lightweight Session Data
83 HA_TYPE_PSD, // Preprocessor-specific Data
84 HA_TYPE_MAX
85 } HA_Type;
86
87 typedef struct _MsgHeader
88 {
89 uint8_t event;
90 uint8_t version;
91 uint16_t total_length;
92 uint8_t key_type;
93 uint8_t key_size;
94 } MsgHeader;
95
96 typedef struct _RecordHeader
97 {
98 uint8_t type;
99 uint8_t length;
100 } RecordHeader;
101
102 typedef struct _PreprocDataHeader
103 {
104 uint8_t preproc_id;
105 uint8_t subcode;
106 } PreprocDataHeader;
107
108 /* Something more will probably be added to this structure in the future... */
109 #define HA_SESSION_FLAG_LOW 0x01 // client address / port is low in key
110 #define HA_SESSION_FLAG_IP6 0x02 // key addresses are ip6
111 typedef struct
112 {
113 StreamHAState ha_state;
114 uint8_t flags;
115 } StreamHASession;
116
117 typedef struct
118 {
119 uint32_t update_messages_received;
120 uint32_t update_messages_received_no_session;
121 uint32_t delete_messages_received;
122 uint32_t update_messages_sent_immediately;
123 uint32_t update_messages_sent_normally;
124 uint32_t delete_messages_sent;
125 uint32_t delete_messages_not_sent;
126 } StreamHAStats;
127
128 typedef struct _HADebugSessionConstraints
129 {
130 int sip_flag;
131 struct in6_addr sip;
132 int dip_flag;
133 struct in6_addr dip;
134 uint16_t sport;
135 uint16_t dport;
136 uint8_t protocol;
137 } HADebugSessionConstraints;
138
139 #define MAX_STREAM_HA_FUNCS 8 // depends on sizeof(SessionControlBlock.ha_pending_mask)
140 #define HA_MESSAGE_VERSION 0x82
141 static StreamHAFuncsNode *stream_ha_funcs[MAX_STREAM_HA_FUNCS];
142 static int n_stream_ha_funcs = 0;
143 static int runtime_output_fd = -1;
144 #ifdef REG_TEST
145 static int runtime_input_fd = -1;
146 #endif
147 static uint8_t file_io_buffer[UINT16_MAX];
148 static StreamHAStats s5ha_stats;
149
150 /* Runtime debugging stuff. */
151 #define HA_DEBUG_SESSION_ID_SIZE (39+1+5+5+39+1+5+1+3+1) /* "<IPv6 address>:<port> <-> <IPv6 address>:<port> <ipproto>\0" */
152 static HADebugSessionConstraints s5_ha_debug_info;
153 static volatile int s5_ha_debug_flag = 0;
154 static char s5_ha_debug_session[HA_DEBUG_SESSION_ID_SIZE];
155 uint32_t HA_CRITICAL_SESSION_FLAGS = ( SSNFLAG_DROP_CLIENT | SSNFLAG_DROP_SERVER | SSNFLAG_RESET );
156
157 #define IP6_SESSION_KEY_SIZE sizeof(SessionKey)
158 #define IP4_SESSION_KEY_SIZE (IP6_SESSION_KEY_SIZE - 24)
159
160 #ifdef PERF_PROFILING
161 PreprocStats sessionHAPerfStats;
162 PreprocStats sessionHAConsumePerfStats;
163 PreprocStats sessionHAProducePerfStats;
164 #endif
165
166 static int ConsumeHAMessage(const uint8_t *msg, uint32_t msglen);
167
168 //--------------------------------------------------------------------
169 // Runtime debugging support.
170 //--------------------------------------------------------------------
StreamHADebugCheck(const SessionKey * key,volatile int debug_flag,HADebugSessionConstraints * info,char * debug_session,size_t debug_session_len)171 static inline bool StreamHADebugCheck(const SessionKey *key, volatile int debug_flag,
172 HADebugSessionConstraints *info, char *debug_session, size_t debug_session_len)
173 {
174 #ifndef REG_TEST
175 if (debug_flag)
176 {
177 if ((!info->protocol || info->protocol == key->protocol) &&
178 (((!info->sport || info->sport == key->port_l) &&
179 (!info->sip_flag || memcmp(&info->sip, key->ip_l, sizeof(info->sip)) == 0) &&
180 (!info->dport || info->dport == key->port_h) &&
181 (!info->dip_flag || memcmp(&info->dip, key->ip_h, sizeof(info->dip)) == 0)) ||
182 ((!info->sport || info->sport == key->port_h) &&
183 (!info->sip_flag || memcmp(&info->sip, key->ip_h, sizeof(info->sip)) == 0) &&
184 (!info->dport || info->dport == key->port_l) &&
185 (!info->dip_flag || memcmp(&info->dip, key->ip_l, sizeof(info->dip)) == 0))))
186 {
187 #endif
188 char lipstr[INET6_ADDRSTRLEN];
189 char hipstr[INET6_ADDRSTRLEN];
190
191 lipstr[0] = '\0';
192 hipstr[0] = '\0';
193 if (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) ||
194 key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF))
195 {
196 inet_ntop(AF_INET6, key->ip_l, lipstr, sizeof(lipstr));
197 inet_ntop(AF_INET6, key->ip_h, hipstr, sizeof(hipstr));
198 }
199 else
200 {
201 inet_ntop(AF_INET, &key->ip_l[3], lipstr, sizeof(lipstr));
202 inet_ntop(AF_INET, &key->ip_h[3], hipstr, sizeof(hipstr));
203 }
204 snprintf(debug_session, debug_session_len, "%s:%hu <-> %s:%hu %hhu",
205 lipstr, key->port_l, hipstr, key->port_h, key->protocol);
206 return true;
207 #ifndef REG_TEST
208 }
209 }
210
211 return false;
212 #endif
213 }
214
StreamHADebugParse(const char * desc,const uint8_t * data,uint32_t length,volatile int * debug_flag,HADebugSessionConstraints * info)215 static void StreamHADebugParse(const char *desc, const uint8_t *data, uint32_t length,
216 volatile int *debug_flag, HADebugSessionConstraints *info)
217 {
218 *debug_flag = 0;
219 memset(info, 0, sizeof(*info));
220 do
221 {
222 if (length >= sizeof(info->protocol))
223 {
224 info->protocol = *(uint8_t *)data;
225 length -= sizeof(info->protocol);
226 data += sizeof(info->protocol);
227 }
228 else
229 break;
230
231 if (length >= sizeof(info->sip))
232 {
233 if (memcmp(data + 4, info->sip.s6_addr + 4, 12) == 0)
234 {
235 if (memcmp(data, info->sip.s6_addr, 4) != 0)
236 {
237 info->dip_flag = 1;
238 info->sip.s6_addr32[0] = info->sip.s6_addr32[1] = info->sip.s6_addr16[4] = 0;
239 info->sip.s6_addr16[5] = 0xFFFF;
240 info->sip.s6_addr32[3] = *(uint32_t*)data;
241 }
242 }
243 else if (memcmp(data, info->sip.s6_addr, 4) != 0)
244 {
245 info->dip_flag = 1;
246 memcpy(&info->sip, data, sizeof(info->sip));
247 }
248 length -= sizeof(info->sip);
249 data += sizeof(info->sip);
250 }
251 else
252 break;
253
254 if (length >= sizeof(info->sport))
255 {
256 info->sport = *(uint16_t *)data;
257 length -= sizeof(info->sport);
258 data += sizeof(info->sport);
259 }
260 else
261 break;
262
263 if (length >= sizeof(info->dip))
264 {
265 if (memcmp(data + 4, info->dip.s6_addr + 4, 12) == 0)
266 {
267 if (memcmp(data, info->dip.s6_addr, 4) != 0)
268 {
269 info->dip_flag = 1;
270 info->dip.s6_addr32[0] = info->dip.s6_addr32[1] = info->dip.s6_addr16[4] = 0;
271 info->dip.s6_addr16[5] = 0xFFFF;
272 info->dip.s6_addr32[3] = *(uint32_t*)data;
273 }
274 }
275 else if (memcmp(data, info->dip.s6_addr, 4) != 0)
276 {
277 info->dip_flag = 1;
278 memcpy(&info->dip, data, sizeof(info->dip));
279 }
280 length -= sizeof(info->dip);
281 data += sizeof(info->dip);
282 }
283 else
284 break;
285
286 if (length >= sizeof(info->dport))
287 {
288 info->dport = *(uint16_t *)data;
289 length -= sizeof(info->dport);
290 data += sizeof(info->dport);
291 }
292 else
293 break;
294 } while (0);
295
296 if (info->protocol || info->sip_flag || info->sport || info->dip_flag || info->dport)
297 {
298 char sipstr[INET6_ADDRSTRLEN];
299 char dipstr[INET6_ADDRSTRLEN];
300
301 sipstr[0] = '\0';
302 if (info->sip_flag)
303 inet_ntop(AF_INET6, &info->sip, sipstr, sizeof(sipstr));
304 else
305 snprintf(sipstr, sizeof(sipstr), "any");
306
307 dipstr[0] = '\0';
308 if (info->dip_flag)
309 inet_ntop(AF_INET6, &info->dip, dipstr, sizeof(dipstr));
310 else
311 snprintf(dipstr, sizeof(dipstr), "any");
312
313 LogMessage("Debugging %s with %s-%hu and %s-%hu %hhu\n", desc,
314 sipstr, info->sport, dipstr, info->dport, info->protocol);
315 *debug_flag = 1;
316 }
317 else
318 LogMessage("Debugging %s disabled\n", desc);
319 }
320
StreamDebugHA(uint16_t type,const uint8_t * data,uint32_t length,void ** new_context,char * statusBuf,int statusBuf_len)321 static int StreamDebugHA(uint16_t type, const uint8_t *data, uint32_t length, void **new_context, char *statusBuf, int statusBuf_len)
322 {
323 StreamHADebugParse("S5HA", data, length, &s5_ha_debug_flag, &s5_ha_debug_info);
324
325 return 0;
326 }
327
328 //--------------------------------------------------------------------
329 // Protocol-specific HA API
330 // could use an array here (and an enum instead of IPPROTO_*)
331 //--------------------------------------------------------------------
332
333 static const HA_Api *s_tcp = NULL;
334 static const HA_Api *s_udp = NULL;
335 static const HA_Api *s_ip = NULL;
336
ha_set_api(unsigned proto,const HA_Api * api)337 int ha_set_api(unsigned proto, const HA_Api *api)
338 {
339 switch (proto)
340 {
341 case IPPROTO_TCP:
342 s_tcp = api;
343 break;
344 case IPPROTO_UDP:
345 s_udp = api;
346 break;
347 case IPPROTO_IP:
348 s_ip = api;
349 break;
350 default:
351 return -1;
352 }
353 return 0;
354 }
355
ha_get_api(unsigned proto)356 static inline const HA_Api *ha_get_api(unsigned proto)
357 {
358 switch (proto)
359 {
360 case IPPROTO_TCP:
361 return s_tcp;
362 case IPPROTO_UDP:
363 return s_udp;
364 case IPPROTO_ICMP:
365 case IPPROTO_IP:
366 default:
367 return s_ip;
368 }
369 return NULL;
370 }
371
ha_print_key(const SessionKey * key,int rx,uint8_t event)372 static inline void ha_print_key(const SessionKey *key, int rx, uint8_t event)
373 {
374 #if STREAM_DEBUG_ENABLED
375 char ipA[INET6_ADDRSTRLEN], ipB[INET6_ADDRSTRLEN];
376
377 if (key->ip_l[1] || key->ip_l[2] || key->ip_l[3] || key->ip_h[1] || key->ip_h[2] || key->ip_h[3])
378 {
379 sfip_raw_ntop(AF_INET6, key->ip_l, ipA, sizeof(ipA));
380 sfip_raw_ntop(AF_INET6, key->ip_h, ipB, sizeof(ipB));
381 }
382 else
383 {
384 sfip_raw_ntop(AF_INET, key->ip_l, ipA, sizeof(ipA));
385 sfip_raw_ntop(AF_INET, key->ip_h, ipB, sizeof(ipB));
386 }
387 LogMessage("%s flow %s message for %s:%hu <-> %s:%hu, Protocol %hhu\n",
388 rx ? "Receiving" : "Sending", (event == HA_EVENT_DELETE) ? "deletion" : "update",
389 ipA, key->port_l, ipB, key->port_h, key->protocol);
390 #endif
391 }
392
RegisterSessionHAFuncs(uint32_t preproc_id,uint8_t subcode,uint8_t size,StreamHAProducerFunc produce,StreamHAConsumerFunc consume)393 int RegisterSessionHAFuncs(uint32_t preproc_id, uint8_t subcode, uint8_t size,
394 StreamHAProducerFunc produce, StreamHAConsumerFunc consume)
395 {
396 StreamHAFuncsNode *node;
397 int i, idx;
398
399 if (produce == NULL || consume == NULL)
400 {
401 FatalError("One must be both a producer and a consumer to participate in Stream HA!\n");
402 }
403
404 if (preproc_id > UINT8_MAX)
405 {
406 FatalError("Preprocessor ID must be between 0 and %d to participate in Stream HA!\n", UINT8_MAX);
407 }
408
409 idx = n_stream_ha_funcs;
410 for (i = 0; i < n_stream_ha_funcs; i++)
411 {
412 node = stream_ha_funcs[i];
413 if (node)
414 {
415 if (preproc_id == node->preproc_id && subcode == node->subcode)
416 {
417 FatalError("Duplicate Stream HA registration attempt for preprocessor %hu with subcode %hu\n",
418 (unsigned short)node->preproc_id, (unsigned short)node->subcode);
419 }
420 }
421 else if (idx == n_stream_ha_funcs)
422 idx = i;
423 }
424
425 if (idx == MAX_STREAM_HA_FUNCS)
426 {
427 FatalError("Attempted to register more than %d Stream HA types!\n", MAX_STREAM_HA_FUNCS);
428 }
429
430 if (idx == n_stream_ha_funcs)
431 n_stream_ha_funcs++;
432
433 node = (StreamHAFuncsNode *) SnortAlloc(sizeof(StreamHAFuncsNode));
434 node->id = idx;
435 node->mask = (1 << idx);
436 node->preproc_id = (uint8_t) preproc_id;
437 node->subcode = subcode;
438 node->size = size;
439 node->produce = produce;
440 node->consume = consume;
441
442 stream_ha_funcs[idx] = node;
443
444 LogMessage("StreamHA: Registered node %hu for preprocessor ID %hhu with subcode %hhu (size %hhu)\n",
445 node->id, node->preproc_id, node->subcode, node->size);
446
447 return idx;
448 }
449
UnregisterSessionHAFuncs(uint32_t preproc_id,uint8_t subcode)450 void UnregisterSessionHAFuncs(uint32_t preproc_id, uint8_t subcode)
451 {
452 StreamHAFuncsNode *node;
453 int i;
454
455 for (i = 0; i < n_stream_ha_funcs; i++)
456 {
457 node = stream_ha_funcs[i];
458 if (node && preproc_id == node->preproc_id && subcode == node->subcode)
459 {
460 stream_ha_funcs[i] = NULL;
461 free(node);
462 break;
463 }
464 }
465
466 if ((i + 1) == n_stream_ha_funcs)
467 n_stream_ha_funcs--;
468 }
469
SessionSetHAPendingBit(void * ssnptr,int bit)470 void SessionSetHAPendingBit(void *ssnptr, int bit)
471 {
472 SessionControlBlock *scb = (SessionControlBlock*) ssnptr;
473
474 if (!scb)
475 return;
476
477 if (bit >= n_stream_ha_funcs || !stream_ha_funcs[bit])
478 {
479 FatalError("Attempted to set illegal HA pending bit %d!\n", bit);
480 }
481
482 scb->ha_pending_mask |= (1 << bit);
483 }
484
StreamParseHAArgs(SnortConfig * sc,SessionHAConfig * config,char * args)485 static void StreamParseHAArgs(SnortConfig *sc, SessionHAConfig *config, char *args)
486 {
487 char **toks;
488 int num_toks;
489 int i;
490 char **stoks = NULL;
491 int s_toks;
492 char *endPtr = NULL;
493 unsigned long int value;
494
495 if (config == NULL)
496 return;
497
498 if ((args == NULL) || (strlen(args) == 0))
499 return;
500
501 toks = mSplit(args, ",", 0, &num_toks, 0);
502
503 for (i = 0; i < num_toks; i++)
504 {
505 stoks = mSplit(toks[i], " ", 2, &s_toks, 0);
506
507 if (s_toks == 0)
508 {
509 FatalError("%s(%d) => Missing parameter in Stream HA config.\n",
510 file_name, file_line);
511 }
512
513 if (!strcmp(stoks[0], "min_session_lifetime"))
514 {
515 if (stoks[1])
516 value = strtoul(stoks[1], &endPtr, 10);
517 else
518 value = 0;
519
520 if (!stoks[1] || (endPtr == &stoks[1][0]) || *endPtr)
521 {
522 FatalError("%s(%d) => Invalid '%s' in config file. Requires integer parameter.\n",
523 file_name, file_line, stoks[0]);
524 }
525
526 if (value > UINT16_MAX)
527 {
528 FatalError("%s(%d) => '%s %lu' invalid: value must be between 0 and %d milliseconds.\n",
529 file_name, file_line, stoks[0], value, UINT16_MAX);
530 }
531
532 config->min_session_lifetime.tv_sec = 0;
533 while (value >= 1000)
534 {
535 config->min_session_lifetime.tv_sec++;
536 value -= 1000;
537 }
538 config->min_session_lifetime.tv_usec = value * 1000;
539 }
540 else if (!strcmp(stoks[0], "min_sync_interval"))
541 {
542 if (stoks[1])
543 value = strtoul(stoks[1], &endPtr, 10);
544 else
545 value = 0;
546
547 if (!stoks[1] || (endPtr == &stoks[1][0]) || *endPtr)
548 {
549 FatalError("%s(%d) => Invalid '%s' in config file. Requires integer parameter.\n",
550 file_name, file_line, stoks[0]);
551 }
552
553 if (value > UINT16_MAX)
554 {
555 FatalError("%s(%d) => '%s %lu' invalid: value must be between 0 and %d milliseconds.\n",
556 file_name, file_line, stoks[0], value, UINT16_MAX);
557 }
558
559 config->min_sync_interval.tv_sec = 0;
560 while (value >= 1000)
561 {
562 config->min_sync_interval.tv_sec++;
563 value -= 1000;
564 }
565 config->min_sync_interval.tv_usec = value * 1000;
566 }
567 else if (!strcmp(stoks[0], "startup_input_file"))
568 {
569 if (!stoks[1])
570 {
571 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
572 }
573 if (config->startup_input_file)
574 {
575 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
576 }
577 config->startup_input_file = SnortStrdup(stoks[1]);
578 }
579 else if (!strcmp(stoks[0], "runtime_output_file"))
580 {
581 if (!stoks[1])
582 {
583 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
584 }
585 if (config->runtime_output_file)
586 {
587 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
588 }
589 config->runtime_output_file = SnortStrdup(stoks[1]);
590 }
591 else if (!strcmp(stoks[0], "shutdown_output_file"))
592 {
593 if (!stoks[1])
594 {
595 FatalError("%s(%d) => '%s' missing an argument\n", file_name, file_line, stoks[0]);
596 }
597 if (config->shutdown_output_file)
598 {
599 FatalError("%s(%d) => '%s' specified multiple times\n", file_name, file_line, stoks[0]);
600 }
601 config->shutdown_output_file = SnortStrdup(stoks[1]);
602 }
603 else if (!strcmp(stoks[0], "use_side_channel"))
604 {
605 #ifdef SIDE_CHANNEL
606 if (!sc->side_channel_config.enabled)
607 {
608 FatalError("%s(%d) => '%s' cannot be specified without enabling the Snort side channel.\n",
609 file_name, file_line, stoks[0]);
610 }
611 config->use_side_channel = 1;
612 #else
613 FatalError("%s(%d) => Snort has been compiled without Side Channel support.\n", file_name, file_line);
614 #endif
615 }
616 else if (!strcmp(stoks[0], "use_daq"))
617 {
618 #ifdef HAVE_DAQ_EXT_MODFLOW
619 config->use_daq = 1;
620 #else
621 FatalError("%s(%d) => Snort has been compiled against a LibDAQ version without extended flow modifier support.\n",
622 file_name, file_line);
623 #endif
624 }
625 else
626 {
627 FatalError("%s(%d) => Invalid Stream HA config option '%s'\n",
628 file_name, file_line, stoks[0]);
629 }
630
631 mSplitFree(&stoks, s_toks);
632 }
633
634 mSplitFree(&toks, num_toks);
635 #ifdef REG_TEST
636 if(sc->ha_out)
637 {
638 if(config->runtime_output_file)
639 free(config->runtime_output_file);
640 config->runtime_output_file = SnortStrdup(sc->ha_out);
641 }
642 if(sc->ha_in)
643 {
644 if(config->startup_input_file)
645 free(config->startup_input_file);
646 config->startup_input_file = SnortStrdup(sc->ha_in);
647 }
648 if(sc->ha_pdts_in)
649 {
650 if(config->runtime_input_file)
651 free(config->runtime_input_file);
652 config->runtime_input_file = SnortStrdup(sc->ha_pdts_in);
653 }
654 #endif
655
656 }
657
StreamPrintHAConfig(SessionHAConfig * config)658 static void StreamPrintHAConfig(SessionHAConfig *config)
659 {
660 if (config == NULL)
661 return;
662
663 LogMessage("Stream HA config:\n");
664 LogMessage(" Minimum Session Lifetime: %lu milliseconds\n",
665 config->min_session_lifetime.tv_sec * 1000 + config->min_session_lifetime.tv_usec / 1000);
666 LogMessage(" Minimum Sync Interval: %lu milliseconds\n",
667 config->min_sync_interval.tv_sec * 1000 + config->min_sync_interval.tv_usec / 1000);
668 if (config->startup_input_file)
669 LogMessage(" Startup Input File: %s\n", config->startup_input_file);
670 if (config->runtime_output_file)
671 LogMessage(" Runtime Output File: %s\n", config->runtime_output_file);
672 if (config->shutdown_output_file)
673 LogMessage(" Shutdown Output File: %s\n", config->shutdown_output_file);
674 #ifdef SIDE_CHANNEL
675 LogMessage(" Using Side Channel: %s\n", config->use_side_channel ? "Yes" : "No");
676 #endif
677 LogMessage(" Using DAQ: %s\n", config->use_daq ? "Yes" : "No");
678 #ifdef REG_TEST
679 LogMessage(" Stream LWS HA Data Size: %zu\n", sizeof(StreamHASession));
680 #endif
681 }
682
SessionPrintHAStats(void)683 void SessionPrintHAStats(void)
684 {
685 StreamHAFuncsNode *node;
686 int i;
687
688 LogMessage(" High Availability\n");
689 LogMessage(" Updates Received: %u\n", s5ha_stats.update_messages_received);
690 LogMessage("Updates Received (No Session): %u\n", s5ha_stats.update_messages_received_no_session);
691 LogMessage(" Deletions Received: %u\n", s5ha_stats.delete_messages_received);
692 LogMessage(" Updates Sent Immediately: %u\n", s5ha_stats.update_messages_sent_immediately);
693 LogMessage(" Updates Sent Normally: %u\n", s5ha_stats.update_messages_sent_normally);
694 LogMessage(" Deletions Sent: %u\n", s5ha_stats.delete_messages_sent);
695 LogMessage(" Deletions Not Sent: %u\n", s5ha_stats.delete_messages_not_sent);
696 for (i = 0; i < n_stream_ha_funcs; i++)
697 {
698 node = stream_ha_funcs[i];
699 if (!node)
700 continue;
701 LogMessage(" Node %hhu/%hhu: %u produced, %u consumed\n",
702 node->preproc_id, node->subcode, node->produced, node->consumed);
703 }
704 }
705
SessionResetHAStats(void)706 void SessionResetHAStats(void)
707 {
708 memset(&s5ha_stats, 0, sizeof(s5ha_stats));
709 }
710
711 #ifdef HAVE_DAQ_EXT_MODFLOW
SessionHAMetaEval(int type,const uint8_t * data)712 static void SessionHAMetaEval(int type, const uint8_t *data)
713 {
714 DAQ_HA_State_Data_t *daqState;
715
716 if (type != DAQ_METAHDR_TYPE_HA_STATE)
717 return;
718
719 daqState = (DAQ_HA_State_Data_t *) data;
720
721 if (daqState->length == 0 || daqState->length >= UINT16_MAX || !daqState->data)
722 return;
723
724 /* Ignore the return value from consuming the message - it will print out
725 errors and there's nothing we can do about it. */
726 ConsumeHAMessage(daqState->data, daqState->length);
727 }
728 #endif
729
730 #ifdef HAVE_DAQ_QUERYFLOW
731 #ifdef REG_TEST
SessionHAQueryDAQState(DAQ_PktHdr_t * pkthdr)732 int SessionHAQueryDAQState( DAQ_PktHdr_t *pkthdr)
733 #else
734 int SessionHAQueryDAQState(const DAQ_PktHdr_t *pkthdr)
735 #endif
736 {
737 DAQ_QueryFlow_t query;
738 DAQ_HA_State_Data_t daqHAState;
739 int rval;
740
741 query.type = DAQ_QUERYFLOW_TYPE_HA_STATE;
742 query.length = sizeof(daqHAState);
743 query.value = &daqHAState;
744
745 #ifdef REG_TEST
746 pkthdr->priv_ptr = &runtime_input_fd;
747 #endif
748
749 if ((rval = DAQ_QueryFlow(pkthdr, &query)) == DAQ_SUCCESS)
750 {
751 if (daqHAState.length == 0 || daqHAState.length >= UINT16_MAX || !daqHAState.data)
752 return -EINVAL;
753 rval = ConsumeHAMessage(daqHAState.data, daqHAState.length);
754 }
755
756 return rval;
757 }
758 #endif
759
SessionHAInit(struct _SnortConfig * sc,char * args)760 void SessionHAInit( struct _SnortConfig *sc, char *args )
761 {
762 if( session_configuration == NULL )
763 FatalError("Tried to config Session HA policy without core Session config!\n");
764
765 if( session_configuration->ha_config != NULL )
766 FatalError("%s(%d) ==> Cannot duplicate Sesssion HA configuration\n", file_name, file_line);
767
768 // if HA not enabled for session, then we are out of here...
769 if( !session_configuration->enable_ha )
770 return;
771
772 session_configuration->ha_config = ( SessionHAConfig * ) SnortAlloc( sizeof( SessionHAConfig ) );
773 StreamParseHAArgs(sc, session_configuration->ha_config, args);
774
775 #ifdef PERF_PROFILING
776 RegisterPreprocessorProfile("sessionHAProduce", &sessionHAProducePerfStats, 2, &sessionHAPerfStats, NULL);
777 RegisterPreprocessorProfile("sessionHAConsume", &sessionHAConsumePerfStats, 0, &totalPerfStats, NULL);
778 #endif
779
780 ControlSocketRegisterHandler(CS_TYPE_DEBUG_STREAM_HA, StreamDebugHA, NULL, NULL);
781 LogMessage("Registered StreamHA Debug control socket message type (0x%x)\n", CS_TYPE_DEBUG_STREAM_HA);
782
783 #ifdef HAVE_DAQ_EXT_MODFLOW
784 if (session_configuration->ha_config->use_daq)
785 {
786 AddFuncToPreprocMetaEvalList(sc, SessionHAMetaEval, PP_SESSION_PRIORITY, PP_SESSION);
787 //Do not send delete notification on RST when there is a underlying data plane.
788 HA_CRITICAL_SESSION_FLAGS &= ~SSNFLAG_RESET;
789 }
790 #endif
791
792 StreamPrintHAConfig(session_configuration->ha_config);
793 }
794
795 #if defined(SNORT_RELOAD)
SessionHAReload(struct _SnortConfig * sc,char * args,void ** new_config)796 void SessionHAReload(struct _SnortConfig *sc, char *args, void **new_config)
797 {
798 SessionHAConfig *session_ha_config = ( SessionHAConfig * ) *new_config;
799
800 // session ha config is only in default policy... just return if not default
801 if( getParserPolicy( sc ) != getDefaultPolicy( ) )
802 return;
803
804 // if HA not enabled for session, then we are out of here...
805 if( !session_configuration->enable_ha )
806 return;
807
808 if ( session_ha_config == NULL )
809 {
810 session_ha_config = ( SessionHAConfig * ) SnortAlloc( sizeof( SessionHAConfig ) );
811 if ( session_ha_config == NULL )
812 FatalError("Failed to allocate storage for Session HA configuration.\n");
813
814 StreamParseHAArgs( sc, session_ha_config, args );
815 StreamPrintHAConfig( session_ha_config );
816 *new_config = session_ha_config;
817
818 }
819 else
820 {
821 FatalError("%s(%d) ==> Cannot duplicate Sesssion HA configuration\n", file_name, file_line);
822 }
823 }
824 #endif
825
SessionVerifyHAConfig(struct _SnortConfig * sc,void * swap_config)826 int SessionVerifyHAConfig(struct _SnortConfig *sc, void *swap_config)
827 {
828 if (swap_config == NULL)
829 return -1;
830
831 return 0;
832 }
833
SessionHASwapReload(struct _SnortConfig * sc,void * data)834 void *SessionHASwapReload( struct _SnortConfig *sc, void *data )
835 {
836 session_configuration->ha_config = ( SessionHAConfig * ) data;
837 return NULL;
838 }
839
840
SessionHAConfigFree(void * data)841 void SessionHAConfigFree( void *data )
842 {
843 SessionHAConfig *config = ( SessionHAConfig * ) data;
844
845 if (config == NULL)
846 return;
847
848 if (config->startup_input_file)
849 free(config->startup_input_file);
850
851 if (config->runtime_output_file)
852 free(config->runtime_output_file);
853
854 if (config->shutdown_output_file)
855 free(config->shutdown_output_file);
856 #ifdef REG_TEST
857 if (config->runtime_input_file)
858 free(config->runtime_input_file);
859 #endif
860
861 free(config);
862 }
863
864 // This MUST have the exact same logic as createSessionKeyFromPktHeader()
IsClientLower(const sfaddr_t * cltIP,uint16_t cltPort,const sfaddr_t * srvIP,uint16_t srvPort,char proto)865 static inline bool IsClientLower(const sfaddr_t *cltIP, uint16_t cltPort,
866 const sfaddr_t *srvIP, uint16_t srvPort, char proto)
867 {
868 if (sfip_fast_lt6(cltIP, srvIP))
869 return true;
870
871 if (sfip_fast_lt6(srvIP, cltIP))
872 return false;
873
874 switch (proto)
875 {
876 case IPPROTO_TCP:
877 case IPPROTO_UDP:
878 if (cltPort < srvPort)
879 return true;
880 }
881 return false;
882 }
883
DeserializeHASession(const SessionKey * key,const StreamHASession * has,SessionControlBlock * scb)884 static SessionControlBlock *DeserializeHASession(const SessionKey *key,
885 const StreamHASession *has,
886 SessionControlBlock *scb)
887 {
888 SessionControlBlock *retSsn;
889 int family;
890
891 if (!scb)
892 {
893 const HA_Api *api;
894
895 api = ha_get_api(key->protocol);
896 retSsn = api->create_session(key);
897
898 retSsn->ha_flags &= ~HA_FLAG_NEW;
899 retSsn->ha_flags |= HA_FLAG_STANDBY;
900
901 family = (has->flags & HA_SESSION_FLAG_IP6) ? AF_INET6 : AF_INET;
902 if (has->flags & HA_SESSION_FLAG_LOW)
903 {
904 sfip_set_raw(&retSsn->server_ip, retSsn->key->ip_l, family);
905 sfip_set_raw(&retSsn->client_ip, retSsn->key->ip_h, family);
906 retSsn->server_port = retSsn->key->port_l;
907 retSsn->client_port = retSsn->key->port_h;
908 }
909 else
910 {
911 sfip_set_raw(&retSsn->client_ip, retSsn->key->ip_l, family);
912 sfip_set_raw(&retSsn->server_ip, retSsn->key->ip_h, family);
913 retSsn->client_port = retSsn->key->port_l;
914 retSsn->server_port = retSsn->key->port_h;
915 }
916 }
917 else
918 retSsn = scb;
919
920 retSsn->ha_state = has->ha_state;
921
922 return retSsn;
923 }
924
DeserializePreprocData(uint8_t event,SessionControlBlock * scb,uint8_t preproc_id,uint8_t subcode,const uint8_t * data,uint8_t length)925 static inline int DeserializePreprocData(uint8_t event, SessionControlBlock *scb, uint8_t preproc_id,
926 uint8_t subcode, const uint8_t *data, uint8_t length)
927 {
928 StreamHAFuncsNode *node;
929 int i;
930
931 for (i = 0; i < n_stream_ha_funcs; i++)
932 {
933 node = stream_ha_funcs[i];
934 if (node && preproc_id == node->preproc_id && subcode == node->subcode)
935 {
936 if (node->size < length)
937 {
938 ErrorMessage("Stream HA preprocessor data record's length exceeds expected size! (%u vs %u)\n",
939 length, node->size);
940 return -1;
941 }
942 node->consumed++;
943 return node->consume(scb, data, length);
944 }
945 }
946
947 ErrorMessage("Stream HA preprocessor data record received with unrecognized preprocessor ID/subcode! (%hhu:%hhu)\n",
948 preproc_id, subcode);
949 return -1;
950 }
951
ConsumeHAMessage(const uint8_t * msg,uint32_t msglen)952 static int ConsumeHAMessage(const uint8_t *msg, uint32_t msglen)
953 {
954 const HA_Api *api;
955 StreamHASession *has;
956 SessionControlBlock *scb;
957 SessionKey key;
958 MsgHeader *msg_hdr;
959 RecordHeader *rec_hdr;
960 PreprocDataHeader *psd_hdr;
961 uint32_t offset;
962 bool debug_flag;
963 int rval = 1;
964 PROFILE_VARS;
965
966 PREPROC_PROFILE_START(sessionHAConsumePerfStats);
967
968 /* Read the message header */
969 if (msglen < sizeof(*msg_hdr))
970 {
971 ErrorMessage("Stream HA message length shorter than header length! (%u)\n", msglen);
972 goto consume_exit;
973 }
974 msg_hdr = (MsgHeader *) msg;
975 offset = sizeof(*msg_hdr);
976
977 if (msg_hdr->total_length != msglen)
978 {
979 ErrorMessage("Stream HA message header's total length does not match actual length! (%u vs %u)\n",
980 msg_hdr->total_length, msglen);
981 goto consume_exit;
982 }
983
984 if (msg_hdr->version != HA_MESSAGE_VERSION)
985 {
986 ErrorMessage("Stream HA message has unrecognized version: %hhu!\n", msg_hdr->version);
987 goto consume_exit;
988 }
989
990 if (msg_hdr->event != HA_EVENT_UPDATE && msg_hdr->event != HA_EVENT_DELETE)
991 {
992 ErrorMessage("Stream HA message has unknown event type: %hhu!\n", msg_hdr->event);
993 goto consume_exit;
994 }
995
996 /* Read the key */
997 if (msg_hdr->key_size == IP4_SESSION_KEY_SIZE) /* IPv4, miniature key */
998 {
999 /* Lower IPv4 address */
1000 memcpy(&key.ip_l[3], msg + offset, 4);
1001 key.ip_l[0] = key.ip_l[1] = 0;
1002 key.ip_l[2] = htonl(0xFFFF);
1003 offset += 4;
1004 /* Higher IPv4 address */
1005 memcpy(&key.ip_h[3], msg + offset, 4);
1006 key.ip_h[0] = key.ip_h[1] = 0;
1007 key.ip_h[2] = htonl(0xFFFF);
1008 offset += 4;
1009 /* The remainder of the key */
1010 memcpy(((uint8_t *) &key) + 32, msg + offset, IP4_SESSION_KEY_SIZE - 8);
1011 offset += IP4_SESSION_KEY_SIZE - 8;
1012 }
1013 else if (msg_hdr->key_size == IP6_SESSION_KEY_SIZE) /* IPv6, full-size key */
1014 {
1015 memcpy(&key, msg + offset, IP6_SESSION_KEY_SIZE);
1016 offset += IP6_SESSION_KEY_SIZE;
1017 }
1018 else
1019 {
1020 ErrorMessage("Stream HA message has unrecognized key size: %hhu!\n", msg_hdr->key_size);
1021 goto consume_exit;
1022 }
1023
1024 debug_flag = StreamHADebugCheck(&key, s5_ha_debug_flag, &s5_ha_debug_info, s5_ha_debug_session, sizeof(s5_ha_debug_session));
1025
1026 api = ha_get_api(key.protocol);
1027 if (!api)
1028 {
1029 ErrorMessage("Stream HA message has unhandled protocol: %u!\n", key.protocol);
1030 goto consume_exit;
1031 }
1032
1033 if (msg_hdr->event == HA_EVENT_DELETE)
1034 {
1035 if (debug_flag)
1036 LogMessage("S5HADbg Consuming deletion message for %s\n", s5_ha_debug_session);
1037 if (offset != msglen)
1038 {
1039 ErrorMessage("Stream HA deletion message contains extraneous data! (%u bytes)\n", msglen - offset);
1040 goto consume_exit;
1041 }
1042 s5ha_stats.delete_messages_received++;
1043 rval = api->delete_session(&key);
1044 if (debug_flag)
1045 {
1046 if (!rval)
1047 LogMessage("S5HADbg Deleted LWSession for %s\n", s5_ha_debug_session);
1048 else
1049 LogMessage("S5HADbg Could not delete LWSession for %s\n", s5_ha_debug_session);
1050 }
1051 goto consume_exit;
1052 }
1053
1054 if (debug_flag)
1055 LogMessage("S5HADbg Consuming update message for %s\n", s5_ha_debug_session);
1056
1057 if (pkt_trace_enabled)
1058 addPktTraceData(VERDICT_REASON_NO_BLOCK, snprintf(trace_line, MAX_TRACE_LINE,
1059 "Recovered session \n"));
1060
1061 scb = api->get_lws(&key);
1062
1063 /* Read any/all records. */
1064 while (offset < msglen)
1065 {
1066 if (sizeof(*rec_hdr) > (msglen - offset))
1067 {
1068 ErrorMessage("Stream HA message contains a truncated record header! (%zu vs %u)\n",
1069 sizeof(*rec_hdr), msglen - offset);
1070 goto consume_exit;
1071 }
1072 rec_hdr = (RecordHeader *) (msg + offset);
1073 offset += sizeof(*rec_hdr);
1074
1075 switch (rec_hdr->type)
1076 {
1077 case HA_TYPE_HAS:
1078 if (rec_hdr->length != sizeof(*has))
1079 {
1080 ErrorMessage("Stream HA message contains incorrectly size HA Session record! (%u vs %zu)\n",
1081 rec_hdr->length, sizeof(*has));
1082 goto consume_exit;
1083 }
1084 if (rec_hdr->length > (msglen - offset))
1085 {
1086 ErrorMessage("Stream HA message contains truncated HA Session record data! (%u vs %u)\n",
1087 rec_hdr->length, msglen - offset);
1088 goto consume_exit;
1089 }
1090 has = (StreamHASession *) (msg + offset);
1091 offset += rec_hdr->length;
1092 if (debug_flag)
1093 {
1094 #ifdef TARGET_BASED
1095 LogMessage("S5HADbg %s Session for %s - SF=0x%x IPP=0x%hx AP=0x%hx DIR=%hhu IDIR=%hhu\n",
1096 (scb) ? "Updating" : "Creating", s5_ha_debug_session, has->ha_state.session_flags,
1097 has->ha_state.ipprotocol, has->ha_state.application_protocol,
1098 has->ha_state.direction, has->ha_state.ignore_direction);
1099 #else
1100 LogMessage("S5HADbg %s LWSession for %s - SF=0x%x DIR=%hhu IDIR=%hhu\n",
1101 (lwssn) ? "Updating" : "Creating", s5_ha_debug_session, has->ha_state.session_flags,
1102 has->ha_state.direction, has->ha_state.ignore_direction);
1103 #endif
1104 }
1105 if(!scb)
1106 {
1107 if (has->ha_state.session_flags & SSNFLAG_COUNTED_CLOSING)
1108 sfBase.iSessionsClosing++;
1109 else if (has->ha_state.session_flags & SSNFLAG_COUNTED_ESTABLISH)
1110 sfBase.iSessionsEstablished++;
1111 else if (has->ha_state.session_flags & SSNFLAG_COUNTED_INITIALIZE)
1112 sfBase.iSessionsInitializing++;
1113 }
1114 scb = DeserializeHASession(&key, has, scb);
1115 scb->session_established = true;
1116 break;
1117
1118 case HA_TYPE_PSD:
1119 if (!scb)
1120 {
1121 //ErrorMessage("Stream HA message with preprocessor data record received for non-existent session!\n");
1122 s5ha_stats.update_messages_received_no_session++;
1123 goto consume_exit;
1124 }
1125 if (sizeof(*psd_hdr) > (msglen - offset))
1126 {
1127 ErrorMessage("Stream HA message contains a truncated preprocessor data record header! (%zu vs %u)\n",
1128 sizeof(*psd_hdr), msglen - offset);
1129 goto consume_exit;
1130 }
1131 psd_hdr = (PreprocDataHeader *) (msg + offset);
1132 offset += sizeof(*psd_hdr);
1133 if (rec_hdr->length > (msglen - offset))
1134 {
1135 ErrorMessage("Stream HA message contains truncated preprocessor data record data! (%u vs %u)\n",
1136 rec_hdr->length, msglen - offset);
1137 goto consume_exit;
1138 }
1139 if (debug_flag)
1140 {
1141 LogMessage("S5HADbg Consuming %hhu byte preprocessor data record for %s with PPID=%hhu and SC=%hhu\n",
1142 rec_hdr->length, s5_ha_debug_session, psd_hdr->preproc_id, psd_hdr->subcode);
1143 }
1144 if (DeserializePreprocData(msg_hdr->event, scb, psd_hdr->preproc_id, psd_hdr->subcode,
1145 msg + offset, rec_hdr->length) != 0)
1146 {
1147 ErrorMessage("Stream HA message contained invalid preprocessor data record!\n");
1148 goto consume_exit;
1149 }
1150 offset += rec_hdr->length;
1151 break;
1152
1153 default:
1154 ErrorMessage("Stream HA message contains unrecognized record type: %hhu!\n", rec_hdr->type);
1155 goto consume_exit;
1156 }
1157 }
1158 /* Mark the session as being in standby mode since we just received an update. */
1159 if (scb && !(scb->ha_flags & HA_FLAG_STANDBY))
1160 {
1161 if (api->deactivate_session)
1162 api->deactivate_session(scb);
1163 scb->ha_flags |= HA_FLAG_STANDBY;
1164 }
1165
1166 s5ha_stats.update_messages_received++;
1167 rval = 0;
1168
1169 consume_exit:
1170 PREPROC_PROFILE_END(sessionHAConsumePerfStats);
1171 return rval;
1172 }
1173
1174 /*
1175 * File I/O
1176 */
Read(int fd,void * buf,size_t count)1177 static inline ssize_t Read(int fd, void *buf, size_t count)
1178 {
1179 ssize_t n;
1180 errno = 0;
1181
1182 while ((n = read(fd, buf, count)) <= (ssize_t) count)
1183 {
1184 if (n == (ssize_t) count)
1185 return 0;
1186
1187 if (n > 0)
1188 {
1189 buf = (uint8_t *) buf + n;
1190 count -= n;
1191 }
1192 else if (n == 0)
1193 break;
1194 else if (errno != EINTR)
1195 {
1196 ErrorMessage("Error reading from Stream HA message file: %s (%d)\n", strerror(errno), errno);
1197 break;
1198 }
1199 }
1200 return -1;
1201 }
1202
ReadHAMessagesFromFile(const char * filename)1203 static int ReadHAMessagesFromFile(const char *filename)
1204 {
1205 MsgHeader *msg_header;
1206 uint8_t *msg;
1207 int rval, fd;
1208
1209 fd = open(filename, O_RDONLY, 0664);
1210 if (fd < 0)
1211 {
1212 FatalError("Could not open %s for reading HA messages from: %s (%d)\n", filename, strerror(errno), errno);
1213 }
1214
1215 LogMessage("Reading Stream HA messages from '%s'...\n", filename);
1216 msg = file_io_buffer;
1217 while ((rval = Read(fd, msg, sizeof(*msg_header))) == 0)
1218 {
1219 msg_header = (MsgHeader *) msg;
1220 if (msg_header->total_length < sizeof(*msg_header))
1221 {
1222 ErrorMessage("Stream HA Message total length (%hu) is way too short!\n", msg_header->total_length);
1223 close(fd);
1224 return -1;
1225 }
1226 else if (msg_header->total_length > (UINT16_MAX - sizeof(*msg_header)))
1227 {
1228 ErrorMessage("Stream HA Message total length (%hu) is too long!\n", msg_header->total_length);
1229 close(fd);
1230 return -1;
1231 }
1232 else if (msg_header->total_length > sizeof(*msg_header))
1233 {
1234 if ((rval = Read(fd, msg + sizeof(*msg_header), msg_header->total_length - sizeof(*msg_header))) != 0)
1235 {
1236 ErrorMessage("Error reading the remaining %zu bytes of an HA message from file: %s (%d)\n",
1237 msg_header->total_length - sizeof(*msg_header), strerror(errno), errno);
1238 close(fd);
1239 return rval;
1240 }
1241 }
1242
1243 if ((rval = ConsumeHAMessage(msg, msg_header->total_length)) != 0)
1244 {
1245 close(fd);
1246 return rval;
1247 }
1248 }
1249 close(fd);
1250
1251 return 0;
1252 }
1253
Write(int fd,const void * buf,size_t count)1254 static inline ssize_t Write(int fd, const void *buf, size_t count)
1255 {
1256 ssize_t n;
1257 errno = 0;
1258
1259 while ((n = write(fd, buf, count)) <= (ssize_t) count)
1260 {
1261 if (n == (ssize_t) count)
1262 return 0;
1263
1264 if (n > 0)
1265 count -= n;
1266 else if (errno != EINTR)
1267 {
1268 ErrorMessage("Error writing to Stream HA message file: %s (%d)\n", strerror(errno), errno);
1269 break;
1270 }
1271 }
1272
1273 return -1;
1274 }
1275
WriteHAMessageHeader(uint8_t event,uint16_t msglen,const SessionKey * key,uint8_t * msg)1276 static uint32_t WriteHAMessageHeader(uint8_t event, uint16_t msglen, const SessionKey *key, uint8_t *msg)
1277 {
1278 MsgHeader *msg_hdr;
1279 uint32_t offset;
1280
1281 msg_hdr = (MsgHeader *) msg;
1282 offset = sizeof(*msg_hdr);
1283 msg_hdr->event = event;
1284 msg_hdr->version = HA_MESSAGE_VERSION;
1285 msg_hdr->total_length = msglen;
1286 msg_hdr->key_type = HA_TYPE_KEY;
1287
1288 if (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) ||
1289 key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF))
1290 {
1291 msg_hdr->key_size = IP6_SESSION_KEY_SIZE;
1292 memcpy(msg + offset, key, IP6_SESSION_KEY_SIZE);
1293 offset += IP6_SESSION_KEY_SIZE;
1294 }
1295 else
1296 {
1297 msg_hdr->key_size = IP4_SESSION_KEY_SIZE;
1298 memcpy(msg + offset, &key->ip_l[3], sizeof(key->ip_l[3]));
1299 offset += sizeof(key->ip_l[3]);
1300 memcpy(msg + offset, &key->ip_h[3], sizeof(key->ip_h[3]));
1301 offset += sizeof(key->ip_h[3]);
1302 memcpy(msg + offset, ((uint8_t *) key) + 32, IP4_SESSION_KEY_SIZE - 8);
1303 offset += IP4_SESSION_KEY_SIZE - 8;
1304 }
1305 return offset;
1306 }
1307
UpdateHAMessageHeaderLength(uint8_t * msg,uint16_t msglen)1308 static void UpdateHAMessageHeaderLength(uint8_t *msg, uint16_t msglen)
1309 {
1310 MsgHeader *msg_hdr;
1311
1312 msg_hdr = (MsgHeader *) msg;
1313 msg_hdr->total_length = msglen;
1314 }
1315
WriteHASession(SessionControlBlock * scb,uint8_t * msg)1316 static uint32_t WriteHASession(SessionControlBlock *scb, uint8_t *msg)
1317 {
1318 StreamHASession *has;
1319 RecordHeader *rec_hdr;
1320 uint32_t offset;
1321
1322 rec_hdr = (RecordHeader *) msg;
1323 offset = sizeof(*rec_hdr);
1324 rec_hdr->type = HA_TYPE_HAS;
1325 rec_hdr->length = sizeof(*has);
1326
1327 has = (StreamHASession *) (msg + offset);
1328 offset += sizeof(*has);
1329 has->ha_state = scb->ha_state;
1330
1331 if (!IsClientLower(&scb->client_ip, scb->client_port, &scb->server_ip, scb->server_port, scb->key->protocol))
1332 has->flags |= HA_SESSION_FLAG_LOW;
1333
1334 has->flags |= HA_SESSION_FLAG_IP6;
1335
1336 return offset;
1337 }
1338
WritePreprocDataRecord(SessionControlBlock * scb,StreamHAFuncsNode * node,uint8_t * msg,bool forced)1339 static uint32_t WritePreprocDataRecord(SessionControlBlock *scb, StreamHAFuncsNode *node, uint8_t *msg, bool forced)
1340 {
1341 RecordHeader *rec_hdr;
1342 PreprocDataHeader *psd_hdr;
1343 uint32_t offset;
1344
1345 rec_hdr = (RecordHeader *) msg;
1346 offset = sizeof(*rec_hdr);
1347 rec_hdr->type = HA_TYPE_PSD;
1348
1349 psd_hdr = (PreprocDataHeader *) (msg + offset);
1350 offset += sizeof(*psd_hdr);
1351 psd_hdr->preproc_id = node->preproc_id;
1352 psd_hdr->subcode = node->subcode;
1353
1354 rec_hdr->length = node->produce(scb, msg + offset);
1355 /* If this was a forced record generation (the preprocessor did not indicate pending HA state data), as in the case
1356 of a full HA record generation for session pickling, return a 0 offset if there is no data so that space is not
1357 wasted on the PSD header. */
1358 if (rec_hdr->length == 0 && forced)
1359 return 0;
1360 offset += rec_hdr->length;
1361 node->produced++;
1362
1363 return offset;
1364 }
1365
CalculateHAMessageSize(uint8_t event,SessionControlBlock * scb)1366 static uint32_t CalculateHAMessageSize(uint8_t event, SessionControlBlock *scb)
1367 {
1368 StreamHAFuncsNode *node;
1369 SessionKey *key;
1370 uint32_t msg_size;
1371 int idx;
1372
1373 key = scb->key;
1374
1375 /* Header (including the key). IPv4 keys are miniaturized. */
1376 msg_size = sizeof(MsgHeader);
1377 if (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) ||
1378 key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF))
1379 msg_size += IP6_SESSION_KEY_SIZE;
1380 else
1381 msg_size += IP4_SESSION_KEY_SIZE;
1382
1383 if (event == HA_EVENT_UPDATE)
1384 {
1385 /* HA Session record */
1386 //if (scb->ha_flags & HA_FLAG_MODIFIED)
1387 msg_size += sizeof(RecordHeader) + sizeof(StreamHASession);
1388
1389 /* Preprocessor data records */
1390 for (idx = 0; idx < n_stream_ha_funcs; idx++)
1391 {
1392 if (scb->ha_pending_mask & (1 << idx))
1393 {
1394 node = stream_ha_funcs[idx];
1395 if (!node)
1396 continue;
1397 msg_size += sizeof(RecordHeader) + sizeof(PreprocDataHeader) + node->size;
1398 }
1399 }
1400 }
1401
1402 #ifdef DEBUG_HA_PRINT
1403 printf("Calculated msg_size = %u (%zu, %zu, %zu, %zu, %zu, %zu, %zu)\n", msg_size, IP4_SESSION_KEY_SIZE, IP6_SESSION_KEY_SIZE,
1404 sizeof(MsgHeader), sizeof(RecordHeader), sizeof(StreamHASession), sizeof(StreamHAState), sizeof(bool));
1405 #endif
1406 return msg_size;
1407 }
1408
GenerateHADeletionMessage(uint8_t * msg,uint32_t msg_size,SessionControlBlock * scb)1409 static uint32_t GenerateHADeletionMessage(uint8_t *msg, uint32_t msg_size, SessionControlBlock *scb)
1410 {
1411 uint32_t msglen;
1412 PROFILE_VARS;
1413
1414 PREPROC_PROFILE_START(sessionHAProducePerfStats);
1415
1416 msglen = WriteHAMessageHeader(HA_EVENT_DELETE, msg_size, scb->key, msg);
1417
1418 PREPROC_PROFILE_END(sessionHAProducePerfStats);
1419
1420 return msglen;
1421 }
1422
1423 #ifdef SIDE_CHANNEL
SendSCDeletionMessage(SessionControlBlock * scb,uint32_t msg_size)1424 static void SendSCDeletionMessage(SessionControlBlock *scb, uint32_t msg_size)
1425 {
1426 SCMsgHdr *sc_hdr;
1427 void *msg_handle;
1428 uint8_t *msg;
1429 int rval;
1430
1431 /* Allocate space for the message. */
1432 if ((rval = SideChannelPreallocMessageTX(msg_size, &sc_hdr, &msg, &msg_handle)) != 0)
1433 {
1434 /* TODO: Error stuff goes here. */
1435 return;
1436 }
1437
1438 /* Generate the message. */
1439 msg_size = GenerateHADeletionMessage(msg, msg_size, scb);
1440
1441 /* Send the message. */
1442 sc_hdr->type = SC_MSG_TYPE_FLOW_STATE_TRACKING;
1443 sc_hdr->timestamp = packet_time();
1444 SideChannelEnqueueMessageTX(sc_hdr, msg, msg_size, msg_handle, NULL);
1445 }
1446 #endif
1447
SessionHANotifyDeletion(SessionControlBlock * scb)1448 void SessionHANotifyDeletion(SessionControlBlock *scb)
1449 {
1450 uint32_t msg_size;
1451 bool debug_flag;
1452 PROFILE_VARS;
1453
1454 PREPROC_PROFILE_START(sessionHAPerfStats);
1455
1456 if ( !scb )
1457 {
1458 PREPROC_PROFILE_END(sessionHAPerfStats);
1459 return;
1460 }
1461
1462 if ( !session_configuration->enable_ha )
1463 {
1464 PREPROC_PROFILE_END(sessionHAPerfStats);
1465 return;
1466 }
1467
1468 /* Don't send a deletion notice if we've never sent an update for the flow, it is in standby, or we've already sent one. */
1469 if ( scb->ha_flags & ( HA_FLAG_NEW | HA_FLAG_STANDBY | HA_FLAG_DELETED ) )
1470 {
1471 s5ha_stats.delete_messages_not_sent++;
1472 PREPROC_PROFILE_END(sessionHAPerfStats);
1473 return;
1474 }
1475
1476 #ifdef SIDE_CHANNEL
1477 if (session_configuration->ha_config->use_side_channel)
1478 {
1479 debug_flag = StreamHADebugCheck(scb->key, s5_ha_debug_flag, &s5_ha_debug_info,
1480 s5_ha_debug_session, sizeof(s5_ha_debug_session));
1481 if (debug_flag)
1482 LogMessage("S5HADbg Producing deletion message for %s\n",
1483 s5_ha_debug_session);
1484 }
1485 #endif
1486
1487 /* Calculate the size of the deletion message. */
1488 msg_size = CalculateHAMessageSize(HA_EVENT_DELETE, scb);
1489
1490 if (runtime_output_fd >= 0)
1491 {
1492 msg_size = GenerateHADeletionMessage(file_io_buffer, msg_size, scb);
1493 if (Write(runtime_output_fd, file_io_buffer, msg_size) == -1)
1494 {
1495 /* TODO: Error stuff here. */
1496 }
1497 }
1498
1499 if (session_configuration->ha_config->use_daq)
1500 {
1501 s5ha_stats.delete_messages_not_sent++;
1502 PREPROC_PROFILE_END(sessionHAPerfStats);
1503 return;
1504 }
1505
1506 #ifdef SIDE_CHANNEL
1507 if (session_configuration->ha_config->use_side_channel)
1508 {
1509 SendSCDeletionMessage(scb, msg_size);
1510 }
1511 #endif
1512
1513 s5ha_stats.delete_messages_sent++;
1514 scb->ha_flags |= HA_FLAG_DELETED;
1515
1516 PREPROC_PROFILE_END(sessionHAPerfStats);
1517 }
1518
GenerateHAUpdateMessage(uint8_t * msg,SessionControlBlock * scb,bool full)1519 static uint32_t GenerateHAUpdateMessage(uint8_t *msg, SessionControlBlock *scb, bool full)
1520 {
1521 StreamHAFuncsNode *node;
1522 uint32_t offset;
1523 int idx;
1524 PROFILE_VARS;
1525
1526 PREPROC_PROFILE_START(sessionHAProducePerfStats);
1527
1528 /* Write HA message header with a length of 0. It will be updated at the end. */
1529 offset = WriteHAMessageHeader(HA_EVENT_UPDATE, 0, scb->key, msg);
1530 offset += WriteHASession(scb, msg + offset);
1531 for (idx = 0; idx < n_stream_ha_funcs; idx++)
1532 {
1533 /* If this is the generation of a full message, try to generate state from all registered nodes. */
1534 if ((scb->ha_pending_mask & (1 << idx)) || full)
1535 {
1536 node = stream_ha_funcs[idx];
1537 if (!node)
1538 continue;
1539 offset += WritePreprocDataRecord(scb, node, msg + offset, (scb->ha_pending_mask & (1 << idx)) ? false : true);
1540 }
1541 }
1542 /* Update the message header's length field with the final message size. */
1543 UpdateHAMessageHeaderLength(msg, offset);
1544
1545 PREPROC_PROFILE_END(sessionHAProducePerfStats);
1546
1547 #ifdef DEBUG_HA_PRINT
1548 printf("Generated msg_size = %u (%zu, %zu, %zu, %zu, %zu, %zu, %zu)\n", offset, IP4_SESSION_KEY_SIZE, IP6_SESSION_KEY_SIZE,
1549 sizeof(MsgHeader), sizeof(RecordHeader), sizeof(StreamHASession), sizeof(StreamHAState), sizeof(bool));
1550 #endif
1551 return offset;
1552 }
1553
1554 #ifdef SIDE_CHANNEL
SendSCUpdateMessage(SessionControlBlock * scb)1555 static void SendSCUpdateMessage(SessionControlBlock *scb)
1556 {
1557 SCMsgHdr *schdr;
1558 void *msg_handle;
1559 uint32_t msg_size;
1560 uint8_t *msg;
1561 int rval;
1562
1563 /* Calculate the maximum size of the update message for preallocation. */
1564 msg_size = CalculateHAMessageSize(HA_EVENT_UPDATE, scb);
1565
1566 /* Allocate space for the message. */
1567 if ((rval = SideChannelPreallocMessageTX(msg_size, &schdr, &msg, &msg_handle)) != 0)
1568 {
1569 /* TODO: Error stuff goes here. */
1570 return;
1571 }
1572
1573 /* Gnerate the message. */
1574 msg_size = GenerateHAUpdateMessage(msg, scb, false);
1575
1576 /* Send the message. */
1577 schdr->type = SC_MSG_TYPE_FLOW_STATE_TRACKING;
1578 schdr->timestamp = packet_time();
1579 SideChannelEnqueueMessageTX(schdr, msg, msg_size, msg_handle, NULL);
1580 }
1581 #endif
1582
getHaStateDiff(SessionKey * key,const StreamHAState * old_state,StreamHAState * cur_state,bool new_session)1583 static inline uint8_t getHaStateDiff(SessionKey *key, const StreamHAState *old_state, StreamHAState *cur_state, bool new_session)
1584 {
1585 uint32_t session_flags_diff;
1586 uint8_t ha_flags = 0;
1587
1588 /* Session creation for non-TCP sessions is a major change. TCP sessions
1589 * hold off until they are established. */
1590 if (new_session)
1591 {
1592 ha_flags |= HA_FLAG_MODIFIED;
1593 if (key->protocol != IPPROTO_TCP)
1594 ha_flags |= HA_FLAG_MAJOR_CHANGE;
1595 return ha_flags;
1596 }
1597
1598 session_flags_diff = ( old_state->session_flags ^ cur_state->session_flags ) & ~HA_IGNORED_SESSION_FLAGS;
1599 if( session_flags_diff )
1600 {
1601 ha_flags |= HA_FLAG_MODIFIED;
1602 if( key->protocol == IPPROTO_TCP && ( session_flags_diff & HA_TCP_MAJOR_SESSION_FLAGS ) )
1603 ha_flags |= HA_FLAG_MAJOR_CHANGE;
1604 if( session_flags_diff & HA_CRITICAL_SESSION_FLAGS )
1605 ha_flags |= HA_FLAG_CRITICAL_CHANGE;
1606 }
1607
1608 if( old_state->ignore_direction != cur_state->ignore_direction )
1609 {
1610 ha_flags |= HA_FLAG_MODIFIED;
1611 /* If we have started ignoring both directions, that means we'll probably
1612 * try to whitelist the session. This is a critical change since we
1613 * probably won't see another packet on the session if we're using
1614 * a DAQ module that fully supports the WHITELIST verdict. */
1615 if( cur_state->ignore_direction == SSN_DIR_BOTH )
1616 ha_flags |= HA_FLAG_CRITICAL_CHANGE;
1617 }
1618
1619 #ifdef TARGET_BASED
1620 if( ( old_state->ipprotocol != cur_state->ipprotocol ) ||
1621 ( old_state->application_protocol != cur_state->application_protocol ) ||
1622 ( old_state->direction != cur_state->direction ) )
1623 #else
1624 if( old_state->direction != cur_state->direction )
1625 #endif
1626 {
1627 ha_flags |= HA_FLAG_MODIFIED;
1628 }
1629
1630 return ha_flags;
1631 }
1632
SessionProcessHA(void * ssnptr,const DAQ_PktHdr_t * pkthdr)1633 void SessionProcessHA(void *ssnptr, const DAQ_PktHdr_t *pkthdr)
1634 {
1635 struct timeval pkt_time;
1636 SessionControlBlock *scb = (SessionControlBlock *) ssnptr;
1637 uint32_t msg_size;
1638 bool debug_flag;
1639 PROFILE_VARS;
1640
1641 PREPROC_PROFILE_START(sessionHAPerfStats);
1642
1643 if (!scb || !session_configuration->enable_ha)
1644 {
1645 PREPROC_PROFILE_END(sessionHAPerfStats);
1646 return;
1647 }
1648
1649 scb->ha_flags |= getHaStateDiff(scb->key, &scb->cached_ha_state, &scb->ha_state, scb->new_session);
1650 /* Receiving traffic on a session that's in standby is a major change. */
1651 if (scb->ha_flags & HA_FLAG_STANDBY)
1652 {
1653 scb->ha_flags |= HA_FLAG_MODIFIED | HA_FLAG_MAJOR_CHANGE;
1654 scb->ha_flags &= ~HA_FLAG_STANDBY;
1655 }
1656 scb->new_session = false;
1657
1658 if (!scb->ha_pending_mask && !(scb->ha_flags & HA_FLAG_MODIFIED))
1659 {
1660 PREPROC_PROFILE_END( sessionHAPerfStats );
1661 return;
1662 }
1663
1664 /*
1665 For now, we are only generating messages for:
1666 (a) major and critical changes or
1667 (b) preprocessor changes on already synchronized sessions.
1668 */
1669 if (!(scb->ha_flags & (HA_FLAG_MAJOR_CHANGE | HA_FLAG_CRITICAL_CHANGE)) &&
1670 (!scb->ha_pending_mask || (scb->ha_flags & HA_FLAG_NEW)))
1671 {
1672 PREPROC_PROFILE_END(sessionHAPerfStats);
1673 return;
1674 }
1675
1676 /* Ensure that a new flow has lived long enough for anyone to care about it
1677 and that we're not overrunning the synchronization threshold. */
1678 packet_gettimeofday(&pkt_time);
1679 if ((pkt_time.tv_sec < scb->ha_next_update.tv_sec) ||
1680 ((pkt_time.tv_sec == scb->ha_next_update.tv_sec) &&
1681 (pkt_time.tv_usec < scb->ha_next_update.tv_usec)))
1682 {
1683 /* Critical changes will be allowed to bypass the message timing restrictions. */
1684 if (!(scb->ha_flags & HA_FLAG_CRITICAL_CHANGE))
1685 {
1686 PREPROC_PROFILE_END( sessionHAPerfStats );
1687 return;
1688 }
1689 s5ha_stats.update_messages_sent_immediately++;
1690 }
1691 else
1692 s5ha_stats.update_messages_sent_normally++;
1693
1694 debug_flag = StreamHADebugCheck(scb->key, s5_ha_debug_flag, &s5_ha_debug_info,
1695 s5_ha_debug_session, sizeof(s5_ha_debug_session));
1696 if (debug_flag)
1697 #ifdef TARGET_BASED
1698 LogMessage("S5HADbg Producing update message for %s - "
1699 "SF=0x%x IPP=0x%hx AP=0x%hx DIR=%hhu IDIR=%hhu HPM=0x%hhx HF=0x%hhx\n",
1700 s5_ha_debug_session, scb->ha_state.session_flags, scb->ha_state.ipprotocol,
1701 scb->ha_state.application_protocol, scb->ha_state.direction,
1702 scb->ha_state.ignore_direction, scb->ha_pending_mask, scb->ha_flags);
1703 #else
1704 LogMessage("S5HADbg Producing update message for %s - SF=0x%x DIR=%hhu IDIR=%hhu HPM=0x%hhx HF=0x%hhx\n",
1705 s5_ha_debug_session, scb->ha_state.session_flags,
1706 scb->ha_state.direction, scb->ha_state.ignore_direction,
1707 scb->ha_pending_mask, scb->ha_flags);
1708 #endif
1709
1710 /* Write out to the runtime output file. */
1711 if (runtime_output_fd >= 0)
1712 {
1713 /* Generate the incremental update message. */
1714 msg_size = GenerateHAUpdateMessage(file_io_buffer, scb, false);
1715 if (Write(runtime_output_fd, file_io_buffer, msg_size) == -1)
1716 {
1717 /* TODO: Error stuff here. */
1718 WarningMessage("(%s)(%d) Error writing HA message not handled\n", __FILE__, __LINE__);
1719 }
1720 }
1721
1722 #ifdef HAVE_DAQ_EXT_MODFLOW
1723 /*
1724 Store via DAQ module (requires full message generation).
1725 Assume that if we are not setting binding DAQ verdicts because of external encapsulation-induced
1726 confusion that we also cannot safely set the HA state associated with this flow in the DAQ.
1727 */
1728 if (session_configuration->ha_config->use_daq && !Active_GetTunnelBypass())
1729 {
1730 /* Generate the full message. */
1731 msg_size = GenerateHAUpdateMessage(file_io_buffer, scb, true);
1732 DAQ_ModifyFlowHAState(pkthdr, file_io_buffer, msg_size);
1733 }
1734 #endif
1735
1736 #ifdef SIDE_CHANNEL
1737 /* Send an update message over the side channel. */
1738 if (session_configuration->ha_config->use_side_channel)
1739 {
1740 SendSCUpdateMessage(scb);
1741 }
1742 #endif
1743
1744 /* Calculate the next update threshold. */
1745 scb->ha_next_update.tv_usec += session_configuration->ha_config->min_session_lifetime.tv_usec;
1746 if (scb->ha_next_update.tv_usec > 1000000)
1747 {
1748 scb->ha_next_update.tv_usec -= 1000000;
1749 scb->ha_next_update.tv_sec++;
1750 }
1751 scb->ha_next_update.tv_sec += session_configuration->ha_config->min_session_lifetime.tv_sec;
1752
1753 /* Clear the modified/new flags and pending preprocessor updates. */
1754 scb->ha_flags &= ~(HA_FLAG_NEW | HA_FLAG_MODIFIED | HA_FLAG_MAJOR_CHANGE | HA_FLAG_CRITICAL_CHANGE);
1755 scb->ha_pending_mask = 0;
1756
1757 PREPROC_PROFILE_END(sessionHAPerfStats);
1758 }
1759
1760 #ifdef SIDE_CHANNEL
StreamHASCMsgHandler(SCMsgHdr * hdr,const uint8_t * msg,uint32_t msglen)1761 static int StreamHASCMsgHandler( SCMsgHdr *hdr, const uint8_t *msg, uint32_t msglen )
1762 {
1763 int rval;
1764 PROFILE_VARS;
1765
1766 PREPROC_PROFILE_START( sessionHAPerfStats );
1767
1768 rval = ConsumeHAMessage( msg, msglen );
1769
1770 PREPROC_PROFILE_END( sessionHAPerfStats );
1771
1772 return rval;
1773 }
1774 #endif
1775
SessionHAPostConfigInit(struct _SnortConfig * sc,int unused,void * arg)1776 void SessionHAPostConfigInit( struct _SnortConfig *sc, int unused, void *arg )
1777 {
1778 int rval;
1779
1780 if( !session_configuration->enable_ha )
1781 return;
1782
1783 if( session_configuration->ha_config->startup_input_file )
1784 {
1785 rval = ReadHAMessagesFromFile( session_configuration->ha_config->startup_input_file);
1786 if( rval != 0 )
1787 {
1788 ErrorMessage( "Errors were encountered while reading HA messages from file!" );
1789 }
1790 }
1791
1792 if( session_configuration->ha_config->runtime_output_file )
1793 {
1794 runtime_output_fd = open( session_configuration->ha_config->runtime_output_file,
1795 O_WRONLY | O_CREAT | O_TRUNC, 0664 );
1796 if( runtime_output_fd < 0 )
1797 {
1798 FatalError( "Could not open %s for writing HA messages to: %s (%d)\n",
1799 session_configuration->ha_config->runtime_output_file, strerror( errno ), errno );
1800 }
1801 }
1802
1803 #ifdef REG_TEST
1804 if( session_configuration->ha_config->runtime_input_file )
1805 {
1806 runtime_input_fd = open( session_configuration->ha_config->runtime_input_file,
1807 O_RDONLY, 0664 );
1808 if( runtime_input_fd < 0 )
1809 {
1810 FatalError( "Could not open %s for writing HA messages to: %s (%d)\n",
1811 session_configuration->ha_config->runtime_input_file, strerror( errno ), errno );
1812 }
1813 }
1814 #endif
1815
1816 #ifdef SIDE_CHANNEL
1817 if( session_configuration->ha_config->use_side_channel )
1818 {
1819 rval = SideChannelRegisterRXHandler( SC_MSG_TYPE_FLOW_STATE_TRACKING, StreamHASCMsgHandler, NULL );
1820 if( rval != 0 )
1821 {
1822 /* TODO: Fatal error here or something. */
1823 ErrorMessage( "(%s)(%d) Errors were encountered registering Rx Side Channel Handler\n",
1824 __FILE__, __LINE__ );
1825 }
1826 }
1827 #endif
1828 }
1829
SessionCleanHA(void)1830 void SessionCleanHA( void )
1831 {
1832 int i;
1833
1834 for (i = 0; i < n_stream_ha_funcs; i++)
1835 {
1836 if (stream_ha_funcs[i])
1837 {
1838 free(stream_ha_funcs[i]);
1839 stream_ha_funcs[i] = NULL;
1840 }
1841 }
1842 if (runtime_output_fd >= 0)
1843 {
1844 close(runtime_output_fd);
1845 runtime_output_fd = -1;
1846 }
1847 #ifdef REG_TEST
1848 if (runtime_input_fd >= 0)
1849 {
1850 close(runtime_input_fd);
1851 runtime_input_fd = -1;
1852 }
1853 #endif
1854
1855 }
1856