1 /*
2  * The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) implementation with additional features.
3  * Copyright (C) 2017 Belledonne Communications SARL
4  *
5  *  This program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 2 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19 
20 
21 #ifdef HAVE_CONFIG_H
22 #include "ortp-config.h"
23 #endif
24 
25 #include "ortp/ortp.h"
26 #include "jitterctl.h"
27 #include "utils.h"
28 #include "rtpsession_priv.h"
29 #include "congestiondetector.h"
30 #include "videobandwidthestimator.h"
31 
queue_packet(queue_t * q,int maxrqsz,mblk_t * mp,rtp_header_t * rtp,int * discarded,int * duplicate)32 static bool_t queue_packet(queue_t *q, int maxrqsz, mblk_t *mp, rtp_header_t *rtp, int *discarded, int *duplicate)
33 {
34 	mblk_t *tmp;
35 	int header_size;
36 	*discarded=0;
37 	*duplicate=0;
38 	header_size=RTP_FIXED_HEADER_SIZE+ (4*rtp->cc);
39 	if ((mp->b_wptr - mp->b_rptr)==header_size){
40 		ortp_debug("Rtp packet contains no data.");
41 		(*discarded)++;
42 		freemsg(mp);
43 		return FALSE;
44 	}
45 
46 	/* and then add the packet to the queue */
47 	if (rtp_putq(q,mp) < 0) {
48 		/* It was a duplicate packet */
49 		(*duplicate)++;
50 	}
51 
52 	/* make some checks: q size must not exceed RtpStream::max_rq_size */
53 	while (q->q_mcount > maxrqsz)
54 	{
55 		/* remove the oldest mblk_t */
56 		tmp=getq(q);
57 		if (mp!=NULL)
58 		{
59 			ortp_warning("rtp_putq: Queue is full. Discarding message with ts=%u",((rtp_header_t*)mp->b_rptr)->timestamp);
60 			freemsg(tmp);
61 			(*discarded)++;
62 		}
63 	}
64 	return TRUE;
65 }
66 
/*
 * Incremental update of a running mean and of the running sum of squared deviations
 * with one more sample.
 *
 * nb:   number of samples including x (used as divisor, so it is expected to be >= 1).
 * x:    the new sample.
 * olds: in/out, sum of squared deviations before the sample; receives the updated sum.
 * oldm: in/out, mean before the sample; receives the updated mean.
 * news: out, updated sum of squared deviations.
 * newm: out, updated mean.
 *
 * The four pointers are expected to designate distinct objects.
 */
static void compute_mean_and_deviation(uint32_t nb, double x, double *olds, double *oldm, double *news, double *newm) {
	/* distance of the sample to the mean as it was before this sample */
	const double delta_before = x - *oldm;

	*newm = *oldm + delta_before / nb;
	/* product of the distances to the previous and to the updated mean */
	*news = *olds + (delta_before * (x - *newm));

	/* roll the state forward for the next sample */
	*olds = *news;
	*oldm = *newm;
}
73 
/*
 * Fold one received packet into the statistics kept in session->rtcp_xr_stats for the
 * current "statistics summary" window: min/max/mean/deviation of the TTL (or hop limit)
 * and of the jitter.
 *
 * session:      session whose rtcp_xr_stats are updated. rcv_since_last_stat_summary must
 *               already count this packet: the value 1 is taken to mean "first packet of
 *               the window" and triggers the re-initialisation of the window state.
 * mp:           the packet; its RTP timestamp (already in host order) and its ttl_or_hl
 *               field are read.
 * local_str_ts: local timestamp the RTP timestamp is compared to.
 *
 * The jitter sample of a packet is the absolute variation, between this packet and the
 * previous one, of (rtp timestamp - local_str_ts). The first packet of a window therefore
 * yields no jitter sample.
 */
static void update_rtcp_xr_stat_summary(RtpSession *session, mblk_t *mp, uint32_t local_str_ts) {
	rtp_header_t *rtp = (rtp_header_t *)mp->b_rptr;
	int64_t diff = (int64_t)rtp->timestamp - (int64_t)local_str_ts;

	/* TTL/HL statistics */
	if (session->rtcp_xr_stats.rcv_since_last_stat_summary == 1) {
		/* first packet of the window: restart min/max and the running mean/deviation */
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = 255;
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
		session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	compute_mean_and_deviation(session->rtcp_xr_stats.rcv_since_last_stat_summary,
		(double)mp->ttl_or_hl,
		&session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.news_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary);
	if (mp->ttl_or_hl < session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary) {
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	if (mp->ttl_or_hl > session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary) {
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}

	/* Jitter statistics */
	if (session->rtcp_xr_stats.rcv_since_last_stat_summary == 1) {
		session->rtcp_xr_stats.min_jitter_since_last_stat_summary = 0xFFFFFFFF;
		session->rtcp_xr_stats.max_jitter_since_last_stat_summary = 0;
		/* Restart the running mean/deviation as well, like it is done for the TTL above.
		Without this the sum of squared deviations accumulated during the previous
		window would leak into the deviation computed for the new one. */
		session->rtcp_xr_stats.olds_jitter_since_last_stat_summary = 0;
		session->rtcp_xr_stats.oldm_jitter_since_last_stat_summary = 0;
	} else {
		int64_t signed_jitter = diff - session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary;
		uint32_t jitter;
		if (signed_jitter < 0) {
			jitter = (uint32_t)(-signed_jitter);
		} else {
			jitter = (uint32_t)(signed_jitter);
		}
		/* one jitter sample less than received packets: the first packet gives none */
		compute_mean_and_deviation(session->rtcp_xr_stats.rcv_since_last_stat_summary - 1,
			(double)jitter,
			&session->rtcp_xr_stats.olds_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.oldm_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.news_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.newm_jitter_since_last_stat_summary);
		if (jitter < session->rtcp_xr_stats.min_jitter_since_last_stat_summary) {
			session->rtcp_xr_stats.min_jitter_since_last_stat_summary = jitter;
		}
		if (jitter > session->rtcp_xr_stats.max_jitter_since_last_stat_summary) {
			session->rtcp_xr_stats.max_jitter_since_last_stat_summary = jitter;
		}
	}
	/* remembered as the reference for the jitter sample of the next packet */
	session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary = diff;
}
126 
/* Account for a packet rejected as invalid: bump the session and global "bad" counters, then free it. */
static void rtp_parse_discard_bad(RtpSession *session, mblk_t *mp)
{
	session->stats.bad++;
	ortp_global_stats.bad++;
	freemsg(mp);
}

/* Add the counters filled in by queue_packet() to the session, global and RTCP XR statistics. */
static void rtp_parse_account_queueing(RtpSession *session, int discarded, int duplicate)
{
	session->stats.discarded+=discarded;
	ortp_global_stats.discarded+=discarded;
	session->stats.packet_dup_recv+=duplicate;
	ortp_global_stats.packet_dup_recv+=duplicate;
	session->rtcp_xr_stats.discarded_count += discarded;
	session->rtcp_xr_stats.dup_since_last_stat_summary += duplicate;
}

/*
 * Process one datagram received for the RTP stream of a session.
 *
 * The datagram is validated, its RTP header is converted to host byte order in place, the
 * reception statistics are updated, and the packet finally ends up in one of these places:
 *  - attached to an ORTP_EVENT_STUN_PACKET_RECEIVED event when it looks like a STUN message
 *    and the session has event queues;
 *  - in the telephone-event queue (session->rtp.tev_rq) when its payload type is a
 *    telephone-event one;
 *  - in the reception queue (session->rtp.rq);
 *  - freed, when it is invalid, comes from an unexpected SSRC, arrives while the session is
 *    being flushed or is older than what was already delivered.
 * In every case the caller must consider mp as consumed.
 *
 * session:      the receiving session.
 * mp:           message block holding the datagram between b_rptr and b_wptr.
 * local_str_ts: local timestamp associated with this reception; it is forwarded to the jitter
 *               control, the congestion detector and the RTCP XR statistics.
 *               NOTE(review): presumably expressed in the same units as the RTP timestamp,
 *               confirm with the caller.
 * addr:         source address of the datagram, only used to fill the STUN event.
 * addrlen:      number of bytes of addr that are copied into the STUN event.
 */
void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_ts, struct sockaddr *addr, socklen_t addrlen)
{
	int i;
	int discarded;
	int duplicate;
	rtp_header_t *rtp;
	int msgsize;
	RtpStream *rtpstream=&session->rtp;
	rtp_stats_t *stats=&session->stats;

	msgsize=(int)(mp->b_wptr-mp->b_rptr);

	if (msgsize<RTP_FIXED_HEADER_SIZE){
		ortp_warning("Packet too small to be a rtp packet (%i)!",msgsize);
		rtp_parse_discard_bad(session,mp);
		return;
	}
	rtp=(rtp_header_t*)mp->b_rptr;
	if (rtp->version!=2)
	{
		/* try to see if it is a STUN packet: the 16 bit word following the first one is
		taken as the STUN message length, which must match the datagram size minus the
		20 bytes of STUN header */
		uint16_t stunlen;
		/* copied out byte-wise: nothing guarantees that b_rptr+2 is suitably aligned */
		memcpy(&stunlen,mp->b_rptr + sizeof(uint16_t),sizeof(stunlen));
		stunlen = ntohs(stunlen);
		if (stunlen+20==msgsize){
			/* this looks like a stun packet */
			rtp_session_update_remote_sock_addr(session,mp,TRUE,TRUE);
			if (session->eventqs!=NULL){
				OrtpEvent *ev=ortp_event_new(ORTP_EVENT_STUN_PACKET_RECEIVED);
				OrtpEventData *ed=ortp_event_get_data(ev);
				/* the packet now belongs to the event, it must not be freed here */
				ed->packet=mp;
				memcpy(&ed->source_addr,addr,addrlen);
				ed->source_addrlen=addrlen;
				ed->info.socket_type = OrtpRTPSocket;
				rtp_session_dispatch_event(session,ev);
				return;
			}
		}
		/* discard in two cases: the packet is not stun OR nobody is interested in STUN (no eventqs) */
		ortp_debug("Receiving rtp packet with version number %d!=2...discarded", rtp->version);
		rtp_parse_discard_bad(session,mp);
		return;
	}

	/* only count non-stun packets. */
	ortp_global_stats.packet_recv++;
	stats->packet_recv++;
	ortp_global_stats.hw_recv+=msgsize;
	stats->hw_recv+=msgsize;
	session->rtp.hwrcv_since_last_SR++;
	session->rtcp_xr_stats.rcv_since_last_stat_summary++;

	/* convert all header data from network order to host order */
	rtp->seq_number=ntohs(rtp->seq_number);
	rtp->timestamp=ntohl(rtp->timestamp);
	rtp->ssrc=ntohl(rtp->ssrc);
	/* make sure the announced CSRC list fits in the packet before converting it */
	if (rtp->cc*sizeof(uint32_t) > (uint32_t) (msgsize-RTP_FIXED_HEADER_SIZE)){
		ortp_debug("Receiving too short rtp packet.");
		rtp_parse_discard_bad(session,mp);
		return;
	}

#ifndef PERF
	/* Write down the last RTP/RTCP packet reception time. */
	ortp_gettimeofday(&session->last_recv_time, NULL);
#endif

	/* convert csrc if necessary */
	for (i=0;i<rtp->cc;i++)
		rtp->csrc[i]=ntohl(rtp->csrc[i]);

	/* the goal of the following code is to lock on an incoming SSRC to avoid
	receiving "mixed streams" */
	if (session->ssrc_set){
		/* the ssrc is set, so we must check it */
		if (session->rcv.ssrc!=rtp->ssrc){
			/* count how many packets in a row carry the same new SSRC */
			if (session->inc_ssrc_candidate==rtp->ssrc){
				session->inc_same_ssrc_count++;
			}else{
				session->inc_same_ssrc_count=0;
				session->inc_ssrc_candidate=rtp->ssrc;
			}
			if (session->inc_same_ssrc_count>=session->rtp.ssrc_changed_thres){
				/* the candidate was seen often enough: switch to it */
				/* store the sender rtp address to do symmetric RTP */
				rtp_session_update_remote_sock_addr(session,mp,TRUE,FALSE);
				session->rtp.rcv_last_ts = rtp->timestamp;
				session->rcv.ssrc=rtp->ssrc;
				rtp_signal_table_emit(&session->on_ssrc_changed);
			}else{
				/* discard the packet */
				ortp_debug("Receiving packet with unknown ssrc.");
				rtp_parse_discard_bad(session,mp);
				return;
			}
		} else{
			/* The SSRC change must not happen if we still receive
			ssrc from the initial source. */
			session->inc_same_ssrc_count=0;
		}
	}else{
		/* first packet ever: lock on its SSRC */
		session->ssrc_set=TRUE;
		session->rcv.ssrc=rtp->ssrc;
		rtp_session_update_remote_sock_addr(session,mp,TRUE,FALSE);
	}

	/* update some statistics */
	{
		/* extended highest sequence number: low part is the 16 bit sequence number,
		high part counts the wrap-arounds */
		poly32_t *extseq=(poly32_t*)&rtpstream->hwrcv_extseq;
		if (rtp->seq_number>extseq->split.lo){
			extseq->split.lo=rtp->seq_number;
		}else if (rtp->seq_number<200 && extseq->split.lo>((1<<16) - 200)){
			/* this is a check for sequence number looping */
			extseq->split.lo=rtp->seq_number;
			extseq->split.hi++;
		}

		/* the first sequence number received should be initialized at the beginning
		or at any resync, so that the first receiver report contains a valid loss rate */
		if (!(session->flags & RTP_SESSION_RECV_SEQ_INIT)){
			rtp_session_set_flag(session, RTP_SESSION_RECV_SEQ_INIT);
			rtpstream->hwrcv_seq_at_last_SR=rtp->seq_number-1;
			session->rtcp_xr_stats.rcv_seq_at_last_stat_summary=rtp->seq_number-1;
		}
		if (stats->packet_recv==1){
			session->rtcp_xr_stats.first_rcv_seq=extseq->one;
		}
		session->rtcp_xr_stats.last_rcv_seq=extseq->one;
	}

	/* check for possible telephone events: they go to their own queue and skip the
	jitter control and the timestamp checks below */
	if (rtp_profile_is_telephone_event(session->snd.profile, rtp->paytype)){
		queue_packet(&session->rtp.tev_rq,session->rtp.jittctl.params.max_packets,mp,rtp,&discarded,&duplicate);
		rtp_parse_account_queueing(session,discarded,duplicate);
		return;
	}

	/* check for possible payload type change, in order to update accordingly our clock-rate dependent
	parameters */
	if (session->hw_recv_pt!=rtp->paytype){
		rtp_session_update_payload_type(session,rtp->paytype);
	}

	/* Drop the packets while the RTP_SESSION_FLUSH flag is set. */
	if (session->flags & RTP_SESSION_FLUSH) {
		freemsg(mp);
		return;
	}

	jitter_control_new_packet(&session->rtp.jittctl,rtp->timestamp,local_str_ts);

	if (session->video_bandwidth_estimator_enabled && session->rtp.video_bw_estimator) {
		/* the estimator is given the size of the packet including IP and UDP headers */
		int overhead = ortp_stream_is_ipv6(&session->rtp.gs) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;
		ortp_video_bandwidth_estimator_process_packet(session->rtp.video_bw_estimator, rtp->timestamp, &mp->timestamp, msgsize + overhead, rtp->markbit == 1);
	}

	if (session->congestion_detector_enabled && session->rtp.congdetect){
		/* a TRUE return value is notified to the application as a congestion state change */
		if (ortp_congestion_detector_record(session->rtp.congdetect,rtp->timestamp,local_str_ts)) {
			OrtpEvent *ev=ortp_event_new(ORTP_EVENT_CONGESTION_STATE_CHANGED);
			OrtpEventData *ed=ortp_event_get_data(ev);
			ed->info.congestion_detected = session->rtp.congdetect->state == CongestionStateDetected;
			rtp_session_dispatch_event(session,ev);
		}
	}

	update_rtcp_xr_stat_summary(session, mp, local_str_ts);

	if (session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED) {
		/* detect important timestamp jumps in the future, to work around misbehaving rtp senders */
		if (RTP_TIMESTAMP_IS_NEWER_THAN(rtp->timestamp,session->rtp.rcv_last_ts+session->rtp.ts_jump)){
			/* only signalled: the packet is still queued below */
			ortp_warning("rtp_parse: timestamp jump in the future detected.");
			rtp_signal_table_emit2(&session->on_timestamp_jump,&rtp->timestamp);
		}
		else if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts,rtp->timestamp)
			|| RTP_SEQ_IS_STRICTLY_GREATER_THAN(session->rtp.rcv_last_seq,rtp->seq_number)){
			/* don't queue packets older than the last packet returned to the application, or whose
			 sequence number is behind the last packet returned to the application */

			/* Signal a timestamp jump in case of
			 * large negative ts jump or if ts is set to 0
			 */
			if ( RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts, rtp->timestamp + session->rtp.ts_jump) ){
				ortp_warning("rtp_parse: negative timestamp jump detected");
				rtp_signal_table_emit2(&session->on_timestamp_jump, &rtp->timestamp);
			}
			ortp_error("rtp_parse: discarding too old packet (seq=%i, ts=%u, last_delivered was seq=%i, ts=%u)",rtp->seq_number, rtp->timestamp,
				(int)session->rtp.rcv_last_seq, session->rtp.rcv_last_ts);
			freemsg(mp);
			stats->outoftime++;
			ortp_global_stats.outoftime++;
			session->rtcp_xr_stats.discarded_count++;
			return;
		}
	}

	/* rtp and mp must not be dereferenced after this call: queue_packet() may have freed the packet */
	if (queue_packet(&session->rtp.rq,session->rtp.jittctl.params.max_packets,mp,rtp,&discarded,&duplicate)){
		jitter_control_update_size(&session->rtp.jittctl,&session->rtp.rq);
	}
	rtp_parse_account_queueing(session,discarded,duplicate);
	/* counted for RTCP XR only when nothing was dropped and the packet was not a duplicate */
	if ((discarded == 0) && (duplicate == 0)) {
		session->rtcp_xr_stats.rcv_count++;
	}
}
344 
345