1 /**
2  ** yafcollect.c
3  ** Yet Another Flow IPFIX collector
4  **
5  ** ------------------------------------------------------------------------
6  ** Copyright (C) 2006-2016 Carnegie Mellon University. All Rights Reserved.
7  ** ------------------------------------------------------------------------
8  ** Authors: Brian Trammell
9  ** ------------------------------------------------------------------------
10  ** @OPENSOURCE_HEADER_START@
11  ** Use of the YAF system and related source code is subject to the terms
12  ** of the following licenses:
13  **
14  ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
15  ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.227.7013
16  **
17  ** NO WARRANTY
18  **
19  ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
20  ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
21  ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
22  ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
23  ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
24  ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
25  ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
26  ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
27  ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
28  ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
29  ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
30  ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
31  ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
32  ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
33  ** DELIVERABLES UNDER THIS LICENSE.
34  **
35  ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
36  ** Mellon University, its trustees, officers, employees, and agents from
37  ** all claims or demands made against them (and any related losses,
38  ** expenses, or attorney's fees) arising out of, or relating to Licensee's
39  ** and/or its sub licensees' negligent use or willful misuse of or
40  ** negligent conduct or willful misconduct regarding the Software,
41  ** facilities, or other rights or assistance granted by Carnegie Mellon
42  ** University under this License, including, but not limited to, any
43  ** claims of product liability, personal injury, death, damage to
44  ** property, or violation of any laws or regulations.
45  **
46  ** Carnegie Mellon University Software Engineering Institute authored
47  ** documents are sponsored by the U.S. Department of Defense under
48  ** Contract FA8721-05-C-0003. Carnegie Mellon University retains
49  ** copyrights in all material produced under this contract. The U.S.
50  ** Government retains a non-exclusive, royalty-free license to publish or
51  ** reproduce these documents, or allow others to do so, for U.S.
52  ** Government purposes only pursuant to the copyright license under the
53  ** contract clause at 252.227.7013.
54  **
55  ** @OPENSOURCE_HEADER_END@
56  ** ------------------------------------------------------------------------
57  */
58 
59 #define _YAF_SOURCE_
60 #include <yaf/autoinc.h>
61 #include <airframe/mio.h>
62 #include <airframe/mio_config.h>
63 #include <airframe/mio_sink_file.h>
64 #include <airframe/logconfig.h>
65 #include <airframe/daeconfig.h>
66 #include <airframe/airutil.h>
67 #include <airframe/privconfig.h>
68 #include <yaf/yafcore.h>
69 #include "yafctx.h"
70 
71 typedef struct ycContext_st {
72     uint32_t            outtime;
73     fBuf_t              *obuf;
74     fBuf_t              *ibuf;
75     gboolean            ibuf_ready;
76     GString             *pstr;
77     yfFlow_t            flow;
78 } ycContext_t;
79 
80 /* stats */
81 static uint32_t yac_files = 0;
82 static uint32_t yac_flows = 0;
83 
84 /* GOption managed options */
85 static int          yac_rotate = 300;
86 static char         *yac_transport = "tcp";
87 static gboolean     yac_tls = FALSE;
88 static gboolean     yac_printall = FALSE;
89 
90 static fbConnSpec_t yac_inspec = FB_CONNSPEC_INIT;
91 
92 static yfConfig_t yaf_config = YF_CONFIG_INIT;
93 
94 /* MIO command-line configuration */
95 static uint32_t     yac_cliflags =  MIO_F_CLI_FILE_OUT |
96                                     MIO_F_CLI_DIR_OUT;
97 
98 AirOptionEntry yac_optentries[] = {
99     AF_OPTION( "in", 'i', 0, AF_OPT_TYPE_STRING, &(yac_inspec.host),
100       "Hostname or address to listen on", NULL ),
101     AF_OPTION( "rotate-delay", 'I', 0, AF_OPT_TYPE_INT, &yac_rotate,
102       "Output file rotation delay [300, 5m]", "sec" ),
103     AF_OPTION( "transport", (char)0, 0, AF_OPT_TYPE_STRING, &yac_transport,
104       "Set IPFIX transport (tcp, udp, sctp) [tcp]", "protocol" ),
105     AF_OPTION( "port", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.svc),
106       "Select IPFIX listener port [4739, 4740]", "port" ),
107     AF_OPTION( "tls", (char)0, 0, AF_OPT_TYPE_NONE, &yac_tls,
108       "Use TLS/DTLS to secure IPFIX export", NULL ),
109     AF_OPTION( "tls-ca", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_ca_file),
110       "Specify TLS Certificate Authority file", "cafile" ),
111     AF_OPTION( "tls-cert", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_cert_file),
112       "Specify TLS Certificate file", "certfile" ),
113     AF_OPTION( "tls-key", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_key_file),
114       "Specify TLS Private Key file", "keyfile" ),
115     AF_OPTION( "tls-pass", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_key_pass),
116       "Specify TLS Private Key password", "password" ),
117     AF_OPTION( "print-all", (char)0, 0, AF_OPT_TYPE_NONE, &yac_printall,
118       "print all flows to stdout as received", NULL ),
119     AF_OPTION_END
120 };
121 
ycParseOptions(int * argc,char ** argv[])122 static void ycParseOptions(
123     int             *argc,
124     char            **argv[]) {
125 
126     AirOptionCtx *aoctx = NULL;
127 
128     aoctx = air_option_context_new("", argc, argv, yac_optentries);
129 
130     mio_add_option_group(aoctx, yac_cliflags);
131     daec_add_option_group(aoctx);
132     privc_add_option_group(aoctx);
133     logc_add_option_group(aoctx, "yafcollect", VERSION);
134 
135     air_option_context_set_help_enabled(aoctx);
136     air_option_context_parse(aoctx);
137 
138     air_option_context_free(aoctx);
139 }
140 
141 /* the following is not 64-bit clean. */
142 
143 /*
144 static gboolean ycConnectDebug(
145     fbListener_t                *listener,
146     void                        **ctx,
147     int                         fd,
148     struct sockaddr             *speer,
149     size_t                      peerlen,
150     GError                      **err)
151 {
152     char                        pabuf[256];
153     union {
154         struct sockaddr         *so;
155         struct sockaddr_in      *ip4;
156         struct sockaddr_in6     *ip6;
157     }                           peer;
158 
159     peer.so = speer;
160 
161     if (peer.so->sa_family == AF_INET) {
162         g_debug("New IPv4 connection from %s",
163                 inet_ntop(AF_INET, &(peer.ip4->sin_addr), pabuf, sizeof(pabuf)));
164     } else if (peer.so->sa_family == AF_INET6) {
165         g_debug("New IPv6 connection from %s",
166                 inet_ntop(AF_INET6, &(peer.ip6->sin6_addr), pabuf, sizeof(pabuf)));
167     } else {
168         g_debug("New connection from unknown AF %u", peer.so->sa_family);
169     }
170 
171     return TRUE;
172 }
173 */
174 
ycOpenListener(MIOSource * source,void * vctx,uint32_t * flags,GError ** err)175 static gboolean ycOpenListener(
176     MIOSource               *source,
177     void                    *vctx,
178     uint32_t                *flags,
179     GError                  **err)
180 {
181     /* create listener */
182     if (!(source->vsp = yfListenerForSpec(&yac_inspec, NULL,
183                                           NULL, err))) {
184         *flags |= (MIO_F_CTL_ERROR | MIO_F_CTL_TERMINATE);
185         return FALSE;
186     }
187 
188     return TRUE;
189 }
190 
ycCloseListener(MIOSource * source,void * vctx,uint32_t * flags,GError ** err)191 static gboolean ycCloseListener(
192     MIOSource               *source,
193     void                    *vctx,
194     uint32_t                *flags,
195     GError                  **err)
196 {
197     /* FIXME should shut the listener down perhaps? */
198 
199     return TRUE;
200 }
201 
ycOpenFileSink(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)202 static gboolean ycOpenFileSink(
203     MIOSource               *source,
204     MIOSink                 *sink,
205     void                    *vctx,
206     uint32_t                *flags,
207     GError                  **err)
208 {
209     ycContext_t              *yx = (ycContext_t *)vctx;
210 
211     /* start a new FixWriter */
212     yx->obuf = yfWriterForFP(mio_fp(sink), 0, FALSE, err);
213 
214     /* check for failure */
215     if (yx->obuf) {
216         /* Done. Get timestamp for file. */
217         yx->outtime = time(NULL);
218         ++yac_files;
219         return TRUE;
220     } else {
221         *flags |= (MIO_F_CTL_SINKCLOSE | MIO_F_CTL_ERROR);
222         return FALSE;
223     }
224 }
225 
ycCloseFileSink(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)226 static gboolean ycCloseFileSink(
227     MIOSource               *source,
228     MIOSink                 *sink,
229     void                    *vctx,
230     uint32_t                *flags,
231     GError                  **err)
232 {
233     ycContext_t              *yx = (ycContext_t *)vctx;
234 
235     /* finish the message */
236     if(yfWriterClose(yx->obuf, TRUE, err)) {
237         yx->obuf = NULL;
238         return TRUE;
239     } else {
240         *flags |= MIO_F_CTL_ERROR;
241         return FALSE;
242     }
243 }
244 
ycProcess(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)245 static gboolean ycProcess(
246     MIOSource               *source,
247     MIOSink                 *sink,
248     void                    *vctx,
249     uint32_t                *flags,
250     GError                  **err)
251 {
252     ycContext_t              *yx = (ycContext_t *)vctx;
253     fbListener_t             *listener = (fbListener_t *)source->vsp;
254     yfContext_t              ctx = YF_CTX_INIT;
255 
256     ctx.fbuf = yx->obuf;
257     ctx.cfg = &yaf_config;
258 
259     /* Check for end of output file */
260     if (yac_rotate && (time(NULL) > yx->outtime + yac_rotate)) {
261         *flags |= MIO_F_CTL_SINKCLOSE;
262     }
263 
264     /* Check for quit */
265     if (daec_did_quit()) {
266         *flags |= MIO_F_CTL_TERMINATE;
267         return TRUE;
268     }
269 
270     /* Check to see if we need to wait for a buffer */
271     if (!yx->ibuf || !yx->ibuf_ready) {
272         if (!(yx->ibuf = fbListenerWait(listener, err))) {
273             if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD) ||
274                 g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_CONN)) {
275                 /* FIXME this quits on any interrupt */
276                 daec_quit();
277                 g_critical("Error on read: %s", (*err)->message);
278                 g_clear_error(err);
279                 *flags |= MIO_F_CTL_TERMINATE;
280                 return TRUE;
281             } else {
282                 return FALSE;
283             }
284         }
285     }
286 
287     /* presume our buffer is ready and process a flow */
288     yx->ibuf_ready = TRUE;
289     if (yfReadFlowExtended(yx->ibuf, &(yx->flow), err)) {
290 
291         /* Print it for debugging purposes */
292         if (yx->pstr) {
293             g_string_truncate(yx->pstr, 0);
294         } else {
295             yx->pstr = g_string_new("");
296         }
297         yfPrintString(yx->pstr, &(yx->flow));
298         if (yac_printall) {
299             fprintf(stdout, "flow: %s",yx->pstr->str);
300         }
301 
302         /* Got a flow. Write it. */
303         if (yfWriteFlow(&ctx, &(yx->flow), err)) {
304             /* Read and written. Done. */
305             ++yac_flows;
306             return TRUE;
307         } else {
308             /* Write error. Fatal. */
309             *flags |= MIO_F_CTL_ERROR;
310             return FALSE;
311         }
312     } else {
313         if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_EOM)) {
314             /* End of message. Set ibuf not ready, keep going. */
315             g_clear_error(err);
316             yx->ibuf_ready = FALSE;
317             return TRUE;
318         } else if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)) {
319             /* just keep going if the error is "no packet" */
320             g_clear_error(err);
321             return TRUE;
322         } else {
323             /* Close the buffer */
324             fBufFree(yx->ibuf);
325             yx->ibuf_ready = FALSE;
326             yx->ibuf = NULL;
327 
328             if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_EOF)) {
329                 /* EOF on a single collector not an issue. */
330                 g_clear_error(err);
331                 g_debug("Normal connection close");
332                 return TRUE;
333             } else {
334                 /* bad message. no doughnut. chuck it but keep the socket. */
335                 sink->active = FALSE;
336                 *flags |= MIO_F_CTL_ERROR;
337                 return FALSE;
338             }
339         }
340     }
341 }
342 
main(int argc,char * argv[])343 int main (
344     int             argc,
345     char            *argv[])
346 {
347     GError          *err = NULL;
348     ycContext_t     yx;
349     MIOSource       source;
350     MIOSink         sink;
351     MIOAppDriver    adrv;
352     uint32_t        miodflags;
353     int rv          = 0;
354 
355     /* parse options */
356     ycParseOptions(&argc, &argv);
357 
358     /* set up logging */
359     if (!logc_setup(&err)) {
360         air_opterr("%s", err->message);
361     }
362 
363     /* fork if necessary */
364     if (!daec_setup(&err)) {
365         air_opterr("%s", err->message);
366     }
367 
368     /* initialize MIO flags */
369     miodflags = 0;
370 
371     /* default port */
372     if (!yac_inspec.svc) yac_inspec.svc = yac_tls ? "4740" : "4739";
373 
374     if (strcmp(yac_transport, "tcp") == 0) {
375         if (yac_tls) {
376             yac_inspec.transport = FB_TLS_TCP;
377         } else {
378             yac_inspec.transport = FB_TCP;
379         }
380     } else if (strcmp(yac_transport, "udp") == 0) {
381         if (yac_tls) {
382             yac_inspec.transport = FB_DTLS_UDP;
383         } else {
384             yac_inspec.transport = FB_UDP;
385         }
386     } else if (strcmp(yac_transport, "sctp") == 0) {
387         if (yac_tls) {
388             yac_inspec.transport = FB_DTLS_SCTP;
389         } else {
390             yac_inspec.transport = FB_SCTP;
391         }
392     } else {
393         air_opterr("Unsupported IPFIX transport protocol %s", yac_transport);
394     }
395 
396     /* create a source around a listener */
397     if (!mio_source_init_app(&source, mio_ov_in, MIO_T_APP, NULL, &err)) {
398         air_opterr("Cannot set up MIO input: %s", err->message);
399     }
400 
401     /* set up sink */
402     if (!mio_config_sink(&source, &sink, "ipfix-%T.yaf", yac_cliflags,
403                          &miodflags, &err)) {
404         air_opterr("Cannot set up output: %s", err->message);
405     }
406 
407     /* initialize yafcollect context */
408     yfFlowPrepare(&(yx.flow));
409     yx.obuf = NULL;
410     yx.ibuf = NULL;
411     yx.ibuf_ready = FALSE;
412     yx.pstr = NULL;
413     yx.outtime = 0;
414 
415     /* set up an app driver */
416     adrv.app_open_source = ycOpenListener;
417     adrv.app_close_source = ycCloseListener;
418     adrv.app_open_sink = ycOpenFileSink;
419     adrv.app_close_sink = ycCloseFileSink;
420     adrv.app_process = ycProcess;
421 
422     /* do dispatch here */
423     if (!mio_dispatch_loop(&source, &sink, &adrv, &yx, miodflags, mio_ov_poll,
424                            1, mio_ov_poll)) {
425         rv = 1;
426     }
427 
428     g_message("yafcollect terminating");
429     g_message("Processed %u flows into %u files", yac_flows, yac_files);
430 
431     return rv;
432 }
433