1 /*
2  * Copyright (c) 2009-2019 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_res read_file(nmsg_input_t, ssize_t *);
24 static nmsg_res do_read_file(nmsg_input_t, ssize_t, ssize_t);
25 static nmsg_res do_read_sock(nmsg_input_t, ssize_t);
26 
27 /* Internal functions. */
28 
29 nmsg_res
_input_nmsg_read(nmsg_input_t input,nmsg_message_t * msg)30 _input_nmsg_read(nmsg_input_t input, nmsg_message_t *msg) {
31 	Nmsg__NmsgPayload *np;
32 	nmsg_res res;
33 
34 	if (input->stream->nmsg != NULL &&
35 	    input->stream->np_index >= input->stream->nmsg->n_payloads - 1)
36 	{
37 		input->stream->nmsg->n_payloads = 0;
38 		nmsg__nmsg__free_unpacked(input->stream->nmsg, NULL);
39 		input->stream->nmsg = NULL;
40 	} else {
41 		input->stream->np_index += 1;
42 	}
43 
44 	if (input->stream->nmsg == NULL) {
45 		res = input->stream->stream_read_fp(input, &input->stream->nmsg);
46 		if (res != nmsg_res_success)
47 			return (res);
48 		input->stream->np_index = 0;
49 	}
50 
51 	/* detach the payload from the original nmsg container */
52 	np = input->stream->nmsg->payloads[input->stream->np_index];
53 	input->stream->nmsg->payloads[input->stream->np_index] = NULL;
54 
55 	/* filter payload */
56 	if (_input_nmsg_filter(input, input->stream->np_index, np) == false) {
57 		_nmsg_payload_free(&np);
58 		return (nmsg_res_again);
59 	}
60 
61 	/* pass a pointer to the payload to the caller */
62 	*msg = _nmsg_message_from_payload(np);
63 	if (*msg == NULL) {
64 		_nmsg_payload_free(&np);
65 		return (nmsg_res_memfail);
66 	}
67 
68 	/* possibly sleep a bit if ingress rate control is enabled */
69 	if (input->stream->brate != NULL)
70 		_nmsg_brate_sleep(input->stream->brate,
71 				  input->stream->nc_size,
72 				  input->stream->nmsg->n_payloads,
73 				  input->stream->np_index);
74 
75 	return (nmsg_res_success);
76 }
77 
78 nmsg_res
_input_nmsg_loop(nmsg_input_t input,int cnt,nmsg_cb_message cb,void * user)79 _input_nmsg_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
80 	unsigned n;
81 	nmsg_res res;
82 	Nmsg__Nmsg *nmsg;
83 	Nmsg__NmsgPayload *np;
84 	nmsg_message_t msg;
85 
86 	if (cnt < 0) {
87 		/* loop indefinitely */
88 		for (;;) {
89 			if (input->stop)
90 				break;
91 			res = input->stream->stream_read_fp(input, &input->stream->nmsg);
92 			if (res == nmsg_res_again)
93 				continue;
94 			if (res != nmsg_res_success)
95 				return (res);
96 
97 			nmsg = input->stream->nmsg;
98 			for (n = 0; n < nmsg->n_payloads; n++) {
99 				np = nmsg->payloads[n];
100 				if (_input_nmsg_filter(input, n, np)) {
101 					msg = _nmsg_message_from_payload(np);
102 					cb(msg, user);
103 				}
104 			}
105 			nmsg->n_payloads = 0;
106 			free(nmsg->payloads);
107 			nmsg->payloads = NULL;
108 			nmsg__nmsg__free_unpacked(nmsg, NULL);
109 			input->stream->nmsg = NULL;
110 		}
111 	} else {
112 		/* loop until (n_payloads == cnt) */
113 		int n_payloads = 0;
114 
115 		for (;;) {
116 			if (input->stop)
117 				break;
118 			res = input->stream->stream_read_fp(input, &input->stream->nmsg);
119 			if (res == nmsg_res_again)
120 				continue;
121 			if (res != nmsg_res_success)
122 				return (res);
123 
124 			nmsg = input->stream->nmsg;
125 			for (n = 0; n < nmsg->n_payloads; n++) {
126 				np = nmsg->payloads[n];
127 				if (_input_nmsg_filter(input, n, np)) {
128 					if (n_payloads == cnt)
129 						break;
130 					n_payloads += 1;
131 					msg = _nmsg_message_from_payload(np);
132 					cb(msg, user);
133 				}
134 			}
135 			nmsg->n_payloads = 0;
136 			free(nmsg->payloads);
137 			nmsg->payloads = NULL;
138 			nmsg__nmsg__free_unpacked(nmsg, NULL);
139 			input->stream->nmsg = NULL;
140 			if (n_payloads == cnt)
141 				break;
142 		}
143 	}
144 
145 	return (nmsg_res_success);
146 }
147 
148 bool
_input_nmsg_filter(nmsg_input_t input,unsigned idx,Nmsg__NmsgPayload * np)149 _input_nmsg_filter(nmsg_input_t input, unsigned idx, Nmsg__NmsgPayload *np) {
150 	assert(input->stream->nmsg != NULL);
151 
152 	/* payload crc */
153 	if (input->stream->nmsg->n_payload_crcs >= (idx + 1)) {
154 		uint32_t wire_crc = input->stream->nmsg->payload_crcs[idx];
155 		uint32_t calc_crc = my_crc32c(np->payload.data, np->payload.len);
156 		if (ntohl(wire_crc) != calc_crc) {
157 			_nmsg_dprintf(1, "libnmsg: WARNING: crc mismatch (%x != %x) [%s]\n",
158 				      calc_crc, wire_crc, __func__);
159 			return (false);
160 		}
161 	}
162 
163 	/* (vid, msgtype) */
164 	if (input->do_filter == true &&
165 	    (input->filter_vid != np->vid ||
166 	     input->filter_msgtype != np->msgtype))
167 	{
168 		return (false);
169 	}
170 
171 	/* source */
172 	if (input->stream->source > 0 &&
173 	    input->stream->source != np->source)
174 	{
175 		return (false);
176 	}
177 
178 	/* operator */
179 	if (input->stream->operator > 0 &&
180 	    input->stream->operator != np->operator_)
181 	{
182 		return (false);
183 	}
184 
185 	/* group */
186 	if (input->stream->group > 0 &&
187 	    input->stream->group != np->group)
188 	{
189 		return (false);
190 	}
191 
192 	/* all passed */
193 	return (true);
194 }
195 
196 nmsg_res
_input_nmsg_unpack_container(nmsg_input_t input,Nmsg__Nmsg ** nmsg,uint8_t * buf,size_t buf_len)197 _input_nmsg_unpack_container(nmsg_input_t input, Nmsg__Nmsg **nmsg,
198 			     uint8_t *buf, size_t buf_len)
199 {
200 	nmsg_res res = nmsg_res_success;
201 
202 	input->stream->nc_size = buf_len + NMSG_HDRLSZ_V2;
203 	_nmsg_dprintf(6, "%s: unpacking container len= %zd\n", __func__, buf_len);
204 
205 	if (input->stream->flags & NMSG_FLAG_FRAGMENT) {
206 		res = _input_frag_read(input, nmsg, buf, buf_len);
207 	} else if (input->stream->flags & NMSG_FLAG_ZLIB) {
208 		size_t u_len;
209 		u_char *u_buf;
210 
211 		res = nmsg_zbuf_inflate(input->stream->zb, buf_len, buf, &u_len, &u_buf);
212 		if (res != nmsg_res_success)
213 			return (res);
214 		*nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
215 		free(u_buf);
216 		if (*nmsg == NULL)
217 			return (nmsg_res_parse_error);
218 	} else {
219 		*nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
220 		if (*nmsg == NULL)
221 			return (nmsg_res_parse_error);
222 	}
223 
224 	return (res);
225 }
226 
227 nmsg_res
_input_nmsg_unpack_container2(const uint8_t * buf,size_t buf_len,unsigned flags,Nmsg__Nmsg ** nmsg)228 _input_nmsg_unpack_container2(const uint8_t *buf, size_t buf_len,
229 			      unsigned flags, Nmsg__Nmsg **nmsg)
230 {
231 	nmsg_res res;
232 
233 	/* fragmented containers aren't handled by this function */
234 	if (flags & NMSG_FLAG_FRAGMENT)
235 		return (nmsg_res_failure);
236 
237 	if (flags & NMSG_FLAG_ZLIB) {
238 		size_t u_len;
239 		u_char *u_buf;
240 		nmsg_zbuf_t zb;
241 
242 		zb = nmsg_zbuf_inflate_init();
243 		if (zb == NULL)
244 			return (nmsg_res_memfail);
245 		res = nmsg_zbuf_inflate(zb, buf_len, (uint8_t *) buf, &u_len, &u_buf);
246 		nmsg_zbuf_destroy(&zb);
247 		if (res != nmsg_res_success)
248 			return (res);
249 		*nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
250 		free(u_buf);
251 		if (*nmsg == NULL)
252 			return (nmsg_res_failure);
253 	} else {
254 		*nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
255 		if (*nmsg == NULL)
256 			return (nmsg_res_failure);
257 	}
258 
259 	return (nmsg_res_success);
260 }
261 
262 nmsg_res
_input_nmsg_read_container_file(nmsg_input_t input,Nmsg__Nmsg ** nmsg)263 _input_nmsg_read_container_file(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
264 	nmsg_res res;
265 	ssize_t bytes_avail, msgsize = 0;
266 
267 	assert(input->stream->type == nmsg_stream_type_file);
268 
269 	/* read */
270 	res = read_file(input, &msgsize);
271 	if (res != nmsg_res_success)
272 		return (res);
273 
274 	/* ensure that the full NMSG container is available */
275 	bytes_avail = _nmsg_buf_avail(input->stream->buf);
276 	if (bytes_avail < msgsize) {
277 		ssize_t bytes_to_read = msgsize - bytes_avail;
278 
279 		res = do_read_file(input, bytes_to_read, bytes_to_read);
280 		if (res != nmsg_res_success)
281 			return (res);
282 	}
283 
284 	/* unpack message */
285 	res = _input_nmsg_unpack_container(input, nmsg, input->stream->buf->pos, msgsize);
286 	input->stream->buf->pos += msgsize;
287 
288 	return (res);
289 }
290 
291 nmsg_res
_input_nmsg_read_container_sock(nmsg_input_t input,Nmsg__Nmsg ** nmsg)292 _input_nmsg_read_container_sock(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
293 	nmsg_res res;
294 	ssize_t msgsize;
295 	struct nmsg_buf *buf = input->stream->buf;
296 
297 	assert(input->stream->type == nmsg_stream_type_sock);
298 
299 	/* read the NMSG container */
300 	_nmsg_buf_reset(buf);
301 	res = do_read_sock(input, buf->bufsz);
302 	if (res != nmsg_res_success) {
303 		if (res == nmsg_res_read_failure)
304 			return (res);
305 		else
306 			/* forward compatibility */
307 			return (nmsg_res_again);
308 	}
309 	if (_nmsg_buf_avail(buf) < NMSG_HDRLSZ_V2)
310 		return (nmsg_res_failure);
311 
312 	/* deserialize the NMSG header */
313 	res = _input_nmsg_deserialize_header(buf->pos,
314 					     _nmsg_buf_avail(buf),
315 					     &msgsize,
316 					     &input->stream->flags);
317 	if (res != nmsg_res_success)
318 		return (res);
319 	buf->pos += NMSG_HDRLSZ_V2;
320 
321 	/* since the input stream is a sock stream, the entire message must
322 	 * have been read by the call to do_read_sock() */
323 	if (_nmsg_buf_avail(buf) != msgsize)
324 		return (nmsg_res_parse_error);
325 
326 	/* unpack message */
327 	res = _input_nmsg_unpack_container(input, nmsg, buf->pos, msgsize);
328 	input->stream->buf->pos += msgsize;
329 
330 	/* update counters */
331 	if (*nmsg != NULL) {
332 		input->stream->count_recv += 1;
333 
334 		if (input->stream->verify_seqsrc) {
335 			struct nmsg_seqsrc *seqsrc;
336 
337 			seqsrc = _input_seqsrc_get(input, *nmsg);
338 			if (seqsrc != NULL) {
339 				size_t drop;
340 				drop = _input_seqsrc_update(input, seqsrc, *nmsg);
341 				input->stream->count_drop += drop;
342 			}
343 		}
344 	}
345 
346 	/* expire old outstanding fragments */
347 	_input_frag_gc(input->stream);
348 
349 	return (res);
350 }
351 
352 #ifdef HAVE_LIBZMQ
353 nmsg_res
_input_nmsg_read_container_zmq(nmsg_input_t input,Nmsg__Nmsg ** nmsg)354 _input_nmsg_read_container_zmq(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
355 	int ret;
356 	nmsg_res res;
357 	uint8_t *buf;
358 	size_t buf_len;
359 	ssize_t msgsize = 0;
360 	zmq_msg_t zmsg;
361 	zmq_pollitem_t zitems[1];
362 
363 	/* poll */
364 	zitems[0].socket = input->stream->zmq;
365 	zitems[0].events = ZMQ_POLLIN;
366 	ret = zmq_poll(zitems, 1, NMSG_RBUF_TIMEOUT);
367 	if (ret == 0 || (ret == -1 && errno == EINTR))
368 		return (nmsg_res_again);
369 	else if (ret == -1)
370 		return (nmsg_res_read_failure);
371 
372 	/* initialize ZMQ message object */
373 	if (zmq_msg_init(&zmsg))
374 		return (nmsg_res_failure);
375 
376 	/* read the NMSG container */
377 	if (zmq_recvmsg(input->stream->zmq, &zmsg, 0) == -1) {
378 		res = nmsg_res_failure;
379 		goto out;
380 	}
381 	nmsg_timespec_get(&input->stream->now);
382 
383 	/* get buffer from the ZMQ message */
384 	buf = zmq_msg_data(&zmsg);
385 	buf_len = zmq_msg_size(&zmsg);
386 	if (buf_len < NMSG_HDRLSZ_V2) {
387 		res = nmsg_res_failure;
388 		goto out;
389 	}
390 
391 	/* deserialize the NMSG header */
392 	res = _input_nmsg_deserialize_header(buf, buf_len, &msgsize, &input->stream->flags);
393 	if (res != nmsg_res_success)
394 		goto out;
395 	buf += NMSG_HDRLSZ_V2;
396 
397 	/* the entire message must have been read by zmq_recvmsg() */
398 	assert((size_t) msgsize == buf_len - NMSG_HDRLSZ_V2);
399 
400 	/* unpack message */
401 	res = _input_nmsg_unpack_container(input, nmsg, buf, msgsize);
402 
403 	/* update seqsrc counts */
404 	if (input->stream->verify_seqsrc && *nmsg != NULL) {
405 		struct nmsg_seqsrc *seqsrc = _input_seqsrc_get(input, *nmsg);
406 		if (seqsrc != NULL)
407 			_input_seqsrc_update(input, seqsrc, *nmsg);
408 	}
409 
410 	/* expire old outstanding fragments */
411 	_input_frag_gc(input->stream);
412 
413 out:
414 	zmq_msg_close(&zmsg);
415 	return (res);
416 }
417 #endif /* HAVE_LIBZMQ */
418 
419 nmsg_res
_input_nmsg_deserialize_header(const uint8_t * buf,size_t buf_len,ssize_t * msgsize,unsigned * flags)420 _input_nmsg_deserialize_header(const uint8_t *buf, size_t buf_len,
421 			       ssize_t *msgsize, unsigned *flags)
422 {
423 	static const char magic[] = NMSG_MAGIC;
424 	uint16_t version;
425 
426 	if (buf_len < NMSG_LENHDRSZ_V2)
427 		return (nmsg_res_failure);
428 
429 	/* check magic */
430 	if (memcmp(buf, magic, sizeof(magic)) != 0)
431 		return (nmsg_res_magic_mismatch);
432 	buf += sizeof(magic);
433 
434 	/* check version */
435 	load_net16(buf, &version);
436 	if ((version & 0xFF) != 2U)
437 		return (nmsg_res_version_mismatch);
438 	*flags = version >> 8;
439 	buf += sizeof(version);
440 
441 	/* load message (container) size */
442 	load_net32(buf, msgsize);
443 
444 	return (nmsg_res_success);
445 }
446 
447 
448 /* Private functions. */
449 
450 static nmsg_res
read_file(nmsg_input_t input,ssize_t * msgsize)451 read_file(nmsg_input_t input, ssize_t *msgsize) {
452 	static const char magic[] = NMSG_MAGIC;
453 
454 	bool reset_buf = false;
455 	ssize_t bytes_avail, bytes_needed, lenhdrsz;
456 	nmsg_res res = nmsg_res_failure;
457 	uint16_t version;
458 	struct nmsg_buf *buf = input->stream->buf;
459 
460 	/* ensure we have the (magic, version) header fields */
461 	bytes_avail = _nmsg_buf_avail(buf);
462 	if (bytes_avail < NMSG_HDRSZ) {
463 		assert(bytes_avail >= 0);
464 		bytes_needed = NMSG_HDRSZ - bytes_avail;
465 		if (bytes_avail == 0) {
466 			_nmsg_buf_reset(buf);
467 			res = do_read_file(input, bytes_needed, buf->bufsz);
468 		} else {
469 			/* the (magic, version) header fields were split */
470 			res = do_read_file(input, bytes_needed, bytes_needed);
471 			reset_buf = true;
472 		}
473 		if (res != nmsg_res_success)
474 			return (res);
475 	}
476 	bytes_avail = _nmsg_buf_avail(buf);
477 	assert(bytes_avail >= NMSG_HDRSZ);
478 
479 	/* check magic */
480 	if (memcmp(buf->pos, magic, sizeof(magic)) != 0)
481 		return (nmsg_res_magic_mismatch);
482 	buf->pos += sizeof(magic);
483 
484 	/* check version */
485 	load_net16(buf->pos, &version);
486 	buf->pos += 2;
487 	if (version == 1U) {
488 		lenhdrsz = NMSG_LENHDRSZ_V1;
489 	} else if ((version & 0xFF) == 2U) {
490 		input->stream->flags = version >> 8;
491 		version &= 0xFF;
492 		lenhdrsz = NMSG_LENHDRSZ_V2;
493 	} else {
494 		res = nmsg_res_version_mismatch;
495 		goto read_header_out;
496 	}
497 
498 	/* if reset_buf was set, then reading the (magic, version) header
499 	 * required two read()s. at this point we've consumed all the split
500 	 * header data, so reset the buffer to avoid overflow.
501 	 */
502 	if (reset_buf == true) {
503 		_nmsg_buf_reset(buf);
504 		reset_buf = false;
505 	}
506 
507 	/* ensure we have the length header field */
508 	bytes_avail = _nmsg_buf_avail(buf);
509 	if (bytes_avail < lenhdrsz) {
510 		if (bytes_avail == 0)
511 			_nmsg_buf_reset(buf);
512 		bytes_needed = lenhdrsz - bytes_avail;
513 		if (bytes_avail == 0) {
514 			res = do_read_file(input, bytes_needed, buf->bufsz);
515 		} else {
516 			/* the length header field was split */
517 			res = do_read_file(input, bytes_needed, bytes_needed);
518 			reset_buf = true;
519 		}
520 		if (res != nmsg_res_success)
521 			return (res);
522 	}
523 	bytes_avail = _nmsg_buf_avail(buf);
524 	assert(bytes_avail >= lenhdrsz);
525 
526 	/* load message size */
527 	if (version == 1U) {
528 		load_net16(buf->pos, msgsize);
529 		buf->pos += 2;
530 	} else if (version == 2U) {
531 		load_net32(buf->pos, msgsize);
532 		buf->pos += 4;
533 	}
534 
535 	res = nmsg_res_success;
536 
537 read_header_out:
538 	if (reset_buf == true)
539 		_nmsg_buf_reset(buf);
540 
541 	return (res);
542 }
543 
544 static nmsg_res
do_read_file(nmsg_input_t input,ssize_t bytes_needed,ssize_t bytes_max)545 do_read_file(nmsg_input_t input, ssize_t bytes_needed, ssize_t bytes_max) {
546 	ssize_t bytes_read;
547 	struct nmsg_buf *buf = input->stream->buf;
548 
549 	/* sanity check */
550 	assert(bytes_needed <= bytes_max);
551 
552 	/* check that we have enough buffer space */
553 	assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
554 
555 	while (bytes_needed > 0) {
556 		bytes_read = read(buf->fd, buf->end, bytes_max);
557 		if (bytes_read < 0)
558 			return (nmsg_res_failure);
559 		if (bytes_read == 0)
560 			return (nmsg_res_eof);
561 		buf->end += bytes_read;
562 		bytes_needed -= bytes_read;
563 		bytes_max -= bytes_read;
564 	}
565 	nmsg_timespec_get(&input->stream->now);
566 	return (nmsg_res_success);
567 }
568 
569 static nmsg_res
do_read_sock(nmsg_input_t input,ssize_t bytes_max)570 do_read_sock(nmsg_input_t input, ssize_t bytes_max) {
571 	int ret;
572 	ssize_t bytes_read;
573 	struct nmsg_buf *buf = input->stream->buf;
574 	socklen_t addr_len = sizeof(struct sockaddr_storage);
575 
576 	/* check that we have enough buffer space */
577 	assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
578 
579 	if (input->stream->blocking_io == true) {
580 		/* poll */
581 		ret = poll(&input->stream->pfd, 1, NMSG_RBUF_TIMEOUT);
582 		if (ret == 0 || (ret == -1 && errno == EINTR))
583 			return (nmsg_res_again);
584 		else if (ret == -1)
585 			return (nmsg_res_read_failure);
586 	}
587 
588 	/* read */
589 	bytes_read = recvfrom(buf->fd, buf->pos, bytes_max, 0,
590 			      (struct sockaddr *) &input->stream->addr_ss, &addr_len);
591 	nmsg_timespec_get(&input->stream->now);
592 
593 	if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
594 		return (nmsg_res_again);
595 	if (bytes_read < 0)
596 		return (nmsg_res_read_failure);
597 	if (bytes_read == 0)
598 		return (nmsg_res_eof);
599 	buf->end = buf->pos + bytes_read;
600 
601 	return (nmsg_res_success);
602 }
603