1 /*
2 * Copyright (c) 2009-2019 by Farsight Security, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* Import. */
18
19 #include "private.h"
20
21 /* Forward. */
22
23 static nmsg_res read_file(nmsg_input_t, ssize_t *);
24 static nmsg_res do_read_file(nmsg_input_t, ssize_t, ssize_t);
25 static nmsg_res do_read_sock(nmsg_input_t, ssize_t);
26
27 /* Internal functions. */
28
29 nmsg_res
_input_nmsg_read(nmsg_input_t input,nmsg_message_t * msg)30 _input_nmsg_read(nmsg_input_t input, nmsg_message_t *msg) {
31 Nmsg__NmsgPayload *np;
32 nmsg_res res;
33
34 if (input->stream->nmsg != NULL &&
35 input->stream->np_index >= input->stream->nmsg->n_payloads - 1)
36 {
37 input->stream->nmsg->n_payloads = 0;
38 nmsg__nmsg__free_unpacked(input->stream->nmsg, NULL);
39 input->stream->nmsg = NULL;
40 } else {
41 input->stream->np_index += 1;
42 }
43
44 if (input->stream->nmsg == NULL) {
45 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
46 if (res != nmsg_res_success)
47 return (res);
48 input->stream->np_index = 0;
49 }
50
51 /* detach the payload from the original nmsg container */
52 np = input->stream->nmsg->payloads[input->stream->np_index];
53 input->stream->nmsg->payloads[input->stream->np_index] = NULL;
54
55 /* filter payload */
56 if (_input_nmsg_filter(input, input->stream->np_index, np) == false) {
57 _nmsg_payload_free(&np);
58 return (nmsg_res_again);
59 }
60
61 /* pass a pointer to the payload to the caller */
62 *msg = _nmsg_message_from_payload(np);
63 if (*msg == NULL) {
64 _nmsg_payload_free(&np);
65 return (nmsg_res_memfail);
66 }
67
68 /* possibly sleep a bit if ingress rate control is enabled */
69 if (input->stream->brate != NULL)
70 _nmsg_brate_sleep(input->stream->brate,
71 input->stream->nc_size,
72 input->stream->nmsg->n_payloads,
73 input->stream->np_index);
74
75 return (nmsg_res_success);
76 }
77
78 nmsg_res
_input_nmsg_loop(nmsg_input_t input,int cnt,nmsg_cb_message cb,void * user)79 _input_nmsg_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
80 unsigned n;
81 nmsg_res res;
82 Nmsg__Nmsg *nmsg;
83 Nmsg__NmsgPayload *np;
84 nmsg_message_t msg;
85
86 if (cnt < 0) {
87 /* loop indefinitely */
88 for (;;) {
89 if (input->stop)
90 break;
91 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
92 if (res == nmsg_res_again)
93 continue;
94 if (res != nmsg_res_success)
95 return (res);
96
97 nmsg = input->stream->nmsg;
98 for (n = 0; n < nmsg->n_payloads; n++) {
99 np = nmsg->payloads[n];
100 if (_input_nmsg_filter(input, n, np)) {
101 msg = _nmsg_message_from_payload(np);
102 cb(msg, user);
103 }
104 }
105 nmsg->n_payloads = 0;
106 free(nmsg->payloads);
107 nmsg->payloads = NULL;
108 nmsg__nmsg__free_unpacked(nmsg, NULL);
109 input->stream->nmsg = NULL;
110 }
111 } else {
112 /* loop until (n_payloads == cnt) */
113 int n_payloads = 0;
114
115 for (;;) {
116 if (input->stop)
117 break;
118 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
119 if (res == nmsg_res_again)
120 continue;
121 if (res != nmsg_res_success)
122 return (res);
123
124 nmsg = input->stream->nmsg;
125 for (n = 0; n < nmsg->n_payloads; n++) {
126 np = nmsg->payloads[n];
127 if (_input_nmsg_filter(input, n, np)) {
128 if (n_payloads == cnt)
129 break;
130 n_payloads += 1;
131 msg = _nmsg_message_from_payload(np);
132 cb(msg, user);
133 }
134 }
135 nmsg->n_payloads = 0;
136 free(nmsg->payloads);
137 nmsg->payloads = NULL;
138 nmsg__nmsg__free_unpacked(nmsg, NULL);
139 input->stream->nmsg = NULL;
140 if (n_payloads == cnt)
141 break;
142 }
143 }
144
145 return (nmsg_res_success);
146 }
147
148 bool
_input_nmsg_filter(nmsg_input_t input,unsigned idx,Nmsg__NmsgPayload * np)149 _input_nmsg_filter(nmsg_input_t input, unsigned idx, Nmsg__NmsgPayload *np) {
150 assert(input->stream->nmsg != NULL);
151
152 /* payload crc */
153 if (input->stream->nmsg->n_payload_crcs >= (idx + 1)) {
154 uint32_t wire_crc = input->stream->nmsg->payload_crcs[idx];
155 uint32_t calc_crc = my_crc32c(np->payload.data, np->payload.len);
156 if (ntohl(wire_crc) != calc_crc) {
157 _nmsg_dprintf(1, "libnmsg: WARNING: crc mismatch (%x != %x) [%s]\n",
158 calc_crc, wire_crc, __func__);
159 return (false);
160 }
161 }
162
163 /* (vid, msgtype) */
164 if (input->do_filter == true &&
165 (input->filter_vid != np->vid ||
166 input->filter_msgtype != np->msgtype))
167 {
168 return (false);
169 }
170
171 /* source */
172 if (input->stream->source > 0 &&
173 input->stream->source != np->source)
174 {
175 return (false);
176 }
177
178 /* operator */
179 if (input->stream->operator > 0 &&
180 input->stream->operator != np->operator_)
181 {
182 return (false);
183 }
184
185 /* group */
186 if (input->stream->group > 0 &&
187 input->stream->group != np->group)
188 {
189 return (false);
190 }
191
192 /* all passed */
193 return (true);
194 }
195
196 nmsg_res
_input_nmsg_unpack_container(nmsg_input_t input,Nmsg__Nmsg ** nmsg,uint8_t * buf,size_t buf_len)197 _input_nmsg_unpack_container(nmsg_input_t input, Nmsg__Nmsg **nmsg,
198 uint8_t *buf, size_t buf_len)
199 {
200 nmsg_res res = nmsg_res_success;
201
202 input->stream->nc_size = buf_len + NMSG_HDRLSZ_V2;
203 _nmsg_dprintf(6, "%s: unpacking container len= %zd\n", __func__, buf_len);
204
205 if (input->stream->flags & NMSG_FLAG_FRAGMENT) {
206 res = _input_frag_read(input, nmsg, buf, buf_len);
207 } else if (input->stream->flags & NMSG_FLAG_ZLIB) {
208 size_t u_len;
209 u_char *u_buf;
210
211 res = nmsg_zbuf_inflate(input->stream->zb, buf_len, buf, &u_len, &u_buf);
212 if (res != nmsg_res_success)
213 return (res);
214 *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
215 free(u_buf);
216 if (*nmsg == NULL)
217 return (nmsg_res_parse_error);
218 } else {
219 *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
220 if (*nmsg == NULL)
221 return (nmsg_res_parse_error);
222 }
223
224 return (res);
225 }
226
227 nmsg_res
_input_nmsg_unpack_container2(const uint8_t * buf,size_t buf_len,unsigned flags,Nmsg__Nmsg ** nmsg)228 _input_nmsg_unpack_container2(const uint8_t *buf, size_t buf_len,
229 unsigned flags, Nmsg__Nmsg **nmsg)
230 {
231 nmsg_res res;
232
233 /* fragmented containers aren't handled by this function */
234 if (flags & NMSG_FLAG_FRAGMENT)
235 return (nmsg_res_failure);
236
237 if (flags & NMSG_FLAG_ZLIB) {
238 size_t u_len;
239 u_char *u_buf;
240 nmsg_zbuf_t zb;
241
242 zb = nmsg_zbuf_inflate_init();
243 if (zb == NULL)
244 return (nmsg_res_memfail);
245 res = nmsg_zbuf_inflate(zb, buf_len, (uint8_t *) buf, &u_len, &u_buf);
246 nmsg_zbuf_destroy(&zb);
247 if (res != nmsg_res_success)
248 return (res);
249 *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
250 free(u_buf);
251 if (*nmsg == NULL)
252 return (nmsg_res_failure);
253 } else {
254 *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
255 if (*nmsg == NULL)
256 return (nmsg_res_failure);
257 }
258
259 return (nmsg_res_success);
260 }
261
262 nmsg_res
_input_nmsg_read_container_file(nmsg_input_t input,Nmsg__Nmsg ** nmsg)263 _input_nmsg_read_container_file(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
264 nmsg_res res;
265 ssize_t bytes_avail, msgsize = 0;
266
267 assert(input->stream->type == nmsg_stream_type_file);
268
269 /* read */
270 res = read_file(input, &msgsize);
271 if (res != nmsg_res_success)
272 return (res);
273
274 /* ensure that the full NMSG container is available */
275 bytes_avail = _nmsg_buf_avail(input->stream->buf);
276 if (bytes_avail < msgsize) {
277 ssize_t bytes_to_read = msgsize - bytes_avail;
278
279 res = do_read_file(input, bytes_to_read, bytes_to_read);
280 if (res != nmsg_res_success)
281 return (res);
282 }
283
284 /* unpack message */
285 res = _input_nmsg_unpack_container(input, nmsg, input->stream->buf->pos, msgsize);
286 input->stream->buf->pos += msgsize;
287
288 return (res);
289 }
290
291 nmsg_res
_input_nmsg_read_container_sock(nmsg_input_t input,Nmsg__Nmsg ** nmsg)292 _input_nmsg_read_container_sock(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
293 nmsg_res res;
294 ssize_t msgsize;
295 struct nmsg_buf *buf = input->stream->buf;
296
297 assert(input->stream->type == nmsg_stream_type_sock);
298
299 /* read the NMSG container */
300 _nmsg_buf_reset(buf);
301 res = do_read_sock(input, buf->bufsz);
302 if (res != nmsg_res_success) {
303 if (res == nmsg_res_read_failure)
304 return (res);
305 else
306 /* forward compatibility */
307 return (nmsg_res_again);
308 }
309 if (_nmsg_buf_avail(buf) < NMSG_HDRLSZ_V2)
310 return (nmsg_res_failure);
311
312 /* deserialize the NMSG header */
313 res = _input_nmsg_deserialize_header(buf->pos,
314 _nmsg_buf_avail(buf),
315 &msgsize,
316 &input->stream->flags);
317 if (res != nmsg_res_success)
318 return (res);
319 buf->pos += NMSG_HDRLSZ_V2;
320
321 /* since the input stream is a sock stream, the entire message must
322 * have been read by the call to do_read_sock() */
323 if (_nmsg_buf_avail(buf) != msgsize)
324 return (nmsg_res_parse_error);
325
326 /* unpack message */
327 res = _input_nmsg_unpack_container(input, nmsg, buf->pos, msgsize);
328 input->stream->buf->pos += msgsize;
329
330 /* update counters */
331 if (*nmsg != NULL) {
332 input->stream->count_recv += 1;
333
334 if (input->stream->verify_seqsrc) {
335 struct nmsg_seqsrc *seqsrc;
336
337 seqsrc = _input_seqsrc_get(input, *nmsg);
338 if (seqsrc != NULL) {
339 size_t drop;
340 drop = _input_seqsrc_update(input, seqsrc, *nmsg);
341 input->stream->count_drop += drop;
342 }
343 }
344 }
345
346 /* expire old outstanding fragments */
347 _input_frag_gc(input->stream);
348
349 return (res);
350 }
351
#ifdef HAVE_LIBZMQ
/*
 * Receive one ZMQ message and unpack the container it carries.
 *
 * The socket is polled for up to NMSG_RBUF_TIMEOUT first. The message must
 * contain a header of NMSG_HDRLSZ_V2 bytes followed by exactly the number
 * of body bytes announced in that header.
 *
 * \param input an input whose stream has a ZMQ socket.
 * \param nmsg  receives the unpacked container.
 *
 * \return nmsg_res_again if the poll timed out or was interrupted;
 *         nmsg_res_read_failure if the poll failed; nmsg_res_failure if the
 *         message could not be initialized or received or is shorter than a
 *         header; nmsg_res_parse_error if the body length disagrees with the
 *         header; otherwise the result of header deserialization or of
 *         _input_nmsg_unpack_container().
 */
nmsg_res
_input_nmsg_read_container_zmq(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
	int ret;
	nmsg_res res;
	uint8_t *buf;
	size_t buf_len;
	ssize_t msgsize = 0;
	zmq_msg_t zmsg;
	zmq_pollitem_t zitems[1];

	/* poll */
	zitems[0].socket = input->stream->zmq;
	zitems[0].events = ZMQ_POLLIN;
	ret = zmq_poll(zitems, 1, NMSG_RBUF_TIMEOUT);
	if (ret == 0 || (ret == -1 && errno == EINTR))
		return (nmsg_res_again);
	else if (ret == -1)
		return (nmsg_res_read_failure);

	/* initialize ZMQ message object */
	if (zmq_msg_init(&zmsg))
		return (nmsg_res_failure);

	/* read the NMSG container */
	if (zmq_recvmsg(input->stream->zmq, &zmsg, 0) == -1) {
		res = nmsg_res_failure;
		goto out;
	}
	nmsg_timespec_get(&input->stream->now);

	/* get buffer from the ZMQ message */
	buf = zmq_msg_data(&zmsg);
	buf_len = zmq_msg_size(&zmsg);
	if (buf_len < NMSG_HDRLSZ_V2) {
		res = nmsg_res_failure;
		goto out;
	}

	/* deserialize the NMSG header */
	res = _input_nmsg_deserialize_header(buf, buf_len, &msgsize, &input->stream->flags);
	if (res != nmsg_res_success)
		goto out;
	buf += NMSG_HDRLSZ_V2;

	/* the entire message must have been read by zmq_recvmsg(). the
	 * length is supplied by the peer, so a mismatch is reported as an
	 * error rather than asserted: an assert would abort the process, or
	 * vanish under NDEBUG and let the unpacker run past the message */
	if (msgsize < 0 || (size_t) msgsize != buf_len - NMSG_HDRLSZ_V2) {
		res = nmsg_res_parse_error;
		goto out;
	}

	/* unpack message */
	res = _input_nmsg_unpack_container(input, nmsg, buf, msgsize);

	/* update seqsrc counts */
	if (input->stream->verify_seqsrc && *nmsg != NULL) {
		struct nmsg_seqsrc *seqsrc = _input_seqsrc_get(input, *nmsg);
		if (seqsrc != NULL)
			_input_seqsrc_update(input, seqsrc, *nmsg);
	}

	/* expire old outstanding fragments */
	_input_frag_gc(input->stream);

out:
	/* the message object is closed on every path once initialized */
	zmq_msg_close(&zmsg);
	return (res);
}
#endif /* HAVE_LIBZMQ */
418
419 nmsg_res
_input_nmsg_deserialize_header(const uint8_t * buf,size_t buf_len,ssize_t * msgsize,unsigned * flags)420 _input_nmsg_deserialize_header(const uint8_t *buf, size_t buf_len,
421 ssize_t *msgsize, unsigned *flags)
422 {
423 static const char magic[] = NMSG_MAGIC;
424 uint16_t version;
425
426 if (buf_len < NMSG_LENHDRSZ_V2)
427 return (nmsg_res_failure);
428
429 /* check magic */
430 if (memcmp(buf, magic, sizeof(magic)) != 0)
431 return (nmsg_res_magic_mismatch);
432 buf += sizeof(magic);
433
434 /* check version */
435 load_net16(buf, &version);
436 if ((version & 0xFF) != 2U)
437 return (nmsg_res_version_mismatch);
438 *flags = version >> 8;
439 buf += sizeof(version);
440
441 /* load message (container) size */
442 load_net32(buf, msgsize);
443
444 return (nmsg_res_success);
445 }
446
447
448 /* Private functions. */
449
450 static nmsg_res
read_file(nmsg_input_t input,ssize_t * msgsize)451 read_file(nmsg_input_t input, ssize_t *msgsize) {
452 static const char magic[] = NMSG_MAGIC;
453
454 bool reset_buf = false;
455 ssize_t bytes_avail, bytes_needed, lenhdrsz;
456 nmsg_res res = nmsg_res_failure;
457 uint16_t version;
458 struct nmsg_buf *buf = input->stream->buf;
459
460 /* ensure we have the (magic, version) header fields */
461 bytes_avail = _nmsg_buf_avail(buf);
462 if (bytes_avail < NMSG_HDRSZ) {
463 assert(bytes_avail >= 0);
464 bytes_needed = NMSG_HDRSZ - bytes_avail;
465 if (bytes_avail == 0) {
466 _nmsg_buf_reset(buf);
467 res = do_read_file(input, bytes_needed, buf->bufsz);
468 } else {
469 /* the (magic, version) header fields were split */
470 res = do_read_file(input, bytes_needed, bytes_needed);
471 reset_buf = true;
472 }
473 if (res != nmsg_res_success)
474 return (res);
475 }
476 bytes_avail = _nmsg_buf_avail(buf);
477 assert(bytes_avail >= NMSG_HDRSZ);
478
479 /* check magic */
480 if (memcmp(buf->pos, magic, sizeof(magic)) != 0)
481 return (nmsg_res_magic_mismatch);
482 buf->pos += sizeof(magic);
483
484 /* check version */
485 load_net16(buf->pos, &version);
486 buf->pos += 2;
487 if (version == 1U) {
488 lenhdrsz = NMSG_LENHDRSZ_V1;
489 } else if ((version & 0xFF) == 2U) {
490 input->stream->flags = version >> 8;
491 version &= 0xFF;
492 lenhdrsz = NMSG_LENHDRSZ_V2;
493 } else {
494 res = nmsg_res_version_mismatch;
495 goto read_header_out;
496 }
497
498 /* if reset_buf was set, then reading the (magic, version) header
499 * required two read()s. at this point we've consumed all the split
500 * header data, so reset the buffer to avoid overflow.
501 */
502 if (reset_buf == true) {
503 _nmsg_buf_reset(buf);
504 reset_buf = false;
505 }
506
507 /* ensure we have the length header field */
508 bytes_avail = _nmsg_buf_avail(buf);
509 if (bytes_avail < lenhdrsz) {
510 if (bytes_avail == 0)
511 _nmsg_buf_reset(buf);
512 bytes_needed = lenhdrsz - bytes_avail;
513 if (bytes_avail == 0) {
514 res = do_read_file(input, bytes_needed, buf->bufsz);
515 } else {
516 /* the length header field was split */
517 res = do_read_file(input, bytes_needed, bytes_needed);
518 reset_buf = true;
519 }
520 if (res != nmsg_res_success)
521 return (res);
522 }
523 bytes_avail = _nmsg_buf_avail(buf);
524 assert(bytes_avail >= lenhdrsz);
525
526 /* load message size */
527 if (version == 1U) {
528 load_net16(buf->pos, msgsize);
529 buf->pos += 2;
530 } else if (version == 2U) {
531 load_net32(buf->pos, msgsize);
532 buf->pos += 4;
533 }
534
535 res = nmsg_res_success;
536
537 read_header_out:
538 if (reset_buf == true)
539 _nmsg_buf_reset(buf);
540
541 return (res);
542 }
543
544 static nmsg_res
do_read_file(nmsg_input_t input,ssize_t bytes_needed,ssize_t bytes_max)545 do_read_file(nmsg_input_t input, ssize_t bytes_needed, ssize_t bytes_max) {
546 ssize_t bytes_read;
547 struct nmsg_buf *buf = input->stream->buf;
548
549 /* sanity check */
550 assert(bytes_needed <= bytes_max);
551
552 /* check that we have enough buffer space */
553 assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
554
555 while (bytes_needed > 0) {
556 bytes_read = read(buf->fd, buf->end, bytes_max);
557 if (bytes_read < 0)
558 return (nmsg_res_failure);
559 if (bytes_read == 0)
560 return (nmsg_res_eof);
561 buf->end += bytes_read;
562 bytes_needed -= bytes_read;
563 bytes_max -= bytes_read;
564 }
565 nmsg_timespec_get(&input->stream->now);
566 return (nmsg_res_success);
567 }
568
569 static nmsg_res
do_read_sock(nmsg_input_t input,ssize_t bytes_max)570 do_read_sock(nmsg_input_t input, ssize_t bytes_max) {
571 int ret;
572 ssize_t bytes_read;
573 struct nmsg_buf *buf = input->stream->buf;
574 socklen_t addr_len = sizeof(struct sockaddr_storage);
575
576 /* check that we have enough buffer space */
577 assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
578
579 if (input->stream->blocking_io == true) {
580 /* poll */
581 ret = poll(&input->stream->pfd, 1, NMSG_RBUF_TIMEOUT);
582 if (ret == 0 || (ret == -1 && errno == EINTR))
583 return (nmsg_res_again);
584 else if (ret == -1)
585 return (nmsg_res_read_failure);
586 }
587
588 /* read */
589 bytes_read = recvfrom(buf->fd, buf->pos, bytes_max, 0,
590 (struct sockaddr *) &input->stream->addr_ss, &addr_len);
591 nmsg_timespec_get(&input->stream->now);
592
593 if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
594 return (nmsg_res_again);
595 if (bytes_read < 0)
596 return (nmsg_res_read_failure);
597 if (bytes_read == 0)
598 return (nmsg_res_eof);
599 buf->end = buf->pos + bytes_read;
600
601 return (nmsg_res_success);
602 }
603