1 /*
2  * Copyright (c) 2008-2021 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include <assert.h>
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
27 
28 #include <nmsg.h>
29 
30 #include "nmsgtool.h"
31 #include "kickfile.h"
32 
33 /* Globals. */
34 
35 static nmsgtool_ctx ctx;
36 
37 static argv_t args[] = {
38 	{ 'b',	"bpf",
39 		ARGV_CHAR_P,
40 		&ctx.bpfstr,
41 		"filter",
42 		"filter pcap inputs with this bpf" },
43 
44 	{ 'B', "byterate",
45 		ARGV_INT,
46 		&ctx.byte_rate,
47 		"byterate",
48 		"ingress byte rate limit for file input" },
49 
50 	{ 'c',	"count",
51 		ARGV_INT,
52 		&ctx.count,
53 		"count",
54 		"stop or reopen after count payloads output" },
55 
56 	{ 'C', "readchan",
57 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
58 		&ctx.r_channel,
59 		"channel",
60 		"read nmsg data from socket(s)" },
61 
62 	{ 'd',	"debug",
63 		ARGV_INCR,
64 		&ctx.debug,
65 		NULL,
66 		"increment debugging level" },
67 
68 	{ 'D', "daemon",
69 		ARGV_BOOL,
70 		&ctx.daemon,
71 		NULL,
72 		"fork into background" },
73 
74 	{ 'e', "endline",
75 		ARGV_CHAR_P,
76 		&ctx.endline,
77 		"endline",
78 		"continuation separator" },
79 
80 	{ 'f', "readpres",
81 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
82 		&ctx.r_pres,
83 		"file",
84 		"read pres format data from file" },
85 
86 	{ 'F',	"filter",
87 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
88 		&ctx.filters,
89 		"dso[,param]",
90 		"filter nmsg payloads with module" },
91 
92 	{ '\0',	"getgroup",
93 		ARGV_CHAR_P,
94 		&ctx.get_group_str,
95 		"grname",
96 		"only process payloads with this group name" },
97 
98 	{ '\0', "getoperator",
99 		ARGV_CHAR_P,
100 		&ctx.get_operator_str,
101 		"opname",
102 		"only process payloads with this operator name" },
103 
104 	{ '\0', "getsource",
105 		ARGV_CHAR_P,
106 		&ctx.get_source_str,
107 		"sonum",
108 		"only process payloads with this source value" },
109 
110 	{ 'h',	"help",
111 		ARGV_BOOL,
112 		&ctx.help,
113 		NULL,
114 		"display help text and exit" },
115 
116 	{ 'i',	"readif",
117 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
118 		&ctx.r_pcapif,
119 		"if[+][,snap]",
120 		"read pcap data from interface ('+' = promisc)" },
121 
122 
123 	{ 'j', "readjson",
124 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
125 		&ctx.r_json,
126 		"file",
127 #ifdef HAVE_YAJL
128 		"read json format data from file" },
129 #else /* HAVE_YAJL */
130 		"read json format data from file (no support)" },
131 #endif /* HAVE_YAJL */
132 
133 	{ 'J', "writejson",
134 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
135 		&ctx.w_json,
136 		"file",
137 #ifdef HAVE_YAJL
138 		"write json format data to file" },
139 #else /* HAVE_YAJL */
140 		"write json format data to file (no support)" },
141 #endif /* HAVE_YAJL */
142 
143 	{ 'k',	"kicker",
144 		ARGV_CHAR_P,
145 		&ctx.kicker,
146 		"cmd",
147 		"make -c, -t continuous; run cmd on new files" },
148 
149 	{ 'l', "readsock",
150 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
151 		&ctx.r_sock,
152 		"so",
153 		"read nmsg data from socket (addr/port)" },
154 
155 	{ 'L', "readzsock",
156 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
157 		&ctx.r_zsock,
158 		"zep",
159 #ifdef HAVE_LIBZMQ
160 		"read nmsg data from ZMQ endpoint" },
161 #else /* HAVE_LIBZMQ */
162 		"read nmsg data from ZMQ endpoint (no support)" },
163 #endif /* HAVE_LIBZMQ */
164 
165 	{ 'm', "mtu",
166 		ARGV_INT,
167 		&ctx.mtu,
168 		"mtu",
169 		"MTU for datagram socket outputs" },
170 
171 	{ '\0', "mirror",
172 		ARGV_BOOL,
173 		&ctx.mirror,
174 		NULL,
175 		"mirror payloads across data outputs" },
176 
177 	{ 'o', "writepres",
178 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
179 		&ctx.w_pres,
180 		"file",
181 		"write pres format data to file" },
182 
183 	{ 'p',	"readpcap",
184 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
185 		&ctx.r_pcapfile,
186 		"file",
187 		"read pcap data from file" },
188 
189 	{ 'P', "pidfile",
190 		ARGV_CHAR_P,
191 		&ctx.pidfile,
192 		"file",
193 		"write PID into file" },
194 
195 	{ '\0', "policy",
196 		ARGV_CHAR_P,
197 		&ctx.filter_policy,
198 		"ACCEPT|DROP",
199 		"default filter chain policy" },
200 
201 	{ 'r', "readnmsg",
202 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
203 		&ctx.r_nmsg,
204 		"file",
205 		"read nmsg data from file" },
206 
207 
208 	{ 'R', "randomize",
209 		ARGV_BOOL,
210 		&ctx.interval_randomized,
211 		NULL,
212 		"randomize beginning of -t interval" },
213 
214 	{ 's', "writesock",
215 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
216 		&ctx.w_sock,
217 		"so[,r[,f]]",
218 		"write nmsg data to socket (addr/port)" },
219 
220 	{ 'S', "writezsock",
221 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
222 		&ctx.w_zsock,
223 		"zep",
224 #ifdef HAVE_LIBZMQ
225 		"write nmsg data to ZMQ endpoint" },
226 #else /* HAVE_LIBZMQ */
227 		"write nmsg data to ZMQ endpoint (no support)" },
228 #endif /* HAVE_LIBZMQ */
229 
230 	{ '\0',	"setgroup",
231 		ARGV_CHAR_P,
232 		&ctx.set_group_str,
233 		"grname",
234 		"set payload group to this value" },
235 
236 	{ '\0',	"setoperator",
237 		ARGV_CHAR_P,
238 		&ctx.set_operator_str,
239 		"opname",
240 		"set payload operator to this value" },
241 
242 	{ '\0',	"setsource",
243 		ARGV_CHAR_P,
244 		&ctx.set_source_str,
245 		"sonum",
246 		"set payload source to this value" },
247 
248 	{ 't',	"interval",
249 		ARGV_INT,
250 		&ctx.interval,
251 		"secs",
252 		"stop or reopen after secs have elapsed" },
253 
254 	{ 'T', "msgtype",
255 		ARGV_CHAR_P,
256 		&ctx.mname,
257 		"msgtype",
258 		"message type" },
259 
260 	{ 'U', "username",
261 		ARGV_CHAR_P,
262 		&ctx.username,
263 		"user",
264 		"drop privileges and run as user" },
265 
266 	{ '\0', "unbuffered",
267 		ARGV_BOOL,
268 		&ctx.unbuffered,
269 		NULL,
270 		"don't buffer writes to outputs" },
271 
272 	{ 'v', "version",
273 		ARGV_BOOL,
274 		&ctx.version,
275 		NULL,
276 		"print version" },
277 
278 	{ 'V', "vendor",
279 		ARGV_CHAR_P,
280 		&ctx.vname,
281 		"vendor",
282 		"vendor" },
283 
284 	{ 'w', "writenmsg",
285 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
286 		&ctx.w_nmsg,
287 		"file",
288 		"write nmsg data to file" },
289 
290 	{ 'Z', "readzchan",
291 		ARGV_CHAR_P | ARGV_FLAG_ARRAY,
292 		&ctx.r_zchannel,
293 		"zchannel",
294 #ifdef HAVE_LIBZMQ
295 		"read nmsg data from ZMQ channels" },
296 #else /* HAVE_LIBZMQ */
297 		"read nmsg data from ZMQ channels (no support)" },
298 #endif /* HAVE_LIBZMQ */
299 
300 	{ 'z', "zlibout",
301 		ARGV_BOOL,
302 		&ctx.zlibout,
303 		NULL,
304 		"compress nmsg output" },
305 
306 	{ ARGV_LAST, 0, 0, 0, 0, 0 }
307 };
308 
309 /* Forward. */
310 
311 static void print_io_stats(nmsg_io_t);
312 static void io_close(struct nmsg_io_close_event *);
313 static void setup_signals(void);
314 static void signal_handler(int);
315 
316 /* Functions. */
317 
main(int argc,char ** argv)318 int main(int argc, char **argv) {
319 	nmsg_res res;
320 
321 	/* parse command line arguments */
322 	argv_process(args, argc, argv);
323 
324 	if (ctx.debug < 1)
325 		ctx.debug = 1;
326 	nmsg_set_debug(ctx.debug);
327 	res = nmsg_init();
328 	if (res != nmsg_res_success) {
329 		fprintf(stderr, "nmsgtool: unable to initialize libnmsg\n");
330 		return (EXIT_FAILURE);
331 	}
332 	if (ctx.debug >= 2)
333 #ifdef HAVE_LIBZMQ
334 		fprintf(stderr, "nmsgtool: version " VERSION "\n");
335 #else /* HAVE_LIBZMQ */
336 		fprintf(stderr, "nmsgtool: version " VERSION " (without libzmq support)\n");
337 #endif /* HAVE_LIBZMQ */
338 
339 	/* initialize the nmsg_io engine */
340 	ctx.io = nmsg_io_init();
341 	assert(ctx.io != NULL);
342 	nmsg_io_set_close_fp(ctx.io, io_close);
343 
344 	/* process arguments and load inputs/outputs into the nmsg_io engine */
345 	process_args(&ctx);
346 
347 	setup_signals();
348 
349 	/* run the nmsg_io engine */
350 	res = nmsg_io_loop(ctx.io);
351 
352 	/* print stats, if requested */
353 	if (ctx.debug >= 2) {
354 		print_io_stats(ctx.io);
355 	}
356 
357 	/* cleanup */
358 	if (ctx.pidfile != NULL) {
359 		if (unlink(ctx.pidfile) != 0) {
360 			fprintf(stderr, "nmsgtool: unlink() failed: %s\n",
361 				strerror(errno));
362 		}
363 	}
364 	nmsg_io_destroy(&ctx.io);
365 #ifdef HAVE_LIBZMQ
366 	if (ctx.zmq_ctx)
367 		zmq_term(ctx.zmq_ctx);
368 #endif /* HAVE_LIBZMQ */
369 	free(ctx.endline_str);
370 	argv_cleanup(args);
371 
372 	return (res);
373 }
374 
375 void
usage(const char * msg)376 usage(const char *msg) {
377 	if (msg)
378 		fprintf(stderr, "%s: usage error: %s\n", argv_program, msg);
379 	nmsg_io_destroy(&ctx.io);
380 	exit(argv_usage(args, ARGV_USAGE_DEFAULT));
381 }
382 
383 void
setup_nmsg_output(nmsgtool_ctx * c,nmsg_output_t output)384 setup_nmsg_output(nmsgtool_ctx *c, nmsg_output_t output) {
385 	nmsg_output_set_buffered(output, !(c->unbuffered));
386 	nmsg_output_set_endline(output, c->endline_str);
387 	nmsg_output_set_zlibout(output, c->zlibout);
388 	nmsg_output_set_source(output, c->set_source);
389 	nmsg_output_set_operator(output, c->set_operator);
390 	nmsg_output_set_group(output, c->set_group);
391 }
392 
393 void
setup_nmsg_input(nmsgtool_ctx * c,nmsg_input_t input)394 setup_nmsg_input(nmsgtool_ctx *c, nmsg_input_t input) {
395 	if (c->vid != 0 && c->msgtype != 0)
396 		nmsg_input_set_filter_msgtype(input, c->vid, c->msgtype);
397 	nmsg_input_set_filter_source(input, c->get_source);
398 	nmsg_input_set_filter_operator(input, c->get_operator);
399 	nmsg_input_set_filter_group(input, c->get_group);
400 }
401 
402 /* Private functions. */
403 
404 static void
print_io_stats(nmsg_io_t io)405 print_io_stats(nmsg_io_t io) {
406 	uint64_t sum_in = 0, sum_out = 0, container_drops = 0, container_recvs = 0;
407 
408 	if (nmsg_io_get_stats(io, &sum_in, &sum_out, &container_recvs, &container_drops) == nmsg_res_success)
409 		fprintf(stderr,
410 			"%s: totals: payloads_in %"PRIu64
411 			" payloads_out %"PRIu64
412 			" container_recvs %"PRIu64
413 		        " container_drops %"PRIu64"\n",
414 			argv_program, sum_in, sum_out, container_recvs, container_drops);
415 }
416 
417 static void
io_close(struct nmsg_io_close_event * ce)418 io_close(struct nmsg_io_close_event *ce) {
419 	struct kickfile *kf;
420 
421 	if (ctx.debug >= 2) {
422 		if (ce->close_type != nmsg_io_close_type_eof &&
423 		    ce->user != NULL && ce->user == ctx.stats_user) {
424 			print_io_stats(ce->io);
425 		}
426 	}
427 
428 	if (ctx.debug >= 5) {
429 		fprintf(stderr, "entering io_close()\n");
430 		fprintf(stderr, "%s: ce->io_type = %u\n", __func__, ce->io_type);
431 		fprintf(stderr, "%s: ce->close_type = %u\n", __func__, ce->close_type);
432 		fprintf(stderr, "%s: ce->user = %p\n", __func__, ce->user);
433 		if (ce->io_type == nmsg_io_io_type_input) {
434 			fprintf(stderr, "%s: ce->input_type = %u\n", __func__, ce->input_type);
435 			fprintf(stderr, "%s: ce->input = %p\n", __func__, (void *)ce->input);
436 		} else if (ce->io_type == nmsg_io_io_type_output) {
437 			fprintf(stderr, "%s: ce->output_type = %u\n", __func__, ce->output_type);
438 			fprintf(stderr, "%s: ce->output = %p\n", __func__, (void *)ce->output);
439 		}
440 	}
441 
442 	if (ce->user != NULL && ce->user != (void *) -1 &&
443 	    ce->io_type == nmsg_io_io_type_output &&
444 	    (ce->output_type == nmsg_output_type_stream ||
445 	     ce->output_type == nmsg_output_type_pres ||
446 	     ce->output_type == nmsg_output_type_json))
447 	{
448 		nmsg_output_close(ce->output);
449 
450 		kf = (struct kickfile *) ce->user;
451 		kickfile_exec(kf);
452 		if (ce->close_type == nmsg_io_close_type_eof) {
453 			if (ctx.debug >= 2)
454 				fprintf(stderr, "%s: closed output: %s\n",
455 					argv_program, kf->basename);
456 			kickfile_destroy(&kf);
457 		} else {
458 			kickfile_rotate(kf);
459 
460 			char *output_type_descr = NULL;
461 			switch (ce->output_type) {
462 			case nmsg_output_type_stream:
463 				*(ce->output) = nmsg_output_open_file(
464 					open_wfile(kf->tmpname), NMSG_WBUFSZ_MAX);
465 				output_type_descr = "nmsg";
466 				break;
467 			case nmsg_output_type_pres:
468 				*(ce->output) = nmsg_output_open_pres(
469 					open_wfile(kf->tmpname));
470 				output_type_descr = "pres";
471 				break;
472 			case nmsg_output_type_json:
473 				*(ce->output) = nmsg_output_open_json(
474 					open_wfile(kf->tmpname));
475 				output_type_descr = "json";
476 				break;
477 			default:
478 				assert(0);
479 			}
480 			setup_nmsg_output(&ctx, *(ce->output));
481 			if (ctx.debug >= 2)
482 				fprintf(stderr,
483 					"%s: reopened %s file output: %s\n",
484 					argv_program, output_type_descr, kf->curname);
485 		}
486 	} else if (ce->io_type == nmsg_io_io_type_input) {
487 		if ((ce->user == NULL || ce->close_type == nmsg_io_close_type_eof) &&
488 		     ce->input != NULL)
489 		{
490 			if (ctx.debug >= 5) {
491 				fprintf(stderr, "%s: closing input %p\n", __func__, (void *)ce->input);
492 			}
493 			nmsg_input_close(ce->input);
494 		}
495 	} else if (ce->io_type == nmsg_io_io_type_output) {
496 		if ((ce->user == NULL || ce->close_type == nmsg_io_close_type_eof) &&
497 		     ce->output != NULL)
498 		{
499 			if (ctx.debug >= 5) {
500 				fprintf(stderr, "%s: closing output %p\n", __func__, (void *)ce->output);
501 			}
502 			nmsg_output_close(ce->output);
503 		}
504 	} else {
505 		/* should never be reached */
506 		assert(0);
507 	}
508 }
509 
510 static void
signal_handler(int sig)511 signal_handler(int sig __attribute__((unused))) {
512 	fprintf(stderr, "%s: signalled break\n", argv_program);
513 	nmsg_io_breakloop(ctx.io);
514 }
515 
516 static void
setup_signals(void)517 setup_signals(void) {
518 	struct sigaction sa;
519 
520 	memset(&sa, 0, sizeof(sa));
521 	sigemptyset(&sa.sa_mask);
522 	sigaddset(&sa.sa_mask, SIGINT);
523 	sigaddset(&sa.sa_mask, SIGTERM);
524 	sa.sa_handler = &signal_handler;
525 	if (sigaction(SIGINT, &sa, NULL) != 0) {
526 		perror("sigaction");
527 		exit(EXIT_FAILURE);
528 	}
529 	if (sigaction(SIGTERM, &sa, NULL) != 0) {
530 		perror("sigaction");
531 		exit(EXIT_FAILURE);
532 	}
533 }
534