1 /*
2 * Copyright (c) 2008-2019 by Farsight Security, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* Import. */
18
19 #include "private.h"
20
21 /* Forward. */
22
23 static nmsg_input_t input_open_stream(nmsg_stream_type, int);
24 static nmsg_input_t input_open_stream_base(nmsg_stream_type);
25 static nmsg_res input_flush(nmsg_input_t input);
26 static void input_close_stream(nmsg_input_t input);
27
28 /* Export. */
29
30 nmsg_input_t
nmsg_input_open_file(int fd)31 nmsg_input_open_file(int fd) {
32 return (input_open_stream(nmsg_stream_type_file, fd));
33 }
34
35 nmsg_input_t
nmsg_input_open_sock(int fd)36 nmsg_input_open_sock(int fd) {
37 return (input_open_stream(nmsg_stream_type_sock, fd));
38 }
39
40 #ifdef HAVE_LIBZMQ
41 nmsg_input_t
nmsg_input_open_zmq(void * s)42 nmsg_input_open_zmq(void *s) {
43 struct nmsg_input *input;
44
45 input = input_open_stream_base(nmsg_stream_type_zmq);
46 if (input == NULL)
47 return (input);
48
49 input->stream->zmq = s;
50
51 return (input);
52 }
53 #else /* HAVE_LIBZMQ */
54 nmsg_input_t
nmsg_input_open_zmq(void * s)55 nmsg_input_open_zmq(void *s __attribute__((unused))) {
56 return (NULL);
57 }
58 #endif /* HAVE_LIBZMQ */
59
60 nmsg_input_t
nmsg_input_open_callback(nmsg_cb_message_read cb,void * user)61 nmsg_input_open_callback(nmsg_cb_message_read cb, void *user) {
62 struct nmsg_input *input;
63
64 input = calloc(1, sizeof(*input));
65 if (input == NULL)
66 return (NULL);
67 input->type = nmsg_input_type_callback;
68 input->read_fp = _input_nmsg_read_callback;
69 input->read_loop_fp = NULL;
70 input->callback = calloc(1, sizeof(*(input->callback)));
71 if (input->callback == NULL) {
72 free(input);
73 return (NULL);
74 }
75 input->callback->cb = cb;
76 input->callback->user = user;
77
78 return (input);
79 }
80
81 nmsg_input_t
nmsg_input_open_null(void)82 nmsg_input_open_null(void) {
83 struct nmsg_input *input;
84
85 input = input_open_stream_base(nmsg_stream_type_null);
86 if (input == NULL)
87 return (NULL);
88 input->read_fp = _input_nmsg_read_null;
89 input->read_loop_fp = _input_nmsg_loop_null;
90
91 return (input);
92 }
93
94 nmsg_input_t
nmsg_input_open_pres(int fd,nmsg_msgmod_t msgmod)95 nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod) {
96 nmsg_res res;
97 struct nmsg_input *input;
98
99 input = calloc(1, sizeof(*input));
100 if (input == NULL)
101 return (NULL);
102 input->type = nmsg_input_type_pres;
103 input->read_fp = _input_pres_read;
104
105 input->pres = calloc(1, sizeof(*(input->pres)));
106 if (input->pres == NULL) {
107 free(input);
108 return (NULL);
109 }
110
111 input->pres->fp = fdopen(fd, "r");
112 if (input->pres->fp == NULL) {
113 free(input->pres);
114 free(input);
115 return (NULL);
116 }
117
118 input->msgmod = msgmod;
119 res = nmsg_msgmod_init(input->msgmod, &input->clos);
120 if (res != nmsg_res_success) {
121 fclose(input->pres->fp);
122 free(input->pres);
123 free(input);
124 return (NULL);
125 }
126
127 return (input);
128 }
129
130 #ifdef HAVE_YAJL
131 nmsg_input_t
nmsg_input_open_json(int fd)132 nmsg_input_open_json(int fd) {
133 struct nmsg_input *input;
134 int newfd;
135
136 input = calloc(1, sizeof(*input));
137 if (input == NULL)
138 return (NULL);
139 input->type = nmsg_input_type_json;
140 input->read_fp = _input_json_read;
141
142 input->json = calloc(1, sizeof(*(input->json)));
143 if (input->json == NULL) {
144 free(input);
145 return (NULL);
146 }
147
148 input->json->orig_fd = fd;
149
150 newfd = dup(fd);
151 if (newfd == -1) {
152 free(input->json);
153 free(input);
154 return (NULL);
155 }
156
157 input->json->fp = fdopen(newfd, "r");
158 if (input->json->fp == NULL) {
159 free(input->json);
160 free(input);
161 return (NULL);
162 }
163
164 return (input);
165 }
166 #else /* HAVE_YAJL */
167 nmsg_input_t
nmsg_input_open_json(int fd)168 nmsg_input_open_json(__attribute__((unused)) int fd) {
169 return (NULL);
170 }
171 #endif /* HAVE_YAJL */
172
173 nmsg_input_t
nmsg_input_open_pcap(nmsg_pcap_t pcap,nmsg_msgmod_t msgmod)174 nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod) {
175 nmsg_res res;
176 struct nmsg_input *input;
177
178 input = calloc(1, sizeof(*input));
179 if (input == NULL)
180 return (NULL);
181 input->type = nmsg_input_type_pcap;
182 input->pcap = pcap;
183
184 if (msgmod->plugin->pkt_to_payload != NULL) {
185 input->read_fp = _input_pcap_read_raw;
186 nmsg_pcap_input_set_raw(pcap, true);
187 } else if (msgmod->plugin->ipdg_to_payload != NULL) {
188 input->read_fp = _input_pcap_read;
189 } else {
190 free(input);
191 return (NULL);
192 }
193
194 input->msgmod = msgmod;
195 res = nmsg_msgmod_init(input->msgmod, &input->clos);
196 if (res != nmsg_res_success) {
197 free(input);
198 return (NULL);
199 }
200 if (msgmod->plugin->pcap_init != NULL) {
201 void *clos = input->clos;
202 if (msgmod->plugin->type == nmsg_msgmod_type_transparent)
203 clos = ((struct nmsg_msgmod_clos *) clos)->mod_clos;
204 res = msgmod->plugin->pcap_init(clos, input->pcap);
205 if (res != nmsg_res_success) {
206 free(input);
207 return (NULL);
208 }
209 }
210
211 return (input);
212 }
213
214 nmsg_res
nmsg_input_close(nmsg_input_t * input)215 nmsg_input_close(nmsg_input_t *input) {
216 switch ((*input)->type) {
217 case nmsg_input_type_stream:
218 _nmsg_brate_destroy(&((*input)->stream->brate));
219 #ifdef HAVE_LIBZMQ
220 if ((*input)->stream->type == nmsg_stream_type_zmq)
221 zmq_close((*input)->stream->zmq);
222 #else /* HAVE_LIBZMQ */
223 assert((*input)->stream->type != nmsg_stream_type_zmq);
224 #endif /* HAVE_LIBZMQ */
225 input_close_stream(*input);
226 break;
227 case nmsg_input_type_pcap:
228 nmsg_pcap_input_close(&(*input)->pcap);
229 break;
230 case nmsg_input_type_pres:
231 fclose((*input)->pres->fp);
232 free((*input)->pres);
233 break;
234 case nmsg_input_type_json:
235 if (_nmsg_global_autoclose)
236 close((*input)->json->orig_fd);
237 fclose((*input)->json->fp);
238 free((*input)->json);
239 break;
240 case nmsg_input_type_callback:
241 free((*input)->callback);
242 break;
243 }
244
245 if ((*input)->msgmod != NULL)
246 nmsg_msgmod_fini((*input)->msgmod, &(*input)->clos);
247
248 free(*input);
249 *input = NULL;
250
251 return (nmsg_res_success);
252 }
253
254 void
nmsg_input_breakloop(nmsg_input_t input)255 nmsg_input_breakloop(nmsg_input_t input) {
256 input->stop = true;
257 }
258
259 nmsg_res
nmsg_input_read(nmsg_input_t input,nmsg_message_t * msg)260 nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg) {
261 return (input->read_fp(input, msg));
262 }
263
264 nmsg_res
nmsg_input_loop(nmsg_input_t input,int cnt,nmsg_cb_message cb,void * user)265 nmsg_input_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
266 int n_payloads = 0;
267 nmsg_message_t msg;
268 nmsg_res res;
269
270 if (input->read_loop_fp != NULL)
271 return (input->read_loop_fp(input, cnt, cb, user));
272
273 for (;;) {
274 res = input->read_fp(input, &msg);
275 if (res == nmsg_res_again)
276 continue;
277 if (res != nmsg_res_success)
278 return (res);
279
280 if (cnt >= 0 && n_payloads == cnt)
281 break;
282 if (input->stop)
283 break;
284 n_payloads += 1;
285 cb(msg, user);
286 }
287
288 return (nmsg_res_success);
289 }
290
291 void
nmsg_input_set_filter_msgtype(nmsg_input_t input,unsigned vid,unsigned msgtype)292 nmsg_input_set_filter_msgtype(nmsg_input_t input,
293 unsigned vid, unsigned msgtype)
294 {
295 if (vid == 0 && msgtype == 0)
296 input->do_filter = false;
297 else
298 input->do_filter = true;
299
300 input->filter_vid = vid;
301 input->filter_msgtype = msgtype;
302 }
303
304 nmsg_res
nmsg_input_set_filter_msgtype_byname(nmsg_input_t input,const char * vname,const char * mname)305 nmsg_input_set_filter_msgtype_byname(nmsg_input_t input,
306 const char *vname, const char *mname)
307 {
308 unsigned vid, msgtype;
309
310 if (vname == NULL || mname == NULL)
311 return (nmsg_res_failure);
312
313 vid = nmsg_msgmod_vname_to_vid(vname);
314 if (vid == 0)
315 return (nmsg_res_failure);
316 msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
317 if (msgtype == 0)
318 return (nmsg_res_failure);
319
320 nmsg_input_set_filter_msgtype(input, vid, msgtype);
321
322 return (nmsg_res_success);
323 }
324
325 void
nmsg_input_set_filter_source(nmsg_input_t input,unsigned source)326 nmsg_input_set_filter_source(nmsg_input_t input, unsigned source) {
327 if (input->type == nmsg_input_type_stream)
328 input->stream->source = source;
329 }
330
331 void
nmsg_input_set_filter_operator(nmsg_input_t input,unsigned operator)332 nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator) {
333 if (input->type == nmsg_input_type_stream)
334 input->stream->operator = operator;
335 }
336
337 void
nmsg_input_set_filter_group(nmsg_input_t input,unsigned group)338 nmsg_input_set_filter_group(nmsg_input_t input, unsigned group) {
339 if (input->type == nmsg_input_type_stream)
340 input->stream->group = group;
341 }
342
343 nmsg_res
nmsg_input_set_blocking_io(nmsg_input_t input,bool flag)344 nmsg_input_set_blocking_io(nmsg_input_t input, bool flag) {
345 int val;
346
347 if (input->type != nmsg_input_type_stream)
348 return (nmsg_res_failure);
349
350 if ((val = fcntl(input->stream->buf->fd, F_GETFL, 0)) < 0)
351 return (nmsg_res_failure);
352
353 if (flag == true)
354 val &= ~O_NONBLOCK;
355 else
356 val |= O_NONBLOCK;
357
358 if (fcntl(input->stream->buf->fd, F_SETFL, val) < 0)
359 return (nmsg_res_failure);
360
361 if (flag == true)
362 input->stream->blocking_io = true;
363 else
364 input->stream->blocking_io = false;
365
366 return (nmsg_res_success);
367 }
368
369 nmsg_res
nmsg_input_set_byte_rate(nmsg_input_t input,size_t target_byte_rate)370 nmsg_input_set_byte_rate(nmsg_input_t input, size_t target_byte_rate) {
371 if (input->type != nmsg_input_type_stream)
372 return (nmsg_res_failure);
373 if (input->stream->brate != NULL)
374 _nmsg_brate_destroy(&input->stream->brate);
375 if (target_byte_rate > 0) {
376 input->stream->brate = _nmsg_brate_init(target_byte_rate);
377 if (input->stream->brate == NULL)
378 return (nmsg_res_failure);
379 }
380 return (nmsg_res_success);
381 }
382
383 nmsg_res
nmsg_input_set_verify_seqsrc(nmsg_input_t input,bool verify)384 nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify) {
385 if (input->type != nmsg_input_type_stream)
386 return (nmsg_res_failure);
387 input->stream->verify_seqsrc = verify;
388 return (nmsg_res_success);
389 }
390
391 nmsg_res
nmsg_input_get_count_container_received(nmsg_input_t input,uint64_t * count)392 nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count) {
393 if (input->type == nmsg_input_type_stream) {
394 *count = input->stream->count_recv;
395 return (nmsg_res_success);
396 }
397 return (nmsg_res_failure);
398 }
399
400 nmsg_res
nmsg_input_get_count_container_dropped(nmsg_input_t input,uint64_t * count)401 nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count) {
402 if (input->type == nmsg_input_type_stream &&
403 input->stream->verify_seqsrc)
404 {
405 *count = input->stream->count_drop;
406 return (nmsg_res_success);
407 }
408 return (nmsg_res_failure);
409 }
410
411 /* Private functions. */
412
413 static nmsg_input_t
input_open_stream(nmsg_stream_type type,int fd)414 input_open_stream(nmsg_stream_type type, int fd) {
415 struct nmsg_input *input;
416
417 input = input_open_stream_base(type);
418 if (input == NULL)
419 return (input);
420
421 /* nmsg_buf */
422 input->stream->buf = _nmsg_buf_new(NMSG_RBUFSZ);
423 if (input->stream->buf == NULL) {
424 free(input->stream);
425 free(input);
426 return (NULL);
427 }
428 _nmsg_buf_reset(input->stream->buf);
429 input->stream->buf->fd = fd;
430 input->stream->buf->bufsz = NMSG_RBUFSZ / 2;
431
432 /* struct pollfd */
433 input->stream->pfd.fd = fd;
434 input->stream->pfd.events = POLLIN;
435
436 return (input);
437 }
438
439 static nmsg_input_t
input_open_stream_base(nmsg_stream_type type)440 input_open_stream_base(nmsg_stream_type type) {
441 struct nmsg_input *input;
442
443 /* nmsg_input */
444 input = calloc(1, sizeof(*input));
445 if (input == NULL)
446 return (NULL);
447 input->type = nmsg_input_type_stream;
448 input->read_fp = _input_nmsg_read;
449 input->read_loop_fp = _input_nmsg_loop;
450
451 /* nmsg_stream_input */
452 input->stream = calloc(1, sizeof(*(input->stream)));
453 if (input->stream == NULL) {
454 free(input);
455 return (NULL);
456 }
457 input->stream->blocking_io = true;
458 input->stream->verify_seqsrc = true;
459 input->stream->type = type;
460 if (type == nmsg_stream_type_file) {
461 input->stream->stream_read_fp = _input_nmsg_read_container_file;
462 } else if (type == nmsg_stream_type_sock) {
463 input->stream->stream_read_fp = _input_nmsg_read_container_sock;
464 } else if (type == nmsg_stream_type_zmq) {
465 #ifdef HAVE_LIBZMQ
466 input->stream->stream_read_fp = _input_nmsg_read_container_zmq;
467 #else /* HAVE_LIBZMQ */
468 assert(type != nmsg_stream_type_zmq);
469 #endif /* HAVE_LIBZMQ */
470 }
471
472 /* nmsg_zbuf */
473 input->stream->zb = nmsg_zbuf_inflate_init();
474 if (input->stream->zb == NULL) {
475 _nmsg_buf_destroy(&input->stream->buf);
476 free(input->stream);
477 free(input);
478 return (NULL);
479 }
480
481 /* red-black tree */
482 RB_INIT(&input->stream->nft.head);
483
484 /* nmsg seqsrc */
485 ISC_LIST_INIT(input->stream->seqsrcs);
486
487 return (input);
488 }
489
490 static void
input_close_stream(nmsg_input_t input)491 input_close_stream(nmsg_input_t input) {
492 _input_seqsrc_destroy(input);
493
494 if (input->stream->nmsg != NULL)
495 input_flush(input);
496
497 nmsg_zbuf_destroy(&input->stream->zb);
498 _input_frag_destroy(input->stream);
499 _nmsg_buf_destroy(&input->stream->buf);
500 free(input->stream);
501 }
502
503 static nmsg_res
input_flush(nmsg_input_t input)504 input_flush(nmsg_input_t input) {
505 if (input->type == nmsg_input_type_stream) {
506 Nmsg__Nmsg *nmsg;
507 unsigned i;
508
509 nmsg = input->stream->nmsg;
510 assert(nmsg != NULL);
511
512 for (i = 0; i < nmsg->n_payloads; i++)
513 if (nmsg->payloads[i] != NULL)
514 _nmsg_payload_free(&nmsg->payloads[i]);
515 nmsg->n_payloads = 0;
516 nmsg__nmsg__free_unpacked(nmsg, NULL);
517 }
518
519 return (nmsg_res_success);
520 }
521