1 /**
2 * @file stream.c Generic Media Stream
3 *
4 * Copyright (C) 2010 Creytiv.com
5 */
6 #include <string.h>
7 #include <time.h>
8 #include <re.h>
9 #include <baresip.h>
10 #include "core.h"
11
12
/** Constants used by the stream implementation in this file */
enum {
	/* Receive size set on the RTP socket with udp_rxsz_set() */
	RTP_RECV_SIZE = 8192,
	/* Interval at which check_rtp_handler() re-arms its timer [ms] */
	RTP_CHECK_INTERVAL = 1000
};
17
18
stream_close(struct stream * strm,int err)19 static void stream_close(struct stream *strm, int err)
20 {
21 stream_error_h *errorh = strm->errorh;
22
23 strm->terminated = true;
24 strm->errorh = NULL;
25
26 if (errorh) {
27 errorh(strm, err, strm->errorh_arg);
28 }
29 }
30
31
check_rtp_handler(void * arg)32 static void check_rtp_handler(void *arg)
33 {
34 struct stream *strm = arg;
35 const uint64_t now = tmr_jiffies();
36 int diff_ms;
37
38 tmr_start(&strm->tmr_rtp, RTP_CHECK_INTERVAL,
39 check_rtp_handler, strm);
40
41 /* If no RTP was received at all, check later */
42 if (!strm->ts_last)
43 return;
44
45 /* We are in sendrecv mode, check when the last RTP packet
46 * was received.
47 */
48 if (sdp_media_dir(strm->sdp) == SDP_SENDRECV) {
49
50 diff_ms = (int)(now - strm->ts_last);
51
52 debug("stream: last \"%s\" RTP packet: %d milliseconds\n",
53 sdp_media_name(strm->sdp), diff_ms);
54
55 /* check for large jumps in time */
56 if (diff_ms > (3600 * 1000)) {
57 strm->ts_last = 0;
58 return;
59 }
60
61 if (diff_ms > (int)strm->rtp_timeout_ms) {
62
63 info("stream: no %s RTP packets received for"
64 " %d milliseconds\n",
65 sdp_media_name(strm->sdp), diff_ms);
66
67 stream_close(strm, ETIMEDOUT);
68 }
69 }
70 else {
71 re_printf("check_rtp: not checking (dir=%s)\n",
72 sdp_dir_name(sdp_media_dir(strm->sdp)));
73 }
74 }
75
76
lostcalc(struct stream * s,uint16_t seq)77 static inline int lostcalc(struct stream *s, uint16_t seq)
78 {
79 const uint16_t delta = seq - s->pseq;
80 int lostc;
81
82 if (s->pseq == (uint32_t)-1)
83 lostc = 0;
84 else if (delta == 0)
85 return -1;
86 else if (delta < 3000)
87 lostc = delta - 1;
88 else if (delta < 0xff9c)
89 lostc = 0;
90 else
91 return -2;
92
93 s->pseq = seq;
94
95 return lostc;
96 }
97
98
print_rtp_stats(const struct stream * s)99 static void print_rtp_stats(const struct stream *s)
100 {
101 bool started = s->metric_tx.n_packets>0 || s->metric_rx.n_packets>0;
102
103 if (!started)
104 return;
105
106 info("\n%-9s Transmit: Receive:\n"
107 "packets: %7u %7u\n"
108 "avg. bitrate: %7.1f %7.1f (kbit/s)\n"
109 "errors: %7d %7d\n"
110 ,
111 sdp_media_name(s->sdp),
112 s->metric_tx.n_packets, s->metric_rx.n_packets,
113 1.0*metric_avg_bitrate(&s->metric_tx)/1000.0,
114 1.0*metric_avg_bitrate(&s->metric_rx)/1000.0,
115 s->metric_tx.n_err, s->metric_rx.n_err
116 );
117
118 if (s->rtcp_stats.tx.sent || s->rtcp_stats.rx.sent) {
119
120 info("pkt.report: %7u %7u\n"
121 "lost: %7d %7d\n"
122 "jitter: %7.1f %7.1f (ms)\n",
123 s->rtcp_stats.tx.sent, s->rtcp_stats.rx.sent,
124 s->rtcp_stats.tx.lost, s->rtcp_stats.rx.lost,
125 1.0*s->rtcp_stats.tx.jit/1000,
126 1.0*s->rtcp_stats.rx.jit/1000);
127 }
128 }
129
130
stream_destructor(void * arg)131 static void stream_destructor(void *arg)
132 {
133 struct stream *s = arg;
134
135 if (s->cfg.rtp_stats)
136 print_rtp_stats(s);
137
138 metric_reset(&s->metric_tx);
139 metric_reset(&s->metric_rx);
140
141 tmr_cancel(&s->tmr_rtp);
142 list_unlink(&s->le);
143 mem_deref(s->rtpkeep);
144 mem_deref(s->sdp);
145 mem_deref(s->mes);
146 mem_deref(s->mencs);
147 mem_deref(s->mns);
148 mem_deref(s->jbuf);
149 mem_deref(s->rtp);
150 mem_deref(s->cname);
151 }
152
153
handle_rtp(struct stream * s,const struct rtp_header * hdr,struct mbuf * mb)154 static void handle_rtp(struct stream *s, const struct rtp_header *hdr,
155 struct mbuf *mb)
156 {
157 struct rtpext extv[8];
158 size_t extc = 0;
159
160 /* RFC 5285 -- A General Mechanism for RTP Header Extensions */
161 if (hdr->ext && hdr->x.len && mb) {
162
163 const size_t pos = mb->pos;
164 const size_t end = mb->end;
165 const size_t ext_stop = mb->pos;
166 size_t ext_len;
167 size_t i;
168 int err;
169
170 if (hdr->x.type != RTPEXT_TYPE_MAGIC) {
171 info("stream: unknown ext type ignored (0x%04x)\n",
172 hdr->x.type);
173 goto handler;
174 }
175
176 ext_len = hdr->x.len*sizeof(uint32_t);
177 if (mb->pos < ext_len) {
178 warning("stream: corrupt rtp packet,"
179 " not enough space for rtpext of %zu bytes\n",
180 ext_len);
181 return;
182 }
183
184 mb->pos = mb->pos - ext_len;
185 mb->end = ext_stop;
186
187 for (i=0; i<ARRAY_SIZE(extv) && mbuf_get_left(mb); i++) {
188
189 err = rtpext_decode(&extv[i], mb);
190 if (err) {
191 warning("stream: rtpext_decode failed (%m)\n",
192 err);
193 return;
194 }
195 }
196
197 extc = i;
198
199 mb->pos = pos;
200 mb->end = end;
201 }
202
203 handler:
204 s->rtph(hdr, extv, extc, mb, s->arg);
205
206 }
207
208
rtp_handler(const struct sa * src,const struct rtp_header * hdr,struct mbuf * mb,void * arg)209 static void rtp_handler(const struct sa *src, const struct rtp_header *hdr,
210 struct mbuf *mb, void *arg)
211 {
212 struct stream *s = arg;
213 bool flush = false;
214 int err;
215
216 s->ts_last = tmr_jiffies();
217
218 if (!mbuf_get_left(mb))
219 return;
220
221 if (!(sdp_media_ldir(s->sdp) & SDP_RECVONLY))
222 return;
223
224 metric_add_packet(&s->metric_rx, mbuf_get_left(mb));
225
226 if (!s->rtp_estab) {
227 info("stream: incoming rtp for '%s' established"
228 ", receiving from %J\n",
229 sdp_media_name(s->sdp), src);
230 s->rtp_estab = true;
231 }
232
233 if (hdr->ssrc != s->ssrc_rx) {
234 if (s->ssrc_rx) {
235 flush = true;
236 info("stream: %s: SSRC changed %x -> %x"
237 " (%u bytes from %J)\n",
238 sdp_media_name(s->sdp), s->ssrc_rx, hdr->ssrc,
239 mbuf_get_left(mb), src);
240 }
241 s->ssrc_rx = hdr->ssrc;
242 }
243
244 if (s->jbuf) {
245
246 struct rtp_header hdr2;
247 void *mb2 = NULL;
248
249 /* Put frame in Jitter Buffer */
250 if (flush)
251 jbuf_flush(s->jbuf);
252
253 err = jbuf_put(s->jbuf, hdr, mb);
254 if (err) {
255 info("%s: dropping %u bytes from %J (%m)\n",
256 sdp_media_name(s->sdp), mb->end,
257 src, err);
258 s->metric_rx.n_err++;
259 }
260
261 if (jbuf_get(s->jbuf, &hdr2, &mb2)) {
262
263 if (!s->jbuf_started)
264 return;
265
266 memset(&hdr2, 0, sizeof(hdr2));
267 }
268
269 s->jbuf_started = true;
270
271 if (lostcalc(s, hdr2.seq) > 0)
272 handle_rtp(s, hdr, NULL);
273
274 handle_rtp(s, &hdr2, mb2);
275
276 mem_deref(mb2);
277 }
278 else {
279 if (lostcalc(s, hdr->seq) > 0)
280 handle_rtp(s, hdr, NULL);
281
282 handle_rtp(s, hdr, mb);
283 }
284 }
285
286
rtcp_handler(const struct sa * src,struct rtcp_msg * msg,void * arg)287 static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg)
288 {
289 struct stream *s = arg;
290 (void)src;
291
292 s->ts_last = tmr_jiffies();
293
294 if (s->rtcph)
295 s->rtcph(msg, s->arg);
296
297 switch (msg->hdr.pt) {
298
299 case RTCP_SR:
300 (void)rtcp_stats(s->rtp, msg->r.sr.ssrc, &s->rtcp_stats);
301
302 if (s->cfg.rtp_stats)
303 call_set_xrtpstat(s->call);
304
305 ua_event(call_get_ua(s->call), UA_EVENT_CALL_RTCP, s->call,
306 "%s", sdp_media_name(stream_sdpmedia(s)));
307 break;
308 }
309 }
310
311
stream_sock_alloc(struct stream * s,int af)312 static int stream_sock_alloc(struct stream *s, int af)
313 {
314 struct sa laddr;
315 int tos, err;
316
317 if (!s)
318 return EINVAL;
319
320 /* we listen on all interfaces */
321 sa_init(&laddr, af);
322
323 err = rtp_listen(&s->rtp, IPPROTO_UDP, &laddr,
324 s->cfg.rtp_ports.min, s->cfg.rtp_ports.max,
325 s->rtcp, rtp_handler, rtcp_handler, s);
326 if (err) {
327 warning("stream: rtp_listen failed: af=%s ports=%u-%u"
328 " (%m)\n", net_af2name(af),
329 s->cfg.rtp_ports.min, s->cfg.rtp_ports.max, err);
330 return err;
331 }
332
333 tos = s->cfg.rtp_tos;
334 (void)udp_setsockopt(rtp_sock(s->rtp), IPPROTO_IP, IP_TOS,
335 &tos, sizeof(tos));
336 (void)udp_setsockopt(rtcp_sock(s->rtp), IPPROTO_IP, IP_TOS,
337 &tos, sizeof(tos));
338
339 udp_rxsz_set(rtp_sock(s->rtp), RTP_RECV_SIZE);
340
341 return 0;
342 }
343
344
/**
 * Allocate a media stream and add it to the list of streams of a call
 *
 * @param sp        Pointer to allocated stream object
 * @param prm       Stream parameters (use_rtp decides if sockets are
 *                  allocated)
 * @param cfg       Audio/video transport configuration (copied)
 * @param call      Call object that the stream belongs to
 * @param sdp_sess  SDP session to which the media line is added
 * @param name      Media name used for the SDP media line
 * @param label     Optional "label" attribute value, 0 to omit it
 * @param mnat      Optional media NAT traversal module
 * @param mnat_sess Media NAT session, passed to the mnat media handler
 * @param menc      Optional media encryption module
 * @param menc_sess Media encryption session, referenced by the stream
 * @param cname     Canonical name, copied into the stream
 * @param rtph      RTP handler (mandatory)
 * @param rtcph     RTCP handler (optional)
 * @param arg       Argument passed to both handlers
 *
 * @return 0 if success, otherwise errorcode
 */
int stream_alloc(struct stream **sp, const struct stream_param *prm,
		 const struct config_avt *cfg,
		 struct call *call, struct sdp_session *sdp_sess,
		 const char *name, int label,
		 const struct mnat *mnat, struct mnat_sess *mnat_sess,
		 const struct menc *menc, struct menc_sess *menc_sess,
		 const char *cname,
		 stream_rtp_h *rtph, stream_rtcp_h *rtcph, void *arg)
{
	struct stream *strm;
	int err;

	if (!sp || !prm || !cfg || !call || !rtph)
		return EINVAL;

	strm = mem_zalloc(sizeof(*strm), stream_destructor);
	if (!strm)
		return ENOMEM;

	strm->cfg   = *cfg;
	strm->call  = call;
	strm->rtph  = rtph;
	strm->rtcph = rtcph;
	strm->arg   = arg;
	strm->pseq  = -1;  /* no sequence number seen yet, see lostcalc() */
	strm->rtcp  = strm->cfg.rtcp_enable;

	/* Sockets */
	if (prm->use_rtp) {
		err = stream_sock_alloc(strm, call_af(call));
		if (err) {
			warning("stream: failed to create socket"
				" for media '%s' (%m)\n", name, err);
			goto out;
		}
	}

	err = str_dup(&strm->cname, cname);
	if (err)
		goto out;

	/* Jitter buffer */
	if (cfg->jbuf_del.min && cfg->jbuf_del.max) {

		err = jbuf_alloc(&strm->jbuf, cfg->jbuf_del.min,
				 cfg->jbuf_del.max);
		if (err)
			goto out;
	}

	/* SDP media line; port 9 is used if there is no RTP socket */
	err = sdp_media_add(&strm->sdp, sdp_sess, name,
			    strm->rtp ? sa_port(rtp_local(strm->rtp)) : 9,
			    (menc && menc->sdp_proto) ? menc->sdp_proto :
			    sdp_proto_rtpavp);
	if (err)
		goto out;

	/* Local SDP attributes, errors are collected and checked below */
	if (label) {
		err |= sdp_media_set_lattr(strm->sdp, true,
					   "label", "%d", label);
	}

	if (strm->rtcp) {

		/* RFC 5506 */
		err |= sdp_media_set_lattr(strm->sdp, true,
					   "rtcp-rsize", NULL);

		/* RFC 5576 */
		err |= sdp_media_set_lattr(strm->sdp, true,
					   "ssrc", "%u cname:%s",
					   rtp_sess_ssrc(strm->rtp), cname);
	}

	/* RFC 5761 */
	if (cfg->rtcp_mux) {
		err |= sdp_media_set_lattr(strm->sdp, true,
					   "rtcp-mux", NULL);
	}

	if (err)
		goto out;

	/* Media NAT traversal */
	if (mnat && strm->rtp) {
		err = mnat->mediah(&strm->mns, mnat_sess, IPPROTO_UDP,
				   rtp_sock(strm->rtp),
				   strm->rtcp ? rtcp_sock(strm->rtp) : NULL,
				   strm->sdp);
		if (err)
			goto out;
	}

	/* Media encryption */
	if (menc && strm->rtp) {
		strm->menc  = menc;
		strm->mencs = mem_ref(menc_sess);

		err = menc->mediah(&strm->mes, menc_sess,
				   strm->rtp,
				   IPPROTO_UDP,
				   rtp_sock(strm->rtp),
				   strm->rtcp ? rtcp_sock(strm->rtp) : NULL,
				   strm->sdp);
		if (err)
			goto out;
	}

	strm->pt_enc = -1;

	metric_init(&strm->metric_tx);
	metric_init(&strm->metric_rx);

	list_append(call_streaml(call), &strm->le, strm);

 out:
	if (err) {
		mem_deref(strm);
		return err;
	}

	*sp = strm;

	return 0;
}
464
465
stream_sdpmedia(const struct stream * s)466 struct sdp_media *stream_sdpmedia(const struct stream *s)
467 {
468 return s ? s->sdp : NULL;
469 }
470
471
stream_start_keepalive(struct stream * s)472 static void stream_start_keepalive(struct stream *s)
473 {
474 const char *rtpkeep;
475
476 if (!s)
477 return;
478
479 rtpkeep = call_account(s->call)->rtpkeep;
480
481 s->rtpkeep = mem_deref(s->rtpkeep);
482
483 if (rtpkeep && sdp_media_rformat(s->sdp, NULL)) {
484 int err;
485 err = rtpkeep_alloc(&s->rtpkeep, rtpkeep,
486 IPPROTO_UDP, s->rtp, s->sdp);
487 if (err) {
488 warning("stream: rtpkeep_alloc failed: %m\n", err);
489 }
490 }
491 }
492
493
stream_send(struct stream * s,bool ext,bool marker,int pt,uint32_t ts,struct mbuf * mb)494 int stream_send(struct stream *s, bool ext, bool marker, int pt, uint32_t ts,
495 struct mbuf *mb)
496 {
497 int err = 0;
498
499 if (!s)
500 return EINVAL;
501
502 if (!sa_isset(sdp_media_raddr(s->sdp), SA_ALL))
503 return 0;
504 if (sdp_media_dir(s->sdp) != SDP_SENDRECV)
505 return 0;
506
507 metric_add_packet(&s->metric_tx, mbuf_get_left(mb));
508
509 if (pt < 0)
510 pt = s->pt_enc;
511
512 if (pt >= 0) {
513 err = rtp_send(s->rtp, sdp_media_raddr(s->sdp), ext,
514 marker, pt, ts, mb);
515 if (err)
516 s->metric_tx.n_err++;
517 }
518
519 rtpkeep_refresh(s->rtpkeep, ts);
520
521 return err;
522 }
523
524
stream_remote_set(struct stream * s)525 static void stream_remote_set(struct stream *s)
526 {
527 struct sa rtcp;
528
529 if (!s)
530 return;
531
532 /* RFC 5761 */
533 if (s->cfg.rtcp_mux && sdp_media_rattr(s->sdp, "rtcp-mux")) {
534
535 if (!s->rtcp_mux)
536 info("%s: RTP/RTCP multiplexing enabled\n",
537 sdp_media_name(s->sdp));
538 s->rtcp_mux = true;
539 }
540
541 rtcp_enable_mux(s->rtp, s->rtcp_mux);
542
543 sdp_media_raddr_rtcp(s->sdp, &rtcp);
544
545 rtcp_start(s->rtp, s->cname,
546 s->rtcp_mux ? sdp_media_raddr(s->sdp): &rtcp);
547 }
548
549
stream_update(struct stream * s)550 void stream_update(struct stream *s)
551 {
552 const struct sdp_format *fmt;
553 int err = 0;
554
555 if (!s)
556 return;
557
558 fmt = sdp_media_rformat(s->sdp, NULL);
559
560 s->pt_enc = fmt ? fmt->pt : -1;
561
562 if (sdp_media_has_media(s->sdp))
563 stream_remote_set(s);
564
565 if (s->menc && s->menc->mediah) {
566 err = s->menc->mediah(&s->mes, s->mencs, s->rtp,
567 IPPROTO_UDP,
568 rtp_sock(s->rtp),
569 s->rtcp ? rtcp_sock(s->rtp) : NULL,
570 s->sdp);
571 if (err) {
572 warning("stream: mediaenc update: %m\n", err);
573 }
574 }
575 }
576
577
stream_update_encoder(struct stream * s,int pt_enc)578 void stream_update_encoder(struct stream *s, int pt_enc)
579 {
580 if (!s)
581 return;
582
583 if (pt_enc >= 0)
584 s->pt_enc = pt_enc;
585 }
586
587
stream_jbuf_stat(struct re_printf * pf,const struct stream * s)588 int stream_jbuf_stat(struct re_printf *pf, const struct stream *s)
589 {
590 struct jbuf_stat stat;
591 int err;
592
593 if (!s)
594 return EINVAL;
595
596 err = re_hprintf(pf, " %s:", sdp_media_name(s->sdp));
597
598 err |= jbuf_stats(s->jbuf, &stat);
599 if (err) {
600 err = re_hprintf(pf, "Jbuf stat: (not available)");
601 }
602 else {
603 err = re_hprintf(pf, "Jbuf stat: put=%u get=%u or=%u ur=%u",
604 stat.n_put, stat.n_get,
605 stat.n_overflow, stat.n_underflow);
606 }
607
608 return err;
609 }
610
611
stream_hold(struct stream * s,bool hold)612 void stream_hold(struct stream *s, bool hold)
613 {
614 if (!s)
615 return;
616
617 sdp_media_set_ldir(s->sdp, hold ? SDP_SENDONLY : SDP_SENDRECV);
618 }
619
620
stream_set_srate(struct stream * s,uint32_t srate_tx,uint32_t srate_rx)621 void stream_set_srate(struct stream *s, uint32_t srate_tx, uint32_t srate_rx)
622 {
623 if (!s)
624 return;
625
626 rtcp_set_srate(s->rtp, srate_tx, srate_rx);
627 }
628
629
stream_send_fir(struct stream * s,bool pli)630 void stream_send_fir(struct stream *s, bool pli)
631 {
632 int err;
633
634 if (!s)
635 return;
636
637 if (pli)
638 err = rtcp_send_pli(s->rtp, s->ssrc_rx);
639 else
640 err = rtcp_send_fir(s->rtp, rtp_sess_ssrc(s->rtp));
641
642 if (err) {
643 s->metric_tx.n_err++;
644
645 warning("stream: failed to send RTCP %s: %m\n",
646 pli ? "PLI" : "FIR", err);
647 }
648 }
649
650
stream_reset(struct stream * s)651 void stream_reset(struct stream *s)
652 {
653 if (!s)
654 return;
655
656 jbuf_flush(s->jbuf);
657
658 stream_start_keepalive(s);
659 }
660
661
stream_set_bw(struct stream * s,uint32_t bps)662 void stream_set_bw(struct stream *s, uint32_t bps)
663 {
664 if (!s)
665 return;
666
667 sdp_media_set_lbandwidth(s->sdp, SDP_BANDWIDTH_AS, bps / 1000);
668 }
669
670
stream_enable_rtp_timeout(struct stream * strm,uint32_t timeout_ms)671 void stream_enable_rtp_timeout(struct stream *strm, uint32_t timeout_ms)
672 {
673 if (!strm)
674 return;
675
676 strm->rtp_timeout_ms = timeout_ms;
677
678 tmr_cancel(&strm->tmr_rtp);
679
680 if (timeout_ms) {
681
682 info("stream: Enable RTP timeout (%u milliseconds)\n",
683 timeout_ms);
684
685 strm->ts_last = tmr_jiffies();
686 tmr_start(&strm->tmr_rtp, 10, check_rtp_handler, strm);
687 }
688 }
689
690
/**
 * Set the error handler of a stream
 *
 * @param strm   Stream object
 * @param errorh Error handler, called by stream_close()
 * @param arg    Argument passed to the error handler
 */
void stream_set_error_handler(struct stream *strm,
			      stream_error_h *errorh, void *arg)
{
	if (strm) {
		strm->errorh     = errorh;
		strm->errorh_arg = arg;
	}
}
700
701
stream_debug(struct re_printf * pf,const struct stream * s)702 int stream_debug(struct re_printf *pf, const struct stream *s)
703 {
704 struct sa rrtcp;
705 int err;
706
707 if (!s)
708 return 0;
709
710 err = re_hprintf(pf, " %s dir=%s pt_enc=%d\n", sdp_media_name(s->sdp),
711 sdp_dir_name(sdp_media_dir(s->sdp)),
712 s->pt_enc);
713
714 sdp_media_raddr_rtcp(s->sdp, &rrtcp);
715 err |= re_hprintf(pf, " local: %J, remote: %J/%J\n",
716 sdp_media_laddr(s->sdp),
717 sdp_media_raddr(s->sdp), &rrtcp);
718
719 err |= rtp_debug(pf, s->rtp);
720 err |= jbuf_debug(pf, s->jbuf);
721
722 return err;
723 }
724
725
stream_print(struct re_printf * pf,const struct stream * s)726 int stream_print(struct re_printf *pf, const struct stream *s)
727 {
728 if (!s)
729 return 0;
730
731 return re_hprintf(pf, " %s=%u/%u", sdp_media_name(s->sdp),
732 s->metric_tx.cur_bitrate,
733 s->metric_rx.cur_bitrate);
734 }
735
736
stream_rtcp_stats(const struct stream * strm)737 const struct rtcp_stats *stream_rtcp_stats(const struct stream *strm)
738 {
739 return strm ? &strm->rtcp_stats : NULL;
740 }
741