1 /*@internal
2 *
3 * fblistener.c
4 * IPFIX Collecting Process connection listener implementation
5 *
6 * ------------------------------------------------------------------------
7 * Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
8 * ------------------------------------------------------------------------
9 * Authors: Brian Trammell
10 * ------------------------------------------------------------------------
11 * @OPENSOURCE_LICENSE_START@
12 * libfixbuf 2.0
13 *
14 * Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
15 *
16 * NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
17 * ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
18 * BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
19 * EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
20 * LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
21 * EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
22 * MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
23 * ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
24 * COPYRIGHT INFRINGEMENT.
25 *
26 * Released under a GNU-Lesser GPL 3.0-style license, please see
27 * LICENSE.txt or contact permission@sei.cmu.edu for full terms.
28 *
29 * [DISTRIBUTION STATEMENT A] This material has been approved for
30 * public release and unlimited distribution. Please see Copyright
31 * notice for non-US Government use and distribution.
32 *
33 * Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
34 * and Trademark Office by Carnegie Mellon University.
35 *
36 * DM18-0325
37 * @OPENSOURCE_LICENSE_END@
38 * ------------------------------------------------------------------------
39 */
40
41 #define _FIXBUF_SOURCE_
42 #include <fixbuf/private.h>
43 #include <poll.h>
44
45
46
47 /**
48 *
49 * Understanding socket handling and collector construction within the
50 * fixbuf listener:
51 *
 * Error handling on connections in fixbuf is very different between
 * TCP and UDP connections.  The reasons behind this are partially
 * tied up within the IPFIX standard and its handling of UDP.
 * But within fixbuf, this needs to be understood in terms of how
 * collectors are created with respect to listeners.
57 *
58 * For both TCP and UDP, when a listener is created, a listening
59 * socket is created with the listener. (lsock)
60 *
61 * For UDP, the listener sock (lsock) is the socket used to create
62 * the collector. The collector gets created immediately, and the
63 * collector is the structure that is associated with the fBuf
64 * structure which actually handles PDU's.
65 *
 * For TCP, the case is different.  The listening socket is used
 * primarily for the listenerWait call.  It is the socket passed to
 * poll while waiting for connection establishment.  Then
 * an accept call is made which creates a new socket handle.
 * That socket handle is used to create the collector, and the
 * lsock handle is left only within the listener.
72 *
 * When an error occurs, the normal usage of the API would be
 * to call fBufFree and call listenerWait again.  In the case
 * of TCP this works.  The library will wait for a new connection
 * to the listener lsock and create a new collector from a new
 * socket from the accept call.  For UDP, this will not work, and
 * the library will simply hang: each lsock also has a
 * corresponding pair of pipe descriptors used to detect interrupts,
 * and the poll call will simply wait on the read end of that pipe.
81 *
82 *
83 */
84
85 #define MAX_BUFFER_FREE 100
86
87 /* Maximum number of connections allowed by fixbuf */
88 #define MAX_CONNECTIONS 25
89
/*
 * Private state of a listener: the connection specifier, the poll array
 * holding the interrupt pipe and the sockets, and the table mapping
 * descriptors to the buffers created for them.
 */
struct fbListener_st {
    /** Connection specifier for passive socket.  fbListenerAlloc() stores
     *  a copy of the caller's specifier, or NULL when none was given. */
    fbConnSpec_t                *spec;
    /** Base session. Used for internal templates.  Cloned for every
     *  buffer the listener creates. */
    fbSession_t                 *session;
    /** UDP Base Session.  Only set for UDP listeners.
     *  Since UDP sessions are created at connection time,
     *  this holds the first one so we can free it. */
    fbSession_t                 *udp_session;
    /** Last buffer returned by fbListenerWait(). */
    fBuf_t                      *lastbuf;
    /** The set of file descriptors to be monitored.  Allocated with room
     *  for MAX_CONNECTIONS entries: [0] is the read end of the interrupt
     *  pipe, [1] the write end, [2..] the sockets.  An fd of -1 marks an
     *  unused entry. */
    struct pollfd               *pfd_array;
    /** Number of entries of pfd_array currently in use. */
    nfds_t                      pfd_len;
    /** Holds last file descriptor used (-1 right after allocation). */
    int                         lsock;
    /** mode: -1 for udp, 1 once fbListenerWaitNoCollectors() has been
     *  called, 0 otherwise. */
    int                         mode;
    /**
     * Interrupt pipe read end file descriptor.
     * Used to unblock a call to fbListenerWait().
     * NOTE(review): the code visible in this file only initialises this
     * to -1; the pipe actually used lives in pfd_array[0] - confirm.
     */
    int                         rip;
    /**
     * Interrupt pipe write end file descriptor.
     * Used to unblock a call to fbListenerWait().
     * NOTE(review): the code visible in this file only initialises this
     * to -1; the pipe actually used lives in pfd_array[1] - confirm.
     */
    int                         wip;
    /**
     * used to hold the handle to the collector for
     * this listener (the most recently created one)
     */
    fbCollector_t               *collectorHandle;
    /**
     * File descriptor table.
     * Maps file descriptors to active listener-managed buffer instances.
     */
    GHashTable                  *fdtab;
    /**
     * Application initialization function. Allows the application
     * to bind internal context to a collector, and to reject connections
     * after accept() but before session setup.
     */
    fbListenerAppInit_fn        appinit;
    /** Application free function. Frees storage allocated by appinit. */
    fbListenerAppFree_fn        appfree;
};
138
/*
 * select()-style descriptor set paired with a result buffer.
 * In the code visible here it only appears as the (unused) user-data
 * parameter type of fbListenerInterruptCollectors() and in
 * commented-out helpers.
 */
typedef struct fbListenerWaitFDSet_st {
    /* descriptor set */
    fd_set          fds;
    /* highest descriptor number recorded in fds */
    int             maxfd;
    /* buffer associated with a descriptor found in fds */
    fBuf_t         *fbuf;
} fbListenerWaitFDSet_t;
144
/**
 * Structure that holds the listeners that are added to the group.
 */
struct fbListenerGroup_st
{
    /** pointer to the head of the doubly linked list of listener entries;
     *  new entries are pushed at the head */
    fbListenerEntry_t   *head;
    /** pointer to the entry of the listener that was last added or last
     *  produced a result */
    fbListenerEntry_t   *lastlist;
    /** poll array covering the descriptors of all listeners in the group;
     *  allocated with room for MAX_CONNECTIONS * 5 entries */
    struct pollfd       *group_pfd;
    /** length of usable fds (entries of group_pfd in use) */
    nfds_t               pfd_len;
};
159
160
161 /**
162 * fbListenerTeardownSocket
163 *
164 *
165 *
166 *
167 */
fbListenerTeardownSocket(fbListener_t * listener)168 static void fbListenerTeardownSocket(
169 fbListener_t *listener)
170 {
171 unsigned int i;
172
173 if (listener->pfd_len) {
174 for (i = 0; i < listener->pfd_len; i++) {
175 if (listener->pfd_array[i].fd >= 0) {
176 close(listener->pfd_array[i].fd);
177 listener->pfd_array[i].fd = -1;
178 }
179 }
180 g_slice_free1((MAX_CONNECTIONS * sizeof(struct pollfd)),
181 listener->pfd_array);
182 listener->pfd_len = 0;
183 }
184
185 }
186
187 /**
188 *fbListenerInitSocket
189 *
190 *
191 *
192 *
193 */
fbListenerInitSocket(fbListener_t * listener,GError ** err)194 static gboolean fbListenerInitSocket(
195 fbListener_t *listener,
196 GError **err)
197 {
198 int pfd[2];
199 int i = 0;
200 int count = 0;
201 struct pollfd *cpfd = NULL;
202 struct addrinfo *ai = NULL;
203 struct addrinfo *current = NULL;
204
205 /* Create interrupt pipe */
206 if (pipe(pfd)) {
207 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
208 "fbListener error creating interrupt pipe: %s",
209 strerror(errno));
210 return FALSE;
211 }
212
213
214 /* Look up the passive socket address */
215 if (!fbConnSpecLookupAI(listener->spec, TRUE, err)) {
216 fbListenerTeardownSocket(listener);
217 return FALSE;
218 }
219
220 ai = (struct addrinfo *)listener->spec->vai;
221
222 current = ai;
223
224 /* figure out how many addresses there are */
225 while (current) {
226 i++;
227 current = current->ai_next;
228 }
229
230 listener->pfd_array = (struct pollfd*)g_slice_alloc0(MAX_CONNECTIONS *
231 sizeof(struct pollfd));
232
233 if (listener->pfd_array == NULL) {
234 return FALSE;
235 }
236
237 listener->pfd_len = i+2;
238
239 /* read interrupt pipe */
240 listener->pfd_array[0].fd = pfd[0];
241 listener->pfd_array[0].events = POLLIN;
242 /* write interrupt pipe */
243 listener->pfd_array[1].fd = pfd[1];
244
245 i = 2;
246 /* Create the passive socket */
247 do {
248
249 cpfd = &listener->pfd_array[i];
250 /*
251 * Kludge for SCTP. addrinfo doesn't accept SCTP hints.
252 */
253 #if FB_ENABLE_SCTP
254 if ((listener->spec->transport == FB_SCTP) ||
255 (listener->spec->transport == FB_DTLS_SCTP)) {
256 ai->ai_socktype = SOCK_STREAM;
257 ai->ai_protocol = IPPROTO_SCTP;
258 }
259 #endif
260 /* Create socket and bind it to the passive address */
261 cpfd->fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
262 if (cpfd->fd < 0) {
263 i++; continue;
264 }
265 if (bind(cpfd->fd, ai->ai_addr, ai->ai_addrlen) == -1) {
266 close(cpfd->fd); cpfd->fd = -1; i++; continue;
267 }
268 cpfd->events = POLLIN;
269
270 /* Listen only on socket and sequenced packet sockets */
271 if ((ai->ai_socktype == SOCK_STREAM)
272 #ifdef SOCK_SEQPACKET
273 || (ai->ai_socktype == SOCK_SEQPACKET)
274 #endif
275 ) {
276 if (listen(cpfd->fd, 1) < 0) {
277 close(cpfd->fd); cpfd->fd = -1; i++; continue;
278 }
279 }
280 i++;
281 /* Socket successfully bound for listening */
282 count++;
283 } while ((ai = ai->ai_next));
284
285 /* check for no listenable socket */
286 if (!count) {
287 fbListenerTeardownSocket(listener);
288 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
289 "couldn't create socket listening to %s:%s: %s",
290 listener->spec->host ? listener->spec->host : "*",
291 listener->spec->svc, strerror(errno));
292 return FALSE;
293 }
294
295
296 /* All done. */
297 return TRUE;
298 }
299
300 /**
301 *fbListenerInitUDPSocket
302 *
303 *
304 *
305 *
306 */
fbListenerInitUDPSocket(fbListener_t * listener,GError ** err)307 static gboolean fbListenerInitUDPSocket(
308 fbListener_t *listener,
309 GError **err)
310 {
311 void *ctx = NULL;
312 fbCollector_t *collector = NULL;
313 fBuf_t *fbuf = NULL;
314 unsigned int i;
315
316 /* Simulate accept on UDP socket */
317
318 /* Ask application for context */
319 if (listener->appinit) {
320 if (!listener->appinit(listener, &ctx, listener->lsock, NULL, 0, err)){
321 return FALSE;
322 }
323 }
324
325 /* Create collector on UDP socket */
326 switch (listener->spec->transport) {
327 case FB_UDP:
328 collector = fbCollectorAllocSocket(listener, ctx,
329 listener->lsock, NULL, 0, err);
330 break;
331 #if HAVE_OPENSSL_DTLS
332 case FB_DTLS_UDP:
333 collector = fbCollectorAllocTLS(listener, ctx,
334 listener->lsock, NULL, 0, err);
335 break;
336 #endif
337 default:
338 g_assert_not_reached();
339 }
340
341 /* Check for collector alloc error */
342 if (!collector) return FALSE;
343
344 /* Create a buffer with a cloned session around the collector */
345 fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
346
347 /* add this fbuf for all file descriptors */
348 for (i = 2; i < listener->pfd_len; i++) {
349 g_hash_table_insert(listener->fdtab,
350 GINT_TO_POINTER(listener->pfd_array[i].fd), fbuf);
351 }
352
353
354 /* Add collector to the file descriptor table */
355 /*g_hash_table_insert(listener->fdtab,
356 GINT_TO_POINTER(listener->lsock), fbuf);*/
357
358 /* No more passive socket */
359 /*listener->lsock = -1;*/
360 /* set mode to UDP */
361 listener->mode = -1;
362
363 /* store this session so we can free it later */
364 listener->udp_session = fBufGetSession(fbuf);
365
366 /* store the handle to the collector */
367 listener->collectorHandle = collector;
368
369 /* All done. */
370 return TRUE;
371 }
372
373 /**
374 *fbListenerAlloc
375 *
376 *
377 *
378 *
379 */
fbListenerAlloc(fbConnSpec_t * spec,fbSession_t * session,fbListenerAppInit_fn appinit,fbListenerAppFree_fn appfree,GError ** err)380 fbListener_t *fbListenerAlloc(
381 fbConnSpec_t *spec,
382 fbSession_t *session,
383 fbListenerAppInit_fn appinit,
384 fbListenerAppFree_fn appfree,
385 GError **err)
386 {
387 fbListener_t *listener = NULL;
388 gboolean ownSocket;
389
390 g_assert(session);
391 if (spec) {
392 ownSocket = FALSE;
393 } else {
394 ownSocket = TRUE;
395 }
396
397 /* Allocate a new listener */
398 listener = g_slice_new0(fbListener_t);
399
400 /* -1 for file descriptors means no fd */
401 listener->lsock = -1;
402 listener->rip = -1;
403 listener->wip = -1;
404
405 if (ownSocket) { /* user handling own socket creation and connections */
406 listener->spec = NULL;
407 } else {
408 listener->spec = fbConnSpecCopy(spec);
409 }
410 /* Fill in what we can */
411 listener->session = session;
412 listener->appinit = appinit;
413 listener->appfree = appfree;
414
415 /* allocate file descriptor table */
416 listener->fdtab = g_hash_table_new(g_direct_hash, g_direct_equal);
417
418 if (!ownSocket) {
419 /* Do transport-specific initialization */
420 switch (spec->transport) {
421 #if FB_ENABLE_SCTP
422 case FB_SCTP:
423 #if HAVE_OPENSSL_DTLS_SCTP
424 case FB_DTLS_SCTP:
425 #endif
426 #endif
427 case FB_TCP:
428 #if HAVE_OPENSSL
429 case FB_TLS_TCP:
430 #endif
431 if (!fbListenerInitSocket(listener, err)) {
432 goto err;
433 }
434 break;
435 case FB_UDP:
436 #if HAVE_OPENSSL_DTLS
437 case FB_DTLS_UDP:
438 #endif
439 /* FIXME this may leak on socket setup error for UDP. */
440 if (fbListenerInitSocket(listener, err)) {
441 if (!fbListenerInitUDPSocket(listener, err)) {
442 fbListenerTeardownSocket(listener);
443 goto err;
444 }
445 } else {
446 goto err;
447 }
448 break;
449 default:
450 #ifndef FB_ENABLE_SCTP
451 if (spec->transport == FB_SCTP || spec->transport == FB_DTLS_SCTP){
452 g_error("Libfixbuf not enabled for SCTP Transport. "
453 " Run configure with --with-sctp");
454 }
455 #endif
456 if (spec->transport == FB_TLS_TCP ||
457 spec->transport == FB_DTLS_SCTP ||
458 spec->transport == FB_DTLS_UDP)
459 {
460 g_error("Libfixbuf not enabled for this mode of transport. "
461 " Run configure with --with-openssl");
462 }
463 }
464 }
465
466 /* Return the initialized listener */
467 return listener;
468
469 err:
470 if (listener) {
471 if (listener->fdtab) {
472 g_hash_table_destroy(listener->fdtab);
473 }
474
475 g_slice_free(fbListener_t, listener);
476 }
477
478 /* No listener */
479 return NULL;
480 }
481
482
483 /**
484 * fbListenerFreeBuffer
485 *
486 *
487 *
488 *
489 */
fbListenerFreeBuffer(void * vfd,fBuf_t * fbuf,fBuf_t ** lfbuf)490 static void fbListenerFreeBuffer(
491 void *vfd __attribute__((unused)),
492 fBuf_t *fbuf,
493 /* void *vignore __attribute__((unused)) )*/
494 fBuf_t **lfbuf)
495 {
496 /* free the buffer; this will close the socket. */
497 /* fBufFree(fbuf);*/
498 /* we can't change the hash table while we are looping through it */
499 *lfbuf = fbuf;
500 lfbuf++;
501 }
502
503 /**
504 * fbListenerAppFree
505 *
506 *
507 */
508
fbListenerAppFree(fbListener_t * listener,void * ctx)509 void fbListenerAppFree(
510 fbListener_t *listener,
511 void *ctx)
512 {
513 if (listener) {
514 if (listener->appfree) {
515 (listener->appfree)(ctx);
516 }
517 }
518 }
519
520
521 /**
522 *fbListenerFree
523 *
524 *
525 *
526 *
527 */
fbListenerFree(fbListener_t * listener)528 void fbListenerFree(
529 fbListener_t *listener)
530 {
531 fBuf_t *tfbuf[MAX_BUFFER_FREE+1];
532 fBuf_t *lfbuf = NULL;
533 fbSession_t *session = NULL;
534 unsigned int loop = 0;
535
536 if (NULL == listener) {
537 return;
538 }
539
540 while (loop < MAX_BUFFER_FREE) {
541 tfbuf[loop] = NULL;
542 loop++;
543 }
544
545 /* shut down passive socket */
546 fbListenerTeardownSocket(listener);
547
548 /* free any open buffers we may have */
549 g_hash_table_foreach(listener->fdtab,
550 (GHFunc)fbListenerFreeBuffer, tfbuf);
551
552 loop = 0;
553 lfbuf = tfbuf[0];
554 /* free first session */
555 if (listener->udp_session) {
556 /* we need to get the session set on the fBuf - it should be the
557 same as udp_session in the case that we haven't received anything*/
558 session = fBufGetSession(lfbuf);
559 if (listener->udp_session != session) {
560 fbSessionFree(listener->udp_session);
561 }
562 }
563
564 if (listener->mode == -1) {
565 /* for UDP there can be multiple FDs with the same fBuf */
566 /* And there should only be 1 fBuf for this listener */
567 fBufFree(lfbuf);
568 } else {
569 while (lfbuf && loop < MAX_BUFFER_FREE) {
570 fBufFree(lfbuf);
571 loop++;
572 lfbuf = tfbuf[loop];
573 }
574 }
575 /* free the listener table */
576 g_hash_table_destroy(listener->fdtab);
577
578 /* free the connection specifier */
579 fbConnSpecFree(listener->spec);
580
581 /* free the listener itself */
582 g_slice_free(fbListener_t, listener);
583 }
584
585 /**
586 *fbListenerWaitAddFD
587 *
588 *
589 *
590 *
591 */
592 /*static void fbListenerWaitAddFD(
593 void *vfd,
594 void *vignore __attribute__((unused)),
595 fbListenerWaitFDSet_t *lfdset)
596 {
597 int fd = GPOINTER_TO_INT(vfd);
598
599 FD_SET(fd,&(lfdset->fds));
600 if (fd > lfdset->maxfd) lfdset->maxfd = fd;
601 }
602 */
603
604 /**
605 * fbListenerWaitSearch
606 *
607 *
608 *
609 *
610 */
611 /*static void fbListenerWaitSearch(
612 void *vfd,
613 void *fbuf,
614 fbListenerWaitFDSet_t *lfdset)
615 {
616 int fd = GPOINTER_TO_INT(vfd);
617
618 if (FD_ISSET(fd,&(lfdset->fds))) {
619 lfdset->fbuf = fbuf;
620 }
621 }*/
622
/**
 * fbListenerAddPollFD
 *
 * Registers a descriptor for POLLIN in a poll array.  The first entry
 * whose fd is negative is reused; when there is none the descriptor is
 * stored right behind the last used entry and the length grows by one.
 * The caller must make sure the array has room for that extra entry.
 *
 * @param fd_array  poll array
 * @param array_len number of entries in use; incremented when the
 *                  descriptor is appended
 * @param fd        descriptor to register
 */
static void fbListenerAddPollFD(
    struct pollfd  *fd_array,
    nfds_t         *array_len,
    int             fd)
{
    struct pollfd  *slot = fd_array;
    struct pollfd  *past_used = fd_array + *array_len;

    /* skip entries that are taken; stop at the first free one, or just
     * past the used part of the array */
    while (slot != past_used && slot->fd >= 0) {
        ++slot;
    }

    slot->fd = fd;
    slot->events = POLLIN;

    /* nothing could be recycled: the array grew by one entry */
    if (slot == past_used) {
        *array_len += 1;
    }
}
655
656 /**
657 * fbListenerWaitAccept
658 *
659 *
660 *
661 *
662 */
fbListenerWaitAccept(fbListener_t * listener,GError ** err)663 static fBuf_t *fbListenerWaitAccept(
664 fbListener_t *listener,
665 GError **err)
666 {
667 int asock;
668 union {
669 struct sockaddr so;
670 struct sockaddr_in ip4;
671 struct sockaddr_in6 ip6;
672 } peer;
673 socklen_t peerlen;
674 void *ctx = NULL;
675 fbCollector_t *collector = NULL;
676 fBuf_t *fbuf = NULL;
677
678 /* Accept the connection */
679 peerlen = sizeof(peer);
680 asock = accept(listener->lsock, &(peer.so), &peerlen);
681 if (asock < 0) {
682 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
683 "listener accept error: %s",
684 strerror(errno));
685 return NULL;
686 }
687
688 /* Okay, we have a socket. Ask the application for context. */
689 if (listener->appinit) {
690 if (!listener->appinit(listener, &ctx, asock,
691 &(peer.so), peerlen, err)) {
692 close(asock);
693 return NULL;
694 }
695 }
696
697 /* Create a collector as appropriate */
698 switch (listener->spec->transport) {
699 #if FB_ENABLE_SCTP
700 case FB_SCTP:
701 #endif
702 case FB_TCP:
703 collector = fbCollectorAllocSocket(listener, ctx, asock,
704 &(peer.so), peerlen, err);
705 break;
706 #if HAVE_OPENSSL
707 #if HAVE_OPENSSL_DTLS_SCTP
708 case FB_DTLS_SCTP:
709 #endif
710 case FB_TLS_TCP:
711 collector = fbCollectorAllocTLS(listener, ctx, asock,
712 &(peer.so), peerlen, err);
713 break;
714 #endif
715 default:
716 g_assert_not_reached();
717 }
718
719 /* Check for collector creation error */
720 if (!collector) return NULL;
721
722 /* Create a buffer with a cloned session around the collector */
723 fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
724
725 /* Make the buffer automatic */
726 fBufSetAutomaticMode(fbuf, TRUE);
727
728 /* Add buffer to the file descriptor table */
729 g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(asock), fbuf);
730
731 /* don't add to array if fbListenerWaitNoCollectors was called */
732 if (listener->mode < 1) {
733 /* add to poll array */
734 if (listener->pfd_len < MAX_CONNECTIONS) {
735 fbListenerAddPollFD(listener->pfd_array, &listener->pfd_len,asock);
736 } else {
737 g_warning("Max connections %d reached.", MAX_CONNECTIONS);
738 }
739 }
740
741 listener->lsock = asock;
742
743 /* store the collector handle */
744 listener->collectorHandle = collector;
745
746 /* All done. */
747 return fbuf;
748 }
749
750 /**
751 * fbListenerRemove
752 *
753 *
754 *
755 *
756 */
fbListenerRemove(fbListener_t * listener,int fd)757 void fbListenerRemove(
758 fbListener_t *listener,
759 int fd)
760 {
761 unsigned int i;
762
763 /* remove from hash table */
764 g_hash_table_remove(listener->fdtab, GINT_TO_POINTER(fd));
765
766 /* remove from poll array */
767 for (i = 0; i < listener->pfd_len; i++) {
768 if (listener->pfd_array[i].fd == fd) {
769 if (listener->lsock == fd) {
770 /* unset lsock */
771 listener->lsock = 0;
772 }
773 close(listener->pfd_array[i].fd);
774 listener->pfd_array[i].fd = -1;
775 break;
776 }
777 }
778 }
779
780 /**
781 * fbListenerWait
782 *
783 *
784 *
785 *
786 */
fbListenerWait(fbListener_t * listener,GError ** err)787 fBuf_t *fbListenerWait(
788 fbListener_t *listener,
789 GError **err)
790 {
791 fBuf_t *fbuf = NULL;
792 uint8_t byte;
793 int got_sock = -1;
794 int rc;
795 unsigned int i;
796
797 /* wait for data available on one of our file descriptors */
798 rc = poll(listener->pfd_array, listener->pfd_len, -1);
799
800 if (rc < 0) {
801 if (errno == EINTR) {
802 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
803 "Interrupted listener wait");
804 return NULL;
805 } else {
806 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
807 "listener wait error: %s",
808 strerror(errno));
809 return NULL;
810 }
811 }
812
813 /* Loop through file descriptors */
814 for (i = 0; i < listener->pfd_len; i++) {
815 struct pollfd *pfd = &listener->pfd_array[i];
816
817 if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
818 /* hang up or error */
819 got_sock = pfd->fd;
820 break;
821 }
822
823 if (!(pfd->revents & POLLIN)) {
824 continue;
825 }
826
827 if (i == 0) {
828 /* read or write interrupt */
829 /* consume and ignore return */
830 read(pfd->fd, &byte, sizeof(byte));
831 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
832 "External interrupt on pipe");
833 return NULL;
834 }
835
836 got_sock = pfd->fd;
837 break;
838 }
839
840 /* quick solution - check if the fd is the same as last time */
841 if ((listener->lsock == got_sock) && listener->lastbuf) {
842 return listener->lastbuf;
843 }
844
845 listener->lsock = got_sock;
846
847 /* different than last time -> check to see if it's been seen before */
848 if ((fbuf =g_hash_table_lookup(listener->fdtab,GINT_TO_POINTER(got_sock))))
849 {
850 listener->lastbuf = fbuf;
851 if (listener->mode < 0) {
852 /* if UDP set FD on collector for reading */
853 fbCollectorSetFD(fBufGetCollector(fbuf), got_sock);
854 }
855 return fbuf;
856 } else {
857 if (listener->mode >= 0) {
858 /* new connection */
859 fbuf = fbListenerWaitAccept(listener, err);
860 if (!fbuf) return NULL;
861 listener->lastbuf = fbuf;
862 return fbuf;
863 } else {
864 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
865 "listener wait error: invalid FD");
866 /* this is strange bc UDP fbufs are set in fblisteneralloc */
867 return NULL;
868 }
869 }
870 }
871
fbListenerWaitNoCollectors(fbListener_t * listener,GError ** err)872 fBuf_t *fbListenerWaitNoCollectors(
873 fbListener_t *listener,
874 GError **err)
875 {
876 fBuf_t *fbuf = NULL;
877 uint8_t byte;
878 int rc;
879 unsigned int i;
880
881
882 /* set the mode to 1 so fbListenerWaitAccept doesn't add fd */
883 listener->mode = 1;
884 /* wait for data available on one of our file descriptors */
885 rc = poll(listener->pfd_array, listener->pfd_len, -1);
886
887 if (rc < 0) {
888 if (errno == EINTR) {
889 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
890 "Interrupted listener wait");
891 return NULL;
892 } else {
893 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
894 "listener wait error: %s",
895 strerror(errno));
896 return NULL;
897 }
898 }
899
900 /* Loop file descriptors */
901 for (i = 0; i < listener->pfd_len; i++) {
902 struct pollfd *pfd = &listener->pfd_array[i];
903
904 if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
905 listener->lsock = pfd->fd;
906 break;
907 }
908
909 if (!(pfd->revents & POLLIN)) {
910 continue;
911 }
912
913 if (i == 0) {
914 /* read or write interrupt */
915 /* consume and ignore return */
916 read(pfd->fd, &byte, sizeof(byte));
917 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
918 "External interrupt on pipe");
919 return NULL;
920 }
921
922 listener->lsock = pfd->fd;
923 break;
924 }
925
926 /* handle any pending accept, return the accepted buffer immediately. */
927 if (listener->mode >= 0) {
928 return fbListenerWaitAccept(listener, err);
929 } else {
930 /* For UDP this doesn't really work, just return the data */
931 if ((fbuf = g_hash_table_lookup(listener->fdtab,
932 GINT_TO_POINTER(listener->lsock))))
933 {
934 fbCollectorSetFD(fBufGetCollector(fbuf), listener->lsock);
935 return fbuf;
936 }
937 }
938
939 /* this should never happen */
940 return NULL;
941 }
942
943
/*
 * GHFunc-style callback for walking the descriptor table: interrupts the
 * socket of the buffer stored as the table value.  The key and the
 * user-data argument are not used.
 */
static void fbListenerInterruptCollectors(
    void                   *vfd __attribute__((unused)),
    void                   *fbuf,
    fbListenerWaitFDSet_t  *lfdset __attribute__((unused)))
{
    fBuf_t *target = fbuf;

    fBufInterruptSocket(target);
}
951
952
953 /**
954 * fbListenerInterrupt
955 *
956 *
957 *
958 *
959 */
fbListenerInterrupt(fbListener_t * listener)960 void fbListenerInterrupt(
961 fbListener_t *listener)
962 {
963 uint8_t byte = 0xe7;
964
965 /* send interrrupts to the collectors, then to the listener */
966 g_hash_table_foreach(listener->fdtab,
967 (GHFunc)fbListenerInterruptCollectors,
968 NULL);
969
970 /* write and ignore return */
971 /*write(listener->wip, &byte, sizeof(byte));
972 write(listener->rip, &byte, sizeof(byte));*/
973 write(listener->pfd_array[0].fd, &byte, sizeof(byte));
974 write(listener->pfd_array[1].fd, &byte, sizeof(byte));
975
976
977 }
978
979 /**
980 * fbListenerGetConnSpec
981 *
982 *
983 *
984 *
985 */
fbListenerGetConnSpec(fbListener_t * listener)986 fbConnSpec_t *fbListenerGetConnSpec(
987 fbListener_t *listener)
988 {
989 return listener->spec;
990 }
991
992
993 /**
994 *fbListenerGetCollector
995 *
996 * gets the collector allocated to the listener
997 *
998 */
fbListenerGetCollector(fbListener_t * listener,fbCollector_t ** collector,GError ** err)999 gboolean fbListenerGetCollector(
1000 fbListener_t *listener,
1001 fbCollector_t **collector,
1002 GError **err)
1003 {
1004 g_assert(collector);
1005 if (NULL == listener->collectorHandle) {
1006 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1007 "no collector available to be retrieved");
1008 return FALSE;
1009 }
1010
1011 *collector = listener->collectorHandle;
1012
1013 return TRUE;
1014 }
1015
1016 /* returns NULL or pointer to allocated group structure */
1017
fbListenerGroupAlloc(void)1018 fbListenerGroup_t* fbListenerGroupAlloc(
1019 void)
1020 {
1021 fbListenerGroup_t *group = NULL;
1022
1023 group = g_slice_new0( fbListenerGroup_t );
1024 group->group_pfd = ((struct pollfd *)g_slice_alloc0(
1025 MAX_CONNECTIONS* 5 *sizeof(struct pollfd)));
1026
1027 group->head = NULL;
1028
1029 return group;
1030 }
1031
1032
1033 /**
1034 * fbListenerGroupFree
1035 *
1036 * frees a listener group
1037 *
1038 */
fbListenerGroupFree(fbListenerGroup_t * group)1039 void fbListenerGroupFree(
1040 fbListenerGroup_t *group)
1041 {
1042 if (group && group->group_pfd) {
1043 g_slice_free1((MAX_CONNECTIONS * 5 * sizeof(struct pollfd)),
1044 group->group_pfd);
1045 }
1046
1047 g_slice_free(fbListenerGroup_t, group);
1048 }
1049
1050 /**
1051 * fbListenerGroupAddListener
1052 *
1053 * @return 0 upon success. "1" if entry couldn't get created
1054 * maybe "2" if either of the incoming pointers is NULL
1055 */
fbListenerGroupAddListener(fbListenerGroup_t * group,const fbListener_t * listener)1056 int fbListenerGroupAddListener(
1057 fbListenerGroup_t *group,
1058 const fbListener_t *listener)
1059 {
1060 fbListenerEntry_t *entry = NULL;
1061 unsigned int i;
1062
1063 if (!group || !listener) {
1064 return 2;
1065 }
1066
1067 entry = g_slice_new0( fbListenerEntry_t );
1068
1069 if (!entry) {
1070 /* needs to be something like ERR_NO_MEM */
1071 return 1;
1072 }
1073
1074 entry->prev = NULL;
1075 entry->next = group->head;
1076 entry->listener = (fbListener_t*)listener;
1077
1078 if (group->head) {
1079 group->head->prev = entry;
1080 }
1081
1082 group->head = entry;
1083
1084 /* add FDs */
1085 for (i = 0; i < entry->listener->pfd_len; i++) {
1086 group->group_pfd[group->pfd_len].fd = entry->listener->pfd_array[i].fd;
1087 group->group_pfd[group->pfd_len].events = POLLIN;
1088 group->pfd_len++;
1089 }
1090
1091 group->lastlist = entry;
1092
1093 return 0;
1094 }
1095
1096 /**
1097 * fbListenerGroupDeleteListener
1098 *
1099 * @return 0 on success. "1" if not found. "2" if a pointer is NULL
1100 */
fbListenerGroupDeleteListener(fbListenerGroup_t * group,const fbListener_t * listener)1101 int fbListenerGroupDeleteListener(
1102 fbListenerGroup_t *group,
1103 const fbListener_t *listener)
1104 {
1105 fbListenerEntry_t *entry = NULL;
1106 unsigned int i, k;
1107
1108 if (!group || !listener) {
1109 return 2;
1110 }
1111
1112 for (entry = group->head; entry; entry = entry->next) {
1113 if (entry->listener == listener) {
1114 if (entry->prev) {
1115 entry->prev->next = entry->next;
1116 }
1117
1118 if (entry->next) {
1119 entry->next->prev = entry->prev;
1120 }
1121
1122 /* remove FDs (close will happen later) */
1123 for (i = 0; i < entry->listener->pfd_len; i++) {
1124 for (k = 0; k < group->pfd_len; k++) {
1125 if (entry->listener->pfd_array[i].fd ==
1126 group->group_pfd[k].fd)
1127 {
1128 group->group_pfd[k].fd = -1;
1129 break;
1130 }
1131 }
1132 }
1133
1134 if (entry == group->lastlist) {
1135 group->lastlist = group->head;
1136 }
1137
1138 g_slice_free(fbListenerEntry_t, entry);
1139
1140 return 0;
1141 }
1142 }
1143
1144 return 1;
1145 }
1146
/*
 * Allocates a result for the given listener, carrying the listener's
 * last buffer, and pushes it on the front of the result list.
 *
 * resultList points to the head of the list and is updated to the new
 * result, which is also returned.
 */
static fbListenerGroupResult_t *fbListenerNewResult(
    fbListenerGroupResult_t   **resultList,
    fbListener_t               *listener)
{
    fbListenerGroupResult_t *fresh = g_slice_new0( fbListenerGroupResult_t );

    fresh->listener = listener;
    /* the buffer the listener handed out most recently */
    fresh->fbuf = listener->lastbuf;

    /* new results go to the front of the list */
    fresh->next = *resultList;
    *resultList = fresh;

    return fresh;
}
1166
1167
/*
 * Frees a whole result list, starting at the given result.  Only the
 * result records are released; the listeners and buffers they refer to
 * are left alone.  A NULL list is fine.
 */
void fbListenerFreeGroupResult(
    fbListenerGroupResult_t *result)
{
    while (result) {
        fbListenerGroupResult_t *doomed = result;

        /* step ahead before the record goes away */
        result = result->next;
        g_slice_free(fbListenerGroupResult_t, doomed);
    }
}
1178
1179
1180
1181
fbListenerGroupWait(fbListenerGroup_t * group,GError ** err)1182 fbListenerGroupResult_t* fbListenerGroupWait(
1183 fbListenerGroup_t *group,
1184 GError **err)
1185 {
1186 gboolean found;
1187 uint8_t byte;
1188 unsigned int i, k;
1189 int rc;
1190 int new_fd = -1;
1191 fBuf_t *fbuf = NULL;
1192 fbListenerEntry_t *entry = NULL;
1193 fbListenerGroupResult_t *resultHead = NULL;
1194 fbListenerGroupResult_t *result = NULL;
1195
1196 g_assert(group);
1197
1198 /* wait for data available on one of our file descriptors */
1199
1200 while (!resultHead) {
1201 rc = poll(group->group_pfd, group->pfd_len, -1);
1202
1203 if (rc < 0) {
1204 if (errno == EINTR) {
1205 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
1206 "Interrupted listener wait");
1207 return NULL;
1208 } else {
1209 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1210 "listener wait error: %s",
1211 strerror(errno));
1212 return NULL;
1213 }
1214 }
1215
1216 /* Loop file descriptors */
1217 for (i = 0; i < group->pfd_len; i++) {
1218
1219 struct pollfd *pfd = &group->group_pfd[i];
1220
1221 found = FALSE;
1222
1223 if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
1224 /* hang up or error */
1225 new_fd = pfd->fd;
1226 } else if (!(pfd->revents & POLLIN)) {
1227 continue;
1228 }
1229
1230 new_fd = pfd->fd;
1231
1232 /* check to see if this belongs to the last listener */
1233 if (new_fd == group->lastlist->listener->lsock) {
1234 result = fbListenerNewResult(&resultHead,
1235 group->lastlist->listener);
1236 continue;
1237 }
1238
1239 /* find out which listener this belongs to */
1240 for (entry = group->head; entry; entry = entry->next) {
1241 /* handle interrupt pipe read end */
1242
1243 for (k = 0; k < entry->listener->pfd_len; k++) {
1244 struct pollfd *cpfd = &entry->listener->pfd_array[k];
1245
1246 if (new_fd == cpfd->fd) {
1247
1248 if (k == 0) {
1249 /* read or write interrupt */
1250 /* consume and ignore return */
1251 read(cpfd->fd, &byte, sizeof(byte));
1252 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
1253 "External interrupt on pipe");
1254 return NULL;
1255 }
1256
1257 if ((entry->listener->lsock == new_fd) &&
1258 entry->listener->lastbuf)
1259 {
1260 result = fbListenerNewResult(&resultHead,
1261 entry->listener);
1262 found = TRUE;
1263 group->lastlist = entry;
1264 break;
1265 }
1266
1267 entry->listener->lsock = new_fd;
1268 /* Look it up */
1269 if ((fbuf = g_hash_table_lookup(entry->listener->fdtab,
1270 GINT_TO_POINTER(new_fd))))
1271 {
1272 if (entry->listener->mode < 0) {
1273 fbCollectorSetFD(fBufGetCollector(fbuf),
1274 new_fd);
1275 }
1276 entry->listener->lastbuf = fbuf;
1277 result = fbListenerNewResult(&resultHead,
1278 entry->listener);
1279 group->lastlist = entry;
1280 found = TRUE;
1281 break;
1282 } else {
1283 if (entry->listener->mode >= 0) {
1284 /* TCP - call accept */
1285 fbuf = fbListenerWaitAccept(entry->listener,
1286 err);
1287 entry->listener->lastbuf = fbuf;
1288 result = fbListenerNewResult(&resultHead,
1289 entry->listener);
1290 if (group->pfd_len < (MAX_CONNECTIONS * 5)) {
1291 fbListenerAddPollFD(group->group_pfd,
1292 &group->pfd_len,
1293 entry->listener->lsock);
1294 } else {
1295 g_warning("Maximum connections reached "
1296 "for Listener Group (%d)",
1297 (int)group->pfd_len);
1298 }
1299 found = TRUE;
1300 group->lastlist = entry;
1301 break;
1302 } else {
1303 /* shouldn't happen */
1304 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1305 "listener wait error: invalid FD");
1306 return NULL;
1307 }
1308 }
1309
1310 } /* new_fd == cpfd->fd */
1311 } /* listener->pfd_array for loop */
1312
1313 if (found) {
1314 break;
1315 }
1316
1317 } /* listenergroup for loop */
1318
1319 if (!found) {
1320 /* close the old one, it doesn't belong to any listeners
1321 it most likely was closed on the listener */
1322 close(pfd->fd);
1323 pfd->fd = -1;
1324 }
1325
1326 } /* loop through fd's */
1327
1328 } /* !resultHead */
1329 return resultHead;
1330
1331 }
1332
1333 /* Given a socket descriptor with an existing connection, return an fbuf
1334 * fBufNext can be called on it
1335 * Interrupting the accepting of new connections on this socket is the
1336 * responsibility of the caller, it cannot be done with
1337 * fbListenerInterrupt(). However, the collectors attached to this listener
1338 * can be interrupted by this call, which short circuits fBufNext().
1339 * Call fbListenerInterrupt to stop the collectors, then stop the listener
1340 * socket on your own.
1341 */
fbListenerOwnSocketCollectorTCP(fbListener_t * listener,int sock,GError ** err)1342 fBuf_t *fbListenerOwnSocketCollectorTCP(
1343 fbListener_t *listener,
1344 int sock,
1345 GError **err)
1346 {
1347 fbCollector_t *collector = NULL;
1348 fBuf_t *fbuf = NULL;
1349 fbConnSpec_t connSpec;
1350 g_assert(listener);
1351
1352 if (sock <= 2) {
1353 /* invalid socket */
1354 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1355 "Invalid socket descriptor");
1356 return NULL;
1357 }
1358
1359 connSpec.transport = FB_TCP;
1360 listener->spec = &connSpec;
1361
1362 collector = fbCollectorAllocSocket(listener, NULL, sock, NULL, 0, err);
1363 if (!collector) {
1364 return NULL;
1365 }
1366
1367 fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
1368
1369 fBufSetAutomaticMode(fbuf, FALSE);
1370
1371 /* Add buffer to the file descriptor table */
1372 /* g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(sock), fbuf);*/
1373
1374 listener->lsock = sock;
1375
1376 /* store the collector handle */
1377 listener->collectorHandle = collector;
1378
1379 listener->spec = NULL;
1380
1381 /* All done. */
1382 return fbuf;
1383 }
1384
1385 /* not even remotely tested yet */
fbListenerOwnSocketCollectorTLS(fbListener_t * listener,int sock,GError ** err)1386 fBuf_t *fbListenerOwnSocketCollectorTLS(
1387 fbListener_t *listener,
1388 int sock,
1389 GError **err)
1390 {
1391 fbCollector_t *collector = NULL;
1392 fBuf_t *fbuf = NULL;
1393
1394 g_assert(listener);
1395
1396 if (sock <= 2) {
1397 /* invalid socket */
1398 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1399 "Invalid socket descriptor");
1400 return NULL;
1401 }
1402
1403 listener->spec->transport = FB_TLS_TCP;
1404
1405 /* collector = fbCollectorAllocTLS(listener, NULL, sock, NULL, 0, err);*/
1406
1407 fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
1408
1409 fBufSetAutomaticMode(fbuf, FALSE);
1410
1411 /* Add buffer to the file descriptor table */
1412 /*g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(sock), fbuf);*/
1413
1414 /* store the collector handle */
1415 listener->collectorHandle = collector;
1416
1417 listener->lsock = sock;
1418
1419 (void)err;
1420
1421 /* All done. */
1422 return fbuf;
1423 }
1424
/**
 * fbListenerRemoveLastBuf
 *
 * Clears the listener's lastbuf pointer, but only when it currently refers
 * to the given buffer.  The buffer itself is neither freed nor modified.
 *
 * @param fbuf     buffer to compare against listener->lastbuf
 * @param listener listener whose lastbuf may be cleared
 */
void fbListenerRemoveLastBuf(
    fBuf_t         *fbuf,
    fbListener_t   *listener)
{
    if (listener->lastbuf != fbuf) {
        /* some other buffer is recorded; leave it alone */
        return;
    }

    listener->lastbuf = NULL;
}
1433
fbListenerCallAppInit(fbListener_t * listener,fbUDPConnSpec_t * spec,GError ** err)1434 gboolean fbListenerCallAppInit(
1435 fbListener_t *listener,
1436 fbUDPConnSpec_t *spec,
1437 GError **err)
1438 {
1439 if (listener->appinit) {
1440 if (!listener->appinit(listener, &(spec->ctx), listener->lsock,
1441 &(spec->peer.so), spec->peerlen, err)) {
1442 return FALSE;
1443 }
1444 }
1445
1446 return TRUE;
1447
1448 }
1449
/**
 * fbListenerSetPeerSession
 *
 * Makes a session the listener's current session and attaches it to the
 * listener's last buffer.  When no session is supplied, a clone of the
 * listener's current session is used instead.
 *
 * The previous value of listener->session is overwritten without being
 * freed here.
 * NOTE(review): listener->lastbuf is passed on without a NULL check -
 * confirm callers only use this while a last buffer is set.
 *
 * @param listener listener to update
 * @param session  session to install, or NULL to clone listener->session
 * @return the session that was installed
 */
fbSession_t *fbListenerSetPeerSession(
    fbListener_t   *listener,
    fbSession_t    *session)
{
    fbSession_t *peer = session ? session
                                : fbSessionClone(listener->session);

    listener->session = peer;

    /* bind the session and the last buffer to each other */
    fBufSetSession(listener->lastbuf, peer);
    fbSessionSetTemplateBuffer(peer, listener->lastbuf);

    return peer;
}
1469