1 /*
2 ** Copyright (C) 2006-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10 **  skstream.c
11 **      Mark Thomas  July-2006
12 **
13 **      skstream provides a wrapper around file pointers and file
14 **      descriptors.  It handles both textual and binary data.
15 */
16 
17 
18 #include <silk/silk.h>
19 
20 RCSIDENT("$SiLK: skstream.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
21 
22 #include <silk/skstream.h>
23 #include <silk/sksite.h>
24 #include "skheader_priv.h"
25 #include "skiobuf.h"
26 #include "skstream_priv.h"
27 
28 
29 /* LOCAL DEFINES AND TYPEDEFS */
30 
31 #ifndef DEFAULT_FILE_FORMAT
32 #  if SK_ENABLE_IPV6
33 #    define DEFAULT_FILE_FORMAT  FT_RWIPV6ROUTING
34 #  else
35 #    define DEFAULT_FILE_FORMAT  FT_RWGENERIC
36 #  endif
37 #endif
38 
39 
40 /*
41  *    Name of environment variable that affects how to treat ICMP flow
42  *    records.  This variable determines the setting of the
43  *    'silk_icmp_nochange' global.  See the detailed note in
44  *    skStreamReadRecord().
45  */
46 #define SILK_ICMP_SPORT_HANDLER_ENVAR "SILK_ICMP_SPORT_HANDLER"
47 
48 
/*
 *    Return SKSTREAM_ERR_NULL_ARGUMENT from the enclosing function
 *    when 'srin_stream' is NULL.
 *
 *    The expansion is a bare 'if' statement (it is not wrapped in
 *    'do { } while (0)'), so take care when invoking it inside an
 *    unbraced if/else.
 */
#define STREAM_RETURN_IF_NULL(srin_stream) \
    if (NULL == (srin_stream)) { return SKSTREAM_ERR_NULL_ARGUMENT; }
54 
55 
/*
 *    Set the 'is_silk_flow' bit on 'stream' to 1 if the file format
 *    stored in the stream's SiLK header ('silk_hdr') is one of the
 *    SiLK Flow record formats listed below; otherwise set the bit to
 *    0.
 *
 *    The expansion is a complete 'switch' statement, so the macro
 *    must be used where a statement is allowed.  'stream' is
 *    evaluated more than once.
 */
#define STREAM_SET_IS_SILK_FLOW(stream)                         \
    switch (skHeaderGetFileFormat((stream)->silk_hdr)) {        \
      case FT_RWAUGMENTED:                                      \
      case FT_RWAUGROUTING:                                     \
      case FT_RWAUGWEB:                                         \
      case FT_RWAUGSNMPOUT:                                     \
      case FT_RWFILTER:                                         \
      case FT_FLOWCAP:                                          \
      case FT_RWGENERIC:                                        \
      case FT_RWIPV6:                                           \
      case FT_RWIPV6ROUTING:                                    \
      case FT_RWNOTROUTED:                                      \
      case FT_RWROUTED:                                         \
      case FT_RWSPLIT:                                          \
      case FT_RWWWW:                                            \
        (stream)->is_silk_flow = 1;                             \
        break;                                                  \
      default:                                                  \
        (stream)->is_silk_flow = 0;                             \
        break;                                                  \
    }
81 
82 
83 /* LOCAL VARIABLES */
84 
/*
 *    If nonzero, do not attempt to process ICMP values in the sPort
 *    field.  This is 0 unless the SILK_ICMP_SPORT_HANDLER envar is
 *    set to "none".  See the detailed note in skStreamReadRecord().
 */
90 static int silk_icmp_nochange = 0;
91 
92 #ifdef SILK_CLOBBER_ENVAR
93 /*
94  *    If nonzero, enable clobbering (overwriting) of existing files
95  */
96 static int silk_clobber = 0;
97 #endif
98 
99 
100 /* LOCAL FUNCTION PROTOTYPES */
101 
102 static ssize_t
103 streamIOBufCallbackRead(
104     skstream_t         *stream,
105     void               *buf,
106     size_t              count);
107 
108 static ssize_t
109 streamIOBufCallbackWrite(
110     skstream_t         *stream,
111     const void         *buf,
112     size_t              count);
113 
114 static off_t
115 streamIOBufCallbackSeek(
116     skstream_t         *stream,
117     off_t               offset,
118     int                 whence);
119 
120 /* FUNCTION DEFINITIONS */
121 
122 /*
123  *    Update 'stream' with the sensor, type, and starting-hour stored
124  *    in the stream's header if 'stream' is bound to a packed hourly
125  *    data file.
126  */
127 static void
streamCacheHeader(skstream_t * stream)128 streamCacheHeader(
129     skstream_t         *stream)
130 {
131     const sk_header_entry_t *hentry;
132 
133     assert(stream);
134     assert(stream->is_silk_flow);
135 
136     hentry = skHeaderGetFirstMatch(stream->silk_hdr, SK_HENTRY_PACKEDFILE_ID);
137     if (hentry) {
138         stream->hdr_starttime = skHentryPackedfileGetStartTime(hentry);
139         stream->hdr_sensor    = skHentryPackedfileGetSensorID(hentry);
140         stream->hdr_flowtype  = skHentryPackedfileGetFlowtypeID(hentry);
141     }
142 }
143 
144 
145 /*
146  *    Return an error string representing the most recent error that
147  *    occurred on 'stream'.
148  *
149  *    This function is a callback invoked by skIOBufStrError().
150  */
151 static const char *
streamCallbackStrerror(skstream_t * stream,int UNUSED (errnum))152 streamCallbackStrerror(
153     skstream_t         *stream,
154     int          UNUSED(errnum))
155 {
156     if (stream->err_info == SKSTREAM_ERR_ZLIB) {
157 #if SK_ENABLE_ZLIB
158         if (stream->gz) {
159             int zerr;
160             return gzerror(stream->gz, &zerr);
161         }
162 #endif  /* SK_ENABLE_ZLIB */
163         return "Interal zlib error";
164     }
165     return strerror(stream->errnum);
166 }
167 
168 
169 /*
170  *  status = streamCheckAttributes(stream, io_mode_list, content_type_list);
171  *
172  */
173 static int
streamCheckAttributes(skstream_t * stream,int io_mode_list,int content_type_list)174 streamCheckAttributes(
175     skstream_t         *stream,
176     int                 io_mode_list,
177     int                 content_type_list)
178 {
179     assert(stream);
180     if ( !(stream->io_mode & io_mode_list)) {
181         return SKSTREAM_ERR_UNSUPPORT_IOMODE;
182     } else if ( !(stream->content_type & content_type_list)) {
183         return SKSTREAM_ERR_UNSUPPORT_CONTENT;
184     } else {
185         return SKSTREAM_OK;
186     }
187 }
188 
189 
190 /*
191  *  status = streamCheckModifiable(stream);
192  *
193  *    Return SKSTREAM_OK if the caller is still allowed to set aspects
194  *    of 'stream'; otherwise return the reason why 'stream' cannot be
195  *    modified.
196  */
197 static int
streamCheckModifiable(skstream_t * stream)198 streamCheckModifiable(
199     skstream_t         *stream)
200 {
201     assert(stream);
202     if (stream->is_closed) {
203         return SKSTREAM_ERR_CLOSED;
204     } else if (stream->is_dirty) {
205         return SKSTREAM_ERR_PREV_DATA;
206     } else {
207         return SKSTREAM_OK;
208     }
209 }
210 
211 
212 /*
213  *  status = streamCheckOpen(stream);
214  *
215  *    Call this function on a stream which you expect to be open; it
216  *    will return SKSTREAM_OK if 'stream' is open, or an error code
217  *    explaining why 'stream' is not open.
218  *
219  *    A stream that has been opened and closed is neither open nor
220  *    unopened.
221  */
222 static int
streamCheckOpen(const skstream_t * stream)223 streamCheckOpen(
224     const skstream_t   *stream)
225 {
226     assert(stream);
227     if (stream->is_closed) {
228         return SKSTREAM_ERR_CLOSED;
229     } else if (stream->fd == -1) {
230         return SKSTREAM_ERR_NOT_OPEN;
231     } else {
232         return SKSTREAM_OK;
233     }
234 }
235 
236 
237 /*
238  *  status = streamCheckUnopened(stream);
239  *
240  *    Call this function on a stream which you expect to be
241  *    unopened---i.e., not yet open.  It will return SKSTREAM_OK if
242  *    'stream' is unopened, or an error code explaining why 'stream'
243  *    is not considered unopened.
244  *
245  *    A stream that has been opened and closed is neither open nor
246  *    unopened.
247  */
248 static int
streamCheckUnopened(const skstream_t * stream)249 streamCheckUnopened(
250     const skstream_t   *stream)
251 {
252     assert(stream);
253     if (stream->is_closed) {
254         return SKSTREAM_ERR_CLOSED;
255     } else if (stream->fd != -1) {
256         return SKSTREAM_ERR_PREV_OPEN;
257     } else {
258         return SKSTREAM_OK;
259     }
260 }
261 
262 
263 #if SK_ENABLE_ZLIB
264 /*
265  *  status = streamGZFlush(stream);
266  *
267  *    Tell the zlib descriptor associated with 'stream' to flush any
268  *    unwritten data to the stream.
269  *
270  *    This function is invoked directly by other skstream functions
271  *    and it is a callback invoked by skIOBufFlush().
272  */
273 static int
streamGZFlush(skstream_t * stream)274 streamGZFlush(
275     skstream_t         *stream)
276 {
277     int zerr = gzflush(stream->gz, Z_SYNC_FLUSH);
278     if (zerr == Z_OK) {
279         return 0;
280     }
281     stream->is_iobuf_error = 1;
282     if (zerr == Z_ERRNO) {
283         stream->errnum = errno;
284         stream->err_info = SKSTREAM_ERR_WRITE;
285     } else {
286         stream->errnum = zerr;
287         stream->err_info = SKSTREAM_ERR_ZLIB;
288     }
289     return -1;
290 }
291 
292 
293 /*
294  *  status = streamGZRead(stream, buf, count);
295  *
296  *    Read 'count' bytes from the zlib descriptor associated with
297  *    'stream' and put them into 'buf'.
298  *
299  *    This function is invoked directly by other skstream functions
300  *    and it is a callback invoked by skIOBufRead().
301  */
302 static ssize_t
streamGZRead(skstream_t * stream,void * buf,size_t count)303 streamGZRead(
304     skstream_t         *stream,
305     void               *buf,
306     size_t              count)
307 {
308     int got = gzread(stream->gz, buf, (unsigned)count);
309     if (got == -1) {
310         stream->is_iobuf_error = 1;
311         (void)gzerror(stream->gz, &stream->errnum);
312         if (stream->errnum == Z_ERRNO) {
313             stream->errnum = errno;
314             stream->err_info = SKSTREAM_ERR_READ;
315         } else {
316             stream->err_info = SKSTREAM_ERR_ZLIB;
317         }
318     }
319     return (ssize_t)got;
320 }
321 
322 
323 /*
324  *  status = streamGZRead(stream, buf, count);
325  *
326  *    Write 'count' bytes from 'buf' to the zlib descriptor associated
327  *    with 'stream'.
328  *
329  *    This function is invoked directly by other skstream functions
330  *    and it is a callback invoked by skIOBufWrite().
331  */
332 static ssize_t
streamGZWrite(skstream_t * stream,const void * buf,size_t count)333 streamGZWrite(
334     skstream_t         *stream,
335     const void         *buf,
336     size_t              count)
337 {
338     int written = gzwrite(stream->gz, buf, (unsigned)count);
339     if (written > 0 || count == 0) {
340         return written;
341     }
342     stream->is_iobuf_error = 1;
343     (void)gzerror(stream->gz, &stream->errnum);
344     if (stream->errnum == Z_ERRNO) {
345         stream->errnum = errno;
346         stream->err_info = SKSTREAM_ERR_WRITE;
347     } else {
348         stream->err_info = SKSTREAM_ERR_ZLIB;
349     }
350     return -1;
351 }
352 #endif  /* SK_ENABLE_ZLIB */
353 
354 
355 /*
356  *  status = streamIOBufCreate(stream);
357  *
358  *    Create the skIOBuf that 'stream' will read-from/write-to, and
359  *    bind it to the file descriptor or gzfile.  Return SKSTREAM_OK on
360  *    success, or an error code on failure.
361  */
362 static int
streamIOBufCreate(skstream_t * stream)363 streamIOBufCreate(
364     skstream_t         *stream)
365 {
366     int rv = SKSTREAM_OK;
367     uint8_t compmethod = SK_COMPMETHOD_NONE;
368     skio_abstract_t io_func;
369 
370     assert(stream);
371     assert(stream->fd != -1);
372 
373     if (stream->is_unbuffered) {
374         goto END;
375     }
376 
377     memset(&io_func, 0, sizeof(skio_abstract_t));
378 
379     /* make certain compression method is available */
380     if (stream->is_silk) {
381         compmethod = skHeaderGetCompressionMethod(stream->silk_hdr);
382         switch (skCompMethodCheck(compmethod)) {
383           case SK_COMPMETHOD_IS_AVAIL:
384             /* known, valid, and available */
385             break;
386           case SK_COMPMETHOD_IS_VALID:
387             /* known and valid but not available */
388             rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE;
389             goto END;
390           case SK_COMPMETHOD_IS_KNOWN:
391             /* should never be undecided at this point */
392             skAbort();
393           default:
394             rv = SKSTREAM_ERR_COMPRESS_INVALID;
395             goto END;
396         }
397     }
398 
399     /* store location where the IOBuf was enabled */
400     stream->pre_iobuf_pos = lseek(stream->fd, 0, SEEK_CUR);
401 
402     /* create the iobuf */
403     switch (stream->io_mode) {
404       case SK_IO_READ:
405         /* create the buffered reader */
406         stream->iobuf = skIOBufCreate(stream->io_mode);
407         break;
408 
409       case SK_IO_WRITE:
410       case SK_IO_APPEND:
411         stream->iobuf = skIOBufCreate(stream->io_mode);
412         break;
413     }
414     if (stream->iobuf == NULL) {
415         rv = SKSTREAM_ERR_ALLOC;
416         goto END;
417     }
418 
419     /* get the information for SiLK files */
420     if (stream->is_silk) {
421         /* make certain the record size is non-zero */
422         size_t reclen;
423         reclen = skHeaderGetRecordLength(stream->silk_hdr);
424         if (reclen == 0) {
425             reclen = 1;
426             skHeaderSetRecordLength(stream->silk_hdr, reclen);
427         }
428 
429         /* set the record size on the IOBuf */
430         if (-1 == skIOBufSetRecordSize(stream->iobuf, reclen)) {
431             rv = SKSTREAM_ERR_IOBUF;
432             goto END;
433         }
434     }
435 
436     /* bind it to the file descriptor or gzfile */
437 #if SK_ENABLE_ZLIB
438     if (stream->gz) {
439         io_func.read = (skio_read_fn_t)&streamGZRead;
440         io_func.write = (skio_write_fn_t)&streamGZWrite;
441         io_func.seek = NULL;
442         io_func.flush = (skio_flush_fn_t)&streamGZFlush;
443         io_func.strerror = (skio_strerror_fn_t)&streamCallbackStrerror;
444         if (skIOBufBindAbstract(stream->iobuf, stream, compmethod, &io_func)
445             == -1)
446         {
447             rv = SKSTREAM_ERR_IOBUF;
448             goto END;
449         }
450     } else
451 #endif  /* SK_ENABLE_ZLIB */
452     {
453         /* if (skIOBufBind(stream->iobuf, stream->fd, compmethod) == -1) */
454         io_func.read = (skio_read_fn_t)&streamIOBufCallbackRead;
455         io_func.write = (skio_write_fn_t)&streamIOBufCallbackWrite;
456         io_func.seek = (skio_seek_fn_t)&streamIOBufCallbackSeek;
457         io_func.strerror = (skio_strerror_fn_t)&streamCallbackStrerror;
458         if (skIOBufBindAbstract(stream->iobuf, stream, compmethod, &io_func)
459             == -1)
460         {
461             rv = SKSTREAM_ERR_IOBUF;
462             goto END;
463         }
464     }
465 
466   END:
467     return rv;
468 }
469 
470 
471 /*
472  *  status = streamIOBufGetLine(stream, out_buffer, buf_size);
473  *
474  *   Fill 'out_buffer' with the next '\n'-delimited line of text from
475  *   the IOBuf associated with the 'stream'.  The '\n' is replaced
476  *   with '\0'.  If the final input is smaller than 'buf_size' and
477  *   does not contain a '\n' it will be copied into 'out_buffer'.
478  *   Return SKSTREAM_OK on success.
479  *
480  *   If there is no '\n' within the first 'buf_size' characters of the
481  *   input, return SKSTREAM_ERR_LONG_LINE and read from the IOBuf
482  *   until a '\n' is found or until end-of-file is reached.
483  *
484  *   Return SKSTREAM_ERR_EOF when all input data has been processed.
485  *
486  *   Return SKSTREAM_ERR_IOBUF is there is a problem reading from the
487  *   IOBuf.
488  *
489  *   This function mimics fgets().
490  */
491 static int
streamIOBufGetLine(skstream_t * stream,char * out_buffer,size_t buf_size)492 streamIOBufGetLine(
493     skstream_t         *stream,
494     char               *out_buffer,
495     size_t              buf_size)
496 {
497     char *eol = NULL;
498     ssize_t sz;
499     int rv = SKSTREAM_OK;
500 
501     while (eol == NULL) {
502         /* substract 1 for final '\0' */
503         sz = skIOBufReadToChar(stream->iobuf, out_buffer, buf_size-1, '\n');
504         if (sz == -1) {
505             if (stream->is_iobuf_error) {
506                 stream->is_iobuf_error = 0;
507                 rv = stream->err_info;
508             } else {
509                 rv = SKSTREAM_ERR_IOBUF;
510             }
511             break;
512         }
513         if (sz == 0) {
514             rv = SKSTREAM_ERR_EOF;
515             break;
516         }
517         if ((sz == (ssize_t)buf_size-1) && ('\n' != out_buffer[sz-1])) {
518             /* Found no newline in 'buf_size' characters... */
519             rv = SKSTREAM_ERR_LONG_LINE;
520             /* need to read more from skiobuf to find next '\n' */
521             continue;
522         }
523 
524         /* NUL terminate the string, either by replacing '\n' with a
525          * '\0', or by putting a '\0' after the final character. */
526         eol = &out_buffer[sz-1];
527         if (*eol != '\n') {
528             ++eol;
529         }
530         *eol = '\0';
531     }
532 
533     return rv;
534 }
535 
536 
537 /*
538  *  status = streamIOBufCallbackRead(stream, buf, count);
539  *
540  *    Read 'count' bytes from the file descriptor associated with
541  *    'stream' and put them into 'buf'.
542  *
543  *    This function is a callback invoked by skIOBufRead().
544  */
545 static ssize_t
streamIOBufCallbackRead(skstream_t * stream,void * buf,size_t count)546 streamIOBufCallbackRead(
547     skstream_t         *stream,
548     void               *buf,
549     size_t              count)
550 {
551     ssize_t rv;
552 
553     rv = skreadn(stream->fd, buf, count);
554     if (rv == -1) {
555         stream->is_iobuf_error = 1;
556         stream->errnum = errno;
557         stream->err_info = SKSTREAM_ERR_READ;
558     }
559     return rv;
560 }
561 
562 
563 /*
564  *  status = streamIOBufCallbackWrite(stream, buf, count);
565  *
566  *    Write 'count' bytes from 'buf' to the file descriptor associated
567  *    with 'stream'.
568  *
569  *    This function is a callback invoked by skIOBufWrite().
570  */
571 static ssize_t
streamIOBufCallbackWrite(skstream_t * stream,const void * buf,size_t count)572 streamIOBufCallbackWrite(
573     skstream_t         *stream,
574     const void         *buf,
575     size_t              count)
576 {
577     ssize_t rv;
578 
579     rv = skwriten(stream->fd, buf, count);
580     if (rv == -1) {
581         stream->is_iobuf_error = 1;
582         stream->errnum = errno;
583         stream->err_info = SKSTREAM_ERR_WRITE;
584     }
585     return rv;
586 }
587 
588 
589 /*
590  *  status = streamIOBufCallbackSeek(stream, offset, whence);
591  *
592  *    Seeks to a location in 'stream' from 'whence' modified by 'offset'.
593  *
594  *    This function is a callback invoked by skIOBufRead() when the
595  *    buffer in which to return the data is NULL.
596  */
597 static off_t
streamIOBufCallbackSeek(skstream_t * stream,off_t offset,int whence)598 streamIOBufCallbackSeek(
599     skstream_t         *stream,
600     off_t               offset,
601     int                 whence)
602 {
603     off_t rv;
604 
605     rv = lseek(stream->fd, offset, whence);
606     if (rv == (off_t)(-1)) {
607         if (errno != ESPIPE) {
608             stream->is_iobuf_error = 1;
609             stream->errnum = errno;
610             stream->err_info = SKSTREAM_ERR_SYS_LSEEK;
611         }
612     }
613     return rv;
614 }
615 
616 
/*
 *    If a pager has been set on 'stream' and 'stream' is connected to
 *    a terminal, invoke the pager.
 *
 *    Return the error from streamCheckModifiable() when 'stream' may
 *    no longer be modified.  When no pager is set or the stream is
 *    not a terminal, do nothing and return that (successful) status.
 *    Return SKSTREAM_ERR_NOPAGER when popen() fails or when the
 *    follow-up wait4() call returns a non-zero value.  On success,
 *    'stream->fp' is the pipe to the pager and the stream's
 *    'is_pager_active' flag is set.
 *
 *    The '#else' half of the '#if 1' below is an alternate
 *    pipe()/fork() implementation that is currently compiled out.
 */
static int
streamInvokePager(
    skstream_t         *stream)
{
    int rv;
    pid_t pid;
    int wait_status;

    rv = streamCheckModifiable(stream);
    if (rv) { goto END; }

    /* callers are expected to invoke this only on text streams that
     * are open for writing */
    assert(streamCheckAttributes(stream, SK_IO_WRITE, SK_CONTENT_TEXT)
           == SKSTREAM_OK);

    /* nothing to do when no pager was requested */
    if (stream->pager == NULL) {
        goto END;
    }

    /* only page output that goes to a terminal */
    if ( !stream->is_terminal) {
        goto END;
    }

#if 1
    /* invoke the pager */
    stream->fp = popen(stream->pager, "w");
    if (NULL == stream->fp) {
        rv = SKSTREAM_ERR_NOPAGER;
        goto END;
    }

    /* see if pager started.  There is a race condition here, and this
     * assumes we have only one child, which should be true. */
    pid = wait4(0, &wait_status, WNOHANG, NULL);
    if (pid) {
        /* a non-zero result is treated as the pager not running */
        rv = SKSTREAM_ERR_NOPAGER;
        goto END;
    }
#else
    /* NOTE(review): this disabled branch refers to 'pager' and 'out',
     * which are not declared in this function; it would need work
     * before it could be enabled. */
    {
    int pipe_des[2];

    /* create pipe and fork */
    if (pipe(pipe_des) == -1) {
        stream->errnum = errno;
        rv = SKSTREAM_ERR_SYS_PIPE;
        goto END;
    }
    pid = fork();
    if (pid < 0) {
        stream->errnum = errno;
        rv = SKSTREAM_ERR_SYS_FORK;
        goto END;
    }

    if (pid == 0) {
        /* CHILD */

        /* close output side of pipe; set input to stdin */
        close(pipe_des[1]);
        if (pipe_des[0] != STDIN_FILENO) {
            dup2(pipe_des[0], STDIN_FILENO);
            close(pipe_des[0]);
        }

        /* invoke pager */
        execlp(pager, NULL);
        skAppPrintErr("Unable to invoke pager '%s': %s",
                      pager, strerror(errno));
        _exit(EXIT_FAILURE);
    }

    /* PARENT */

    /* close input side of pipe */
    close(pipe_des[0]);

    /* try to open the write side of the pipe */
    out = fdopen(pipe_des[1], "w");
    if (NULL == out) {
        stream->errnum = errno;
        rv = SKSTREAM_ERR_SYS_FDOPEN;
        goto END;
    }

    /* it'd be nice to have a slight pause here to give child time to
     * die if command cannot be exec'ed, but it's not worth the
     * trouble to use select(), and sleep(1) is too long. */

    /* see if child died unexpectedly */
    if (waitpid(pid, &wait_status, WNOHANG)) {
        rv = SKSTREAM_ERR_NOPAGER;
        goto END;
    }
    }
#endif /* 1: whether to use popen() */

    /* looks good. */
    stream->is_pager_active = 1;

  END:
    return rv;
}
723 
724 
725 
726 
727 /*
728  *  status = streamOpenAppend(stream);
729  *
730  *    Open the stream for appending.
731  */
732 static int
streamOpenAppend(skstream_t * stream)733 streamOpenAppend(
734     skstream_t         *stream)
735 {
736     int rv = SKSTREAM_OK;
737     int flags = O_RDWR | O_APPEND;
738 
739     assert(stream);
740     assert(stream->pathname);
741 
742     /* Open file for read and write; position at start. */
743     stream->fd = open(stream->pathname, flags, 0);
744     if (stream->fd == -1) {
745         stream->errnum = errno;
746         rv = SKSTREAM_ERR_SYS_OPEN;
747         goto END;
748     }
749     if (-1 == lseek(stream->fd, 0, SEEK_SET)) {
750         stream->errnum = errno;
751         rv = SKSTREAM_ERR_SYS_LSEEK;
752         goto END;
753     }
754 
755   END:
756     return rv;
757 }
758 
759 
760 /*
761  *    Bind the currently open file descriptor to zlib via gzdopen().
762  *    When reading a stream and the underlying file is seekable, do
763  *    not bind the descriptor to zlib when the GZIP magic numbers are
764  *    not present.
765  */
766 static int
streamOpenGzip(skstream_t * stream)767 streamOpenGzip(
768     skstream_t         *stream)
769 {
770     int is_compressed = 1;
771     ssize_t num_read;
772     uint8_t magic[3];
773     int rv = SKSTREAM_OK;
774 
775     if (stream->io_mode == SK_IO_READ && stream->is_seekable) {
776         /* Read the first two characters to look for the GZIP magic
777          * number (31 139 (see RFC1952)) to see if the stream really
778          * is compressed. */
779         num_read = read(stream->fd, magic, 2);
780         if ((num_read != 2) || (magic[0] != 31u) || (magic[1] != 139u)) {
781             /* File does not contain the gzip magic number. */
782             is_compressed = 0;
783         }
784         if (0 != lseek(stream->fd, 0, SEEK_SET)) {
785             rv = SKSTREAM_ERR_SYS_LSEEK;
786             goto END;
787         }
788     }
789 
790     if (is_compressed) {
791 #if SK_ENABLE_ZLIB
792         stream->gz = gzdopen(stream->fd,
793                              (stream->io_mode == SK_IO_READ) ? "rb" : "wb");
794         if (stream->gz == NULL) {
795             rv = SKSTREAM_ERR_ALLOC;
796             goto END;
797         }
798 #else
799         /* compression not supported */
800         rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE;
801         goto END;
802 #endif /* SK_ENABLE_ZLIB */
803     }
804 
805   END:
806     return rv;
807 }
808 
809 
810 /*
811  *  status = streamOpenRead(stream);
812  *
813  *    Open the stream for reading.
814  */
815 static int
streamOpenRead(skstream_t * stream)816 streamOpenRead(
817     skstream_t         *stream)
818 {
819     int rv = SKSTREAM_OK;
820 
821     assert(stream);
822     assert(stream->pathname);
823     assert(stream->io_mode == SK_IO_READ);
824     assert(-1 == stream->fd);
825 
826     if (stream->is_mpi) {
827         /* for now, just set to a valid value.  we should replace the
828          * checks of 'fd' with an 'is_open' flag */
829         stream->fd = INT32_MAX;
830     } else if ((0 == strcmp(stream->pathname, "stdin"))
831                || (0 == strcmp(stream->pathname, "-")))
832     {
833         stream->fd = STDIN_FILENO;
834         stream->is_stdio = 1;
835     } else {
836         stream->fd = open(stream->pathname, O_RDONLY);
837         if (stream->fd == -1) {
838             rv = SKSTREAM_ERR_SYS_OPEN;
839             stream->errnum = errno;
840             goto END;
841         }
842     }
843 
844   END:
845     /* if something went wrong, close the file */
846     if (rv != SKSTREAM_OK) {
847         if (stream->fd != -1) {
848             close(stream->fd);
849             stream->fd = -1;
850         }
851     }
852     return rv;
853 }
854 
855 
856 static int
streamOpenWrite(skstream_t * stream)857 streamOpenWrite(
858     skstream_t         *stream)
859 {
860     int rv = SKSTREAM_OK;
861 
862     assert(stream);
863     assert(stream->pathname);
864     assert(stream->io_mode == SK_IO_WRITE);
865 
866     if ((0 == strcmp(stream->pathname, "stdout"))
867         || (0 == strcmp(stream->pathname, "-")))
868     {
869         stream->fd = STDOUT_FILENO;
870         stream->is_stdio = 1;
871     } else if (0 == strcmp(stream->pathname, "stderr")) {
872         stream->fd = STDERR_FILENO;
873         stream->is_stdio = 1;
874     } else if (stream->is_mpi) {
875         /* for now, just set to a valid value.  we should replace the
876          * checks of 'fd' with an 'is_open' flag */
877         stream->fd = INT32_MAX;
878     } else {
879         struct stat stbuf;
880         int mode, flags;
881 
882         /* standard mode of 0666 */
883         mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
884 
885         /* assume creating previously non-existent file */
886         flags = O_WRONLY | O_CREAT | O_EXCL;
887 
888         /* try to open as a brand new file */
889         stream->fd = open(stream->pathname, flags, mode);
890         if (stream->fd == -1) {
891             stream->errnum = errno;
892             if ((stream->errnum == EEXIST)
893                 && (0 == stat(stream->pathname, &stbuf)))
894             {
895                 /* file exists.  Try again with different flags when
896                  * the file is a FIFO, the file is a character device
897                  * ("/dev/null"), or the SILK_CLOBBER envar is set. */
898                 if (S_ISFIFO(stbuf.st_mode)) {
899                     flags = O_WRONLY;
900                 } else if (S_ISCHR(stbuf.st_mode)) {
901                     flags = O_WRONLY | O_NOCTTY;
902 #ifdef SILK_CLOBBER_ENVAR
903                 } else if (silk_clobber) {
904                     /* overwrite an existing file */
905                     flags = O_WRONLY | O_TRUNC;
906 #endif  /* SILK_CLOBBER_ENVAR */
907                 } else {
908                     rv = SKSTREAM_ERR_FILE_EXISTS;
909                     goto END;
910                 }
911 
912                 /* try again with the new flags */
913                 stream->fd = open(stream->pathname, flags, mode);
914             }
915 
916             /* if we (still) have an error, return */
917             if (stream->fd == -1) {
918                 /* we set errnum above */
919                 rv = SKSTREAM_ERR_SYS_OPEN;
920                 goto END;
921             }
922         }
923     }
924 
925   END:
926     return rv;
927 }
928 
929 
930 /*
931  *  is_compressed = streamPathnameIsCompressed(pathname, io_mode);
932  *
933  *    Return TRUE if 'pathname' should be considered a compressed file
934  *    for the given IO mode---that is, where the entire file is
935  *    compressed---or FALSE otherwise.
936  *
937  *    Basically, returns TRUE when 'pathname' ends in ".gz" or when it
938  *    is open for reading and contains the substring ".gz."---assuming
939  *    the pathname has had a mkstemp() suffix added to it.  In all
940  *    other cases, it returns FALSE.
941  */
942 static int
streamPathnameIsCompressed(const char * pathname,skstream_mode_t read_write_append)943 streamPathnameIsCompressed(
944     const char         *pathname,
945     skstream_mode_t     read_write_append)
946 {
947     const char *gz;
948 
949     gz = strstr(pathname, ".gz");
950     if (gz != NULL) {
951         switch (read_write_append) {
952           case SK_IO_READ:
953           case SK_IO_APPEND:
954             if (gz[3] == '.') {
955                 return 1;
956             }
957             /* FALLTHROUGH */
958 
959           case SK_IO_WRITE:
960             if (gz[3] == '\0') {
961                 return 1;
962             }
963             break;
964         }
965     }
966     return 0;
967 }
968 
969 
970 static int
streamPostOpen(skstream_t * stream)971 streamPostOpen(
972     skstream_t         *stream)
973 {
974     int rv = SKSTREAM_OK;
975 
976     assert(stream);
977     assert(stream->fd != -1);
978 
979     if (!stream->is_mpi) {
980         if (isatty(stream->fd)) {
981             stream->is_terminal = 1;
982         } else if (lseek(stream->fd, 0, SEEK_CUR) != (off_t)-1) {
983             stream->is_seekable = 1;
984         }
985 
986         /* handle compressed files */
987         if (streamPathnameIsCompressed(stream->pathname, stream->io_mode)) {
988             rv = streamOpenGzip(stream);
989             if (rv) { goto END; }
990         }
991     }
992 
993     /* for a non-silk binary file, create the IOBuf now.  If the
994      * stream was open for appending, seek to the end of the file
995      * before creating the IOBuf */
996     if (stream->content_type == SK_CONTENT_OTHERBINARY) {
997         if (stream->io_mode == SK_IO_APPEND) {
998             if (-1 == lseek(stream->fd, 0, SEEK_END)) {
999                 stream->errnum = errno;
1000                 rv = SKSTREAM_ERR_SYS_LSEEK;
1001                 goto END;
1002             }
1003         }
1004 
1005         rv = streamIOBufCreate(stream);
1006         if (rv) { goto END; }
1007     }
1008 
1009     /* for a text file we are reading, create the IOBuf now */
1010     if (stream->content_type == SK_CONTENT_TEXT
1011         && stream->io_mode == SK_IO_READ)
1012     {
1013         rv = streamIOBufCreate(stream);
1014         if (rv) { goto END; }
1015     }
1016 
1017   END:
1018     return rv;
1019 }
1020 
1021 
1022 /*
1023  *    Invoke the SiLK Flow file format-specific function that sets the
1024  *    rwUnpackFn() and rwPackFn() function pointers on 'stream'.
1025  */
1026 static int
streamPrepareFormat(skstream_t * stream)1027 streamPrepareFormat(
1028     skstream_t         *stream)
1029 {
1030     assert(stream);
1031     assert(stream->is_silk);
1032     assert(stream->silk_hdr);
1033 
1034     switch (skHeaderGetFileFormat(stream->silk_hdr)) {
1035       case FT_RWAUGMENTED:
1036         return augmentedioPrepare(stream);
1037 
1038       case FT_RWAUGROUTING:
1039         return augroutingioPrepare(stream);
1040 
1041       case FT_RWAUGWEB:
1042         return augwebioPrepare(stream);
1043 
1044       case FT_RWAUGSNMPOUT:
1045         return augsnmpoutioPrepare(stream);
1046 
1047       case FT_RWFILTER:
1048         return filterioPrepare(stream);
1049 
1050       case FT_FLOWCAP:
1051         return flowcapioPrepare(stream);
1052 
1053       case FT_RWGENERIC:
1054         return genericioPrepare(stream);
1055 
1056       case FT_RWIPV6:
1057         stream->supports_ipv6 = 1;
1058         return ipv6ioPrepare(stream);
1059 
1060       case FT_RWIPV6ROUTING:
1061         stream->supports_ipv6 = 1;
1062         return ipv6routingioPrepare(stream);
1063 
1064       case FT_RWNOTROUTED:
1065         return notroutedioPrepare(stream);
1066 
1067       case FT_RWROUTED:
1068         return routedioPrepare(stream);
1069 
1070       case FT_RWSPLIT:
1071         return splitioPrepare(stream);
1072 
1073       case FT_RWWWW:
1074         return wwwioPrepare(stream);
1075 
1076       default:
1077         break;
1078     }
1079 
1080     return SKSTREAM_ERR_UNSUPPORT_FORMAT;
1081 }
1082 
1083 
1084 /*
1085  *   Prepare 'stream' for writing textual output.  The function uses
1086  *   fdopen() to get a file pointer for the file descriptor when
1087  *   'stream' is open for write or append.  If a pager is defined for
1088  *   'stream', the pager is invoked.
1089  *
1090  *   For processing textual input, skstream uses an IO Buf.
1091  */
1092 static int
streamPrepareText(skstream_t * stream)1093 streamPrepareText(
1094     skstream_t         *stream)
1095 {
1096     int rv;
1097 
1098     rv = streamCheckOpen(stream);
1099     if (rv) { goto END; }
1100 
1101     assert(!stream->is_binary);
1102 
1103     if (stream->fp == NULL) {
1104         const char *mode = NULL;
1105         switch (stream->io_mode) {
1106           case SK_IO_READ:
1107             break;
1108 
1109           case SK_IO_WRITE:
1110             if (stream->pager) {
1111                 rv = streamInvokePager(stream);
1112                 if (rv) { goto END; }
1113             }
1114             if (stream->fp == NULL) {
1115                 mode = "w";
1116             }
1117             break;
1118 
1119           case SK_IO_APPEND:
1120             mode = "r+";
1121             break;
1122         }
1123         if (mode) {
1124             stream->fp = fdopen(stream->fd, mode);
1125             if (stream->fp == NULL) {
1126                 stream->errnum = errno;
1127                 rv = SKSTREAM_ERR_SYS_FDOPEN;
1128                 goto END;
1129             }
1130         }
1131     }
1132 
1133     stream->is_dirty = 1;
1134 
1135   END:
1136     return rv;
1137 }
1138 
1139 
1140 /*
1141  *  bytes = streamReadNullBuffer(stream, count);
1142  *
1143  *    Read 'count' bytes from 'stream', ignoring the data.  Return the
1144  *    number of bytes read, or -1 for an error.
1145  */
1146 static ssize_t
streamReadNullBuffer(skstream_t * stream,const size_t count)1147 streamReadNullBuffer(
1148     skstream_t         *stream,
1149     const size_t        count)
1150 {
1151     uint8_t buf[65536];
1152     size_t left = count;
1153     size_t wanted;
1154     ssize_t saw;
1155 
1156     assert(stream);
1157     assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
1158     assert(stream->fd != -1);
1159     assert(stream->iobuf == NULL);
1160 
1161 #if SK_ENABLE_ZLIB
1162     if (stream->gz != NULL) {
1163         while (left) {
1164             /* don't read more than will fit into our buffer */
1165             wanted = ((left < sizeof(buf)) ? left : sizeof(buf));
1166 
1167             saw = streamGZRead(stream, buf, wanted);
1168             if (saw == -1) {
1169                 stream->is_iobuf_error = 0;
1170                 return saw;
1171             }
1172             if (saw == 0) {
1173                 /* no more to read */
1174                 break;
1175             }
1176 
1177             left -= saw;
1178         }
1179 
1180         return (count - left);
1181     }
1182 #endif  /* SK_ENABLE_ZLIB */
1183 
1184     while (left) {
1185         /* don't read more than will fit into our buffer */
1186         wanted = ((left < sizeof(buf)) ? left : sizeof(buf));
1187 
1188         saw = skreadn(stream->fd, buf, wanted);
1189         if (saw == -1) {
1190             stream->errnum = errno;
1191             stream->err_info = SKSTREAM_ERR_READ;
1192             return saw;
1193         }
1194         if (saw == 0) {
1195             /* no more to read */
1196             break;
1197         }
1198         left -= saw;
1199     }
1200 
1201     return (count - left);
1202 }
1203 
1204 
1205 /*
1206  *    Read 'skip_count' records from 'stream'.  If 'records_skipped'
1207  *    is not NULL, fill the location it references with the number of
1208  *    records actually skipped.
1209  *
1210  *    This function is only invoked when an IO Buf is NOT associated
1211  *    with 'stream' and when the 'copyInputFD' member of 'stream' is
1212  *    NULL.
1213  */
1214 static int
streamSkipRecordsNonIOBuf(skstream_t * stream,size_t skip_count,size_t * records_skipped)1215 streamSkipRecordsNonIOBuf(
1216     skstream_t         *stream,
1217     size_t              skip_count,
1218     size_t             *records_skipped)
1219 {
1220 #define SKIP_RECORD_COUNT  1024
1221     uint8_t ar[SKIP_RECORD_COUNT * SK_MAX_RECORD_SIZE];
1222     ssize_t saw;
1223     ssize_t tmp;
1224 
1225     assert(stream);
1226     assert(records_skipped);
1227 
1228     if (stream->is_eof) {
1229         return SKSTREAM_ERR_EOF;
1230     }
1231 
1232     while (skip_count > 0) {
1233         /* can only read the number of records our buffer allows */
1234         if (skip_count > SKIP_RECORD_COUNT) {
1235             tmp = stream->recLen * SKIP_RECORD_COUNT;
1236         } else {
1237             tmp = stream->recLen * skip_count;
1238         }
1239 
1240         /* read the bytes and check for error or short reads */
1241         saw = skStreamRead(stream, ar, tmp);
1242         if (saw != tmp) {
1243             /* Either error or an incomplete read--assume end of file */
1244             stream->is_eof = 1;
1245             if (saw == -1) {
1246                 /* error */
1247                 return -1;
1248             }
1249         }
1250 
1251         /* compute the number of records we actually read, update
1252          * counters, and check for any partially read records. */
1253         tmp = (saw / stream->recLen);
1254         stream->rec_count += tmp;
1255         skip_count -= tmp;
1256         saw -= tmp * stream->recLen;
1257         *records_skipped += tmp;
1258 
1259         if (saw != 0) {
1260             stream->errobj.num = saw;
1261             return SKSTREAM_ERR_READ_SHORT;
1262         }
1263         if (stream->is_eof) {
1264             return SKSTREAM_ERR_EOF;
1265         }
1266     }
1267 
1268     return SKSTREAM_OK;
1269 }
1270 
1271 
1272 /*
1273  * *********************************
1274  * PUBLIC / EXPORTED FUNCTIONS
1275  * *********************************
1276  */
1277 
1278 /*
1279  *  status = skStreamBind(stream, path);
1280  *
1281  *    Set 'stream' to operate on the file specified in 'path'; 'path'
1282  *    may also be one of "stdin", "stdout", or "stderr".  Returns
1283  *    SKSTREAM_OK on success, or an error code on failure.
1284  */
1285 int
skStreamBind(skstream_t * stream,const char * pathname)1286 skStreamBind(
1287     skstream_t         *stream,
1288     const char         *pathname)
1289 {
1290     int rv = SKSTREAM_OK;
1291     FILE *s = NULL;
1292 
1293     /* check name */
1294     if (NULL == stream || NULL == pathname) {
1295         rv = SKSTREAM_ERR_NULL_ARGUMENT;
1296         goto END;
1297     }
1298     if ('\0' == *pathname || strlen(pathname) >= PATH_MAX) {
1299         rv = SKSTREAM_ERR_INVALID_INPUT;
1300         goto END;
1301     }
1302     if (stream->pathname) {
1303         rv = SKSTREAM_ERR_PREV_BOUND;
1304         goto END;
1305     }
1306 
1307     /* copy it into place */
1308     stream->pathname = strdup(pathname);
1309     if (stream->pathname == NULL) {
1310         rv = SKSTREAM_ERR_ALLOC;
1311         goto END;
1312     }
1313 
1314     if (0 == strcmp(pathname, "stdin")) {
1315         switch (stream->io_mode) {
1316           case SK_IO_READ:
1317             if (!stream->is_mpi && stream->is_binary && FILEIsATty(stdin)) {
1318                 rv = SKSTREAM_ERR_ISTERMINAL;
1319                 goto END;
1320             }
1321             break;
1322           case SK_IO_WRITE:
1323           case SK_IO_APPEND:
1324             /* cannot write or append to stdin */
1325             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1326             goto END;
1327         }
1328     } else if (0 == strcmp(pathname, "stdout")) {
1329         s = stdout;
1330     } else if (0 == strcmp(pathname, "stderr")) {
1331         s = stderr;
1332     } else if (0 == strcmp(pathname, "-")) {
1333         switch (stream->io_mode) {
1334           case SK_IO_READ:
1335             if (!stream->is_mpi && stream->is_binary && FILEIsATty(stdin)) {
1336                 rv = SKSTREAM_ERR_ISTERMINAL;
1337                 goto END;
1338             }
1339             break;
1340           case SK_IO_WRITE:
1341             s = stdout;
1342             break;
1343           case SK_IO_APPEND:
1344             /* cannot append to stdout */
1345             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1346             goto END;
1347         }
1348     }
1349 
1350     if (s) {
1351         switch (stream->io_mode) {
1352           case SK_IO_READ:
1353           case SK_IO_APPEND:
1354             /* cannot read or append to stdout/stderr */
1355             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1356             goto END;
1357           case SK_IO_WRITE:
1358             if (!stream->is_mpi && stream->is_binary && FILEIsATty(s)) {
1359                 rv = SKSTREAM_ERR_ISTERMINAL;
1360                 goto END;
1361             }
1362             break;
1363         }
1364     }
1365 
1366     /* cannot append to FIFOs or to gzipped files */
1367     if (stream->io_mode == SK_IO_APPEND) {
1368         if (streamPathnameIsCompressed(stream->pathname, stream->io_mode)) {
1369             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1370             goto END;
1371         }
1372         if (isFIFO(pathname)) {
1373             /* Cannot append to a FIFO */
1374             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1375             goto END;
1376         }
1377     }
1378 
1379   END:
1380     return (stream->last_rv = rv);
1381 }
1382 
1383 
1384 int
skStreamCheckCompmethod(skstream_t * stream,sk_msg_fn_t errfn)1385 skStreamCheckCompmethod(
1386     skstream_t         *stream,
1387     sk_msg_fn_t         errfn)
1388 {
1389 #ifdef TEST_PRINTF_FORMATS
1390 #  define P_ERR printf
1391 #else
1392 #  define P_ERR if (!errfn) { } else errfn
1393 #endif
1394     sk_compmethod_t compmethod;
1395 
1396     compmethod = skHeaderGetCompressionMethod(stream->silk_hdr);
1397     switch (skCompMethodCheck(compmethod)) {
1398       case SK_COMPMETHOD_IS_AVAIL:
1399         /* known, valid, and available */
1400         return (stream->last_rv = SKSTREAM_OK);
1401       case SK_COMPMETHOD_IS_VALID:
1402         /* known and valid but not available */
1403         if (errfn) {
1404             char name[64];
1405             skCompMethodGetName(name, sizeof(name), compmethod);
1406             P_ERR("The %s compression method used by '%s' is not available",
1407                   name, stream->pathname);
1408         }
1409         return (stream->last_rv = SKSTREAM_ERR_COMPRESS_UNAVAILABLE);
1410       case SK_COMPMETHOD_IS_KNOWN:
1411         /* this is an undecided value, only valid for write */
1412         if (SK_IO_WRITE == stream->io_mode) {
1413             return (stream->last_rv = SKSTREAM_OK);
1414         }
1415         /* FALLTHROUGH */
1416       default:
1417         if (errfn) {
1418             P_ERR("File '%s' is compressed with an unrecognized method %d",
1419                   stream->pathname, compmethod);
1420         }
1421         return (stream->last_rv = SKSTREAM_ERR_COMPRESS_INVALID);
1422     }
1423 #undef P_ERR
1424 }
1425 
1426 
1427 int
skStreamCheckSilkHeader(skstream_t * stream,sk_file_format_t file_format,sk_file_version_t min_version,sk_file_version_t max_version,sk_msg_fn_t errfn)1428 skStreamCheckSilkHeader(
1429     skstream_t         *stream,
1430     sk_file_format_t    file_format,
1431     sk_file_version_t   min_version,
1432     sk_file_version_t   max_version,
1433     sk_msg_fn_t         errfn)
1434 {
1435 #ifdef TEST_PRINTF_FORMATS
1436 #  define P_ERR printf
1437 #else
1438 #  define P_ERR if (!errfn) { } else errfn
1439 #endif
1440     sk_file_header_t *hdr = stream->silk_hdr;
1441     sk_file_format_t fmt = skHeaderGetFileFormat(hdr);
1442     sk_file_version_t vers = skHeaderGetRecordVersion(hdr);
1443     char fmt_name[SK_MAX_STRLEN_FILE_FORMAT+1];
1444 
1445     /* get the name of the requested format */
1446     skFileFormatGetName(fmt_name, sizeof(fmt_name), file_format);
1447 
1448     if (fmt != file_format) {
1449         P_ERR("File '%s' is not a %s file; format is 0x%02x",
1450               stream->pathname, fmt_name, fmt);
1451         return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_FORMAT);
1452     }
1453 
1454     if ((vers < min_version) || (vers > max_version)) {
1455         P_ERR("This version of SiLK cannot process the %s v%u file %s",
1456               fmt_name, vers, stream->pathname);
1457         return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_VERSION);
1458     }
1459 
1460     return (stream->last_rv = skStreamCheckCompmethod(stream, errfn));
1461 #undef errfn
1462 }
1463 
1464 
1465 int
skStreamClose(skstream_t * stream)1466 skStreamClose(
1467     skstream_t         *stream)
1468 {
1469     int rv;
1470 
1471     STREAM_RETURN_IF_NULL(stream);
1472 
1473     rv = streamCheckOpen(stream);
1474     if (rv) { goto END; }
1475 
1476     if (stream->fp) {
1477         if (stream->is_pager_active) {
1478             if (pclose(stream->fp) == -1) {
1479                 stream->errnum = errno;
1480                 if (rv == SKSTREAM_OK) {
1481                     rv = SKSTREAM_ERR_WRITE;
1482                 }
1483             }
1484         } else {
1485             if (EOF == fclose(stream->fp)) {
1486                 stream->errnum = errno;
1487                 if (rv == SKSTREAM_OK) {
1488                     rv = SKSTREAM_ERR_WRITE;
1489                 }
1490             }
1491         }
1492     } else if (stream->fd != -1) {
1493         if (stream->iobuf && stream->io_mode != SK_IO_READ) {
1494             if (skIOBufFlush(stream->iobuf) == -1) {
1495                 if (stream->is_iobuf_error) {
1496                     stream->is_iobuf_error = 0;
1497                     rv = stream->err_info;
1498                 } else {
1499                     rv = SKSTREAM_ERR_IOBUF;
1500                 }
1501             }
1502         }
1503 #if SK_ENABLE_ZLIB
1504         if (stream->gz) {
1505             /* Close the gzFile */
1506             int zerr = gzclose(stream->gz);
1507             stream->gz = NULL;
1508             if (zerr != Z_OK) {
1509                 if (zerr == Z_ERRNO) {
1510                     stream->errnum = errno;
1511                     rv = SKSTREAM_ERR_WRITE;
1512                 } else {
1513                     stream->errnum = zerr;
1514                     rv = SKSTREAM_ERR_ZLIB;
1515                 }
1516             }
1517             /* gzclose() closes the file */
1518             stream->fd = -1;
1519         } else
1520 #endif /* SK_ENABLE_ZLIB */
1521         {
1522             if (stream->is_stdio == 0) {
1523                 if (close(stream->fd) == -1) {
1524                     stream->errnum = errno;
1525                     rv = SKSTREAM_ERR_WRITE;
1526                 }
1527             }
1528         }
1529     }
1530 
1531     stream->fd = -1;
1532     stream->fp = NULL;
1533     stream->is_closed = 1;
1534 
1535   END:
1536     return (stream->last_rv = rv);
1537 }
1538 
1539 
1540 /*
1541  *  status = skStreamCreate(&out_stream, io_mode, content_type);
1542  *
1543  *    Create a stream (skstream_t*) and fill 'out_stream' with the
1544  *    address of the newly allocated stream.  In addition, bind the
1545  *    stream to the given 'path', with IO in the specified 'io_mode'.
1546  *    Return SKSTREAM_OK on success, or an error code on failure.
1547  */
1548 int
skStreamCreate(skstream_t ** new_stream,skstream_mode_t read_write_append,skcontent_t content_type)1549 skStreamCreate(
1550     skstream_t        **new_stream,
1551     skstream_mode_t     read_write_append,
1552     skcontent_t         content_type)
1553 {
1554     if (new_stream == NULL) {
1555         return SKSTREAM_ERR_NULL_ARGUMENT;
1556     }
1557 
1558     *new_stream = (skstream_t*)calloc(1, sizeof(skstream_t));
1559     if (NULL == *new_stream) {
1560         return SKSTREAM_ERR_ALLOC;
1561     }
1562 
1563     if (skHeaderCreate(&((*new_stream)->silk_hdr))) {
1564         free(*new_stream);
1565         *new_stream = NULL;
1566         return SKSTREAM_ERR_ALLOC;
1567     }
1568 
1569     (*new_stream)->io_mode = read_write_append;
1570     (*new_stream)->content_type = content_type;
1571     (*new_stream)->fd = -1;
1572 
1573     /* Native format by default, so don't swap */
1574     (*new_stream)->swapFlag = 0;
1575 
1576     /* Set sensor and flowtype to invalid values */
1577     (*new_stream)->hdr_sensor = SK_INVALID_SENSOR;
1578     (*new_stream)->hdr_flowtype = SK_INVALID_FLOWTYPE;
1579 
1580     switch (content_type) {
1581       case SK_CONTENT_TEXT:
1582         break;
1583 
1584       case SK_CONTENT_SILK_FLOW:
1585         (*new_stream)->is_silk_flow = 1;
1586         /* FALLTHROUGH */
1587 
1588       case SK_CONTENT_SILK:
1589         (*new_stream)->is_silk = 1;
1590         /* FALLTHROUGH */
1591 
1592       case SK_CONTENT_OTHERBINARY:
1593         (*new_stream)->is_binary = 1;
1594         break;
1595     }
1596 
1597     return ((*new_stream)->last_rv = SKSTREAM_OK);
1598 }
1599 
1600 
1601 int
skStreamDestroy(skstream_t ** stream)1602 skStreamDestroy(
1603     skstream_t        **stream)
1604 {
1605     int rv;
1606 
1607     if ((NULL == stream) || (NULL == *stream)) {
1608         return SKSTREAM_OK;
1609     }
1610 
1611     rv = skStreamUnbind(*stream);
1612 
1613     /* Destroy the iobuf */
1614     if ((*stream)->iobuf) {
1615         skIOBufDestroy((*stream)->iobuf);
1616         (*stream)->iobuf = NULL;
1617     }
1618 
1619     /* Destroy the header */
1620     skHeaderDestroy(&((*stream)->silk_hdr));
1621 
1622     /* Free the pathname */
1623     if ((*stream)->pathname) {
1624         free((*stream)->pathname);
1625         (*stream)->pathname = NULL;
1626     }
1627 
1628     free(*stream);
1629     *stream = NULL;
1630 
1631     return rv;
1632 }
1633 
1634 
1635 int
skStreamFDOpen(skstream_t * stream,int file_desc)1636 skStreamFDOpen(
1637     skstream_t         *stream,
1638     int                 file_desc)
1639 {
1640     int rv;
1641 
1642     STREAM_RETURN_IF_NULL(stream);
1643 
1644     rv = streamCheckUnopened(stream);
1645     if (rv) { goto END; }
1646 
1647     if (stream->pathname == NULL) {
1648         rv = SKSTREAM_ERR_NOT_BOUND;
1649         goto END;
1650     }
1651 
1652     if (file_desc == -1) {
1653         rv = SKSTREAM_ERR_INVALID_INPUT;
1654         goto END;
1655     }
1656 
1657     /* Check file modes */
1658     rv = fcntl(file_desc, F_GETFL, 0);
1659     if (rv == -1) {
1660         stream->errnum = errno;
1661         rv = SKSTREAM_ERR_SYS_FCNTL_GETFL;
1662         goto END;
1663     }
1664     switch (stream->io_mode) {
1665       case SK_IO_READ:
1666         if ((rv & O_ACCMODE) == O_WRONLY) {
1667             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1668             goto END;
1669         }
1670         break;
1671       case SK_IO_WRITE:
1672         if (((rv & O_ACCMODE) == O_RDONLY) || (rv & O_APPEND)) {
1673             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1674             goto END;
1675         }
1676         break;
1677       case SK_IO_APPEND:
1678         if (((rv & O_ACCMODE) != O_RDWR) || !(rv & O_APPEND)) {
1679             rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
1680             goto END;
1681         }
1682         break;
1683     }
1684 
1685     /* Check tty status if binary */
1686     if (stream->is_binary && isatty(file_desc)) {
1687         rv = SKSTREAM_ERR_ISTERMINAL;
1688         goto END;
1689     }
1690 
1691     /* Seek to beginning on append for the header.  Check this after
1692      * the tty status check, because that is a more useful error
1693      * message. */
1694     if ((stream->io_mode == SK_IO_APPEND)
1695         && (-1 == lseek(file_desc, 0, SEEK_SET)))
1696     {
1697         stream->errnum = errno;
1698         rv = SKSTREAM_ERR_SYS_LSEEK;
1699         goto END;
1700     }
1701 
1702     stream->fd = file_desc;
1703 
1704     rv = streamPostOpen(stream);
1705     if (rv) { goto END; }
1706 
1707   END:
1708     return (stream->last_rv = rv);
1709 }
1710 
1711 
1712 int
skStreamFlush(skstream_t * stream)1713 skStreamFlush(
1714     skstream_t         *stream)
1715 {
1716     int rv;
1717 
1718     STREAM_RETURN_IF_NULL(stream);
1719 
1720     rv = streamCheckOpen(stream);
1721     if (rv) { goto END; }
1722 
1723     if (stream->io_mode == SK_IO_READ) {
1724         /* nothing to do for a reader */
1725         goto END;
1726     }
1727 
1728     if (stream->fp) {
1729         if (EOF == fflush(stream->fp)) {
1730             stream->errnum = errno;
1731             rv = SKSTREAM_ERR_WRITE;
1732         }
1733     } else if (stream->iobuf) {
1734         if (skIOBufFlush(stream->iobuf) == -1) {
1735             if (stream->is_iobuf_error) {
1736                 stream->is_iobuf_error = 0;
1737                 rv = stream->err_info;
1738             } else {
1739                 rv = SKSTREAM_ERR_IOBUF;
1740             }
1741             goto END;
1742         }
1743 #if SK_ENABLE_ZLIB
1744     } else if (stream->gz) {
1745         if (streamGZFlush(stream) == -1) {
1746             stream->is_iobuf_error = 0;
1747             rv = stream->err_info;
1748         }
1749 #endif  /* SK_ENABLE_ZLIB */
1750     }
1751 
1752   END:
1753     return (stream->last_rv = rv);
1754 }
1755 
1756 
1757 /* return the content type */
1758 skcontent_t
skStreamGetContentType(const skstream_t * stream)1759 skStreamGetContentType(
1760     const skstream_t   *stream)
1761 {
1762     assert(stream);
1763     return stream->content_type;
1764 }
1765 
1766 
1767 /* return the file descriptor */
1768 int
skStreamGetDescriptor(const skstream_t * stream)1769 skStreamGetDescriptor(
1770     const skstream_t   *stream)
1771 {
1772     assert(stream);
1773     return stream->fd;
1774 }
1775 
1776 
1777 /* return the cached errno value */
1778 int
skStreamGetLastErrno(const skstream_t * stream)1779 skStreamGetLastErrno(
1780     const skstream_t   *stream)
1781 {
1782     assert(stream);
1783     return stream->errnum;
1784 }
1785 
1786 
1787 /* return the cached return value */
1788 ssize_t
skStreamGetLastReturnValue(const skstream_t * stream)1789 skStreamGetLastReturnValue(
1790     const skstream_t   *stream)
1791 {
1792     assert(stream);
1793     return stream->last_rv;
1794 }
1795 
1796 
1797 /* fill 'value' with the limit implied by the error code */
1798 int
skStreamGetLimit(const skstream_t * stream,int limit_id,int64_t * value)1799 skStreamGetLimit(
1800     const skstream_t   *stream,
1801     int                 limit_id,
1802     int64_t            *value)
1803 {
1804     sk_file_format_t file_format;
1805     sk_file_version_t rec_version;
1806     int rv = SKSTREAM_OK;
1807 
1808     assert(stream);
1809 
1810     if (!stream->is_silk_flow || !stream->silk_hdr) {
1811         rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
1812         goto END;
1813     }
1814 
1815     file_format = skHeaderGetFileFormat(stream->silk_hdr);
1816     rec_version = skHeaderGetRecordVersion(stream->silk_hdr);
1817     if (UINT8_MAX == file_format) {
1818         file_format = DEFAULT_FILE_FORMAT;
1819     }
1820 
1821     switch (limit_id) {
1822       case SKSTREAM_ERR_PKTS_ZERO:
1823         /* The record contains a 0 value in the packets field. */
1824         *value = 1;
1825         break;
1826 
1827       case SKSTREAM_ERR_STIME_UNDRFLO:
1828         /* The record's start time is less than the file's start
1829          * time */
1830         switch (file_format) {
1831           case FT_RWAUGMENTED:
1832           case FT_RWAUGROUTING:
1833           case FT_RWAUGSNMPOUT:
1834           case FT_RWAUGWEB:
1835           case FT_RWNOTROUTED:
1836           case FT_RWROUTED:
1837           case FT_RWSPLIT:
1838           case FT_RWWWW:
1839             *value = stream->hdr_starttime;
1840             break;
1841           default:
1842             *value = 0;
1843             break;
1844         }
1845         break;
1846 
1847       case SKSTREAM_ERR_STIME_OVRFLO:
1848         /* The record's start time at least an hour greater than the
1849          * file's start time */
1850         *value = (int64_t)sktimeCreate(UINT32_MAX, 0);
1851         switch (file_format) {
1852           case FT_RWAUGMENTED:
1853           case FT_RWAUGROUTING:
1854           case FT_RWAUGSNMPOUT:
1855           case FT_RWAUGWEB:
1856           case FT_RWNOTROUTED:
1857           case FT_RWROUTED:
1858           case FT_RWSPLIT:
1859           case FT_RWWWW:
1860             *value = (stream->hdr_starttime
1861                       + sktimeCreate((MAX_START_TIME - 1), 0));
1862             break;
1863           case FT_RWGENERIC:
1864             switch (rec_version) {
1865               case 5:
1866                 *value = INT64_MAX;
1867                 break;
1868             }
1869             break;
1870           case FT_RWIPV6:
1871             switch (rec_version) {
1872               case 2:
1873                 *value = (stream->hdr_starttime
1874                           + sktimeCreate((MAX_START_TIME - 1), 0));
1875                 break;
1876               default:
1877                 *value = INT64_MAX;
1878                 break;
1879             }
1880             break;
1881           case FT_RWIPV6ROUTING:
1882             *value = INT64_MAX;
1883             break;
1884         }
1885         break;
1886 
1887       case SKSTREAM_ERR_ELPSD_OVRFLO:
1888           /* The record's elapsed time is greater than space allocated
1889            * for duration in this file format */
1890         switch (file_format) {
1891           case FT_RWAUGMENTED:
1892           case FT_RWAUGROUTING:
1893           case FT_RWAUGSNMPOUT:
1894             if (rec_version <= 4) {
1895                 *value = (MAX_ELAPSED_TIME - 1) * 1000;
1896             } else {
1897                 *value = (int64_t)UINT32_MAX;
1898             }
1899             break;
1900           case FT_RWAUGWEB:
1901             if (rec_version <= 4) {
1902                 *value = (MAX_ELAPSED_TIME - 1) * 1000;
1903             } else {
1904                 *value = (int64_t)MASKARRAY_30;
1905             }
1906             break;
1907           case FT_FLOWCAP:
1908             *value = UINT16_MAX * 1000;
1909             break;
1910           case FT_RWFILTER:
1911           case FT_RWNOTROUTED:
1912           case FT_RWROUTED:
1913           case FT_RWSPLIT:
1914           case FT_RWWWW:
1915             switch (rec_version) {
1916               case 1:
1917               case 2:
1918                 *value = (MAX_ELAPSED_TIME_OLD - 1) * 1000;
1919                 break;
1920               default:
1921                 *value = (MAX_ELAPSED_TIME - 1) * 1000;
1922                 break;
1923             }
1924             break;
1925           default:
1926             *value = (int64_t)UINT32_MAX;
1927             break;
1928         }
1929         break;
1930 
1931       case SKSTREAM_ERR_PKTS_OVRFLO:
1932         /* The record contains more than the number of packets allowed
1933          * in this file format */
1934         switch (file_format) {
1935           case FT_RWAUGMENTED:
1936           case FT_RWAUGROUTING:
1937           case FT_RWAUGSNMPOUT:
1938           case FT_RWAUGWEB:
1939             switch (rec_version) {
1940               case 5:
1941                 *value = (int64_t)UINT32_MAX;
1942                 break;
1943               default:
1944                 *value = MAX_PKTS * PKTS_DIVISOR - 1;
1945                 break;
1946             }
1947             break;
1948           case FT_RWFILTER:
1949           case FT_RWNOTROUTED:
1950           case FT_RWROUTED:
1951           case FT_RWSPLIT:
1952           case FT_RWWWW:
1953             *value = MAX_PKTS * PKTS_DIVISOR - 1;
1954             break;
1955           case FT_FLOWCAP:
1956             *value = MASKARRAY_24;
1957             break;
1958           case FT_RWGENERIC:
1959           case FT_RWIPV6:
1960           case FT_RWIPV6ROUTING:
1961             *value = (int64_t)UINT32_MAX;
1962             break;
1963         }
1964         break;
1965 
1966       case SKSTREAM_ERR_BPP_OVRFLO:
1967         /* The byte-per-packet value is too large to fit into the
1968          * space provided by this file format. */
1969         switch (file_format) {
1970           case FT_RWAUGMENTED:
1971           case FT_RWAUGROUTING:
1972           case FT_RWAUGSNMPOUT:
1973           case FT_RWAUGWEB:
1974             switch (rec_version) {
1975               case 5:
1976                 *value = (int64_t)UINT32_MAX;
1977                 break;
1978               default:
1979                 *value = MASKARRAY_14;
1980                 break;
1981             }
1982             break;
1983           case FT_RWFILTER:
1984           case FT_RWNOTROUTED:
1985           case FT_RWROUTED:
1986           case FT_RWSPLIT:
1987           case FT_RWWWW:
1988             *value = MASKARRAY_14;
1989             break;
1990           case FT_FLOWCAP:
1991           case FT_RWGENERIC:
1992           case FT_RWIPV6:
1993           case FT_RWIPV6ROUTING:
1994             *value = (int64_t)UINT32_MAX;
1995             break;
1996         }
1997         break;
1998 
1999       case SKSTREAM_ERR_SNMP_OVRFLO:
2000         /* The records contains an SNMP value too large to fit into
2001          * the space allocated in this file format. */
2002         *value = 0;
2003         switch (file_format) {
2004           case FT_RWAUGROUTING:
2005           case FT_RWAUGSNMPOUT:
2006           case FT_RWIPV6ROUTING:
2007             *value = UINT16_MAX;
2008             break;
2009           case FT_RWFILTER:
2010           case FT_RWNOTROUTED:
2011           case FT_RWROUTED:
2012             switch (rec_version) {
2013               case 1:
2014               case 2:
2015                 *value = UINT8_MAX;
2016                 break;
2017               default:
2018                 *value = UINT16_MAX;
2019                 break;
2020             }
2021             break;
2022           case FT_RWGENERIC:
2023             switch (rec_version) {
2024               case 0:
2025               case 1:
2026                 *value = UINT8_MAX;
2027                 break;
2028               default:
2029                 *value = UINT16_MAX;
2030                 break;
2031             }
2032             break;
2033           case FT_FLOWCAP:
2034             switch (rec_version) {
2035               case 2:
2036               case 3:
2037               case 4:
2038                 *value = UINT8_MAX;
2039                 break;
2040               default:
2041                 *value = UINT16_MAX;
2042                 break;
2043             }
2044             break;
2045         }
2046         break;
2047 
2048       case SKSTREAM_ERR_SENSORID_OVRFLO:
2049         /* The records contains a SensorID too large to fit into the
2050          * space allocated in this file format. */
2051         *value = UINT16_MAX;
2052         switch (file_format) {
2053           case FT_RWFILTER:
2054             switch (rec_version) {
2055               case 1:
2056                 *value = MASKARRAY_06;
2057                 break;
2058               case 2:
2059                 *value = UINT8_MAX;
2060                 break;
2061             }
2062             break;
2063           case FT_RWGENERIC:
2064             switch (rec_version) {
2065               case 0:
2066               case 1:
2067                 *value = UINT8_MAX;
2068                 break;
2069             }
2070             break;
2071         }
2072         break;
2073 
2074       default:
2075         /* unknown limit */
2076         rv = SKSTREAM_ERR_INVALID_INPUT;
2077         break;
2078     }
2079 
2080   END:
2081     return rv;
2082 }
2083 
2084 
2085 /* Get the next line from a text file */
2086 int
skStreamGetLine(skstream_t * stream,char * out_buffer,size_t buf_size,int * lines_read)2087 skStreamGetLine(
2088     skstream_t         *stream,
2089     char               *out_buffer,
2090     size_t              buf_size,
2091     int                *lines_read)
2092 {
2093     size_t len;
2094     int rv = SKSTREAM_OK;
2095 
2096     assert(stream);
2097 
2098     if ( !stream->is_dirty) {
2099         rv = streamCheckOpen(stream);
2100         if (rv) { goto END; }
2101 
2102         rv = streamCheckAttributes(stream, SK_IO_READ, SK_CONTENT_TEXT);
2103         if (rv) { goto END; }
2104 
2105         rv = streamPrepareText(stream);
2106         if (rv) { goto END; }
2107     }
2108 #ifndef NDEBUG
2109     else {
2110         assert(!stream->is_binary);
2111         assert(stream->content_type == SK_CONTENT_TEXT);
2112         assert(stream->io_mode == SK_IO_READ);
2113         assert(stream->fd != -1);
2114     }
2115 #endif
2116 
2117     assert(out_buffer && buf_size);
2118     out_buffer[0] = '\0';
2119 
2120     /* read from the stream until we get a good line */
2121     for (;;) {
2122         rv = streamIOBufGetLine(stream, out_buffer, buf_size);
2123         if (rv != SKSTREAM_OK) {
2124             if ((rv == SKSTREAM_ERR_LONG_LINE) && lines_read) {
2125                 ++*lines_read;
2126             }
2127             break;
2128         }
2129         if (lines_read) {
2130             ++*lines_read;
2131         }
2132 
2133         /* Terminate line at first comment char */
2134         if (stream->comment_start) {
2135             char *cp = strstr(out_buffer, stream->comment_start);
2136             if (cp) {
2137                 *cp = '\0';
2138             }
2139         }
2140 
2141         /* find first non-space character in the line */
2142         len = strspn(out_buffer, " \t\v\f\r\n");
2143         if (out_buffer[len] == '\0') {
2144             /* line contained whitespace only; ignore */
2145             continue;
2146         }
2147 
2148         /* got a line, break out of loop */
2149         break;
2150     }
2151 
2152   END:
2153     return (stream->last_rv = rv);
2154 }
2155 
2156 
2157 /* return the read/write/append mode */
2158 skstream_mode_t
skStreamGetMode(const skstream_t * stream)2159 skStreamGetMode(
2160     const skstream_t   *stream)
2161 {
2162     assert(stream);
2163     return stream->io_mode;
2164 }
2165 
2166 
2167 /* return the name of pager program */
2168 const char *
skStreamGetPager(const skstream_t * stream)2169 skStreamGetPager(
2170     const skstream_t   *stream)
2171 {
2172     if (stream->is_closed) {
2173         return NULL;
2174     } else if (stream->is_pager_active) {
2175         /* stream is open and pager is in use */
2176         return stream->pager;
2177     } else if (stream->fd == -1) {
2178         /* unopened, return pager we *may* use */
2179         return stream->pager;
2180     } else {
2181         /* stream is open and not using pager */
2182         return NULL;
2183     }
2184 }
2185 
2186 
2187 /* return the name of file associated with the stream */
2188 const char *
skStreamGetPathname(const skstream_t * stream)2189 skStreamGetPathname(
2190     const skstream_t   *stream)
2191 {
2192     assert(stream);
2193     return stream->pathname;
2194 }
2195 
2196 
2197 /* return number of SiLK flow records processed */
2198 uint64_t
skStreamGetRecordCount(const skstream_t * stream)2199 skStreamGetRecordCount(
2200     const skstream_t   *stream)
2201 {
2202     assert(stream);
2203     if (!stream->is_silk_flow) {
2204         return ((uint64_t)(-1));
2205     }
2206     return stream->rec_count;
2207 }
2208 
2209 
2210 sk_file_header_t *
skStreamGetSilkHeader(const skstream_t * stream)2211 skStreamGetSilkHeader(
2212     const skstream_t   *stream)
2213 {
2214     if (!stream->is_silk) {
2215         return NULL;
2216     }
2217     return stream->silk_hdr;
2218 }
2219 
2220 
2221 int
skStreamGetSupportsIPv6(const skstream_t * stream)2222 skStreamGetSupportsIPv6(
2223     const skstream_t   *stream)
2224 {
2225     assert(stream);
2226     return stream->supports_ipv6;
2227 }
2228 
2229 
2230 off_t
skStreamGetUpperBound(skstream_t * stream)2231 skStreamGetUpperBound(
2232     skstream_t         *stream)
2233 {
2234     assert(stream);
2235     assert(stream->fd != -1);
2236 
2237     if (stream->io_mode == SK_IO_READ) {
2238         return 0;
2239     }
2240     if (stream->iobuf) {
2241         return stream->pre_iobuf_pos + skIOBufTotalUpperBound(stream->iobuf);
2242     }
2243     return lseek(stream->fd, 0, SEEK_CUR);
2244 }
2245 
2246 
2247 int
skStreamInitialize(void)2248 skStreamInitialize(
2249     void)
2250 {
2251     const char *env;
2252 
2253     env = getenv(SILK_ICMP_SPORT_HANDLER_ENVAR);
2254     if (NULL != env && (0 == strcasecmp(env, "none"))) {
2255         silk_icmp_nochange = 1;
2256     }
2257 
2258 #ifdef SILK_CLOBBER_ENVAR
2259     env = getenv(SILK_CLOBBER_ENVAR);
2260     if (NULL != env && *env && *env != '0') {
2261         silk_clobber = 1;
2262     }
2263 #endif
2264 
2265     return 0;
2266 }
2267 
2268 
2269 int
skStreamIsSeekable(const skstream_t * stream)2270 skStreamIsSeekable(
2271     const skstream_t   *stream)
2272 {
2273     assert(stream);
2274     return stream->is_seekable;
2275 }
2276 
2277 
2278 int
skStreamIsStdout(const skstream_t * stream)2279 skStreamIsStdout(
2280     const skstream_t   *stream)
2281 {
2282     assert(stream);
2283     return ((SK_IO_WRITE == stream->io_mode)
2284             && (NULL != stream->pathname)
2285             && ((0 == strcmp(stream->pathname, "-"))
2286                 || (0 == strcmp(stream->pathname, "stdout"))));
2287 }
2288 
2289 
2290 int
skStreamLockFile(skstream_t * stream)2291 skStreamLockFile(
2292     skstream_t         *stream)
2293 {
2294     struct flock lock;
2295     int rv;
2296 
2297     lock.l_start = 0;             /* at SOF */
2298     lock.l_whence = SEEK_SET;     /* SOF */
2299     lock.l_len = 0;               /* EOF */
2300 
2301     STREAM_RETURN_IF_NULL(stream);
2302 
2303     rv = streamCheckOpen(stream);
2304     if (rv) { goto END; }
2305 
2306     /* Don't try to lock anything that is not a real file */
2307     if ( !stream->is_seekable) {
2308         goto END;
2309     }
2310 
2311     /* set the lock type and error code if we fail */
2312     if (stream->io_mode == SK_IO_READ) {
2313         lock.l_type = F_RDLCK;
2314         rv = SKSTREAM_ERR_RLOCK;
2315     } else {
2316         lock.l_type = F_WRLCK;
2317         rv = SKSTREAM_ERR_WLOCK;
2318     }
2319 
2320     /* get the lock, waiting if we need to */
2321     if (fcntl(stream->fd, F_SETLKW, &lock) == -1) {
2322         /* error */
2323         stream->errnum = errno;
2324         goto END;
2325     }
2326 
2327     /* success */
2328     rv = SKSTREAM_OK;
2329 
2330   END:
2331     return (stream->last_rv = rv);
2332 }
2333 
2334 
2335 int
skStreamMakeDirectory(skstream_t * stream)2336 skStreamMakeDirectory(
2337     skstream_t         *stream)
2338 {
2339     char dir[PATH_MAX];
2340     int rv;
2341 
2342     STREAM_RETURN_IF_NULL(stream);
2343 
2344     rv = streamCheckUnopened(stream);
2345     if (rv) { goto END; }
2346 
2347     /* Making directory to file only makes sense for writing */
2348     if (stream->io_mode != SK_IO_WRITE) {
2349         rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
2350         goto END;
2351     }
2352 
2353     if (stream->pathname == NULL) {
2354         rv = SKSTREAM_ERR_NOT_BOUND;
2355         goto END;
2356     }
2357 
2358     if (skDirname_r(dir, stream->pathname, sizeof(dir))) {
2359         if ( !skDirExists(dir)) {
2360             rv = skMakeDir(dir);
2361             if (rv) {
2362                 stream->errnum = errno;
2363                 rv = SKSTREAM_ERR_SYS_MKDIR;
2364                 goto END;
2365             }
2366         }
2367     }
2368 
2369   END:
2370     return (stream->last_rv = rv);
2371 }
2372 
2373 
2374 int
skStreamMakeTemp(skstream_t * stream)2375 skStreamMakeTemp(
2376     skstream_t         *stream)
2377 {
2378     int rv;
2379 
2380     STREAM_RETURN_IF_NULL(stream);
2381 
2382     rv = streamCheckUnopened(stream);
2383     if (rv) { goto END; }
2384 
2385     /* Temp files only make sense for writing */
2386     if (stream->io_mode != SK_IO_WRITE) {
2387         rv = SKSTREAM_ERR_UNSUPPORT_IOMODE;
2388         goto END;
2389     }
2390 
2391     if (stream->pathname == NULL) {
2392         rv = SKSTREAM_ERR_NOT_BOUND;
2393         goto END;
2394     }
2395 
2396     /* open file */
2397     stream->fd = mkstemp(stream->pathname);
2398     if (stream->fd == -1) {
2399         rv = SKSTREAM_ERR_SYS_MKSTEMP;
2400         stream->errnum = errno;
2401         goto END;
2402     }
2403 
2404     rv = streamPostOpen(stream);
2405     if (rv) { goto END; }
2406 
2407   END:
2408     return (stream->last_rv = rv);
2409 }
2410 
2411 
2412 int
skStreamOpen(skstream_t * stream)2413 skStreamOpen(
2414     skstream_t         *stream)
2415 {
2416     int rv;
2417 
2418     STREAM_RETURN_IF_NULL(stream);
2419 
2420     rv = streamCheckUnopened(stream);
2421     if (rv) { goto END; }
2422 
2423     if (stream->pathname == NULL) {
2424         rv = SKSTREAM_ERR_NOT_BOUND;
2425         goto END;
2426     }
2427 
2428     switch (stream->io_mode) {
2429       case SK_IO_WRITE:
2430         rv = streamOpenWrite(stream);
2431         if (rv) { goto END; }
2432         break;
2433 
2434       case SK_IO_READ:
2435         rv = streamOpenRead(stream);
2436         if (rv) { goto END; }
2437         break;
2438 
2439       case SK_IO_APPEND:
2440         rv = streamOpenAppend(stream);
2441         if (rv) { goto END; }
2442         break;
2443     }
2444 
2445     rv = streamPostOpen(stream);
2446     if (rv) { goto END; }
2447 
2448   END:
2449     return (stream->last_rv = rv);
2450 }
2451 
2452 
2453 /* convenience function to create and open a SiLK flow file */
2454 int
skStreamOpenSilkFlow(skstream_t ** stream,const char * pathname,skstream_mode_t read_write_append)2455 skStreamOpenSilkFlow(
2456     skstream_t        **stream,
2457     const char         *pathname,
2458     skstream_mode_t     read_write_append)
2459 {
2460     int rv;
2461 
2462     /* Allocate and initialize the stream */
2463     rv = skStreamCreate(stream, read_write_append, SK_CONTENT_SILK_FLOW);
2464     if (rv) { goto END; }
2465 
2466     rv = skStreamBind(*stream, pathname);
2467     if (rv) { goto END; }
2468 
2469     rv = skStreamOpen(*stream);
2470     if (rv) { goto END; }
2471 
2472     switch ((*stream)->io_mode) {
2473       case SK_IO_WRITE:
2474         break;
2475 
2476       case SK_IO_READ:
2477       case SK_IO_APPEND:
2478         rv = skStreamReadSilkHeader(*stream, NULL);
2479         if (rv) {
2480             skStreamClose(*stream);
2481             goto END;
2482         }
2483         break;
2484     }
2485 
2486   END:
2487     if (*stream) {
2488         (*stream)->last_rv = rv;
2489     }
2490     return rv;
2491 }
2492 
2493 
2494 int
skStreamPageOutput(skstream_t * stream,const char * pager)2495 skStreamPageOutput(
2496     skstream_t         *stream,
2497     const char         *pager)
2498 {
2499     int rv;
2500 
2501     STREAM_RETURN_IF_NULL(stream);
2502 
2503     rv = streamCheckModifiable(stream);
2504     if (rv) { goto END; }
2505 
2506     rv = streamCheckAttributes(stream, SK_IO_WRITE, SK_CONTENT_TEXT);
2507     if (rv) { goto END; }
2508 
2509     /* get pager from environment if not passed in */
2510     if (NULL == pager) {
2511         pager = getenv("SILK_PAGER");
2512         if (NULL == pager) {
2513             pager = getenv("PAGER");
2514         }
2515     }
2516 
2517     /* a NULL or an empty string pager means do nothing */
2518     if ((NULL == pager) || ('\0' == pager[0])) {
2519         if (stream->pager) {
2520             free(stream->pager);
2521             stream->pager = NULL;
2522         }
2523         goto END;
2524     }
2525 
2526     if (stream->pager) {
2527         free(stream->pager);
2528     }
2529     stream->pager = strdup(pager);
2530     if (stream->pager == NULL) {
2531         rv = SKSTREAM_ERR_ALLOC;
2532         goto END;
2533     }
2534 
2535     /* if the stream is open, go ahead and invoke the pager now */
2536     if (stream->fd != -1) {
2537         rv = streamPrepareText(stream);
2538         if (rv) { goto END; }
2539     }
2540 
2541   END:
2542     return (stream->last_rv = rv);
2543 }
2544 
2545 
2546 #if !defined(skStreamPrint)
2547 int
skStreamPrint(skstream_t * stream,const char * format,...)2548 skStreamPrint(
2549     skstream_t         *stream,
2550     const char         *format,
2551     ...)
2552 {
2553     int rv = SKSTREAM_OK;
2554     va_list args;
2555 
2556     assert(stream);
2557 
2558     va_start(args, format);
2559 
2560     if ( !stream->fp) {
2561         rv = streamCheckOpen(stream);
2562         if (rv) { goto END; }
2563 
2564         rv = streamCheckAttributes(stream, (SK_IO_WRITE | SK_IO_APPEND),
2565                                    SK_CONTENT_TEXT);
2566         if (rv) { goto END; }
2567 
2568         rv = streamPrepareText(stream);
2569         if (rv) { goto END; }
2570     }
2571 #ifndef NDEBUG
2572     else {
2573         assert(!stream->is_binary);
2574         assert(stream->content_type == SK_CONTENT_TEXT);
2575         assert(stream->io_mode==SK_IO_WRITE || stream->io_mode==SK_IO_APPEND);
2576         assert(stream->fd != -1);
2577     }
2578 #endif  /* NDEBUG */
2579 
2580     if (vfprintf(stream->fp, format, args) == -1) {
2581         rv = SKSTREAM_ERR_WRITE;
2582         stream->errnum = errno;
2583     }
2584 
2585   END:
2586     va_end(args);
2587     return (stream->last_rv = rv);
2588 }
2589 #endif /* !defined(skStreamPrint) */
2590 
2591 
2592 ssize_t
skStreamRead(skstream_t * stream,void * buf,size_t count)2593 skStreamRead(
2594     skstream_t         *stream,
2595     void               *buf,
2596     size_t              count)
2597 {
2598     ssize_t saw;
2599 
2600     assert(stream);
2601     assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
2602     assert(stream->fd != -1);
2603 
2604     if (stream->iobuf) {
2605         saw = skIOBufRead(stream->iobuf, buf, count);
2606         if (saw >= 0) {
2607             return (stream->last_rv = saw);
2608         }
2609         if (stream->is_iobuf_error) {
2610             stream->is_iobuf_error = 0;
2611         } else {
2612             stream->err_info = SKSTREAM_ERR_IOBUF;
2613         }
2614         return (stream->last_rv = saw);
2615     }
2616     if (buf == NULL) {
2617         return (stream->last_rv = streamReadNullBuffer(stream, count));
2618     }
2619 #if SK_ENABLE_ZLIB
2620     if (stream->gz != NULL) {
2621         saw = streamGZRead(stream, buf, count);
2622         if (saw == -1) {
2623             stream->is_iobuf_error = 0;
2624         }
2625         return (stream->last_rv = saw);
2626     }
2627 #endif
2628 
2629     saw = skreadn(stream->fd, buf, count);
2630     if (saw == -1) {
2631         stream->errnum = errno;
2632         stream->err_info = SKSTREAM_ERR_READ;
2633     }
2634     return (stream->last_rv = saw);
2635 }
2636 
2637 
2638 void *
skStreamReadToEndOfFile(skstream_t * stream,ssize_t * count)2639 skStreamReadToEndOfFile(
2640     skstream_t         *stream,
2641     ssize_t            *count)
2642 {
2643 #define READTOEND_BUFSIZE 1024
2644 
2645     uint8_t *buf = NULL;
2646     uint8_t *bp;
2647     ssize_t saw;
2648     ssize_t total = 0;
2649     size_t bufsize = 0;
2650 
2651     for (;;) {
2652         if (bufsize < 4 * READTOEND_BUFSIZE) {
2653             bufsize += READTOEND_BUFSIZE;
2654         } else {
2655             bufsize += bufsize >> 1;
2656         }
2657         bp = (uint8_t*)realloc(buf, bufsize);
2658         if (NULL == bp) {
2659             stream->errnum = errno;
2660             stream->err_info = SKSTREAM_ERR_ALLOC;
2661             stream->last_rv = stream->err_info;
2662             break;
2663         }
2664         buf = bp;
2665         bp += total;
2666 
2667         saw = skStreamRead(stream, bp, (bufsize - total));
2668         if (-1 == saw) {
2669             stream->last_rv = saw;
2670             break;
2671         }
2672 
2673         total += saw;
2674         if (saw < (ssize_t)(bufsize - total)) {
2675             *count = total;
2676             buf[total] = '\0';
2677             return buf;
2678         }
2679     }
2680 
2681     /* only get here on error */
2682     if (buf) {
2683         free(buf);
2684     }
2685     return NULL;
2686 }
2687 
2688 
2689 int
skStreamReadRecord(skstream_t * stream,rwGenericRec_V5 * rwrec)2690 skStreamReadRecord(
2691     skstream_t         *stream,
2692     rwGenericRec_V5    *rwrec)
2693 {
2694 #ifndef SK_HAVE_ALIGNED_ACCESS_REQUIRED
2695     uint8_t ar[SK_MAX_RECORD_SIZE];
2696 #else
2697     /* force 'ar' to be aligned on an 8byte boundary, since we treat
2698      * it as an rwRec and need to access the 64bit sTime. */
2699     union force_align_un {
2700         uint8_t  fa_ar[SK_MAX_RECORD_SIZE];
2701         uint64_t fa_u64;
2702     } force_align;
2703     uint8_t *ar = force_align.fa_ar;
2704 #endif  /* SK_HAVE_ALIGNED_ACCESS_REQUIRED */
2705 
2706     ssize_t saw;
2707     int rv = SKSTREAM_OK;
2708 
2709     if (stream->is_eof) {
2710         rv = SKSTREAM_ERR_EOF;
2711         goto END;
2712     }
2713 
2714     assert(stream);
2715     assert(stream->io_mode == SK_IO_READ || stream->io_mode == SK_IO_APPEND);
2716     assert(stream->content_type != SK_CONTENT_TEXT);
2717     assert(stream->is_binary);
2718     assert(stream->fd != -1);
2719 
2720     /* label is used by the IPv6 policy to ignore a record */
2721   NEXT_RECORD:
2722 
2723     /* read the packed record as a byte array */
2724     if (stream->iobuf) {
2725         /* avoid function call for the common case */
2726         saw = skIOBufRead(stream->iobuf, ar, stream->recLen);
2727     } else {
2728         saw = skStreamRead(stream, ar, stream->recLen);
2729     }
2730     if (saw != (ssize_t)stream->recLen) {
2731         /* EOF or error */
2732         stream->is_eof = 1;
2733 
2734         if (saw == 0) {
2735             /* 0 means clean record boundary; simple EOF */
2736             rv = SKSTREAM_ERR_EOF;
2737         } else if (saw == -1) {
2738             /* error */
2739             if (stream->iobuf) {
2740                 rv = SKSTREAM_ERR_IOBUF;
2741             } else {
2742                 rv = -1;
2743             }
2744         } else {
2745             /* short read */
2746             stream->errobj.num = saw;
2747             rv = SKSTREAM_ERR_READ_SHORT;
2748         }
2749         goto END;
2750     }
2751 
2752     /* clear the caller's record */
2753     RWREC_CLEAR(rwrec);
2754 
2755     /* convert the byte array to an rwRec in native byte order */
2756 #if SK_ENABLE_IPV6
2757     stream->rwUnpackFn(stream, rwrec, ar);
2758 #else
2759     if (stream->rwUnpackFn(stream, rwrec, ar) == SKSTREAM_ERR_UNSUPPORT_IPV6) {
2760         goto NEXT_RECORD;
2761     }
2762 #endif
2763 
2764     /* Handle incorrectly encoded ICMP Type/Code unless the
2765      * SILK_ICMP_SPORT_HANDLER environment variable is set to none. */
2766     if (rwRecIsICMP(rwrec)
2767         && rwRecGetSPort(rwrec) != 0
2768         && rwRecGetDPort(rwrec) == 0
2769         && !silk_icmp_nochange)
2770     {
2771         /*
2772          *  There are two ways for the ICMP Type/Code to appear in
2773          *  "sPort" instead of in "dPort".
2774          *
2775          *  (1) The flow was an IPFIX bi-flow record read prior to
2776          *  SiLK-3.4.0 where the sPort and dPort of the second record
2777          *  were reversed when they should not have been.  Here, the
2778          *  sPort contains ((type<<8)|code).
2779          *
2780          *  (2) The flow was a NetFlowV5 record read from a buggy
2781          *  Cisco router and read prior to SiLK-0.8.0.  Here, the
2782          *  sPort contains ((code<<8)|type).
2783          *
2784          *  The following assumes buggy ICMP flow records were created
2785          *  from IPFIX sources unless they were created prior to SiLK
2786          *  1.0 and appear in certain file formats more closely
2787          *  associated with NetFlowV5.
2788          *
2789          *  Prior to SiLK-3.4.0, the buggy ICMP record would propagate
2790          *  through the tool suite and be written to binary output
2791          *  files.  As of 3.4.0, we modify the record on read.
2792          */
2793         if (skHeaderGetFileVersion(stream->silk_hdr) >= 16) {
2794             /* File created by SiLK 1.0 or later; most likely the
2795              * buggy value originated from an IPFIX source. */
2796             rwRecSetDPort(rwrec, rwRecGetSPort(rwrec));
2797         } else {
2798             switch(skHeaderGetFileFormat(stream->silk_hdr)) {
2799               case FT_RWFILTER:
2800               case FT_RWNOTROUTED:
2801               case FT_RWROUTED:
2802               case FT_RWSPLIT:
2803               case FT_RWWWW:
2804                 /* Most likely from a PDU source */
2805                 rwRecSetDPort(rwrec, BSWAP16(rwRecGetSPort(rwrec)));
2806                 break;
2807               default:
2808                 /* Assume it is from an IPFIX source */
2809                 rwRecSetDPort(rwrec, rwRecGetSPort(rwrec));
2810                 break;
2811             }
2812         }
2813         rwRecSetSPort(rwrec, 0);
2814     }
2815 
2816     /* Write to the copy-input stream */
2817     if (stream->copyInputFD) {
2818         skStreamWriteRecord(stream->copyInputFD, rwrec);
2819     }
2820 
2821     /* got a record */
2822     ++stream->rec_count;
2823 
2824 #if SK_ENABLE_IPV6
2825     switch (stream->v6policy) {
2826       case SK_IPV6POLICY_MIX:
2827         break;
2828 
2829       case SK_IPV6POLICY_IGNORE:
2830         if (rwRecIsIPv6(rwrec)) {
2831             goto NEXT_RECORD;
2832         }
2833         break;
2834 
2835       case SK_IPV6POLICY_ASV4:
2836         if (rwRecIsIPv6(rwrec)) {
2837             if (rwRecConvertToIPv4(rwrec)) {
2838                 goto NEXT_RECORD;
2839             }
2840         }
2841         break;
2842 
2843       case SK_IPV6POLICY_FORCE:
2844         if (!rwRecIsIPv6(rwrec)) {
2845             rwRecConvertToIPv6(rwrec);
2846         }
2847         break;
2848 
2849       case SK_IPV6POLICY_ONLY:
2850         if (!rwRecIsIPv6(rwrec)) {
2851             goto NEXT_RECORD;
2852         }
2853         break;
2854     }
2855 #endif /* SK_ENABLE_IPV6 */
2856 
2857   END:
2858     return (stream->last_rv = rv);
2859 }
2860 
2861 
2862 int
skStreamReadSilkHeader(skstream_t * stream,sk_file_header_t ** hdr)2863 skStreamReadSilkHeader(
2864     skstream_t         *stream,
2865     sk_file_header_t  **hdr)
2866 {
2867     int rv = SKSTREAM_OK;
2868 
2869     STREAM_RETURN_IF_NULL(stream);
2870 
2871     if (!stream->is_dirty) {
2872         rv = skStreamReadSilkHeaderStart(stream);
2873         if (rv) { goto END; }
2874     } else if (!stream->is_silk) {
2875         rv = SKSTREAM_ERR_UNSUPPORT_CONTENT;
2876         goto END;
2877     }
2878 
2879     if (hdr) {
2880         *hdr = stream->silk_hdr;
2881     }
2882 
2883     /* only read the header one time */
2884     if (stream->have_hdr) {
2885         goto END;
2886     }
2887 
2888     rv = skHeaderReadEntries(stream, stream->silk_hdr);
2889     if (rv) { goto END; }
2890 
2891     skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_FIXED);
2892 
2893     if (stream->is_silk_flow) {
2894         /* swap bytes? */
2895         stream->swapFlag = !skHeaderIsNativeByteOrder(stream->silk_hdr);
2896 
2897         /* Cache values from the packedfile header */
2898         streamCacheHeader(stream);
2899 
2900         /* Set pointers to the PackFn and UnpackFn functions for this
2901          * file format. */
2902         rv = streamPrepareFormat(stream);
2903         if (rv) { goto END; }
2904 
2905         assert(stream->recLen > 0);
2906         assert(stream->recLen <= SK_MAX_RECORD_SIZE);
2907     }
2908 
2909     /* Move to end of file is stream was open for append */
2910     if (stream->io_mode == SK_IO_APPEND) {
2911         if (-1 == lseek(stream->fd, 0, SEEK_END)) {
2912             stream->errnum = errno;
2913             rv = SKSTREAM_ERR_SYS_LSEEK;
2914             goto END;
2915         }
2916     }
2917 
2918     /* we have the complete header */
2919     stream->have_hdr = 1;
2920 
2921     rv = streamIOBufCreate(stream);
2922     if (rv) { goto END; }
2923 
2924   END:
2925     return (stream->last_rv = rv);
2926 }
2927 
2928 
2929 int
skStreamReadSilkHeaderStart(skstream_t * stream)2930 skStreamReadSilkHeaderStart(
2931     skstream_t         *stream)
2932 {
2933     int rv;
2934     int flows_required;
2935 
2936     STREAM_RETURN_IF_NULL(stream);
2937 
2938     rv = streamCheckOpen(stream);
2939     if (rv) { goto END; }
2940 
2941     if (stream->is_dirty) {
2942         rv = SKSTREAM_ERR_PREV_DATA;
2943         goto END;
2944     }
2945 
2946     rv = streamCheckAttributes(stream, (SK_IO_READ | SK_IO_APPEND),
2947                                (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
2948     if (rv) { goto END; }
2949 
2950     stream->is_dirty = 1;
2951 
2952     rv = skHeaderReadStart(stream, stream->silk_hdr);
2953     if (rv) { goto END; }
2954 
2955     /* check whether this stream contains flow data */
2956     flows_required = stream->is_silk_flow;
2957     STREAM_SET_IS_SILK_FLOW(stream);
2958     if (flows_required && !stream->is_silk_flow) {
2959         rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
2960         goto END;
2961     }
2962 
2963     skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_ENTRY_OK);
2964 
2965   END:
2966     return (stream->last_rv = rv);
2967 }
2968 
2969 
2970 int
skStreamSetCommentStart(skstream_t * stream,const char * comment_start)2971 skStreamSetCommentStart(
2972     skstream_t         *stream,
2973     const char         *comment_start)
2974 {
2975     int rv;
2976 
2977     STREAM_RETURN_IF_NULL(stream);
2978 
2979     rv = streamCheckAttributes(stream, SK_IO_READ, SK_CONTENT_TEXT);
2980     if (rv) { goto END; }
2981 
2982     /* clear existing value */
2983     if (stream->comment_start) {
2984         free(stream->comment_start);
2985     }
2986 
2987     /* set to new value */
2988     if (comment_start == NULL) {
2989         stream->comment_start = NULL;
2990     } else {
2991         stream->comment_start = strdup(comment_start);
2992         if (stream->comment_start == NULL) {
2993             rv = SKSTREAM_ERR_ALLOC;
2994             goto END;
2995         }
2996     }
2997 
2998   END:
2999     return (stream->last_rv = rv);
3000 }
3001 
3002 
3003 int
skStreamSetCopyInput(skstream_t * read_stream,skstream_t * write_stream)3004 skStreamSetCopyInput(
3005     skstream_t         *read_stream,
3006     skstream_t         *write_stream)
3007 {
3008     assert(read_stream);
3009     assert(read_stream->is_silk_flow);
3010 
3011     if (write_stream == NULL) {
3012         return (read_stream->last_rv = SKSTREAM_ERR_NULL_ARGUMENT);
3013     }
3014     if (read_stream->copyInputFD) {
3015         return (read_stream->last_rv = SKSTREAM_ERR_PREV_COPYINPUT);
3016     }
3017     if (read_stream->rec_count) {
3018         return (read_stream->last_rv = SKSTREAM_ERR_PREV_DATA);
3019     }
3020 
3021     read_stream->copyInputFD = write_stream;
3022     return (read_stream->last_rv = SKSTREAM_OK);
3023 }
3024 
3025 
3026 int
skStreamSetIPv6Policy(skstream_t * stream,sk_ipv6policy_t policy)3027 skStreamSetIPv6Policy(
3028     skstream_t         *stream,
3029     sk_ipv6policy_t     policy)
3030 {
3031     int rv;
3032 
3033     STREAM_RETURN_IF_NULL(stream);
3034 
3035     rv = streamCheckAttributes(stream, 0xFF,
3036                                (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
3037     if (rv) { goto END; }
3038 
3039     stream->v6policy = policy;
3040 
3041   END:
3042     return (stream->last_rv = rv);
3043 }
3044 
3045 
3046 int
skStreamSetUnbuffered(skstream_t * stream)3047 skStreamSetUnbuffered(
3048     skstream_t         *stream)
3049 {
3050     int rv;
3051 
3052     STREAM_RETURN_IF_NULL(stream);
3053 
3054     rv = streamCheckUnopened(stream);
3055     if (rv) { goto END; }
3056 
3057     stream->is_unbuffered = 1;
3058 
3059   END:
3060     return (stream->last_rv = rv);
3061 }
3062 
3063 
3064 int
skStreamSkipRecords(skstream_t * stream,size_t skip_count,size_t * records_skipped)3065 skStreamSkipRecords(
3066     skstream_t         *stream,
3067     size_t              skip_count,
3068     size_t             *records_skipped)
3069 {
3070     size_t local_records_skipped;
3071     ssize_t saw;
3072     ssize_t tmp;
3073     int rv;
3074 
3075     STREAM_RETURN_IF_NULL(stream);
3076 
3077     rv = streamCheckOpen(stream);
3078     if (rv) { goto END; }
3079 
3080     if (stream->is_eof) {
3081         rv = SKSTREAM_ERR_EOF;
3082         goto END;
3083     }
3084 
3085     if (NULL == records_skipped) {
3086         records_skipped = &local_records_skipped;
3087     }
3088     *records_skipped = 0;
3089 
3090     /* when some other stream is expecting to see the records, we need
3091      * to read each record individually */
3092     if (stream->copyInputFD) {
3093         size_t skipped = skip_count;
3094         rwRec rec;
3095 
3096         while ((skipped > 0)
3097                && ((rv = skStreamReadRecord(stream, &rec)) == SKSTREAM_OK))
3098         {
3099             --skipped;
3100         }
3101         *records_skipped = skip_count - skipped;
3102         goto END;
3103     }
3104 
3105     if (!stream->iobuf) {
3106         rv = streamSkipRecordsNonIOBuf(stream, skip_count, records_skipped);
3107         goto END;
3108     }
3109 
3110     while (skip_count > 0) {
3111         if (skip_count > (size_t)SSIZE_MAX / stream->recLen) {
3112             tmp = SSIZE_MAX;
3113         } else {
3114             tmp = stream->recLen * skip_count;
3115         }
3116 
3117         /* read the bytes and check for error or short reads */
3118         saw = skIOBufRead(stream->iobuf, NULL, tmp);
3119         if (saw != tmp) {
3120             /* Either error or an incomplete read--assume end of file */
3121             stream->is_eof = 1;
3122             if (saw == -1) {
3123                 /* error */
3124                 rv = SKSTREAM_ERR_IOBUF;
3125                 goto END;
3126             }
3127         }
3128 
3129         /* compute the number of records we actually read, update
3130          * counters, and check for any partially read records. */
3131         tmp = (saw / stream->recLen);
3132         stream->rec_count += tmp;
3133         skip_count -= tmp;
3134         saw -= tmp * stream->recLen;
3135         *records_skipped += tmp;
3136 
3137         if (saw != 0) {
3138             stream->errobj.num = saw;
3139             rv = SKSTREAM_ERR_READ_SHORT;
3140             goto END;
3141         }
3142         if (stream->is_eof) {
3143             rv = SKSTREAM_ERR_EOF;
3144             goto END;
3145         }
3146     }
3147 
3148     rv = SKSTREAM_OK;
3149 
3150   END:
3151     return (stream->last_rv = rv);
3152 }
3153 
3154 
/*
 *    Module teardown hook for skstream.  This implementation has no
 *    state to release, so the function body is empty.
 */
void
skStreamTeardown(
    void)
{
    /* intentionally empty */
    return;
}
3161 
3162 
3163 off_t
skStreamTell(skstream_t * stream)3164 skStreamTell(
3165     skstream_t         *stream)
3166 {
3167     off_t pos;
3168     int rv;
3169 
3170     STREAM_RETURN_IF_NULL(stream);
3171 
3172     rv = streamCheckOpen(stream);
3173     if (rv) {
3174         stream->err_info = rv;
3175         return (stream->last_rv = -1);
3176     }
3177 
3178     pos = lseek(stream->fd, 0, SEEK_CUR);
3179     if (pos == (off_t)-1) {
3180         stream->errnum = errno;
3181         stream->err_info = SKSTREAM_ERR_SYS_LSEEK;
3182     }
3183 
3184     return (stream->last_rv = pos);
3185 }
3186 
3187 
3188 int
skStreamTruncate(skstream_t * stream,off_t length)3189 skStreamTruncate(
3190     skstream_t         *stream,
3191     off_t               length)
3192 {
3193     int rv;
3194 
3195     STREAM_RETURN_IF_NULL(stream);
3196 
3197     rv = streamCheckOpen(stream);
3198     if (rv) { goto END; }
3199 
3200     rv = streamCheckAttributes(stream, (SK_IO_WRITE | SK_IO_APPEND),
3201                                (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW
3202                                 | SK_CONTENT_OTHERBINARY));
3203     if (rv) { goto END; }
3204 
3205     if ( !stream->is_seekable) {
3206         rv = SKSTREAM_ERR_NOT_SEEKABLE;
3207         goto END;
3208     }
3209 
3210     /* attempt to truncate the file even if flush fails */
3211     rv = skStreamFlush(stream);
3212     if (ftruncate(stream->fd, length) == -1) {
3213         stream->errnum = errno;
3214         rv = SKSTREAM_ERR_SYS_FTRUNCATE;
3215     }
3216 
3217   END:
3218     return (stream->last_rv = rv);
3219 }
3220 
3221 
3222 int
skStreamUnbind(skstream_t * stream)3223 skStreamUnbind(
3224     skstream_t         *stream)
3225 {
3226     int rv = SKSTREAM_OK;
3227 
3228     if (!stream) {
3229         return rv;
3230     }
3231     if (stream->fd != -1) {
3232         rv = skStreamClose(stream);
3233     }
3234 
3235     if (stream->comment_start) {
3236         free(stream->comment_start);
3237         stream->comment_start = NULL;
3238     }
3239     if (stream->pager) {
3240         free(stream->pager);
3241         stream->pager = NULL;
3242     }
3243     if (stream->pathname) {
3244         free(stream->pathname);
3245         stream->pathname = NULL;
3246     }
3247 
3248     return (stream->last_rv = rv);
3249 }
3250 
3251 
3252 ssize_t
skStreamWrite(skstream_t * stream,const void * buf,size_t count)3253 skStreamWrite(
3254     skstream_t         *stream,
3255     const void         *buf,
3256     size_t              count)
3257 {
3258     ssize_t written;
3259 
3260     assert(stream);
3261     assert(stream->io_mode == SK_IO_WRITE || stream->io_mode == SK_IO_APPEND);
3262     assert(stream->is_binary);
3263     assert(stream->fd != -1);
3264     assert(buf);
3265 
3266     if (stream->iobuf) {
3267         written = skIOBufWrite(stream->iobuf, buf, count);
3268         if (written >= 0) {
3269             return (stream->last_rv = written);
3270         }
3271         if (stream->is_iobuf_error) {
3272             stream->is_iobuf_error = 0;
3273         } else {
3274             stream->err_info = SKSTREAM_ERR_IOBUF;
3275         }
3276         return (stream->last_rv = written);
3277     }
3278 #if SK_ENABLE_ZLIB
3279     if (stream->gz != NULL) {
3280         written = streamGZWrite(stream, buf, count);
3281         if (written == -1) {
3282             stream->is_iobuf_error = 0;
3283         }
3284         return (stream->last_rv = written);
3285     }
3286 #endif  /* SK_ENABLE_ZLIB */
3287 
3288     written = skwriten(stream->fd, buf, count);
3289     if (written == -1) {
3290         stream->errnum = errno;
3291         stream->err_info = SKSTREAM_ERR_WRITE;
3292     }
3293     return (stream->last_rv = written);
3294 }
3295 
3296 
3297 int
skStreamWriteRecord(skstream_t * stream,const rwGenericRec_V5 * rwrec)3298 skStreamWriteRecord(
3299     skstream_t             *stream,
3300     const rwGenericRec_V5  *rwrec)
3301 {
3302 #ifndef SK_HAVE_ALIGNED_ACCESS_REQUIRED
3303     uint8_t ar[SK_MAX_RECORD_SIZE];
3304 #else
3305     /* force 'ar' to be aligned on an 8byte boundary, since we treat
3306      * it as an rwRec and need to access the 64bit sTime. */
3307     union force_align_un {
3308         uint8_t  fa_ar[SK_MAX_RECORD_SIZE];
3309         uint64_t fa_u64;
3310     } force_align;
3311     uint8_t *ar = force_align.fa_ar;
3312 #endif  /* SK_HAVE_ALIGNED_ACCESS_REQUIRED */
3313 
3314 #if SK_ENABLE_IPV6
3315     rwRec rec_copy;
3316 #endif
3317 
3318     int rv;
3319     const rwRec *rp = rwrec;
3320 
3321     assert(stream);
3322     assert(stream->io_mode == SK_IO_WRITE || stream->io_mode == SK_IO_APPEND);
3323     assert(stream->is_silk_flow);
3324     assert(stream->fd != -1);
3325 
3326     if (!stream->is_dirty) {
3327         rv = skStreamWriteSilkHeader(stream);
3328         if (rv) {
3329             return (stream->last_rv = rv);
3330         }
3331     }
3332 
3333 #if SK_ENABLE_IPV6
3334     if (rwRecIsIPv6(rp)) {
3335         switch (stream->v6policy) {
3336           case SK_IPV6POLICY_MIX:
3337           case SK_IPV6POLICY_FORCE:
3338           case SK_IPV6POLICY_ONLY:
3339             /* flow already IPv6; verify that file format supports it */
3340             if (stream->supports_ipv6 == 0) {
3341                 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_IPV6);
3342             }
3343             break;
3344 
3345           case SK_IPV6POLICY_IGNORE:
3346             /* we're ignoring IPv6, return */
3347             return (stream->last_rv = SKSTREAM_OK);
3348 
3349           case SK_IPV6POLICY_ASV4:
3350             /* attempt to convert IPv6 flow to v4 */
3351             memcpy(&rec_copy, rp, sizeof(rwRec));
3352             if (rwRecConvertToIPv4(&rec_copy)) {
3353                 return (stream->last_rv = SKSTREAM_OK);
3354             }
3355             rp = &rec_copy;
3356             break;
3357         }
3358     } else {
3359         /* flow is IPv4 */
3360         switch (stream->v6policy) {
3361           case SK_IPV6POLICY_MIX:
3362           case SK_IPV6POLICY_IGNORE:
3363           case SK_IPV6POLICY_ASV4:
3364             /* flow is already IPv4; all file formats supported */
3365             break;
3366 
3367           case SK_IPV6POLICY_ONLY:
3368             /* we're ignoring IPv4 flows; return */
3369             return (stream->last_rv = SKSTREAM_OK);
3370 
3371           case SK_IPV6POLICY_FORCE:
3372             /* must convert flow to IPv6, but first verify that file
3373              * format supports IPv6 */
3374             if (stream->supports_ipv6 == 0) {
3375                 return (stream->last_rv = SKSTREAM_ERR_UNSUPPORT_IPV6);
3376             }
3377             /* convert */
3378             memcpy(&rec_copy, rp, sizeof(rwRec));
3379             rp = &rec_copy;
3380             rwRecConvertToIPv6(&rec_copy);
3381             break;
3382         }
3383     }
3384 #endif /* SK_ENABLE_IPV6 */
3385 
3386     /* Convert the record into a byte array in the appropriate byte order */
3387     rv = stream->rwPackFn(stream, rp, ar);
3388     if (rv != SKSTREAM_OK) {
3389         stream->errobj.rec = rwrec;
3390         return (stream->last_rv = rv);
3391     }
3392 
3393     /* write the record */
3394     if (stream->iobuf) {
3395         if (skIOBufWrite(stream->iobuf, ar, stream->recLen)
3396             == (ssize_t)stream->recLen)
3397         {
3398             ++stream->rec_count;
3399             return (stream->last_rv = SKSTREAM_OK);
3400         } else if (stream->is_iobuf_error) {
3401             stream->is_iobuf_error = 0;
3402         } else {
3403             stream->err_info = SKSTREAM_ERR_IOBUF;
3404         }
3405     } else {
3406         if (skStreamWrite(stream, ar, stream->recLen)
3407             == (ssize_t)stream->recLen)
3408         {
3409             ++stream->rec_count;
3410             return (stream->last_rv = SKSTREAM_OK);
3411         }
3412     }
3413 
3414     return (stream->last_rv = -1);
3415 }
3416 
3417 
3418 int
skStreamWriteSilkHeader(skstream_t * stream)3419 skStreamWriteSilkHeader(
3420     skstream_t         *stream)
3421 {
3422     int rv;
3423     int flows_required;
3424 
3425     STREAM_RETURN_IF_NULL(stream);
3426 
3427     rv = streamCheckOpen(stream);
3428     if (rv) { goto END; }
3429 
3430     if (stream->is_dirty) {
3431         rv = SKSTREAM_ERR_PREV_DATA;
3432         goto END;
3433     }
3434 
3435     rv = streamCheckAttributes(stream, SK_IO_WRITE,
3436                                (SK_CONTENT_SILK | SK_CONTENT_SILK_FLOW));
3437     if (rv) { goto END; }
3438 
3439     if (skHeaderGetFileFormat(stream->silk_hdr) == UINT8_MAX) {
3440         rv = skHeaderSetFileFormat(stream->silk_hdr, DEFAULT_FILE_FORMAT);
3441         if (rv) { goto END; }
3442     }
3443 
3444     /* unless a specific compression method was specified, do not use
3445      * compression when writing to a non-seekable destination */
3446     switch (skHeaderGetCompressionMethod(stream->silk_hdr)) {
3447       case SK_COMPMETHOD_DEFAULT:
3448         if (!stream->is_seekable && !stream->is_mpi) {
3449             rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3450                                               SK_COMPMETHOD_NONE);
3451         } else {
3452             rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3453                                               skCompMethodGetDefault());
3454         }
3455         if (rv) { goto END; }
3456         break;
3457       case SK_COMPMETHOD_BEST:
3458         if (!stream->is_seekable && !stream->is_mpi) {
3459             rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3460                                               SK_COMPMETHOD_NONE);
3461         } else {
3462             rv = skHeaderSetCompressionMethod(stream->silk_hdr,
3463                                               skCompMethodGetBest());
3464         }
3465         if (rv) { goto END; }
3466         break;
3467       default:
3468         break;
3469     }
3470 
3471     /* check whether this stream contains flow data */
3472     flows_required = stream->is_silk_flow;
3473     STREAM_SET_IS_SILK_FLOW(stream);
3474     if (flows_required && !stream->is_silk_flow) {
3475         rv = SKSTREAM_ERR_REQUIRE_SILK_FLOW;
3476         goto END;
3477     }
3478 
3479     if (stream->is_silk_flow) {
3480         /* handle the case where a specific record type has not yet
3481          * been specified. */
3482         if (skHeaderGetFileFormat(stream->silk_hdr) == UINT8_MAX) {
3483             rv = skHeaderSetFileFormat(stream->silk_hdr, DEFAULT_FILE_FORMAT);
3484             if (rv) { goto END; }
3485         }
3486 
3487         /* Set the file version if it is "ANY", and set pointers to
3488          * the PackFn and UnpackFn functions for this file format. */
3489         rv = streamPrepareFormat(stream);
3490         if (rv) { goto END; }
3491 
3492         assert(stream->recLen > 0);
3493         assert(stream->recLen <= SK_MAX_RECORD_SIZE);
3494 
3495         /* Set the swapFlag */
3496         stream->swapFlag = !skHeaderIsNativeByteOrder(stream->silk_hdr);
3497 
3498         /* Cache values from the packedfile header */
3499         streamCacheHeader(stream);
3500     }
3501 
3502     stream->is_dirty = 1;
3503     skHeaderSetLock(stream->silk_hdr, SKHDR_LOCK_FIXED);
3504 
3505     if ( !stream->is_mpi) {
3506         rv = skHeaderWrite(stream, stream->silk_hdr);
3507         if (rv) { goto END; }
3508     }
3509 
3510     rv = streamIOBufCreate(stream);
3511     if (rv) { goto END; }
3512 
3513   END:
3514     return (stream->last_rv = rv);
3515 }
3516 
3517 
3518 /*
3519  *    Though not functions on skstream_t, these are used heavily by
3520  *    the code. Define them here and hope the compiler inlines them.
3521  */
3522 
/*
 *    Read 'count' bytes from the file descriptor 'fd' into 'buf',
 *    calling read() repeatedly until all bytes have arrived or the
 *    end of the input is reached.  A read() interrupted by a signal
 *    (EINTR) is retried.
 *
 *    Return the number of bytes stored in 'buf', which is less than
 *    'count' only when end-of-input was reached.  Return -1 when
 *    read() fails for any reason other than EINTR.
 */
ssize_t
skreadn(
    int                 fd,
    void               *buf,
    size_t              count)
{
    uint8_t *dest = (uint8_t *)buf;
    size_t remaining = count;
    size_t chunk;
    ssize_t got;

    while (remaining != 0) {
        /* never request more than INT32_MAX bytes in a single call */
        chunk = (remaining < INT32_MAX) ? remaining : INT32_MAX;
        got = read(fd, dest, chunk);
        if (-1 == got) {
            if (EINTR == errno) {
                continue;
            }
            return -1;
        }
        if (0 == got) {
            /* end of input */
            break;
        }
        remaining -= got;
        dest += got;
    }
    return (ssize_t)(count - remaining);
}
3549 
/*
 *    Write 'count' bytes from 'buf' to the file descriptor 'fd',
 *    calling write() repeatedly until all bytes have been accepted.
 *    A write() interrupted by a signal (EINTR) is retried.
 *
 *    Return the number of bytes written, which is less than 'count'
 *    only when write() reported that it wrote 0 bytes.  Return -1
 *    when write() fails for any reason other than EINTR.
 */
ssize_t
skwriten(
    int                 fd,
    const void         *buf,
    size_t              count)
{
    const uint8_t *src = (const uint8_t *)buf;
    size_t remaining = count;
    size_t chunk;
    ssize_t put;

    while (remaining != 0) {
        /* never hand more than INT32_MAX bytes to a single call */
        chunk = (remaining < INT32_MAX) ? remaining : INT32_MAX;
        put = write(fd, src, chunk);
        if (-1 == put) {
            if (EINTR == errno) {
                continue;
            }
            return -1;
        }
        if (0 == put) {
            /* no progress is possible; report what was written */
            break;
        }
        remaining -= put;
        src += put;
    }
    return (ssize_t)(count - remaining);
}
3576 
3577 
3578 /*
3579 ** Local Variables:
3580 ** mode:c
3581 ** indent-tabs-mode:nil
3582 ** c-basic-offset:4
3583 ** End:
3584 */
3585