1 /* @(#) implementation of packet-io functions for udpxy
2  *
3  * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
4  *
5  *  This file is part of udpxy.
6  *
7  *  udpxy is free software: you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation, either version 3 of the License, or
10  *  (at your option) any later version.
11  *
12  *  udpxy is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with udpxy.  If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include <sys/types.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <assert.h>
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <sys/uio.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <time.h>
31 
32 #include "udpxy.h"
33 #include "dpkt.h"
34 #include "rtp.h"
35 #include "util.h"
36 #include "mtrace.h"
37 
38 extern FILE* g_flog;
39 
/* size (in bytes) of one TS record as read and checked in this file */
static const size_t TS_SEG_LEN = 188;

/* data-stream format type; every value is also an index into upxfmt_NAME */
enum {
    UPXDT_UNKNOWN = 0,  /* no assumptions */
    UPXDT_TS      = 1,  /* MPEG-TS */
    UPXDT_RTP_TS  = 2,  /* RTP over MPEG-TS */
    UPXDT_UDS     = 3,  /* UDS file format */
    UPXDT_RAW     = 4   /* read AS-IS */
};

/* printable name of each format, placed at the slot of its enum value */
static const char* upxfmt_NAME[] = {
    [UPXDT_UNKNOWN] = "UNKNOWN",
    [UPXDT_TS]      = "MPEG-TS",
    [UPXDT_RTP_TS]  = "RTP-TS",
    [UPXDT_UDS]     = "UDPXY-UDS",
    [UPXDT_RAW]     = "RAW"
};

/* number of entries in upxfmt_NAME */
static const int UPXDT_LEN = sizeof(upxfmt_NAME) / sizeof(upxfmt_NAME[0]);
58 
59 
60 const char*
fmt2str(upxfmt_t fmt)61 fmt2str( upxfmt_t fmt )
62 {
63     int ifmt = fmt;
64 
65     (void) UPXDT_LEN;
66     assert( (ifmt >= 0 ) && (ifmt < UPXDT_LEN) );
67     return upxfmt_NAME[ ifmt ];
68 }
69 
70 
71 /* check for MPEG TS signature, complain if not found
72  */
73 static int
ts_sigcheck(const int c,off_t offset,ssize_t len,FILE * log,const char * func)74 ts_sigcheck( const int c, off_t offset, ssize_t len,
75              FILE* log, const char* func )
76 
77 {
78     assert( len );
79     (void)(len); /* NOP to avoid warnings */
80 
81     if( c == MPEG_TS_SIG ) return 0;
82 
83     if( log && func ) {
84         (void) offset; /* get rid of a warning if TRACE is disabled */
85         TRACE( (void)tmfprintf( log, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
86             "offset [0x%X(%u)] of packet len=[%lu]\n",
87             func, MPEG_TS_SIG, c, offset, offset, (u_long)len ) );
88     }
89 
90     return -1;
91 }
92 
93 
94 /* determine type of stream in memory
95  */
96 upxfmt_t
get_mstream_type(const char * data,size_t len,FILE * log)97 get_mstream_type( const char* data, size_t len, FILE* log )
98 {
99     int sig = 0;
100     size_t hdrlen = 0;
101     ssize_t n = -1;
102 
103     assert( data && len );
104 
105     if( len < (RTP_HDR_SIZE + 1) ) {
106         (void) tmfprintf( log, "%s: read [%ld] bytes,"
107                 " not enough for RTP header\n", __func__,
108                 (long)n );
109         return UPXDT_UNKNOWN;
110     }
111 
112     /* if the 1st byte has MPEG-TS signature - skip further checks */
113     sig = data[0] & 0xFF;
114     if( 0 == ts_sigcheck( sig, 0, 1, NULL /*log*/, __func__ ) )
115         return UPXDT_TS;
116 
117     /* if not RTP - quit */
118     if( 0 == RTP_verify( data, RTP_HDR_SIZE, log ) )
119         return UPXDT_UNKNOWN;
120 
121     /* check the first byte after RTP header - should be
122      * TS signature to be RTP over TS */
123 
124     if( (0 != RTP_hdrlen( data, len, &hdrlen, log )) ||
125         (len < hdrlen) ) {
126         return UPXDT_UNKNOWN;
127     }
128 
129     sig = data[ hdrlen ];
130     if( 0 != ts_sigcheck( sig, 0, 1, log, __func__ ) )
131         return UPXDT_UNKNOWN;
132 
133     return UPXDT_RTP_TS;
134 }
135 
136 
137 
138 /* determine type of stream saved in file
139  */
140 upxfmt_t
get_fstream_type(int fd,FILE * log)141 get_fstream_type( int fd, FILE* log )
142 {
143     ssize_t n = 0;
144     off_t offset = 0, where = 0;
145     upxfmt_t dtype = UPXDT_UNKNOWN;
146     char* data = NULL;
147 
148     /* read in enough data to contain extended header
149      * and beginning of payload segment */
150     size_t len = TS_SEG_LEN + RTP_XTHDRLEN;
151 
152     assert( (fd > 0) && log );
153 
154     if( NULL == (data = malloc( len )) ) {
155         mperror( log, errno, "%s: malloc", __func__ );
156         return UPXDT_UNKNOWN;
157     }
158 
159     do {
160         /* check if it is a MPEG TS stream
161          */
162         n = read( fd, data, len );
163         if( 0 != sizecheck( "Not enough space for stream data",
164                     len, n, log, __func__ ) ) break;
165         offset += n;
166 
167         dtype = get_mstream_type( data, len, log );
168         if( UPXDT_UNKNOWN == dtype ) {
169             TRACE( (void)tmfprintf( log, "%s: file type is not recognized\n",
170                         __func__ ) );
171             dtype = UPXDT_UNKNOWN;
172             break;
173         }
174     } while(0);
175 
176     if( NULL != data ) free( data );
177 
178     if( n <= 0 ) {
179         mperror( log, errno, "%s", __func__ );
180         return UPXDT_UNKNOWN;
181     }
182 
183     where = lseek( fd, (-1) * offset, SEEK_CUR );
184     if( -1 == where ) {
185         mperror( log, errno, "%s: lseek", __func__ );
186         return UPXDT_UNKNOWN;
187     }
188 
189     /*
190     TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
191                 __func__, (int)dtype, fmt2str(dtype) ) );
192     */
193 
194     return dtype;
195 }
196 
197 
198 /* read a sequence of MPEG TS packets (to fit into the given buffer)
199  */
200 static ssize_t
read_ts_file(int fd,char * data,const size_t len,FILE * log)201 read_ts_file( int fd, char* data, const size_t len, FILE* log )
202 {
203     const size_t pkt_len = ((len - 1) / TS_SEG_LEN) * TS_SEG_LEN;
204     off_t k = 0;
205     u_int bad_frg = 0;
206     ssize_t n = -1;
207 
208     assert( (fd > 0) && data && len && log);
209 
210     assert( !buf_overrun( data, len, 0, pkt_len, log ) );
211     n = read_buf( fd, data, pkt_len, log );
212     if( n <= 0 ) return n;
213 
214     if( 0 != sizecheck( "Bad TS packet stream",
215                 pkt_len, n, log, __func__ ) )
216         return -1;
217 
218     /* make sure we've read TS records, not random data
219      */
220     for( k = 0; k < (off_t)pkt_len; k += TS_SEG_LEN ) {
221         if( -1 == ts_sigcheck( data[k], k, (u_long)pkt_len,
222                     log, __func__ ) ) {
223             ++bad_frg;
224         }
225     }
226 
227     return (bad_frg ? -1 : n);
228 }
229 
230 
231 /* read an RTP packet
232  */
233 static ssize_t
read_rtp_file(int fd,char * data,const size_t len,FILE * log)234 read_rtp_file( int fd, char* data, const size_t len, FILE* log )
235 {
236     ssize_t nrd = -1, offset = 0;
237     size_t hdrlen = 0, rdlen = 0;
238     u_int frg = 0;
239     int rtp_end = 0, rc = 0;
240     off_t where = 0;
241 
242     assert( (fd > 0) && data && len && log);
243 
244     assert( !buf_overrun( data, len, 0, RTP_HDR_SIZE, log ) );
245     nrd = read_buf( fd, data, RTP_HDR_SIZE, log );
246     if( nrd <= 0 ) return nrd;
247     offset += nrd;
248 
249     if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE, nrd,
250                 log, __func__ ) )
251         return -1;
252 
253     if( 0 == RTP_verify( data, nrd, log ) )
254         return -1;
255 
256     if( -1 == (rc = RTP_hdrlen( data, nrd, &hdrlen, log )) )
257         return -1;
258 
259     /* if there is an extended header, read it in */
260     if( ENOMEM == rc ) {
261         assert( !buf_overrun( data, len, offset,
262                     RTP_XTHDRLEN - RTP_HDR_SIZE, log ) );
263         nrd = read_buf( fd, data + offset,
264                 RTP_XTHDRLEN - RTP_HDR_SIZE, log );
265         if( (nrd <= 0) ||
266             (-1 == sizecheck("Bad RTP x-header",
267                              RTP_XTHDRLEN - RTP_HDR_SIZE, nrd,
268                              log, __func__ )) ) {
269             return -1;
270         }
271         if( 0 == nrd ) return nrd;
272 
273         offset += nrd;
274         rc = RTP_hdrlen( data, offset, &hdrlen, log );
275         if( 0 != rc ) {
276             TRACE( (void)tmfprintf( log, "%s: bad RTP header - quitting\n",
277                         __func__ ) );
278             return -1;
279         }
280 
281         /*
282         TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
283                     __func__, (u_long)hdrlen ) );
284         */
285 
286         if( (size_t)offset > hdrlen ) {
287             /* read more than needed: step back */
288 
289             where = lseek( fd, (-1)*((size_t)offset - hdrlen), SEEK_CUR );
290             if( -1 == where ) {
291                 mperror( log, errno, "%s: lseek", __func__ );
292                 return -1;
293             }
294 
295             offset -= ((size_t)offset - hdrlen);
296             assert( (size_t)offset == hdrlen );
297 
298             /*
299             TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
300                         "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
301             */
302         }
303         else if( hdrlen > (size_t)offset ) {
304             /* read remainder of the header in */
305 
306             assert( !buf_overrun( data, len, offset,
307                         (hdrlen - (size_t)offset), log ) );
308             nrd = read_buf( fd, data + offset,
309                     (hdrlen - (size_t)offset), log );
310             if( nrd <= 0 ||
311                (-1 == sizecheck("Bad RTP x-header tail",
312                              (hdrlen - (size_t)offset), nrd,
313                              log, __func__ )) ) {
314                     return -1;
315             }
316             if( 0 == nrd ) return nrd;
317 
318             offset += nrd;
319             assert( (size_t)offset == hdrlen );
320         }
321     } /* read extended header */
322 
323 
324     /* read TS records until there is another RTP header or EOF */
325     for( frg = 0; (ssize_t)len > offset; ++frg ) {
326 
327         rdlen = ( (len - offset) < TS_SEG_LEN
328                   ? (len - offset)
329                   : TS_SEG_LEN );
330 
331        /*
332        TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
333                __func__, (u_long)rdlen ) );
334        */
335 
336         assert( !buf_overrun( data, len, offset, rdlen, log ) );
337         nrd = read_buf( fd, data + offset, rdlen, log );
338         if( nrd <= 0 ) break;
339 
340         /* if it's an RTP header, roll back and return
341          */
342         rtp_end = RTP_verify( data + offset, nrd, log );
343         if( 1 == rtp_end ) {
344             if( -1 == lseek( fd, (-1) * nrd, SEEK_CUR ) ) {
345                 mperror( log, errno, "%s: lseek", __func__ );
346                 return -1;
347             }
348 
349             break;
350         }
351 
352         /* check if it is a TS packet and it's of the right size
353          */
354         if( (-1 == ts_sigcheck( data[offset], offset, (u_long)TS_SEG_LEN,
355                     log, __func__ )) ||
356             (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN, nrd,
357                     log, __func__ )) ) {
358             TRACE( hex_dump( "Data in question", data, offset + nrd, log ) );
359             return -1;
360         }
361 
362         offset += nrd;
363     } /* for */
364 
365 
366     /* If it is not EOF and no RTP header for the next message is found,
367      * it is either our buffer is too small (to fit the whole message)
368      * or the stream is invalid
369      */
370     if( !rtp_end && (0 != nrd) ) {
371         (void)tmfprintf( log, "%s: no RTP end after reading [%ld] bytes\n",
372                 __func__, (long)offset );
373         return -1;
374     }
375 
376     return (nrd < 0) ? nrd : offset;
377 }
378 
379 
380 /* read record of one of the supported types from file
381  */
382 ssize_t
read_frecord(int fd,char * data,const size_t len,upxfmt_t * stream_type,FILE * log)383 read_frecord( int fd, char* data, const size_t len,
384              upxfmt_t* stream_type, FILE* log )
385 {
386     upxfmt_t stype;
387     /* off_t where = -1, endmark = -1; */
388     ssize_t nrd = -1;
389 
390     assert( fd > 0 );
391     assert( data && len );
392     assert( stream_type && log );
393 
394     stype = *stream_type;
395 
396     /*
397     where = lseek( fd, 0, SEEK_CUR );
398     TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
399                 __func__, (u_int)where, (u_int)where ) );
400     */
401 
402     if( UPXDT_UNKNOWN == *stream_type ) {
403         stype = get_fstream_type( fd, log );
404 
405         if( UPXDT_UNKNOWN == stype ) {
406             (void)tmfprintf( log, "%s: Unsupported type\n", __func__ );
407             return -1;
408         }
409 
410         *stream_type = stype;
411     } /* UPXDT_UNKNOWN */
412 
413     if( UPXDT_TS == stype ) {
414         nrd = read_ts_file( fd, data, len, log );
415     }
416     else if( UPXDT_RTP_TS == stype ) {
417         nrd = read_rtp_file( fd, data, len, log );
418     }
419     else {
420         (void)tmfprintf( log, "%s: unknown stream type [%d]\n",
421                 __func__, stype );
422         return -1;
423     }
424 
425     /*
426     if( nrd >= 0 ) {
427         endmark = lseek( fd, 0, SEEK_CUR );
428 
429         TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
430                 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
431 
432         TRACE( sizecheck( "WARNING: Read file discrepancy",
433                     where + nrd, endmark,
434                     log, __func__ ) );
435     }
436     */
437 
438     return nrd;
439 }
440 
441 
/* write data as a UDS record
 *
 * Placeholder: the conversion is not implemented, so nothing is
 * written; a message is logged and -1 is always returned.
 */
static ssize_t
write_uds_record( int fd, const char* data, size_t len, FILE* log )
{
    assert( (fd > 0) && data && len );

    /* parameters are used by the assert only: silence unused warnings */
    (void)fd;
    (void)data;
    (void)len;

    (void)tmfprintf( log, "%s: UDS conversion not yet implemented\n",
            __func__ );

    return -1;
}
453 
454 
/* write RTP record into TS stream
 *
 * Hands the record to RTP_process() (verification switched off), which
 * updates the buffer pointer and length passed to it; the resulting
 * region is then written to fd.
 *
 * returns the result of write_buf(), or -1 if RTP_process() fails
 */
static ssize_t
write_rtp2ts( int fd, const char* data, size_t len, FILE* log )
{
    const int NO_VERIFY = 0;
    void* payload = (void*)data;
    size_t payload_len = len;

    assert( (fd > 0) && data && len && log );

    if( -1 == RTP_process( &payload, &payload_len, NO_VERIFY, log ) )
        return -1;

    assert( !buf_overrun( payload, len, 0, payload_len, log ) );

    return write_buf( fd, payload, payload_len, log );
}
473 
474 
475 /* write record after converting it from source into destination
476  * format
477  */
478 ssize_t
write_frecord(int fd,const char * data,size_t len,upxfmt_t sfmt,upxfmt_t dfmt,FILE * log)479 write_frecord( int fd, const char* data, size_t len,
480               upxfmt_t sfmt, upxfmt_t dfmt, FILE* log )
481 {
482     ssize_t nwr = -1;
483     int fmt_ok = 0;
484     const char *str_from = NULL, *str_to = NULL;
485 
486     if( UPXDT_UDS == dfmt ) {
487         fmt_ok = 1;
488         nwr = write_uds_record( fd, data, len, log );
489     }
490     else if( UPXDT_TS == dfmt ) {
491         if( UPXDT_RTP_TS == sfmt ) {
492             fmt_ok = 1;
493             nwr = write_rtp2ts( fd, data, len, log );
494         }
495     }
496 
497     if( !fmt_ok ) {
498         str_from = fmt2str(sfmt);
499         str_to   = fmt2str(dfmt);
500         (void)tmfprintf( log, "Conversion from [%s] into [%s] is not supported\n",
501                 str_from, str_to );
502         return -1;
503     }
504 
505     return nwr;
506 }
507 
508 
509 /* reset packet-buffer registry in stream spec
510  */
511 void
reset_pkt_registry(struct dstream_ctx * ds)512 reset_pkt_registry( struct dstream_ctx* ds )
513 {
514     assert( ds );
515 
516     ds->flags &= ~F_DROP_PACKET;
517     ds->pkt_count = 0;
518 }
519 
520 
521 /* release resources allocated for stream spec
522  */
523 void
free_dstream_ctx(struct dstream_ctx * ds)524 free_dstream_ctx( struct dstream_ctx* ds )
525 {
526     assert( ds );
527 
528     if( NULL != ds->pkt ) {
529         free( ds->pkt );
530         ds->pkt = NULL;
531 
532         ds->pkt_count = ds->max_pkt = 0;
533     }
534 }
535 
536 
537 
538 /* register received packet into registry (for scattered output)
539  */
540 static int
register_packet(struct dstream_ctx * spc,char * buf,size_t len)541 register_packet( struct dstream_ctx* spc, char* buf, size_t len )
542 {
543     struct iovec* new_pkt = NULL;
544     static const int DO_VERIFY = 1;
545 
546     void* new_buf = buf;
547     size_t new_len = len;
548 
549     assert( spc->max_pkt > 0 );
550 
551     /* enlarge packet registry if needed */
552     if( spc->pkt_count >= spc->max_pkt ) {
553         spc->max_pkt <<= 1;
554         spc->pkt = realloc( spc->pkt, spc->max_pkt * sizeof(spc->pkt[0]) );
555         if( NULL == spc->pkt ) {
556             mperror( g_flog, errno, "%s: realloc", __func__ );
557             return -1;
558         }
559 
560         TRACE( (void)tmfprintf( g_flog, "RTP packet registry "
561                 "expanded to [%lu] records\n", (u_long)spc->max_pkt ) );
562     }
563 
564     /* put packet info into registry */
565 
566     new_pkt = &(spc->pkt[ spc->pkt_count ]);
567 
568     /*
569     TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
570                 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
571     */
572 
573     if( 0 != RTP_process( &new_buf, &new_len, DO_VERIFY, g_flog ) ) {
574         TRACE( (void)tmfputs("register packet: dropping\n", g_flog) );
575         spc->flags |= F_DROP_PACKET;
576 
577         return 0;
578     }
579 
580     new_pkt->iov_base = new_buf;
581     new_pkt->iov_len = new_len;
582 
583     /*
584     TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
585                 (u_long)spc->pkt_count, new_pkt->iov_base,
586                 (u_long)new_pkt->iov_len ) );
587     */
588 
589     spc->pkt_count++;
590     return 0;
591 }
592 
593 
594 /* read data from source, determine underlying protocol
595  * (if not already known); and process the packet
596  * if needed (for RTP - register packet)
597  *
598  * return the number of octets read from the source
599  * into the buffer
600  */
601 static ssize_t
read_packet(struct dstream_ctx * spc,int fd,char * buf,size_t len)602 read_packet( struct dstream_ctx* spc, int fd, char* buf, size_t len )
603 {
604     ssize_t n = -1;
605     size_t chunk_len = len;
606 
607     assert( spc && buf && len );
608     assert( fd > 0 );
609 
610     /* if *RAW* data specified - read AS IS
611      * and exit */
612     if( UPXDT_RAW == spc->stype ) {
613         return read_buf( fd, buf, len, g_flog );
614     }
615 
616     /* if it is (or *could* be) RTP, read only MTU bytes
617      */
618     if( (spc->stype == UPXDT_RTP_TS) || (spc->flags & F_CHECK_FMT) )
619         chunk_len = (len > spc->mtu) ? spc->mtu : len;
620 
621     if( spc->flags & F_FILE_INPUT ) {
622         assert( !buf_overrun( buf, len, 0, chunk_len, g_flog ) );
623         n = read_frecord( fd, buf, chunk_len, &(spc->stype), g_flog );
624         if( n <= 0 ) return n;
625     }
626     else {
627         assert( !buf_overrun(buf, len, 0, chunk_len, g_flog) );
628         n = read_buf( fd, buf, chunk_len, g_flog );
629         if( n <= 0 ) return n;
630     }
631 
632     if( spc->flags & F_CHECK_FMT ) {
633         spc->stype = get_mstream_type( buf, n, g_flog );
634         switch (spc->stype) {
635             case UPXDT_RTP_TS:
636                 /* scattered: exclude RTP headers */
637                 spc->flags |= F_SCATTERED; break;
638             case UPXDT_TS:
639                 spc->flags &= ~F_SCATTERED; break;
640             default:
641                 spc->stype = UPXDT_RAW;
642                 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog ) );
643                 break;
644         } /* switch */
645 
646         TRACE( (void)tmfprintf( g_flog, "Established stream as [%s]\n",
647                fmt2str( spc->stype ) ) );
648 
649         spc->flags &= ~F_CHECK_FMT;
650     }
651 
652     if( spc->flags & F_SCATTERED )
653         if( -1 == register_packet( spc, buf, n ) ) return -1;
654 
655     return n;
656 }
657 
658 
659 /* read data from source of specified type (UDP socket or otherwise);
660  * read as many fragments as specified (max_frgs) into the buffer
661  */
662 ssize_t
read_data(struct dstream_ctx * spc,int fd,char * data,const ssize_t data_len,const struct rdata_opt * opt)663 read_data( struct dstream_ctx* spc, int fd, char* data,
664            const ssize_t data_len, const struct rdata_opt* opt )
665 {
666     int m = 0;
667     ssize_t n = 0, nrcv = -1;
668     time_t start_tm = time(NULL), cur_tm = 0;
669     time_t buftm_sec = 0;
670 
671     assert( spc && (data_len > 0) && opt );
672 
673     /* if max_frgs < 0, read as many packets as can fit in the buffer,
674      * otherwise read no more than max_frgs packets
675      */
676 
677     for( m = 0, n = 0; ((opt->max_frgs < 0) ? 1 : (m < opt->max_frgs)); ++m ) {
678         nrcv = read_packet( spc, fd, data + n, data_len - n );
679         if( nrcv <= 0 ) {
680             if( EAGAIN == errno ) {
681                 (void)tmfprintf( g_flog,
682                         "Receive on socket/file [%d] timed out\n",
683                         fd);
684             }
685 
686             if( 0 == nrcv ) (void)tmfprintf(g_flog, "%s - EOF\n",__func__);
687             else {
688                 mperror(g_flog, errno, "%s: read/recv", __func__);
689             }
690 
691             break;
692         }
693 
694         if( spc->flags & F_DROP_PACKET ) {
695             spc->flags &= ~F_DROP_PACKET;
696             continue;
697         }
698 
699         n += nrcv;
700         if( n >= (data_len - nrcv) ) break;
701 
702         if( -1 != opt->buf_tmout ) {
703             cur_tm = time(NULL);
704             buftm_sec = cur_tm - start_tm;
705             if( buftm_sec >= opt->buf_tmout ) {
706                 TRACE( (void)tmfprintf( g_flog, "%s: Buffer timed out "
707                             "after [%ld] seconds\n", __func__,
708                             (long)buftm_sec ) );
709                 break;
710             }
711             /*
712             else {
713                 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
714             }
715             */
716         }
717     } /* for */
718 
719     if( (nrcv > 0) && !n ) {
720         TRACE( (void)tmfprintf( g_flog, "%s: no data to send "
721                     "out of [%d] packets\n", __func__, m ) );
722         return -1;
723     }
724 
725     return (nrcv > 0) ? n : -1;
726 }
727 
728 
729 /* write data to destination(s)
730  */
731 ssize_t
write_data(const struct dstream_ctx * spc,const char * data,const ssize_t len,int fd)732 write_data( const struct dstream_ctx* spc,
733             const char* data,
734             const ssize_t len,
735             int fd )
736 {
737     ssize_t n = 0, error = IO_ERR;
738     int32_t n_count = -1;
739 
740     assert( spc && data && len );
741     if( fd <= 0 ) return 0;
742 
743     if( spc->flags & F_SCATTERED ) {
744         n_count = spc->pkt_count;
745         n = writev( fd, spc->pkt, n_count );
746         if( n <= 0 ) {
747             if( EAGAIN == errno ) {
748                 (void)tmfprintf( g_flog, "Write on fd=[%d] timed out\n", fd);
749                 error = IO_BLK;
750             }
751             mperror( g_flog, errno, "%s: writev", __func__ );
752             return error;
753         }
754     }
755     else {
756         n = write_buf( fd, data, len, g_flog );
757         if( n < 0 )
758             error = n;
759     }
760 
761     return (n > 0) ? n : error;
762 }
763 
764 
765 /* initialize incoming-stream context:
766  *      set data type (if possible) and flags
767  */
768 int
init_dstream_ctx(struct dstream_ctx * ds,const char * cmd,const char * fname,ssize_t nmsgs)769 init_dstream_ctx( struct dstream_ctx* ds, const char* cmd, const char* fname,
770                   ssize_t nmsgs )
771 {
772     extern const char       CMD_UDP[];
773     extern const size_t     CMD_UDP_LEN;
774     extern const char       CMD_RTP[];
775     extern const size_t     CMD_RTP_LEN;
776 
777     assert( ds && cmd && (nmsgs > 0) );
778 
779     ds->flags = 0;
780     ds->pkt = NULL;
781     ds->max_pkt = ds->pkt_count = 0;
782     ds->mtu = ETHERNET_MTU;
783 
784     if( NULL != fname ) {
785         ds->stype = UPXDT_UNKNOWN;
786         ds->flags |= (F_CHECK_FMT | F_FILE_INPUT);
787         TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog ) );
788     }
789     else if( 0 == strncmp( cmd, CMD_UDP, CMD_UDP_LEN ) ) {
790         ds->stype = UPXDT_UNKNOWN;
791         ds->flags |= F_CHECK_FMT;
792         TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog ) );
793     }
794     else if( 0 == strncmp( cmd, CMD_RTP, CMD_RTP_LEN ) ) {
795         ds->stype = UPXDT_RTP_TS;
796         ds->flags |= F_SCATTERED;
797         TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
798                     " no checks\n", g_flog ) );
799     }
800     else {
801         TRACE( (void)tmfprintf( g_flog, "%s: "
802                     "Irrelevant command [%s]\n", __func__, cmd) );
803         return -1;
804     }
805 
806     ds->pkt = calloc( nmsgs, sizeof(ds->pkt[0]) );
807     if( NULL == ds->pkt ) {
808         mperror( g_flog, errno, "%s: calloc", __func__ );
809         return -1;
810     }
811 
812     ds->max_pkt = nmsgs;
813     return 0;
814 }
815 
816 
817 
818 /* __EOF__ */
819