1 /** @internal
2 **
3 **
4 ** @file fbcollector.c
5 ** IPFIX Collecting Process single transport session implementation
6 **
7 ** ------------------------------------------------------------------------
8 ** Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
9 ** ------------------------------------------------------------------------
10 ** Authors: Brian Trammell
11 ** ------------------------------------------------------------------------
12 ** @OPENSOURCE_LICENSE_START@
13 ** libfixbuf 2.0
14 **
15 ** Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
16 **
17 ** NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
18 ** ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
19 ** BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
20 ** EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
21 ** LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
22 ** EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
23 ** MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
24 ** ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
25 ** COPYRIGHT INFRINGEMENT.
26 **
27 ** Released under a GNU-Lesser GPL 3.0-style license, please see
28 ** LICENSE.txt or contact permission@sei.cmu.edu for full terms.
29 **
30 ** [DISTRIBUTION STATEMENT A] This material has been approved for
31 ** public release and unlimited distribution. Please see Copyright
32 ** notice for non-US Government use and distribution.
33 **
34 ** Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
35 ** and Trademark Office by Carnegie Mellon University.
36 **
37 ** DM18-0325
38 ** @OPENSOURCE_LICENSE_END@
39 ** ------------------------------------------------------------------------
40 */
41
42 /*#define _GNU_SOURCE*/
43 #define _FIXBUF_SOURCE_
44 #include <fixbuf/private.h>
45
46 #include "fbcollector.h"
47
48
49 /*#################################################
50 *
51 * IPFIX functions for reading input, these are
52 * the default functions
53 *
54 *#################################################*/
55
56
57 /**
58 * fbCollectorDecodeMsgVL
59 *
60 * decodes the header of a variable length message to determine
61 * how long the message is in order to read the appropriate
62 * amount to complete the message
63 *
64 *
65 * @return FALSE on error, TRUE on success
66 */
fbCollectorDecodeMsgVL(fbCollector_t * collector,fbCollectorMsgVL_t * hdr,size_t b_len,uint16_t * m_len,GError ** err)67 static gboolean fbCollectorDecodeMsgVL(
68 fbCollector_t *collector,
69 fbCollectorMsgVL_t *hdr,
70 size_t b_len,
71 uint16_t *m_len,
72 GError **err)
73 {
74 uint16_t h_version;
75 uint16_t h_len;
76
77 /* collector is unused in this function*/
78 (void)collector;
79
80 h_version = g_ntohs(hdr->n_version);
81 if (h_version != 0x000A) {
82 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
83 "Illegal IPFIX Message version 0x%04x; "
84 "input is probably not an IPFIX Message stream.",
85 g_ntohs(hdr->n_version));
86 *m_len = 0;
87 return FALSE;
88 }
89
90 h_len = g_ntohs(hdr->n_len);
91 if (h_len < 16) {
92 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
93 "Illegal IPFIX Message length 0x%04x; "
94 "input is probably not an IPFIX Message stream.",
95 g_ntohs(hdr->n_len));
96 *m_len = 0;
97 return FALSE;
98 }
99
100 if (b_len && (h_len > b_len)) {
101 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
102 "Buffer too small to read IPFIX Message "
103 "(message size %hu, buffer size %u).",
104 h_len, (uint32_t)b_len);
105 *m_len = 0;
106 return FALSE;
107 }
108
109 *m_len = h_len;
110 return TRUE;
111 }
112
113 /*
114 * fbCollectMessageBuffer
115 *
116 * decodes the header of a variable length message to determine
117 * how long the message is.
118 * this is used for applications that wants to handle the
119 * connection (reading, shutdown, etc.) itself.
120 * An error FB_ERROR_BUFSZ means that the the buffer does not
121 * contain a full message and an additional read should
122 * occur.
123 *
124 * @return FALSE on error, TRUE on success
125 */
126
fbCollectMessageBuffer(uint8_t * hdr,size_t b_len,size_t * m_len,GError ** err)127 gboolean fbCollectMessageBuffer(
128 uint8_t *hdr,
129 size_t b_len,
130 size_t *m_len,
131 GError **err)
132 {
133 fbCollectorMsgVL_t *iphdr = (fbCollectorMsgVL_t *)hdr;
134 uint16_t h_version;
135 uint16_t h_len;
136
137 if (!hdr || (b_len < 16)) {
138 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
139 "Buffer length too small to contain IPFIX header"
140 "(buffer size %u).", (uint32_t)b_len);
141 *m_len = 0;
142 return FALSE;
143 }
144
145 h_version = g_ntohs(iphdr->n_version);
146 if (h_version != 0x000A) {
147 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
148 "Illegal IPFIX Message version 0x%04x; "
149 "input is probably not an IPFIX Message stream.",
150 g_ntohs(iphdr->n_version));
151 *m_len = 0;
152 return FALSE;
153 }
154
155 h_len = g_ntohs(iphdr->n_len);
156 if (h_len < 16) {
157 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
158 "Illegal IPFIX Message length 0x%04x; "
159 "input is probably not an IPFIX Message stream.",
160 g_ntohs(iphdr->n_len));
161 *m_len = 0;
162 return FALSE;
163 }
164
165 if (h_len > b_len) {
166 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
167 "Buffer too small to contain IPFIX Message "
168 "(message size %hu, buffer size %u).",
169 h_len, (uint32_t)b_len);
170 *m_len = 0;
171 return FALSE;
172 }
173
174 *m_len = h_len;
175 return TRUE;
176 }
177
178
179 /**
180 * fbCollectorMessageHeaderNull
181 *
182 * this is used to process a PDU after it has been read in a message
183 * based transport protocol (UDP, SCTP) to adjust the header if
184 * needed before sending it to post read fixing. This version does
185 * nothing. NULL transform.
186 *
187 * @param collector pointer to the collector state structure
188 * @param buffer pointer to the message buffer
189 * @param b_len length of the buffer passed in
190 * @param m_len pointer to the length of the resultant buffer
191 * @param err pointer to a GLib error structure
192 *
193 * @return TRUE (this always works)
194 *
195 */
fbCollectorMessageHeaderNull(fbCollector_t * collector,uint8_t * buffer,size_t b_len,uint16_t * m_len,GError ** err)196 static gboolean fbCollectorMessageHeaderNull(
197 fbCollector_t *collector __attribute__((unused)),
198 uint8_t *buffer __attribute__((unused)),
199 size_t b_len,
200 uint16_t *m_len,
201 GError **err __attribute__((unused)) )
202 {
203 *m_len = b_len;
204 return TRUE;
205 }
206
207 /**
208 * fbCollectorUDPMessageHeader
209 *
210 * this is used to process a PDU after it has been read in a message
211 * based transport protocol (UDP, SCTP) to adjust the header if
212 * needed before sending it to post read fixing. This version does
213 * nothing. NULL transform.
214 *
215 * @param collector pointer to the collector state structure
216 * @param buffer pointer to the message buffer
217 * @param b_len length of the buffer passed in
218 * @param m_len pointer to the length of the resultant buffer
219 * @param err pointer to a GLib error structure
220 *
221 * @return TRUE (this always works)
222 *
223 */
fbCollectorUDPMessageHeader(fbCollector_t * collector,uint8_t * buffer,size_t b_len,uint16_t * m_len,GError ** err)224 static gboolean fbCollectorUDPMessageHeader(
225 fbCollector_t *collector,
226 uint8_t *buffer,
227 size_t b_len,
228 uint16_t *m_len,
229 GError **err)
230 {
231 uint16_t h_version;
232
233 *m_len = b_len;
234
235 if (b_len > 16) {
236 if (!fbCollectorHasTranslator(collector)) {
237 h_version = g_ntohs(*(uint16_t *)buffer);
238 if (h_version != 0x000A) {
239 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
240 "Illegal IPFIX Message Version 0x%04x",
241 h_version);
242 return FALSE;
243 }
244 }
245 collector->obdomain = g_ntohl(*(uint32_t *)(buffer + 12));
246 /* Update collector time */
247 collector->time = time(NULL);
248 }
249
250 return TRUE;
251 }
252
253
254 /**
255 * fbCollectorPostProcNull
256 *
257 * this is used to process a PDU after it has been read in order to transform
258 * it, except that this function does _no_ transforms
259 *
260 * @param collector _not used_
261 *
262 * @return TRUE (always succesfull)
263 *
264 */
fbCollectorPostProcNull(fbCollector_t * collector,uint8_t * dataBuf,size_t * bufLen,GError ** err)265 static gboolean fbCollectorPostProcNull(
266 fbCollector_t *collector,
267 uint8_t *dataBuf,
268 size_t *bufLen,
269 GError **err)
270 {
271 (void)collector;
272 (void)dataBuf;
273 (void)bufLen;
274 (void)err;
275
276 return TRUE;
277 }
278
279 /**
280 * fbCollectorCloseTranslatorNull
281 *
282 * default function to clean up the translator state, but there is
283 * none, and this function does nothing
284 *
285 * @param collector current collector
286 *
287 */
fbCollectorCloseTranslatorNull(fbCollector_t * collector)288 static void fbCollectorCloseTranslatorNull(
289 fbCollector_t *collector)
290 {
291 (void)collector;
292 return;
293 }
294
295 /**
296 * fbCollectorSessionTimeoutNull
297 *
298 * default function to clean up timed out UDP sessions.
299 * this function does nothing
300 *
301 * @param collector current collector
302 * @param session session that will be timed out
303 *
304 */
fbCollectorSessionTimeoutNull(fbCollector_t * collector,fbSession_t * session)305 static void fbCollectorSessionTimeoutNull(
306 fbCollector_t *collector,
307 fbSession_t *session)
308 {
309 (void)collector;
310 (void)session;
311 return;
312 }
313
314
315 /*#################################################
316 *
317 * the rest of the meat of the collector implementation
318 *
319 *#################################################*/
320
321 /**
322 * fbCollectorReadFile
323 *
324 *
325 *
326 */
fbCollectorReadFile(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)327 static gboolean fbCollectorReadFile(
328 fbCollector_t *collector,
329 uint8_t *msgbase,
330 size_t *msglen,
331 GError **err)
332 {
333 int rc;
334 uint16_t h_len;
335 gboolean goodLen;
336
337 /* Read and decode version and length */
338 g_assert(*msglen > 4);
339
340 rc = fread(msgbase, 1, 4, collector->stream.fp);
341 if (rc > 0) {
342 goodLen = collector->coreadLen(collector,(fbCollectorMsgVL_t *)msgbase,
343 *msglen, &h_len, err);
344 if (FALSE == goodLen) return FALSE;
345 msgbase += 4;
346 } else if (feof(collector->stream.fp)) {
347 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
348 "End of file");
349 return FALSE;
350 } else {
351 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
352 "I/O error: %s", strerror(errno));
353 return FALSE;
354 }
355
356 /* read rest of message */
357 rc = fread(msgbase, 1, h_len - 4, collector->stream.fp);
358 if (rc > 0) {
359 *msglen = rc + 4;
360 if (!collector->copostRead(collector, msgbase, msglen, err)) {
361 return FALSE;
362 }
363 return TRUE;
364 } else if (feof(collector->stream.fp)) {
365 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
366 "End of file");
367 return FALSE;
368 } else {
369 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
370 "I/O error: %s", strerror(errno));
371 return FALSE;
372 }
373 }
374
375 /**
376 * fbCollectorCloseFile
377 *
378 *
379 *
380 */
fbCollectorCloseFile(fbCollector_t * collector)381 static void fbCollectorCloseFile(
382 fbCollector_t *collector)
383 {
384 if (collector->stream.fp != stdin) {
385 fclose(collector->stream.fp);
386 }
387 collector->active = FALSE;
388 }
389
390 /**
391 * fbCollectorAllocFP
392 *
393 *
394 *
395 */
fbCollectorAllocFP(void * ctx,FILE * fp)396 fbCollector_t *fbCollectorAllocFP(
397 void *ctx,
398 FILE *fp)
399 {
400 fbCollector_t *collector = NULL;
401
402 g_assert(fp);
403
404 /* Create a new collector */
405 collector = g_slice_new0(fbCollector_t);
406
407 /* Fill the collector in */
408 collector->ctx = ctx;
409 collector->stream.fp = fp;
410 collector->bufferedStream = TRUE;
411 collector->active = TRUE;
412 collector->coread = fbCollectorReadFile;
413 collector->copostRead = fbCollectorPostProcNull;
414 collector->coreadLen = fbCollectorDecodeMsgVL;
415 collector->comsgHeader = fbCollectorMessageHeaderNull;
416 collector->cotransClose = fbCollectorCloseTranslatorNull;
417 collector->cotimeOut = fbCollectorSessionTimeoutNull;
418 collector->translationActive = FALSE;
419 collector->rip = -1;
420 collector->wip = -1;
421
422 /* All done */
423 return collector;
424 }
425
426 /**
427 * fbCollectorAllocFile
428 *
429 *
430 *
431 */
fbCollectorAllocFile(void * ctx,const char * path,GError ** err)432 fbCollector_t *fbCollectorAllocFile(
433 void *ctx,
434 const char *path,
435 GError **err)
436 {
437 fbCollector_t *collector = NULL;
438 FILE *fp = NULL;
439
440 /* check to see if we're opening stdin */
441 if ((strlen(path) == 1) && (path[0] == '-'))
442 {
443 /* don't open a terminal */
444 if (isatty(fileno(stdin))) {
445 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
446 "Refusing to open stdin terminal for collection");
447 return NULL;
448 }
449
450 /* yep, stdin */
451 fp = stdin;
452 } else {
453 /* nope, just a regular file; open it. */
454 fp = fopen(path, "r");
455 }
456
457 /* check for error */
458 if (!fp) {
459 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
460 "Couldn't open %s for collection: %s",
461 path, strerror(errno));
462 return NULL;
463 }
464
465 /* allocate a collector */
466 collector = fbCollectorAllocFP(ctx, fp);
467
468 /* set the file close function */
469 collector->coclose = fbCollectorCloseFile;
470
471 /* set the default collector function */
472 collector->copostRead = fbCollectorPostProcNull;
473
474 /* default translator cleanup function */
475 collector->cotransClose = fbCollectorCloseTranslatorNull;
476
477 /* set the default message read length function */
478 collector->coreadLen = fbCollectorDecodeMsgVL;
479
480 /* set the default message header transform function */
481 collector->comsgHeader = fbCollectorMessageHeaderNull;
482
483 /* set the default session timed out function - won't get called */
484 collector->cotimeOut = fbCollectorSessionTimeoutNull;
485
486 /* mark the stream if it is a buffered file pointer */
487 collector->bufferedStream = TRUE;
488
489 /* set that a input translator is not in use */
490 collector->translationActive = FALSE;
491
492 /* since we're not a listener */
493 collector->rip = -1;
494 collector->wip = -1;
495
496 /* all done */
497 return collector;
498 }
499
500 #if FB_ENABLE_SCTP
501
502 /**
503 * fbCollectorReadSCTP
504 *
505 *
506 *
507 */
fbCollectorReadSCTP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)508 static gboolean fbCollectorReadSCTP(
509 fbCollector_t *collector,
510 uint8_t *msgbase,
511 size_t *msglen,
512 GError **err)
513 {
514 uint16_t msgSize;
515 struct sockaddr peer;
516 socklen_t peerlen = sizeof(peer);
517 struct sctp_sndrcvinfo sri;
518 int sctp_flags = 0;
519 int rc;
520
521 rc = sctp_recvmsg(collector->stream.fd, msgbase, *msglen,
522 &peer, &peerlen, &sri, &sctp_flags);
523
524 if (rc > 0) {
525 if (!collector->comsgHeader(collector, msgbase, rc, &msgSize, err)) {
526 return FALSE;
527 }
528 *msglen = msgSize;
529 if (!collector->copostRead(collector, msgbase, msglen, err)) {
530 return FALSE;
531 }
532 return TRUE;
533 } else if (rc == 0) {
534 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
535 "End of file");
536 return FALSE;
537 } else if (errno == EINTR) {
538 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
539 "SCTP read interrupt");
540 return FALSE;
541 } else {
542 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
543 "TCP I/O error: %s", strerror(errno));
544 return FALSE;
545 }
546 }
547 #endif /* FB_ENABLE_SCTP */
548
/**
 * fbCollectorHandleSelect
 *
 * Blocks in select() until either the collector's socket or its
 * interrupt pipe becomes readable.
 *
 * @param collector collector whose stream.fd and rip are watched
 *
 * @return 0 when the caller may go on to read the socket; -1 when the
 *         interrupt pipe fired (one byte is drained from it) or when
 *         select() itself failed
 */
static int fbCollectorHandleSelect(
    fbCollector_t   *collector)
{
    fd_set      rdfds;
    int         maxfd;
    int         count;
    ssize_t     drained;
    uint8_t     byte;

    g_assert(collector);

    maxfd = (collector->rip > collector->stream.fd)
            ? collector->rip : collector->stream.fd;

    FD_ZERO(&rdfds);
    FD_SET(collector->rip, &rdfds);
    FD_SET(collector->stream.fd, &rdfds);

    count = select(maxfd + 1, &rdfds, NULL, NULL, NULL);

    if (count <= 0) {
        /* On failure (for example EINTR) the descriptor sets do not
         * describe readiness, so they must not be inspected: treating
         * the pipe as readable here would block in read() below. */
        return -1;
    }

    if (FD_ISSET(collector->rip, &rdfds)) {
        /* consume the wake-up byte; its value and the result of the
         * read are irrelevant, the interruption is what matters */
        drained = read(collector->rip, &byte, sizeof(byte));
        (void)drained;
        return -1;
    }

    /* only the socket is readable */
    return 0;
}
588
589 /**
590 * fbCollectorReadTCP
591 *
592 *
593 *
594 */
fbCollectorReadTCP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)595 static gboolean fbCollectorReadTCP(
596 fbCollector_t *collector,
597 uint8_t *msgbase,
598 size_t *msglen,
599 GError **err)
600 {
601 int rc;
602 uint16_t h_len, rrem;
603 gboolean goodLen;
604
605 /* Read and decode version and length */
606 g_assert(*msglen > 4);
607 rrem = 4;
608 while (rrem) {
609 rc = fbCollectorHandleSelect(collector);
610
611 if (rc < 0) {
612 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
613 "Interrupted by pipe");
614 /* interrupted by pipe read or other error with select*/
615 return FALSE;
616 }
617
618 rc = read(collector->stream.fd, msgbase, rrem);
619 if (rc > 0) {
620 rrem -= rc;
621 msgbase += rc;
622 } else if (rc == 0) {
623 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
624 "End of file");
625 return FALSE;
626 } else if (errno == EINTR) {
627 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
628 "TCP read interrupt at message start");
629 return FALSE;
630 } else {
631 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
632 "TCP I/O error: %s", strerror(errno));
633 return FALSE;
634 }
635 }
636 goodLen = collector->coreadLen(collector,
637 (fbCollectorMsgVL_t *)(msgbase - 4),
638 *msglen, &h_len, err);
639 if (FALSE == goodLen) return FALSE;
640
641 /* read rest of message */
642 rrem = h_len - 4;
643 while (rrem) {
644 rc = fbCollectorHandleSelect(collector);
645
646 if (rc < 0) {
647 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
648 "Interrupted by pipe");
649 /* interrupted by pipe read or other error with select*/
650 return FALSE;
651 }
652 rc = read(collector->stream.fd, msgbase, rrem);
653 if (rc > 0) {
654 rrem -= rc;
655 msgbase += rc;
656 } else if (rc == 0) {
657 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
658 "End of file");
659 return FALSE;
660 } else if (errno == EINTR) {
661 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
662 "TCP read interrupt in message");
663 return FALSE;
664 } else {
665 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
666 "TCP I/O error: %s", strerror(errno));
667 return FALSE;
668 }
669 }
670
671 /* Post process, if needed and return message length from header. */
672 *msglen = h_len;
673 if (!collector->copostRead(collector, msgbase, msglen, err)) {
674 return FALSE;
675 }
676 return TRUE;
677 }
678
/**
 * fbCollectorSetUDPSpec
 *
 * Places a UDP connection spec at the head of the collector's doubly
 * linked list.  A spec that is already linked somewhere behind the
 * head is unlinked first and its session is handed to
 * fbListenerSetPeerSession(); a spec with no links is simply pushed
 * on the front.
 *
 * @param collector collector owning the list
 * @param spec      spec to place at the head
 */
static void fbCollectorSetUDPSpec(
    fbCollector_t   *collector,
    fbUDPConnSpec_t *spec)
{
    fbUDPConnSpec_t *before;
    fbUDPConnSpec_t *after;

    /* empty list: the spec becomes both ends */
    if (NULL == collector->udp_head) {
        collector->udp_head = spec;
        collector->udp_tail = spec;
        return;
    }

    /* already at the front: nothing to move */
    if (spec == collector->udp_head) {
        return;
    }

    before = spec->prev;
    after = spec->next;

    /* a spec with neither link is new and needs no unlinking */
    if (before || after) {
        if (before) {
            before->next = after;
        }

        if (after) {
            after->prev = before;
        } else {
            /* the spec was the tail */
            collector->udp_tail = before;
        }

        spec->prev = NULL;
        fbListenerSetPeerSession(collector->listener, spec->session);
    }

    /* push on the front */
    spec->next = collector->udp_head;
    collector->udp_head->prev = spec;
    collector->udp_head = spec;
}
711
/**
 * fbCollectorFreeUDPSpec
 *
 * Releases one UDP connection spec: notifies the cotimeOut hook,
 * frees the spec's session (except when it is the only spec left),
 * removes the spec from the collector's list, releases the
 * per-session application context in multi-session mode, and frees
 * the spec itself.
 *
 * @param collector collector owning the list
 * @param spec      spec to release; must not be used afterwards
 */
static void fbCollectorFreeUDPSpec(
    fbCollector_t   *collector,
    fbUDPConnSpec_t *spec)
{
    /* let translators release state */
    collector->cotimeOut(collector, spec->session);

    /* don't free the last session, fbufree will do that */
    if (collector->udp_tail != collector->udp_head) {
        fbSessionFree(spec->session);
    }

    /* Unlink the spec wherever it sits, so that neither udp_head,
     * udp_tail nor a neighbour keeps pointing at freed memory. */
    if (spec->prev) {
        spec->prev->next = spec->next;
    } else if (collector->udp_head == spec) {
        collector->udp_head = spec->next;
    }

    if (spec->next) {
        spec->next->prev = spec->prev;
    } else if (collector->udp_tail == spec) {
        collector->udp_tail = spec->prev;
    }

    if (collector->multi_session) {
        fbListenerAppFree(collector->listener, spec->ctx);
    }

    g_slice_free(fbUDPConnSpec_t, spec);
}
739
740 /**
741 * fbCollectorVerifyUDPPeer
742 *
743 *
744 *
745 */
fbCollectorVerifyUDPPeer(fbCollector_t * collector,struct sockaddr * from,socklen_t fromlen,GError ** err)746 static gboolean fbCollectorVerifyUDPPeer(
747 fbCollector_t *collector,
748 struct sockaddr *from,
749 socklen_t fromlen,
750 GError **err)
751 {
752 fbUDPConnSpec_t *udp = collector->udp_head;
753 gboolean found = FALSE;
754
755 /* stash the address if we've not seen it before */
756 /* compare the address if we have */
757 /* appinit should simulate no data (NLREAD) if message is from wrong peer*/
758
759 if (collector->accept_only) {
760 if (collector->peer.so.sa_family == from->sa_family) {
761 if (from->sa_family == AF_INET) {
762 if (memcmp(&(collector->peer.ip4.sin_addr),
763 &(((struct sockaddr_in *)from)->sin_addr),
764 sizeof(struct in_addr)))
765 {
766 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
767 "Ignoring message from peer");
768 return FALSE;
769 }
770 } else if (from->sa_family == AF_INET6) {
771 if(memcmp(&(collector->peer.ip6.sin6_addr),
772 &(((struct sockaddr_in6 *)from)->sin6_addr),
773 sizeof(struct in6_addr)))
774 {
775 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
776 "Ignoring message from peer");
777 return FALSE;
778 }
779 }
780 }
781 } else {
782 memcpy(&(collector->peer.so), from,
783 (fromlen > sizeof(collector->peer)) ?
784 sizeof(collector->peer) : fromlen);
785 }
786
787 while (udp) {
788 /* loop through and find current one */
789 if (udp->obdomain == collector->obdomain) {
790 if (!memcmp(&(udp->peer.so), from, udp->peerlen)) {
791 /* we have a match - set session */
792 fbCollectorSetUDPSpec(collector, udp);
793 found = TRUE;
794 break;
795 }
796 }
797 udp = udp->next;
798 }
799
800
801 if (!found) {
802 udp = g_slice_new0(fbUDPConnSpec_t);
803 memcpy(&(udp->peer.so), from, (fromlen > sizeof(udp->peer)) ?
804 sizeof(udp->peer) : fromlen);
805 udp->peerlen = (fromlen > sizeof(udp->peer)) ? sizeof(udp->peer) : fromlen;
806 udp->obdomain = collector->obdomain;
807 /* create a new session */
808 udp->session = fbListenerSetPeerSession(collector->listener, NULL);
809 fbCollectorSetUDPSpec(collector, udp);
810
811 /* call app init for new UDP connection*/
812 if (collector->multi_session) {
813 if (!fbListenerCallAppInit(collector->listener, udp, err)) {
814 udp->last_seen = collector->time;
815 udp->reject = TRUE;
816 return FALSE;
817 }
818 } else {
819 /* backwards compatibility -> need to associate the ctx with all
820 sessions */
821 udp->ctx = collector->ctx;
822 }
823 } else {
824 if (udp->reject) {
825 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
826 "Rejecting previously rejected connection");
827 return FALSE;
828 }
829 }
830
831 collector->ctx = udp->ctx;
832 udp->last_seen = collector->time;
833
834 while (collector->udp_tail &&
835 (difftime(collector->time, collector->udp_tail->last_seen) >
836 FB_UDP_TIMEOUT))
837 {
838 /* timeout check */
839 fbCollectorFreeUDPSpec(collector, collector->udp_tail);
840 }
841
842
843 return TRUE;
844 }
845
846 /**
847 * fbCollectorReadUDP
848 *
849 *
850 *
851 */
fbCollectorReadUDP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)852 static gboolean fbCollectorReadUDP(
853 fbCollector_t *collector,
854 uint8_t *msgbase,
855 size_t *msglen,
856 GError **err)
857 {
858 uint16_t msgSize = 0;
859 ssize_t recvlen = 0;
860 int rc;
861 union {
862 struct sockaddr so;
863 struct sockaddr_in ip4;
864 struct sockaddr_in6 ip6;
865 } peer;
866 socklen_t peerlen;
867
868 memset(&peer, 0, sizeof(peer));
869
870 rc = fbCollectorHandleSelect(collector);
871
872 if (rc < 0) {
873 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
874 "Interrupted by pipe");
875 /* interrupted by pipe read or other error with select*/
876 return FALSE;
877 }
878
879 peerlen = sizeof(peer);
880 recvlen = recvfrom(collector->stream.fd, msgbase, *msglen, 0,
881 (struct sockaddr *)&peer, &peerlen);
882
883
884 if (peer.so.sa_family == AF_INET6) {
885 peer.ip6.sin6_flowinfo = 0;
886 peer.ip6.sin6_scope_id = 0;
887 }
888
889 if (!collector->comsgHeader(collector, msgbase, recvlen, &msgSize, err)) {
890 return FALSE;
891 }
892
893 if (msgSize > 0) {
894 *msglen = msgSize;
895 /** Fixed this to do the right thing. We now map ip
896 * addresses/port and observation domains to sessions. If
897 * accept-only is set on the collector, we'll only return TRUE
898 * if the ip/ports match. We will return NL_READ if FALSE, and the
899 * app using fixbuf should ignore error codes = NL_READ.**/
900
901 /* this will only veto if we set accept from explicitly*/
902 if (!fbCollectorVerifyUDPPeer(collector, &(peer.so), peerlen, err)) {
903 return FALSE;
904 }
905 if (!collector->copostRead(collector, msgbase, msglen, err)) {
906 return FALSE;
907 }
908 return TRUE;
909 } else if (errno == EINTR || errno == EWOULDBLOCK) {
910 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
911 "UDP read interrupt or timeout");
912 return FALSE;
913 } else {
914 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
915 "UDP I/O error: %s", strerror(errno));
916 return FALSE;
917 }
918 }
919
920 /**
921 * fbCollectorCloseSocket
922 *
923 *
924 *
925 */
fbCollectorCloseSocket(fbCollector_t * collector)926 static void fbCollectorCloseSocket(
927 fbCollector_t *collector)
928 {
929 if (collector->stream.fd != -1) {
930 close(collector->stream.fd);
931 /* don't set to -1 because we need it to remove listener */
932 }
933
934 if (collector->rip != -1) {
935 close(collector->rip);
936 collector->rip = -1;
937 }
938
939 if (collector->wip != -1) {
940 close(collector->wip);
941 collector->wip = -1;
942 }
943
944 collector->active = FALSE;
945 }
946
947 /**
948 * fbCollectorAllocSocket
949 *
950 *
951 *
952 */
fbCollectorAllocSocket(fbListener_t * listener,void * ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)953 fbCollector_t *fbCollectorAllocSocket(
954 fbListener_t *listener,
955 void *ctx,
956 int fd,
957 struct sockaddr *peer,
958 size_t peerlen,
959 GError **err)
960 {
961 fbCollector_t *collector = NULL;
962 fbConnSpec_t *spec = fbListenerGetConnSpec(listener);
963 int pfd[2];
964
965 /* Create a new collector */
966 collector = g_slice_new0(fbCollector_t);
967
968 /* Fill it in */
969 collector->listener = listener;
970 collector->ctx = ctx;
971 collector->stream.fd = fd;
972 collector->bufferedStream = FALSE;
973 collector->active = TRUE;
974 collector->copostRead = fbCollectorPostProcNull;
975 collector->coreadLen = fbCollectorDecodeMsgVL;
976 collector->comsgHeader = fbCollectorMessageHeaderNull;
977 collector->coclose = fbCollectorCloseSocket;
978 collector->cotransClose = fbCollectorCloseTranslatorNull;
979 collector->cotimeOut = fbCollectorSessionTimeoutNull;
980 collector->translationActive = FALSE;
981 collector->multi_session = FALSE;
982
983 /* Create interrupt pipe */
984 if (pipe(pfd)) {
985 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
986 "Unable to create pipe on collector: %s", strerror(errno));
987 g_slice_free(fbCollector_t, collector);
988 return NULL;
989 }
990 collector->rip = pfd[0];
991 collector->wip = pfd[1];
992
993 if (peerlen) {
994 memcpy(&(collector->peer.so), peer,
995 (peerlen > sizeof(collector->peer)) ?
996 sizeof(collector->peer) : peerlen);
997 }
998
999 /* Select a reader function */
1000 switch(spec->transport) {
1001 #if FB_ENABLE_SCTP
1002 case FB_SCTP:
1003 collector->coread = fbCollectorReadSCTP;
1004 break;
1005 #endif
1006 case FB_TCP:
1007 collector->coread = fbCollectorReadTCP;
1008 break;
1009 case FB_UDP:
1010 collector->coread = fbCollectorReadUDP;
1011 collector->comsgHeader = fbCollectorUDPMessageHeader;
1012 break;
1013 default:
1014 g_assert_not_reached();
1015 }
1016
1017 /* All done */
1018 return collector;
1019 }
1020
1021 #if HAVE_OPENSSL
1022
1023 /**
1024 * fbCollectorReadTLS
1025 *
1026 *
1027 *
1028 */
fbCollectorReadTLS(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1029 static gboolean fbCollectorReadTLS(
1030 fbCollector_t *collector,
1031 uint8_t *msgbase,
1032 size_t *msglen,
1033 GError **err)
1034 {
1035 int rc;
1036 uint16_t h_len, rrem;
1037 gboolean rv;
1038
1039 /* Read and decode version and length */
1040 g_assert(*msglen > 4);
1041 rrem = 4;
1042 while (rrem) {
1043 rc = SSL_read(collector->ssl, msgbase, rrem);
1044 if (rc > 0) {
1045 rrem -= rc;
1046 msgbase += rc;
1047 } else if (rc == 0) {
1048 /* FIXME this isn't _quite_ robust but it's good enough for now.
1049 we'll fix this when we do TLS/TCP stress testing. */
1050 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
1051 "TLS connection shutdown");
1052 return FALSE;
1053 } else {
1054 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1055 "TLS I/O error at message start: %s",
1056 ERR_error_string(ERR_get_error(), NULL));
1057 while (ERR_get_error());
1058 return FALSE;
1059 }
1060 }
1061 rv = collector->coreadLen(collector,
1062 (fbCollectorMsgVL_t *)(msgbase - 4),
1063 *msglen, &h_len, err);
1064 if (rv == FALSE) return FALSE;
1065
1066 /* read rest of message */
1067 rrem = h_len - 4;
1068 while (rrem) {
1069 rc = SSL_read(collector->ssl, msgbase, rrem);
1070 if (rc > 0) {
1071 rrem -= rc;
1072 msgbase += rc;
1073 } else {
1074 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1075 "TLS I/O error in message: %s",
1076 ERR_error_string(ERR_get_error(), NULL));
1077 while (ERR_get_error());
1078 return FALSE;
1079 }
1080 }
1081
1082 /* All done. Return message length from header. */
1083 *msglen = h_len;
1084 return TRUE;
1085 }
1086
1087 /**
1088 * fbCollectorCloseTLS
1089 *
1090 *
1091 *
1092 */
fbCollectorCloseTLS(fbCollector_t * collector)1093 static void fbCollectorCloseTLS(
1094 fbCollector_t *collector)
1095 {
1096 SSL_shutdown(collector->ssl);
1097 SSL_free(collector->ssl);
1098 if (collector->rip != -1)
1099 {
1100 close(collector->rip);
1101 collector->rip = -1;
1102 }
1103
1104 if (collector->wip != -1)
1105 {
1106 close(collector->wip);
1107 collector->wip = -1;
1108 }
1109
1110 collector->active = FALSE;
1111 }
1112
1113 /**
1114 * fbCollectorOpenTLS
1115 *
1116 *
1117 *
1118 */
fbCollectorOpenTLS(fbCollector_t * collector,GError ** err)1119 static gboolean fbCollectorOpenTLS(
1120 fbCollector_t *collector,
1121 GError **err)
1122 {
1123 fbConnSpec_t *spec = fbListenerGetConnSpec(collector->listener);
1124 BIO *conn;
1125 gboolean ok = TRUE;
1126
1127 /* Initialize SSL context if necessary */
1128 if (!spec->vssl_ctx) {
1129 if (!fbConnSpecInitTLS(spec, TRUE, err)) {
1130 return FALSE;
1131 }
1132 }
1133
1134 /* wrap a stream BIO around the opened socket */
1135 if (!(conn = BIO_new_socket(collector->stream.fd, 1))) {
1136 ok = FALSE;
1137 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1138 "couldn't wrap socket for TLS: %s",
1139 ERR_error_string(ERR_get_error(), NULL));
1140 while (ERR_get_error());
1141 goto end;
1142 }
1143
1144 /* create SSL socket */
1145 if (!(collector->ssl = SSL_new((SSL_CTX *)spec->vssl_ctx))) {
1146 ok = FALSE;
1147 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1148 "couldnt create TLS socket: %s",
1149 ERR_error_string(ERR_get_error(), NULL));
1150 while (ERR_get_error());
1151 goto end;
1152 }
1153
1154 /* accept SSL connection */
1155 SSL_set_accept_state(collector->ssl);
1156 SSL_set_bio(collector->ssl, conn, conn);
1157 SSL_set_mode(collector->ssl, SSL_MODE_AUTO_RETRY);
1158 if (SSL_accept(collector->ssl) <= 0) {
1159 ok = FALSE;
1160 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1161 "couldn't accept on connected TLS socket: %s",
1162 ERR_error_string(ERR_get_error(), NULL));
1163 while (ERR_get_error());
1164 goto end;
1165 }
1166
1167 /* FIXME do post-connection verification */
1168
1169 end:
1170 if (!ok) {
1171 collector->active = FALSE;
1172 if (collector->ssl) {
1173 SSL_free(collector->ssl);
1174 collector->ssl = NULL;
1175 } else if (conn) {
1176 BIO_vfree(conn);
1177 }
1178 }
1179 return ok;
1180 }
1181
1182 #if HAVE_OPENSSL_DTLS
1183
1184 /**
1185 * fbCollectorOpenDTLS
1186 *
1187 *
1188 *
1189 */
fbCollectorOpenDTLS(fbCollector_t * collector,GError ** err)1190 static gboolean fbCollectorOpenDTLS(
1191 fbCollector_t *collector,
1192 GError **err)
1193 {
1194 fbConnSpec_t *spec = fbListenerGetConnSpec(collector->listener);
1195 BIO *conn;
1196 gboolean ok = TRUE;
1197
1198 /* Initialize SSL context if necessary */
1199 if (!spec->vssl_ctx) {
1200 if (!fbConnSpecInitTLS(spec, TRUE, err)) {
1201 return FALSE;
1202 }
1203 }
1204
1205 /* wrap a stream BIO around the opened socket */
1206 if (!(conn = BIO_new_dgram(collector->stream.fd, 1))) {
1207 ok = FALSE;
1208 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1209 "couldn't wrap socket for TLS: %s",
1210 ERR_error_string(ERR_get_error(), NULL));
1211 while (ERR_get_error());
1212 goto end;
1213 }
1214
1215 /* create SSL socket */
1216 if (!(collector->ssl = SSL_new((SSL_CTX *)spec->vssl_ctx))) {
1217 ok = FALSE;
1218 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1219 "couldnt create TLS socket: %s",
1220 ERR_error_string(ERR_get_error(), NULL));
1221 while (ERR_get_error());
1222 goto end;
1223 }
1224
1225 /* Enable cookie exchange */
1226 SSL_set_options(collector->ssl, SSL_OP_COOKIE_EXCHANGE);
1227
1228 /* accept SSL connection */
1229 SSL_set_bio(collector->ssl, conn, conn);
1230 SSL_set_accept_state(collector->ssl);
1231 SSL_set_mode(collector->ssl, SSL_MODE_AUTO_RETRY);
1232 if (SSL_accept(collector->ssl) <= 0) {
1233 ok = FALSE;
1234 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1235 "couldn't accept on connected TLS socket: %s",
1236 ERR_error_string(ERR_get_error(), NULL));
1237 while (ERR_get_error());
1238 goto end;
1239 }
1240
1241 /* FIXME do post-connection verification */
1242
1243 end:
1244 if (!ok) {
1245 collector->active = FALSE;
1246 if (collector->ssl) {
1247 SSL_free(collector->ssl);
1248 collector->ssl = NULL;
1249 } else if (conn) {
1250 BIO_vfree(conn);
1251 }
1252 }
1253 return ok;
1254 }
1255
1256 #endif /* HAVE_OPENSSL_DTLS */
1257
1258 /**
1259 * fbCollectorAllocTLS
1260 *
1261 *
1262 *
1263 */
fbCollectorAllocTLS(fbListener_t * listener,void * ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)1264 fbCollector_t *fbCollectorAllocTLS(
1265 fbListener_t *listener,
1266 void *ctx,
1267 int fd,
1268 struct sockaddr *peer,
1269 size_t peerlen,
1270 GError **err)
1271 {
1272 gboolean ok = TRUE;
1273 fbCollector_t *collector = NULL;
1274 fbConnSpec_t *spec = fbListenerGetConnSpec(listener);
1275
1276 /* Create a new collector */
1277 collector = g_slice_new0(fbCollector_t);
1278
1279 /* Fill it in */
1280 collector->listener = listener;
1281 collector->ctx = ctx;
1282 collector->stream.fd = fd;
1283 collector->bufferedStream = FALSE;
1284 collector->active = TRUE;
1285 collector->copostRead = fbCollectorPostProcNull;
1286 collector->coreadLen = fbCollectorDecodeMsgVL;
1287 collector->comsgHeader = fbCollectorMessageHeaderNull;
1288 collector->coread = fbCollectorReadTLS;
1289 collector->coclose = fbCollectorCloseTLS;
1290 collector->cotransClose = fbCollectorCloseTranslatorNull;
1291 collector->cotimeOut = fbCollectorSessionTimeoutNull;
1292 collector->translationActive = FALSE;
1293 if (peerlen) {
1294 memcpy(&(collector->peer.so), peer,
1295 (peerlen > sizeof(collector->peer)) ?
1296 sizeof(collector->peer) : peerlen);
1297 }
1298
1299
1300 /* Do TLS accept atop opened socket */
1301 switch (spec->transport) {
1302 case FB_TLS_TCP:
1303 ok = fbCollectorOpenTLS(collector, err);
1304 break;
1305 #if HAVE_OPENSSL_DTLS
1306 case FB_DTLS_UDP:
1307 #if HAVE_OPENSSL_DTLS_SCTP
1308 case FB_DTLS_SCTP:
1309 #endif
1310 ok = fbCollectorOpenDTLS(collector, err);
1311 break;
1312 #endif
1313 default:
1314 g_assert_not_reached();
1315 }
1316
1317 /* Nuke collector on TLS setup error */
1318 if (!ok) {
1319 g_slice_free(fbCollector_t, collector);
1320 return NULL;
1321 }
1322
1323 /* All done */
1324 return collector;
1325 }
1326
1327 #endif /* HAVE_OPENSSL */
1328
1329 #if HAVE_SPREAD
1330
fbCollectorSpreadOpen(fbCollector_t * collector,GError ** err)1331 static gboolean fbCollectorSpreadOpen(
1332 fbCollector_t *collector,
1333 GError **err)
1334 {
1335 int ret;
1336 int i = 0;
1337 char grp[MAX_GROUP_NAME];
1338 fbSpreadSpec_t *spread = collector->stream.spread;
1339
1340 if (!spread->daemon)
1341 {
1342 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1343 "Spread daemon name cannot be null" );
1344 return FALSE;
1345 }
1346 if (!spread->daemon[0])
1347 {
1348 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1349 "Spread daemon name cannot be empty" );
1350 return FALSE;
1351 }
1352 /*if (strnlen( spread->daemon, 262 ) > 261)*/
1353 if ( !(memchr( spread->daemon, 0, 261)) )
1354 {
1355 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1356 "Spread daemon name too long" );
1357 return FALSE;
1358 }
1359 if (!spread->groups)
1360 {
1361 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1362 "Spread groups cannot be null" );
1363 return FALSE;
1364 }
1365 if (!spread->groups[0].name[0])
1366 {
1367 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1368 "Spread groups cannot be empty" );
1369 return FALSE;
1370 }
1371 if (!spread->session)
1372 {
1373 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1374 "Spread session cannot be null" );
1375 return FALSE;
1376 }
1377
1378 ret = SP_connect( spread->daemon, 0, 0, 0, &(spread->recv_mbox),
1379 spread->privgroup );
1380
1381 if (ret != ACCEPT_SESSION)
1382 {
1383 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1384 "error connecting to Spread daemon %s: %s",
1385 spread->daemon, fbConnSpreadError( ret ) );
1386 return FALSE;
1387 }
1388
1389 /* mark it active here, fbCollectorFree() will need to disconnect */
1390 collector->active = TRUE;
1391
1392 for (i = 0; i < spread->num_groups; ++i)
1393 {
1394 ret = SP_join( spread->recv_mbox, spread->groups[i].name);
1395 if (ret)
1396 {
1397 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1398 "error joining to Spread data group %s: %s",
1399 spread->groups[i].name, fbConnSpreadError( ret ) );
1400 return FALSE;
1401 }
1402 }
1403
1404 /* now that we have joined the data plane group, join the
1405 * control/template plane group to signal exporters that
1406 * we need the templates for this group. */
1407
1408 for (i = 0; i < spread->num_groups; ++i)
1409 {
1410 memset( grp, 0, sizeof( grp ) );
1411 strncpy( grp, spread->groups[i].name, sizeof( grp) - 2 );
1412 strcat( grp, "T" );
1413 ret = SP_join( spread->recv_mbox, grp );
1414
1415 if (ret) {
1416 g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1417 "error joining to Spread control/template group %s: %s",
1418 spread->groups[i].name, fbConnSpreadError( ret ) );
1419 return FALSE;
1420 }
1421 }
1422
1423 return TRUE;
1424 }
1425
1426 /* There is no good way to deal with sequence numbers in Spread.
1427 * This was added to return False if the collector receives
1428 * a message where it is not the first group listed in msg.
1429 * This is because the exporter only looks at the first group
1430 * when deciding what sequence number to export */
1431
fbCollectorSpreadPostProc(fbCollector_t * collector,uint8_t * buffer,size_t * b_len,GError ** err)1432 static gboolean fbCollectorSpreadPostProc(
1433 fbCollector_t *collector,
1434 uint8_t *buffer,
1435 size_t *b_len,
1436 GError **err)
1437 {
1438 if (fbCollectorTestGroupMembership(collector, 0)) {
1439 return TRUE;
1440 }
1441 (void) collector;
1442 (void) buffer;
1443 (void) b_len;
1444 (void) err;
1445
1446 return FALSE;
1447 }
1448
/**
 * fbCollectorGetSpreadReturnGroups
 *
 * Stores in groups[] a pointer to the name of each group recorded for the
 * last received message.  The pointers refer to the collector's own
 * storage; nothing is copied.
 *
 * @param collector collector with a Spread specifier
 * @param groups    caller-supplied array with room for at least
 *                  recv_num_groups entries
 *
 * @return the number of entries recorded (spread->recv_num_groups)
 */
int fbCollectorGetSpreadReturnGroups(
    fbCollector_t   *collector,
    char            *groups[])
{
    fbSpreadSpec_t  *spec = collector->stream.spread;
    int             idx = 0;

    while (idx < spec->recv_num_groups) {
        groups[idx] = spec->recv_groups[idx].name;
        ++idx;
    }

    return spec->recv_num_groups;
}
1462
fbCollectorSpreadRead(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1463 static gboolean fbCollectorSpreadRead(
1464 fbCollector_t *collector,
1465 uint8_t *msgbase,
1466 size_t *msglen,
1467 GError **err)
1468 {
1469 fbSpreadSpec_t *spread = collector->stream.spread;
1470
1471 service service_type = 0;
1472 char sender[MAX_GROUP_NAME];
1473 int16 mess_type = 0;
1474 int endian_mismatch;
1475 int no_mess = 1;
1476 int ret;
1477
1478 do {
1479
1480 ret = SP_receive( spread->recv_mbox, &service_type, sender,
1481 spread->recv_max_groups,
1482 &(spread->recv_num_groups),
1483 (char (*)[])spread->recv_groups,
1484 &mess_type, &endian_mismatch, *msglen,
1485 (char *)msgbase );
1486
1487 if (spread->recv_exit) {
1488 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
1489 "End of file: spread shut down or was not connected");
1490 return FALSE;
1491 }
1492
1493 if (ret < 0) {
1494 if (ret == GROUPS_TOO_SHORT) {
1495 g_free(spread->recv_groups);
1496 spread->recv_max_groups = -spread->recv_num_groups;
1497 spread->recv_groups = g_new0( sp_groupname_t,
1498 spread->recv_max_groups );
1499 } else if (ret == BUFFER_TOO_SHORT) {
1500 *msglen = -endian_mismatch;
1501 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOM,
1502 "msglen too small (%zd required)", *msglen);
1503 return FALSE;
1504 } else if ((ret == CONNECTION_CLOSED) || (ret == ILLEGAL_SESSION))
1505 {
1506 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1507 "End of file: %s", fbConnSpreadError(ret));
1508 return FALSE;
1509 } else {
1510 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1511 "error(%d) receiving Spread message: %s", ret,
1512 fbConnSpreadError(ret));
1513
1514 *msglen = 0;
1515 return FALSE;
1516 }
1517 } else {
1518 *msglen = ret;
1519 no_mess = 0;
1520 }
1521
1522 } while (no_mess);
1523
1524 return TRUE;
1525 }
1526
1527
1528
/**
 * fbCollectorSpreadClose
 *
 * Disconnects the collector's Spread mailbox if the collector is active,
 * then marks the collector inactive.
 *
 * @param collector collector to close
 */
static void fbCollectorSpreadClose(
    fbCollector_t   *collector)
{
    if (collector->active != FALSE) {
        fbSpreadSpec_t  *spec = collector->stream.spread;

        SP_disconnect(spec->recv_mbox);
    }

    collector->active = FALSE;
}
1537
fbCollectorAllocSpread(void * ctx,fbSpreadParams_t * params,GError ** err)1538 fbCollector_t *fbCollectorAllocSpread(
1539 void *ctx,
1540 fbSpreadParams_t *params,
1541 GError **err)
1542 {
1543 fbCollector_t *collector = g_slice_new0( fbCollector_t );
1544
1545 collector->ctx = ctx;
1546 collector->stream.spread = fbConnSpreadCopy( params );
1547 collector->bufferedStream = FALSE;
1548 collector->active = FALSE;
1549 collector->spread_active = 1;
1550
1551 collector->coread = fbCollectorSpreadRead;
1552 collector->coreadLen = fbCollectorDecodeMsgVL; /* after SCTP */
1553 collector->copostRead = fbCollectorSpreadPostProc; /* after SCTP */
1554 collector->comsgHeader = fbCollectorMessageHeaderNull; /* after SCTP */
1555 collector->coclose = fbCollectorSpreadClose;
1556 collector->cotransClose = fbCollectorCloseTranslatorNull; /* after SCTP */
1557 collector->cotimeOut = fbCollectorSessionTimeoutNull;
1558
1559 collector->translationActive = FALSE;
1560
1561 if (!fbCollectorSpreadOpen( collector, err )) {
1562 fbCollectorFree( collector );
1563 return 0;
1564 }
1565
1566 return collector;
1567 }
1568
fbCollectorTestGroupMembership(fbCollector_t * collector,int group_offset)1569 gboolean fbCollectorTestGroupMembership(
1570 fbCollector_t *collector,
1571 int group_offset)
1572 {
1573 int loop;
1574 fbSpreadSpec_t *spread = NULL;
1575
1576 if (!collector) {
1577 return TRUE;
1578 }
1579
1580 if (!collector->spread_active) {
1581 return TRUE;
1582 }
1583
1584 spread = collector->stream.spread;
1585 for (loop = 0; loop < spread->num_groups; loop++) {
1586 if (strcmp(spread->recv_groups[group_offset].name,
1587 spread->groups[loop].name) == 0)
1588 {
1589 fbSessionSetGroup(spread->session,
1590 (char *)spread->recv_groups[group_offset].name);
1591 return TRUE;
1592 }
1593 }
1594
1595 return FALSE;
1596 }
1597
1598
1599 #endif /* HAVE_SPREAD */
1600
1601 /**
1602 * fbCollectMessage
1603 *
1604 *
1605 *
1606 */
fbCollectMessage(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1607 gboolean fbCollectMessage(
1608 fbCollector_t *collector,
1609 uint8_t *msgbase,
1610 size_t *msglen,
1611 GError **err)
1612 {
1613 /* Ensure stream is open */
1614 if (!collector->active) {
1615 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1616 "Collector not active");
1617 return FALSE;
1618 }
1619
1620 /* Attempt to read message */
1621 if (collector->coread(collector, msgbase, msglen, err)) return TRUE;
1622
1623 /* Read failure; signal error */
1624 return FALSE;
1625 }
1626
1627 /**
1628 * fbCollectorGetContext
1629 *
1630 *
1631 *
1632 */
fbCollectorGetContext(fbCollector_t * collector)1633 void *fbCollectorGetContext(
1634 fbCollector_t *collector)
1635 {
1636 return collector->ctx;
1637 }
1638
1639
1640 /**
1641 * fbCollectorHasTranslator
1642 *
1643 * use this is check to see if a protocol translation
1644 * is in use for this collector. Needed by the transcode
1645 * and IPFIX machinery to get rid of some error checks
1646 * which no longer apply.
1647 *
1648 * @param collector pointer to the collector state struct
1649 *
1650 * @return TRUE if a translator is in use, FALSE otherwise
1651 */
fbCollectorHasTranslator(fbCollector_t * collector)1652 gboolean fbCollectorHasTranslator(
1653 fbCollector_t *collector)
1654 {
1655 return collector->translationActive;
1656 }
1657
1658 /**
1659 * fbCollectorGetFD
1660 *
1661 *
1662 *
1663 */
fbCollectorGetFD(fbCollector_t * collector)1664 int fbCollectorGetFD(
1665 fbCollector_t *collector)
1666 {
1667 return collector->stream.fd;
1668 }
1669
1670 /**
1671 * fbCollectorSetFD
1672 *
1673 *
1674 *
1675 */
fbCollectorSetFD(fbCollector_t * collector,int fd)1676 void fbCollectorSetFD(
1677 fbCollector_t *collector,
1678 int fd)
1679 {
1680 if (collector) {
1681 collector->stream.fd = fd;
1682 }
1683 }
1684
1685
1686 /**
1687 * fbCollectorClose
1688 *
1689 *
1690 *
1691 */
fbCollectorClose(fbCollector_t * collector)1692 void fbCollectorClose(
1693 fbCollector_t *collector)
1694 {
1695 if (collector->active && collector->coclose) collector->coclose(collector);
1696
1697 if (collector->listener) {
1698 fbListenerRemove(collector->listener, collector->stream.fd);
1699 }
1700
1701 }
1702
1703 /**
1704 * fbCollectorFree
1705 *
1706 *
1707 *
1708 */
fbCollectorFree(fbCollector_t * collector)1709 void fbCollectorFree(
1710 fbCollector_t *collector)
1711 {
1712 if (!collector->multi_session) {
1713 fbListenerAppFree(collector->listener, collector->ctx);
1714 }
1715 collector->cotransClose(collector);
1716 fbCollectorClose(collector);
1717 #if HAVE_SPREAD
1718 if (collector->coclose == fbCollectorSpreadClose) {
1719 fbConnSpreadFree( collector->stream.spread );
1720 }
1721 #endif
1722 while (collector->udp_tail) {
1723 fbCollectorFreeUDPSpec(collector, collector->udp_tail);
1724 }
1725
1726 g_slice_free(fbCollector_t, collector);
1727 }
1728
1729
1730 /**
1731 * fbCollectorClearTranslator
1732 *
1733 * @param collector the collector on which to remove
1734 * the translator
1735 *
1736 * @return TRUE on success, FALSE on failure
1737 */
fbCollectorClearTranslator(fbCollector_t * collector,GError ** err)1738 gboolean fbCollectorClearTranslator(
1739 fbCollector_t *collector,
1740 GError **err __attribute__((unused)) )
1741 {
1742 collector->cotransClose(collector);
1743
1744 return TRUE;
1745 }
1746
1747
1748 /**
1749 * fbCollectorSetTranslator
1750 *
1751 * this sets the collector input to any
1752 * given translator
1753 *
1754 * @param collector the collector to apply the protocol
1755 * convertor to
1756 * @param postProcFunc a function called after the read
1757 * to do any post processing/conversion to turn
1758 * the buffer into an IPFIX buffer
1759 * @param vlMessageFunc function to determine the
1760 * amount needed to complete the next read
1761 * @param headerFunc function to transform the header after
1762 * a block read before it is sent to the
1763 * postProcFunc (called when vlMessageFunc isn't)
1764 * @param trCloseFunc if anything is needed to be cleaned
1765 * up in the translator when a collector is closed
1766 * this function will be called before the collector
1767 * is closed
1768 * @param timeOutFunc when UDP sessions timeout, this function will
1769 * clear any state associated with the session.
1770 * @param opaque a void pointer to hold a translator
1771 * specific state structure
1772 * @param err holds the glib based error message on
1773 * error
1774 *
1775 * @return TRUE on success, FALSE on error
1776 */
fbCollectorSetTranslator(fbCollector_t * collector,fbCollectorPostProc_fn postProcFunc,fbCollectorVLMessageSize_fn vlMessageFunc,fbCollectorMessageHeader_fn headerFunc,fbCollectorTransClose_fn trCloseFunc,fbCollectorSessionTimeout_fn timeOutFunc,void * opaque,GError ** err)1777 gboolean fbCollectorSetTranslator(
1778 fbCollector_t *collector,
1779 fbCollectorPostProc_fn postProcFunc,
1780 fbCollectorVLMessageSize_fn vlMessageFunc,
1781 fbCollectorMessageHeader_fn headerFunc,
1782 fbCollectorTransClose_fn trCloseFunc,
1783 fbCollectorSessionTimeout_fn timeOutFunc,
1784 void *opaque,
1785 GError **err)
1786 {
1787 if (NULL != collector->translatorState)
1788 {
1789 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_TRANSMISC,
1790 "Translator is already set on this collector, "
1791 "must be cleared first");
1792 return FALSE;
1793 }
1794
1795 collector->copostRead = postProcFunc;
1796 collector->coreadLen = vlMessageFunc;
1797 collector->comsgHeader = headerFunc;
1798 collector->cotransClose = trCloseFunc;
1799 collector->cotimeOut = timeOutFunc;
1800 collector->translatorState = opaque;
1801 collector->translationActive = TRUE;
1802
1803 return TRUE;
1804 }
1805
/**
 * fbCollectorGetPeer
 *
 * @param collector collector to query
 *
 * @return pointer to the peer address stored inside the collector
 */
struct sockaddr* fbCollectorGetPeer(
    fbCollector_t   *collector)
{
    struct sockaddr *peeraddr = &(collector->peer.so);

    return peeraddr;
}
1811
/**
 * fbCollectorInterruptSocket
 *
 * Interrupts a collector.  With Spread support compiled in, a collector
 * on which Spread is active is closed instead.  Otherwise a single byte
 * is written to the wip descriptor and then to the rip descriptor; the
 * results of both writes are ignored.
 *
 * @param collector collector to interrupt
 */
void fbCollectorInterruptSocket(
    fbCollector_t   *collector)
{
    uint8_t     wakeup = 0xe7;

#if HAVE_SPREAD
    if (collector->spread_active) {
        fbCollectorSpreadClose(collector);
        return;
    }
#endif

    write(collector->wip, &wakeup, sizeof(wakeup));
    write(collector->rip, &wakeup, sizeof(wakeup));
}
1827
/**
 * fbCollectorRemoveListenerLastBuf
 *
 * Forwards to fbListenerRemoveLastBuf() for the collector's listener.
 * Does nothing when the collector has no listener, which may be the case
 * especially for Spread.
 *
 * @param fbuf      buffer passed through to the listener
 * @param collector collector whose listener is used
 */
void fbCollectorRemoveListenerLastBuf(
    fBuf_t          *fbuf,
    fbCollector_t   *collector)
{
    if (!collector->listener) {
        return;
    }

    fbListenerRemoveLastBuf(fbuf, collector->listener);
}
1837
/**
 * fbCollectorGetObservationDomain
 *
 * @param collector collector to query; may be NULL
 *
 * @return the collector's obdomain value, or 0 when collector is NULL
 */
uint32_t fbCollectorGetObservationDomain(
    fbCollector_t   *collector)
{
    if (collector) {
        return collector->obdomain;
    }

    return 0;
}
1847
/**
 * fbCollectorSetAcceptOnly
 *
 * Sets the collector's accept_only flag and stores address as its peer
 * address.
 *
 * @param collector      collector to configure
 * @param address        address to store; must not be NULL
 * @param address_length length of address in bytes; at most
 *                       sizeof(collector->peer) bytes are copied
 */
void fbCollectorSetAcceptOnly(
    fbCollector_t   *collector,
    struct sockaddr *address,
    size_t          address_length)
{
    size_t  copylen = address_length;

    g_assert(address);
    collector->accept_only = TRUE;

    /* never copy more than the peer storage can hold */
    if (copylen > sizeof(collector->peer)) {
        copylen = sizeof(collector->peer);
    }
    memcpy(&(collector->peer.so), address, copylen);
}
1860
/**
 * fbCollectorSetUDPMultiSession
 *
 * Stores the multi-session flag on the collector.  fbCollectorFree()
 * skips freeing the application context when this flag is set.
 *
 * @param collector     collector to configure
 * @param multi_session new value of the flag
 */
void fbCollectorSetUDPMultiSession(
    fbCollector_t   *collector,
    gboolean        multi_session)
{
    fbCollector_t   *target = collector;

    target->multi_session = multi_session;
}
1867