1 /*
2 * The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) implementation with additional features.
3 * Copyright (C) 2017 Belledonne Communications SARL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20
21 #ifdef HAVE_CONFIG_H
22 #include "ortp-config.h"
23 #endif
24
25 #include "ortp/ortp.h"
26 #include "jitterctl.h"
27 #include "utils.h"
28 #include "rtpsession_priv.h"
29 #include "congestiondetector.h"
30 #include "videobandwidthestimator.h"
31
queue_packet(queue_t * q,int maxrqsz,mblk_t * mp,rtp_header_t * rtp,int * discarded,int * duplicate)32 static bool_t queue_packet(queue_t *q, int maxrqsz, mblk_t *mp, rtp_header_t *rtp, int *discarded, int *duplicate)
33 {
34 mblk_t *tmp;
35 int header_size;
36 *discarded=0;
37 *duplicate=0;
38 header_size=RTP_FIXED_HEADER_SIZE+ (4*rtp->cc);
39 if ((mp->b_wptr - mp->b_rptr)==header_size){
40 ortp_debug("Rtp packet contains no data.");
41 (*discarded)++;
42 freemsg(mp);
43 return FALSE;
44 }
45
46 /* and then add the packet to the queue */
47 if (rtp_putq(q,mp) < 0) {
48 /* It was a duplicate packet */
49 (*duplicate)++;
50 }
51
52 /* make some checks: q size must not exceed RtpStream::max_rq_size */
53 while (q->q_mcount > maxrqsz)
54 {
55 /* remove the oldest mblk_t */
56 tmp=getq(q);
57 if (mp!=NULL)
58 {
59 ortp_warning("rtp_putq: Queue is full. Discarding message with ts=%u",((rtp_header_t*)mp->b_rptr)->timestamp);
60 freemsg(tmp);
61 (*discarded)++;
62 }
63 }
64 return TRUE;
65 }
66
/*
 * One step of Welford's online algorithm for a running mean and a running
 * sum of squared deviations.
 *
 * nb:   number of samples seen so far, including x.
 * x:    the new sample.
 * olds: in/out, previous sum of squared deviations; receives the new one.
 * oldm: in/out, previous mean; receives the new one.
 * news: out, updated sum of squared deviations.
 * newm: out, updated mean.
 *
 * On return *oldm == *newm and *olds == *news, so the same four variables
 * can be passed again for the next sample.
 */
static void compute_mean_and_deviation(uint32_t nb, double x, double *olds, double *oldm, double *news, double *newm) {
	/* M_k = M_(k-1) + (x - M_(k-1)) / k */
	const double mean_step = (x - *oldm) / nb;
	*newm = *oldm + mean_step;

	/* S_k = S_(k-1) + (x - M_(k-1)) * (x - M_k) */
	{
		const double dev_from_previous = x - *oldm;
		const double dev_from_current = x - *newm;
		*news = *olds + (dev_from_previous * dev_from_current);
	}

	/* Roll the state forward for the next call. */
	*oldm = *newm;
	*olds = *news;
}
73
/*
 * Account one received RTP packet in the per-interval RTCP XR statistics
 * stored in session->rtcp_xr_stats: TTL / hop-limit (min, max, running mean
 * and deviation) and jitter (same four quantities).
 *
 * session:      session whose rtcp_xr_stats fields are updated.
 * mp:           received packet; its ttl_or_hl field and RTP timestamp are read.
 * local_str_ts: local timestamp subtracted from the packet's RTP timestamp.
 *
 * A rcv_since_last_stat_summary value of 1 is treated as the first packet of
 * an interval: the TTL accumulators and the jitter min/max are re-seeded, and
 * no jitter sample is produced since there is no previous difference to
 * compare against.
 */
static void update_rtcp_xr_stat_summary(RtpSession *session, mblk_t *mp, uint32_t local_str_ts) {
	const rtp_header_t *hdr = (rtp_header_t *)mp->b_rptr;
	const int64_t transit = (int64_t)hdr->timestamp - (int64_t)local_str_ts;
	const int interval_start = (session->rtcp_xr_stats.rcv_since_last_stat_summary == 1);

	/* TTL/HL statistics */
	if (interval_start) {
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = 255;
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
		session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	compute_mean_and_deviation(
		session->rtcp_xr_stats.rcv_since_last_stat_summary,
		(double)mp->ttl_or_hl,
		&session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.news_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary);
	if (session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary > mp->ttl_or_hl) {
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	if (session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary < mp->ttl_or_hl) {
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}

	/* Jitter statistics */
	if (!interval_start) {
		/* Jitter sample = |this difference - previous difference|. */
		const int64_t delta = transit - session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary;
		const uint32_t jitter = (delta < 0) ? (uint32_t)(-delta) : (uint32_t)(delta);

		/* One fewer jitter sample than packets: the first packet yields none. */
		compute_mean_and_deviation(
			session->rtcp_xr_stats.rcv_since_last_stat_summary - 1,
			(double)jitter,
			&session->rtcp_xr_stats.olds_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.oldm_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.news_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.newm_jitter_since_last_stat_summary);
		if (session->rtcp_xr_stats.min_jitter_since_last_stat_summary > jitter) {
			session->rtcp_xr_stats.min_jitter_since_last_stat_summary = jitter;
		}
		if (session->rtcp_xr_stats.max_jitter_since_last_stat_summary < jitter) {
			session->rtcp_xr_stats.max_jitter_since_last_stat_summary = jitter;
		}
	} else {
		session->rtcp_xr_stats.min_jitter_since_last_stat_summary = 0xFFFFFFFF;
		session->rtcp_xr_stats.max_jitter_since_last_stat_summary = 0;
	}

	/* Remember this difference for the next packet's jitter sample. */
	session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary = transit;
}
126
/* Count mp as a bad packet in both the session and the global statistics, then release it. */
static void rtp_parse_drop_bad_packet(RtpSession *session, mblk_t *mp)
{
	session->stats.bad++;
	ortp_global_stats.bad++;
	freemsg(mp);
}

/* Fold the counters returned by queue_packet() into the session, global and RTCP XR statistics. */
static void rtp_parse_account_queue_result(RtpSession *session, int discarded, int duplicate)
{
	session->stats.discarded += discarded;
	ortp_global_stats.discarded += discarded;
	session->stats.packet_dup_recv += duplicate;
	ortp_global_stats.packet_dup_recv += duplicate;
	session->rtcp_xr_stats.discarded_count += discarded;
	session->rtcp_xr_stats.dup_since_last_stat_summary += duplicate;
}

/*
 * Parse a datagram received on the RTP socket and, when it is an acceptable
 * RTP packet, place it in the session's reception queue.
 *
 * session:      receiving session.
 * mp:           received datagram, header still in network byte order. The
 *               function takes ownership on every path: the block is freed,
 *               queued, or attached to a STUN event.
 * local_str_ts: local timestamp passed on to the jitter control, the
 *               congestion detector and the RTCP XR statistics.
 * addr/addrlen: source address of the datagram; only copied into the event
 *               emitted for STUN-looking packets.
 *
 * The header fields (sequence number, timestamp, SSRC, CSRCs) are converted
 * to host byte order in place.
 */
void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_ts, struct sockaddr *addr, socklen_t addrlen)
{
	int i;
	int discarded;
	int duplicate;
	rtp_header_t *rtp;
	int msgsize;
	RtpStream *rtpstream = &session->rtp;
	rtp_stats_t *stats = &session->stats;

	msgsize = (int)(mp->b_wptr - mp->b_rptr);

	if (msgsize < RTP_FIXED_HEADER_SIZE) {
		ortp_warning("Packet too small to be a rtp packet (%i)!", msgsize);
		rtp_parse_drop_bad_packet(session, mp);
		return;
	}
	rtp = (rtp_header_t *)mp->b_rptr;
	if (rtp->version != 2) {
		/* try to see if it is a STUN packet */
		uint16_t stunlen;

		/* Copy the 16-bit length out byte-wise: no alignment or aliasing assumption on the buffer. */
		memcpy(&stunlen, mp->b_rptr + sizeof(uint16_t), sizeof(stunlen));
		stunlen = ntohs(stunlen);
		if (stunlen + 20 == mp->b_wptr - mp->b_rptr) {
			/* this looks like a stun packet */
			rtp_session_update_remote_sock_addr(session, mp, TRUE, TRUE);
			if (session->eventqs != NULL) {
				OrtpEvent *ev = ortp_event_new(ORTP_EVENT_STUN_PACKET_RECEIVED);
				OrtpEventData *ed = ortp_event_get_data(ev);
				size_t copylen = 0;

				ed->packet = mp;
				/* Never read through a NULL address nor write past the event's address storage. */
				if (addr != NULL && addrlen > 0) {
					copylen = (size_t)addrlen;
					if (copylen > sizeof(ed->source_addr)) {
						copylen = sizeof(ed->source_addr);
					}
					memcpy(&ed->source_addr, addr, copylen);
				}
				ed->source_addrlen = (socklen_t)copylen;
				ed->info.socket_type = OrtpRTPSocket;
				rtp_session_dispatch_event(session, ev);
				return;
			}
		}
		/* discard in two cases: the packet is not stun OR nobody is interested in STUN (no eventqs) */
		ortp_debug("Receiving rtp packet with version number %d!=2...discarded", rtp->version);
		rtp_parse_drop_bad_packet(session, mp);
		return;
	}

	/* only count non-stun packets. */
	ortp_global_stats.packet_recv++;
	stats->packet_recv++;
	ortp_global_stats.hw_recv += msgsize;
	stats->hw_recv += msgsize;
	session->rtp.hwrcv_since_last_SR++;
	session->rtcp_xr_stats.rcv_since_last_stat_summary++;

	/* convert all header data from network order to host order */
	rtp->seq_number = ntohs(rtp->seq_number);
	rtp->timestamp = ntohl(rtp->timestamp);
	rtp->ssrc = ntohl(rtp->ssrc);
	/* the CSRC list announced by the header must fit in what was actually received */
	if (rtp->cc * sizeof(uint32_t) > (uint32_t)(msgsize - RTP_FIXED_HEADER_SIZE)) {
		ortp_debug("Receiving too short rtp packet.");
		rtp_parse_drop_bad_packet(session, mp);
		return;
	}

#ifndef PERF
	/* Write down the last RTP/RTCP packet reception time. */
	ortp_gettimeofday(&session->last_recv_time, NULL);
#endif

	/* convert csrc if necessary */
	for (i = 0; i < rtp->cc; i++) {
		rtp->csrc[i] = ntohl(rtp->csrc[i]);
	}

	/* the goal of the following code is to lock on an incoming SSRC to avoid
	   receiving "mixed streams" */
	if (session->ssrc_set) {
		/* the ssrc is set, so we must check it */
		if (session->rcv.ssrc != rtp->ssrc) {
			if (session->inc_ssrc_candidate == rtp->ssrc) {
				session->inc_same_ssrc_count++;
			} else {
				session->inc_same_ssrc_count = 0;
				session->inc_ssrc_candidate = rtp->ssrc;
			}
			if (session->inc_same_ssrc_count >= session->rtp.ssrc_changed_thres) {
				/* store the sender rtp address to do symmetric RTP */
				rtp_session_update_remote_sock_addr(session, mp, TRUE, FALSE);
				session->rtp.rcv_last_ts = rtp->timestamp;
				session->rcv.ssrc = rtp->ssrc;
				rtp_signal_table_emit(&session->on_ssrc_changed);
			} else {
				/* discard the packet */
				ortp_debug("Receiving packet with unknown ssrc.");
				rtp_parse_drop_bad_packet(session, mp);
				return;
			}
		} else {
			/* The SSRC change must not happen if we still receive
			   ssrc from the initial source. */
			session->inc_same_ssrc_count = 0;
		}
	} else {
		session->ssrc_set = TRUE;
		session->rcv.ssrc = rtp->ssrc;
		rtp_session_update_remote_sock_addr(session, mp, TRUE, FALSE);
	}

	/* update some statistics */
	{
		poly32_t *extseq = (poly32_t *)&rtpstream->hwrcv_extseq;
		if (rtp->seq_number > extseq->split.lo) {
			extseq->split.lo = rtp->seq_number;
		} else if (rtp->seq_number < 200 && extseq->split.lo > ((1 << 16) - 200)) {
			/* this is a check for sequence number looping */
			extseq->split.lo = rtp->seq_number;
			extseq->split.hi++;
		}

		/* the first sequence number received should be initialized at the beginning
		   or at any resync, so that the first receiver report contains a valid loss rate */
		if (!(session->flags & RTP_SESSION_RECV_SEQ_INIT)) {
			rtp_session_set_flag(session, RTP_SESSION_RECV_SEQ_INIT);
			rtpstream->hwrcv_seq_at_last_SR = rtp->seq_number - 1;
			session->rtcp_xr_stats.rcv_seq_at_last_stat_summary = rtp->seq_number - 1;
		}
		if (stats->packet_recv == 1) {
			session->rtcp_xr_stats.first_rcv_seq = extseq->one;
		}
		session->rtcp_xr_stats.last_rcv_seq = extseq->one;
	}

	/* check for possible telephone events: they go to their own queue */
	if (rtp_profile_is_telephone_event(session->snd.profile, rtp->paytype)) {
		queue_packet(&session->rtp.tev_rq, session->rtp.jittctl.params.max_packets, mp, rtp, &discarded, &duplicate);
		rtp_parse_account_queue_result(session, discarded, duplicate);
		return;
	}

	/* check for possible payload type change, in order to update accordingly our clock-rate dependent
	   parameters */
	if (session->hw_recv_pt != rtp->paytype) {
		rtp_session_update_payload_type(session, rtp->paytype);
	}

	/* Drop the packets while the RTP_SESSION_FLUSH flag is set. */
	if (session->flags & RTP_SESSION_FLUSH) {
		freemsg(mp);
		return;
	}

	jitter_control_new_packet(&session->rtp.jittctl, rtp->timestamp, local_str_ts);

	if (session->video_bandwidth_estimator_enabled && session->rtp.video_bw_estimator) {
		int overhead = ortp_stream_is_ipv6(&session->rtp.gs) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;
		ortp_video_bandwidth_estimator_process_packet(session->rtp.video_bw_estimator, rtp->timestamp, &mp->timestamp, msgsize + overhead, rtp->markbit == 1);
	}

	if (session->congestion_detector_enabled && session->rtp.congdetect) {
		if (ortp_congestion_detector_record(session->rtp.congdetect, rtp->timestamp, local_str_ts)) {
			OrtpEvent *ev = ortp_event_new(ORTP_EVENT_CONGESTION_STATE_CHANGED);
			OrtpEventData *ed = ortp_event_get_data(ev);
			ed->info.congestion_detected = session->rtp.congdetect->state == CongestionStateDetected;
			rtp_session_dispatch_event(session, ev);
		}
	}

	update_rtcp_xr_stat_summary(session, mp, local_str_ts);

	if (session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED) {
		/* detect important timestamp jumps in the future, to work around broken rtp senders */
		if (RTP_TIMESTAMP_IS_NEWER_THAN(rtp->timestamp, session->rtp.rcv_last_ts + session->rtp.ts_jump)) {
			ortp_warning("rtp_parse: timestamp jump in the future detected.");
			rtp_signal_table_emit2(&session->on_timestamp_jump, &rtp->timestamp);
		} else if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts, rtp->timestamp)
			|| RTP_SEQ_IS_STRICTLY_GREATER_THAN(session->rtp.rcv_last_seq, rtp->seq_number)) {
			/* don't queue packets older than the last packet returned to the application, or whose
			   sequence number is behind the last packet returned to the application */

			/* Signal a timestamp jump in case of a large negative timestamp jump,
			   or if the timestamp was reset to 0 */
			if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts, rtp->timestamp + session->rtp.ts_jump)) {
				ortp_warning("rtp_parse: negative timestamp jump detected");
				rtp_signal_table_emit2(&session->on_timestamp_jump, &rtp->timestamp);
			}
			ortp_error("rtp_parse: discarding too old packet (seq=%i, ts=%u, last_delivered was seq=%i, ts=%u)", rtp->seq_number, rtp->timestamp,
				(int)session->rtp.rcv_last_seq, session->rtp.rcv_last_ts);
			freemsg(mp);
			stats->outoftime++;
			ortp_global_stats.outoftime++;
			session->rtcp_xr_stats.discarded_count++;
			return;
		}
	}

	/* Only resize the jitter buffer when the packet was not rejected for lack of payload. */
	if (queue_packet(&session->rtp.rq, session->rtp.jittctl.params.max_packets, mp, rtp, &discarded, &duplicate)) {
		jitter_control_update_size(&session->rtp.jittctl, &session->rtp.rq);
	}
	rtp_parse_account_queue_result(session, discarded, duplicate);
	if ((discarded == 0) && (duplicate == 0)) {
		session->rtcp_xr_stats.rcv_count++;
	}
}
344
345