1 /*
2  * Copyright (c) 2008-2019 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_input_t	input_open_stream(nmsg_stream_type, int);
24 static nmsg_input_t	input_open_stream_base(nmsg_stream_type);
25 static nmsg_res		input_flush(nmsg_input_t input);
26 static void		input_close_stream(nmsg_input_t input);
27 
28 /* Export. */
29 
30 nmsg_input_t
nmsg_input_open_file(int fd)31 nmsg_input_open_file(int fd) {
32 	return (input_open_stream(nmsg_stream_type_file, fd));
33 }
34 
35 nmsg_input_t
nmsg_input_open_sock(int fd)36 nmsg_input_open_sock(int fd) {
37 	return (input_open_stream(nmsg_stream_type_sock, fd));
38 }
39 
40 #ifdef HAVE_LIBZMQ
41 nmsg_input_t
nmsg_input_open_zmq(void * s)42 nmsg_input_open_zmq(void *s) {
43 	struct nmsg_input *input;
44 
45 	input = input_open_stream_base(nmsg_stream_type_zmq);
46 	if (input == NULL)
47 		return (input);
48 
49 	input->stream->zmq = s;
50 
51 	return (input);
52 }
53 #else /* HAVE_LIBZMQ */
54 nmsg_input_t
nmsg_input_open_zmq(void * s)55 nmsg_input_open_zmq(void *s __attribute__((unused))) {
56 	return (NULL);
57 }
58 #endif /* HAVE_LIBZMQ */
59 
60 nmsg_input_t
nmsg_input_open_callback(nmsg_cb_message_read cb,void * user)61 nmsg_input_open_callback(nmsg_cb_message_read cb, void *user) {
62 	struct nmsg_input *input;
63 
64 	input = calloc(1, sizeof(*input));
65 	if (input == NULL)
66 		return (NULL);
67 	input->type = nmsg_input_type_callback;
68 	input->read_fp = _input_nmsg_read_callback;
69 	input->read_loop_fp = NULL;
70 	input->callback = calloc(1, sizeof(*(input->callback)));
71 	if (input->callback == NULL) {
72 		free(input);
73 		return (NULL);
74 	}
75 	input->callback->cb = cb;
76 	input->callback->user = user;
77 
78 	return (input);
79 }
80 
81 nmsg_input_t
nmsg_input_open_null(void)82 nmsg_input_open_null(void) {
83 	struct nmsg_input *input;
84 
85 	input = input_open_stream_base(nmsg_stream_type_null);
86 	if (input == NULL)
87 		return (NULL);
88 	input->read_fp = _input_nmsg_read_null;
89 	input->read_loop_fp = _input_nmsg_loop_null;
90 
91 	return (input);
92 }
93 
94 nmsg_input_t
nmsg_input_open_pres(int fd,nmsg_msgmod_t msgmod)95 nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod) {
96 	nmsg_res res;
97 	struct nmsg_input *input;
98 
99 	input = calloc(1, sizeof(*input));
100 	if (input == NULL)
101 		return (NULL);
102 	input->type = nmsg_input_type_pres;
103 	input->read_fp = _input_pres_read;
104 
105 	input->pres = calloc(1, sizeof(*(input->pres)));
106 	if (input->pres == NULL) {
107 		free(input);
108 		return (NULL);
109 	}
110 
111 	input->pres->fp = fdopen(fd, "r");
112 	if (input->pres->fp == NULL) {
113 		free(input->pres);
114 		free(input);
115 		return (NULL);
116 	}
117 
118 	input->msgmod = msgmod;
119 	res = nmsg_msgmod_init(input->msgmod, &input->clos);
120 	if (res != nmsg_res_success) {
121 		fclose(input->pres->fp);
122 		free(input->pres);
123 		free(input);
124 		return (NULL);
125 	}
126 
127 	return (input);
128 }
129 
130 #ifdef HAVE_YAJL
131 nmsg_input_t
nmsg_input_open_json(int fd)132 nmsg_input_open_json(int fd) {
133 	struct nmsg_input *input;
134 	int newfd;
135 
136 	input = calloc(1, sizeof(*input));
137 	if (input == NULL)
138 		return (NULL);
139 	input->type = nmsg_input_type_json;
140 	input->read_fp = _input_json_read;
141 
142 	input->json = calloc(1, sizeof(*(input->json)));
143 	if (input->json == NULL) {
144 		free(input);
145 		return (NULL);
146 	}
147 
148 	input->json->orig_fd = fd;
149 
150 	newfd = dup(fd);
151 	if (newfd == -1) {
152 		free(input->json);
153 		free(input);
154 		return (NULL);
155 	}
156 
157 	input->json->fp = fdopen(newfd, "r");
158 	if (input->json->fp == NULL) {
159 		free(input->json);
160 		free(input);
161 		return (NULL);
162 	}
163 
164 	return (input);
165 }
166 #else /* HAVE_YAJL */
167 nmsg_input_t
nmsg_input_open_json(int fd)168 nmsg_input_open_json(__attribute__((unused)) int fd) {
169 	return (NULL);
170 }
171 #endif /* HAVE_YAJL */
172 
173 nmsg_input_t
nmsg_input_open_pcap(nmsg_pcap_t pcap,nmsg_msgmod_t msgmod)174 nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod) {
175 	nmsg_res res;
176 	struct nmsg_input *input;
177 
178 	input = calloc(1, sizeof(*input));
179 	if (input == NULL)
180 		return (NULL);
181 	input->type = nmsg_input_type_pcap;
182 	input->pcap = pcap;
183 
184 	if (msgmod->plugin->pkt_to_payload != NULL) {
185 		input->read_fp = _input_pcap_read_raw;
186 		nmsg_pcap_input_set_raw(pcap, true);
187 	} else if (msgmod->plugin->ipdg_to_payload != NULL) {
188 		input->read_fp = _input_pcap_read;
189 	} else {
190 		free(input);
191 		return (NULL);
192 	}
193 
194 	input->msgmod = msgmod;
195 	res = nmsg_msgmod_init(input->msgmod, &input->clos);
196 	if (res != nmsg_res_success) {
197 		free(input);
198 		return (NULL);
199 	}
200 	if (msgmod->plugin->pcap_init != NULL) {
201 		void *clos = input->clos;
202 		if (msgmod->plugin->type == nmsg_msgmod_type_transparent)
203 			clos = ((struct nmsg_msgmod_clos *) clos)->mod_clos;
204 		res = msgmod->plugin->pcap_init(clos, input->pcap);
205 		if (res != nmsg_res_success) {
206 			free(input);
207 			return (NULL);
208 		}
209 	}
210 
211 	return (input);
212 }
213 
214 nmsg_res
nmsg_input_close(nmsg_input_t * input)215 nmsg_input_close(nmsg_input_t *input) {
216 	switch ((*input)->type) {
217 	case nmsg_input_type_stream:
218 		_nmsg_brate_destroy(&((*input)->stream->brate));
219 #ifdef HAVE_LIBZMQ
220 		if ((*input)->stream->type == nmsg_stream_type_zmq)
221 			zmq_close((*input)->stream->zmq);
222 #else /* HAVE_LIBZMQ */
223 		assert((*input)->stream->type != nmsg_stream_type_zmq);
224 #endif /* HAVE_LIBZMQ */
225 		input_close_stream(*input);
226 		break;
227 	case nmsg_input_type_pcap:
228 		nmsg_pcap_input_close(&(*input)->pcap);
229 		break;
230 	case nmsg_input_type_pres:
231 		fclose((*input)->pres->fp);
232 		free((*input)->pres);
233 		break;
234 	case nmsg_input_type_json:
235 		if (_nmsg_global_autoclose)
236 			close((*input)->json->orig_fd);
237 		fclose((*input)->json->fp);
238 		free((*input)->json);
239 		break;
240 	case nmsg_input_type_callback:
241 		free((*input)->callback);
242 		break;
243 	}
244 
245 	if ((*input)->msgmod != NULL)
246 		nmsg_msgmod_fini((*input)->msgmod, &(*input)->clos);
247 
248 	free(*input);
249 	*input = NULL;
250 
251 	return (nmsg_res_success);
252 }
253 
254 void
nmsg_input_breakloop(nmsg_input_t input)255 nmsg_input_breakloop(nmsg_input_t input) {
256 	input->stop = true;
257 }
258 
259 nmsg_res
nmsg_input_read(nmsg_input_t input,nmsg_message_t * msg)260 nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg) {
261 	return (input->read_fp(input, msg));
262 }
263 
264 nmsg_res
nmsg_input_loop(nmsg_input_t input,int cnt,nmsg_cb_message cb,void * user)265 nmsg_input_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
266 	int n_payloads = 0;
267 	nmsg_message_t msg;
268 	nmsg_res res;
269 
270 	if (input->read_loop_fp != NULL)
271 		return (input->read_loop_fp(input, cnt, cb, user));
272 
273 	for (;;) {
274 		res = input->read_fp(input, &msg);
275 		if (res == nmsg_res_again)
276 			continue;
277 		if (res != nmsg_res_success)
278 			return (res);
279 
280 		if (cnt >= 0 && n_payloads == cnt)
281 			break;
282 		if (input->stop)
283 			break;
284 		n_payloads += 1;
285 		cb(msg, user);
286 	}
287 
288 	return (nmsg_res_success);
289 }
290 
291 void
nmsg_input_set_filter_msgtype(nmsg_input_t input,unsigned vid,unsigned msgtype)292 nmsg_input_set_filter_msgtype(nmsg_input_t input,
293 			      unsigned vid, unsigned msgtype)
294 {
295 	if (vid == 0 && msgtype == 0)
296 		input->do_filter = false;
297 	else
298 		input->do_filter = true;
299 
300 	input->filter_vid = vid;
301 	input->filter_msgtype = msgtype;
302 }
303 
304 nmsg_res
nmsg_input_set_filter_msgtype_byname(nmsg_input_t input,const char * vname,const char * mname)305 nmsg_input_set_filter_msgtype_byname(nmsg_input_t input,
306 				     const char *vname, const char *mname)
307 {
308 	unsigned vid, msgtype;
309 
310 	if (vname == NULL || mname == NULL)
311 		return (nmsg_res_failure);
312 
313 	vid = nmsg_msgmod_vname_to_vid(vname);
314 	if (vid == 0)
315 		return (nmsg_res_failure);
316 	msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
317 	if (msgtype == 0)
318 		return (nmsg_res_failure);
319 
320 	nmsg_input_set_filter_msgtype(input, vid, msgtype);
321 
322 	return (nmsg_res_success);
323 }
324 
325 void
nmsg_input_set_filter_source(nmsg_input_t input,unsigned source)326 nmsg_input_set_filter_source(nmsg_input_t input, unsigned source) {
327 	if (input->type == nmsg_input_type_stream)
328 		input->stream->source = source;
329 }
330 
331 void
nmsg_input_set_filter_operator(nmsg_input_t input,unsigned operator)332 nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator) {
333 	if (input->type == nmsg_input_type_stream)
334 		input->stream->operator = operator;
335 }
336 
337 void
nmsg_input_set_filter_group(nmsg_input_t input,unsigned group)338 nmsg_input_set_filter_group(nmsg_input_t input, unsigned group) {
339 	if (input->type == nmsg_input_type_stream)
340 		input->stream->group = group;
341 }
342 
343 nmsg_res
nmsg_input_set_blocking_io(nmsg_input_t input,bool flag)344 nmsg_input_set_blocking_io(nmsg_input_t input, bool flag) {
345 	int val;
346 
347 	if (input->type != nmsg_input_type_stream)
348 		return (nmsg_res_failure);
349 
350 	if ((val = fcntl(input->stream->buf->fd, F_GETFL, 0)) < 0)
351 		return (nmsg_res_failure);
352 
353 	if (flag == true)
354 		val &= ~O_NONBLOCK;
355 	else
356 		val |= O_NONBLOCK;
357 
358 	if (fcntl(input->stream->buf->fd, F_SETFL, val) < 0)
359 		return (nmsg_res_failure);
360 
361 	if (flag == true)
362 		input->stream->blocking_io = true;
363 	else
364 		input->stream->blocking_io = false;
365 
366 	return (nmsg_res_success);
367 }
368 
369 nmsg_res
nmsg_input_set_byte_rate(nmsg_input_t input,size_t target_byte_rate)370 nmsg_input_set_byte_rate(nmsg_input_t input, size_t target_byte_rate) {
371 	if (input->type != nmsg_input_type_stream)
372 		return (nmsg_res_failure);
373 	if (input->stream->brate != NULL)
374 		_nmsg_brate_destroy(&input->stream->brate);
375 	if (target_byte_rate > 0) {
376 		input->stream->brate = _nmsg_brate_init(target_byte_rate);
377 		if (input->stream->brate == NULL)
378 			return (nmsg_res_failure);
379 	}
380 	return (nmsg_res_success);
381 }
382 
383 nmsg_res
nmsg_input_set_verify_seqsrc(nmsg_input_t input,bool verify)384 nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify) {
385 	if (input->type != nmsg_input_type_stream)
386 		return (nmsg_res_failure);
387 	input->stream->verify_seqsrc = verify;
388 	return (nmsg_res_success);
389 }
390 
391 nmsg_res
nmsg_input_get_count_container_received(nmsg_input_t input,uint64_t * count)392 nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count) {
393 	if (input->type == nmsg_input_type_stream) {
394 		*count = input->stream->count_recv;
395 		return (nmsg_res_success);
396 	}
397 	return (nmsg_res_failure);
398 }
399 
400 nmsg_res
nmsg_input_get_count_container_dropped(nmsg_input_t input,uint64_t * count)401 nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count) {
402 	if (input->type == nmsg_input_type_stream &&
403 	    input->stream->verify_seqsrc)
404 	{
405 		*count = input->stream->count_drop;
406 		return (nmsg_res_success);
407 	}
408 	return (nmsg_res_failure);
409 }
410 
411 /* Private functions. */
412 
413 static nmsg_input_t
input_open_stream(nmsg_stream_type type,int fd)414 input_open_stream(nmsg_stream_type type, int fd) {
415 	struct nmsg_input *input;
416 
417 	input = input_open_stream_base(type);
418 	if (input == NULL)
419 		return (input);
420 
421 	/* nmsg_buf */
422 	input->stream->buf = _nmsg_buf_new(NMSG_RBUFSZ);
423 	if (input->stream->buf == NULL) {
424 		free(input->stream);
425 		free(input);
426 		return (NULL);
427 	}
428 	_nmsg_buf_reset(input->stream->buf);
429 	input->stream->buf->fd = fd;
430 	input->stream->buf->bufsz = NMSG_RBUFSZ / 2;
431 
432 	/* struct pollfd */
433 	input->stream->pfd.fd = fd;
434 	input->stream->pfd.events = POLLIN;
435 
436 	return (input);
437 }
438 
439 static nmsg_input_t
input_open_stream_base(nmsg_stream_type type)440 input_open_stream_base(nmsg_stream_type type) {
441 	struct nmsg_input *input;
442 
443 	/* nmsg_input */
444 	input = calloc(1, sizeof(*input));
445 	if (input == NULL)
446 		return (NULL);
447 	input->type = nmsg_input_type_stream;
448 	input->read_fp = _input_nmsg_read;
449 	input->read_loop_fp = _input_nmsg_loop;
450 
451 	/* nmsg_stream_input */
452 	input->stream = calloc(1, sizeof(*(input->stream)));
453 	if (input->stream == NULL) {
454 		free(input);
455 		return (NULL);
456 	}
457 	input->stream->blocking_io = true;
458 	input->stream->verify_seqsrc = true;
459 	input->stream->type = type;
460 	if (type == nmsg_stream_type_file) {
461 		input->stream->stream_read_fp = _input_nmsg_read_container_file;
462 	} else if (type == nmsg_stream_type_sock) {
463 		input->stream->stream_read_fp = _input_nmsg_read_container_sock;
464 	} else if (type == nmsg_stream_type_zmq) {
465 #ifdef HAVE_LIBZMQ
466 		input->stream->stream_read_fp = _input_nmsg_read_container_zmq;
467 #else /* HAVE_LIBZMQ */
468 		assert(type != nmsg_stream_type_zmq);
469 #endif /* HAVE_LIBZMQ */
470 	}
471 
472 	/* nmsg_zbuf */
473 	input->stream->zb = nmsg_zbuf_inflate_init();
474 	if (input->stream->zb == NULL) {
475 		_nmsg_buf_destroy(&input->stream->buf);
476 		free(input->stream);
477 		free(input);
478 		return (NULL);
479 	}
480 
481 	/* red-black tree */
482 	RB_INIT(&input->stream->nft.head);
483 
484 	/* nmsg seqsrc */
485 	ISC_LIST_INIT(input->stream->seqsrcs);
486 
487 	return (input);
488 }
489 
490 static void
input_close_stream(nmsg_input_t input)491 input_close_stream(nmsg_input_t input) {
492 	_input_seqsrc_destroy(input);
493 
494 	if (input->stream->nmsg != NULL)
495 		input_flush(input);
496 
497 	nmsg_zbuf_destroy(&input->stream->zb);
498 	_input_frag_destroy(input->stream);
499 	_nmsg_buf_destroy(&input->stream->buf);
500 	free(input->stream);
501 }
502 
503 static nmsg_res
input_flush(nmsg_input_t input)504 input_flush(nmsg_input_t input) {
505 	if (input->type == nmsg_input_type_stream) {
506 		Nmsg__Nmsg *nmsg;
507 		unsigned i;
508 
509 		nmsg = input->stream->nmsg;
510 		assert(nmsg != NULL);
511 
512 		for (i = 0; i < nmsg->n_payloads; i++)
513 			if (nmsg->payloads[i] != NULL)
514 				_nmsg_payload_free(&nmsg->payloads[i]);
515 		nmsg->n_payloads = 0;
516 		nmsg__nmsg__free_unpacked(nmsg, NULL);
517 	}
518 
519 	return (nmsg_res_success);
520 }
521