1 /*
2 * GPAC - Multimedia Framework C SDK
3 *
4 * Authors: Jean Le Feuvre
5 * Copyright (c) Telecom ParisTech 2000-2017
6 * All rights reserved
7 *
8 * This file is part of GPAC / RTP/RTSP input filter
9 *
10 * GPAC is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * GPAC is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; see the file COPYING. If not, write to
22 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 *
24 */
25
26 #include "in_rtp.h"
27
28
29 #ifndef GPAC_DISABLE_STREAMING
30
rtpin_stream_ack_connect(GF_RTPInStream * stream,GF_Err e)31 void rtpin_stream_ack_connect(GF_RTPInStream *stream, GF_Err e)
32 {
33 /*in case the channel has been disconnected while SETUP was issued&processed. We also could
34 clean up the command stack*/
35 if (!stream->opid) return;
36
37 if (e != GF_OK || !stream->rtp_ch) return;
38 }
39
rtpin_stream_init(GF_RTPInStream * stream,Bool ResetOnly)40 GF_Err rtpin_stream_init(GF_RTPInStream *stream, Bool ResetOnly)
41 {
42 gf_rtp_depacketizer_reset(stream->depacketizer, !ResetOnly);
43
44 if (!ResetOnly) {
45 GF_Err e;
46 const char *ip_ifce = NULL;
47 if (!stream->rtpin->interleave) {
48 ip_ifce = stream->rtpin->ifce;
49 }
50 if (stream->rtp_ch->rtp)
51 gf_sk_group_unregister(stream->rtpin->sockgroup, stream->rtp_ch->rtp);
52 if (stream->rtp_ch->rtcp)
53 gf_sk_group_unregister(stream->rtpin->sockgroup, stream->rtp_ch->rtcp);
54
55 e = gf_rtp_initialize(stream->rtp_ch, stream->rtpin->block_size, GF_FALSE, 0, stream->rtpin->reorder_len, stream->rtpin->reorder_delay, (char *)ip_ifce);
56 if (e) return e;
57
58 if (stream->rtp_ch->rtp)
59 gf_sk_group_register(stream->rtpin->sockgroup, stream->rtp_ch->rtp);
60 if (stream->rtp_ch->rtcp)
61 gf_sk_group_register(stream->rtpin->sockgroup, stream->rtp_ch->rtcp);
62
63 }
64 //just reset the sockets
65 gf_rtp_reset_buffers(stream->rtp_ch);
66 return GF_OK;
67 }
68
rtpin_stream_reset_queue(GF_RTPInStream * stream)69 void rtpin_stream_reset_queue(GF_RTPInStream *stream)
70 {
71 if (!stream->pck_queue) return;
72 while (gf_list_count(stream->pck_queue)) {
73 GF_FilterPacket *pck = gf_list_pop_back(stream->pck_queue);
74 gf_filter_pck_discard(pck);
75 }
76 }
77
rtpin_stream_del(GF_RTPInStream * stream)78 void rtpin_stream_del(GF_RTPInStream *stream)
79 {
80 if (stream->rtsp) {
81 if (stream->status == RTP_Running) {
82 rtpin_rtsp_teardown(stream->rtsp, stream);
83 stream->status = RTP_Disconnected;
84 }
85 rtpin_remove_stream(stream->rtpin, stream);
86 } else {
87 rtpin_find_stream(stream->rtpin, stream->opid, 0, NULL, GF_TRUE);
88 }
89
90 if (stream->depacketizer) gf_rtp_depacketizer_del(stream->depacketizer);
91 if (stream->rtp_ch) gf_rtp_del(stream->rtp_ch);
92 if (stream->control) gf_free(stream->control);
93 if (stream->session_id) gf_free(stream->session_id);
94 if (stream->buffer) gf_free(stream->buffer);
95 if (stream->pck_queue) {
96 rtpin_stream_reset_queue(stream);
97 gf_list_del(stream->pck_queue);
98 }
99 gf_free(stream);
100 }
101
rtpin_stream_queue_pck(GF_RTPInStream * stream,GF_FilterPacket * pck,u64 cts)102 static void rtpin_stream_queue_pck(GF_RTPInStream *stream, GF_FilterPacket *pck, u64 cts)
103 {
104 //get all packets in queue, dispatch all packets with a cts less than the cts of the packet being queued
105 //if this is teh first packet in the RTP packet
106 //This is only used for MPEG4, and we are sure we only have [AU(n),AU(n+i)..], [AU(n+j), AU(n+k) ...]
107 //with i,j,k >0
108 while (stream->first_in_rtp_pck) {
109 u64 prev_cts;
110 GF_FilterPacket *prev = gf_list_get(stream->pck_queue, 0);
111 if (!prev) break;
112 prev_cts = gf_filter_pck_get_cts(prev);
113 if (prev_cts>cts) break;
114 gf_list_rem(stream->pck_queue, 0);
115 gf_filter_pck_send(prev);
116 }
117 stream->first_in_rtp_pck = GF_FALSE;
118 gf_list_add(stream->pck_queue, pck);
119 }
120
rtp_sl_packet_cbk(void * udta,u8 * payload,u32 size,GF_SLHeader * hdr,GF_Err e)121 static void rtp_sl_packet_cbk(void *udta, u8 *payload, u32 size, GF_SLHeader *hdr, GF_Err e)
122 {
123 u64 cts, dts;
124 s64 diff;
125 GF_FilterPacket *pck;
126 u8 *pck_data;
127 GF_RTPInStream *stream = (GF_RTPInStream *)udta;
128
129
130 if (!stream->rtcp_init) {
131 if (!stream->rtcp_check_start) {
132 GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] No RTCP packet received yet, synchronization might be wrong for a while\n"));
133 stream->rtcp_check_start = gf_sys_clock();
134 }
135 else if (gf_sys_clock() - stream->rtcp_check_start <= stream->rtpin->rtcp_timeout) {
136 GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] No RTCP packet received yet, synchronization might be wrong for a while\n"));
137 } else {
138 GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Timeout for RTCP: no SR recevied after %d ms - sync may be broken\n", stream->rtpin->rtcp_timeout));
139 stream->rtcp_init = GF_TRUE;
140 }
141 }
142
143 if (stream->rtpin->first_packet_drop && (hdr->packetSequenceNumber >= stream->rtpin->first_packet_drop) ) {
144 if (! ( (hdr->packetSequenceNumber - stream->rtpin->first_packet_drop) % stream->rtpin->frequency_drop) )
145 return;
146 }
147
148 cts = hdr->compositionTimeStamp;
149 dts = hdr->decodingTimeStamp;
150
151 hdr->seekFlag = 0;
152 hdr->compositionTimeStamp += stream->ts_adjust;
153 if (stream->first_rtp_ts) {
154 assert(hdr->compositionTimeStamp >= stream->first_rtp_ts - 1);
155 hdr->compositionTimeStamp -= stream->first_rtp_ts - 1;
156 }
157
158 if (hdr->decodingTimeStamp) {
159 hdr->decodingTimeStamp += stream->ts_adjust;
160 if (stream->first_rtp_ts) {
161 assert(hdr->decodingTimeStamp >= stream->first_rtp_ts - 1);
162 hdr->decodingTimeStamp -= stream->first_rtp_ts - 1;
163 }
164 }
165
166 pck = gf_filter_pck_new_alloc(stream->opid, size, &pck_data);
167 memcpy(pck_data, payload, size);
168 if (hdr->decodingTimeStampFlag)
169 gf_filter_pck_set_dts(pck, hdr->decodingTimeStamp);
170
171 gf_filter_pck_set_cts(pck, hdr->compositionTimeStamp);
172
173 diff = (s64) hdr->compositionTimeStamp - (s64) stream->prev_cts;
174 if ((stream->rtpin->max_sleep>0) && stream->prev_cts && diff) {
175 if (diff<0) diff = -diff;
176 if (!stream->min_dur_rtp || (diff < stream->min_dur_rtp)) {
177 u64 dur = diff;
178 dur *= 1000;
179 dur /= stream->rtp_ch->TimeScale;
180 stream->min_dur_rtp = (u32) dur;
181 if (dur > stream->rtpin->max_sleep) dur = stream->rtpin->max_sleep;
182 if (!stream->rtpin->min_frame_dur_ms || ( (u32) dur < stream->rtpin->min_frame_dur_ms)) {
183 stream->rtpin->min_frame_dur_ms = (u32) dur;
184 }
185 }
186 }
187 stream->prev_cts = (u32) hdr->compositionTimeStamp;
188
189 gf_filter_pck_set_sap(pck, hdr->randomAccessPointFlag ? GF_FILTER_SAP_1 :GF_FILTER_SAP_NONE);
190 if (hdr->au_duration)
191 gf_filter_pck_set_duration(pck, hdr->au_duration);
192
193 if (hdr->samplerate && (hdr->samplerate != stream->sr)) {
194 stream->sr = hdr->samplerate;
195 gf_filter_pid_set_property(stream->opid, GF_PROP_PID_SAMPLE_RATE, &PROP_UINT(stream->sr));
196 }
197 if (hdr->channels && (hdr->channels != stream->nb_ch)) {
198 stream->nb_ch = hdr->channels;
199 gf_filter_pid_set_property(stream->opid, GF_PROP_PID_NUM_CHANNELS, &PROP_UINT(stream->nb_ch));
200 }
201 gf_filter_pck_set_framing(pck, hdr->accessUnitStartFlag, hdr->accessUnitEndFlag);
202
203 if (stream->rtp_ch->packet_loss)
204 gf_filter_pck_set_corrupted(pck, 1);
205
206 #if 0 //not yet implemented
207 if (hdr->seekFlag)
208 gf_filter_pck_set_seek_flag(pck, GF_TRUE);
209 #endif
210
211 if (stream->depacketizer->sl_map.IndexDeltaLength) {
212 rtpin_stream_queue_pck(stream, pck, hdr->compositionTimeStamp);
213 } else {
214 gf_filter_pck_send(pck);
215 }
216
217 hdr->compositionTimeStamp = cts;
218 hdr->decodingTimeStamp = dts;
219 }
220
rtpin_stream_new_satip(GF_RTPIn * rtp,const char * server_ip)221 GF_RTPInStream *rtpin_stream_new_satip(GF_RTPIn *rtp, const char *server_ip)
222 {
223 GF_RTSPTransport trans;
224 GF_RTPInStream *tmp;
225 GF_SAFEALLOC(tmp, GF_RTPInStream);
226 if (!tmp) return NULL;
227 tmp->rtpin = rtp;
228 tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
229
230 /*create an RTP channel*/
231 tmp->rtp_ch = gf_rtp_new();
232 tmp->control = gf_strdup("*");
233
234 memset(&trans, 0, sizeof(GF_RTSPTransport));
235 trans.Profile = "RTP/AVP";
236 trans.source = (char *) server_ip;
237 trans.IsUnicast = GF_TRUE;
238 trans.client_port_first = 0;
239 trans.client_port_last = 0;
240 trans.port_first = 0;
241 trans.port_last = 0;
242
243 if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
244 rtpin_stream_del(tmp);
245 return NULL;
246 }
247
248 gf_rtp_setup_payload(tmp->rtp_ch, 33, 90000);
249
250 if (rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;
251
252 /*setup NAT keep-alive*/
253 gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive ? rtp->nat_keepalive : 30000);
254
255 tmp->range_start = 0;
256 tmp->range_end = 0;
257
258 return tmp;
259 }
260
rtpin_stream_new_standalone(GF_RTPIn * rtp,const char * flow_ip,u32 port)261 GF_RTPInStream *rtpin_stream_new_standalone(GF_RTPIn *rtp, const char *flow_ip, u32 port)
262 {
263 GF_RTSPTransport trans;
264 GF_RTPInStream *tmp;
265 GF_SAFEALLOC(tmp, GF_RTPInStream);
266 if (!tmp) return NULL;
267 tmp->rtpin = rtp;
268 tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
269
270 /*create an RTP channel*/
271 tmp->rtp_ch = gf_rtp_new();
272
273 memset(&trans, 0, sizeof(GF_RTSPTransport));
274 trans.Profile = "RTP/AVP";
275 trans.source = (char *) flow_ip;
276 trans.IsUnicast = gf_sk_is_multicast_address(flow_ip);
277 trans.client_port_first = port;
278 trans.client_port_last = port+1;
279 trans.port_first = port;
280 trans.port_last = port+1;
281
282 if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
283 rtpin_stream_del(tmp);
284 return NULL;
285 }
286
287 // gf_rtp_setup_payload(tmp->rtp_ch, 33, 90000);
288
289 if (rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;
290
291 /*setup NAT keep-alive*/
292 gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive ? rtp->nat_keepalive : 30000);
293 tmp->range_start = 0;
294 tmp->range_end = 0;
295 return tmp;
296 }
297
rtpin_stream_new(GF_RTPIn * rtp,GF_SDPMedia * media,GF_SDPInfo * sdp,GF_RTPInStream * input_stream)298 GF_RTPInStream *rtpin_stream_new(GF_RTPIn *rtp, GF_SDPMedia *media, GF_SDPInfo *sdp, GF_RTPInStream *input_stream)
299 {
300 GF_RTSPRange *range;
301 GF_RTPInStream *tmp;
302 GF_RTPMap *map;
303 u32 i, ESID, ODID;
304 Bool force_bcast = GF_FALSE;
305 Double Start, End;
306 u16 rvc_predef = 0;
307 char *rvc_config_att = NULL;
308 GF_X_Attribute *att;
309 char *ctrl;
310 GF_SDPConnection *conn;
311 GF_RTSPTransport trans;
312 u32 mid, prev_stream, base_stream;
313
314 //extract all relevant info from the GF_SDPMedia
315 Start = 0.0;
316 End = -1.0;
317 ODID = 0;
318 ESID = 0;
319 ctrl = NULL;
320 range = NULL;
321 mid = prev_stream = base_stream = 0;
322 i=0;
323 while ((att = (GF_X_Attribute*)gf_list_enum(media->Attributes, &i))) {
324 if (!stricmp(att->Name, "control")) ctrl = att->Value;
325 else if (!stricmp(att->Name, "gpac-broadcast")) force_bcast = GF_TRUE;
326 else if (!stricmp(att->Name, "mpeg4-esid") && att->Value) ESID = atoi(att->Value);
327 else if (!stricmp(att->Name, "mpeg4-odid") && att->Value) ODID = atoi(att->Value);
328 else if (!stricmp(att->Name, "range") && !range) range = gf_rtsp_range_parse(att->Value);
329 else if (!stricmp(att->Name, "rvc-config-predef") && att->Value) {
330 rvc_predef = atoi(att->Value);
331 } else if (!stricmp(att->Name, "rvc-config")) {
332 rvc_config_att = att->Value;
333 } else if (!stricmp(att->Name, "mid")) {
334 sscanf(att->Value, "L%d", &mid);
335 } else if (!stricmp(att->Name, "depend")) {
336 char buf[3000];
337 memset(buf, 0, 3000);
338 sscanf(att->Value, "%*d lay L%d %*s %2999s", &base_stream, buf);
339 if (!strlen(buf))
340 sscanf(att->Value, "%*d lay %2999s", buf);
341 sscanf(buf, "L%d", &prev_stream);
342 }
343 }
344
345 if (range) {
346 Start = range->start;
347 End = range->end;
348 gf_rtsp_range_del(range);
349 }
350
351 /*check connection*/
352 conn = sdp->c_connection;
353 if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;
354
355 if (!conn) conn = (GF_SDPConnection*)gf_list_get(media->Connections, 0);
356 if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;
357
358 if (!conn) {
359 /*RTSP RFC recommends an empty "c= " line but some server don't send it. Use session info (o=)*/
360 if (!sdp->o_net_type || !sdp->o_add_type || strcmp(sdp->o_net_type, "IN")) return NULL;
361 if (strcmp(sdp->o_add_type, "IP4") && strcmp(sdp->o_add_type, "IP6")) return NULL;
362 } else {
363 if (strcmp(conn->net_type, "IN")) return NULL;
364 if (strcmp(conn->add_type, "IP4") && strcmp(conn->add_type, "IP6")) return NULL;
365 }
366 /*do we support transport*/
367 if (strcmp(media->Profile, "RTP/AVP") && strcmp(media->Profile, "RTP/AVP/TCP")
368 && strcmp(media->Profile, "RTP/SAVP") && strcmp(media->Profile, "RTP/SAVP/TCP")
369 ) return NULL;
370
371 /*check RTP map. For now we only support 1 RTPMap*/
372 if (gf_list_count(media->RTPMaps) > 1) return NULL;
373
374 /*check payload type*/
375 map = (GF_RTPMap*)gf_list_get(media->RTPMaps, 0);
376 if (!map) {
377 if (!media->fmt_list) return NULL;
378 if (strchr(media->fmt_list, ' ')) return NULL;
379 }
380
381 /*this is an ESD-URL setup, we likely have namespace conflicts so overwrite given ES_ID
382 by the app one (client side), but keep control (server side) if provided*/
383 if (input_stream) {
384 ESID = input_stream->ES_ID;
385 if (!ctrl) ctrl = input_stream->control;
386 tmp = input_stream;
387 } else {
388 tmp = rtpin_find_stream(rtp, NULL, ESID, NULL, GF_FALSE);
389 if (tmp) return NULL;
390
391 GF_SAFEALLOC(tmp, GF_RTPInStream);
392 if (!tmp) return NULL;
393 tmp->rtpin = rtp;
394 tmp->buffer = gf_malloc(sizeof(char) * rtp->block_size);
395 }
396
397 /*create an RTP channel*/
398 tmp->rtp_ch = gf_rtp_new();
399 if (ctrl) tmp->control = gf_strdup(ctrl);
400 tmp->ES_ID = ESID;
401 tmp->OD_ID = ODID;
402 tmp->mid = mid;
403 tmp->prev_stream = prev_stream;
404 tmp->base_stream = base_stream;
405
406 memset(&trans, 0, sizeof(GF_RTSPTransport));
407 trans.Profile = media->Profile;
408 trans.source = conn ? conn->host : sdp->o_address;
409 trans.IsUnicast = gf_sk_is_multicast_address(trans.source) ? GF_FALSE : GF_TRUE;
410 if (!trans.IsUnicast) {
411 trans.port_first = media->PortNumber;
412 trans.port_last = media->PortNumber + 1;
413 trans.TTL = conn ? conn->TTL : 0;
414 } else {
415 trans.client_port_first = media->PortNumber;
416 trans.client_port_last = media->PortNumber + 1;
417 trans.port_first = trans.client_port_first;
418 trans.port_last = trans.client_port_last;
419 }
420
421 if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
422 rtpin_stream_del(tmp);
423 return NULL;
424 }
425 /*setup depacketizer*/
426 tmp->depacketizer = gf_rtp_depacketizer_new(media, 0, rtp_sl_packet_cbk, tmp);
427 if (!tmp->depacketizer) {
428 GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Failed to create RTP depacketizer for payload type %d/%s - ignoring stream)\n", map ? map->PayloadType : 0, media->fmt_list ? media->fmt_list : ""));
429 rtpin_stream_del(tmp);
430 return NULL;
431 }
432 /*setup channel*/
433 gf_rtp_setup_payload(tmp->rtp_ch, map ? map->PayloadType : tmp->depacketizer->payt, tmp->depacketizer->clock_rate);
434
435 if (tmp->depacketizer->sl_map.IndexDeltaLength) {
436 tmp->pck_queue = gf_list_new();
437 }
438
439 // tmp->status = NM_Disconnected;
440
441 if (!rtp->disable_rtcp) tmp->flags |= RTP_ENABLE_RTCP;
442
443 /*setup NAT keep-alive*/
444 if (rtp->nat_keepalive) gf_rtp_enable_nat_keepalive(tmp->rtp_ch, rtp->nat_keepalive);
445
446 tmp->range_start = Start;
447 tmp->range_end = End;
448 if (End != -1.0) tmp->flags |= RTP_HAS_RANGE;
449
450 if (force_bcast) tmp->flags |= RTP_FORCE_BROADCAST;
451
452 if (rvc_predef) {
453 tmp->depacketizer->sl_map.rvc_predef = rvc_predef ;
454 } else if (rvc_config_att) {
455 char *rvc_data=NULL;
456 u32 rvc_size=0;
457 Bool is_gz = GF_FALSE;
458 if (!strncmp(rvc_config_att, "data:application/rvc-config+xml", 32) && strstr(rvc_config_att, "base64") ) {
459 char *data = strchr(rvc_config_att, ',');
460 if (data) {
461 rvc_size = (u32) strlen(data) * 3 / 4 + 1;
462 rvc_data = (char*)gf_malloc(sizeof(char) * rvc_size);
463 rvc_size = gf_base64_decode(data, (u32) strlen(data), rvc_data, rvc_size);
464 rvc_data[rvc_size] = 0;
465 }
466 if (!strncmp(rvc_config_att, "data:application/rvc-config+xml+gz", 35)) is_gz = GF_TRUE;
467 } else if (!strnicmp(rvc_config_att, "http://", 7) || !strnicmp(rvc_config_att, "https://", 8) ) {
468 return NULL;
469 }
470 if (rvc_data) {
471 if (is_gz) {
472 #ifdef GPAC_DISABLE_ZLIB
473 GF_LOG(GF_LOG_WARNING, GF_LOG_MEDIA, ("Error: no zlib support - RVC not supported in RTP\n"));
474 gf_free(rvc_data);
475 return NULL;
476 #else
477 gf_gz_decompress_payload(rvc_data, rvc_size, &tmp->depacketizer->sl_map.rvc_config, &tmp->depacketizer->sl_map.rvc_config_size);
478 gf_free(rvc_data);
479 #endif
480 } else {
481 tmp->depacketizer->sl_map.rvc_config = rvc_data;
482 tmp->depacketizer->sl_map.rvc_config_size = rvc_size;
483 }
484 }
485 }
486
487 return tmp;
488 }
489
rtpin_stream_update_stats(GF_RTPInStream * stream)490 static void rtpin_stream_update_stats(GF_RTPInStream *stream)
491 {
492 u32 time;
493 Float bps;
494 if (!stream->rtp_ch) return;
495
496 gf_filter_pid_set_info_str(stream->opid, "nets:loss", &PROP_FLOAT( gf_rtp_get_loss(stream->rtp_ch) ) );
497 if (stream->rtsp && (stream->flags & RTP_INTERLEAVED)) {
498 gf_filter_pid_set_info_str(stream->opid, "nets:interleaved", &PROP_UINT( gf_rtsp_get_session_port(stream->rtsp->session) ) );
499 gf_filter_pid_set_info_str(stream->opid, "nets:rtpid", &PROP_UINT( gf_rtp_get_low_interleave_id(stream->rtp_ch) ) );
500 gf_filter_pid_set_info_str(stream->opid, "nets:rctpid", &PROP_UINT( gf_rtp_get_hight_interleave_id(stream->rtp_ch) ) );
501
502 } else {
503 gf_filter_pid_set_info_str(stream->opid, "nets:rtpp", &PROP_UINT( stream->rtp_ch->net_info.client_port_first ) );
504 gf_filter_pid_set_info_str(stream->opid, "nets:rtcpp", &PROP_UINT( stream->rtp_ch->net_info.client_port_last ) );
505 }
506 if (stream->stat_stop_time) {
507 time = stream->stat_stop_time - stream->stat_start_time;
508 } else {
509 time = gf_sys_clock() - stream->stat_start_time;
510 }
511 if (!time) return;
512
513 bps = 8.0f * stream->rtp_bytes;
514 bps *= 1000;
515 bps /= time;
516 gf_filter_pid_set_info_str(stream->opid, "nets:bw_down", &PROP_UINT( (u32) bps ) );
517
518 bps = 8.0f * stream->rtcp_bytes;
519 bps *= 1000;
520 bps /= time;
521 gf_filter_pid_set_info_str(stream->opid, "nets:ctrl_bw_down", &PROP_UINT( (u32) bps ) );
522
523 bps = 8.0f * gf_rtp_get_tcp_bytes_sent(stream->rtp_ch);
524 bps *= 1000;
525 bps /= time;
526 gf_filter_pid_set_info_str(stream->opid, "nets:ctrl_bw_up", &PROP_UINT( (u32) bps ) );
527 }
528
529
rtpin_stream_on_rtp_pck(GF_RTPInStream * stream,char * pck,u32 size)530 void rtpin_stream_on_rtp_pck(GF_RTPInStream *stream, char *pck, u32 size)
531 {
532 GF_Err e;
533 GF_RTPHeader hdr;
534 u32 PayloadStart;
535 stream->rtp_bytes += size;
536 stream->first_in_rtp_pck = GF_TRUE;
537
538 /*first decode RTP*/
539 e = gf_rtp_decode_rtp(stream->rtp_ch, pck, size, &hdr, &PayloadStart);
540
541 /*corrupted or NULL data*/
542 if (e || (PayloadStart >= size)) {
543 //gf_service_send_packet(stream->rtpin->service, stream->opid, NULL, 0, NULL, GF_CORRUPTED_DATA);
544 return;
545 }
546 if (!stream->opid && !stream->depacketizer) {
547 stream->depacketizer = gf_rtp_depacketizer_new(NULL, hdr.PayloadType, rtp_sl_packet_cbk, stream);
548 if (!stream->depacketizer) {
549 return;
550 }
551 stream->rtp_ch->PayloadType = hdr.PayloadType;
552 stream->rtp_ch->TimeScale = stream->depacketizer->clock_rate;
553 rtpin_declare_pid(stream, GF_FALSE, 0, NULL);
554 }
555 if (!stream->depacketizer) {
556 return;
557 }
558
559 /*first we only work with one payload type...*/
560 if (hdr.PayloadType != stream->rtp_ch->PayloadType) {
561 GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Mismatched in packet payload type %d vs channel payload type %d, only one payload type per channel is currently supported\n", hdr.PayloadType, stream->rtp_ch->PayloadType));
562 }
563
564 //pure RTP, remap all streams to 0, adjust sync later
565 if (!stream->rtsp) {
566 if (!stream->first_rtp_ts || (hdr.TimeStamp < stream->first_rtp_ts-1) )
567 stream->first_rtp_ts = 1 + hdr.TimeStamp;
568 }
569
570 /*if we must notify some timing, do it now. */
571 if (stream->check_rtp_time) {
572 Double ch_time;
573
574 /*it may happen that we still receive packets from a previous "play" request. If this is the case,
575 filter until we reach the indicated rtptime*/
576 if (stream->rtp_ch->rtp_time
577 && (stream->rtp_ch->rtp_first_SN > hdr.SequenceNumber)
578 && (stream->rtp_ch->rtp_time > hdr.TimeStamp)
579 ) {
580 GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Rejecting too early packet (TS %d vs signaled rtp time %d - diff %d ms)\n",
581 hdr.TimeStamp, stream->rtp_ch->rtp_time, ((hdr.TimeStamp - stream->rtp_ch->rtp_time)*1000) / stream->rtp_ch->TimeScale));
582 return;
583 }
584
585 ch_time = gf_rtp_get_current_time(stream->rtp_ch);
586
587 /*this is the first packet on the channel (no PAUSE)*/
588 if (stream->check_rtp_time == RTP_SET_TIME_RTP) {
589 Double media_time = 0;
590 if (stream->rtsp) {
591 media_time = stream->current_start + ch_time;
592 }
593 gf_filter_pid_set_property_str(stream->opid, "time:timestamp", &PROP_LONGUINT(hdr.TimeStamp) );
594 gf_filter_pid_set_property_str(stream->opid, "time:media", &PROP_DOUBLE(media_time) );
595
596 GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Mapping RTP Time seq %d TS %d Media Time %g - rtp info seq %d TS %d\n",
597 hdr.SequenceNumber, hdr.TimeStamp, stream->current_start + ch_time, stream->rtp_ch->rtp_first_SN, stream->rtp_ch->rtp_time
598 ));
599
600 /*skip RTCP clock init when RTSP is used*/
601 if (stream->rtsp) stream->rtcp_init = GF_TRUE;
602
603 if (stream->depacketizer->payt==GF_RTP_PAYT_H264_AVC) stream->depacketizer->flags |= GF_RTP_AVC_WAIT_RAP;
604 }
605 /*this is RESUME on channel, filter packet based on time (darwin seems to send
606 couple of packet before)
607 do not fetch if we're below 10 ms or <0, because this means we already have
608 this packet - as the PAUSE is issued with the RTP currentTime*/
609 else if (ch_time <= 0.021) {
610 return;
611 }
612 stream->check_rtp_time = RTP_SET_TIME_NONE;
613 }
614
615 gf_rtp_depacketizer_process(stream->depacketizer, &hdr, pck + PayloadStart, size - PayloadStart);
616
617 /*last check: signal EOS if we're close to end range in case the server do not send RTCP BYE*/
618 if ((stream->flags & RTP_HAS_RANGE) && !(stream->flags & RTP_EOS) ) {
619 /*also check last CTS*/
620 Double ts = (Double) ((u32) stream->depacketizer->sl_hdr.compositionTimeStamp - hdr.TimeStamp);
621 ts /= gf_rtp_get_clockrate(stream->rtp_ch);
622 if (ABSDIFF(stream->range_end, (ts + stream->current_start + gf_rtp_get_current_time(stream->rtp_ch)) ) < 0.2) {
623 stream->flags |= RTP_EOS;
624 stream->stat_stop_time = gf_sys_clock();
625 gf_filter_pid_set_eos(stream->opid);
626 }
627 }
628
629 if (stream->rtpin->stats) {
630 u32 now = gf_sys_clock();
631 if (stream->last_stats_time + stream->rtpin->stats < now) {
632 stream->last_stats_time = now;
633 rtpin_stream_update_stats(stream);
634 }
635 }
636 }
637
rtpin_adjust_sync(GF_RTPIn * ctx)638 static void rtpin_adjust_sync(GF_RTPIn *ctx)
639 {
640 u32 i, count = gf_list_count(ctx->streams);
641 u64 max_ntp_us = 0;
642
643 for (i=0; i<count; i++) {
644 GF_RTPInStream *stream = gf_list_get(ctx->streams, i);
645 if (!stream->rtcp_init) return;
646
647 if (max_ntp_us < stream->init_ntp_us) {
648 max_ntp_us = stream->init_ntp_us;
649 }
650 }
651
652 for (i=0; i<count; i++) {
653 s64 ntp_diff_us;
654 GF_RTPInStream *stream = gf_list_get(ctx->streams, i);
655
656 ntp_diff_us = max_ntp_us;
657 ntp_diff_us -= stream->init_ntp_us;
658 ntp_diff_us *= stream->rtp_ch->TimeScale;
659 ntp_diff_us /= 1000000;
660 stream->ts_adjust = ntp_diff_us;
661 }
662 }
663
rtpin_stream_on_rtcp_pck(GF_RTPInStream * stream,char * pck,u32 size)664 static void rtpin_stream_on_rtcp_pck(GF_RTPInStream *stream, char *pck, u32 size)
665 {
666 Bool has_sr;
667 GF_Err e;
668
669 if (stream->status == RTP_Connected) return;
670 //not configured yes
671 if (!stream->rtp_ch->TimeScale) return;
672 stream->rtcp_bytes += size;
673
674 e = gf_rtp_decode_rtcp(stream->rtp_ch, pck, size, &has_sr);
675 if (e<0) return;
676
677 /*update sync if on pure RTP*/
678 if (!stream->rtcp_init && has_sr) {
679 u64 ntp_clock_us, rtp_diff_us;
680
681 ntp_clock_us = stream->rtp_ch->last_SR_NTP_frac;
682 ntp_clock_us *= 1000000;
683 ntp_clock_us /= 0xFFFFFFFF;
684 ntp_clock_us += 1000000 * (u64) stream->rtp_ch->last_SR_NTP_sec;
685
686 //ntp time at origin: ntp(now) - ntp(first_pck) = rtpts(now) - rtpts(orig) -> ntp_first = ntp - rtp_diff
687 rtp_diff_us = stream->rtp_ch->last_SR_rtp_time - (stream->first_rtp_ts - 1);
688 rtp_diff_us *= 1000000;
689 rtp_diff_us /= stream->rtp_ch->TimeScale;
690
691 stream->init_ntp_us = ntp_clock_us - rtp_diff_us;
692
693
694 GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTCP] At %d Using Sender Report to map RTP TS %d to NTP clock "LLU" us - NTP origin "LLU"\n", gf_sys_clock(), stream->rtp_ch->last_SR_rtp_time, ntp_clock_us, stream->init_ntp_us));
695
696 stream->rtcp_init = GF_TRUE;
697
698 if (stream->rtpin->rtcpsync)
699 rtpin_adjust_sync(stream->rtpin);
700 }
701
702 if (e == GF_EOS) {
703 stream->flags |= RTP_EOS;
704 }
705 }
706
rtpin_rtsp_data_cbk(GF_RTSPSession * sess,void * cbk,u8 * buffer,u32 bufferSize,Bool IsRTCP)707 GF_Err rtpin_rtsp_data_cbk(GF_RTSPSession *sess, void *cbk, u8 *buffer, u32 bufferSize, Bool IsRTCP)
708 {
709 GF_RTPInStream *stream = (GF_RTPInStream *) cbk;
710 if (!stream || stream->rtpin->done) return GF_OK;
711 if (IsRTCP) {
712 rtpin_stream_on_rtcp_pck(stream, buffer, bufferSize);
713 } else {
714 rtpin_stream_on_rtp_pck(stream, buffer, bufferSize);
715 }
716 return GF_OK;
717 }
718
719
720
rtpin_stream_read(GF_RTPInStream * stream)721 u32 rtpin_stream_read(GF_RTPInStream *stream)
722 {
723 u32 size, tot_size = 0;
724
725 if (!stream->rtp_ch) return 0;
726 if (gf_sk_group_sock_is_set(stream->rtpin->sockgroup, stream->rtp_ch->rtcp, GF_SK_SELECT_READ)) {
727 size = gf_rtp_read_rtcp(stream->rtp_ch, stream->buffer, stream->rtpin->block_size);
728 if (size) {
729 tot_size += size;
730 rtpin_stream_on_rtcp_pck(stream, stream->buffer, size);
731 }
732 }
733
734 if (gf_sk_group_sock_is_set(stream->rtpin->sockgroup, stream->rtp_ch->rtp, GF_SK_SELECT_READ)) {
735 size = gf_rtp_read_rtp(stream->rtp_ch, stream->buffer, stream->rtpin->block_size);
736 if (size) {
737 tot_size += size;
738 stream->rtpin->udp_timeout = 0;
739 rtpin_stream_on_rtp_pck(stream, stream->buffer, size);
740 }
741 }
742 if (!tot_size) return 0;
743
744 /*and send the report*/
745 if (stream->flags & RTP_ENABLE_RTCP) gf_rtp_send_rtcp_report(stream->rtp_ch);
746
747 /*detect timeout*/
748 if (stream->rtpin->udp_timeout) {
749 if (!stream->last_udp_time) {
750 stream->last_udp_time = gf_sys_clock();
751 } else if (stream->rtp_ch->net_info.IsUnicast) {
752 u32 diff = gf_sys_clock() - stream->last_udp_time;
753 if (diff >= stream->rtpin->udp_timeout) {
754 char szMessage[1024];
755 GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] UDP Timeout after %d ms\n", diff));
756 sprintf(szMessage, "No data received in %d ms%s", diff, stream->rtpin->autortsp ? ", retrying using TCP" : "");
757 rtpin_send_message(stream->rtpin, GF_IP_UDP_TIMEOUT, szMessage);
758 if (stream->rtpin->autortsp)
759 stream->rtpin->retry_tcp = GF_TRUE;
760 }
761 }
762 }
763 return tot_size;
764 }
765
766 #endif /*GPAC_DISABLE_STREAMING*/
767