1 /*
2 ** Copyright (C) 2006-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /*
10 ** skstream.c
11 ** Mark Thomas July-2006
12 **
13 ** skstream provides a wrapper around file pointers and file
14 ** descriptors. It handles both textual and binary data.
15 */
16
17
18 #include <silk/silk.h>
19
20 RCSIDENT("$SiLK: skstream.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
21
22 #include <silk/skstream.h>
23 #include <silk/sksite.h>
24 #include "skheader_priv.h"
25 #include "skiobuf.h"
26 #include "skstream_priv.h"
27
28
29 /* LOCAL DEFINES AND TYPEDEFS */
30
/*
 *    Fallback value for DEFAULT_FILE_FORMAT when the build has not
 *    already defined it: FT_RWIPV6ROUTING when SK_ENABLE_IPV6 is
 *    nonzero, FT_RWGENERIC otherwise.  Presumably this is the format
 *    given to a SiLK stream when the caller does not choose
 *    one---confirm against the users of this macro.
 */
#ifndef DEFAULT_FILE_FORMAT
#  if SK_ENABLE_IPV6
#    define DEFAULT_FILE_FORMAT  FT_RWIPV6ROUTING
#  else
#    define DEFAULT_FILE_FORMAT  FT_RWGENERIC
#  endif
#endif
38
39
/*
 *    Name of environment variable that affects how to treat ICMP flow
 *    records.  This variable determines the setting of the
 *    'silk_icmp_nochange' global, which is 0 unless the variable is
 *    set to "none".  See the detailed note in skStreamReadRecord().
 */
#define SILK_ICMP_SPORT_HANDLER_ENVAR "SILK_ICMP_SPORT_HANDLER"
47
48
/*
 *    Return SKSTREAM_ERR_NULL_ARGUMENT from the enclosing function
 *    when 'srin_stream' is NULL.
 *
 *    The expansion is a bare 'if' statement, not a do{}while(0)
 *    wrapper: use it only as a complete statement and never directly
 *    in front of an 'else'.
 */
#define STREAM_RETURN_IF_NULL(srin_stream)                              \
    if (NULL == (srin_stream)) { return SKSTREAM_ERR_NULL_ARGUMENT; }
54
55
/*
 *    Set the 'is_silk_flow' bit on 'stream' to 1 if the file format
 *    in the stream's header is one of the SiLK Flow record formats
 *    listed below, and to 0 for any other format.
 *
 *    The expansion is a bare 'switch' statement that mentions
 *    'stream' several times; pass an expression without side
 *    effects.
 */
#define STREAM_SET_IS_SILK_FLOW(stream)                         \
    switch (skHeaderGetFileFormat((stream)->silk_hdr)) {        \
      case FT_RWAUGMENTED:                                      \
      case FT_RWAUGROUTING:                                     \
      case FT_RWAUGWEB:                                         \
      case FT_RWAUGSNMPOUT:                                     \
      case FT_RWFILTER:                                         \
      case FT_FLOWCAP:                                          \
      case FT_RWGENERIC:                                        \
      case FT_RWIPV6:                                           \
      case FT_RWIPV6ROUTING:                                    \
      case FT_RWNOTROUTED:                                      \
      case FT_RWROUTED:                                         \
      case FT_RWSPLIT:                                          \
      case FT_RWWWW:                                            \
        (stream)->is_silk_flow = 1;                             \
        break;                                                  \
      default:                                                  \
        (stream)->is_silk_flow = 0;                             \
        break;                                                  \
    }
81
82
83 /* LOCAL VARIABLES */
84
/*
 *    If nonzero, do not attempt to process ICMP values in the sPort
 *    field.  This is 0 unless the SILK_ICMP_SPORT_HANDLER envar is
 *    set to "none".  See the detailed note in skStreamReadRecord().
 */
static int silk_icmp_nochange = 0;

#ifdef SILK_CLOBBER_ENVAR
/*
 *    If nonzero, enable clobbering (overwriting) of existing files.
 *    Only present when SILK_CLOBBER_ENVAR is defined; consulted by
 *    streamOpenWrite() when the output file already exists.
 */
static int silk_clobber = 0;
#endif
98
99
100 /* LOCAL FUNCTION PROTOTYPES */
101
/*
 *    Forward declarations of the file-descriptor callbacks.  They are
 *    needed because streamIOBufCreate() takes their addresses before
 *    their definitions appear in this file.
 */

/* read 'count' bytes from the stream's file descriptor into 'buf' */
static ssize_t
streamIOBufCallbackRead(
    skstream_t         *stream,
    void               *buf,
    size_t              count);

/* write 'count' bytes from 'buf' to the stream's file descriptor */
static ssize_t
streamIOBufCallbackWrite(
    skstream_t         *stream,
    const void         *buf,
    size_t              count);

/* reposition the stream's file descriptor with lseek() */
static off_t
streamIOBufCallbackSeek(
    skstream_t         *stream,
    off_t               offset,
    int                 whence);
119
120 /* FUNCTION DEFINITIONS */
121
122 /*
123 * Update 'stream' with the sensor, type, and starting-hour stored
124 * in the stream's header if 'stream' is bound to a packed hourly
125 * data file.
126 */
127 static void
streamCacheHeader(skstream_t * stream)128 streamCacheHeader(
129 skstream_t *stream)
130 {
131 const sk_header_entry_t *hentry;
132
133 assert(stream);
134 assert(stream->is_silk_flow);
135
136 hentry = skHeaderGetFirstMatch(stream->silk_hdr, SK_HENTRY_PACKEDFILE_ID);
137 if (hentry) {
138 stream->hdr_starttime = skHentryPackedfileGetStartTime(hentry);
139 stream->hdr_sensor = skHentryPackedfileGetSensorID(hentry);
140 stream->hdr_flowtype = skHentryPackedfileGetFlowtypeID(hentry);
141 }
142 }
143
144
145 /*
146 * Return an error string representing the most recent error that
147 * occurred on 'stream'.
148 *
149 * This function is a callback invoked by skIOBufStrError().
150 */
151 static const char *
streamCallbackStrerror(skstream_t * stream,int UNUSED (errnum))152 streamCallbackStrerror(
153 skstream_t *stream,
154 int UNUSED(errnum))
155 {
156 if (stream->err_info == SKSTREAM_ERR_ZLIB) {
157 #if SK_ENABLE_ZLIB
158 if (stream->gz) {
159 int zerr;
160 return gzerror(stream->gz, &zerr);
161 }
162 #endif /* SK_ENABLE_ZLIB */
163 return "Interal zlib error";
164 }
165 return strerror(stream->errnum);
166 }
167
168
169 /*
170 * status = streamCheckAttributes(stream, io_mode_list, content_type_list);
171 *
172 */
173 static int
streamCheckAttributes(skstream_t * stream,int io_mode_list,int content_type_list)174 streamCheckAttributes(
175 skstream_t *stream,
176 int io_mode_list,
177 int content_type_list)
178 {
179 assert(stream);
180 if ( !(stream->io_mode & io_mode_list)) {
181 return SKSTREAM_ERR_UNSUPPORT_IOMODE;
182 } else if ( !(stream->content_type & content_type_list)) {
183 return SKSTREAM_ERR_UNSUPPORT_CONTENT;
184 } else {
185 return SKSTREAM_OK;
186 }
187 }
188
189
190 /*
191 * status = streamCheckModifiable(stream);
192 *
193 * Return SKSTREAM_OK if the caller is still allowed to set aspects
194 * of 'stream'; otherwise return the reason why 'stream' cannot be
195 * modified.
196 */
197 static int
streamCheckModifiable(skstream_t * stream)198 streamCheckModifiable(
199 skstream_t *stream)
200 {
201 assert(stream);
202 if (stream->is_closed) {
203 return SKSTREAM_ERR_CLOSED;
204 } else if (stream->is_dirty) {
205 return SKSTREAM_ERR_PREV_DATA;
206 } else {
207 return SKSTREAM_OK;
208 }
209 }
210
211
212 /*
213 * status = streamCheckOpen(stream);
214 *
215 * Call this function on a stream which you expect to be open; it
216 * will return SKSTREAM_OK if 'stream' is open, or an error code
217 * explaining why 'stream' is not open.
218 *
219 * A stream that has been opened and closed is neither open nor
220 * unopened.
221 */
222 static int
streamCheckOpen(const skstream_t * stream)223 streamCheckOpen(
224 const skstream_t *stream)
225 {
226 assert(stream);
227 if (stream->is_closed) {
228 return SKSTREAM_ERR_CLOSED;
229 } else if (stream->fd == -1) {
230 return SKSTREAM_ERR_NOT_OPEN;
231 } else {
232 return SKSTREAM_OK;
233 }
234 }
235
236
237 /*
238 * status = streamCheckUnopened(stream);
239 *
240 * Call this function on a stream which you expect to be
241 * unopened---i.e., not yet open. It will return SKSTREAM_OK if
242 * 'stream' is unopened, or an error code explaining why 'stream'
243 * is not considered unopened.
244 *
245 * A stream that has been opened and closed is neither open nor
246 * unopened.
247 */
248 static int
streamCheckUnopened(const skstream_t * stream)249 streamCheckUnopened(
250 const skstream_t *stream)
251 {
252 assert(stream);
253 if (stream->is_closed) {
254 return SKSTREAM_ERR_CLOSED;
255 } else if (stream->fd != -1) {
256 return SKSTREAM_ERR_PREV_OPEN;
257 } else {
258 return SKSTREAM_OK;
259 }
260 }
261
262
263 #if SK_ENABLE_ZLIB
264 /*
265 * status = streamGZFlush(stream);
266 *
267 * Tell the zlib descriptor associated with 'stream' to flush any
268 * unwritten data to the stream.
269 *
270 * This function is invoked directly by other skstream functions
271 * and it is a callback invoked by skIOBufFlush().
272 */
273 static int
streamGZFlush(skstream_t * stream)274 streamGZFlush(
275 skstream_t *stream)
276 {
277 int zerr = gzflush(stream->gz, Z_SYNC_FLUSH);
278 if (zerr == Z_OK) {
279 return 0;
280 }
281 stream->is_iobuf_error = 1;
282 if (zerr == Z_ERRNO) {
283 stream->errnum = errno;
284 stream->err_info = SKSTREAM_ERR_WRITE;
285 } else {
286 stream->errnum = zerr;
287 stream->err_info = SKSTREAM_ERR_ZLIB;
288 }
289 return -1;
290 }
291
292
293 /*
294 * status = streamGZRead(stream, buf, count);
295 *
296 * Read 'count' bytes from the zlib descriptor associated with
297 * 'stream' and put them into 'buf'.
298 *
299 * This function is invoked directly by other skstream functions
300 * and it is a callback invoked by skIOBufRead().
301 */
302 static ssize_t
streamGZRead(skstream_t * stream,void * buf,size_t count)303 streamGZRead(
304 skstream_t *stream,
305 void *buf,
306 size_t count)
307 {
308 int got = gzread(stream->gz, buf, (unsigned)count);
309 if (got == -1) {
310 stream->is_iobuf_error = 1;
311 (void)gzerror(stream->gz, &stream->errnum);
312 if (stream->errnum == Z_ERRNO) {
313 stream->errnum = errno;
314 stream->err_info = SKSTREAM_ERR_READ;
315 } else {
316 stream->err_info = SKSTREAM_ERR_ZLIB;
317 }
318 }
319 return (ssize_t)got;
320 }
321
322
323 /*
324 * status = streamGZRead(stream, buf, count);
325 *
326 * Write 'count' bytes from 'buf' to the zlib descriptor associated
327 * with 'stream'.
328 *
329 * This function is invoked directly by other skstream functions
330 * and it is a callback invoked by skIOBufWrite().
331 */
332 static ssize_t
streamGZWrite(skstream_t * stream,const void * buf,size_t count)333 streamGZWrite(
334 skstream_t *stream,
335 const void *buf,
336 size_t count)
337 {
338 int written = gzwrite(stream->gz, buf, (unsigned)count);
339 if (written > 0 || count == 0) {
340 return written;
341 }
342 stream->is_iobuf_error = 1;
343 (void)gzerror(stream->gz, &stream->errnum);
344 if (stream->errnum == Z_ERRNO) {
345 stream->errnum = errno;
346 stream->err_info = SKSTREAM_ERR_WRITE;
347 } else {
348 stream->err_info = SKSTREAM_ERR_ZLIB;
349 }
350 return -1;
351 }
352 #endif /* SK_ENABLE_ZLIB */
353
354
355 /*
356 * status = streamIOBufCreate(stream);
357 *
358 * Create the skIOBuf that 'stream' will read-from/write-to, and
359 * bind it to the file descriptor or gzfile. Return SKSTREAM_OK on
360 * success, or an error code on failure.
361 */
362 static int
streamIOBufCreate(skstream_t * stream)363 streamIOBufCreate(
364 skstream_t *stream)
365 {
366 int rv = SKSTREAM_OK;
367 uint8_t compmethod = SK_COMPMETHOD_NONE;
368 skio_abstract_t io_func;
369
370 assert(stream);
371 assert(stream->fd != -1);
372
373 if (stream->is_unbuffered) {
374 goto END;
375 }
376
377 memset(&io_func, 0, sizeof(skio_abstract_t));
378
379 /* make certain compression method is available */
380 if (stream->is_silk) {
381 compmethod = skHeaderGetCompressionMethod(stream->silk_hdr);
382 switch (skCompMethodCheck(compmethod)) {
383 case SK_COMPMETHOD_IS_AVAIL:
384 /* known, valid, and available */
385 break;
386 case SK_COMPMETHOD_IS_VALID:
387 /* known and valid but not available */
388 rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE;
389 goto END;
390 case SK_COMPMETHOD_IS_KNOWN:
391 /* should never be undecided at this point */
392 skAbort();
393 default:
394 rv = SKSTREAM_ERR_COMPRESS_INVALID;
395 goto END;
396 }
397 }
398
399 /* store location where the IOBuf was enabled */
400 stream->pre_iobuf_pos = lseek(stream->fd, 0, SEEK_CUR);
401
402 /* create the iobuf */
403 switch (stream->io_mode) {
404 case SK_IO_READ:
405 /* create the buffered reader */
406 stream->iobuf = skIOBufCreate(stream->io_mode);
407 break;
408
409 case SK_IO_WRITE:
410 case SK_IO_APPEND:
411 stream->iobuf = skIOBufCreate(stream->io_mode);
412 break;
413 }
414 if (stream->iobuf == NULL) {
415 rv = SKSTREAM_ERR_ALLOC;
416 goto END;
417 }
418
419 /* get the information for SiLK files */
420 if (stream->is_silk) {
421 /* make certain the record size is non-zero */
422 size_t reclen;
423 reclen = skHeaderGetRecordLength(stream->silk_hdr);
424 if (reclen == 0) {
425 reclen = 1;
426 skHeaderSetRecordLength(stream->silk_hdr, reclen);
427 }
428
429 /* set the record size on the IOBuf */
430 if (-1 == skIOBufSetRecordSize(stream->iobuf, reclen)) {
431 rv = SKSTREAM_ERR_IOBUF;
432 goto END;
433 }
434 }
435
436 /* bind it to the file descriptor or gzfile */
437 #if SK_ENABLE_ZLIB
438 if (stream->gz) {
439 io_func.read = (skio_read_fn_t)&streamGZRead;
440 io_func.write = (skio_write_fn_t)&streamGZWrite;
441 io_func.seek = NULL;
442 io_func.flush = (skio_flush_fn_t)&streamGZFlush;
443 io_func.strerror = (skio_strerror_fn_t)&streamCallbackStrerror;
444 if (skIOBufBindAbstract(stream->iobuf, stream, compmethod, &io_func)
445 == -1)
446 {
447 rv = SKSTREAM_ERR_IOBUF;
448 goto END;
449 }
450 } else
451 #endif /* SK_ENABLE_ZLIB */
452 {
453 /* if (skIOBufBind(stream->iobuf, stream->fd, compmethod) == -1) */
454 io_func.read = (skio_read_fn_t)&streamIOBufCallbackRead;
455 io_func.write = (skio_write_fn_t)&streamIOBufCallbackWrite;
456 io_func.seek = (skio_seek_fn_t)&streamIOBufCallbackSeek;
457 io_func.strerror = (skio_strerror_fn_t)&streamCallbackStrerror;
458 if (skIOBufBindAbstract(stream->iobuf, stream, compmethod, &io_func)
459 == -1)
460 {
461 rv = SKSTREAM_ERR_IOBUF;
462 goto END;
463 }
464 }
465
466 END:
467 return rv;
468 }
469
470
471 /*
472 * status = streamIOBufGetLine(stream, out_buffer, buf_size);
473 *
474 * Fill 'out_buffer' with the next '\n'-delimited line of text from
475 * the IOBuf associated with the 'stream'. The '\n' is replaced
476 * with '\0'. If the final input is smaller than 'buf_size' and
477 * does not contain a '\n' it will be copied into 'out_buffer'.
478 * Return SKSTREAM_OK on success.
479 *
480 * If there is no '\n' within the first 'buf_size' characters of the
481 * input, return SKSTREAM_ERR_LONG_LINE and read from the IOBuf
482 * until a '\n' is found or until end-of-file is reached.
483 *
484 * Return SKSTREAM_ERR_EOF when all input data has been processed.
485 *
486 * Return SKSTREAM_ERR_IOBUF is there is a problem reading from the
487 * IOBuf.
488 *
489 * This function mimics fgets().
490 */
491 static int
streamIOBufGetLine(skstream_t * stream,char * out_buffer,size_t buf_size)492 streamIOBufGetLine(
493 skstream_t *stream,
494 char *out_buffer,
495 size_t buf_size)
496 {
497 char *eol = NULL;
498 ssize_t sz;
499 int rv = SKSTREAM_OK;
500
501 while (eol == NULL) {
502 /* substract 1 for final '\0' */
503 sz = skIOBufReadToChar(stream->iobuf, out_buffer, buf_size-1, '\n');
504 if (sz == -1) {
505 if (stream->is_iobuf_error) {
506 stream->is_iobuf_error = 0;
507 rv = stream->err_info;
508 } else {
509 rv = SKSTREAM_ERR_IOBUF;
510 }
511 break;
512 }
513 if (sz == 0) {
514 rv = SKSTREAM_ERR_EOF;
515 break;
516 }
517 if ((sz == (ssize_t)buf_size-1) && ('\n' != out_buffer[sz-1])) {
518 /* Found no newline in 'buf_size' characters... */
519 rv = SKSTREAM_ERR_LONG_LINE;
520 /* need to read more from skiobuf to find next '\n' */
521 continue;
522 }
523
524 /* NUL terminate the string, either by replacing '\n' with a
525 * '\0', or by putting a '\0' after the final character. */
526 eol = &out_buffer[sz-1];
527 if (*eol != '\n') {
528 ++eol;
529 }
530 *eol = '\0';
531 }
532
533 return rv;
534 }
535
536
537 /*
538 * status = streamIOBufCallbackRead(stream, buf, count);
539 *
540 * Read 'count' bytes from the file descriptor associated with
541 * 'stream' and put them into 'buf'.
542 *
543 * This function is a callback invoked by skIOBufRead().
544 */
545 static ssize_t
streamIOBufCallbackRead(skstream_t * stream,void * buf,size_t count)546 streamIOBufCallbackRead(
547 skstream_t *stream,
548 void *buf,
549 size_t count)
550 {
551 ssize_t rv;
552
553 rv = skreadn(stream->fd, buf, count);
554 if (rv == -1) {
555 stream->is_iobuf_error = 1;
556 stream->errnum = errno;
557 stream->err_info = SKSTREAM_ERR_READ;
558 }
559 return rv;
560 }
561
562
563 /*
564 * status = streamIOBufCallbackWrite(stream, buf, count);
565 *
566 * Write 'count' bytes from 'buf' to the file descriptor associated
567 * with 'stream'.
568 *
569 * This function is a callback invoked by skIOBufWrite().
570 */
571 static ssize_t
streamIOBufCallbackWrite(skstream_t * stream,const void * buf,size_t count)572 streamIOBufCallbackWrite(
573 skstream_t *stream,
574 const void *buf,
575 size_t count)
576 {
577 ssize_t rv;
578
579 rv = skwriten(stream->fd, buf, count);
580 if (rv == -1) {
581 stream->is_iobuf_error = 1;
582 stream->errnum = errno;
583 stream->err_info = SKSTREAM_ERR_WRITE;
584 }
585 return rv;
586 }
587
588
589 /*
590 * status = streamIOBufCallbackSeek(stream, offset, whence);
591 *
592 * Seeks to a location in 'stream' from 'whence' modified by 'offset'.
593 *
594 * This function is a callback invoked by skIOBufRead() when the
595 * buffer in which to return the data is NULL.
596 */
597 static off_t
streamIOBufCallbackSeek(skstream_t * stream,off_t offset,int whence)598 streamIOBufCallbackSeek(
599 skstream_t *stream,
600 off_t offset,
601 int whence)
602 {
603 off_t rv;
604
605 rv = lseek(stream->fd, offset, whence);
606 if (rv == (off_t)(-1)) {
607 if (errno != ESPIPE) {
608 stream->is_iobuf_error = 1;
609 stream->errnum = errno;
610 stream->err_info = SKSTREAM_ERR_SYS_LSEEK;
611 }
612 }
613 return rv;
614 }
615
616
617 /*
618 * If a pager has been set on 'stream' and 'stream' is connected to
619 * a terminal, invoke the pager.
620 */
621 static int
streamInvokePager(skstream_t * stream)622 streamInvokePager(
623 skstream_t *stream)
624 {
625 int rv;
626 pid_t pid;
627 int wait_status;
628
629 rv = streamCheckModifiable(stream);
630 if (rv) { goto END; }
631
632 assert(streamCheckAttributes(stream, SK_IO_WRITE, SK_CONTENT_TEXT)
633 == SKSTREAM_OK);
634
635 if (stream->pager == NULL) {
636 goto END;
637 }
638
639 if ( !stream->is_terminal) {
640 goto END;
641 }
642
643 #if 1
644 /* invoke the pager */
645 stream->fp = popen(stream->pager, "w");
646 if (NULL == stream->fp) {
647 rv = SKSTREAM_ERR_NOPAGER;
648 goto END;
649 }
650
651 /* see if pager started. There is a race condition here, and this
652 * assumes we have only one child, which should be true. */
653 pid = wait4(0, &wait_status, WNOHANG, NULL);
654 if (pid) {
655 rv = SKSTREAM_ERR_NOPAGER;
656 goto END;
657 }
658 #else
659 {
660 int pipe_des[2];
661
662 /* create pipe and fork */
663 if (pipe(pipe_des) == -1) {
664 stream->errnum = errno;
665 rv = SKSTREAM_ERR_SYS_PIPE;
666 goto END;
667 }
668 pid = fork();
669 if (pid < 0) {
670 stream->errnum = errno;
671 rv = SKSTREAM_ERR_SYS_FORK;
672 goto END;
673 }
674
675 if (pid == 0) {
676 /* CHILD */
677
678 /* close output side of pipe; set input to stdin */
679 close(pipe_des[1]);
680 if (pipe_des[0] != STDIN_FILENO) {
681 dup2(pipe_des[0], STDIN_FILENO);
682 close(pipe_des[0]);
683 }
684
685 /* invoke pager */
686 execlp(pager, NULL);
687 skAppPrintErr("Unable to invoke pager '%s': %s",
688 pager, strerror(errno));
689 _exit(EXIT_FAILURE);
690 }
691
692 /* PARENT */
693
694 /* close input side of pipe */
695 close(pipe_des[0]);
696
697 /* try to open the write side of the pipe */
698 out = fdopen(pipe_des[1], "w");
699 if (NULL == out) {
700 stream->errnum = errno;
701 rv = SKSTREAM_ERR_SYS_FDOPEN;
702 goto END;
703 }
704
705 /* it'd be nice to have a slight pause here to give child time to
706 * die if command cannot be exec'ed, but it's not worth the
707 * trouble to use select(), and sleep(1) is too long. */
708
709 /* see if child died unexpectedly */
710 if (waitpid(pid, &wait_status, WNOHANG)) {
711 rv = SKSTREAM_ERR_NOPAGER;
712 goto END;
713 }
714 }
715 #endif /* 1: whether to use popen() */
716
717 /* looks good. */
718 stream->is_pager_active = 1;
719
720 END:
721 return rv;
722 }
723
724
725
726
727 /*
728 * status = streamOpenAppend(stream);
729 *
730 * Open the stream for appending.
731 */
732 static int
streamOpenAppend(skstream_t * stream)733 streamOpenAppend(
734 skstream_t *stream)
735 {
736 int rv = SKSTREAM_OK;
737 int flags = O_RDWR | O_APPEND;
738
739 assert(stream);
740 assert(stream->pathname);
741
742 /* Open file for read and write; position at start. */
743 stream->fd = open(stream->pathname, flags, 0);
744 if (stream->fd == -1) {
745 stream->errnum = errno;
746 rv = SKSTREAM_ERR_SYS_OPEN;
747 goto END;
748 }
749 if (-1 == lseek(stream->fd, 0, SEEK_SET)) {
750 stream->errnum = errno;
751 rv = SKSTREAM_ERR_SYS_LSEEK;
752 goto END;
753 }
754
755 END:
756 return rv;
757 }
758
759
760 /*
761 * Bind the currently open file descriptor to zlib via gzdopen().
762 * When reading a stream and the underlying file is seekable, do
763 * not bind the descriptor to zlib when the GZIP magic numbers are
764 * not present.
765 */
766 static int
streamOpenGzip(skstream_t * stream)767 streamOpenGzip(
768 skstream_t *stream)
769 {
770 int is_compressed = 1;
771 ssize_t num_read;
772 uint8_t magic[3];
773 int rv = SKSTREAM_OK;
774
775 if (stream->io_mode == SK_IO_READ && stream->is_seekable) {
776 /* Read the first two characters to look for the GZIP magic
777 * number (31 139 (see RFC1952)) to see if the stream really
778 * is compressed. */
779 num_read = read(stream->fd, magic, 2);
780 if ((num_read != 2) || (magic[0] != 31u) || (magic[1] != 139u)) {
781 /* File does not contain the gzip magic number. */
782 is_compressed = 0;
783 }
784 if (0 != lseek(stream->fd, 0, SEEK_SET)) {
785 rv = SKSTREAM_ERR_SYS_LSEEK;
786 goto END;
787 }
788 }
789
790 if (is_compressed) {
791 #if SK_ENABLE_ZLIB
792 stream->gz = gzdopen(stream->fd,
793 (stream->io_mode == SK_IO_READ) ? "rb" : "wb");
794 if (stream->gz == NULL) {
795 rv = SKSTREAM_ERR_ALLOC;
796 goto END;
797 }
798 #else
799 /* compression not supported */
800 rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE;
801 goto END;
802 #endif /* SK_ENABLE_ZLIB */
803 }
804
805 END:
806 return rv;
807 }
808
809
810 /*
811 * status = streamOpenRead(stream);
812 *
813 * Open the stream for reading.
814 */
815 static int
streamOpenRead(skstream_t * stream)816 streamOpenRead(
817 skstream_t *stream)
818 {
819 int rv = SKSTREAM_OK;
820
821 assert(stream);
822 assert(stream->pathname);
823 assert(stream->io_mode == SK_IO_READ);
824 assert(-1 == stream->fd);
825
826 if (stream->is_mpi) {
827 /* for now, just set to a valid value. we should replace the
828 * checks of 'fd' with an 'is_open' flag */
829 stream->fd = INT32_MAX;
830 } else if ((0 == strcmp(stream->pathname, "stdin"))
831 || (0 == strcmp(stream->pathname, "-")))
832 {
833 stream->fd = STDIN_FILENO;
834 stream->is_stdio = 1;
835 } else {
836 stream->fd = open(stream->pathname, O_RDONLY);
837 if (stream->fd == -1) {
838 rv = SKSTREAM_ERR_SYS_OPEN;
839 stream->errnum = errno;
840 goto END;
841 }
842 }
843
844 END:
845 /* if something went wrong, close the file */
846 if (rv != SKSTREAM_OK) {
847 if (stream->fd != -1) {
848 close(stream->fd);
849 stream->fd = -1;
850 }
851 }
852 return rv;
853 }
854
855
856 static int
streamOpenWrite(skstream_t * stream)857 streamOpenWrite(
858 skstream_t *stream)
859 {
860 int rv = SKSTREAM_OK;
861
862 assert(stream);
863 assert(stream->pathname);
864 assert(stream->io_mode == SK_IO_WRITE);
865
866 if ((0 == strcmp(stream->pathname, "stdout"))
867 || (0 == strcmp(stream->pathname, "-")))
868 {
869 stream->fd = STDOUT_FILENO;
870 stream->is_stdio = 1;
871 } else if (0 == strcmp(stream->pathname, "stderr")) {
872 stream->fd = STDERR_FILENO;
873 stream->is_stdio = 1;
874 } else if (stream->is_mpi) {
875 /* for now, just set to a valid value. we should replace the
876 * checks of 'fd' with an 'is_open' flag */
877 stream->fd = INT32_MAX;
878 } else {
879 struct stat stbuf;
880 int mode, flags;
881
882 /* standard mode of 0666 */
883 mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
884
885 /* assume creating previously non-existent file */
886 flags = O_WRONLY | O_CREAT | O_EXCL;
887
888 /* try to open as a brand new file */
889 stream->fd = open(stream->pathname, flags, mode);
890 if (stream->fd == -1) {
891 stream->errnum = errno;
892 if ((stream->errnum == EEXIST)
893 && (0 == stat(stream->pathname, &stbuf)))
894 {
895 /* file exists. Try again with different flags when
896 * the file is a FIFO, the file is a character device
897 * ("/dev/null"), or the SILK_CLOBBER envar is set. */
898 if (S_ISFIFO(stbuf.st_mode)) {
899 flags = O_WRONLY;
900 } else if (S_ISCHR(stbuf.st_mode)) {
901 flags = O_WRONLY | O_NOCTTY;
902 #ifdef SILK_CLOBBER_ENVAR
903 } else if (silk_clobber) {
904 /* overwrite an existing file */
905 flags = O_WRONLY | O_TRUNC;
906 #endif /* SILK_CLOBBER_ENVAR */
907 } else {
908 rv = SKSTREAM_ERR_FILE_EXISTS;
909 goto END;
910 }
911
912 /* try again with the new flags */
913 stream->fd = open(stream->pathname, flags, mode);
914 }
915
916 /* if we (still) have an error, return */
917 if (stream->fd == -1) {
918 /* we set errnum above */
919 rv = SKSTREAM_ERR_SYS_OPEN;
920 goto END;
921 }
922 }
923 }
924
925 END:
926 return rv;
927 }
928
929
930 /*
931 * is_compressed = streamPathnameIsCompressed(pathname, io_mode);
932 *
933 * Return TRUE if 'pathname' should be considered a compressed file
934 * for the given IO mode---that is, where the entire file is
935 * compressed---or FALSE otherwise.
936 *
937 * Basically, returns TRUE when 'pathname' ends in ".gz" or when it
938 * is open for reading and contains the substring ".gz."---assuming
939 * the pathname has had a mkstemp() suffix added to it. In all
940 * other cases, it returns FALSE.
941 */
942 static int
streamPathnameIsCompressed(const char * pathname,skstream_mode_t read_write_append)943 streamPathnameIsCompressed(
944 const char *pathname,
945 skstream_mode_t read_write_append)
946 {
947 const char *gz;
948
949 gz = strstr(pathname, ".gz");
950 if (gz != NULL) {
951 switch (read_write_append) {
952 case SK_IO_READ:
953 case SK_IO_APPEND:
954 if (gz[3] == '.') {
955 return 1;
956 }
957 /* FALLTHROUGH */
958
959 case SK_IO_WRITE:
960 if (gz[3] == '\0') {
961 return 1;
962 }
963 break;
964 }
965 }
966 return 0;
967 }
968
969
970 static int
streamPostOpen(skstream_t * stream)971 streamPostOpen(
972 skstream_t *stream)
973 {
974 int rv = SKSTREAM_OK;
975
976 assert(stream);
977 assert(stream->fd != -1);
978
979 if (!stream->is_mpi) {
980 if (isatty(stream->fd)) {
981 stream->is_terminal = 1;
982 } else if (lseek(stream->fd, 0, SEEK_CUR) != (off_t)-1) {
983 stream->is_seekable = 1;
984 }
985
986 /* handle compressed files */
987 if (streamPathnameIsCompressed(stream->pathname, stream->io_mode)) {
988 rv = streamOpenGzip(stream);
989 if (rv) { goto END; }
990 }
991 }
992
993 /* for a non-silk binary file, create the IOBuf now. If the
994 * stream was open for appending, seek to the end of the file
995 * before creating the IOBuf */
996 if (stream->content_type == SK_CONTENT_OTHERBINARY) {
997 if (stream->io_mode == SK_IO_APPEND) {
998 if (-1 == lseek(stream->fd, 0, SEEK_END)) {
999 stream->errnum = errno;
1000 rv = SKSTREAM_ERR_SYS_LSEEK;
1001 goto END;
1002 }
1003 }
1004
1005 rv = streamIOBufCreate(stream);
1006 if (rv) { goto END; }
1007 }
1008
1009 /* for a text file we are reading, create the IOBuf now */
1010 if (stream->content_type == SK_CONTENT_TEXT
1011 && stream->io_mode == SK_IO_READ)
1012 {
1013 rv = streamIOBufCreate(stream);
1014 if (rv) { goto END; }
1015 }
1016
1017 END:
1018 return rv;
1019 }
1020
1021
1022 /*
1023 * Invoke the SiLK Flow file format-specific function that sets the
1024 * rwUnpackFn() and rwPackFn() function pointers on 'stream'.
1025 */
1026 static int
streamPrepareFormat(skstream_t * stream)1027 streamPrepareFormat(
1028 skstream_t *stream)
1029 {
1030 assert(stream);
1031 assert(stream->is_silk);
1032 assert(stream->silk_hdr);
1033
1034 switch (skHeaderGetFileFormat(stream->silk_hdr)) {
1035 case FT_RWAUGMENTED:
1036 return augmentedioPrepare(stream);
1037
1038 case FT_RWAUGROUTING:
1039 return augroutingioPrepare(stream);
1040
1041 case FT_RWAUGWEB:
1042 return augwebioPrepare(stream);
1043
1044 case FT_RWAUGSNMPOUT:
1045 return augsnmpoutioPrepare(stream);
1046
1047 case FT_RWFILTER:
1048 return filterioPrepare(stream);
1049
1050 case FT_FLOWCAP:
1051 return flowcapioPrepare(stream);
1052
1053 case FT_RWGENERIC:
1054 return genericioPrepare(stream);
1055
1056 case FT_RWIPV6:
1057 stream->supports_ipv6 = 1;
1058 return ipv6ioPrepare(stream);
1059
1060 case FT_RWIPV6ROUTING:
1061 stream->supports_ipv6 = 1;
1062 return ipv6routingioPrepare(stream);
1063
1064 case FT_RWNOTROUTED:
1065 return notroutedioPrepare(stream);
1066
1067 case FT_RWROUTED:
1068 return routedioPrepare(stream);
1069
1070 case FT_RWSPLIT:
1071 return splitioPrepare(stream);
1072
1073 case FT_RWWWW:
1074 return wwwioPrepare(stream);
1075
1076 default:
1077 break;
1078 }
1079
1080 return SKSTREAM_ERR_UNSUPPORT_FORMAT;
1081 }
1082
1083
1084 /*
1085 * Prepare 'stream' for writing textual output. The function uses
1086 * fdopen() to get a file pointer for the file descriptor when
1087 * 'stream' is open for write or append. If a pager is defined for
1088 * 'stream', the pager is invoked.
1089 *
1090 * For processing textual input, skstream uses an IO Buf.
1091 */
1092 static int
streamPrepareText(skstream_t * stream)1093 streamPrepareText(
1094 skstream_t *stream)
1095 {
1096 int rv;
1097
1098 rv = streamCheckOpen(stream);
1099 if (rv) { goto END; }
1100
1101 assert(!stream->is_binary);
1102
1103 if (stream->fp == NULL) {
1104 const char *mode = NULL;
1105 switch (stream->io_mode) {
1106 case SK_IO_READ:
1107 break;
1108
1109 case SK_IO_WRITE:
1110 if (stream->pager) {
1111 rv = streamInvokePager(stream);
1112 if (rv) { goto END; }
1113 }
1114 if (stream->fp == NULL) {
1115 mode = "w";
1116 }
1117 break;
1118
1119 case SK_IO_APPEND:
1120 mode = "r+";
1121 break;
1122 }
1123 if (mode) {
1124 stream->fp = fdopen(stream->fd, mode);
1125 if (stream->fp == NULL) {
1126 stream->errnum = errno;
1127 rv = SKSTREAM_ERR_SYS_FDOPEN;
1128 goto END;
1129 }
1130 }
1131 }
1132
1133 stream->is_dirty = 1;
1134
1135 END:
1136 return rv;
1137 }
1138
1139
1140 /*
1141 * bytes = streamReadNullBuffer(stream, count);
1142 *
1143 * Read 'count' bytes from 'stream', ignoring the data. Return the
1144 * number of bytes read, or -1 for an error.
1145 */
1146 static ssize_t
streamReadNullBuffer(skstream_t * stream,const size_t count)1147 streamReadNullBuffer(
1148 skstream_t *stream,
1149 const size_t count)
1150 {
1151 uint8_t buf[65536];
1152 size_t left = count;
1153 size_t wanted;
1154 ssize_t saw;
1155
1156 assert(stream);
1157 assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
1158 assert(stream->fd != -1);
1159 assert(stream->iobuf == NULL);
1160
1161 #if SK_ENABLE_ZLIB
1162 if (stream->gz != NULL) {
1163 while (left) {
1164 /* don't read more than will fit into our buffer */
1165 wanted = ((left < sizeof(buf)) ? left : sizeof(buf));
1166
1167 saw = streamGZRead(stream, buf, wanted);
1168 if (saw == -1) {
1169 stream->is_iobuf_error = 0;
1170 return saw;
1171 }
1172 if (saw == 0) {
1173 /* no more to read */
1174 break;
1175 }
1176
1177 left -= saw;
1178 }
1179
1180 return (count - left);
1181 }
1182 #endif /* SK_ENABLE_ZLIB */
1183
1184 while (left) {
1185 /* don't read more than will fit into our buffer */
1186 wanted = ((left < sizeof(buf)) ? left : sizeof(buf));
1187
1188 saw = skreadn(stream->fd, buf, wanted);
1189 if (saw == -1) {
1190 stream->errnum = errno;
1191 stream->err_info = SKSTREAM_ERR_READ;
1192 return saw;
1193 }
1194 if (saw == 0) {
1195 /* no more to read */
1196 break;
1197 }
1198 left -= saw;
1199 }
1200
1201 return (count - left);
1202 }
1203
1204
1205 /*
1206 * Read 'skip_count' records from 'stream'. If 'records_skipped'
1207 * is not NULL, fill the location it references with the number of
1208 * records actually skipped.
1209 *
1210 * This function is only invoked when an IO Buf is NOT associated
1211 * with 'stream' and when the 'copyInputFD' member of 'stream' is
1212 * NULL.
1213 */
1214 static int
streamSkipRecordsNonIOBuf(skstream_t * stream,size_t skip_count,size_t * records_skipped)1215 streamSkipRecordsNonIOBuf(
1216 skstream_t *stream,
1217 size_t skip_count,
1218 size_t *records_skipped)
1219 {
1220 #define SKIP_RECORD_COUNT 1024
1221 uint8_t ar[SKIP_RECORD_COUNT * SK_MAX_RECORD_SIZE];
1222 ssize_t saw;
1223 ssize_t tmp;
1224
1225 assert(stream);
1226 assert(records_skipped);
1227
1228 if (stream->is_eof) {
1229 return SKSTREAM_ERR_EOF;
1230 }
1231
1232 while (skip_count > 0) {
1233 /* can only read the number of records our buffer allows */
1234 if (skip_count > SKIP_RECORD_COUNT) {
1235 tmp = stream->recLen * SKIP_RECORD_COUNT;
1236 } else {
1237 tmp = stream->recLen * skip_count;
1238 }
1239
1240 /* read the bytes and check for error or short reads */
1241 saw = skStreamRead(stream, ar, tmp);
1242 if (saw != tmp) {
1243 /* Either error or an incomplete read--assume end of file */
1244 stream->is_eof = 1;
1245 if (saw == -1) {
1246 /* error */
1247 return -1;
1248 }
1249 }
1250
1251 /* compute the number of records we actually read, update
1252 * counters, and check for any partially read records. */
1253 tmp = (saw / stream->recLen);
1254 stream->rec_count += tmp;
1255 skip_count -= tmp;
1256 saw -= tmp * stream->recLen;
1257 *records_skipped += tmp;
1258
1259 if (saw != 0) {
1260 stream->errobj.num = saw;
1261 return SKSTREAM_ERR_READ_SHORT;
1262 }
1263 if (stream->is_eof) {
1264 return SKSTREAM_ERR_EOF;
1265 }
1266 }
1267
1268 return SKSTREAM_OK;
1269 }
1270
1271
1272 /*
1273 * *********************************
1274 * PUBLIC / EXPORTED FUNCTIONS
1275 * *********************************
1276 */
1277
1278 /*
1279 * status = skStreamBind(stream, path);
1280 *
1281 * Set 'stream' to operate on the file specified in 'path'; 'path'
1282 * may also be one of "stdin", "stdout", or "stderr". Returns
1283 * SKSTREAM_OK on success, or an error code on failure.
1284 */
1285 int
skStreamBind(skstream_t * stream,const char * pathname)1286 skStreamBind(
1287 skstream_t *stream,
1288 const char *pathname)
1289 {
1290 int rv = SKSTREAM_OK;
1291 FILE *s = NULL;
1292
1293 /* check name */
1294 if (NULL == stream || NULL == pathname) {
1295 rv = SKSTREAM_ERR_NULL_ARGUMENT;
1296 goto END;
1297 }
1298 if ('\0' == *pathname || strlen(pathname) >= PATH_MAX) {
1299 rv = SKSTREAM_ERR_INVALID_INPUT;
1300 goto END;
1301 }
1302 if (stream->pathname) {
1303 rv = SKSTREAM_ERR_PREV_BOUND;
1304 goto END;
1305 }
1306
1307 /* copy it into place */
1308 stream->pathname = strdup(pathname);
1309 if (stream->pathname == NULL) {
1310 rv = SKSTREAM_ERR_ALLOC;
1311 goto END;
1312 }
1313
1314 if (0 == strcmp(pathname, "stdin")) {
1315 switch (stream->io_mode) {
1316 case SK_IO_READ:
1317 if (!stream->is_mpi && stream->is_binary && FILEIsATty(stdin)) {
1318 rv = SKSTREAM_ERR_ISTERMINAL;
1319 goto END;
1320 }
1321 break;
1322 case SK_IO_WRITE:
1323 case SK_IO_APPEND:
1324 /* cannot write or append to stdin */
1325 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1326 goto END;
1327 }
1328 } else if (0 == strcmp(pathname, "stdout")) {
1329 s = stdout;
1330 } else if (0 == strcmp(pathname, "stderr")) {
1331 s = stderr;
1332 } else if (0 == strcmp(pathname, "-")) {
1333 switch (stream->io_mode) {
1334 case SK_IO_READ:
1335 if (!stream->is_mpi && stream->is_binary && FILEIsATty(stdin)) {
1336 rv = SKSTREAM_ERR_ISTERMINAL;
1337 goto END;
1338 }
1339 break;
1340 case SK_IO_WRITE:
1341 s = stdout;
1342 break;
1343 case SK_IO_APPEND:
1344 /* cannot append to stdout */
1345 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1346 goto END;
1347 }
1348 }
1349
1350 if (s) {
1351 switch (stream->io_mode) {
1352 case SK_IO_READ:
1353 case SK_IO_APPEND:
1354 /* cannot read or append to stdout/stderr */
1355 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1356 goto END;
1357 case SK_IO_WRITE:
1358 if (!stream->is_mpi && stream->is_binary && FILEIsATty(s)) {
1359 rv = SKSTREAM_ERR_ISTERMINAL;
1360 goto END;
1361 }
1362 break;
1363 }
1364 }
1365
1366 /* cannot append to FIFOs or to gzipped files */
1367 if (stream->io_mode == SK_IO_APPEND) {
1368 if (streamPathnameIsCompressed(stream->pathname, stream->io_mode)) {
1369 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1370 goto END;
1371 }
1372 if (isFIFO(pathname)) {
1373 /* Cannot append to a FIFO */
1374 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1375 goto END;
1376 }
1377 }
1378
1379 END:
1380 return (stream->last_rv = rv);
1381 }
1382
1383
1384 int
skStreamCheckCompmethod(skstream_t * stream,sk_msg_fn_t errfn)1385 skStreamCheckCompmethod(
1386 skstream_t *stream,
1387 sk_msg_fn_t errfn)
1388 {
1389 #ifdef TEST_PRINTF_FORMATS
1390 # define P_ERR printf
1391 #else
1392 # define P_ERR if (!errfn) { } else errfn
1393 #endif
1394 sk_compmethod_t compmethod;
1395
1396 compmethod = skHeaderGetCompressionMethod(stream->silk_hdr);
1397 switch (skCompMethodCheck(compmethod)) {
1398 case SK_COMPMETHOD_IS_AVAIL:
1399 /* known, valid, and available */
1400 return (stream->last_rv = SKSTREAM_OK);
1401 case SK_COMPMETHOD_IS_VALID:
1402 /* known and valid but not available */
1403 if (errfn) {
1404 char name[64];
1405 skCompMethodGetName(name, sizeof(name), compmethod);
1406 P_ERR("The %s compression method used by '%s' is not available",
1407 name, stream->pathname);
1408 }
1409 return (stream->last_rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE);
1410 case SK_COMPMETHOD_IS_KNOWN:
1411 /* this is an undecided value, only valid for write */
1412 if (SK_IO_WRITE == stream->io_mode) {
1413 return (stream->last_rv = SKSTREAM_OK);
1414 }
1415 /* FALLTHROUGH */
1416 default:
1417 if (errfn) {
1418 P_ERR("File '%s' is compressed with an unrecognized method %d",
1419 stream->pathname, compmethod);
1420 }
1421 return (stream->last_rv = SKSTREAM_ERR_COMPRESS_INVALID);
1422 }
1423 #undef P_ERR
1424 }
1425
1426
1427 int
skStreamCheckSilkHeader(skstream_t * stream,sk_file_format_t file_format,sk_file_version_t min_version,sk_file_version_t max_version,sk_msg_fn_t errfn)1428 skStreamCheckSilkHeader(
1429 skstream_t *stream,
1430 sk_file_format_t file_format,
1431 sk_file_version_t min_version,
1432 sk_file_version_t max_version,
1433 sk_msg_fn_t errfn)
1434 {
1435 #ifdef TEST_PRINTF_FORMATS
1436 # define P_ERR printf
1437 #else
1438 # define P_ERR if (!errfn) { } else errfn
1439 #endif
1440 sk_file_header_t *hdr = stream->silk_hdr;
1441 sk_file_format_t fmt = skHeaderGetFileFormat(hdr);
1442 sk_file_version_t vers = skHeaderGetRecordVersion(hdr);
1443 char fmt_name[SK_MAX_STRLEN_FILE_FORMAT+1];
1444
1445 /* get the name of the requested format */
1446 skFileFormatGetName(fmt_name, sizeof(fmt_name), file_format);
1447
1448 if (fmt != file_format) {
1449 P_ERR("File '%s' is not a %s file; format is 0x%02x",
1450 stream->pathname, fmt_name, fmt);
1451 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_FORMAT);
1452 }
1453
1454 if ((vers < min_version) || (vers > max_version)) {
1455 P_ERR("This version of SiLK cannot process the %s v%u file %s",
1456 fmt_name, vers, stream->pathname);
1457 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_VERSION);
1458 }
1459
1460 return (stream->last_rv = skStreamCheckCompmethod(stream, errfn));
1461 #undef errfn
1462 }
1463
1464
1465 int
skStreamClose(skstream_t * stream)1466 skStreamClose(
1467 skstream_t *stream)
1468 {
1469 int rv;
1470
1471 STREAM_RETURN_IF_NULL(stream);
1472
1473 rv = streamCheckOpen(stream);
1474 if (rv) { goto END; }
1475
1476 if (stream->fp) {
1477 if (stream->is_pager_active) {
1478 if (pclose(stream->fp) == -1) {
1479 stream->errnum = errno;
1480 if (rv == SKSTREAM_OK) {
1481 rv = SKSTREAM_ERR_WRITE;
1482 }
1483 }
1484 } else {
1485 if (EOF == fclose(stream->fp)) {
1486 stream->errnum = errno;
1487 if (rv == SKSTREAM_OK) {
1488 rv = SKSTREAM_ERR_WRITE;
1489 }
1490 }
1491 }
1492 } else if (stream->fd != -1) {
1493 if (stream->iobuf && stream->io_mode != SK_IO_READ) {
1494 if (skIOBufFlush(stream->iobuf) == -1) {
1495 if (stream->is_iobuf_error) {
1496 stream->is_iobuf_error = 0;
1497 rv = stream->err_info;
1498 } else {
1499 rv = SKSTREAM_ERR_IOBUF;
1500 }
1501 }
1502 }
1503 #if SK_ENABLE_ZLIB
1504 if (stream->gz) {
1505 /* Close the gzFile */
1506 int zerr = gzclose(stream->gz);
1507 stream->gz = NULL;
1508 if (zerr != Z_OK) {
1509 if (zerr == Z_ERRNO) {
1510 stream->errnum = errno;
1511 rv = SKSTREAM_ERR_WRITE;
1512 } else {
1513 stream->errnum = zerr;
1514 rv = SKSTREAM_ERR_ZLIB;
1515 }
1516 }
1517 /* gzclose() closes the file */
1518 stream->fd = -1;
1519 } else
1520 #endif /* SK_ENABLE_ZLIB */
1521 {
1522 if (stream->is_stdio == 0) {
1523 if (close(stream->fd) == -1) {
1524 stream->errnum = errno;
1525 rv = SKSTREAM_ERR_WRITE;
1526 }
1527 }
1528 }
1529 }
1530
1531 stream->fd = -1;
1532 stream->fp = NULL;
1533 stream->is_closed = 1;
1534
1535 END:
1536 return (stream->last_rv = rv);
1537 }
1538
1539
1540 /*
1541 * status = skStreamCreate(&out_stream, io_mode, content_type);
1542 *
1543 * Create a stream (skstream_t*) and fill 'out_stream' with the
1544 * address of the newly allocated stream. In addition, bind the
1545 * stream to the given 'path', with IO in the specified 'io_mode'.
1546 * Return SKSTREAM_OK on success, or an error code on failure.
1547 */
1548 int
skStreamCreate(skstream_t ** new_stream,skstream_mode_t read_write_append,skcontent_t content_type)1549 skStreamCreate(
1550 skstream_t **new_stream,
1551 skstream_mode_t read_write_append,
1552 skcontent_t content_type)
1553 {
1554 if (new_stream == NULL) {
1555 return SKSTREAM_ERR_NULL_ARGUMENT;
1556 }
1557
1558 *new_stream = (skstream_t*)calloc(1, sizeof(skstream_t));
1559 if (NULL == *new_stream) {
1560 return SKSTREAM_ERR_ALLOC;
1561 }
1562
1563 if (skHeaderCreate(&((*new_stream)->silk_hdr))) {
1564 free(*new_stream);
1565 *new_stream = NULL;
1566 return SKSTREAM_ERR_ALLOC;
1567 }
1568
1569 (*new_stream)->io_mode = read_write_append;
1570 (*new_stream)->content_type = content_type;
1571 (*new_stream)->fd = -1;
1572
1573 /* Native format by default, so don't swap */
1574 (*new_stream)->swapFlag = 0;
1575
1576 /* Set sensor and flowtype to invalid values */
1577 (*new_stream)->hdr_sensor = SK_INVALID_SENSOR;
1578 (*new_stream)->hdr_flowtype = SK_INVALID_FLOWTYPE;
1579
1580 switch (content_type) {
1581 case SK_CONTENT_TEXT:
1582 break;
1583
1584 case SK_CONTENT_SILK_FLOW:
1585 (*new_stream)->is_silk_flow = 1;
1586 /* FALLTHROUGH */
1587
1588 case SK_CONTENT_SILK:
1589 (*new_stream)->is_silk = 1;
1590 /* FALLTHROUGH */
1591
1592 case SK_CONTENT_OTHERBINARY:
1593 (*new_stream)->is_binary = 1;
1594 break;
1595 }
1596
1597 return ((*new_stream)->last_rv = SKSTREAM_OK);
1598 }
1599
1600
1601 int
skStreamDestroy(skstream_t ** stream)1602 skStreamDestroy(
1603 skstream_t **stream)
1604 {
1605 int rv;
1606
1607 if ((NULL == stream) || (NULL == *stream)) {
1608 return SKSTREAM_OK;
1609 }
1610
1611 rv = skStreamUnbind(*stream);
1612
1613 /* Destroy the iobuf */
1614 if ((*stream)->iobuf) {
1615 skIOBufDestroy((*stream)->iobuf);
1616 (*stream)->iobuf = NULL;
1617 }
1618
1619 /* Destroy the header */
1620 skHeaderDestroy(&((*stream)->silk_hdr));
1621
1622 /* Free the pathname */
1623 if ((*stream)->pathname) {
1624 free((*stream)->pathname);
1625 (*stream)->pathname = NULL;
1626 }
1627
1628 free(*stream);
1629 *stream = NULL;
1630
1631 return rv;
1632 }
1633
1634
1635 int
skStreamFDOpen(skstream_t * stream,int file_desc)1636 skStreamFDOpen(
1637 skstream_t *stream,
1638 int file_desc)
1639 {
1640 int rv;
1641
1642 STREAM_RETURN_IF_NULL(stream);
1643
1644 rv = streamCheckUnopened(stream);
1645 if (rv) { goto END; }
1646
1647 if (stream->pathname == NULL) {
1648 rv = SKSTREAM_ERR_NOT_BOUND;
1649 goto END;
1650 }
1651
1652 if (file_desc == -1) {
1653 rv = SKSTREAM_ERR_INVALID_INPUT;
1654 goto END;
1655 }
1656
1657 /* Check file modes */
1658 rv = fcntl(file_desc, F_GETFL, 0);
1659 if (rv == -1) {
1660 stream->errnum = errno;
1661 rv = SKSTREAM_ERR_SYS_FCNTL_GETFL;
1662 goto END;
1663 }
1664 switch (stream->io_mode) {
1665 case SK_IO_READ:
1666 if ((rv & O_ACCMODE) == O_WRONLY) {
1667 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1668 goto END;
1669 }
1670 break;
1671 case SK_IO_WRITE:
1672 if (((rv & O_ACCMODE) == O_RDONLY) || (rv & O_APPEND)) {
1673 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1674 goto END;
1675 }
1676 break;
1677 case SK_IO_APPEND:
1678 if (((rv & O_ACCMODE) != O_RDWR) || !(rv & O_APPEND)) {
1679 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1680 goto END;
1681 }
1682 break;
1683 }
1684
1685 /* Check tty status if binary */
1686 if (stream->is_binary && isatty(file_desc)) {
1687 rv = SKSTREAM_ERR_ISTERMINAL;
1688 goto END;
1689 }
1690
1691 /* Seek to beginning on append for the header. Check this after
1692 * the tty status check, because that is a more useful error
1693 * message. */
1694 if ((stream->io_mode == SK_IO_APPEND)
1695 && (-1 == lseek(file_desc, 0, SEEK_SET)))
1696 {
1697 stream->errnum = errno;
1698 rv = SKSTREAM_ERR_SYS_LSEEK;
1699 goto END;
1700 }
1701
1702 stream->fd = file_desc;
1703
1704 rv = streamPostOpen(stream);
1705 if (rv) { goto END; }
1706
1707 END:
1708 return (stream->last_rv = rv);
1709 }
1710
1711
1712 int
skStreamFlush(skstream_t * stream)1713 skStreamFlush(
1714 skstream_t *stream)
1715 {
1716 int rv;
1717
1718 STREAM_RETURN_IF_NULL(stream);
1719
1720 rv = streamCheckOpen(stream);
1721 if (rv) { goto END; }
1722
1723 if (stream->io_mode == SK_IO_READ) {
1724 /* nothing to do for a reader */
1725 goto END;
1726 }
1727
1728 if (stream->fp) {
1729 if (EOF == fflush(stream->fp)) {
1730 stream->errnum = errno;
1731 rv = SKSTREAM_ERR_WRITE;
1732 }
1733 } else if (stream->iobuf) {
1734 if (skIOBufFlush(stream->iobuf) == -1) {
1735 if (stream->is_iobuf_error) {
1736 stream->is_iobuf_error = 0;
1737 rv = stream->err_info;
1738 } else {
1739 rv = SKSTREAM_ERR_IOBUF;
1740 }
1741 goto END;
1742 }
1743 #if SK_ENABLE_ZLIB
1744 } else if (stream->gz) {
1745 if (streamGZFlush(stream) == -1) {
1746 stream->is_iobuf_error = 0;
1747 rv = stream->err_info;
1748 }
1749 #endif /* SK_ENABLE_ZLIB */
1750 }
1751
1752 END:
1753 return (stream->last_rv = rv);
1754 }
1755
1756
1757 /* return the content type */
1758 skcontent_t
skStreamGetContentType(const skstream_t * stream)1759 skStreamGetContentType(
1760 const skstream_t *stream)
1761 {
1762 assert(stream);
1763 return stream->content_type;
1764 }
1765
1766
1767 /* return the file descriptor */
1768 int
skStreamGetDescriptor(const skstream_t * stream)1769 skStreamGetDescriptor(
1770 const skstream_t *stream)
1771 {
1772 assert(stream);
1773 return stream->fd;
1774 }
1775
1776
1777 /* return the cached errno value */
1778 int
skStreamGetLastErrno(const skstream_t * stream)1779 skStreamGetLastErrno(
1780 const skstream_t *stream)
1781 {
1782 assert(stream);
1783 return stream->errnum;
1784 }
1785
1786
1787 /* return the cached return value */
1788 ssize_t
skStreamGetLastReturnValue(const skstream_t * stream)1789 skStreamGetLastReturnValue(
1790 const skstream_t *stream)
1791 {
1792 assert(stream);
1793 return stream->last_rv;
1794 }
1795
1796
/*
 *    Fill 'value' with the limit implied by the error code
 *    'limit_id', given the file format and record version in the
 *    header of 'stream'.  'limit_id' is one of the SKSTREAM_ERR_*
 *    codes handled in the switch below.
 *
 *    Return SKSTREAM_OK when 'limit_id' is recognized,
 *    SKSTREAM_ERR_REQUIRE_SILK_FLOW when 'stream' is not marked as
 *    containing SiLK Flow records or has no header, and
 *    SKSTREAM_ERR_INVALID_INPUT when 'limit_id' is not recognized.
 *
 *    'stream' must not be NULL.  'value' is dereferenced without
 *    being checked.  Several of the inner switches have no default
 *    case and no preset value (for example SKSTREAM_ERR_PKTS_OVRFLO
 *    and SKSTREAM_ERR_BPP_OVRFLO), so for a file format that they do
 *    not list, 'value' is left unmodified while SKSTREAM_OK is
 *    returned.
 */
int
skStreamGetLimit(
    const skstream_t   *stream,
    int                 limit_id,
    int64_t            *value)
{
    sk_file_format_t file_format;
    sk_file_version_t rec_version;
    int rv = SKSTREAM_OK;

    assert(stream);

    if (!stream->is_silk_flow || !stream->silk_hdr) {
        rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
        goto END;
    }

    file_format = skHeaderGetFileFormat(stream->silk_hdr);
    rec_version = skHeaderGetRecordVersion(stream->silk_hdr);
    /* treat a format of UINT8_MAX as the default file format */
    if (UINT8_MAX == file_format) {
        file_format = DEFAULT_FILE_FORMAT;
    }

    switch (limit_id) {
      case SKSTREAM_ERR_PKTS_ZERO:
        /* The record contains a 0 value in the packets field. */
        *value = 1;
        break;

      case SKSTREAM_ERR_STIME_UNDRFLO:
        /* The record's start time is less than the file's start
         * time */
        switch (file_format) {
          case FT_RWAUGMENTED:
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
          case FT_RWAUGWEB:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
          case FT_RWSPLIT:
          case FT_RWWWW:
            *value = stream->hdr_starttime;
            break;
          default:
            *value = 0;
            break;
        }
        break;

      case SKSTREAM_ERR_STIME_OVRFLO:
        /* The record's start time at least an hour greater than the
         * file's start time */
        /* preset a value; the formats listed below replace it */
        *value = (int64_t)sktimeCreate(UINT32_MAX, 0);
        switch (file_format) {
          case FT_RWAUGMENTED:
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
          case FT_RWAUGWEB:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
          case FT_RWSPLIT:
          case FT_RWWWW:
            *value = (stream->hdr_starttime
                      + sktimeCreate((MAX_START_TIME - 1), 0));
            break;
          case FT_RWGENERIC:
            switch (rec_version) {
              case 5:
                *value = INT64_MAX;
                break;
            }
            break;
          case FT_RWIPV6:
            switch (rec_version) {
              case 2:
                *value = (stream->hdr_starttime
                          + sktimeCreate((MAX_START_TIME - 1), 0));
                break;
              default:
                *value = INT64_MAX;
                break;
            }
            break;
          case FT_RWIPV6ROUTING:
            *value = INT64_MAX;
            break;
        }
        break;

      case SKSTREAM_ERR_ELPSD_OVRFLO:
        /* The record's elapsed time is greater than space allocated
         * for duration in this file format */
        switch (file_format) {
          case FT_RWAUGMENTED:
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
            if (rec_version <= 4) {
                *value = (MAX_ELAPSED_TIME - 1) * 1000;
            } else {
                *value = (int64_t)UINT32_MAX;
            }
            break;
          case FT_RWAUGWEB:
            if (rec_version <= 4) {
                *value = (MAX_ELAPSED_TIME - 1) * 1000;
            } else {
                *value = (int64_t)MASKARRAY_30;
            }
            break;
          case FT_FLOWCAP:
            *value = UINT16_MAX * 1000;
            break;
          case FT_RWFILTER:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
          case FT_RWSPLIT:
          case FT_RWWWW:
            switch (rec_version) {
              case 1:
              case 2:
                *value = (MAX_ELAPSED_TIME_OLD - 1) * 1000;
                break;
              default:
                *value = (MAX_ELAPSED_TIME - 1) * 1000;
                break;
            }
            break;
          default:
            *value = (int64_t)UINT32_MAX;
            break;
        }
        break;

      case SKSTREAM_ERR_PKTS_OVRFLO:
        /* The record contains more than the number of packets allowed
         * in this file format */
        switch (file_format) {
          case FT_RWAUGMENTED:
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
          case FT_RWAUGWEB:
            switch (rec_version) {
              case 5:
                *value = (int64_t)UINT32_MAX;
                break;
              default:
                *value = MAX_PKTS * PKTS_DIVISOR - 1;
                break;
            }
            break;
          case FT_RWFILTER:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
          case FT_RWSPLIT:
          case FT_RWWWW:
            *value = MAX_PKTS * PKTS_DIVISOR - 1;
            break;
          case FT_FLOWCAP:
            *value = MASKARRAY_24;
            break;
          case FT_RWGENERIC:
          case FT_RWIPV6:
          case FT_RWIPV6ROUTING:
            *value = (int64_t)UINT32_MAX;
            break;
        }
        break;

      case SKSTREAM_ERR_BPP_OVRFLO:
        /* The byte-per-packet value is too large to fit into the
         * space provided by this file format. */
        switch (file_format) {
          case FT_RWAUGMENTED:
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
          case FT_RWAUGWEB:
            switch (rec_version) {
              case 5:
                *value = (int64_t)UINT32_MAX;
                break;
              default:
                *value = MASKARRAY_14;
                break;
            }
            break;
          case FT_RWFILTER:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
          case FT_RWSPLIT:
          case FT_RWWWW:
            *value = MASKARRAY_14;
            break;
          case FT_FLOWCAP:
          case FT_RWGENERIC:
          case FT_RWIPV6:
          case FT_RWIPV6ROUTING:
            *value = (int64_t)UINT32_MAX;
            break;
        }
        break;

      case SKSTREAM_ERR_SNMP_OVRFLO:
        /* The records contains an SNMP value too large to fit into
         * the space allocated in this file format. */
        /* preset 0; the formats listed below replace it */
        *value = 0;
        switch (file_format) {
          case FT_RWAUGROUTING:
          case FT_RWAUGSNMPOUT:
          case FT_RWIPV6ROUTING:
            *value = UINT16_MAX;
            break;
          case FT_RWFILTER:
          case FT_RWNOTROUTED:
          case FT_RWROUTED:
            switch (rec_version) {
              case 1:
              case 2:
                *value = UINT8_MAX;
                break;
              default:
                *value = UINT16_MAX;
                break;
            }
            break;
          case FT_RWGENERIC:
            switch (rec_version) {
              case 0:
              case 1:
                *value = UINT8_MAX;
                break;
              default:
                *value = UINT16_MAX;
                break;
            }
            break;
          case FT_FLOWCAP:
            switch (rec_version) {
              case 2:
              case 3:
              case 4:
                *value = UINT8_MAX;
                break;
              default:
                *value = UINT16_MAX;
                break;
            }
            break;
        }
        break;

      case SKSTREAM_ERR_SENSORID_OVRFLO:
        /* The records contains a SensorID too large to fit into the
         * space allocated in this file format. */
        /* preset UINT16_MAX; the cases below replace it with a
         * smaller limit */
        *value = UINT16_MAX;
        switch (file_format) {
          case FT_RWFILTER:
            switch (rec_version) {
              case 1:
                *value = MASKARRAY_06;
                break;
              case 2:
                *value = UINT8_MAX;
                break;
            }
            break;
          case FT_RWGENERIC:
            switch (rec_version) {
              case 0:
              case 1:
                *value = UINT8_MAX;
                break;
            }
            break;
        }
        break;

      default:
        /* unknown limit */
        rv = SKSTREAM_ERR_INVALID_INPUT;
        break;
    }

  END:
    /* 'stream' is const, so its 'last_rv' member is not updated */
    return rv;
}
2083
2084
2085 /* Get the next line from a text file */
2086 int
skStreamGetLine(skstream_t * stream,char * out_buffer,size_t buf_size,int * lines_read)2087 skStreamGetLine(
2088 skstream_t *stream,
2089 char *out_buffer,
2090 size_t buf_size,
2091 int *lines_read)
2092 {
2093 size_t len;
2094 int rv = SKSTREAM_OK;
2095
2096 assert(stream);
2097
2098 if ( !stream->is_dirty) {
2099 rv = streamCheckOpen(stream);
2100 if (rv) { goto END; }
2101
2102 rv = streamCheckAttributes(stream, SK_IO_READ, SK_CONTENT_TEXT);
2103 if (rv) { goto END; }
2104
2105 rv = streamPrepareText(stream);
2106 if (rv) { goto END; }
2107 }
2108 #ifndef NDEBUG
2109 else {
2110 assert(!stream->is_binary);
2111 assert(stream->content_type == SK_CONTENT_TEXT);
2112 assert(stream->io_mode == SK_IO_READ);
2113 assert(stream->fd != -1);
2114 }
2115 #endif
2116
2117 assert(out_buffer && buf_size);
2118 out_buffer[0] = '\0';
2119
2120 /* read from the stream until we get a good line */
2121 for (;;) {
2122 rv = streamIOBufGetLine(stream, out_buffer, buf_size);
2123 if (rv != SKSTREAM_OK) {
2124 if ((rv == SKSTREAM_ERR_LONG_LINE) && lines_read) {
2125 ++*lines_read;
2126 }
2127 break;
2128 }
2129 if (lines_read) {
2130 ++*lines_read;
2131 }
2132
2133 /* Terminate line at first comment char */
2134 if (stream->comment_start) {
2135 char *cp = strstr(out_buffer, stream->comment_start);
2136 if (cp) {
2137 *cp = '\0';
2138 }
2139 }
2140
2141 /* find first non-space character in the line */
2142 len = strspn(out_buffer, " \t\v\f\r\n");
2143 if (out_buffer[len] == '\0') {
2144 /* line contained whitespace only; ignore */
2145 continue;
2146 }
2147
2148 /* got a line, break out of loop */
2149 break;
2150 }
2151
2152 END:
2153 return (stream->last_rv = rv);
2154 }
2155
2156
2157 /* return the read/write/append mode */
2158 skstream_mode_t
skStreamGetMode(const skstream_t * stream)2159 skStreamGetMode(
2160 const skstream_t *stream)
2161 {
2162 assert(stream);
2163 return stream->io_mode;
2164 }
2165
2166
2167 /* return the name of pager program */
2168 const char *
skStreamGetPager(const skstream_t * stream)2169 skStreamGetPager(
2170 const skstream_t *stream)
2171 {
2172 if (stream->is_closed) {
2173 return NULL;
2174 } else if (stream->is_pager_active) {
2175 /* stream is open and pager is in use */
2176 return stream->pager;
2177 } else if (stream->fd == -1) {
2178 /* unopened, return pager we *may* use */
2179 return stream->pager;
2180 } else {
2181 /* stream is open and not using pager */
2182 return NULL;
2183 }
2184 }
2185
2186
2187 /* return the name of file associated with the stream */
2188 const char *
skStreamGetPathname(const skstream_t * stream)2189 skStreamGetPathname(
2190 const skstream_t *stream)
2191 {
2192 assert(stream);
2193 return stream->pathname;
2194 }
2195
2196
2197 /* return number of SiLK flow records processed */
2198 uint64_t
skStreamGetRecordCount(const skstream_t * stream)2199 skStreamGetRecordCount(
2200 const skstream_t *stream)
2201 {
2202 assert(stream);
2203 if (!stream->is_silk_flow) {
2204 return ((uint64_t)(-1));
2205 }
2206 return stream->rec_count;
2207 }
2208
2209
2210 sk_file_header_t *
skStreamGetSilkHeader(const skstream_t * stream)2211 skStreamGetSilkHeader(
2212 const skstream_t *stream)
2213 {
2214 if (!stream->is_silk) {
2215 return NULL;
2216 }
2217 return stream->silk_hdr;
2218 }
2219
2220
2221 int
skStreamGetSupportsIPv6(const skstream_t * stream)2222 skStreamGetSupportsIPv6(
2223 const skstream_t *stream)
2224 {
2225 assert(stream);
2226 return stream->supports_ipv6;
2227 }
2228
2229
2230 off_t
skStreamGetUpperBound(skstream_t * stream)2231 skStreamGetUpperBound(
2232 skstream_t *stream)
2233 {
2234 assert(stream);
2235 assert(stream->fd != -1);
2236
2237 if (stream->io_mode == SK_IO_READ) {
2238 return 0;
2239 }
2240 if (stream->iobuf) {
2241 return stream->pre_iobuf_pos + skIOBufTotalUpperBound(stream->iobuf);
2242 }
2243 return lseek(stream->fd, 0, SEEK_CUR);
2244 }
2245
2246
2247 int
skStreamInitialize(void)2248 skStreamInitialize(
2249 void)
2250 {
2251 const char *env;
2252
2253 env = getenv(SILK_ICMP_SPORT_HANDLER_ENVAR);
2254 if (NULL != env && (0 == strcasecmp(env, "none"))) {
2255 silk_icmp_nochange = 1;
2256 }
2257
2258 #ifdef SILK_CLOBBER_ENVAR
2259 env = getenv(SILK_CLOBBER_ENVAR);
2260 if (NULL != env && *env && *env != '0') {
2261 silk_clobber = 1;
2262 }
2263 #endif
2264
2265 return 0;
2266 }
2267
2268
2269 int
skStreamIsSeekable(const skstream_t * stream)2270 skStreamIsSeekable(
2271 const skstream_t *stream)
2272 {
2273 assert(stream);
2274 return stream->is_seekable;
2275 }
2276
2277
2278 int
skStreamIsStdout(const skstream_t * stream)2279 skStreamIsStdout(
2280 const skstream_t *stream)
2281 {
2282 assert(stream);
2283 return ((SK_IO_WRITE == stream->io_mode)
2284 && (NULL != stream->pathname)
2285 && ((0 == strcmp(stream->pathname, "-"))
2286 || (0 == strcmp(stream->pathname, "stdout"))));
2287 }
2288
2289
2290 int
skStreamLockFile(skstream_t * stream)2291 skStreamLockFile(
2292 skstream_t *stream)
2293 {
2294 struct flock lock;
2295 int rv;
2296
2297 lock.l_start = 0; /* at SOF */
2298 lock.l_whence = SEEK_SET; /* SOF */
2299 lock.l_len = 0; /* EOF */
2300
2301 STREAM_RETURN_IF_NULL(stream);
2302
2303 rv = streamCheckOpen(stream);
2304 if (rv) { goto END; }
2305
2306 /* Don't try to lock anything that is not a real file */
2307 if ( !stream->is_seekable) {
2308 goto END;
2309 }
2310
2311 /* set the lock type and error code if we fail */
2312 if (stream->io_mode == SK_IO_READ) {
2313 lock.l_type = F_RDLCK;
2314 rv = SKSTREAM_ERR_RLOCK;
2315 } else {
2316 lock.l_type = F_WRLCK;
2317 rv = SKSTREAM_ERR_WLOCK;
2318 }
2319
2320 /* get the lock, waiting if we need to */
2321 if (fcntl(stream->fd, F_SETLKW, &lock) == -1) {
2322 /* error */
2323 stream->errnum = errno;
2324 goto END;
2325 }
2326
2327 /* success */
2328 rv = SKSTREAM_OK;
2329
2330 END:
2331 return (stream->last_rv = rv);
2332 }
2333
2334
2335 int
skStreamMakeDirectory(skstream_t * stream)2336 skStreamMakeDirectory(
2337 skstream_t *stream)
2338 {
2339 char dir[PATH_MAX];
2340 int rv;
2341
2342 STREAM_RETURN_IF_NULL(stream);
2343
2344 rv = streamCheckUnopened(stream);
2345 if (rv) { goto END; }
2346
2347 /* Making directory to file only makes sense for writing */
2348 if (stream->io_mode != SK_IO_WRITE) {
2349 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
2350 goto END;
2351 }
2352
2353 if (stream->pathname == NULL) {
2354 rv = SKSTREAM_ERR_NOT_BOUND;
2355 goto END;
2356 }
2357
2358 if (skDirname_r(dir, stream->pathname, sizeof(dir))) {
2359 if ( !skDirExists(dir)) {
2360 rv = skMakeDir(dir);
2361 if (rv) {
2362 stream->errnum = errno;
2363 rv = SKSTREAM_ERR_SYS_MKDIR;
2364 goto END;
2365 }
2366 }
2367 }
2368
2369 END:
2370 return (stream->last_rv = rv);
2371 }
2372
2373
2374 int
skStreamMakeTemp(skstream_t * stream)2375 skStreamMakeTemp(
2376 skstream_t *stream)
2377 {
2378 int rv;
2379
2380 STREAM_RETURN_IF_NULL(stream);
2381
2382 rv = streamCheckUnopened(stream);
2383 if (rv) { goto END; }
2384
2385 /* Temp files only make sense for writing */
2386 if (stream->io_mode != SK_IO_WRITE) {
2387 rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
2388 goto END;
2389 }
2390
2391 if (stream->pathname == NULL) {
2392 rv = SKSTREAM_ERR_NOT_BOUND;
2393 goto END;
2394 }
2395
2396 /* open file */
2397 stream->fd = mkstemp(stream->pathname);
2398 if (stream->fd == -1) {
2399 rv = SKSTREAM_ERR_SYS_MKSTEMP;
2400 stream->errnum = errno;
2401 goto END;
2402 }
2403
2404 rv = streamPostOpen(stream);
2405 if (rv) { goto END; }
2406
2407 END:
2408 return (stream->last_rv = rv);
2409 }
2410
2411
2412 int
skStreamOpen(skstream_t * stream)2413 skStreamOpen(
2414 skstream_t *stream)
2415 {
2416 int rv;
2417
2418 STREAM_RETURN_IF_NULL(stream);
2419
2420 rv = streamCheckUnopened(stream);
2421 if (rv) { goto END; }
2422
2423 if (stream->pathname == NULL) {
2424 rv = SKSTREAM_ERR_NOT_BOUND;
2425 goto END;
2426 }
2427
2428 switch (stream->io_mode) {
2429 case SK_IO_WRITE:
2430 rv = streamOpenWrite(stream);
2431 if (rv) { goto END; }
2432 break;
2433
2434 case SK_IO_READ:
2435 rv = streamOpenRead(stream);
2436 if (rv) { goto END; }
2437 break;
2438
2439 case SK_IO_APPEND:
2440 rv = streamOpenAppend(stream);
2441 if (rv) { goto END; }
2442 break;
2443 }
2444
2445 rv = streamPostOpen(stream);
2446 if (rv) { goto END; }
2447
2448 END:
2449 return (stream->last_rv = rv);
2450 }
2451
2452
2453 /* convenience function to create and open a SiLK flow file */
2454 int
skStreamOpenSilkFlow(skstream_t ** stream,const char * pathname,skstream_mode_t read_write_append)2455 skStreamOpenSilkFlow(
2456 skstream_t **stream,
2457 const char *pathname,
2458 skstream_mode_t read_write_append)
2459 {
2460 int rv;
2461
2462 /* Allocate and initialize the stream */
2463 rv = skStreamCreate(stream, read_write_append, SK_CONTENT_SILK_FLOW);
2464 if (rv) { goto END; }
2465
2466 rv = skStreamBind(*stream, pathname);
2467 if (rv) { goto END; }
2468
2469 rv = skStreamOpen(*stream);
2470 if (rv) { goto END; }
2471
2472 switch ((*stream)->io_mode) {
2473 case SK_IO_WRITE:
2474 break;
2475
2476 case SK_IO_READ:
2477 case SK_IO_APPEND:
2478 rv = skStreamReadSilkHeader(*stream, NULL);
2479 if (rv) {
2480 skStreamClose(*stream);
2481 goto END;
2482 }
2483 break;
2484 }
2485
2486 END:
2487 if (*stream) {
2488 (*stream)->last_rv = rv;
2489 }
2490 return rv;
2491 }
2492
2493
2494 int
skStreamPageOutput(skstream_t * stream,const char * pager)2495 skStreamPageOutput(
2496 skstream_t *stream,
2497 const char *pager)
2498 {
2499 int rv;
2500
2501 STREAM_RETURN_IF_NULL(stream);
2502
2503 rv = streamCheckModifiable(stream);
2504 if (rv) { goto END; }
2505
2506 rv = streamCheckAttributes(stream, SK_IO_WRITE, SK_CONTENT_TEXT);
2507 if (rv) { goto END; }
2508
2509 /* get pager from environment if not passed in */
2510 if (NULL == pager) {
2511 pager = getenv("SILK_PAGER");
2512 if (NULL == pager) {
2513 pager = getenv("PAGER");
2514 }
2515 }
2516
2517 /* a NULL or an empty string pager means do nothing */
2518 if ((NULL == pager) || ('\0' == pager[0])) {
2519 if (stream->pager) {
2520 free(stream->pager);
2521 stream->pager = NULL;
2522 }
2523 goto END;
2524 }
2525
2526 if (stream->pager) {
2527 free(stream->pager);
2528 }
2529 stream->pager = strdup(pager);
2530 if (stream->pager == NULL) {
2531 rv = SKSTREAM_ERR_ALLOC;
2532 goto END;
2533 }
2534
2535 /* if the stream is open, go ahead and invoke the pager now */
2536 if (stream->fd != -1) {
2537 rv = streamPrepareText(stream);
2538 if (rv) { goto END; }
2539 }
2540
2541 END:
2542 return (stream->last_rv = rv);
2543 }
2544
2545
2546 #if !defined(skStreamPrint)
2547 int
skStreamPrint(skstream_t * stream,const char * format,...)2548 skStreamPrint(
2549 skstream_t *stream,
2550 const char *format,
2551 ...)
2552 {
2553 int rv = SKSTREAM_OK;
2554 va_list args;
2555
2556 assert(stream);
2557
2558 va_start(args, format);
2559
2560 if ( !stream->fp) {
2561 rv = streamCheckOpen(stream);
2562 if (rv) { goto END; }
2563
2564 rv = streamCheckAttributes(stream, (SK_IO_WRITE | SK_IO_APPEND),
2565 SK_CONTENT_TEXT);
2566 if (rv) { goto END; }
2567
2568 rv = streamPrepareText(stream);
2569 if (rv) { goto END; }
2570 }
2571 #ifndef NDEBUG
2572 else {
2573 assert(!stream->is_binary);
2574 assert(stream->content_type == SK_CONTENT_TEXT);
2575 assert(stream->io_mode==SK_IO_WRITE || stream->io_mode==SK_IO_APPEND);
2576 assert(stream->fd != -1);
2577 }
2578 #endif /* NDEBUG */
2579
2580 if (vfprintf(stream->fp, format, args) == -1) {
2581 rv = SKSTREAM_ERR_WRITE;
2582 stream->errnum = errno;
2583 }
2584
2585 END:
2586 va_end(args);
2587 return (stream->last_rv = rv);
2588 }
2589 #endif /* !defined(skStreamPrint) */
2590
2591
2592 ssize_t
skStreamRead(skstream_t * stream,void * buf,size_t count)2593 skStreamRead(
2594 skstream_t *stream,
2595 void *buf,
2596 size_t count)
2597 {
2598 ssize_t saw;
2599
2600 assert(stream);
2601 assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
2602 assert(stream->fd != -1);
2603
2604 if (stream->iobuf) {
2605 saw = skIOBufRead(stream->iobuf, buf, count);
2606 if (saw >= 0) {
2607 return (stream->last_rv = saw);
2608 }
2609 if (stream->is_iobuf_error) {
2610 stream->is_iobuf_error = 0;
2611 } else {
2612 stream->err_info = SKSTREAM_ERR_IOBUF;
2613 }
2614 return (stream->last_rv = saw);
2615 }
2616 if (buf == NULL) {
2617 return (stream->last_rv = streamReadNullBuffer(stream, count));
2618 }
2619 #if SK_ENABLE_ZLIB
2620 if (stream->gz != NULL) {
2621 saw = streamGZRead(stream, buf, count);
2622 if (saw == -1) {
2623 stream->is_iobuf_error = 0;
2624 }
2625 return (stream->last_rv = saw);
2626 }
2627 #endif
2628
2629 saw = skreadn(stream->fd, buf, count);
2630 if (saw == -1) {
2631 stream->errnum = errno;
2632 stream->err_info = SKSTREAM_ERR_READ;
2633 }
2634 return (stream->last_rv = saw);
2635 }
2636
2637
2638 void *
skStreamReadToEndOfFile(skstream_t * stream,ssize_t * count)2639 skStreamReadToEndOfFile(
2640 skstream_t *stream,
2641 ssize_t *count)
2642 {
2643 #define READTOEND_BUFSIZE 1024
2644
2645 uint8_t *buf = NULL;
2646 uint8_t *bp;
2647 ssize_t saw;
2648 ssize_t total = 0;
2649 size_t bufsize = 0;
2650
2651 for (;;) {
2652 if (bufsize < 4 * READTOEND_BUFSIZE) {
2653 bufsize += READTOEND_BUFSIZE;
2654 } else {
2655 bufsize += bufsize >> 1;
2656 }
2657 bp = (uint8_t*)realloc(buf, bufsize);
2658 if (NULL == bp) {
2659 stream->errnum = errno;
2660 stream->err_info = SKSTREAM_ERR_ALLOC;
2661 stream->last_rv = stream->err_info;
2662 break;
2663 }
2664 buf = bp;
2665 bp += total;
2666
2667 saw = skStreamRead(stream, bp, (bufsize - total));
2668 if (-1 == saw) {
2669 stream->last_rv = saw;
2670 break;
2671 }
2672
2673 total += saw;
2674 if (saw < (ssize_t)(bufsize - total)) {
2675 *count = total;
2676 buf[total] = '\0';
2677 return buf;
2678 }
2679 }
2680
2681 /* only get here on error */
2682 if (buf) {
2683 free(buf);
2684 }
2685 return NULL;
2686 }
2687
2688
2689 int
skStreamReadRecord(skstream_t * stream,rwGenericRec_V5 * rwrec)2690 skStreamReadRecord(
2691 skstream_t *stream,
2692 rwGenericRec_V5 *rwrec)
2693 {
2694 #ifndef SK_HAVE_ALIGNED_ACCESS_REQUIRED
2695 uint8_t ar[SK_MAX_RECORD_SIZE];
2696 #else
2697 /* force 'ar' to be aligned on an 8byte boundary, since we treat
2698 * it as an rwRec and need to access the 64bit sTime. */
2699 union force_align_un {
2700 uint8_t fa_ar[SK_MAX_RECORD_SIZE];
2701 uint64_t fa_u64;
2702 } force_align;
2703 uint8_t *ar = force_align.fa_ar;
2704 #endif /* SK_HAVE_ALIGNED_ACCESS_REQUIRED */
2705
2706 ssize_t saw;
2707 int rv = SKSTREAM_OK;
2708
2709 if (stream->is_eof) {
2710 rv = SKSTREAM_ERR_EOF;
2711 goto END;
2712 }
2713
2714 assert(stream);
2715 assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
2716 assert(stream->content_type != SK_CONTENT_TEXT);
2717 assert(stream->is_binary);
2718 assert(stream->fd != -1);
2719
2720 /* label is used by the IPv6 policy to ignore a record */
2721 NEXT_RECORD:
2722
2723 /* read the packed record as a byte array */
2724 if (stream->iobuf) {
2725 /* avoid function call for the common case */
2726 saw = skIOBufRead(stream->iobuf, ar, stream->recLen);
2727 } else {
2728 saw = skStreamRead(stream, ar, stream->recLen);
2729 }
2730 if (saw != (ssize_t)stream->recLen) {
2731 /* EOF or error */
2732 stream->is_eof = 1;
2733
2734 if (saw == 0) {
2735 /* 0 means clean record boundary; simple EOF */
2736 rv = SKSTREAM_ERR_EOF;
2737 } else if (saw == -1) {
2738 /* error */
2739 if (stream->iobuf) {
2740 rv = SKSTREAM_ERR_IOBUF;
2741 } else {
2742 rv = -1;
2743 }
2744 } else {
2745 /* short read */
2746 stream->errobj.num = saw;
2747 rv = SKSTREAM_ERR_READ_SHORT;
2748 }
2749 goto END;
2750 }
2751
2752 /* clear the caller's record */
2753 RWREC_CLEAR(rwrec);
2754
2755 /* convert the byte array to an rwRec in native byte order */
2756 #if SK_ENABLE_IPV6
2757 stream->rwUnpackFn(stream, rwrec, ar);
2758 #else
2759 if (stream->rwUnpackFn(stream, rwrec, ar) == SKSTREAM_ERR_UNSUPPORT_IPV6) {
2760 goto NEXT_RECORD;
2761 }
2762 #endif
2763
2764 /* Handle incorrectly encoded ICMP Type/Code unless the
2765 * SILK_ICMP_SPORT_HANDLER environment variable is set to none. */
2766 if (rwRecIsICMP(rwrec)
2767 && rwRecGetSPort(rwrec) != 0
2768 && rwRecGetDPort(rwrec) == 0
2769 && !silk_icmp_nochange)
2770 {
2771 /*
2772 * There are two ways for the ICMP Type/Code to appear in
2773 * "sPort" instead of in "dPort".
2774 *
2775 * (1) The flow was an IPFIX bi-flow record read prior to
2776 * SiLK-3.4.0 where the sPort and dPort of the second record
2777 * were reversed when they should not have been. Here, the
2778 * sPort contains ((type<<8)|code).
2779 *
2780 * (2) The flow was a NetFlowV5 record read from a buggy
2781 * Cisco router and read prior to SiLK-0.8.0. Here, the
2782 * sPort contains ((code<<8)|type).
2783 *
2784 * The following assumes buggy ICMP flow records were created
2785 * from IPFIX sources unless they were created prior to SiLK
2786 * 1.0 and appear in certain file formats more closely
2787 * associated with NetFlowV5.
2788 *
2789 * Prior to SiLK-3.4.0, the buggy ICMP record would propagate
2790 * through the tool suite and be written to binary output
2791 * files. As of 3.4.0, we modify the record on read.
2792 */
2793 if (skHeaderGetFileVersion(stream->silk_hdr) >= 16) {
2794 /* File created by SiLK 1.0 or later; most likely the
2795 * buggy value originated from an IPFIX source. */
2796 rwRecSetDPort(rwrec, rwRecGetSPort(rwrec));
2797 } else {
2798 switch(skHeaderGetFileFormat(stream->silk_hdr)) {
2799 case FT_RWFILTER:
2800 case FT_RWNOTROUTED:
2801 case FT_RWROUTED:
2802 case FT_RWSPLIT:
2803 case FT_RWWWW:
2804 /* Most likely from a PDU source */
2805 rwRecSetDPort(rwrec, BSWAP16(rwRecGetSPort(rwrec)));
2806 break;
2807 default:
2808 /* Assume it is from an IPFIX source */
2809 rwRecSetDPort(rwrec, rwRecGetSPort(rwrec));
2810 break;
2811 }
2812 }
2813 rwRecSetSPort(rwrec, 0);
2814 }
2815
2816 /* Write to the copy-input stream */
2817 if (stream->copyInputFD) {
2818 skStreamWriteRecord(stream->copyInputFD, rwrec);
2819 }
2820
2821 /* got a record */
2822 ++stream->rec_count;
2823
2824 #if SK_ENABLE_IPV6
2825 switch (stream->v6policy) {
2826 case SK_IPV6POLICY_MIX:
2827 break;
2828
2829 case SK_IPV6POLICY_IGNORE:
2830 if (rwRecIsIPv6(rwrec)) {
2831 goto NEXT_RECORD;
2832 }
2833 break;
2834
2835 case SK_IPV6POLICY_ASV4:
2836 if (rwRecIsIPv6(rwrec)) {
2837 if (rwRecConvertToIPv4(rwrec)) {
2838 goto NEXT_RECORD;
2839 }
2840 }
2841 break;
2842
2843 case SK_IPV6POLICY_FORCE:
2844 if (!rwRecIsIPv6(rwrec)) {
2845 rwRecConvertToIPv6(rwrec);
2846 }
2847 break;
2848
2849 case SK_IPV6POLICY_ONLY:
2850 if (!rwRecIsIPv6(rwrec)) {
2851 goto NEXT_RECORD;
2852 }
2853 break;
2854 }
2855 #endif /* SK_ENABLE_IPV6 */
2856
2857 END:
2858 return (stream->last_rv = rv);
2859 }
2860
2861
2862 int
skStreamReadSilkHeader(skstream_t * stream,sk_file_header_t ** hdr)2863 skStreamReadSilkHeader(
2864 skstream_t *stream,
2865 sk_file_header_t **hdr)
2866 {
2867 int rv = SKSTREAM_OK;
2868
2869 STREAM_RETURN_IF_NULL(stream);
2870
2871 if (!stream->is_dirty) {
2872 rv = skStreamReadSilkHeaderStart(stream);
2873 if (rv) { goto END; }
2874 } else if (!stream->is_silk) {
2875 rv = SKSTREAM_ERR_UNSUPPORT_CONTENT;
2876 goto END;
2877 }
2878
2879 if (hdr) {
2880 *hdr = stream->silk_hdr;
2881 }
2882
2883 /* only read the header one time */
2884 if (stream->have_hdr) {
2885 goto END;
2886 }
2887
2888 rv = skHeaderReadEntries(stream, stream->silk_hdr);
2889 if (rv) { goto END; }
2890
2891 skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_FIXED);
2892
2893 if (stream->is_silk_flow) {
2894 /* swap bytes? */
2895 stream->swapFlag = !skHeaderIsNativeByteOrder(stream->silk_hdr);
2896
2897 /* Cache values from the packedfile header */
2898 streamCacheHeader(stream);
2899
2900 /* Set pointers to the PackFn and UnpackFn functions for this
2901 * file format. */
2902 rv = streamPrepareFormat(stream);
2903 if (rv) { goto END; }
2904
2905 assert(stream->recLen > 0);
2906 assert(stream->recLen <= SK_MAX_RECORD_SIZE);
2907 }
2908
2909 /* Move to end of file is stream was open for append */
2910 if (stream->io_mode == SK_IO_APPEND) {
2911 if (-1 == lseek(stream->fd, 0, SEEK_END)) {
2912 stream->errnum = errno;
2913 rv = SKSTREAM_ERR_SYS_LSEEK;
2914 goto END;
2915 }
2916 }
2917
2918 /* we have the complete header */
2919 stream->have_hdr = 1;
2920
2921 rv = streamIOBufCreate(stream);
2922 if (rv) { goto END; }
2923
2924 END:
2925 return (stream->last_rv = rv);
2926 }
2927
2928
2929 int
skStreamReadSilkHeaderStart(skstream_t * stream)2930 skStreamReadSilkHeaderStart(
2931 skstream_t *stream)
2932 {
2933 int rv;
2934 int flows_required;
2935
2936 STREAM_RETURN_IF_NULL(stream);
2937
2938 rv = streamCheckOpen(stream);
2939 if (rv) { goto END; }
2940
2941 if (stream->is_dirty) {
2942 rv = SKSTREAM_ERR_PREV_DATA;
2943 goto END;
2944 }
2945
2946 rv = streamCheckAttributes(stream, (SK_IO_READ | SK_IO_APPEND),
2947 (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
2948 if (rv) { goto END; }
2949
2950 stream->is_dirty = 1;
2951
2952 rv = skHeaderReadStart(stream, stream->silk_hdr);
2953 if (rv) { goto END; }
2954
2955 /* check whether this stream contains flow data */
2956 flows_required = stream->is_silk_flow;
2957 STREAM_SET_IS_SILK_FLOW(stream);
2958 if (flows_required && !stream->is_silk_flow) {
2959 rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
2960 goto END;
2961 }
2962
2963 skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_ENTRY_OK);
2964
2965 END:
2966 return (stream->last_rv = rv);
2967 }
2968
2969
2970 int
skStreamSetCommentStart(skstream_t * stream,const char * comment_start)2971 skStreamSetCommentStart(
2972 skstream_t *stream,
2973 const char *comment_start)
2974 {
2975 int rv;
2976
2977 STREAM_RETURN_IF_NULL(stream);
2978
2979 rv = streamCheckAttributes(stream, SK_IO_READ, SK_CONTENT_TEXT);
2980 if (rv) { goto END; }
2981
2982 /* clear existing value */
2983 if (stream->comment_start) {
2984 free(stream->comment_start);
2985 }
2986
2987 /* set to new value */
2988 if (comment_start == NULL) {
2989 stream->comment_start = NULL;
2990 } else {
2991 stream->comment_start = strdup(comment_start);
2992 if (stream->comment_start == NULL) {
2993 rv = SKSTREAM_ERR_ALLOC;
2994 goto END;
2995 }
2996 }
2997
2998 END:
2999 return (stream->last_rv = rv);
3000 }
3001
3002
3003 int
skStreamSetCopyInput(skstream_t * read_stream,skstream_t * write_stream)3004 skStreamSetCopyInput(
3005 skstream_t *read_stream,
3006 skstream_t *write_stream)
3007 {
3008 assert(read_stream);
3009 assert(read_stream->is_silk_flow);
3010
3011 if (write_stream == NULL) {
3012 return (read_stream->last_rv = SKSTREAM_ERR_NULL_ARGUMENT);
3013 }
3014 if (read_stream->copyInputFD) {
3015 return (read_stream->last_rv = SKSTREAM_ERR_PREV_COPYINPUT);
3016 }
3017 if (read_stream->rec_count) {
3018 return (read_stream->last_rv = SKSTREAM_ERR_PREV_DATA);
3019 }
3020
3021 read_stream->copyInputFD = write_stream;
3022 return (read_stream->last_rv = SKSTREAM_OK);
3023 }
3024
3025
3026 int
skStreamSetIPv6Policy(skstream_t * stream,sk_ipv6policy_t policy)3027 skStreamSetIPv6Policy(
3028 skstream_t *stream,
3029 sk_ipv6policy_t policy)
3030 {
3031 int rv;
3032
3033 STREAM_RETURN_IF_NULL(stream);
3034
3035 rv = streamCheckAttributes(stream, 0xFF,
3036 (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
3037 if (rv) { goto END; }
3038
3039 stream->v6policy = policy;
3040
3041 END:
3042 return (stream->last_rv = rv);
3043 }
3044
3045
3046 int
skStreamSetUnbuffered(skstream_t * stream)3047 skStreamSetUnbuffered(
3048 skstream_t *stream)
3049 {
3050 int rv;
3051
3052 STREAM_RETURN_IF_NULL(stream);
3053
3054 rv = streamCheckUnopened(stream);
3055 if (rv) { goto END; }
3056
3057 stream->is_unbuffered = 1;
3058
3059 END:
3060 return (stream->last_rv = rv);
3061 }
3062
3063
3064 int
skStreamSkipRecords(skstream_t * stream,size_t skip_count,size_t * records_skipped)3065 skStreamSkipRecords(
3066 skstream_t *stream,
3067 size_t skip_count,
3068 size_t *records_skipped)
3069 {
3070 size_t local_records_skipped;
3071 ssize_t saw;
3072 ssize_t tmp;
3073 int rv;
3074
3075 STREAM_RETURN_IF_NULL(stream);
3076
3077 rv = streamCheckOpen(stream);
3078 if (rv) { goto END; }
3079
3080 if (stream->is_eof) {
3081 rv = SKSTREAM_ERR_EOF;
3082 goto END;
3083 }
3084
3085 if (NULL == records_skipped) {
3086 records_skipped = &local_records_skipped;
3087 }
3088 *records_skipped = 0;
3089
3090 /* when some other stream is expecting to see the records, we need
3091 * to read each record individually */
3092 if (stream->copyInputFD) {
3093 size_t skipped = skip_count;
3094 rwRec rec;
3095
3096 while ((skipped > 0)
3097 && ((rv = skStreamReadRecord(stream, &rec)) == SKSTREAM_OK))
3098 {
3099 --skipped;
3100 }
3101 *records_skipped = skip_count - skipped;
3102 goto END;
3103 }
3104
3105 if (!stream->iobuf) {
3106 rv = streamSkipRecordsNonIOBuf(stream, skip_count, records_skipped);
3107 goto END;
3108 }
3109
3110 while (skip_count > 0) {
3111 if (skip_count > (size_t)SSIZE_MAX / stream->recLen) {
3112 tmp = SSIZE_MAX;
3113 } else {
3114 tmp = stream->recLen * skip_count;
3115 }
3116
3117 /* read the bytes and check for error or short reads */
3118 saw = skIOBufRead(stream->iobuf, NULL, tmp);
3119 if (saw != tmp) {
3120 /* Either error or an incomplete read--assume end of file */
3121 stream->is_eof = 1;
3122 if (saw == -1) {
3123 /* error */
3124 rv = SKSTREAM_ERR_IOBUF;
3125 goto END;
3126 }
3127 }
3128
3129 /* compute the number of records we actually read, update
3130 * counters, and check for any partially read records. */
3131 tmp = (saw / stream->recLen);
3132 stream->rec_count += tmp;
3133 skip_count -= tmp;
3134 saw -= tmp * stream->recLen;
3135 *records_skipped += tmp;
3136
3137 if (saw != 0) {
3138 stream->errobj.num = saw;
3139 rv = SKSTREAM_ERR_READ_SHORT;
3140 goto END;
3141 }
3142 if (stream->is_eof) {
3143 rv = SKSTREAM_ERR_EOF;
3144 goto END;
3145 }
3146 }
3147
3148 rv = SKSTREAM_OK;
3149
3150 END:
3151 return (stream->last_rv = rv);
3152 }
3153
3154
/*
 *    Module teardown hook.  The function body is empty.
 */
void
skStreamTeardown(void)
{
    return;
}
3161
3162
3163 off_t
skStreamTell(skstream_t * stream)3164 skStreamTell(
3165 skstream_t *stream)
3166 {
3167 off_t pos;
3168 int rv;
3169
3170 STREAM_RETURN_IF_NULL(stream);
3171
3172 rv = streamCheckOpen(stream);
3173 if (rv) {
3174 stream->err_info = rv;
3175 return (stream->last_rv = -1);
3176 }
3177
3178 pos = lseek(stream->fd, 0, SEEK_CUR);
3179 if (pos == (off_t)-1) {
3180 stream->errnum = errno;
3181 stream->err_info = SKSTREAM_ERR_SYS_LSEEK;
3182 }
3183
3184 return (stream->last_rv = pos);
3185 }
3186
3187
3188 int
skStreamTruncate(skstream_t * stream,off_t length)3189 skStreamTruncate(
3190 skstream_t *stream,
3191 off_t length)
3192 {
3193 int rv;
3194
3195 STREAM_RETURN_IF_NULL(stream);
3196
3197 rv = streamCheckOpen(stream);
3198 if (rv) { goto END; }
3199
3200 rv = streamCheckAttributes(stream, (SK_IO_WRITE | SK_IO_APPEND),
3201 (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW
3202 | SK_CONTENT_OTHERBINARY));
3203 if (rv) { goto END; }
3204
3205 if ( !stream->is_seekable) {
3206 rv = SKSTREAM_ERR_NOT_SEEKABLE;
3207 goto END;
3208 }
3209
3210 /* attempt to truncate the file even if flush fails */
3211 rv = skStreamFlush(stream);
3212 if (ftruncate(stream->fd, length) == -1) {
3213 stream->errnum = errno;
3214 rv = SKSTREAM_ERR_SYS_FTRUNCATE;
3215 }
3216
3217 END:
3218 return (stream->last_rv = rv);
3219 }
3220
3221
3222 int
skStreamUnbind(skstream_t * stream)3223 skStreamUnbind(
3224 skstream_t *stream)
3225 {
3226 int rv = SKSTREAM_OK;
3227
3228 if (!stream) {
3229 return rv;
3230 }
3231 if (stream->fd != -1) {
3232 rv = skStreamClose(stream);
3233 }
3234
3235 if (stream->comment_start) {
3236 free(stream->comment_start);
3237 stream->comment_start = NULL;
3238 }
3239 if (stream->pager) {
3240 free(stream->pager);
3241 stream->pager = NULL;
3242 }
3243 if (stream->pathname) {
3244 free(stream->pathname);
3245 stream->pathname = NULL;
3246 }
3247
3248 return (stream->last_rv = rv);
3249 }
3250
3251
3252 ssize_t
skStreamWrite(skstream_t * stream,const void * buf,size_t count)3253 skStreamWrite(
3254 skstream_t *stream,
3255 const void *buf,
3256 size_t count)
3257 {
3258 ssize_t written;
3259
3260 assert(stream);
3261 assert(stream->io_mode == SK_IO_WRITE || stream->io_mode == SK_IO_APPEND);
3262 assert(stream->is_binary);
3263 assert(stream->fd != -1);
3264 assert(buf);
3265
3266 if (stream->iobuf) {
3267 written = skIOBufWrite(stream->iobuf, buf, count);
3268 if (written >= 0) {
3269 return (stream->last_rv = written);
3270 }
3271 if (stream->is_iobuf_error) {
3272 stream->is_iobuf_error = 0;
3273 } else {
3274 stream->err_info = SKSTREAM_ERR_IOBUF;
3275 }
3276 return (stream->last_rv = written);
3277 }
3278 #if SK_ENABLE_ZLIB
3279 if (stream->gz != NULL) {
3280 written = streamGZWrite(stream, buf, count);
3281 if (written == -1) {
3282 stream->is_iobuf_error = 0;
3283 }
3284 return (stream->last_rv = written);
3285 }
3286 #endif /* SK_ENABLE_ZLIB */
3287
3288 written = skwriten(stream->fd, buf, count);
3289 if (written == -1) {
3290 stream->errnum = errno;
3291 stream->err_info = SKSTREAM_ERR_WRITE;
3292 }
3293 return (stream->last_rv = written);
3294 }
3295
3296
3297 int
skStreamWriteRecord(skstream_t * stream,const rwGenericRec_V5 * rwrec)3298 skStreamWriteRecord(
3299 skstream_t *stream,
3300 const rwGenericRec_V5 *rwrec)
3301 {
3302 #ifndef SK_HAVE_ALIGNED_ACCESS_REQUIRED
3303 uint8_t ar[SK_MAX_RECORD_SIZE];
3304 #else
3305 /* force 'ar' to be aligned on an 8byte boundary, since we treat
3306 * it as an rwRec and need to access the 64bit sTime. */
3307 union force_align_un {
3308 uint8_t fa_ar[SK_MAX_RECORD_SIZE];
3309 uint64_t fa_u64;
3310 } force_align;
3311 uint8_t *ar = force_align.fa_ar;
3312 #endif /* SK_HAVE_ALIGNED_ACCESS_REQUIRED */
3313
3314 #if SK_ENABLE_IPV6
3315 rwRec rec_copy;
3316 #endif
3317
3318 int rv;
3319 const rwRec *rp = rwrec;
3320
3321 assert(stream);
3322 assert(stream->io_mode == SK_IO_WRITE || stream->io_mode == SK_IO_APPEND);
3323 assert(stream->is_silk_flow);
3324 assert(stream->fd != -1);
3325
3326 if (!stream->is_dirty) {
3327 rv = skStreamWriteSilkHeader(stream);
3328 if (rv) {
3329 return (stream->last_rv = rv);
3330 }
3331 }
3332
3333 #if SK_ENABLE_IPV6
3334 if (rwRecIsIPv6(rp)) {
3335 switch (stream->v6policy) {
3336 case SK_IPV6POLICY_MIX:
3337 case SK_IPV6POLICY_FORCE:
3338 case SK_IPV6POLICY_ONLY:
3339 /* flow already IPv6; verify that file format supports it */
3340 if (stream->supports_ipv6 == 0) {
3341 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_IPV6);
3342 }
3343 break;
3344
3345 case SK_IPV6POLICY_IGNORE:
3346 /* we're ignoring IPv6, return */
3347 return (stream->last_rv = SKSTREAM_OK);
3348
3349 case SK_IPV6POLICY_ASV4:
3350 /* attempt to convert IPv6 flow to v4 */
3351 memcpy(&rec_copy, rp, sizeof(rwRec));
3352 if (rwRecConvertToIPv4(&rec_copy)) {
3353 return (stream->last_rv = SKSTREAM_OK);
3354 }
3355 rp = &rec_copy;
3356 break;
3357 }
3358 } else {
3359 /* flow is IPv4 */
3360 switch (stream->v6policy) {
3361 case SK_IPV6POLICY_MIX:
3362 case SK_IPV6POLICY_IGNORE:
3363 case SK_IPV6POLICY_ASV4:
3364 /* flow is already IPv4; all file formats supported */
3365 break;
3366
3367 case SK_IPV6POLICY_ONLY:
3368 /* we're ignoring IPv4 flows; return */
3369 return (stream->last_rv = SKSTREAM_OK);
3370
3371 case SK_IPV6POLICY_FORCE:
3372 /* must convert flow to IPv6, but first verify that file
3373 * format supports IPv6 */
3374 if (stream->supports_ipv6 == 0) {
3375 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_IPV6);
3376 }
3377 /* convert */
3378 memcpy(&rec_copy, rp, sizeof(rwRec));
3379 rp = &rec_copy;
3380 rwRecConvertToIPv6(&rec_copy);
3381 break;
3382 }
3383 }
3384 #endif /* SK_ENABLE_IPV6 */
3385
3386 /* Convert the record into a byte array in the appropriate byte order */
3387 rv = stream->rwPackFn(stream, rp, ar);
3388 if (rv != SKSTREAM_OK) {
3389 stream->errobj.rec = rwrec;
3390 return (stream->last_rv = rv);
3391 }
3392
3393 /* write the record */
3394 if (stream->iobuf) {
3395 if (skIOBufWrite(stream->iobuf, ar, stream->recLen)
3396 == (ssize_t)stream->recLen)
3397 {
3398 ++stream->rec_count;
3399 return (stream->last_rv = SKSTREAM_OK);
3400 } else if (stream->is_iobuf_error) {
3401 stream->is_iobuf_error = 0;
3402 } else {
3403 stream->err_info = SKSTREAM_ERR_IOBUF;
3404 }
3405 } else {
3406 if (skStreamWrite(stream, ar, stream->recLen)
3407 == (ssize_t)stream->recLen)
3408 {
3409 ++stream->rec_count;
3410 return (stream->last_rv = SKSTREAM_OK);
3411 }
3412 }
3413
3414 return (stream->last_rv = -1);
3415 }
3416
3417
3418 int
skStreamWriteSilkHeader(skstream_t * stream)3419 skStreamWriteSilkHeader(
3420 skstream_t *stream)
3421 {
3422 int rv;
3423 int flows_required;
3424
3425 STREAM_RETURN_IF_NULL(stream);
3426
3427 rv = streamCheckOpen(stream);
3428 if (rv) { goto END; }
3429
3430 if (stream->is_dirty) {
3431 rv = SKSTREAM_ERR_PREV_DATA;
3432 goto END;
3433 }
3434
3435 rv = streamCheckAttributes(stream, SK_IO_WRITE,
3436 (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
3437 if (rv) { goto END; }
3438
3439 if (skHeaderGetFileFormat(stream->silk_hdr) == UINT8_MAX) {
3440 rv = skHeaderSetFileFormat(stream->silk_hdr, DEFAULT_FILE_FORMAT);
3441 if (rv) { goto END; }
3442 }
3443
3444 /* unless a specific compression method was specified, do not use
3445 * compression when writing to a non-seekable destination */
3446 switch (skHeaderGetCompressionMethod(stream->silk_hdr)) {
3447 case SK_COMPMETHOD_DEFAULT:
3448 if (!stream->is_seekable && !stream->is_mpi) {
3449 rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3450 SK_COMPMETHOD_NONE);
3451 } else {
3452 rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3453 skCompMethodGetDefault());
3454 }
3455 if (rv) { goto END; }
3456 break;
3457 case SK_COMPMETHOD_BEST:
3458 if (!stream->is_seekable && !stream->is_mpi) {
3459 rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3460 SK_COMPMETHOD_NONE);
3461 } else {
3462 rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3463 skCompMethodGetBest());
3464 }
3465 if (rv) { goto END; }
3466 break;
3467 default:
3468 break;
3469 }
3470
3471 /* check whether this stream contains flow data */
3472 flows_required = stream->is_silk_flow;
3473 STREAM_SET_IS_SILK_FLOW(stream);
3474 if (flows_required && !stream->is_silk_flow) {
3475 rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
3476 goto END;
3477 }
3478
3479 if (stream->is_silk_flow) {
3480 /* handle the case where a specific record type has not yet
3481 * been specified. */
3482 if (skHeaderGetFileFormat(stream->silk_hdr) == UINT8_MAX) {
3483 rv = skHeaderSetFileFormat(stream->silk_hdr, DEFAULT_FILE_FORMAT);
3484 if (rv) { goto END; }
3485 }
3486
3487 /* Set the file version if it is "ANY", and set pointers to
3488 * the PackFn and UnpackFn functions for this file format. */
3489 rv = streamPrepareFormat(stream);
3490 if (rv) { goto END; }
3491
3492 assert(stream->recLen > 0);
3493 assert(stream->recLen <= SK_MAX_RECORD_SIZE);
3494
3495 /* Set the swapFlag */
3496 stream->swapFlag = !skHeaderIsNativeByteOrder(stream->silk_hdr);
3497
3498 /* Cache values from the packedfile header */
3499 streamCacheHeader(stream);
3500 }
3501
3502 stream->is_dirty = 1;
3503 skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_FIXED);
3504
3505 if ( !stream->is_mpi) {
3506 rv = skHeaderWrite(stream, stream->silk_hdr);
3507 if (rv) { goto END; }
3508 }
3509
3510 rv = streamIOBufCreate(stream);
3511 if (rv) { goto END; }
3512
3513 END:
3514 return (stream->last_rv = rv);
3515 }
3516
3517
3518 /*
3519 * Though not functions on skstream_t, these are used heavily by
3520 * the code. Define them here and hope the compiler inlines them.
3521 */
3522
/*
 *    Read 'count' bytes from the file descriptor 'fd' into 'buf',
 *    calling read() repeatedly until all the bytes have arrived or
 *    read() returns 0.  A call interrupted by a signal (EINTR) is
 *    retried.  No single read() asks for more than INT32_MAX bytes.
 *
 *    Return the number of bytes read, which is smaller than 'count'
 *    when read() returned 0 first, or return -1 on any other error.
 */
ssize_t
skreadn(
    int                 fd,
    void               *buf,
    size_t              count)
{
    size_t remaining;
    ssize_t got;

    for (remaining = count; remaining > 0; ) {
        got = read(fd, buf, ((remaining < INT32_MAX) ? remaining : INT32_MAX));
        if (-1 == got) {
            if (EINTR != errno) {
                return -1;
            }
            /* interrupted: try again */
            continue;
        }
        if (0 == got) {
            /* no more input */
            break;
        }
        buf = ((uint8_t *)buf) + got;
        remaining -= got;
    }
    return (count - remaining);
}
3549
/*
 *    Write 'count' bytes from 'buf' to the file descriptor 'fd',
 *    calling write() repeatedly until all the bytes have been
 *    accepted or write() returns 0.  A call interrupted by a signal
 *    (EINTR) is retried.  No single write() offers more than
 *    INT32_MAX bytes.
 *
 *    Return the number of bytes written, which is smaller than
 *    'count' when write() returned 0 first, or return -1 on any
 *    other error.
 */
ssize_t
skwriten(
    int                 fd,
    const void         *buf,
    size_t              count)
{
    size_t remaining;
    ssize_t put;

    for (remaining = count; remaining > 0; ) {
        put = write(fd, buf, ((remaining < INT32_MAX) ? remaining : INT32_MAX));
        if (-1 == put) {
            if (EINTR != errno) {
                return -1;
            }
            /* interrupted: try again */
            continue;
        }
        if (0 == put) {
            /* nothing was accepted; stop */
            break;
        }
        buf = ((const uint8_t *)buf) + put;
        remaining -= put;
    }
    return (count - remaining);
}
3576
3577
3578 /*
3579 ** Local Variables:
3580 ** mode:c
3581 ** indent-tabs-mode:nil
3582 ** c-basic-offset:4
3583 ** End:
3584 */
3585