1 /**
2  * @file rtp/sess.c  Real-time Transport Control Protocol Session
3  *
4  * Copyright (C) 2010 Creytiv.com
5  */
6 #ifdef HAVE_SYS_TIME_H
7 #include <sys/time.h>
8 #endif
9 #include <time.h>
10 #ifdef WIN32
11 #include <winsock2.h>
12 #endif
13 #include <string.h>
14 #include <re_types.h>
15 #include <re_fmt.h>
16 #include <re_mem.h>
17 #include <re_mbuf.h>
18 #include <re_list.h>
19 #include <re_hash.h>
20 #include <re_tmr.h>
21 #include <re_sa.h>
22 #include <re_lock.h>
23 #include <re_rtp.h>
24 #include "rtcp.h"
25 
26 
27 #define DEBUG_MODULE "rtcp_sess"
28 #define DEBUG_LEVEL 5
29 #include <re_dbg.h>
30 
31 
/** RTCP session constants */
enum {
	RTCP_INTERVAL = 5000,  /**< Interval in [ms] between sending reports */
	MAX_MEMBERS   = 8,     /**< Upper bound on the number of tracked
				    members; also passed to hash_alloc()
				    as the size of the member table       */
};
37 
/**
 * RTP Transmit stats
 *
 * Updated by rtcp_sess_tx_rtp() for every sent RTP packet and copied by
 * mk_sr() when a Sender Report is built. All access goes through the
 * session lock.
 */
struct txstat {
	uint32_t psent;      /**< Total number of RTP packets sent */
	uint32_t osent;      /**< Total number of RTP octets  sent */
	uint64_t jfs_ref;    /**< Timer ticks at RTP timestamp reference */
	uint32_t ts_ref;     /**< RTP timestamp reference (transmit)     */
	bool ts_synced;      /**< RTP timestamp synchronization flag;
				  cleared by mk_sr(), set again when the
				  next sent packet records a new reference */
};
46 
/**
 * RTCP Session
 *
 * Allocated by rtcp_sess_alloc() and released through sess_destructor().
 */
struct rtcp_sess {
	struct rtp_sock *rs;        /**< RTP Socket                          */
	struct hash *members;       /**< Member table                        */
	struct tmr tmr;             /**< Event sender timer                  */
	char *cname;                /**< Canonical Name                      */
	uint32_t memberc;           /**< Number of members                   */
	uint32_t senderc;           /**< Number of senders                   */
	uint32_t srate_tx;          /**< Transmit sampling rate              */
	uint32_t srate_rx;          /**< Receive sampling rate               */

	/* stats */
	struct lock *lock;          /**< Lock for txstat                     */
	struct txstat txstat;       /**< Local transmit statistics           */
};
62 
63 
/*
 * Prototypes
 *
 * Forward declarations: both functions are used (by rtcp_enable() and
 * sess_destructor() respectively) before they are defined further down.
 */
static void schedule(struct rtcp_sess *sess);
static int  send_bye_packet(struct rtcp_sess *sess);
67 
68 
sess_destructor(void * data)69 static void sess_destructor(void *data)
70 {
71 	struct rtcp_sess *sess = data;
72 
73 	if (sess->cname)
74 		(void)send_bye_packet(sess);
75 
76 	tmr_cancel(&sess->tmr);
77 
78 	mem_deref(sess->cname);
79 	hash_flush(sess->members);
80 	mem_deref(sess->members);
81 	mem_deref(sess->lock);
82 }
83 
84 
get_member(struct rtcp_sess * sess,uint32_t src)85 static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src)
86 {
87 	struct rtp_member *mbr;
88 
89 	mbr = member_find(sess->members, src);
90 	if (mbr)
91 		return mbr;
92 
93 	if (sess->memberc >= MAX_MEMBERS)
94 		return NULL;
95 
96 	mbr = member_add(sess->members, src);
97 	if (!mbr)
98 		return NULL;
99 
100 	++sess->memberc;
101 
102 	return mbr;
103 }
104 
105 
106 /** Calculate Round-Trip Time in [microseconds] */
calc_rtt(uint32_t * rtt,uint32_t lsr,uint32_t dlsr)107 static void calc_rtt(uint32_t *rtt, uint32_t lsr, uint32_t dlsr)
108 {
109 	struct ntp_time ntp_time;
110 	uint64_t a_us, lsr_us, dlsr_us;
111 	int err;
112 
113 	err = ntp_time_get(&ntp_time);
114 	if (err)
115 		return;
116 
117 	a_us    = ntp_compact2us(ntp_compact(&ntp_time));
118 	lsr_us  = ntp_compact2us(lsr);
119 	dlsr_us = 1000000ULL * dlsr / 65536;
120 
121 	/* RTT delay is (A - LSR - DLSR) */
122 	*rtt = MAX((int)(a_us - lsr_us - dlsr_us), 0);
123 }
124 
125 
126 /** Decode Reception Report block */
handle_rr_block(struct rtcp_sess * sess,struct rtp_member * mbr,const struct rtcp_rr * rr)127 static void handle_rr_block(struct rtcp_sess *sess, struct rtp_member *mbr,
128 			    const struct rtcp_rr *rr)
129 {
130 	/* Lost */
131 	mbr->cum_lost = rr->lost;
132 
133 	/* Interarrival jitter */
134 	if (sess->srate_tx)
135 		mbr->jit = 1000000 * rr->jitter / sess->srate_tx;
136 
137 	/* round-trip propagation delay as (A - LSR - DLSR) */
138 	if (rr->lsr && rr->dlsr)
139 		calc_rtt(&mbr->rtt, rr->lsr, rr->dlsr);
140 }
141 
142 
143 /** Handle incoming RR (Receiver Report) packet */
handle_incoming_rr(struct rtcp_sess * sess,const struct rtcp_msg * msg)144 static void handle_incoming_rr(struct rtcp_sess *sess,
145 			       const struct rtcp_msg *msg)
146 {
147 	struct rtp_member *mbr;
148 	uint32_t i;
149 
150 	mbr = get_member(sess, msg->r.rr.ssrc);
151 	if (!mbr)
152 		return;
153 
154 	for (i=0; i<msg->hdr.count; i++)
155 		handle_rr_block(sess, mbr, &msg->r.rr.rrv[i]);
156 }
157 
158 
159 /** Handle incoming SR (Sender Report) packet */
handle_incoming_sr(struct rtcp_sess * sess,const struct rtcp_msg * msg)160 static void handle_incoming_sr(struct rtcp_sess *sess,
161 			       const struct rtcp_msg *msg)
162 {
163 	struct rtp_member *mbr;
164 	uint32_t i;
165 
166 	mbr = get_member(sess, msg->r.sr.ssrc);
167 	if (!mbr) {
168 		DEBUG_WARNING("0x%08x: could not add member\n",
169 			      msg->r.sr.ssrc);
170 		return;
171 	}
172 
173 	if (mbr->s) {
174 		/* Save time when SR was received */
175 		mbr->s->sr_recv = tmr_jiffies();
176 
177 		/* Save NTP timestamp from SR */
178 		mbr->s->last_sr.hi = msg->r.sr.ntp_sec;
179 		mbr->s->last_sr.lo = msg->r.sr.ntp_frac;
180 		mbr->s->rtp_ts     = msg->r.sr.rtp_ts;
181 		mbr->s->psent      = msg->r.sr.psent;
182 		mbr->s->osent      = msg->r.sr.osent;
183 	}
184 
185 	for (i=0; i<msg->hdr.count; i++)
186 		handle_rr_block(sess, mbr, &msg->r.sr.rrv[i]);
187 }
188 
189 
handle_incoming_bye(struct rtcp_sess * sess,const struct rtcp_msg * msg)190 static void handle_incoming_bye(struct rtcp_sess *sess,
191 				const struct rtcp_msg *msg)
192 {
193 	uint32_t i;
194 
195 	for (i=0; i<msg->hdr.count; i++) {
196 
197 		struct rtp_member *mbr;
198 
199 		mbr = member_find(sess->members, msg->r.bye.srcv[i]);
200 		if (mbr) {
201 			if (mbr->s)
202 				--sess->senderc;
203 
204 			--sess->memberc;
205 			mem_deref(mbr);
206 		}
207 	}
208 }
209 
210 
rtcp_handler(struct rtcp_sess * sess,struct rtcp_msg * msg)211 void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
212 {
213 	if (!sess || !msg)
214 		return;
215 
216 	switch (msg->hdr.pt) {
217 
218 	case RTCP_SR:
219 		handle_incoming_sr(sess, msg);
220 		break;
221 
222 	case RTCP_RR:
223 		handle_incoming_rr(sess, msg);
224 		break;
225 
226 	case RTCP_BYE:
227 		handle_incoming_bye(sess, msg);
228 		break;
229 
230 	default:
231 		break;
232 	}
233 }
234 
235 
rtcp_sess_alloc(struct rtcp_sess ** sessp,struct rtp_sock * rs)236 int rtcp_sess_alloc(struct rtcp_sess **sessp, struct rtp_sock *rs)
237 {
238 	struct rtcp_sess *sess;
239 	int err;
240 
241 	if (!sessp)
242 		return EINVAL;
243 
244 	sess = mem_zalloc(sizeof(*sess), sess_destructor);
245 	if (!sess)
246 		return ENOMEM;
247 
248 	sess->rs = rs;
249 	tmr_init(&sess->tmr);
250 
251 	err = lock_alloc(&sess->lock);
252 	if (err)
253 		goto out;
254 
255 	err  = hash_alloc(&sess->members, MAX_MEMBERS);
256 	if (err)
257 		goto out;
258 
259  out:
260 	if (err)
261 		mem_deref(sess);
262 	else
263 		*sessp = sess;
264 
265 	return err;
266 }
267 
268 
269 /**
270  * Set the Sampling-rate on an RTCP Session
271  *
272  * @param rs       RTP Socket
273  * @param srate_tx Transmit samplerate
274  * @param srate_rx Receive samplerate
275  */
rtcp_set_srate(struct rtp_sock * rs,uint32_t srate_tx,uint32_t srate_rx)276 void rtcp_set_srate(struct rtp_sock *rs, uint32_t srate_tx, uint32_t srate_rx)
277 {
278 	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
279 	if (!sess)
280 		return;
281 
282 	sess->srate_tx = srate_tx;
283 	sess->srate_rx = srate_rx;
284 }
285 
286 
287 /**
288  * Set the transmit Sampling-rate on an RTCP Session
289  *
290  * @param rs       RTP Socket
291  * @param srate_tx Transmit samplerate
292  */
rtcp_set_srate_tx(struct rtp_sock * rs,uint32_t srate_tx)293 void rtcp_set_srate_tx(struct rtp_sock *rs, uint32_t srate_tx)
294 {
295 	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
296 	if (!sess)
297 		return;
298 
299 	sess->srate_tx = srate_tx;
300 }
301 
302 
303 /**
304  * Set the receive Sampling-rate on an RTCP Session
305  *
306  * @param rs       RTP Socket
307  * @param srate_rx Receive samplerate
308  */
rtcp_set_srate_rx(struct rtp_sock * rs,uint32_t srate_rx)309 void rtcp_set_srate_rx(struct rtp_sock *rs, uint32_t srate_rx)
310 {
311 	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
312 	if (!sess)
313 		return;
314 
315 	sess->srate_rx = srate_rx;
316 }
317 
318 
rtcp_enable(struct rtcp_sess * sess,bool enabled,const char * cname)319 int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname)
320 {
321 	int err;
322 
323 	if (!sess)
324 		return EINVAL;
325 
326 	sess->cname = mem_deref(sess->cname);
327 	err = str_dup(&sess->cname, cname);
328 	if (err)
329 		return err;
330 
331 	if (enabled)
332 		schedule(sess);
333 	else
334 		tmr_cancel(&sess->tmr);
335 
336 	return 0;
337 }
338 
339 
340 /** Calculate LSR (middle 32 bits out of 64 in the NTP timestamp) */
calc_lsr(const struct ntp_time * last_sr)341 static uint32_t calc_lsr(const struct ntp_time *last_sr)
342 {
343 	return last_sr->hi ? ntp_compact(last_sr) : 0;
344 }
345 
346 
/**
 * Calculate DLSR (delay since last Sender Report)
 *
 * The time elapsed since sr_recv is scaled by 65536/1000, i.e. the
 * timer ticks are presumably milliseconds and the result is in units
 * of 1/65536 seconds.
 *
 * @param sr_recv Timer ticks when the last SR was received, 0 if none
 *
 * @return Scaled delay, or 0 if no SR has been received
 */
static uint32_t calc_dlsr(uint64_t sr_recv)
{
	uint64_t elapsed;

	if (!sr_recv)
		return 0;

	elapsed = tmr_jiffies() - sr_recv;

	return (uint32_t)((65536 * elapsed) / 1000);
}
357 
358 
sender_apply_handler(struct le * le,void * arg)359 static bool sender_apply_handler(struct le *le, void *arg)
360 {
361 	struct rtp_member *mbr = le->data;
362 	struct rtp_source *s = mbr->s;
363 	struct mbuf *mb = arg;
364 	struct rtcp_rr rr;
365 
366 	if (!s)
367 		return false;
368 
369 	/* Initialise the members */
370 	rr.ssrc     = mbr->src;
371 	rr.fraction = source_calc_fraction_lost(s);
372 	rr.lost     = source_calc_lost(s);
373 	rr.last_seq = s->cycles | s->max_seq;
374 	rr.jitter   = s->jitter >> 4;
375 	rr.lsr      = calc_lsr(&s->last_sr);
376 	rr.dlsr     = calc_dlsr(s->sr_recv);
377 
378 	return 0 != rtcp_rr_encode(mb, &rr);
379 }
380 
381 
encode_handler(struct mbuf * mb,void * arg)382 static int encode_handler(struct mbuf *mb, void *arg)
383 {
384 	struct hash *members = arg;
385 
386 	/* copy all report blocks */
387 	if (hash_apply(members, sender_apply_handler, mb))
388 		return ENOMEM;
389 
390 	return 0;
391 }
392 
393 
394 /** Create a Sender Report */
mk_sr(struct rtcp_sess * sess,struct mbuf * mb)395 static int mk_sr(struct rtcp_sess *sess, struct mbuf *mb)
396 {
397 	struct ntp_time ntp = {0, 0};
398 	struct txstat txstat;
399 	uint32_t dur, rtp_ts = 0;
400 	int err;
401 
402 	err = ntp_time_get(&ntp);
403 	if (err)
404 		return err;
405 
406 	lock_write_get(sess->lock);
407 	txstat = sess->txstat;
408 	sess->txstat.ts_synced = false;
409 	lock_rel(sess->lock);
410 
411 	if (txstat.jfs_ref) {
412 		dur = (uint32_t)(tmr_jiffies() - txstat.jfs_ref);
413 		rtp_ts = txstat.ts_ref + dur * sess->srate_tx / 1000;
414 	}
415 
416 	err = rtcp_encode(mb, RTCP_SR, sess->senderc, rtp_sess_ssrc(sess->rs),
417 			  ntp.hi, ntp.lo, rtp_ts, txstat.psent, txstat.osent,
418 			  encode_handler, sess->members);
419 	if (err)
420 		return err;
421 
422 	return err;
423 }
424 
425 
sdes_encode_handler(struct mbuf * mb,void * arg)426 static int sdes_encode_handler(struct mbuf *mb, void *arg)
427 {
428 	struct rtcp_sess *sess = arg;
429 
430 	return rtcp_sdes_encode(mb, rtp_sess_ssrc(sess->rs), 1,
431 				RTCP_SDES_CNAME, sess->cname);
432 }
433 
434 
mk_sdes(struct rtcp_sess * sess,struct mbuf * mb)435 static int mk_sdes(struct rtcp_sess *sess, struct mbuf *mb)
436 {
437 	return rtcp_encode(mb, RTCP_SDES, 1, sdes_encode_handler, sess);
438 }
439 
440 
send_rtcp_report(struct rtcp_sess * sess)441 static int send_rtcp_report(struct rtcp_sess *sess)
442 {
443 	struct mbuf *mb;
444 	int err;
445 
446 	mb = mbuf_alloc(512);
447 	if (!mb)
448 		return ENOMEM;
449 
450 	mb->pos = RTCP_HEADROOM;
451 
452 	err  = mk_sr(sess, mb);
453 	err |= mk_sdes(sess, mb);
454 	if (err)
455 		goto out;
456 
457 	mb->pos = RTCP_HEADROOM;
458 
459 	err = rtcp_send(sess->rs, mb);
460 
461  out:
462 	mem_deref(mb);
463 	return err;
464 }
465 
466 
send_bye_packet(struct rtcp_sess * sess)467 static int send_bye_packet(struct rtcp_sess *sess)
468 {
469 	const uint32_t ssrc = rtp_sess_ssrc(sess->rs);
470 	struct mbuf *mb;
471 	int err;
472 
473 	mb = mbuf_alloc(512);
474 	if (!mb)
475 		return ENOMEM;
476 
477 	mb->pos = RTCP_HEADROOM;
478 
479 	err  = rtcp_encode(mb, RTCP_BYE, 1, &ssrc, "Adjo");
480 	err |= mk_sdes(sess, mb);
481 	if (err)
482 		goto out;
483 
484 	mb->pos = RTCP_HEADROOM;
485 
486 	err = rtcp_send(sess->rs, mb);
487 
488  out:
489 	mem_deref(mb);
490 	return err;
491 }
492 
493 
/**
 * Timer handler: send one RTCP report and re-arm the timer
 *
 * A failed report is only logged; the next report is scheduled
 * regardless.
 *
 * @param arg RTCP session
 */
static void timeout(void *arg)
{
	struct rtcp_sess *rtcp = arg;
	const int err = send_rtcp_report(rtcp);

	if (err != 0) {
		DEBUG_WARNING("Send RTCP report failed: %m\n", err);
	}

	schedule(rtcp);
}
506 
507 
schedule(struct rtcp_sess * sess)508 static void schedule(struct rtcp_sess *sess)
509 {
510 	tmr_start(&sess->tmr, RTCP_INTERVAL, timeout, sess);
511 }
512 
513 
rtcp_sess_tx_rtp(struct rtcp_sess * sess,uint32_t ts,size_t payload_size)514 void rtcp_sess_tx_rtp(struct rtcp_sess *sess, uint32_t ts, size_t payload_size)
515 {
516 	if (!sess)
517 		return;
518 
519 	lock_write_get(sess->lock);
520 
521 	sess->txstat.osent += (uint32_t)payload_size;
522 	sess->txstat.psent += 1;
523 
524 	if (!sess->txstat.ts_synced) {
525 		sess->txstat.jfs_ref   = tmr_jiffies();
526 		sess->txstat.ts_ref    = ts;
527 		sess->txstat.ts_synced = true;
528 	}
529 
530 	lock_rel(sess->lock);
531 }
532 
533 
rtcp_sess_rx_rtp(struct rtcp_sess * sess,uint16_t seq,uint32_t ts,uint32_t ssrc,size_t payload_size,const struct sa * peer)534 void rtcp_sess_rx_rtp(struct rtcp_sess *sess, uint16_t seq, uint32_t ts,
535 		      uint32_t ssrc, size_t payload_size,
536 		      const struct sa *peer)
537 {
538 	struct rtp_member *mbr;
539 
540 	if (!sess)
541 		return;
542 
543 	mbr = get_member(sess, ssrc);
544 	if (!mbr) {
545 		DEBUG_NOTICE("could not add member: 0x%08x\n", ssrc);
546 		return;
547 	}
548 
549 	if (!mbr->s) {
550 		mbr->s = mem_zalloc(sizeof(*mbr->s), NULL);
551 		if (!mbr->s) {
552 			DEBUG_NOTICE("could not add sender: 0x%08x\n", ssrc);
553 			return;
554 		}
555 
556 		/* first packet - init sequence number */
557 		source_init_seq(mbr->s, seq);
558 		/* probation not used */
559 		sa_cpy(&mbr->s->rtp_peer, peer);
560 		++sess->senderc;
561 	}
562 
563 	if (!source_update_seq(mbr->s, seq)) {
564 		DEBUG_WARNING("rtp_update_seq() returned 0\n");
565 	}
566 
567 	if (sess->srate_rx) {
568 
569 		uint64_t ts_arrive;
570 
571 		/* Convert from wall-clock time to timestamp units */
572 		ts_arrive = tmr_jiffies() * sess->srate_rx / 1000;
573 
574 		source_calc_jitter(mbr->s, ts, (uint32_t)ts_arrive);
575 	}
576 
577 	mbr->s->rtp_rx_bytes += payload_size;
578 }
579 
580 
581 /**
582  * Get the RTCP Statistics for a source
583  *
584  * @param rs    RTP Socket
585  * @param ssrc  Synchronization source
586  * @param stats RTCP Statistics, set on return
587  *
588  * @return 0 if success, otherwise errorcode
589  */
rtcp_stats(struct rtp_sock * rs,uint32_t ssrc,struct rtcp_stats * stats)590 int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)
591 {
592 	const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
593 	struct rtp_member *mbr;
594 
595 	if (!sess || !stats)
596 		return EINVAL;
597 
598 	mbr = member_find(sess->members, ssrc);
599 	if (!mbr)
600 		return ENOENT;
601 
602 	lock_read_get(sess->lock);
603 	stats->tx.sent = sess->txstat.psent;
604 	lock_rel(sess->lock);
605 
606 	stats->tx.lost = mbr->cum_lost;
607 	stats->tx.jit  = mbr->jit;
608 
609 	stats->rtt = mbr->rtt;
610 
611 	if (!mbr->s) {
612 		memset(&stats->rx, 0, sizeof(stats->rx));
613 		return 0;
614 	}
615 
616 	stats->rx.sent = mbr->s->received;
617 	stats->rx.lost = source_calc_lost(mbr->s);
618 	stats->rx.jit  = sess->srate_rx ?
619 		1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0;
620 
621 	return 0;
622 }
623 
624 
debug_handler(struct le * le,void * arg)625 static bool debug_handler(struct le *le, void *arg)
626 {
627 	const struct rtp_member *mbr = le->data;
628 	struct re_printf *pf = arg;
629 	int err;
630 
631 	err = re_hprintf(pf, "  member 0x%08x: lost=%d Jitter=%.1fms"
632 			  " RTT=%.1fms\n", mbr->src, mbr->cum_lost,
633 			  (double)mbr->jit/1000, (double)mbr->rtt/1000);
634 	if (mbr->s) {
635 		err |= re_hprintf(pf,
636 				  "                 IP=%J psent=%u rcvd=%u\n",
637 				  &mbr->s->rtp_peer, mbr->s->psent,
638 				  mbr->s->received);
639 	}
640 
641 	return err != 0;
642 }
643 
644 
645 /**
646  * RTCP Debug handler, use with fmt %H
647  *
648  * @param pf Print function
649  * @param rs RTP Socket
650  *
651  * @return 0 if success, otherwise errorcode
652  */
rtcp_debug(struct re_printf * pf,const struct rtp_sock * rs)653 int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs)
654 {
655 	const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
656 	int err = 0;
657 
658 	if (!sess)
659 		return 0;
660 
661 	err |= re_hprintf(pf, "----- RTCP Session: -----\n");
662 	err |= re_hprintf(pf, "  cname=%s SSRC=0x%08x/%u rx=%uHz\n",
663 			  sess->cname,
664 			  rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs),
665 			  sess->srate_rx);
666 
667 	hash_apply(sess->members, debug_handler, pf);
668 
669 	lock_read_get(sess->lock);
670 	err |= re_hprintf(pf, "  TX: packets=%u, octets=%u\n",
671 			  sess->txstat.psent, sess->txstat.osent);
672 	lock_rel(sess->lock);
673 
674 	return err;
675 }
676