1 /*
2  *			GPAC - Multimedia Framework C SDK
3  *
4  *			Authors: Jean Le Feuvre
5  *			Copyright (c) Telecom ParisTech 2000-2017
6  *					All rights reserved
7  *
8  *  This file is part of GPAC / RTP/RTSP input filter
9  *
10  *  GPAC is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU Lesser General Public License as published by
12  *  the Free Software Foundation; either version 2, or (at your option)
13  *  any later version.
14  *
15  *  GPAC is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU Lesser General Public License for more details.
19  *
20  *  You should have received a copy of the GNU Lesser General Public
21  *  License along with this library; see the file COPYING.  If not, write to
22  *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  */
25 
26 #include "in_rtp.h"
27 
28 
29 #ifndef GPAC_DISABLE_STREAMING
30 
/*
 * Acknowledges the result of a connection request on a stream.
 * No action is currently taken: the function returns on every path.
 *
 * stream: stream being acknowledged
 * e: result of the connection request
 */
void rtpin_stream_ack_connect(GF_RTPInStream *stream, GF_Err e)
{
	/*the channel may have been disconnected while SETUP was issued and processed (no output PID).
	We could also clean up the command stack here.
	Nothing is done either when the request failed or when no RTP channel is attached*/
	if (!stream->opid || (e != GF_OK) || !stream->rtp_ch) return;
}
39 
rtpin_stream_init(GF_RTPInStream * stream,Bool ResetOnly)40 GF_Err rtpin_stream_init(GF_RTPInStream *stream, Bool ResetOnly)
41 {
42 	gf_rtp_depacketizer_reset(stream->depacketizer, !ResetOnly);
43 
44 	if (!ResetOnly) {
45 		GF_Err e;
46 		const char *ip_ifce = NULL;
47 		if (!stream->rtpin->interleave) {
48 			ip_ifce = stream->rtpin->ifce;
49 		}
50 		if (stream->rtp_ch->rtp)
51 			gf_sk_group_unregister(stream->rtpin->sockgroup, stream->rtp_ch->rtp);
52 		if (stream->rtp_ch->rtcp)
53 			gf_sk_group_unregister(stream->rtpin->sockgroup, stream->rtp_ch->rtcp);
54 
55 		e = gf_rtp_initialize(stream->rtp_ch, stream->rtpin->block_size, GF_FALSE, 0, stream->rtpin->reorder_len, stream->rtpin->reorder_delay, (char *)ip_ifce);
56 		if (e) return e;
57 
58 		if (stream->rtp_ch->rtp)
59 			gf_sk_group_register(stream->rtpin->sockgroup, stream->rtp_ch->rtp);
60 		if (stream->rtp_ch->rtcp)
61 			gf_sk_group_register(stream->rtpin->sockgroup, stream->rtp_ch->rtcp);
62 
63 	}
64 	//just reset the sockets
65 	gf_rtp_reset_buffers(stream->rtp_ch);
66 	return GF_OK;
67 }
68 
/*
 * Discards every packet pending in the stream packet queue.
 * Does nothing when the stream has no queue.
 *
 * stream: stream whose queue is flushed
 */
void rtpin_stream_reset_queue(GF_RTPInStream *stream)
{
	if (stream->pck_queue) {
		/*pop from the back until the list is empty, discarding each packet*/
		while (gf_list_count(stream->pck_queue) != 0) {
			GF_FilterPacket *pending = gf_list_pop_back(stream->pck_queue);
			gf_filter_pck_discard(pending);
		}
	}
}
77 
/*
 * Destroys a stream and every resource it holds, then frees the stream itself.
 *
 * stream: stream to destroy; must not be used after this call
 */
void rtpin_stream_del(GF_RTPInStream *stream)
{
	if (!stream->rtsp) {
		/*no RTSP session attached: look the stream up by its output PID.
		NOTE(review): the final GF_TRUE argument presumably asks for removal from the filter - confirm against rtpin_find_stream*/
		rtpin_find_stream(stream->rtpin, stream->opid, 0, NULL, GF_TRUE);
	} else {
		/*a running RTSP stream is torn down before being removed*/
		if (stream->status == RTP_Running) {
			rtpin_rtsp_teardown(stream->rtsp, stream);
			stream->status = RTP_Disconnected;
		}
		rtpin_remove_stream(stream->rtpin, stream);
	}

	if (stream->depacketizer) {
		gf_rtp_depacketizer_del(stream->depacketizer);
	}
	if (stream->rtp_ch) {
		gf_rtp_del(stream->rtp_ch);
	}
	if (stream->control) {
		gf_free(stream->control);
	}
	if (stream->session_id) {
		gf_free(stream->session_id);
	}
	if (stream->buffer) {
		gf_free(stream->buffer);
	}
	/*pending packets are discarded before the queue list is destroyed*/
	if (stream->pck_queue) {
		rtpin_stream_reset_queue(stream);
		gf_list_del(stream->pck_queue);
	}
	gf_free(stream);
}
101 
/*
 * Appends a packet to the stream queue.
 *
 * When the packet is the first one produced from the current RTP packet, all queued
 * packets whose CTS is less than or equal to the new CTS are sent first, from the head
 * of the queue, stopping at the first queued packet with a greater CTS.
 * This is only used for MPEG-4, where we are sure we only have
 * [AU(n),AU(n+i)..], [AU(n+j), AU(n+k) ...] with i,j,k >0
 *
 * stream: stream owning the queue
 * pck: packet to queue
 * cts: composition timestamp of the packet being queued
 */
static void rtpin_stream_queue_pck(GF_RTPInStream *stream, GF_FilterPacket *pck, u64 cts)
{
	if (stream->first_in_rtp_pck) {
		GF_FilterPacket *head;

		while ((head = gf_list_get(stream->pck_queue, 0)) != NULL) {
			u64 head_cts = gf_filter_pck_get_cts(head);
			/*head of queue is later than the new packet: keep it queued*/
			if (head_cts > cts) break;

			gf_list_rem(stream->pck_queue, 0);
			gf_filter_pck_send(head);
		}
	}
	/*following packets from the same RTP packet do not trigger a dispatch*/
	stream->first_in_rtp_pck = GF_FALSE;
	gf_list_add(stream->pck_queue, pck);
}
120 
/*
 * Depacketizer callback: turns one depacketized payload into a filter packet
 * on the stream output PID.
 *
 * udta: the GF_RTPInStream the depacketizer was created for
 * payload, size: payload data and its size in bytes; the data is copied into the new packet
 * hdr: SL header describing the payload. Its timestamps are temporarily adjusted and
 *      restored to their incoming values before returning; seekFlag is reset to 0
 * e: not used by this callback
 */
static void rtp_sl_packet_cbk(void *udta, u8 *payload, u32 size, GF_SLHeader *hdr, GF_Err e)
{
	u64 cts, dts;
	s64 diff;
	GF_FilterPacket *pck;
	u8 *pck_data;
	GF_RTPInStream *stream = (GF_RTPInStream *)udta;

	/*RTCP not seen yet: record when we started waiting, and stop waiting once the configured timeout elapsed*/
	if (!stream->rtcp_init) {
		if (!stream->rtcp_check_start) {
			GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] No RTCP packet received yet, synchronization might be wrong for a while\n"));
			stream->rtcp_check_start = gf_sys_clock();
		}
		else if (gf_sys_clock() - stream->rtcp_check_start <= stream->rtpin->rtcp_timeout) {
			GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] No RTCP packet received yet, synchronization might be wrong for a while\n"));
		} else {
			GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Timeout for RTCP: no SR recevied after %d ms - sync may be broken\n", stream->rtpin->rtcp_timeout));
			stream->rtcp_init = GF_TRUE;
		}
	}

	/*packet drop: starting at first_packet_drop, skip one packet every frequency_drop packets*/
	if (stream->rtpin->first_packet_drop && (hdr->packetSequenceNumber >= stream->rtpin->first_packet_drop) ) {
		if (! ( (hdr->packetSequenceNumber - stream->rtpin->first_packet_drop) % stream->rtpin->frequency_drop) )
			return;
	}

	/*remember incoming timestamps so the header can be restored for the depacketizer*/
	cts = hdr->compositionTimeStamp;
	dts = hdr->decodingTimeStamp;

	hdr->seekFlag = 0;
	hdr->compositionTimeStamp += stream->ts_adjust;
	/*first_rtp_ts is stored as (first timestamp + 1) so that 0 means "not set"*/
	if (stream->first_rtp_ts) {
		assert(hdr->compositionTimeStamp >= stream->first_rtp_ts - 1);
		hdr->compositionTimeStamp -= stream->first_rtp_ts - 1;
	}

	if (hdr->decodingTimeStamp) {
		hdr->decodingTimeStamp += stream->ts_adjust;
		if (stream->first_rtp_ts) {
			assert(hdr->decodingTimeStamp >= stream->first_rtp_ts - 1);
			hdr->decodingTimeStamp -= stream->first_rtp_ts - 1;
		}
	}

	pck = gf_filter_pck_new_alloc(stream->opid, size, &pck_data);
	if (!pck) {
		/*allocation failed: nothing can be dispatched, restore the header and bail out*/
		hdr->compositionTimeStamp = cts;
		hdr->decodingTimeStamp = dts;
		return;
	}
	memcpy(pck_data, payload, size);
	if (hdr->decodingTimeStampFlag)
		gf_filter_pck_set_dts(pck, hdr->decodingTimeStamp);

	gf_filter_pck_set_cts(pck, hdr->compositionTimeStamp);

	/*track the smallest CTS gap seen so far, and derive from it the minimum frame duration in ms used by the filter*/
	diff = (s64) hdr->compositionTimeStamp - (s64) stream->prev_cts;
	if ((stream->rtpin->max_sleep>0) && stream->prev_cts && diff) {
		if (diff<0) diff = -diff;
		if (!stream->min_dur_rtp || (diff < stream->min_dur_rtp)) {
			u64 dur = diff;
			dur *= 1000;
			dur /= stream->rtp_ch->TimeScale;
			/*min_dur_rtp is compared against diff above, so it must be kept in RTP timescale units and not in ms*/
			stream->min_dur_rtp = (u32) diff;
			if (dur > stream->rtpin->max_sleep) dur = stream->rtpin->max_sleep;
			if (!stream->rtpin->min_frame_dur_ms || ( (u32) dur < stream->rtpin->min_frame_dur_ms)) {
				stream->rtpin->min_frame_dur_ms = (u32) dur;
			}
		}
	}
	stream->prev_cts = (u32) hdr->compositionTimeStamp;

	gf_filter_pck_set_sap(pck, hdr->randomAccessPointFlag ? GF_FILTER_SAP_1  :GF_FILTER_SAP_NONE);
	if (hdr->au_duration)
		gf_filter_pck_set_duration(pck, hdr->au_duration);

	/*propagate audio configuration changes signaled in the header to the output PID*/
	if (hdr->samplerate && (hdr->samplerate != stream->sr)) {
		stream->sr = hdr->samplerate;
		gf_filter_pid_set_property(stream->opid, GF_PROP_PID_SAMPLE_RATE, &PROP_UINT(stream->sr));
	}
	if (hdr->channels && (hdr->channels != stream->nb_ch)) {
		stream->nb_ch = hdr->channels;
		gf_filter_pid_set_property(stream->opid, GF_PROP_PID_NUM_CHANNELS, &PROP_UINT(stream->nb_ch));
	}
	gf_filter_pck_set_framing(pck, hdr->accessUnitStartFlag, hdr->accessUnitEndFlag);

	if (stream->rtp_ch->packet_loss)
		gf_filter_pck_set_corrupted(pck, 1);

#if 0 //not yet implemented
	if (hdr->seekFlag)
		gf_filter_pck_set_seek_flag(pck, GF_TRUE);
#endif

	/*when the payload format uses index deltas, packets go through the reordering queue*/
	if (stream->depacketizer->sl_map.IndexDeltaLength) {
		rtpin_stream_queue_pck(stream, pck, hdr->compositionTimeStamp);
	} else {
		gf_filter_pck_send(pck);
	}

	/*restore the header timestamps as received*/
	hdr->compositionTimeStamp = cts;
	hdr->decodingTimeStamp = dts;
}
220 
rtpin_stream_new_satip(GF_RTPIn * rtp,const char * server_ip)221 GF_RTPInStream *rtpin_stream_new_satip(GF_RTPIn *rtp, const char *server_ip)
222 {
223 	GF_RTSPTransport trans;
224 	GF_RTPInStream *tmp;
225 	GF_SAFEALLOC(tmp, GF_RTPInStream);
226 	if (!tmp) return NULL;
227 	tmp->rtpin = rtp;
228 	tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
229 
230 	/*create an RTP channel*/
231 	tmp->rtp_ch = gf_rtp_new();
232 	tmp->control = gf_strdup("*");
233 
234 	memset(&trans, 0, sizeof(GF_RTSPTransport));
235 	trans.Profile = "RTP/AVP";
236 	trans.source = (char *) server_ip;
237 	trans.IsUnicast = GF_TRUE;
238 	trans.client_port_first = 0;
239 	trans.client_port_last = 0;
240 	trans.port_first = 0;
241 	trans.port_last = 0;
242 
243 	if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
244 		rtpin_stream_del(tmp);
245 		return NULL;
246 	}
247 
248 	gf_rtp_setup_payload(tmp->rtp_ch, 33, 90000);
249 
250 	if (rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;
251 
252 	/*setup NAT keep-alive*/
253 	gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive ? rtp->nat_keepalive : 30000);
254 
255 	tmp->range_start = 0;
256 	tmp->range_end = 0;
257 
258 	return tmp;
259 }
260 
rtpin_stream_new_standalone(GF_RTPIn * rtp,const char * flow_ip,u32 port)261 GF_RTPInStream *rtpin_stream_new_standalone(GF_RTPIn *rtp, const char *flow_ip, u32 port)
262 {
263 	GF_RTSPTransport trans;
264 	GF_RTPInStream *tmp;
265 	GF_SAFEALLOC(tmp, GF_RTPInStream);
266 	if (!tmp) return NULL;
267 	tmp->rtpin = rtp;
268 	tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
269 
270 	/*create an RTP channel*/
271 	tmp->rtp_ch = gf_rtp_new();
272 
273 	memset(&trans, 0, sizeof(GF_RTSPTransport));
274 	trans.Profile = "RTP/AVP";
275 	trans.source = (char *) flow_ip;
276 	trans.IsUnicast = gf_sk_is_multicast_address(flow_ip);
277 	trans.client_port_first = port;
278 	trans.client_port_last = port+1;
279 	trans.port_first = port;
280 	trans.port_last = port+1;
281 
282 	if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
283 		rtpin_stream_del(tmp);
284 		return NULL;
285 	}
286 
287 //	gf_rtp_setup_payload(tmp->rtp_ch, 33, 90000);
288 
289 	if (rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;
290 
291 	/*setup NAT keep-alive*/
292 	gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive ? rtp->nat_keepalive : 30000);
293 	tmp->range_start = 0;
294 	tmp->range_end = 0;
295 	return tmp;
296 }
297 
/*
 * Creates an RTP input stream from one SDP media description, or configures
 * input_stream when one is provided.
 *
 * rtp: owning filter context
 * media: SDP media section (attributes, connections, RTP maps, profile, port)
 * sdp: SDP session information, used for the session-level connection and origin address
 * input_stream: optional existing stream to configure instead of allocating a new one;
 *               its ES_ID overrides the SDP one and its control string is used when the
 *               SDP gives none
 *
 * Returns the configured stream, or NULL when the media is not supported or setup failed.
 * On any failure after the stream has been obtained, the stream is destroyed with
 * rtpin_stream_del - this includes input_stream when it was provided.
 */
GF_RTPInStream *rtpin_stream_new(GF_RTPIn *rtp, GF_SDPMedia *media, GF_SDPInfo *sdp, GF_RTPInStream *input_stream)
{
	GF_RTSPRange *range;
	GF_RTPInStream *tmp;
	GF_RTPMap *map;
	u32 i, ESID, ODID;
	Bool force_bcast = GF_FALSE;
	Double Start, End;
	u16 rvc_predef = 0;
	char *rvc_config_att = NULL;
	GF_X_Attribute *att;
	char *ctrl;
	GF_SDPConnection *conn;
	GF_RTSPTransport trans;
	u32 mid, prev_stream, base_stream;

	//extract all relevant info from the GF_SDPMedia
	Start = 0.0;
	End = -1.0;
	ODID = 0;
	ESID = 0;
	ctrl = NULL;
	range = NULL;
	mid = prev_stream = base_stream = 0;
	i=0;
	while ((att = (GF_X_Attribute*)gf_list_enum(media->Attributes, &i))) {
		if (!stricmp(att->Name, "control")) ctrl = att->Value;
		else if (!stricmp(att->Name, "gpac-broadcast")) force_bcast = GF_TRUE;
		else if (!stricmp(att->Name, "mpeg4-esid") && att->Value) ESID = atoi(att->Value);
		else if (!stricmp(att->Name, "mpeg4-odid") && att->Value) ODID = atoi(att->Value);
		else if (!stricmp(att->Name, "range") && !range) range = gf_rtsp_range_parse(att->Value);
		else if (!stricmp(att->Name, "rvc-config-predef") && att->Value) {
			rvc_predef = atoi(att->Value);
		} else if (!stricmp(att->Name, "rvc-config")) {
			rvc_config_att = att->Value;
		} else if (!stricmp(att->Name, "mid") && att->Value) {
			/*value-less attribute is ignored rather than passed as NULL to sscanf*/
			sscanf(att->Value, "L%d", &mid);
		} else if (!stricmp(att->Name, "depend") && att->Value) {
			/*the base layer and the previous layer are given as L<id> tokens;
			a second pattern is tried when the first one yields no trailing token*/
			char buf[3000];
			memset(buf, 0, 3000);
			sscanf(att->Value, "%*d lay L%d %*s %2999s", &base_stream, buf);
			if (!strlen(buf))
				sscanf(att->Value, "%*d lay %2999s", buf);
			sscanf(buf, "L%d", &prev_stream);
		}
	}

	if (range) {
		Start = range->start;
		End = range->end;
		gf_rtsp_range_del(range);
	}

	/*check connection: session-level first, then first media-level one; a missing host or 0.0.0.0 is ignored*/
	conn = sdp->c_connection;
	if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;

	if (!conn) conn = (GF_SDPConnection*)gf_list_get(media->Connections, 0);
	if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;

	if (!conn) {
		/*RTSP RFC recommends an empty "c= " line but some server don't send it. Use session info (o=)*/
		if (!sdp->o_net_type || !sdp->o_add_type || strcmp(sdp->o_net_type, "IN")) return NULL;
		if (strcmp(sdp->o_add_type, "IP4") && strcmp(sdp->o_add_type, "IP6")) return NULL;
	} else {
		if (strcmp(conn->net_type, "IN")) return NULL;
		if (strcmp(conn->add_type, "IP4") && strcmp(conn->add_type, "IP6")) return NULL;
	}
	/*do we support transport*/
	if (strcmp(media->Profile, "RTP/AVP") && strcmp(media->Profile, "RTP/AVP/TCP")
	        && strcmp(media->Profile, "RTP/SAVP") && strcmp(media->Profile, "RTP/SAVP/TCP")
	   ) return NULL;

	/*check RTP map. For now we only support 1 RTPMap*/
	if (gf_list_count(media->RTPMaps) > 1) return NULL;

	/*check payload type: without an RTP map, a single static payload type must be listed*/
	map = (GF_RTPMap*)gf_list_get(media->RTPMaps, 0);
	if (!map) {
		if (!media->fmt_list) return NULL;
		if (strchr(media->fmt_list, ' ')) return NULL;
	}

	/*this is an ESD-URL setup, we likely have namespace conflicts so overwrite given ES_ID
	by the app one (client side), but keep control (server side) if provided*/
	if (input_stream) {
		ESID = input_stream->ES_ID;
		if (!ctrl) ctrl = input_stream->control;
		tmp = input_stream;
	} else {
		/*refuse to create a second stream for an ES_ID already known to the filter*/
		tmp = rtpin_find_stream(rtp, NULL, ESID, NULL, GF_FALSE);
		if (tmp) return NULL;

		GF_SAFEALLOC(tmp, GF_RTPInStream);
		if (!tmp) return NULL;
		tmp->rtpin = rtp;
		tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
	}

	/*create an RTP channel*/
	tmp->rtp_ch = gf_rtp_new();
	if (ctrl) tmp->control = gf_strdup(ctrl);
	tmp->ES_ID = ESID;
	tmp->OD_ID = ODID;
	tmp->mid = mid;
	tmp->prev_stream = prev_stream;
	tmp->base_stream = base_stream;

	memset(&trans, 0, sizeof(GF_RTSPTransport));
	trans.Profile = media->Profile;
	trans.source = conn ? conn->host : sdp->o_address;
	trans.IsUnicast = gf_sk_is_multicast_address(trans.source) ? GF_FALSE : GF_TRUE;
	if (!trans.IsUnicast) {
		trans.port_first = media->PortNumber;
		trans.port_last = media->PortNumber + 1;
		trans.TTL = conn ? conn->TTL : 0;
	} else {
		trans.client_port_first = media->PortNumber;
		trans.client_port_last = media->PortNumber + 1;
		trans.port_first = trans.client_port_first;
		trans.port_last = trans.client_port_last;
	}

	if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
		rtpin_stream_del(tmp);
		return NULL;
	}
	/*setup depacketizer*/
	tmp->depacketizer = gf_rtp_depacketizer_new(media, 0, rtp_sl_packet_cbk, tmp);
	if (!tmp->depacketizer) {
		GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Failed to create RTP depacketizer for payload type %d/%s - ignoring stream)\n", map ? map->PayloadType : 0, media->fmt_list ? media->fmt_list : ""));
		rtpin_stream_del(tmp);
		return NULL;
	}
	/*setup channel*/
	gf_rtp_setup_payload(tmp->rtp_ch, map ? map->PayloadType : tmp->depacketizer->payt, tmp->depacketizer->clock_rate);

	/*a packet queue is only needed when the payload format uses index deltas*/
	if (tmp->depacketizer->sl_map.IndexDeltaLength) {
		tmp->pck_queue = gf_list_new();
	}

//	tmp->status = NM_Disconnected;

	if (!rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;

	/*setup NAT keep-alive*/
	if (rtp->nat_keepalive) gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive);

	tmp->range_start = Start;
	tmp->range_end = End;
	if (End != -1.0) tmp->flags |= RTP_HAS_RANGE;

	if (force_bcast) tmp->flags |= RTP_FORCE_BROADCAST;

	if (rvc_predef) {
		tmp->depacketizer->sl_map.rvc_predef = rvc_predef ;
	} else if (rvc_config_att) {
		char *rvc_data=NULL;
		u32 rvc_size=0;
		Bool is_gz = GF_FALSE;
		/*NOTE(review): the compared lengths (32 and 35) are one more than the literal lengths, so the
		terminating 0 of the literal is part of the comparison - this looks like it can only match an
		attribute equal to the literal, making the base64 branch unreachable. Confirm before changing*/
		if (!strncmp(rvc_config_att, "data:application/rvc-config+xml", 32) && strstr(rvc_config_att, "base64") ) {
			char *data = strchr(rvc_config_att, ',');
			if (data) {
				rvc_size = (u32) strlen(data) * 3 / 4 + 1;
				rvc_data = (char*)gf_malloc(sizeof(char) * rvc_size);
				rvc_size = gf_base64_decode(data, (u32) strlen(data), rvc_data, rvc_size);
				rvc_data[rvc_size] = 0;
			}
			if (!strncmp(rvc_config_att, "data:application/rvc-config+xml+gz", 35)) is_gz = GF_TRUE;
		} else if (!strnicmp(rvc_config_att, "http://", 7) || !strnicmp(rvc_config_att, "https://", 8) ) {
			/*remote RVC configuration is not supported: release the stream like the other failure paths do*/
			rtpin_stream_del(tmp);
			return NULL;
		}
		if (rvc_data) {
			if (is_gz) {
#ifdef GPAC_DISABLE_ZLIB
				GF_LOG(GF_LOG_WARNING, GF_LOG_MEDIA, ("Error: no zlib support - RVC not supported in RTP\n"));
				gf_free(rvc_data);
				/*release the stream like the other failure paths do*/
				rtpin_stream_del(tmp);
				return NULL;
#else
				gf_gz_decompress_payload(rvc_data, rvc_size, &tmp->depacketizer->sl_map.rvc_config, &tmp->depacketizer->sl_map.rvc_config_size);
				gf_free(rvc_data);
#endif
			} else {
				/*the decoded buffer is handed over to the depacketizer SL map*/
				tmp->depacketizer->sl_map.rvc_config = rvc_data;
				tmp->depacketizer->sl_map.rvc_config_size = rvc_size;
			}
		}
	}

	return tmp;
}
489 
/*
 * Publishes network statistics of a stream as "nets:*" info on its output PID:
 * loss, interleaving identifiers or client ports, and the rates computed from the
 * RTP bytes, RTCP bytes and TCP bytes sent over the elapsed statistics period.
 * Rates are not published when the elapsed time is 0.
 *
 * stream: stream to report on; nothing is done if it has no RTP channel
 */
static void rtpin_stream_update_stats(GF_RTPInStream *stream)
{
	u32 elapsed;
	Float rate;

	if (!stream->rtp_ch) return;

	gf_filter_pid_set_info_str(stream->opid, "nets:loss", &PROP_FLOAT( gf_rtp_get_loss(stream->rtp_ch) ) );

	if (!stream->rtsp || !(stream->flags & RTP_INTERLEAVED)) {
		/*not interleaved in an RTSP session: report the client ports of the channel*/
		gf_filter_pid_set_info_str(stream->opid, "nets:rtpp", &PROP_UINT( stream->rtp_ch->net_info.client_port_first ) );
		gf_filter_pid_set_info_str(stream->opid, "nets:rtcpp", &PROP_UINT( stream->rtp_ch->net_info.client_port_last ) );
	} else {
		/*interleaved: report the session port and the interleave identifiers*/
		gf_filter_pid_set_info_str(stream->opid, "nets:interleaved", &PROP_UINT( gf_rtsp_get_session_port(stream->rtsp->session) ) );
		gf_filter_pid_set_info_str(stream->opid, "nets:rtpid", &PROP_UINT( gf_rtp_get_low_interleave_id(stream->rtp_ch) ) );
		gf_filter_pid_set_info_str(stream->opid, "nets:rctpid", &PROP_UINT( gf_rtp_get_hight_interleave_id(stream->rtp_ch) ) );
	}

	/*the period ends at the recorded stop time if any, otherwise now*/
	elapsed = (stream->stat_stop_time ? stream->stat_stop_time : gf_sys_clock()) - stream->stat_start_time;
	if (!elapsed) return;

	/*each rate is (8 * bytes * 1000) / elapsed*/
	rate = 8.0f * stream->rtp_bytes;
	rate *= 1000;
	rate /= elapsed;
	gf_filter_pid_set_info_str(stream->opid, "nets:bw_down", &PROP_UINT( (u32) rate ) );

	rate = 8.0f * stream->rtcp_bytes;
	rate *= 1000;
	rate /= elapsed;
	gf_filter_pid_set_info_str(stream->opid, "nets:ctrl_bw_down", &PROP_UINT( (u32) rate ) );

	rate = 8.0f * gf_rtp_get_tcp_bytes_sent(stream->rtp_ch);
	rate *= 1000;
	rate /= elapsed;
	gf_filter_pid_set_info_str(stream->opid, "nets:ctrl_bw_up", &PROP_UINT( (u32) rate ) );
}
528 
529 
/*
 * Processes one received RTP packet for a stream: decodes the RTP header, creates the
 * depacketizer on the fly when the stream has neither output PID nor depacketizer,
 * handles time mapping after a play/resume, feeds the payload to the depacketizer,
 * then checks for end of range and refreshes statistics.
 *
 * stream: stream the packet belongs to
 * pck: RTP packet data
 * size: size of the packet in bytes
 */
void rtpin_stream_on_rtp_pck(GF_RTPInStream *stream, char *pck, u32 size)
{
	GF_RTPHeader hdr;
	u32 payload_offset;
	GF_Err err;

	stream->rtp_bytes += size;
	/*the next depacketized payload is the first one of this RTP packet*/
	stream->first_in_rtp_pck = GF_TRUE;

	/*first decode RTP*/
	err = gf_rtp_decode_rtp(stream->rtp_ch, pck, size, &hdr, &payload_offset);

	/*corrupted or NULL data*/
	if (err) return;
	if (payload_offset >= size) return;

	if (!stream->depacketizer) {
		/*a stream with an output PID but no depacketizer cannot process data*/
		if (stream->opid) return;

		/*no PID and no depacketizer yet: build the depacketizer from the received payload type,
		configure the channel from it and declare the PID*/
		stream->depacketizer = gf_rtp_depacketizer_new(NULL, hdr.PayloadType, rtp_sl_packet_cbk, stream);
		if (!stream->depacketizer) return;

		stream->rtp_ch->PayloadType = hdr.PayloadType;
		stream->rtp_ch->TimeScale = stream->depacketizer->clock_rate;
		rtpin_declare_pid(stream, GF_FALSE, 0, NULL);
	}

	/*first we only work with one payload type... a mismatch is only reported*/
	if (stream->rtp_ch->PayloadType != hdr.PayloadType) {
		GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Mismatched in packet payload type %d vs channel payload type %d, only one payload type per channel is currently supported\n", hdr.PayloadType, stream->rtp_ch->PayloadType));
	}

	/*pure RTP, remap all streams to 0, adjust sync later.
	first_rtp_ts holds (smallest timestamp seen + 1), 0 meaning not set*/
	if (!stream->rtsp
	        && (!stream->first_rtp_ts || (hdr.TimeStamp < stream->first_rtp_ts-1))
	   ) {
		stream->first_rtp_ts = 1 + hdr.TimeStamp;
	}

	/*if we must notify some timing, do it now. */
	if (stream->check_rtp_time) {
		Double ch_time;

		/*it may happen that we still receive packets from a previous "play" request. If this is the case,
		filter until we reach the indicated rtptime*/
		if (stream->rtp_ch->rtp_time
		        && (stream->rtp_ch->rtp_first_SN > hdr.SequenceNumber)
		        && (stream->rtp_ch->rtp_time > hdr.TimeStamp)
		   ) {
			GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Rejecting too early packet (TS %d vs signaled rtp time %d - diff %d ms)\n",
			                                    hdr.TimeStamp, stream->rtp_ch->rtp_time, ((hdr.TimeStamp - stream->rtp_ch->rtp_time)*1000) / stream->rtp_ch->TimeScale));
			return;
		}

		ch_time = gf_rtp_get_current_time(stream->rtp_ch);

		if (stream->check_rtp_time == RTP_SET_TIME_RTP) {
			/*this is the first packet on the channel (no PAUSE): publish the RTP timestamp to media time mapping.
			Media time stays 0 when no RTSP session is used*/
			Double media_time = 0;
			if (stream->rtsp) {
				media_time = stream->current_start + ch_time;
			}
			gf_filter_pid_set_property_str(stream->opid, "time:timestamp", &PROP_LONGUINT(hdr.TimeStamp) );
			gf_filter_pid_set_property_str(stream->opid, "time:media", &PROP_DOUBLE(media_time) );

			GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Mapping RTP Time seq %d TS %d Media Time %g - rtp info seq %d TS %d\n",
			                                 hdr.SequenceNumber, hdr.TimeStamp, stream->current_start + ch_time, stream->rtp_ch->rtp_first_SN, stream->rtp_ch->rtp_time
			                                ));

			/*skip RTCP clock init when RTSP is used*/
			if (stream->rtsp) stream->rtcp_init = GF_TRUE;

			if (stream->depacketizer->payt==GF_RTP_PAYT_H264_AVC) stream->depacketizer->flags |= GF_RTP_AVC_WAIT_RAP;
		} else if (ch_time <= 0.021) {
			/*this is RESUME on channel, filter packet based on time (darwin seems to send
			a couple of packets before). Packets at or below the 0.021 threshold are skipped
			because we already have them - the PAUSE is issued with the RTP currentTime*/
			return;
		}
		stream->check_rtp_time = RTP_SET_TIME_NONE;
	}

	gf_rtp_depacketizer_process(stream->depacketizer, &hdr, pck + payload_offset, size - payload_offset);

	/*last check: signal EOS if we're close to end range in case the server does not send RTCP BYE*/
	if ((stream->flags & RTP_HAS_RANGE) && !(stream->flags & RTP_EOS) ) {
		/*also check last CTS*/
		Double ts = (Double) ((u32) stream->depacketizer->sl_hdr.compositionTimeStamp - hdr.TimeStamp);
		ts /= gf_rtp_get_clockrate(stream->rtp_ch);
		if (ABSDIFF(stream->range_end, (ts + stream->current_start + gf_rtp_get_current_time(stream->rtp_ch)) ) < 0.2) {
			stream->flags |= RTP_EOS;
			stream->stat_stop_time = gf_sys_clock();
			gf_filter_pid_set_eos(stream->opid);
		}
	}

	/*refresh statistics once the configured interval has elapsed*/
	if (stream->rtpin->stats) {
		u32 now = gf_sys_clock();
		if (stream->last_stats_time + stream->rtpin->stats < now) {
			stream->last_stats_time = now;
			rtpin_stream_update_stats(stream);
		}
	}
}
637 
/*
 * Computes the timestamp adjustment of every stream of the filter once all of them
 * have their RTCP state initialized: each stream is shifted by the distance between
 * its NTP origin and the latest NTP origin among all streams, expressed in the
 * stream RTP timescale.
 *
 * ctx: filter context owning the streams
 */
static void rtpin_adjust_sync(GF_RTPIn *ctx)
{
	u32 idx;
	u32 nb_streams = gf_list_count(ctx->streams);
	u64 latest_ntp_us = 0;

	/*first pass: give up if any stream is not initialized yet, otherwise find the largest NTP origin*/
	for (idx = 0; idx < nb_streams; idx++) {
		GF_RTPInStream *st = gf_list_get(ctx->streams, idx);
		if (!st->rtcp_init) return;

		if (st->init_ntp_us > latest_ntp_us) {
			latest_ntp_us = st->init_ntp_us;
		}
	}

	/*second pass: convert each stream offset from microseconds to its RTP timescale*/
	for (idx = 0; idx < nb_streams; idx++) {
		GF_RTPInStream *st = gf_list_get(ctx->streams, idx);
		s64 offset;

		offset = latest_ntp_us;
		offset -= st->init_ntp_us;
		offset *= st->rtp_ch->TimeScale;
		offset /= 1000000;
		st->ts_adjust = offset;
	}
}
663 
/*
 * Processes one received RTCP packet for a stream. On the first sender report seen
 * while RTCP state is not initialized, computes the NTP origin of the stream and,
 * if enabled on the filter, re-synchronizes all streams. Flags the stream as ended
 * when the decoder reports GF_EOS.
 *
 * stream: stream the packet belongs to
 * pck: RTCP packet data
 * size: size of the packet in bytes
 */
static void rtpin_stream_on_rtcp_pck(GF_RTPInStream *stream, char *pck, u32 size)
{
	GF_Err res;
	Bool has_sr;

	if (stream->status == RTP_Connected) return;
	/*not configured yet*/
	if (!stream->rtp_ch->TimeScale) return;

	stream->rtcp_bytes += size;

	res = gf_rtp_decode_rtcp(stream->rtp_ch, pck, size, &has_sr);
	if (res < 0) return;

	/*update sync if on pure RTP*/
	if (!stream->rtcp_init && has_sr) {
		u64 sr_ntp_us, elapsed_us;

		/*NTP time of the sender report in microseconds: fractional part scaled, plus seconds*/
		sr_ntp_us = stream->rtp_ch->last_SR_NTP_frac;
		sr_ntp_us *= 1000000;
		sr_ntp_us /= 0xFFFFFFFF;
		sr_ntp_us += 1000000 * (u64) stream->rtp_ch->last_SR_NTP_sec;

		/*ntp time at origin: ntp(now) - ntp(first_pck) = rtpts(now) - rtpts(orig) -> ntp_first = ntp - rtp_diff*/
		elapsed_us = stream->rtp_ch->last_SR_rtp_time - (stream->first_rtp_ts - 1);
		elapsed_us *= 1000000;
		elapsed_us /= stream->rtp_ch->TimeScale;

		stream->init_ntp_us = sr_ntp_us - elapsed_us;

		GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTCP] At %d Using Sender Report to map RTP TS %d to NTP clock "LLU" us - NTP origin "LLU"\n", gf_sys_clock(), stream->rtp_ch->last_SR_rtp_time, sr_ntp_us, stream->init_ntp_us));

		stream->rtcp_init = GF_TRUE;

		if (stream->rtpin->rtcpsync) {
			rtpin_adjust_sync(stream->rtpin);
		}
	}

	if (res == GF_EOS) {
		stream->flags |= RTP_EOS;
	}
}
706 
rtpin_rtsp_data_cbk(GF_RTSPSession * sess,void * cbk,u8 * buffer,u32 bufferSize,Bool IsRTCP)707 GF_Err rtpin_rtsp_data_cbk(GF_RTSPSession *sess, void *cbk, u8 *buffer, u32 bufferSize, Bool IsRTCP)
708 {
709 	GF_RTPInStream *stream = (GF_RTPInStream *) cbk;
710 	if (!stream || stream->rtpin->done) return GF_OK;
711 	if (IsRTCP) {
712 		rtpin_stream_on_rtcp_pck(stream, buffer, bufferSize);
713 	} else {
714 		rtpin_stream_on_rtp_pck(stream, buffer, bufferSize);
715 	}
716 	return GF_OK;
717 }
718 
719 
720 
rtpin_stream_read(GF_RTPInStream * stream)721 u32 rtpin_stream_read(GF_RTPInStream *stream)
722 {
723 	u32 size, tot_size = 0;
724 
725 	if (!stream->rtp_ch) return 0;
726 	if (gf_sk_group_sock_is_set(stream->rtpin->sockgroup, stream->rtp_ch->rtcp, GF_SK_SELECT_READ)) {
727 		size = gf_rtp_read_rtcp(stream->rtp_ch, stream->buffer, stream->rtpin->block_size);
728 		if (size) {
729 			tot_size += size;
730 			rtpin_stream_on_rtcp_pck(stream, stream->buffer, size);
731 		}
732 	}
733 
734 	if (gf_sk_group_sock_is_set(stream->rtpin->sockgroup, stream->rtp_ch->rtp, GF_SK_SELECT_READ)) {
735 		size = gf_rtp_read_rtp(stream->rtp_ch, stream->buffer, stream->rtpin->block_size);
736 		if (size) {
737 			tot_size += size;
738 			stream->rtpin->udp_timeout = 0;
739 			rtpin_stream_on_rtp_pck(stream, stream->buffer, size);
740 		}
741 	}
742 	if (!tot_size) return 0;
743 
744 	/*and send the report*/
745 	if (stream->flags & RTP_ENABLE_RTCP) gf_rtp_send_rtcp_report(stream->rtp_ch);
746 
747 	/*detect timeout*/
748 	if (stream->rtpin->udp_timeout) {
749 		if (!stream->last_udp_time) {
750 			stream->last_udp_time = gf_sys_clock();
751 		} else if (stream->rtp_ch->net_info.IsUnicast) {
752 			u32 diff = gf_sys_clock() - stream->last_udp_time;
753 			if (diff >= stream->rtpin->udp_timeout) {
754 				char szMessage[1024];
755 				GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] UDP Timeout after %d ms\n", diff));
756 				sprintf(szMessage, "No data received in %d ms%s", diff, stream->rtpin->autortsp ? ", retrying using TCP" : "");
757 				rtpin_send_message(stream->rtpin, GF_IP_UDP_TIMEOUT, szMessage);
758 				if (stream->rtpin->autortsp)
759 					stream->rtpin->retry_tcp = GF_TRUE;
760 			}
761 		}
762 	}
763 	return tot_size;
764 }
765 
766 #endif /*GPAC_DISABLE_STREAMING*/
767