1 /*
2 * Copyright (c) 2008-2021 by Farsight Security, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* Import. */
18
19 #include <assert.h>
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
27
28 #include <nmsg.h>
29
30 #include "nmsgtool.h"
31 #include "kickfile.h"
32
33 /* Globals. */
34
35 static nmsgtool_ctx ctx;
36
37 static argv_t args[] = {
38 { 'b', "bpf",
39 ARGV_CHAR_P,
40 &ctx.bpfstr,
41 "filter",
42 "filter pcap inputs with this bpf" },
43
44 { 'B', "byterate",
45 ARGV_INT,
46 &ctx.byte_rate,
47 "byterate",
48 "ingress byte rate limit for file input" },
49
50 { 'c', "count",
51 ARGV_INT,
52 &ctx.count,
53 "count",
54 "stop or reopen after count payloads output" },
55
56 { 'C', "readchan",
57 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
58 &ctx.r_channel,
59 "channel",
60 "read nmsg data from socket(s)" },
61
62 { 'd', "debug",
63 ARGV_INCR,
64 &ctx.debug,
65 NULL,
66 "increment debugging level" },
67
68 { 'D', "daemon",
69 ARGV_BOOL,
70 &ctx.daemon,
71 NULL,
72 "fork into background" },
73
74 { 'e', "endline",
75 ARGV_CHAR_P,
76 &ctx.endline,
77 "endline",
78 "continuation separator" },
79
80 { 'f', "readpres",
81 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
82 &ctx.r_pres,
83 "file",
84 "read pres format data from file" },
85
86 { 'F', "filter",
87 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
88 &ctx.filters,
89 "dso[,param]",
90 "filter nmsg payloads with module" },
91
92 { '\0', "getgroup",
93 ARGV_CHAR_P,
94 &ctx.get_group_str,
95 "grname",
96 "only process payloads with this group name" },
97
98 { '\0', "getoperator",
99 ARGV_CHAR_P,
100 &ctx.get_operator_str,
101 "opname",
102 "only process payloads with this operator name" },
103
104 { '\0', "getsource",
105 ARGV_CHAR_P,
106 &ctx.get_source_str,
107 "sonum",
108 "only process payloads with this source value" },
109
110 { 'h', "help",
111 ARGV_BOOL,
112 &ctx.help,
113 NULL,
114 "display help text and exit" },
115
116 { 'i', "readif",
117 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
118 &ctx.r_pcapif,
119 "if[+][,snap]",
120 "read pcap data from interface ('+' = promisc)" },
121
122
123 { 'j', "readjson",
124 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
125 &ctx.r_json,
126 "file",
127 #ifdef HAVE_YAJL
128 "read json format data from file" },
129 #else /* HAVE_YAJL */
130 "read json format data from file (no support)" },
131 #endif /* HAVE_YAJL */
132
133 { 'J', "writejson",
134 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
135 &ctx.w_json,
136 "file",
137 #ifdef HAVE_YAJL
138 "write json format data to file" },
139 #else /* HAVE_YAJL */
140 "write json format data to file (no support)" },
141 #endif /* HAVE_YAJL */
142
143 { 'k', "kicker",
144 ARGV_CHAR_P,
145 &ctx.kicker,
146 "cmd",
147 "make -c, -t continuous; run cmd on new files" },
148
149 { 'l', "readsock",
150 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
151 &ctx.r_sock,
152 "so",
153 "read nmsg data from socket (addr/port)" },
154
155 { 'L', "readzsock",
156 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
157 &ctx.r_zsock,
158 "zep",
159 #ifdef HAVE_LIBZMQ
160 "read nmsg data from ZMQ endpoint" },
161 #else /* HAVE_LIBZMQ */
162 "read nmsg data from ZMQ endpoint (no support)" },
163 #endif /* HAVE_LIBZMQ */
164
165 { 'm', "mtu",
166 ARGV_INT,
167 &ctx.mtu,
168 "mtu",
169 "MTU for datagram socket outputs" },
170
171 { '\0', "mirror",
172 ARGV_BOOL,
173 &ctx.mirror,
174 NULL,
175 "mirror payloads across data outputs" },
176
177 { 'o', "writepres",
178 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
179 &ctx.w_pres,
180 "file",
181 "write pres format data to file" },
182
183 { 'p', "readpcap",
184 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
185 &ctx.r_pcapfile,
186 "file",
187 "read pcap data from file" },
188
189 { 'P', "pidfile",
190 ARGV_CHAR_P,
191 &ctx.pidfile,
192 "file",
193 "write PID into file" },
194
195 { '\0', "policy",
196 ARGV_CHAR_P,
197 &ctx.filter_policy,
198 "ACCEPT|DROP",
199 "default filter chain policy" },
200
201 { 'r', "readnmsg",
202 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
203 &ctx.r_nmsg,
204 "file",
205 "read nmsg data from file" },
206
207
208 { 'R', "randomize",
209 ARGV_BOOL,
210 &ctx.interval_randomized,
211 NULL,
212 "randomize beginning of -t interval" },
213
214 { 's', "writesock",
215 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
216 &ctx.w_sock,
217 "so[,r[,f]]",
218 "write nmsg data to socket (addr/port)" },
219
220 { 'S', "writezsock",
221 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
222 &ctx.w_zsock,
223 "zep",
224 #ifdef HAVE_LIBZMQ
225 "write nmsg data to ZMQ endpoint" },
226 #else /* HAVE_LIBZMQ */
227 "write nmsg data to ZMQ endpoint (no support)" },
228 #endif /* HAVE_LIBZMQ */
229
230 { '\0', "setgroup",
231 ARGV_CHAR_P,
232 &ctx.set_group_str,
233 "grname",
234 "set payload group to this value" },
235
236 { '\0', "setoperator",
237 ARGV_CHAR_P,
238 &ctx.set_operator_str,
239 "opname",
240 "set payload operator to this value" },
241
242 { '\0', "setsource",
243 ARGV_CHAR_P,
244 &ctx.set_source_str,
245 "sonum",
246 "set payload source to this value" },
247
248 { 't', "interval",
249 ARGV_INT,
250 &ctx.interval,
251 "secs",
252 "stop or reopen after secs have elapsed" },
253
254 { 'T', "msgtype",
255 ARGV_CHAR_P,
256 &ctx.mname,
257 "msgtype",
258 "message type" },
259
260 { 'U', "username",
261 ARGV_CHAR_P,
262 &ctx.username,
263 "user",
264 "drop privileges and run as user" },
265
266 { '\0', "unbuffered",
267 ARGV_BOOL,
268 &ctx.unbuffered,
269 NULL,
270 "don't buffer writes to outputs" },
271
272 { 'v', "version",
273 ARGV_BOOL,
274 &ctx.version,
275 NULL,
276 "print version" },
277
278 { 'V', "vendor",
279 ARGV_CHAR_P,
280 &ctx.vname,
281 "vendor",
282 "vendor" },
283
284 { 'w', "writenmsg",
285 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
286 &ctx.w_nmsg,
287 "file",
288 "write nmsg data to file" },
289
290 { 'Z', "readzchan",
291 ARGV_CHAR_P | ARGV_FLAG_ARRAY,
292 &ctx.r_zchannel,
293 "zchannel",
294 #ifdef HAVE_LIBZMQ
295 "read nmsg data from ZMQ channels" },
296 #else /* HAVE_LIBZMQ */
297 "read nmsg data from ZMQ channels (no support)" },
298 #endif /* HAVE_LIBZMQ */
299
300 { 'z', "zlibout",
301 ARGV_BOOL,
302 &ctx.zlibout,
303 NULL,
304 "compress nmsg output" },
305
306 { ARGV_LAST, 0, 0, 0, 0, 0 }
307 };
308
309 /* Forward. */
310
311 static void print_io_stats(nmsg_io_t);
312 static void io_close(struct nmsg_io_close_event *);
313 static void setup_signals(void);
314 static void signal_handler(int);
315
316 /* Functions. */
317
main(int argc,char ** argv)318 int main(int argc, char **argv) {
319 nmsg_res res;
320
321 /* parse command line arguments */
322 argv_process(args, argc, argv);
323
324 if (ctx.debug < 1)
325 ctx.debug = 1;
326 nmsg_set_debug(ctx.debug);
327 res = nmsg_init();
328 if (res != nmsg_res_success) {
329 fprintf(stderr, "nmsgtool: unable to initialize libnmsg\n");
330 return (EXIT_FAILURE);
331 }
332 if (ctx.debug >= 2)
333 #ifdef HAVE_LIBZMQ
334 fprintf(stderr, "nmsgtool: version " VERSION "\n");
335 #else /* HAVE_LIBZMQ */
336 fprintf(stderr, "nmsgtool: version " VERSION " (without libzmq support)\n");
337 #endif /* HAVE_LIBZMQ */
338
339 /* initialize the nmsg_io engine */
340 ctx.io = nmsg_io_init();
341 assert(ctx.io != NULL);
342 nmsg_io_set_close_fp(ctx.io, io_close);
343
344 /* process arguments and load inputs/outputs into the nmsg_io engine */
345 process_args(&ctx);
346
347 setup_signals();
348
349 /* run the nmsg_io engine */
350 res = nmsg_io_loop(ctx.io);
351
352 /* print stats, if requested */
353 if (ctx.debug >= 2) {
354 print_io_stats(ctx.io);
355 }
356
357 /* cleanup */
358 if (ctx.pidfile != NULL) {
359 if (unlink(ctx.pidfile) != 0) {
360 fprintf(stderr, "nmsgtool: unlink() failed: %s\n",
361 strerror(errno));
362 }
363 }
364 nmsg_io_destroy(&ctx.io);
365 #ifdef HAVE_LIBZMQ
366 if (ctx.zmq_ctx)
367 zmq_term(ctx.zmq_ctx);
368 #endif /* HAVE_LIBZMQ */
369 free(ctx.endline_str);
370 argv_cleanup(args);
371
372 return (res);
373 }
374
375 void
usage(const char * msg)376 usage(const char *msg) {
377 if (msg)
378 fprintf(stderr, "%s: usage error: %s\n", argv_program, msg);
379 nmsg_io_destroy(&ctx.io);
380 exit(argv_usage(args, ARGV_USAGE_DEFAULT));
381 }
382
383 void
setup_nmsg_output(nmsgtool_ctx * c,nmsg_output_t output)384 setup_nmsg_output(nmsgtool_ctx *c, nmsg_output_t output) {
385 nmsg_output_set_buffered(output, !(c->unbuffered));
386 nmsg_output_set_endline(output, c->endline_str);
387 nmsg_output_set_zlibout(output, c->zlibout);
388 nmsg_output_set_source(output, c->set_source);
389 nmsg_output_set_operator(output, c->set_operator);
390 nmsg_output_set_group(output, c->set_group);
391 }
392
393 void
setup_nmsg_input(nmsgtool_ctx * c,nmsg_input_t input)394 setup_nmsg_input(nmsgtool_ctx *c, nmsg_input_t input) {
395 if (c->vid != 0 && c->msgtype != 0)
396 nmsg_input_set_filter_msgtype(input, c->vid, c->msgtype);
397 nmsg_input_set_filter_source(input, c->get_source);
398 nmsg_input_set_filter_operator(input, c->get_operator);
399 nmsg_input_set_filter_group(input, c->get_group);
400 }
401
402 /* Private functions. */
403
404 static void
print_io_stats(nmsg_io_t io)405 print_io_stats(nmsg_io_t io) {
406 uint64_t sum_in = 0, sum_out = 0, container_drops = 0, container_recvs = 0;
407
408 if (nmsg_io_get_stats(io, &sum_in, &sum_out, &container_recvs, &container_drops) == nmsg_res_success)
409 fprintf(stderr,
410 "%s: totals: payloads_in %"PRIu64
411 " payloads_out %"PRIu64
412 " container_recvs %"PRIu64
413 " container_drops %"PRIu64"\n",
414 argv_program, sum_in, sum_out, container_recvs, container_drops);
415 }
416
417 static void
io_close(struct nmsg_io_close_event * ce)418 io_close(struct nmsg_io_close_event *ce) {
419 struct kickfile *kf;
420
421 if (ctx.debug >= 2) {
422 if (ce->close_type != nmsg_io_close_type_eof &&
423 ce->user != NULL && ce->user == ctx.stats_user) {
424 print_io_stats(ce->io);
425 }
426 }
427
428 if (ctx.debug >= 5) {
429 fprintf(stderr, "entering io_close()\n");
430 fprintf(stderr, "%s: ce->io_type = %u\n", __func__, ce->io_type);
431 fprintf(stderr, "%s: ce->close_type = %u\n", __func__, ce->close_type);
432 fprintf(stderr, "%s: ce->user = %p\n", __func__, ce->user);
433 if (ce->io_type == nmsg_io_io_type_input) {
434 fprintf(stderr, "%s: ce->input_type = %u\n", __func__, ce->input_type);
435 fprintf(stderr, "%s: ce->input = %p\n", __func__, (void *)ce->input);
436 } else if (ce->io_type == nmsg_io_io_type_output) {
437 fprintf(stderr, "%s: ce->output_type = %u\n", __func__, ce->output_type);
438 fprintf(stderr, "%s: ce->output = %p\n", __func__, (void *)ce->output);
439 }
440 }
441
442 if (ce->user != NULL && ce->user != (void *) -1 &&
443 ce->io_type == nmsg_io_io_type_output &&
444 (ce->output_type == nmsg_output_type_stream ||
445 ce->output_type == nmsg_output_type_pres ||
446 ce->output_type == nmsg_output_type_json))
447 {
448 nmsg_output_close(ce->output);
449
450 kf = (struct kickfile *) ce->user;
451 kickfile_exec(kf);
452 if (ce->close_type == nmsg_io_close_type_eof) {
453 if (ctx.debug >= 2)
454 fprintf(stderr, "%s: closed output: %s\n",
455 argv_program, kf->basename);
456 kickfile_destroy(&kf);
457 } else {
458 kickfile_rotate(kf);
459
460 char *output_type_descr = NULL;
461 switch (ce->output_type) {
462 case nmsg_output_type_stream:
463 *(ce->output) = nmsg_output_open_file(
464 open_wfile(kf->tmpname), NMSG_WBUFSZ_MAX);
465 output_type_descr = "nmsg";
466 break;
467 case nmsg_output_type_pres:
468 *(ce->output) = nmsg_output_open_pres(
469 open_wfile(kf->tmpname));
470 output_type_descr = "pres";
471 break;
472 case nmsg_output_type_json:
473 *(ce->output) = nmsg_output_open_json(
474 open_wfile(kf->tmpname));
475 output_type_descr = "json";
476 break;
477 default:
478 assert(0);
479 }
480 setup_nmsg_output(&ctx, *(ce->output));
481 if (ctx.debug >= 2)
482 fprintf(stderr,
483 "%s: reopened %s file output: %s\n",
484 argv_program, output_type_descr, kf->curname);
485 }
486 } else if (ce->io_type == nmsg_io_io_type_input) {
487 if ((ce->user == NULL || ce->close_type == nmsg_io_close_type_eof) &&
488 ce->input != NULL)
489 {
490 if (ctx.debug >= 5) {
491 fprintf(stderr, "%s: closing input %p\n", __func__, (void *)ce->input);
492 }
493 nmsg_input_close(ce->input);
494 }
495 } else if (ce->io_type == nmsg_io_io_type_output) {
496 if ((ce->user == NULL || ce->close_type == nmsg_io_close_type_eof) &&
497 ce->output != NULL)
498 {
499 if (ctx.debug >= 5) {
500 fprintf(stderr, "%s: closing output %p\n", __func__, (void *)ce->output);
501 }
502 nmsg_output_close(ce->output);
503 }
504 } else {
505 /* should never be reached */
506 assert(0);
507 }
508 }
509
510 static void
signal_handler(int sig)511 signal_handler(int sig __attribute__((unused))) {
512 fprintf(stderr, "%s: signalled break\n", argv_program);
513 nmsg_io_breakloop(ctx.io);
514 }
515
516 static void
setup_signals(void)517 setup_signals(void) {
518 struct sigaction sa;
519
520 memset(&sa, 0, sizeof(sa));
521 sigemptyset(&sa.sa_mask);
522 sigaddset(&sa.sa_mask, SIGINT);
523 sigaddset(&sa.sa_mask, SIGTERM);
524 sa.sa_handler = &signal_handler;
525 if (sigaction(SIGINT, &sa, NULL) != 0) {
526 perror("sigaction");
527 exit(EXIT_FAILURE);
528 }
529 if (sigaction(SIGTERM, &sa, NULL) != 0) {
530 perror("sigaction");
531 exit(EXIT_FAILURE);
532 }
533 }
534