1 /* @(#) implementation of packet-io functions for udpxy
2 *
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
4 *
5 * This file is part of udpxy.
6 *
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include <sys/types.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <assert.h>
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <sys/uio.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <time.h>
31
32 #include "udpxy.h"
33 #include "dpkt.h"
34 #include "rtp.h"
35 #include "util.h"
36 #include "mtrace.h"
37
38 extern FILE* g_flog;
39
40 static const size_t TS_SEG_LEN = 188;
41
42 /* data-stream format type */
43 enum {
44 UPXDT_UNKNOWN = 0, /* no assumptions */
45 UPXDT_TS, /* MPEG-TS */
46 UPXDT_RTP_TS, /* RTP over MPEG-TS */
47 UPXDT_UDS, /* UDS file format */
48 UPXDT_RAW /* read AS-IS */
49 };
50 static const char* upxfmt_NAME[] = {
51 "UNKNOWN",
52 "MPEG-TS",
53 "RTP-TS",
54 "UDPXY-UDS",
55 "RAW"
56 };
57 static const int UPXDT_LEN = sizeof(upxfmt_NAME) / sizeof(upxfmt_NAME[0]);
58
59
60 const char*
fmt2str(upxfmt_t fmt)61 fmt2str( upxfmt_t fmt )
62 {
63 int ifmt = fmt;
64
65 (void) UPXDT_LEN;
66 assert( (ifmt >= 0 ) && (ifmt < UPXDT_LEN) );
67 return upxfmt_NAME[ ifmt ];
68 }
69
70
71 /* check for MPEG TS signature, complain if not found
72 */
73 static int
ts_sigcheck(const int c,off_t offset,ssize_t len,FILE * log,const char * func)74 ts_sigcheck( const int c, off_t offset, ssize_t len,
75 FILE* log, const char* func )
76
77 {
78 assert( len );
79 (void)(len); /* NOP to avoid warnings */
80
81 if( c == MPEG_TS_SIG ) return 0;
82
83 if( log && func ) {
84 (void) offset; /* get rid of a warning if TRACE is disabled */
85 TRACE( (void)tmfprintf( log, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
86 "offset [0x%X(%u)] of packet len=[%lu]\n",
87 func, MPEG_TS_SIG, c, offset, offset, (u_long)len ) );
88 }
89
90 return -1;
91 }
92
93
94 /* determine type of stream in memory
95 */
96 upxfmt_t
get_mstream_type(const char * data,size_t len,FILE * log)97 get_mstream_type( const char* data, size_t len, FILE* log )
98 {
99 int sig = 0;
100 size_t hdrlen = 0;
101 ssize_t n = -1;
102
103 assert( data && len );
104
105 if( len < (RTP_HDR_SIZE + 1) ) {
106 (void) tmfprintf( log, "%s: read [%ld] bytes,"
107 " not enough for RTP header\n", __func__,
108 (long)n );
109 return UPXDT_UNKNOWN;
110 }
111
112 /* if the 1st byte has MPEG-TS signature - skip further checks */
113 sig = data[0] & 0xFF;
114 if( 0 == ts_sigcheck( sig, 0, 1, NULL /*log*/, __func__ ) )
115 return UPXDT_TS;
116
117 /* if not RTP - quit */
118 if( 0 == RTP_verify( data, RTP_HDR_SIZE, log ) )
119 return UPXDT_UNKNOWN;
120
121 /* check the first byte after RTP header - should be
122 * TS signature to be RTP over TS */
123
124 if( (0 != RTP_hdrlen( data, len, &hdrlen, log )) ||
125 (len < hdrlen) ) {
126 return UPXDT_UNKNOWN;
127 }
128
129 sig = data[ hdrlen ];
130 if( 0 != ts_sigcheck( sig, 0, 1, log, __func__ ) )
131 return UPXDT_UNKNOWN;
132
133 return UPXDT_RTP_TS;
134 }
135
136
137
138 /* determine type of stream saved in file
139 */
140 upxfmt_t
get_fstream_type(int fd,FILE * log)141 get_fstream_type( int fd, FILE* log )
142 {
143 ssize_t n = 0;
144 off_t offset = 0, where = 0;
145 upxfmt_t dtype = UPXDT_UNKNOWN;
146 char* data = NULL;
147
148 /* read in enough data to contain extended header
149 * and beginning of payload segment */
150 size_t len = TS_SEG_LEN + RTP_XTHDRLEN;
151
152 assert( (fd > 0) && log );
153
154 if( NULL == (data = malloc( len )) ) {
155 mperror( log, errno, "%s: malloc", __func__ );
156 return UPXDT_UNKNOWN;
157 }
158
159 do {
160 /* check if it is a MPEG TS stream
161 */
162 n = read( fd, data, len );
163 if( 0 != sizecheck( "Not enough space for stream data",
164 len, n, log, __func__ ) ) break;
165 offset += n;
166
167 dtype = get_mstream_type( data, len, log );
168 if( UPXDT_UNKNOWN == dtype ) {
169 TRACE( (void)tmfprintf( log, "%s: file type is not recognized\n",
170 __func__ ) );
171 dtype = UPXDT_UNKNOWN;
172 break;
173 }
174 } while(0);
175
176 if( NULL != data ) free( data );
177
178 if( n <= 0 ) {
179 mperror( log, errno, "%s", __func__ );
180 return UPXDT_UNKNOWN;
181 }
182
183 where = lseek( fd, (-1) * offset, SEEK_CUR );
184 if( -1 == where ) {
185 mperror( log, errno, "%s: lseek", __func__ );
186 return UPXDT_UNKNOWN;
187 }
188
189 /*
190 TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
191 __func__, (int)dtype, fmt2str(dtype) ) );
192 */
193
194 return dtype;
195 }
196
197
198 /* read a sequence of MPEG TS packets (to fit into the given buffer)
199 */
200 static ssize_t
read_ts_file(int fd,char * data,const size_t len,FILE * log)201 read_ts_file( int fd, char* data, const size_t len, FILE* log )
202 {
203 const size_t pkt_len = ((len - 1) / TS_SEG_LEN) * TS_SEG_LEN;
204 off_t k = 0;
205 u_int bad_frg = 0;
206 ssize_t n = -1;
207
208 assert( (fd > 0) && data && len && log);
209
210 assert( !buf_overrun( data, len, 0, pkt_len, log ) );
211 n = read_buf( fd, data, pkt_len, log );
212 if( n <= 0 ) return n;
213
214 if( 0 != sizecheck( "Bad TS packet stream",
215 pkt_len, n, log, __func__ ) )
216 return -1;
217
218 /* make sure we've read TS records, not random data
219 */
220 for( k = 0; k < (off_t)pkt_len; k += TS_SEG_LEN ) {
221 if( -1 == ts_sigcheck( data[k], k, (u_long)pkt_len,
222 log, __func__ ) ) {
223 ++bad_frg;
224 }
225 }
226
227 return (bad_frg ? -1 : n);
228 }
229
230
231 /* read an RTP packet
232 */
233 static ssize_t
read_rtp_file(int fd,char * data,const size_t len,FILE * log)234 read_rtp_file( int fd, char* data, const size_t len, FILE* log )
235 {
236 ssize_t nrd = -1, offset = 0;
237 size_t hdrlen = 0, rdlen = 0;
238 u_int frg = 0;
239 int rtp_end = 0, rc = 0;
240 off_t where = 0;
241
242 assert( (fd > 0) && data && len && log);
243
244 assert( !buf_overrun( data, len, 0, RTP_HDR_SIZE, log ) );
245 nrd = read_buf( fd, data, RTP_HDR_SIZE, log );
246 if( nrd <= 0 ) return nrd;
247 offset += nrd;
248
249 if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE, nrd,
250 log, __func__ ) )
251 return -1;
252
253 if( 0 == RTP_verify( data, nrd, log ) )
254 return -1;
255
256 if( -1 == (rc = RTP_hdrlen( data, nrd, &hdrlen, log )) )
257 return -1;
258
259 /* if there is an extended header, read it in */
260 if( ENOMEM == rc ) {
261 assert( !buf_overrun( data, len, offset,
262 RTP_XTHDRLEN - RTP_HDR_SIZE, log ) );
263 nrd = read_buf( fd, data + offset,
264 RTP_XTHDRLEN - RTP_HDR_SIZE, log );
265 if( (nrd <= 0) ||
266 (-1 == sizecheck("Bad RTP x-header",
267 RTP_XTHDRLEN - RTP_HDR_SIZE, nrd,
268 log, __func__ )) ) {
269 return -1;
270 }
271 if( 0 == nrd ) return nrd;
272
273 offset += nrd;
274 rc = RTP_hdrlen( data, offset, &hdrlen, log );
275 if( 0 != rc ) {
276 TRACE( (void)tmfprintf( log, "%s: bad RTP header - quitting\n",
277 __func__ ) );
278 return -1;
279 }
280
281 /*
282 TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
283 __func__, (u_long)hdrlen ) );
284 */
285
286 if( (size_t)offset > hdrlen ) {
287 /* read more than needed: step back */
288
289 where = lseek( fd, (-1)*((size_t)offset - hdrlen), SEEK_CUR );
290 if( -1 == where ) {
291 mperror( log, errno, "%s: lseek", __func__ );
292 return -1;
293 }
294
295 offset -= ((size_t)offset - hdrlen);
296 assert( (size_t)offset == hdrlen );
297
298 /*
299 TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
300 "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
301 */
302 }
303 else if( hdrlen > (size_t)offset ) {
304 /* read remainder of the header in */
305
306 assert( !buf_overrun( data, len, offset,
307 (hdrlen - (size_t)offset), log ) );
308 nrd = read_buf( fd, data + offset,
309 (hdrlen - (size_t)offset), log );
310 if( nrd <= 0 ||
311 (-1 == sizecheck("Bad RTP x-header tail",
312 (hdrlen - (size_t)offset), nrd,
313 log, __func__ )) ) {
314 return -1;
315 }
316 if( 0 == nrd ) return nrd;
317
318 offset += nrd;
319 assert( (size_t)offset == hdrlen );
320 }
321 } /* read extended header */
322
323
324 /* read TS records until there is another RTP header or EOF */
325 for( frg = 0; (ssize_t)len > offset; ++frg ) {
326
327 rdlen = ( (len - offset) < TS_SEG_LEN
328 ? (len - offset)
329 : TS_SEG_LEN );
330
331 /*
332 TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
333 __func__, (u_long)rdlen ) );
334 */
335
336 assert( !buf_overrun( data, len, offset, rdlen, log ) );
337 nrd = read_buf( fd, data + offset, rdlen, log );
338 if( nrd <= 0 ) break;
339
340 /* if it's an RTP header, roll back and return
341 */
342 rtp_end = RTP_verify( data + offset, nrd, log );
343 if( 1 == rtp_end ) {
344 if( -1 == lseek( fd, (-1) * nrd, SEEK_CUR ) ) {
345 mperror( log, errno, "%s: lseek", __func__ );
346 return -1;
347 }
348
349 break;
350 }
351
352 /* check if it is a TS packet and it's of the right size
353 */
354 if( (-1 == ts_sigcheck( data[offset], offset, (u_long)TS_SEG_LEN,
355 log, __func__ )) ||
356 (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN, nrd,
357 log, __func__ )) ) {
358 TRACE( hex_dump( "Data in question", data, offset + nrd, log ) );
359 return -1;
360 }
361
362 offset += nrd;
363 } /* for */
364
365
366 /* If it is not EOF and no RTP header for the next message is found,
367 * it is either our buffer is too small (to fit the whole message)
368 * or the stream is invalid
369 */
370 if( !rtp_end && (0 != nrd) ) {
371 (void)tmfprintf( log, "%s: no RTP end after reading [%ld] bytes\n",
372 __func__, (long)offset );
373 return -1;
374 }
375
376 return (nrd < 0) ? nrd : offset;
377 }
378
379
380 /* read record of one of the supported types from file
381 */
382 ssize_t
read_frecord(int fd,char * data,const size_t len,upxfmt_t * stream_type,FILE * log)383 read_frecord( int fd, char* data, const size_t len,
384 upxfmt_t* stream_type, FILE* log )
385 {
386 upxfmt_t stype;
387 /* off_t where = -1, endmark = -1; */
388 ssize_t nrd = -1;
389
390 assert( fd > 0 );
391 assert( data && len );
392 assert( stream_type && log );
393
394 stype = *stream_type;
395
396 /*
397 where = lseek( fd, 0, SEEK_CUR );
398 TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
399 __func__, (u_int)where, (u_int)where ) );
400 */
401
402 if( UPXDT_UNKNOWN == *stream_type ) {
403 stype = get_fstream_type( fd, log );
404
405 if( UPXDT_UNKNOWN == stype ) {
406 (void)tmfprintf( log, "%s: Unsupported type\n", __func__ );
407 return -1;
408 }
409
410 *stream_type = stype;
411 } /* UPXDT_UNKNOWN */
412
413 if( UPXDT_TS == stype ) {
414 nrd = read_ts_file( fd, data, len, log );
415 }
416 else if( UPXDT_RTP_TS == stype ) {
417 nrd = read_rtp_file( fd, data, len, log );
418 }
419 else {
420 (void)tmfprintf( log, "%s: unknown stream type [%d]\n",
421 __func__, stype );
422 return -1;
423 }
424
425 /*
426 if( nrd >= 0 ) {
427 endmark = lseek( fd, 0, SEEK_CUR );
428
429 TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
430 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
431
432 TRACE( sizecheck( "WARNING: Read file discrepancy",
433 where + nrd, endmark,
434 log, __func__ ) );
435 }
436 */
437
438 return nrd;
439 }
440
441
442 /* write data as a UDS record
443 */
444 static ssize_t
write_uds_record(int fd,const char * data,size_t len,FILE * log)445 write_uds_record( int fd, const char* data, size_t len, FILE* log )
446 {
447 assert( (fd > 0) && data && len );
448 (void)(data && len && fd);
449 (void)tmfprintf( log, "%s: UDS conversion not yet implemented\n",
450 __func__ );
451 return -1;
452 }
453
454
455 /* write RTP record into TS stream
456 */
457 static ssize_t
write_rtp2ts(int fd,const char * data,size_t len,FILE * log)458 write_rtp2ts( int fd, const char* data, size_t len, FILE* log )
459 {
460 void* buf = (void*)data;
461 size_t pldlen = len;
462 const int NO_VERIFY = 0;
463 int rc = 0;
464
465 assert( (fd > 0) && data && len && log );
466
467 rc = RTP_process( &buf, &pldlen, NO_VERIFY, log );
468 if( -1 == rc ) return -1;
469
470 assert( !buf_overrun( buf, len, 0, pldlen, log ) );
471 return write_buf( fd, buf, pldlen, log );
472 }
473
474
475 /* write record after converting it from source into destination
476 * format
477 */
478 ssize_t
write_frecord(int fd,const char * data,size_t len,upxfmt_t sfmt,upxfmt_t dfmt,FILE * log)479 write_frecord( int fd, const char* data, size_t len,
480 upxfmt_t sfmt, upxfmt_t dfmt, FILE* log )
481 {
482 ssize_t nwr = -1;
483 int fmt_ok = 0;
484 const char *str_from = NULL, *str_to = NULL;
485
486 if( UPXDT_UDS == dfmt ) {
487 fmt_ok = 1;
488 nwr = write_uds_record( fd, data, len, log );
489 }
490 else if( UPXDT_TS == dfmt ) {
491 if( UPXDT_RTP_TS == sfmt ) {
492 fmt_ok = 1;
493 nwr = write_rtp2ts( fd, data, len, log );
494 }
495 }
496
497 if( !fmt_ok ) {
498 str_from = fmt2str(sfmt);
499 str_to = fmt2str(dfmt);
500 (void)tmfprintf( log, "Conversion from [%s] into [%s] is not supported\n",
501 str_from, str_to );
502 return -1;
503 }
504
505 return nwr;
506 }
507
508
509 /* reset packet-buffer registry in stream spec
510 */
511 void
reset_pkt_registry(struct dstream_ctx * ds)512 reset_pkt_registry( struct dstream_ctx* ds )
513 {
514 assert( ds );
515
516 ds->flags &= ~F_DROP_PACKET;
517 ds->pkt_count = 0;
518 }
519
520
521 /* release resources allocated for stream spec
522 */
523 void
free_dstream_ctx(struct dstream_ctx * ds)524 free_dstream_ctx( struct dstream_ctx* ds )
525 {
526 assert( ds );
527
528 if( NULL != ds->pkt ) {
529 free( ds->pkt );
530 ds->pkt = NULL;
531
532 ds->pkt_count = ds->max_pkt = 0;
533 }
534 }
535
536
537
538 /* register received packet into registry (for scattered output)
539 */
540 static int
register_packet(struct dstream_ctx * spc,char * buf,size_t len)541 register_packet( struct dstream_ctx* spc, char* buf, size_t len )
542 {
543 struct iovec* new_pkt = NULL;
544 static const int DO_VERIFY = 1;
545
546 void* new_buf = buf;
547 size_t new_len = len;
548
549 assert( spc->max_pkt > 0 );
550
551 /* enlarge packet registry if needed */
552 if( spc->pkt_count >= spc->max_pkt ) {
553 spc->max_pkt <<= 1;
554 spc->pkt = realloc( spc->pkt, spc->max_pkt * sizeof(spc->pkt[0]) );
555 if( NULL == spc->pkt ) {
556 mperror( g_flog, errno, "%s: realloc", __func__ );
557 return -1;
558 }
559
560 TRACE( (void)tmfprintf( g_flog, "RTP packet registry "
561 "expanded to [%lu] records\n", (u_long)spc->max_pkt ) );
562 }
563
564 /* put packet info into registry */
565
566 new_pkt = &(spc->pkt[ spc->pkt_count ]);
567
568 /*
569 TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
570 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
571 */
572
573 if( 0 != RTP_process( &new_buf, &new_len, DO_VERIFY, g_flog ) ) {
574 TRACE( (void)tmfputs("register packet: dropping\n", g_flog) );
575 spc->flags |= F_DROP_PACKET;
576
577 return 0;
578 }
579
580 new_pkt->iov_base = new_buf;
581 new_pkt->iov_len = new_len;
582
583 /*
584 TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
585 (u_long)spc->pkt_count, new_pkt->iov_base,
586 (u_long)new_pkt->iov_len ) );
587 */
588
589 spc->pkt_count++;
590 return 0;
591 }
592
593
594 /* read data from source, determine underlying protocol
595 * (if not already known); and process the packet
596 * if needed (for RTP - register packet)
597 *
598 * return the number of octets read from the source
599 * into the buffer
600 */
601 static ssize_t
read_packet(struct dstream_ctx * spc,int fd,char * buf,size_t len)602 read_packet( struct dstream_ctx* spc, int fd, char* buf, size_t len )
603 {
604 ssize_t n = -1;
605 size_t chunk_len = len;
606
607 assert( spc && buf && len );
608 assert( fd > 0 );
609
610 /* if *RAW* data specified - read AS IS
611 * and exit */
612 if( UPXDT_RAW == spc->stype ) {
613 return read_buf( fd, buf, len, g_flog );
614 }
615
616 /* if it is (or *could* be) RTP, read only MTU bytes
617 */
618 if( (spc->stype == UPXDT_RTP_TS) || (spc->flags & F_CHECK_FMT) )
619 chunk_len = (len > spc->mtu) ? spc->mtu : len;
620
621 if( spc->flags & F_FILE_INPUT ) {
622 assert( !buf_overrun( buf, len, 0, chunk_len, g_flog ) );
623 n = read_frecord( fd, buf, chunk_len, &(spc->stype), g_flog );
624 if( n <= 0 ) return n;
625 }
626 else {
627 assert( !buf_overrun(buf, len, 0, chunk_len, g_flog) );
628 n = read_buf( fd, buf, chunk_len, g_flog );
629 if( n <= 0 ) return n;
630 }
631
632 if( spc->flags & F_CHECK_FMT ) {
633 spc->stype = get_mstream_type( buf, n, g_flog );
634 switch (spc->stype) {
635 case UPXDT_RTP_TS:
636 /* scattered: exclude RTP headers */
637 spc->flags |= F_SCATTERED; break;
638 case UPXDT_TS:
639 spc->flags &= ~F_SCATTERED; break;
640 default:
641 spc->stype = UPXDT_RAW;
642 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog ) );
643 break;
644 } /* switch */
645
646 TRACE( (void)tmfprintf( g_flog, "Established stream as [%s]\n",
647 fmt2str( spc->stype ) ) );
648
649 spc->flags &= ~F_CHECK_FMT;
650 }
651
652 if( spc->flags & F_SCATTERED )
653 if( -1 == register_packet( spc, buf, n ) ) return -1;
654
655 return n;
656 }
657
658
659 /* read data from source of specified type (UDP socket or otherwise);
660 * read as many fragments as specified (max_frgs) into the buffer
661 */
662 ssize_t
read_data(struct dstream_ctx * spc,int fd,char * data,const ssize_t data_len,const struct rdata_opt * opt)663 read_data( struct dstream_ctx* spc, int fd, char* data,
664 const ssize_t data_len, const struct rdata_opt* opt )
665 {
666 int m = 0;
667 ssize_t n = 0, nrcv = -1;
668 time_t start_tm = time(NULL), cur_tm = 0;
669 time_t buftm_sec = 0;
670
671 assert( spc && (data_len > 0) && opt );
672
673 /* if max_frgs < 0, read as many packets as can fit in the buffer,
674 * otherwise read no more than max_frgs packets
675 */
676
677 for( m = 0, n = 0; ((opt->max_frgs < 0) ? 1 : (m < opt->max_frgs)); ++m ) {
678 nrcv = read_packet( spc, fd, data + n, data_len - n );
679 if( nrcv <= 0 ) {
680 if( EAGAIN == errno ) {
681 (void)tmfprintf( g_flog,
682 "Receive on socket/file [%d] timed out\n",
683 fd);
684 }
685
686 if( 0 == nrcv ) (void)tmfprintf(g_flog, "%s - EOF\n",__func__);
687 else {
688 mperror(g_flog, errno, "%s: read/recv", __func__);
689 }
690
691 break;
692 }
693
694 if( spc->flags & F_DROP_PACKET ) {
695 spc->flags &= ~F_DROP_PACKET;
696 continue;
697 }
698
699 n += nrcv;
700 if( n >= (data_len - nrcv) ) break;
701
702 if( -1 != opt->buf_tmout ) {
703 cur_tm = time(NULL);
704 buftm_sec = cur_tm - start_tm;
705 if( buftm_sec >= opt->buf_tmout ) {
706 TRACE( (void)tmfprintf( g_flog, "%s: Buffer timed out "
707 "after [%ld] seconds\n", __func__,
708 (long)buftm_sec ) );
709 break;
710 }
711 /*
712 else {
713 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
714 }
715 */
716 }
717 } /* for */
718
719 if( (nrcv > 0) && !n ) {
720 TRACE( (void)tmfprintf( g_flog, "%s: no data to send "
721 "out of [%d] packets\n", __func__, m ) );
722 return -1;
723 }
724
725 return (nrcv > 0) ? n : -1;
726 }
727
728
729 /* write data to destination(s)
730 */
731 ssize_t
write_data(const struct dstream_ctx * spc,const char * data,const ssize_t len,int fd)732 write_data( const struct dstream_ctx* spc,
733 const char* data,
734 const ssize_t len,
735 int fd )
736 {
737 ssize_t n = 0, error = IO_ERR;
738 int32_t n_count = -1;
739
740 assert( spc && data && len );
741 if( fd <= 0 ) return 0;
742
743 if( spc->flags & F_SCATTERED ) {
744 n_count = spc->pkt_count;
745 n = writev( fd, spc->pkt, n_count );
746 if( n <= 0 ) {
747 if( EAGAIN == errno ) {
748 (void)tmfprintf( g_flog, "Write on fd=[%d] timed out\n", fd);
749 error = IO_BLK;
750 }
751 mperror( g_flog, errno, "%s: writev", __func__ );
752 return error;
753 }
754 }
755 else {
756 n = write_buf( fd, data, len, g_flog );
757 if( n < 0 )
758 error = n;
759 }
760
761 return (n > 0) ? n : error;
762 }
763
764
765 /* initialize incoming-stream context:
766 * set data type (if possible) and flags
767 */
768 int
init_dstream_ctx(struct dstream_ctx * ds,const char * cmd,const char * fname,ssize_t nmsgs)769 init_dstream_ctx( struct dstream_ctx* ds, const char* cmd, const char* fname,
770 ssize_t nmsgs )
771 {
772 extern const char CMD_UDP[];
773 extern const size_t CMD_UDP_LEN;
774 extern const char CMD_RTP[];
775 extern const size_t CMD_RTP_LEN;
776
777 assert( ds && cmd && (nmsgs > 0) );
778
779 ds->flags = 0;
780 ds->pkt = NULL;
781 ds->max_pkt = ds->pkt_count = 0;
782 ds->mtu = ETHERNET_MTU;
783
784 if( NULL != fname ) {
785 ds->stype = UPXDT_UNKNOWN;
786 ds->flags |= (F_CHECK_FMT | F_FILE_INPUT);
787 TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog ) );
788 }
789 else if( 0 == strncmp( cmd, CMD_UDP, CMD_UDP_LEN ) ) {
790 ds->stype = UPXDT_UNKNOWN;
791 ds->flags |= F_CHECK_FMT;
792 TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog ) );
793 }
794 else if( 0 == strncmp( cmd, CMD_RTP, CMD_RTP_LEN ) ) {
795 ds->stype = UPXDT_RTP_TS;
796 ds->flags |= F_SCATTERED;
797 TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
798 " no checks\n", g_flog ) );
799 }
800 else {
801 TRACE( (void)tmfprintf( g_flog, "%s: "
802 "Irrelevant command [%s]\n", __func__, cmd) );
803 return -1;
804 }
805
806 ds->pkt = calloc( nmsgs, sizeof(ds->pkt[0]) );
807 if( NULL == ds->pkt ) {
808 mperror( g_flog, errno, "%s: calloc", __func__ );
809 return -1;
810 }
811
812 ds->max_pkt = nmsgs;
813 return 0;
814 }
815
816
817
818 /* __EOF__ */
819