1 /*@internal
2  *
3  * fblistener.c
4  * IPFIX Collecting Process connection listener implementation
5  *
6  * ------------------------------------------------------------------------
7  * Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
8  * ------------------------------------------------------------------------
9  * Authors: Brian Trammell
10  * ------------------------------------------------------------------------
11  * @OPENSOURCE_LICENSE_START@
12  * libfixbuf 2.0
13  *
14  * Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
15  *
16  * NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
17  * ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
18  * BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
19  * EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
20  * LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
21  * EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
22  * MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
23  * ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
24  * COPYRIGHT INFRINGEMENT.
25  *
26  * Released under a GNU-Lesser GPL 3.0-style license, please see
27  * LICENSE.txt or contact permission@sei.cmu.edu for full terms.
28  *
29  * [DISTRIBUTION STATEMENT A] This material has been approved for
30  * public release and unlimited distribution.  Please see Copyright
31  * notice for non-US Government use and distribution.
32  *
33  * Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
34  * and Trademark Office by Carnegie Mellon University.
35  *
36  * DM18-0325
37  * @OPENSOURCE_LICENSE_END@
38  * ------------------------------------------------------------------------
39  */
40 
41 #define _FIXBUF_SOURCE_
42 #include <fixbuf/private.h>
43 #include <poll.h>
44 
45 
46 
47 /**
48  *
49  * Understanding socket handling and collector construction within the
50  * fixbuf listener:
51  *
52  * Error handling on connections in fixbuf is very different between
53  * TCP and UDP connections.  The reasons behind this are partial
54  * tied up within the IPFIX standard and its handling of UDP.
55  * But within fixbuf, this needs to be understood in terms of how
56  * collector's are created with respect to listener's.
57  *
58  * For both TCP and UDP, when a listener is created, a listening
59  * socket is created with the listener.  (lsock)
60  *
61  * For UDP, the listener sock (lsock) is the socket used to create
62  * the collector.  The collector gets created immediately, and the
63  * collector is the structure that is associated with the fBuf
64  * structure which actually handles PDU's.
65  *
66  * For TCP, the case is different.  The listening socket is used
67  * primarily for the listenerWait call.  It is used as a socket
68  * passed to select waiting for connection establishment.  Then
69  * an accept call is made which creates a new socket handle.
70  * That socket handle is used is to create the collector, and the
71  * lsock handle is left only within the listener.
72  *
73  * When an error occurs, the normal usage of the API would be
74  * to call fBufFree and call listenerWait again.  In the case
75  * of TCP this works.  The library will wait for a new connection
76  * to the listener lsock and create a new collector from a new
77  * socket from the accept call.  For UDP, this will not work, and
78  * the library will simply hang.  (Each lsock also has a
79  * corresponding set of pipes to detect interrupts) and the select
80  * call will simply wait on the read pipe handle.
81  *
82  *
83  */
84 
85 #define MAX_BUFFER_FREE 100
86 
87 /* Maximum number of connections allowed by fixbuf */
88 #define MAX_CONNECTIONS 25
89 
90 struct fbListener_st {
91     /** Connection specifier for passive socket. */
92     fbConnSpec_t                *spec;
93     /** Base session. Used for internal templates. */
94     fbSession_t                 *session;
95     /** UDP Base Session.  Only set for UDP listeners.
96      * Since UDP sessions are created at connection time,
97      * this holds the first one so we can free it. */
98     fbSession_t                 *udp_session;
99     /** Last buffer returned by fbListenerWait(). */
100     fBuf_t                      *lastbuf;
101     /** The set of file descriptors to be monitored */
102     struct pollfd               *pfd_array;
103     /** size of array */
104     nfds_t                      pfd_len;
105     /** Holds last file descriptor used */
106     int                         lsock;
107     /** mode (-1 for udp) */
108     int                         mode;
109     /**
110      * Interrupt pipe read end file descriptor.
111      * Used to unblock a call to fbListenerWait().
112      */
113     int                         rip;
114     /**
115      * Interrupt pipe write end file descriptor.
116      * Used to unblock a call to fbListenerWait().
117      */
118     int                         wip;
119     /**
120      * used to hold the handle to the collector for
121      * this listener
122      */
123     fbCollector_t               *collectorHandle;
124     /**
125      * File descriptor table.
126      * Maps file descriptors to active listener-managed buffer instances.
127      */
128     GHashTable                  *fdtab;
129     /**
130      * Application initialization function. Allows the application
131      * to bind internal context to a collector, and to reject connections
132      * after accept() but before session setup.
133      */
134     fbListenerAppInit_fn        appinit;
135     /** Application free function. Frees storage allocated by appinit. */
136     fbListenerAppFree_fn        appfree;
137 };
138 
139 typedef struct fbListenerWaitFDSet_st {
140     fd_set                      fds;
141     int                         maxfd;
142     fBuf_t                      *fbuf;
143 } fbListenerWaitFDSet_t;
144 
145 /**
146  * Structure that holds the listeners that are added to the group.
147  */
148 struct fbListenerGroup_st
149 {
150     /** pointer to the head of the listener group result list */
151     fbListenerEntry_t   *head;
152     /** pointer to the last fbListener */
153     fbListenerEntry_t   *lastlist;
154     /** pointer to a generic structure for future use */
155     struct pollfd       *group_pfd;
156     /** length of usable fds */
157     nfds_t               pfd_len;
158 };
159 
160 
161 /**
162  * fbListenerTeardownSocket
163  *
164  *
165  *
166  *
167  */
fbListenerTeardownSocket(fbListener_t * listener)168 static void fbListenerTeardownSocket(
169     fbListener_t                *listener)
170 {
171     unsigned int i;
172 
173     if (listener->pfd_len) {
174         for (i = 0; i < listener->pfd_len; i++) {
175             if (listener->pfd_array[i].fd >= 0) {
176                 close(listener->pfd_array[i].fd);
177                 listener->pfd_array[i].fd = -1;
178             }
179         }
180         g_slice_free1((MAX_CONNECTIONS * sizeof(struct pollfd)),
181                       listener->pfd_array);
182         listener->pfd_len = 0;
183     }
184 
185 }
186 
187 /**
188  *fbListenerInitSocket
189  *
190  *
191  *
192  *
193  */
fbListenerInitSocket(fbListener_t * listener,GError ** err)194 static gboolean fbListenerInitSocket(
195     fbListener_t                *listener,
196     GError                      **err)
197 {
198     int                         pfd[2];
199     int                         i = 0;
200     int                         count = 0;
201     struct pollfd               *cpfd = NULL;
202     struct addrinfo             *ai = NULL;
203     struct addrinfo             *current = NULL;
204 
205     /* Create interrupt pipe */
206     if (pipe(pfd)) {
207         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
208                     "fbListener error creating interrupt pipe: %s",
209                     strerror(errno));
210         return FALSE;
211     }
212 
213 
214     /* Look up the passive socket address */
215     if (!fbConnSpecLookupAI(listener->spec, TRUE, err)) {
216         fbListenerTeardownSocket(listener);
217         return FALSE;
218     }
219 
220     ai = (struct addrinfo *)listener->spec->vai;
221 
222     current = ai;
223 
224     /* figure out how many addresses there are */
225     while (current) {
226         i++;
227         current = current->ai_next;
228     }
229 
230     listener->pfd_array = (struct pollfd*)g_slice_alloc0(MAX_CONNECTIONS *
231                                                         sizeof(struct pollfd));
232 
233     if (listener->pfd_array == NULL) {
234         return FALSE;
235     }
236 
237     listener->pfd_len = i+2;
238 
239     /* read interrupt pipe */
240     listener->pfd_array[0].fd = pfd[0];
241     listener->pfd_array[0].events = POLLIN;
242     /* write interrupt pipe */
243     listener->pfd_array[1].fd = pfd[1];
244 
245     i = 2;
246     /* Create the passive socket */
247     do {
248 
249         cpfd = &listener->pfd_array[i];
250         /*
251          * Kludge for SCTP. addrinfo doesn't accept SCTP hints.
252          */
253 #if FB_ENABLE_SCTP
254         if ((listener->spec->transport == FB_SCTP) ||
255             (listener->spec->transport == FB_DTLS_SCTP)) {
256             ai->ai_socktype = SOCK_STREAM;
257             ai->ai_protocol = IPPROTO_SCTP;
258         }
259 #endif
260         /* Create socket and bind it to the passive address */
261         cpfd->fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
262         if (cpfd->fd < 0) {
263             i++; continue;
264         }
265         if (bind(cpfd->fd, ai->ai_addr, ai->ai_addrlen) == -1) {
266             close(cpfd->fd); cpfd->fd = -1; i++; continue;
267         }
268         cpfd->events = POLLIN;
269 
270         /* Listen only on socket and sequenced packet sockets */
271         if ((ai->ai_socktype == SOCK_STREAM)
272 #ifdef SOCK_SEQPACKET
273             || (ai->ai_socktype == SOCK_SEQPACKET)
274 #endif
275             ) {
276             if (listen(cpfd->fd, 1) < 0) {
277                 close(cpfd->fd); cpfd->fd = -1; i++; continue;
278             }
279         }
280         i++;
281         /* Socket successfully bound for listening */
282         count++;
283     } while ((ai = ai->ai_next));
284 
285     /* check for no listenable socket */
286     if (!count) {
287         fbListenerTeardownSocket(listener);
288         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
289                     "couldn't create socket listening to %s:%s: %s",
290                     listener->spec->host ? listener->spec->host : "*",
291                     listener->spec->svc, strerror(errno));
292         return FALSE;
293     }
294 
295 
296     /* All done. */
297     return TRUE;
298 }
299 
300 /**
301  *fbListenerInitUDPSocket
302  *
303  *
304  *
305  *
306  */
fbListenerInitUDPSocket(fbListener_t * listener,GError ** err)307 static gboolean fbListenerInitUDPSocket(
308     fbListener_t                *listener,
309     GError                      **err)
310 {
311     void                        *ctx = NULL;
312     fbCollector_t               *collector = NULL;
313     fBuf_t                      *fbuf = NULL;
314     unsigned int                i;
315 
316     /* Simulate accept on UDP socket */
317 
318     /* Ask application for context */
319     if (listener->appinit) {
320         if (!listener->appinit(listener, &ctx, listener->lsock, NULL, 0, err)){
321             return FALSE;
322         }
323     }
324 
325     /* Create collector on UDP socket */
326     switch (listener->spec->transport) {
327     case FB_UDP:
328         collector = fbCollectorAllocSocket(listener, ctx,
329                                            listener->lsock, NULL, 0, err);
330         break;
331 #if HAVE_OPENSSL_DTLS
332     case FB_DTLS_UDP:
333         collector = fbCollectorAllocTLS(listener, ctx,
334                                         listener->lsock, NULL, 0, err);
335         break;
336 #endif
337     default:
338         g_assert_not_reached();
339     }
340 
341     /* Check for collector alloc error */
342     if (!collector) return FALSE;
343 
344     /* Create a buffer with a cloned session around the collector */
345     fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
346 
347     /* add this fbuf for all file descriptors */
348     for (i = 2; i < listener->pfd_len; i++) {
349         g_hash_table_insert(listener->fdtab,
350                             GINT_TO_POINTER(listener->pfd_array[i].fd), fbuf);
351     }
352 
353 
354     /* Add collector to the file descriptor table */
355     /*g_hash_table_insert(listener->fdtab,
356       GINT_TO_POINTER(listener->lsock), fbuf);*/
357 
358     /* No more passive socket */
359     /*listener->lsock = -1;*/
360     /* set mode to UDP */
361     listener->mode = -1;
362 
363     /* store this session so we can free it later */
364     listener->udp_session = fBufGetSession(fbuf);
365 
366     /* store the handle to the collector */
367     listener->collectorHandle = collector;
368 
369     /* All done. */
370     return TRUE;
371 }
372 
373 /**
374  *fbListenerAlloc
375  *
376  *
377  *
378  *
379  */
fbListenerAlloc(fbConnSpec_t * spec,fbSession_t * session,fbListenerAppInit_fn appinit,fbListenerAppFree_fn appfree,GError ** err)380 fbListener_t *fbListenerAlloc(
381     fbConnSpec_t                *spec,
382     fbSession_t                 *session,
383     fbListenerAppInit_fn        appinit,
384     fbListenerAppFree_fn        appfree,
385     GError                      **err)
386 {
387     fbListener_t                *listener = NULL;
388     gboolean                    ownSocket;
389 
390     g_assert(session);
391     if (spec) {
392         ownSocket = FALSE;
393     } else {
394         ownSocket = TRUE;
395     }
396 
397     /* Allocate a new listener */
398     listener = g_slice_new0(fbListener_t);
399 
400     /* -1 for file descriptors means no fd */
401     listener->lsock = -1;
402     listener->rip = -1;
403     listener->wip = -1;
404 
405     if (ownSocket) { /* user handling own socket creation and connections */
406         listener->spec = NULL;
407     } else {
408         listener->spec = fbConnSpecCopy(spec);
409     }
410     /* Fill in what we can */
411     listener->session = session;
412     listener->appinit = appinit;
413     listener->appfree = appfree;
414 
415     /* allocate file descriptor table */
416     listener->fdtab = g_hash_table_new(g_direct_hash, g_direct_equal);
417 
418     if (!ownSocket) {
419         /* Do transport-specific initialization */
420         switch (spec->transport) {
421 #if FB_ENABLE_SCTP
422           case FB_SCTP:
423 #if HAVE_OPENSSL_DTLS_SCTP
424           case FB_DTLS_SCTP:
425 #endif
426 #endif
427           case FB_TCP:
428 #if HAVE_OPENSSL
429           case FB_TLS_TCP:
430 #endif
431             if (!fbListenerInitSocket(listener, err)) {
432                 goto err;
433             }
434             break;
435           case FB_UDP:
436 #if HAVE_OPENSSL_DTLS
437           case FB_DTLS_UDP:
438 #endif
439             /* FIXME this may leak on socket setup error for UDP. */
440             if (fbListenerInitSocket(listener, err)) {
441                 if (!fbListenerInitUDPSocket(listener, err)) {
442                     fbListenerTeardownSocket(listener);
443                     goto err;
444                 }
445             } else {
446                 goto err;
447             }
448             break;
449           default:
450 #ifndef FB_ENABLE_SCTP
451             if (spec->transport == FB_SCTP || spec->transport == FB_DTLS_SCTP){
452                 g_error("Libfixbuf not enabled for SCTP Transport. "
453                         " Run configure with --with-sctp");
454             }
455 #endif
456             if (spec->transport == FB_TLS_TCP ||
457                 spec->transport == FB_DTLS_SCTP ||
458                 spec->transport == FB_DTLS_UDP)
459             {
460                 g_error("Libfixbuf not enabled for this mode of transport. "
461                         " Run configure with --with-openssl");
462             }
463         }
464     }
465 
466     /* Return the initialized listener */
467     return listener;
468 
469 err:
470     if (listener) {
471         if (listener->fdtab) {
472             g_hash_table_destroy(listener->fdtab);
473         }
474 
475         g_slice_free(fbListener_t, listener);
476     }
477 
478     /* No listener */
479     return NULL;
480 }
481 
482 
483 /**
484  * fbListenerFreeBuffer
485  *
486  *
487  *
488  *
489  */
fbListenerFreeBuffer(void * vfd,fBuf_t * fbuf,fBuf_t ** lfbuf)490 static void   fbListenerFreeBuffer(
491     void                        *vfd __attribute__((unused)),
492     fBuf_t                      *fbuf,
493     /*    void                        *vignore __attribute__((unused)) )*/
494     fBuf_t                      **lfbuf)
495 {
496     /* free the buffer; this will close the socket. */
497     /*    fBufFree(fbuf);*/
498     /* we can't change the hash table while we are looping through it */
499     *lfbuf = fbuf;
500     lfbuf++;
501 }
502 
503 /**
504  * fbListenerAppFree
505  *
506  *
507  */
508 
fbListenerAppFree(fbListener_t * listener,void * ctx)509 void fbListenerAppFree(
510     fbListener_t               *listener,
511     void                       *ctx)
512 {
513     if (listener) {
514         if (listener->appfree) {
515             (listener->appfree)(ctx);
516         }
517     }
518 }
519 
520 
521 /**
522  *fbListenerFree
523  *
524  *
525  *
526  *
527  */
fbListenerFree(fbListener_t * listener)528 void            fbListenerFree(
529     fbListener_t                *listener)
530 {
531     fBuf_t                     *tfbuf[MAX_BUFFER_FREE+1];
532     fBuf_t                     *lfbuf = NULL;
533     fbSession_t                *session = NULL;
534     unsigned int               loop = 0;
535 
536     if (NULL == listener) {
537         return;
538     }
539 
540     while (loop < MAX_BUFFER_FREE) {
541         tfbuf[loop] = NULL;
542         loop++;
543     }
544 
545     /* shut down passive socket */
546     fbListenerTeardownSocket(listener);
547 
548     /* free any open buffers we may have */
549     g_hash_table_foreach(listener->fdtab,
550                         (GHFunc)fbListenerFreeBuffer, tfbuf);
551 
552     loop = 0;
553     lfbuf = tfbuf[0];
554     /* free first session */
555     if (listener->udp_session) {
556         /* we need to get the session set on the fBuf - it should be the
557            same as udp_session in the case that we haven't received anything*/
558         session = fBufGetSession(lfbuf);
559         if (listener->udp_session != session) {
560             fbSessionFree(listener->udp_session);
561         }
562     }
563 
564     if (listener->mode == -1) {
565         /* for UDP there can be multiple FDs with the same fBuf */
566         /* And there should only be 1 fBuf for this listener */
567         fBufFree(lfbuf);
568     } else {
569         while (lfbuf && loop < MAX_BUFFER_FREE) {
570             fBufFree(lfbuf);
571             loop++;
572             lfbuf = tfbuf[loop];
573         }
574     }
575     /* free the listener table */
576     g_hash_table_destroy(listener->fdtab);
577 
578     /* free the connection specifier */
579     fbConnSpecFree(listener->spec);
580 
581     /* free the listener itself */
582     g_slice_free(fbListener_t, listener);
583 }
584 
585 /**
586  *fbListenerWaitAddFD
587  *
588  *
589  *
590  *
591  */
592 /*static void   fbListenerWaitAddFD(
593     void                        *vfd,
594     void                        *vignore __attribute__((unused)),
595     fbListenerWaitFDSet_t       *lfdset)
596 {
597     int                         fd = GPOINTER_TO_INT(vfd);
598 
599     FD_SET(fd,&(lfdset->fds));
600     if (fd > lfdset->maxfd) lfdset->maxfd = fd;
601 }
602 */
603 
604 /**
605  * fbListenerWaitSearch
606  *
607  *
608  *
609  *
610  */
611  /*static void   fbListenerWaitSearch(
612     void                        *vfd,
613     void                        *fbuf,
614     fbListenerWaitFDSet_t       *lfdset)
615 {
616     int                         fd = GPOINTER_TO_INT(vfd);
617 
618     if (FD_ISSET(fd,&(lfdset->fds))) {
619         lfdset->fbuf = fbuf;
620     }
621     }*/
622 
623 /**
624  * fbListenerAddPollFD
625  *
626  *
627  */
fbListenerAddPollFD(struct pollfd * fd_array,nfds_t * array_len,int fd)628 static void fbListenerAddPollFD(
629     struct pollfd              *fd_array,
630     nfds_t                     *array_len,
631     int                        fd)
632 {
633     nfds_t i;
634     nfds_t num_fds = *array_len;
635     gboolean added = FALSE;
636 
637     /* use an old entry for this new entry */
638     for (i = 0; i < num_fds; i++) {
639         if (fd_array[i].fd < 0) {
640             fd_array[i].fd = fd;
641             fd_array[i].events = POLLIN;
642             added = TRUE;
643             break;
644         }
645     }
646 
647     /* no free entries in the array, add a new one */
648     if (!added) {
649         fd_array[num_fds].fd = fd;
650         fd_array[num_fds].events = POLLIN;
651         num_fds++;
652         *array_len = num_fds;
653     }
654 }
655 
656 /**
657  * fbListenerWaitAccept
658  *
659  *
660  *
661  *
662  */
fbListenerWaitAccept(fbListener_t * listener,GError ** err)663 static fBuf_t *fbListenerWaitAccept(
664     fbListener_t                *listener,
665     GError                      **err)
666 {
667     int                         asock;
668     union {
669         struct sockaddr         so;
670         struct sockaddr_in      ip4;
671         struct sockaddr_in6     ip6;
672     }                           peer;
673     socklen_t                   peerlen;
674     void                        *ctx = NULL;
675     fbCollector_t               *collector = NULL;
676     fBuf_t                      *fbuf = NULL;
677 
678     /* Accept the connection */
679     peerlen = sizeof(peer);
680     asock = accept(listener->lsock, &(peer.so), &peerlen);
681     if (asock < 0) {
682         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
683                     "listener accept error: %s",
684                     strerror(errno));
685         return NULL;
686     }
687 
688     /* Okay, we have a socket. Ask the application for context. */
689     if (listener->appinit) {
690         if (!listener->appinit(listener, &ctx, asock,
691                                &(peer.so), peerlen, err)) {
692             close(asock);
693             return NULL;
694         }
695     }
696 
697     /* Create a collector as appropriate */
698     switch (listener->spec->transport) {
699 #if FB_ENABLE_SCTP
700     case FB_SCTP:
701 #endif
702     case FB_TCP:
703         collector = fbCollectorAllocSocket(listener, ctx, asock,
704                                            &(peer.so), peerlen, err);
705         break;
706 #if HAVE_OPENSSL
707 #if HAVE_OPENSSL_DTLS_SCTP
708     case FB_DTLS_SCTP:
709 #endif
710     case FB_TLS_TCP:
711         collector = fbCollectorAllocTLS(listener, ctx, asock,
712                                         &(peer.so), peerlen, err);
713         break;
714 #endif
715     default:
716         g_assert_not_reached();
717     }
718 
719     /* Check for collector creation error */
720     if (!collector) return NULL;
721 
722     /* Create a buffer with a cloned session around the collector */
723     fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
724 
725     /* Make the buffer automatic */
726     fBufSetAutomaticMode(fbuf, TRUE);
727 
728     /* Add buffer to the file descriptor table */
729     g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(asock), fbuf);
730 
731     /* don't add to array if fbListenerWaitNoCollectors was called */
732     if (listener->mode < 1) {
733         /* add to poll array */
734         if (listener->pfd_len < MAX_CONNECTIONS) {
735             fbListenerAddPollFD(listener->pfd_array, &listener->pfd_len,asock);
736         } else {
737             g_warning("Max connections %d reached.", MAX_CONNECTIONS);
738         }
739     }
740 
741     listener->lsock = asock;
742 
743     /* store the collector handle */
744     listener->collectorHandle = collector;
745 
746     /* All done. */
747     return fbuf;
748 }
749 
750 /**
751  * fbListenerRemove
752  *
753  *
754  *
755  *
756  */
fbListenerRemove(fbListener_t * listener,int fd)757 void fbListenerRemove(
758     fbListener_t        *listener,
759     int                 fd)
760 {
761     unsigned int i;
762 
763     /* remove from hash table */
764     g_hash_table_remove(listener->fdtab, GINT_TO_POINTER(fd));
765 
766     /* remove from poll array */
767     for (i = 0; i < listener->pfd_len; i++) {
768         if (listener->pfd_array[i].fd == fd) {
769             if (listener->lsock == fd) {
770                 /* unset lsock */
771                 listener->lsock = 0;
772             }
773             close(listener->pfd_array[i].fd);
774             listener->pfd_array[i].fd = -1;
775             break;
776         }
777     }
778 }
779 
780 /**
781  * fbListenerWait
782  *
783  *
784  *
785  *
786  */
fbListenerWait(fbListener_t * listener,GError ** err)787 fBuf_t *fbListenerWait(
788     fbListener_t                *listener,
789     GError                      **err)
790 {
791     fBuf_t                      *fbuf = NULL;
792     uint8_t                     byte;
793     int                         got_sock = -1;
794     int                         rc;
795     unsigned int                i;
796 
797     /* wait for data available on one of our file descriptors */
798     rc = poll(listener->pfd_array, listener->pfd_len, -1);
799 
800     if (rc < 0) {
801         if (errno == EINTR) {
802             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
803                         "Interrupted listener wait");
804                 return NULL;
805         } else {
806             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
807                         "listener wait error: %s",
808                         strerror(errno));
809             return NULL;
810         }
811     }
812 
813     /* Loop through file descriptors */
814     for (i = 0; i < listener->pfd_len; i++) {
815         struct pollfd *pfd = &listener->pfd_array[i];
816 
817         if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
818             /* hang up or error */
819             got_sock = pfd->fd;
820             break;
821         }
822 
823         if (!(pfd->revents & POLLIN)) {
824             continue;
825         }
826 
827         if (i == 0) {
828             /* read or write interrupt */
829             /* consume and ignore return */
830             read(pfd->fd, &byte, sizeof(byte));
831             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
832                         "External interrupt on pipe");
833             return NULL;
834         }
835 
836         got_sock = pfd->fd;
837         break;
838     }
839 
840     /* quick solution - check if the fd is the same as last time */
841     if ((listener->lsock == got_sock) && listener->lastbuf) {
842         return listener->lastbuf;
843     }
844 
845     listener->lsock = got_sock;
846 
847     /* different than last time -> check to see if it's been seen before */
848     if ((fbuf =g_hash_table_lookup(listener->fdtab,GINT_TO_POINTER(got_sock))))
849     {
850         listener->lastbuf = fbuf;
851         if (listener->mode < 0) {
852             /* if UDP set FD on collector for reading */
853             fbCollectorSetFD(fBufGetCollector(fbuf), got_sock);
854         }
855         return fbuf;
856     } else {
857         if (listener->mode >= 0) {
858             /* new connection */
859             fbuf = fbListenerWaitAccept(listener, err);
860             if (!fbuf) return NULL;
861             listener->lastbuf = fbuf;
862             return fbuf;
863         } else {
864             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
865                         "listener wait error: invalid FD");
866             /* this is strange bc UDP fbufs are set in fblisteneralloc */
867             return NULL;
868         }
869     }
870 }
871 
fbListenerWaitNoCollectors(fbListener_t * listener,GError ** err)872 fBuf_t *fbListenerWaitNoCollectors(
873     fbListener_t                *listener,
874     GError                      **err)
875 {
876     fBuf_t                      *fbuf = NULL;
877     uint8_t                     byte;
878     int                         rc;
879     unsigned int                i;
880 
881 
882     /* set the mode to 1 so fbListenerWaitAccept doesn't add fd */
883     listener->mode = 1;
884     /* wait for data available on one of our file descriptors */
885     rc = poll(listener->pfd_array, listener->pfd_len, -1);
886 
887     if (rc < 0) {
888         if (errno == EINTR) {
889             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
890                         "Interrupted listener wait");
891             return NULL;
892         } else {
893             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
894                         "listener wait error: %s",
895                         strerror(errno));
896             return NULL;
897         }
898     }
899 
900     /* Loop file descriptors */
901     for (i = 0; i < listener->pfd_len; i++) {
902         struct pollfd *pfd = &listener->pfd_array[i];
903 
904         if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
905             listener->lsock = pfd->fd;
906             break;
907         }
908 
909         if (!(pfd->revents & POLLIN)) {
910             continue;
911         }
912 
913         if (i == 0) {
914             /* read or write interrupt */
915             /* consume and ignore return */
916             read(pfd->fd, &byte, sizeof(byte));
917             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
918                         "External interrupt on pipe");
919             return NULL;
920         }
921 
922         listener->lsock = pfd->fd;
923         break;
924     }
925 
926     /* handle any pending accept, return the accepted buffer immediately. */
927     if (listener->mode >= 0) {
928         return fbListenerWaitAccept(listener, err);
929     } else {
930         /* For UDP this doesn't really work, just return the data */
931         if ((fbuf = g_hash_table_lookup(listener->fdtab,
932                                         GINT_TO_POINTER(listener->lsock))))
933         {
934             fbCollectorSetFD(fBufGetCollector(fbuf), listener->lsock);
935             return fbuf;
936         }
937     }
938 
939     /* this should never happen */
940     return NULL;
941 }
942 
943 
fbListenerInterruptCollectors(void * vfd,void * fbuf,fbListenerWaitFDSet_t * lfdset)944 static void   fbListenerInterruptCollectors(
945     void                        *vfd __attribute__((unused)),
946     void                        *fbuf,
947     fbListenerWaitFDSet_t       *lfdset __attribute__((unused)))
948 {
949     fBufInterruptSocket(fbuf);
950 }
951 
952 
953 /**
954  * fbListenerInterrupt
955  *
956  *
957  *
958  *
959  */
fbListenerInterrupt(fbListener_t * listener)960 void fbListenerInterrupt(
961     fbListener_t        *listener)
962 {
963     uint8_t             byte = 0xe7;
964 
965     /* send interrrupts to the collectors, then to the listener */
966     g_hash_table_foreach(listener->fdtab,
967                          (GHFunc)fbListenerInterruptCollectors,
968                          NULL);
969 
970     /* write and ignore return */
971     /*write(listener->wip, &byte, sizeof(byte));
972       write(listener->rip, &byte, sizeof(byte));*/
973     write(listener->pfd_array[0].fd, &byte, sizeof(byte));
974     write(listener->pfd_array[1].fd, &byte, sizeof(byte));
975 
976 
977 }
978 
979 /**
980  * fbListenerGetConnSpec
981  *
982  *
983  *
984  *
985  */
fbListenerGetConnSpec(fbListener_t * listener)986 fbConnSpec_t        *fbListenerGetConnSpec(
987     fbListener_t        *listener)
988 {
989     return listener->spec;
990 }
991 
992 
993 /**
994  *fbListenerGetCollector
995  *
996  * gets the collector allocated to the listener
997  *
998  */
fbListenerGetCollector(fbListener_t * listener,fbCollector_t ** collector,GError ** err)999 gboolean            fbListenerGetCollector(
1000     fbListener_t        *listener,
1001     fbCollector_t       **collector,
1002     GError              **err)
1003 {
1004     g_assert(collector);
1005     if (NULL == listener->collectorHandle) {
1006         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1007                     "no collector available to be retrieved");
1008         return FALSE;
1009     }
1010 
1011     *collector = listener->collectorHandle;
1012 
1013     return TRUE;
1014 }
1015 
1016 /* returns NULL or pointer to allocated group structure */
1017 
fbListenerGroupAlloc(void)1018 fbListenerGroup_t* fbListenerGroupAlloc(
1019     void)
1020 {
1021     fbListenerGroup_t   *group = NULL;
1022 
1023     group = g_slice_new0( fbListenerGroup_t );
1024     group->group_pfd = ((struct pollfd *)g_slice_alloc0(
1025                             MAX_CONNECTIONS* 5 *sizeof(struct pollfd)));
1026 
1027     group->head = NULL;
1028 
1029     return group;
1030 }
1031 
1032 
1033 /**
1034  * fbListenerGroupFree
1035  *
1036  * frees a listener group
1037  *
1038  */
fbListenerGroupFree(fbListenerGroup_t * group)1039 void fbListenerGroupFree(
1040     fbListenerGroup_t *group)
1041 {
1042     if (group && group->group_pfd) {
1043         g_slice_free1((MAX_CONNECTIONS * 5 * sizeof(struct pollfd)),
1044                       group->group_pfd);
1045     }
1046 
1047     g_slice_free(fbListenerGroup_t, group);
1048 }
1049 
1050 /**
1051  * fbListenerGroupAddListener
1052  *
1053  * @return 0 upon success.  "1" if entry couldn't get created
1054  *         maybe "2" if either of the incoming pointers is NULL
1055  */
fbListenerGroupAddListener(fbListenerGroup_t * group,const fbListener_t * listener)1056 int fbListenerGroupAddListener(
1057     fbListenerGroup_t  *group,
1058     const fbListener_t *listener)
1059 {
1060     fbListenerEntry_t   *entry = NULL;
1061     unsigned int        i;
1062 
1063     if (!group || !listener) {
1064         return 2;
1065     }
1066 
1067     entry = g_slice_new0( fbListenerEntry_t );
1068 
1069     if (!entry) {
1070         /* needs to be something like ERR_NO_MEM */
1071         return 1;
1072     }
1073 
1074     entry->prev = NULL;
1075     entry->next = group->head;
1076     entry->listener = (fbListener_t*)listener;
1077 
1078     if (group->head) {
1079         group->head->prev = entry;
1080     }
1081 
1082     group->head = entry;
1083 
1084     /* add FDs */
1085     for (i = 0; i < entry->listener->pfd_len; i++) {
1086         group->group_pfd[group->pfd_len].fd = entry->listener->pfd_array[i].fd;
1087         group->group_pfd[group->pfd_len].events = POLLIN;
1088         group->pfd_len++;
1089     }
1090 
1091     group->lastlist = entry;
1092 
1093     return 0;
1094 }
1095 
1096 /**
1097  * fbListenerGroupDeleteListener
1098  *
1099  * @return 0 on success.  "1" if not found. "2" if a pointer is NULL
1100 */
fbListenerGroupDeleteListener(fbListenerGroup_t * group,const fbListener_t * listener)1101 int fbListenerGroupDeleteListener(
1102     fbListenerGroup_t   *group,
1103     const fbListener_t  *listener)
1104 {
1105     fbListenerEntry_t   *entry = NULL;
1106     unsigned int i, k;
1107 
1108     if (!group || !listener) {
1109         return 2;
1110     }
1111 
1112     for (entry = group->head; entry; entry = entry->next) {
1113         if (entry->listener == listener) {
1114             if (entry->prev) {
1115                 entry->prev->next = entry->next;
1116             }
1117 
1118             if (entry->next) {
1119                 entry->next->prev = entry->prev;
1120             }
1121 
1122             /* remove FDs (close will happen later) */
1123             for (i = 0; i < entry->listener->pfd_len; i++) {
1124                 for (k = 0; k < group->pfd_len; k++) {
1125                     if (entry->listener->pfd_array[i].fd ==
1126                         group->group_pfd[k].fd)
1127                     {
1128                         group->group_pfd[k].fd = -1;
1129                         break;
1130                     }
1131                 }
1132             }
1133 
1134             if (entry == group->lastlist) {
1135                 group->lastlist = group->head;
1136             }
1137 
1138             g_slice_free(fbListenerEntry_t, entry);
1139 
1140             return 0;
1141         }
1142     }
1143 
1144     return 1;
1145 }
1146 
fbListenerNewResult(fbListenerGroupResult_t ** resultList,fbListener_t * listener)1147 static fbListenerGroupResult_t *fbListenerNewResult(
1148     fbListenerGroupResult_t **resultList,
1149     fbListener_t            *listener)
1150 {
1151     fbListenerGroupResult_t *result = NULL;
1152 
1153     /* allocate new one */
1154     result = g_slice_new0( fbListenerGroupResult_t );
1155     /* set buffer to last one */
1156     result->fbuf = listener->lastbuf;
1157     /* set listener */
1158     result->listener = listener;
1159     /* put it on the list */
1160     result->next = *resultList;
1161 
1162     *resultList = result;
1163 
1164     return result;
1165 }
1166 
1167 
fbListenerFreeGroupResult(fbListenerGroupResult_t * result)1168 void fbListenerFreeGroupResult(
1169     fbListenerGroupResult_t *result)
1170 {
1171     fbListenerGroupResult_t *cr, *nr;
1172 
1173     for (cr = result; cr; cr = nr) {
1174         nr = cr->next;
1175         g_slice_free(fbListenerGroupResult_t, cr);
1176     }
1177 }
1178 
1179 
1180 
1181 
fbListenerGroupWait(fbListenerGroup_t * group,GError ** err)1182 fbListenerGroupResult_t* fbListenerGroupWait(
1183     fbListenerGroup_t   *group,
1184     GError             **err)
1185 {
1186     gboolean                    found;
1187     uint8_t                     byte;
1188     unsigned int                i, k;
1189     int                         rc;
1190     int                         new_fd = -1;
1191     fBuf_t                      *fbuf = NULL;
1192     fbListenerEntry_t           *entry       = NULL;
1193     fbListenerGroupResult_t     *resultHead  = NULL;
1194     fbListenerGroupResult_t     *result      = NULL;
1195 
1196     g_assert(group);
1197 
1198     /* wait for data available on one of our file descriptors */
1199 
1200     while (!resultHead) {
1201         rc = poll(group->group_pfd, group->pfd_len, -1);
1202 
1203         if (rc < 0) {
1204             if (errno == EINTR) {
1205                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
1206                             "Interrupted listener wait");
1207                 return NULL;
1208             } else {
1209                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1210                             "listener wait error: %s",
1211                             strerror(errno));
1212                 return NULL;
1213             }
1214         }
1215 
1216         /* Loop file descriptors */
1217         for (i = 0; i < group->pfd_len; i++) {
1218 
1219             struct pollfd *pfd = &group->group_pfd[i];
1220 
1221             found = FALSE;
1222 
1223             if (pfd->revents & (POLLERR | POLLHUP | POLLNVAL)) {
1224                 /* hang up or error */
1225                 new_fd = pfd->fd;
1226             } else if (!(pfd->revents & POLLIN)) {
1227                 continue;
1228             }
1229 
1230             new_fd = pfd->fd;
1231 
1232             /* check to see if this belongs to the last listener */
1233             if (new_fd == group->lastlist->listener->lsock) {
1234                 result = fbListenerNewResult(&resultHead,
1235                                              group->lastlist->listener);
1236                 continue;
1237             }
1238 
1239             /* find out which listener this belongs to */
1240             for (entry = group->head; entry; entry = entry->next) {
1241                 /* handle interrupt pipe read end */
1242 
1243                 for (k = 0; k < entry->listener->pfd_len; k++) {
1244                     struct pollfd *cpfd = &entry->listener->pfd_array[k];
1245 
1246                     if (new_fd == cpfd->fd) {
1247 
1248                         if (k == 0) {
1249                             /* read or write interrupt */
1250                             /* consume and ignore return */
1251                             read(cpfd->fd, &byte, sizeof(byte));
1252                             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
1253                                         "External interrupt on pipe");
1254                             return NULL;
1255                         }
1256 
1257                         if ((entry->listener->lsock == new_fd) &&
1258                             entry->listener->lastbuf)
1259                         {
1260                             result = fbListenerNewResult(&resultHead,
1261                                                          entry->listener);
1262                             found = TRUE;
1263                             group->lastlist = entry;
1264                             break;
1265                         }
1266 
1267                         entry->listener->lsock = new_fd;
1268                         /* Look it up */
1269                         if ((fbuf = g_hash_table_lookup(entry->listener->fdtab,
1270                                                 GINT_TO_POINTER(new_fd))))
1271                         {
1272                             if (entry->listener->mode < 0) {
1273                                 fbCollectorSetFD(fBufGetCollector(fbuf),
1274                                                  new_fd);
1275                             }
1276                             entry->listener->lastbuf = fbuf;
1277                             result = fbListenerNewResult(&resultHead,
1278                                                          entry->listener);
1279                             group->lastlist = entry;
1280                             found = TRUE;
1281                             break;
1282                         } else {
1283                             if (entry->listener->mode >= 0) {
1284                                 /* TCP - call accept */
1285                                 fbuf = fbListenerWaitAccept(entry->listener,
1286                                                             err);
1287                                 entry->listener->lastbuf = fbuf;
1288                                 result = fbListenerNewResult(&resultHead,
1289                                                              entry->listener);
1290                                 if (group->pfd_len < (MAX_CONNECTIONS * 5)) {
1291                                     fbListenerAddPollFD(group->group_pfd,
1292                                                         &group->pfd_len,
1293                                                        entry->listener->lsock);
1294                                 } else {
1295                                     g_warning("Maximum connections reached "
1296                                               "for Listener Group (%d)",
1297                                               (int)group->pfd_len);
1298                                 }
1299                                 found = TRUE;
1300                                 group->lastlist = entry;
1301                                 break;
1302                             } else {
1303                                 /* shouldn't happen */
1304                                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1305                                             "listener wait error: invalid FD");
1306                                 return NULL;
1307                             }
1308                         }
1309 
1310                     } /* new_fd == cpfd->fd */
1311                 } /* listener->pfd_array for loop */
1312 
1313                 if (found) {
1314                     break;
1315                 }
1316 
1317             } /* listenergroup for loop */
1318 
1319             if (!found) {
1320                 /* close the old one, it doesn't belong to any listeners
1321                  it most likely was closed on the listener */
1322                 close(pfd->fd);
1323                 pfd->fd = -1;
1324             }
1325 
1326         } /* loop through fd's */
1327 
1328     } /* !resultHead */
1329     return resultHead;
1330 
1331 }
1332 
1333 /*  Given a socket descriptor with an existing connection, return an fbuf
1334  *  fBufNext can be called on it
1335  *  Interrupting the accepting of new connections on this socket is the
1336  *  responsibility of the caller, it cannot be done with
1337  *  fbListenerInterrupt().  However, the collectors attached to this listener
1338  *  can be interrupted by this call, which short circuits fBufNext().
1339  *  Call fbListenerInterrupt to stop the collectors, then stop the listener
1340  *  socket on your own.
1341  */
fbListenerOwnSocketCollectorTCP(fbListener_t * listener,int sock,GError ** err)1342 fBuf_t  *fbListenerOwnSocketCollectorTCP(
1343     fbListener_t   *listener,
1344     int             sock,
1345     GError        **err)
1346 {
1347     fbCollector_t   *collector  = NULL;
1348     fBuf_t          *fbuf       = NULL;
1349     fbConnSpec_t     connSpec;
1350     g_assert(listener);
1351 
1352     if (sock <= 2) {
1353         /* invalid socket */
1354         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1355                     "Invalid socket descriptor");
1356         return NULL;
1357     }
1358 
1359     connSpec.transport = FB_TCP;
1360     listener->spec = &connSpec;
1361 
1362     collector = fbCollectorAllocSocket(listener, NULL, sock, NULL, 0, err);
1363     if (!collector) {
1364         return NULL;
1365     }
1366 
1367     fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
1368 
1369     fBufSetAutomaticMode(fbuf, FALSE);
1370 
1371     /* Add buffer to the file descriptor table */
1372     /* g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(sock), fbuf);*/
1373 
1374     listener->lsock = sock;
1375 
1376     /* store the collector handle */
1377     listener->collectorHandle = collector;
1378 
1379     listener->spec = NULL;
1380 
1381     /* All done. */
1382     return fbuf;
1383 }
1384 
1385 /* not even remotely tested yet */
fbListenerOwnSocketCollectorTLS(fbListener_t * listener,int sock,GError ** err)1386 fBuf_t  *fbListenerOwnSocketCollectorTLS(
1387     fbListener_t   *listener,
1388     int             sock,
1389     GError        **err)
1390 {
1391     fbCollector_t   *collector  = NULL;
1392     fBuf_t          *fbuf       = NULL;
1393 
1394     g_assert(listener);
1395 
1396     if (sock <= 2) {
1397         /* invalid socket */
1398         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1399                     "Invalid socket descriptor");
1400         return NULL;
1401     }
1402 
1403     listener->spec->transport = FB_TLS_TCP;
1404 
1405 /*    collector = fbCollectorAllocTLS(listener, NULL, sock, NULL, 0, err);*/
1406 
1407     fbuf = fBufAllocForCollection(fbSessionClone(listener->session),collector);
1408 
1409     fBufSetAutomaticMode(fbuf, FALSE);
1410 
1411     /* Add buffer to the file descriptor table */
1412     /*g_hash_table_insert(listener->fdtab, GINT_TO_POINTER(sock), fbuf);*/
1413 
1414     /* store the collector handle */
1415     listener->collectorHandle = collector;
1416 
1417     listener->lsock = sock;
1418 
1419     (void)err;
1420 
1421     /* All done. */
1422     return fbuf;
1423 }
1424 
fbListenerRemoveLastBuf(fBuf_t * fbuf,fbListener_t * listener)1425 void fbListenerRemoveLastBuf(
1426     fBuf_t         *fbuf,
1427     fbListener_t   *listener)
1428 {
1429     if (listener->lastbuf == fbuf) {
1430         listener->lastbuf = NULL;
1431     }
1432 }
1433 
fbListenerCallAppInit(fbListener_t * listener,fbUDPConnSpec_t * spec,GError ** err)1434 gboolean fbListenerCallAppInit(
1435     fbListener_t       *listener,
1436     fbUDPConnSpec_t    *spec,
1437     GError             **err)
1438 {
1439     if (listener->appinit) {
1440         if (!listener->appinit(listener, &(spec->ctx), listener->lsock,
1441                                &(spec->peer.so), spec->peerlen, err)) {
1442             return FALSE;
1443         }
1444     }
1445 
1446     return TRUE;
1447 
1448 }
1449 
fbListenerSetPeerSession(fbListener_t * listener,fbSession_t * session)1450 fbSession_t *fbListenerSetPeerSession(
1451     fbListener_t        *listener,
1452     fbSession_t         *session)
1453 {
1454     fbSession_t *new_session = session;
1455 
1456     if (!new_session) {
1457         new_session = fbSessionClone(listener->session);
1458     }
1459 
1460     listener->session = new_session;
1461 
1462     fBufSetSession(listener->lastbuf, new_session);
1463 
1464     fbSessionSetTemplateBuffer(new_session, listener->lastbuf);
1465 
1466     return new_session;
1467 
1468 }
1469