1 /**
2 ** yafcollect.c
3 ** Yet Another Flow IPFIX collector
4 **
5 ** ------------------------------------------------------------------------
6 ** Copyright (C) 2006-2016 Carnegie Mellon University. All Rights Reserved.
7 ** ------------------------------------------------------------------------
8 ** Authors: Brian Trammell
9 ** ------------------------------------------------------------------------
10 ** @OPENSOURCE_HEADER_START@
11 ** Use of the YAF system and related source code is subject to the terms
12 ** of the following licenses:
13 **
14 ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
15 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.227.7013
16 **
17 ** NO WARRANTY
18 **
19 ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
20 ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
21 ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
22 ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
23 ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
24 ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
25 ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
26 ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
27 ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
28 ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
29 ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
30 ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
31 ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
32 ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
33 ** DELIVERABLES UNDER THIS LICENSE.
34 **
35 ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
36 ** Mellon University, its trustees, officers, employees, and agents from
37 ** all claims or demands made against them (and any related losses,
38 ** expenses, or attorney's fees) arising out of, or relating to Licensee's
39 ** and/or its sub licensees' negligent use or willful misuse of or
40 ** negligent conduct or willful misconduct regarding the Software,
41 ** facilities, or other rights or assistance granted by Carnegie Mellon
42 ** University under this License, including, but not limited to, any
43 ** claims of product liability, personal injury, death, damage to
44 ** property, or violation of any laws or regulations.
45 **
46 ** Carnegie Mellon University Software Engineering Institute authored
47 ** documents are sponsored by the U.S. Department of Defense under
48 ** Contract FA8721-05-C-0003. Carnegie Mellon University retains
49 ** copyrights in all material produced under this contract. The U.S.
50 ** Government retains a non-exclusive, royalty-free license to publish or
51 ** reproduce these documents, or allow others to do so, for U.S.
52 ** Government purposes only pursuant to the copyright license under the
53 ** contract clause at 252.227.7013.
54 **
55 ** @OPENSOURCE_HEADER_END@
56 ** ------------------------------------------------------------------------
57 */
58
59 #define _YAF_SOURCE_
60 #include <yaf/autoinc.h>
61 #include <airframe/mio.h>
62 #include <airframe/mio_config.h>
63 #include <airframe/mio_sink_file.h>
64 #include <airframe/logconfig.h>
65 #include <airframe/daeconfig.h>
66 #include <airframe/airutil.h>
67 #include <airframe/privconfig.h>
68 #include <yaf/yafcore.h>
69 #include "yafctx.h"
70
71 typedef struct ycContext_st {
72 uint32_t outtime;
73 fBuf_t *obuf;
74 fBuf_t *ibuf;
75 gboolean ibuf_ready;
76 GString *pstr;
77 yfFlow_t flow;
78 } ycContext_t;
79
80 /* stats */
81 static uint32_t yac_files = 0;
82 static uint32_t yac_flows = 0;
83
84 /* GOption managed options */
85 static int yac_rotate = 300;
86 static char *yac_transport = "tcp";
87 static gboolean yac_tls = FALSE;
88 static gboolean yac_printall = FALSE;
89
90 static fbConnSpec_t yac_inspec = FB_CONNSPEC_INIT;
91
92 static yfConfig_t yaf_config = YF_CONFIG_INIT;
93
94 /* MIO command-line configuration */
95 static uint32_t yac_cliflags = MIO_F_CLI_FILE_OUT |
96 MIO_F_CLI_DIR_OUT;
97
98 AirOptionEntry yac_optentries[] = {
99 AF_OPTION( "in", 'i', 0, AF_OPT_TYPE_STRING, &(yac_inspec.host),
100 "Hostname or address to listen on", NULL ),
101 AF_OPTION( "rotate-delay", 'I', 0, AF_OPT_TYPE_INT, &yac_rotate,
102 "Output file rotation delay [300, 5m]", "sec" ),
103 AF_OPTION( "transport", (char)0, 0, AF_OPT_TYPE_STRING, &yac_transport,
104 "Set IPFIX transport (tcp, udp, sctp) [tcp]", "protocol" ),
105 AF_OPTION( "port", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.svc),
106 "Select IPFIX listener port [4739, 4740]", "port" ),
107 AF_OPTION( "tls", (char)0, 0, AF_OPT_TYPE_NONE, &yac_tls,
108 "Use TLS/DTLS to secure IPFIX export", NULL ),
109 AF_OPTION( "tls-ca", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_ca_file),
110 "Specify TLS Certificate Authority file", "cafile" ),
111 AF_OPTION( "tls-cert", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_cert_file),
112 "Specify TLS Certificate file", "certfile" ),
113 AF_OPTION( "tls-key", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_key_file),
114 "Specify TLS Private Key file", "keyfile" ),
115 AF_OPTION( "tls-pass", (char)0, 0, AF_OPT_TYPE_STRING, &(yac_inspec.ssl_key_pass),
116 "Specify TLS Private Key password", "password" ),
117 AF_OPTION( "print-all", (char)0, 0, AF_OPT_TYPE_NONE, &yac_printall,
118 "print all flows to stdout as received", NULL ),
119 AF_OPTION_END
120 };
121
ycParseOptions(int * argc,char ** argv[])122 static void ycParseOptions(
123 int *argc,
124 char **argv[]) {
125
126 AirOptionCtx *aoctx = NULL;
127
128 aoctx = air_option_context_new("", argc, argv, yac_optentries);
129
130 mio_add_option_group(aoctx, yac_cliflags);
131 daec_add_option_group(aoctx);
132 privc_add_option_group(aoctx);
133 logc_add_option_group(aoctx, "yafcollect", VERSION);
134
135 air_option_context_set_help_enabled(aoctx);
136 air_option_context_parse(aoctx);
137
138 air_option_context_free(aoctx);
139 }
140
141 /* the following is not 64-bit clean. */
142
143 /*
144 static gboolean ycConnectDebug(
145 fbListener_t *listener,
146 void **ctx,
147 int fd,
148 struct sockaddr *speer,
149 size_t peerlen,
150 GError **err)
151 {
152 char pabuf[256];
153 union {
154 struct sockaddr *so;
155 struct sockaddr_in *ip4;
156 struct sockaddr_in6 *ip6;
157 } peer;
158
159 peer.so = speer;
160
161 if (peer.so->sa_family == AF_INET) {
162 g_debug("New IPv4 connection from %s",
163 inet_ntop(AF_INET, &(peer.ip4->sin_addr), pabuf, sizeof(pabuf)));
164 } else if (peer.so->sa_family == AF_INET6) {
165 g_debug("New IPv6 connection from %s",
166 inet_ntop(AF_INET6, &(peer.ip6->sin6_addr), pabuf, sizeof(pabuf)));
167 } else {
168 g_debug("New connection from unknown AF %u", peer.so->sa_family);
169 }
170
171 return TRUE;
172 }
173 */
174
ycOpenListener(MIOSource * source,void * vctx,uint32_t * flags,GError ** err)175 static gboolean ycOpenListener(
176 MIOSource *source,
177 void *vctx,
178 uint32_t *flags,
179 GError **err)
180 {
181 /* create listener */
182 if (!(source->vsp = yfListenerForSpec(&yac_inspec, NULL,
183 NULL, err))) {
184 *flags |= (MIO_F_CTL_ERROR | MIO_F_CTL_TERMINATE);
185 return FALSE;
186 }
187
188 return TRUE;
189 }
190
ycCloseListener(MIOSource * source,void * vctx,uint32_t * flags,GError ** err)191 static gboolean ycCloseListener(
192 MIOSource *source,
193 void *vctx,
194 uint32_t *flags,
195 GError **err)
196 {
197 /* FIXME should shut the listener down perhaps? */
198
199 return TRUE;
200 }
201
ycOpenFileSink(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)202 static gboolean ycOpenFileSink(
203 MIOSource *source,
204 MIOSink *sink,
205 void *vctx,
206 uint32_t *flags,
207 GError **err)
208 {
209 ycContext_t *yx = (ycContext_t *)vctx;
210
211 /* start a new FixWriter */
212 yx->obuf = yfWriterForFP(mio_fp(sink), 0, FALSE, err);
213
214 /* check for failure */
215 if (yx->obuf) {
216 /* Done. Get timestamp for file. */
217 yx->outtime = time(NULL);
218 ++yac_files;
219 return TRUE;
220 } else {
221 *flags |= (MIO_F_CTL_SINKCLOSE | MIO_F_CTL_ERROR);
222 return FALSE;
223 }
224 }
225
ycCloseFileSink(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)226 static gboolean ycCloseFileSink(
227 MIOSource *source,
228 MIOSink *sink,
229 void *vctx,
230 uint32_t *flags,
231 GError **err)
232 {
233 ycContext_t *yx = (ycContext_t *)vctx;
234
235 /* finish the message */
236 if(yfWriterClose(yx->obuf, TRUE, err)) {
237 yx->obuf = NULL;
238 return TRUE;
239 } else {
240 *flags |= MIO_F_CTL_ERROR;
241 return FALSE;
242 }
243 }
244
ycProcess(MIOSource * source,MIOSink * sink,void * vctx,uint32_t * flags,GError ** err)245 static gboolean ycProcess(
246 MIOSource *source,
247 MIOSink *sink,
248 void *vctx,
249 uint32_t *flags,
250 GError **err)
251 {
252 ycContext_t *yx = (ycContext_t *)vctx;
253 fbListener_t *listener = (fbListener_t *)source->vsp;
254 yfContext_t ctx = YF_CTX_INIT;
255
256 ctx.fbuf = yx->obuf;
257 ctx.cfg = &yaf_config;
258
259 /* Check for end of output file */
260 if (yac_rotate && (time(NULL) > yx->outtime + yac_rotate)) {
261 *flags |= MIO_F_CTL_SINKCLOSE;
262 }
263
264 /* Check for quit */
265 if (daec_did_quit()) {
266 *flags |= MIO_F_CTL_TERMINATE;
267 return TRUE;
268 }
269
270 /* Check to see if we need to wait for a buffer */
271 if (!yx->ibuf || !yx->ibuf_ready) {
272 if (!(yx->ibuf = fbListenerWait(listener, err))) {
273 if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD) ||
274 g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_CONN)) {
275 /* FIXME this quits on any interrupt */
276 daec_quit();
277 g_critical("Error on read: %s", (*err)->message);
278 g_clear_error(err);
279 *flags |= MIO_F_CTL_TERMINATE;
280 return TRUE;
281 } else {
282 return FALSE;
283 }
284 }
285 }
286
287 /* presume our buffer is ready and process a flow */
288 yx->ibuf_ready = TRUE;
289 if (yfReadFlowExtended(yx->ibuf, &(yx->flow), err)) {
290
291 /* Print it for debugging purposes */
292 if (yx->pstr) {
293 g_string_truncate(yx->pstr, 0);
294 } else {
295 yx->pstr = g_string_new("");
296 }
297 yfPrintString(yx->pstr, &(yx->flow));
298 if (yac_printall) {
299 fprintf(stdout, "flow: %s",yx->pstr->str);
300 }
301
302 /* Got a flow. Write it. */
303 if (yfWriteFlow(&ctx, &(yx->flow), err)) {
304 /* Read and written. Done. */
305 ++yac_flows;
306 return TRUE;
307 } else {
308 /* Write error. Fatal. */
309 *flags |= MIO_F_CTL_ERROR;
310 return FALSE;
311 }
312 } else {
313 if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_EOM)) {
314 /* End of message. Set ibuf not ready, keep going. */
315 g_clear_error(err);
316 yx->ibuf_ready = FALSE;
317 return TRUE;
318 } else if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)) {
319 /* just keep going if the error is "no packet" */
320 g_clear_error(err);
321 return TRUE;
322 } else {
323 /* Close the buffer */
324 fBufFree(yx->ibuf);
325 yx->ibuf_ready = FALSE;
326 yx->ibuf = NULL;
327
328 if (g_error_matches(*err, FB_ERROR_DOMAIN, FB_ERROR_EOF)) {
329 /* EOF on a single collector not an issue. */
330 g_clear_error(err);
331 g_debug("Normal connection close");
332 return TRUE;
333 } else {
334 /* bad message. no doughnut. chuck it but keep the socket. */
335 sink->active = FALSE;
336 *flags |= MIO_F_CTL_ERROR;
337 return FALSE;
338 }
339 }
340 }
341 }
342
main(int argc,char * argv[])343 int main (
344 int argc,
345 char *argv[])
346 {
347 GError *err = NULL;
348 ycContext_t yx;
349 MIOSource source;
350 MIOSink sink;
351 MIOAppDriver adrv;
352 uint32_t miodflags;
353 int rv = 0;
354
355 /* parse options */
356 ycParseOptions(&argc, &argv);
357
358 /* set up logging */
359 if (!logc_setup(&err)) {
360 air_opterr("%s", err->message);
361 }
362
363 /* fork if necessary */
364 if (!daec_setup(&err)) {
365 air_opterr("%s", err->message);
366 }
367
368 /* initialize MIO flags */
369 miodflags = 0;
370
371 /* default port */
372 if (!yac_inspec.svc) yac_inspec.svc = yac_tls ? "4740" : "4739";
373
374 if (strcmp(yac_transport, "tcp") == 0) {
375 if (yac_tls) {
376 yac_inspec.transport = FB_TLS_TCP;
377 } else {
378 yac_inspec.transport = FB_TCP;
379 }
380 } else if (strcmp(yac_transport, "udp") == 0) {
381 if (yac_tls) {
382 yac_inspec.transport = FB_DTLS_UDP;
383 } else {
384 yac_inspec.transport = FB_UDP;
385 }
386 } else if (strcmp(yac_transport, "sctp") == 0) {
387 if (yac_tls) {
388 yac_inspec.transport = FB_DTLS_SCTP;
389 } else {
390 yac_inspec.transport = FB_SCTP;
391 }
392 } else {
393 air_opterr("Unsupported IPFIX transport protocol %s", yac_transport);
394 }
395
396 /* create a source around a listener */
397 if (!mio_source_init_app(&source, mio_ov_in, MIO_T_APP, NULL, &err)) {
398 air_opterr("Cannot set up MIO input: %s", err->message);
399 }
400
401 /* set up sink */
402 if (!mio_config_sink(&source, &sink, "ipfix-%T.yaf", yac_cliflags,
403 &miodflags, &err)) {
404 air_opterr("Cannot set up output: %s", err->message);
405 }
406
407 /* initialize yafcollect context */
408 yfFlowPrepare(&(yx.flow));
409 yx.obuf = NULL;
410 yx.ibuf = NULL;
411 yx.ibuf_ready = FALSE;
412 yx.pstr = NULL;
413 yx.outtime = 0;
414
415 /* set up an app driver */
416 adrv.app_open_source = ycOpenListener;
417 adrv.app_close_source = ycCloseListener;
418 adrv.app_open_sink = ycOpenFileSink;
419 adrv.app_close_sink = ycCloseFileSink;
420 adrv.app_process = ycProcess;
421
422 /* do dispatch here */
423 if (!mio_dispatch_loop(&source, &sink, &adrv, &yx, miodflags, mio_ov_poll,
424 1, mio_ov_poll)) {
425 rv = 1;
426 }
427
428 g_message("yafcollect terminating");
429 g_message("Processed %u flows into %u files", yac_flows, yac_files);
430
431 return rv;
432 }
433