1 /*
2 * GPAC - Multimedia Framework C SDK
3 *
4 * Authors: Jean Le Feuvre
5 * Copyright (c) Telecom ParisTech 2000-2019
6 * All rights reserved
7 *
8 * Authors: Jean Le Feuvre
9 *
10 * This file is part of GPAC / mp4box application
11 *
12 * GPAC is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU Lesser General Public License as published by
14 * the Free Software Foundation; either version 2, or (at your option)
15 * any later version.
16 *
17 * GPAC is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU Lesser General Public License for more details.
21 *
22 * You should have received a copy of the GNU Lesser General Public
23 * License along with this library; see the file COPYING. If not, write to
24 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 *
26 */
27
28
29 #include "mp4box.h"
30
31 #ifndef GPAC_DISABLE_SENG
32 #include <gpac/scene_engine.h>
33 #endif
34 #ifndef GPAC_DISABLE_STREAMING
35 #include <gpac/rtp_streamer.h>
36 #endif
37
38 #include <gpac/mpegts.h>
39
40 #if defined(GPAC_DISABLE_ISOM) || defined(GPAC_DISABLE_ISOM_WRITE)
41
42 #error "Cannot compile MP4Box if GPAC is not built with ISO File Format support"
43
44 #else
45
46 #if !defined(GPAC_DISABLE_STREAMING) && !defined(GPAC_DISABLE_SENG)
47
48
49
50
51 typedef struct
52 {
53 GF_RTPStreamer *rtp;
54 Bool manual_rtcp;
55 u16 ESID;
56
57 u8 *carousel_data;
58 u32 carousel_size, carousel_alloc;
59 u32 last_carousel_time;
60 u64 carousel_ts, time_at_carousel_store;
61
62 u32 timescale, init_time;
63 u32 carousel_period, ts_delta;
64 u16 aggregate_on_stream;
65 Bool adjust_carousel_time, discard, aggregate, rap, m2ts_vers_inc;
66 u32 critical;
67 } RTPChannel;
68
69 typedef struct
70 {
71 GF_SceneEngine *seng;
72 Bool force_carousel, carousel_generation;
73 GF_List *streams;
74 u32 start_time;
75 Bool critical;
76 } LiveSession;
77
78
next_carousel(LiveSession * sess,u32 * timeout)79 RTPChannel *next_carousel(LiveSession *sess, u32 *timeout)
80 {
81 RTPChannel *to_send = NULL;
82 u32 i, time, count, now;
83
84 if (!sess->start_time) sess->start_time = gf_sys_clock();
85 now = gf_sys_clock() - sess->start_time;
86
87 time = (u32) -1;
88 count = gf_list_count(sess->streams);
89 for (i=0; i<count; i++) {
90 RTPChannel *ch = (RTPChannel*)gf_list_get(sess->streams, i);
91 if (!ch->carousel_period) continue;
92 if (!ch->carousel_size) continue;
93
94 if (!ch->last_carousel_time) ch->last_carousel_time = now;
95
96 if (ch->last_carousel_time + ch->carousel_period < time) {
97 to_send = ch;
98 time = ch->last_carousel_time + ch->carousel_period;
99 }
100 }
101 if (!to_send) {
102 if (timeout) *timeout = 0;
103 return NULL;
104 }
105 if (timeout) {
106 if (time>now) time-=now;
107 else time=0;
108 *timeout = time;
109 }
110 return to_send;
111 }
112
113
/*
 * Scene engine callback, invoked for each encoded access unit.
 *
 * The AU is dispatched to the channel matching ESID; AUs for unknown streams
 * are ignored. Depending on the session state the AU is either:
 * - stored as the carousel of the channel (session in carousel generation
 *   mode and channel has a carousel period), or
 * - sent immediately over RTP, re-stamped with the time elapsed since the
 *   channel creation.
 *
 * calling_object: the LiveSession registered with the scene engine
 * ESID: ID of the stream the AU belongs to
 * data, size: AU payload
 * ts: AU timestamp as provided by the scene engine
 */
static void live_session_callback(void *calling_object, u16 ESID, u8 *data, u32 size, u64 ts)
{
	LiveSession *livesess = (LiveSession *) calling_object;
	RTPChannel *rtpch;
	u32 i=0;

	while ( (rtpch = (RTPChannel*)gf_list_enum(livesess->streams, &i))) {
		if (rtpch->ESID != ESID) continue;

		/*store carousel data*/
		if (livesess->carousel_generation && rtpch->carousel_period) {
			if (rtpch->carousel_alloc < size) {
				/*grow through a temporary so that the previous carousel stays valid (and owned) if allocation fails*/
				u8 *new_data = gf_realloc(rtpch->carousel_data, size);
				if (!new_data) {
					fprintf(stderr, "\nStream %d: Cannot allocate %u bytes for carousel storage\n", ESID, size);
					return;
				}
				rtpch->carousel_data = new_data;
				rtpch->carousel_alloc = size;
			}
			/*nothing to copy for an empty AU, and the buffers may be NULL in that case*/
			if (size) memcpy(rtpch->carousel_data, data, size);
			rtpch->carousel_size = size;
			rtpch->carousel_ts = ts;
			rtpch->time_at_carousel_store = gf_sys_clock();
			fprintf(stderr, "\nStream %d: Storing new carousel TS "LLD", %d bytes\n", ESID, ts, size);
		}
		/*send data*/
		else {
			u32 critical = 0;
			Bool rap = rtpch->rap;
			/*AUs produced during carousel generation are always signaled as RAP*/
			if (livesess->carousel_generation) rap = 1;
			/*shift the timestamp by the time elapsed since channel creation, converted to the stream timescale*/
			ts += rtpch->timescale*((u64)gf_sys_clock()-rtpch->init_time + rtpch->ts_delta)/1000;
			/*the channel value takes precedence over the session-wide critical flag*/
			if (rtpch->critical) critical = rtpch->critical;
			else if (livesess->critical) critical = 1;

			gf_rtp_streamer_send_au_with_sn(rtpch->rtp, data, size, ts, ts, rap, critical);

			fprintf(stderr, "Stream %d: Sending update at TS "LLD", %d bytes - RAP %d - critical %d\n", ESID, ts, size, rap, critical);
			/*RAP and critical are one-shot flags*/
			rtpch->rap = rtpch->critical = 0;

			if (rtpch->manual_rtcp) gf_rtp_streamer_send_rtcp(rtpch->rtp, 0, 0, 0, 0, 0);
		}
		return;
	}
}
155
live_session_send_carousel(LiveSession * livesess,RTPChannel * ch)156 static void live_session_send_carousel(LiveSession *livesess, RTPChannel *ch)
157 {
158 u32 now = gf_sys_clock();
159 u64 ts;
160 if (ch) {
161 if (ch->carousel_size) {
162 ts = ch->carousel_ts + ch->timescale * ( (ch->adjust_carousel_time ? (u64)gf_sys_clock() : ch->time_at_carousel_store) - ch->init_time + ch->ts_delta)/1000;
163
164 gf_rtp_streamer_send_au_with_sn(ch->rtp, ch->carousel_data, ch->carousel_size, ts, ts, 1, 0);
165 ch->last_carousel_time = now - livesess->start_time;
166 fprintf(stderr, "Stream %d: Sending carousel at TS "LLD", %d bytes\n", ch->ESID, ts, ch->carousel_size);
167
168 if (ch->manual_rtcp) {
169 ts = ch->carousel_ts + ch->timescale * ( gf_sys_clock() - ch->init_time + ch->ts_delta)/1000;
170 gf_rtp_streamer_send_rtcp(ch->rtp, 1, (u32) ts, 0, 0, 0);
171 }
172 }
173 } else {
174 u32 i=0;
175 while (NULL != (ch = gf_list_enum(livesess->streams, &i))) {
176 if (ch->carousel_size) {
177 if (ch->adjust_carousel_time) {
178 ts = ch->carousel_ts + ch->timescale*(gf_sys_clock()-ch->init_time + ch->ts_delta)/1000;
179 } else {
180 ts = ch->carousel_ts;
181 }
182 gf_rtp_streamer_send_au_with_sn(ch->rtp, ch->carousel_data, ch->carousel_size, ts, ts, 1, 0);
183 ch->last_carousel_time = now - livesess->start_time;
184 fprintf(stderr, "Stream %d: Sending carousel at TS "LLD" , %d bytes\n", ch->ESID, ts, ch->carousel_size);
185
186 if (ch->manual_rtcp) {
187 ts = ch->carousel_ts + ch->timescale*(gf_sys_clock()-ch->init_time + ch->ts_delta)/1000;
188 gf_rtp_streamer_send_rtcp(ch->rtp, 1, (u32) ts, 0, 0, 0);
189 }
190 }
191 }
192 }
193 }
194
live_session_setup(LiveSession * livesess,char * ip,u16 port,u32 path_mtu,u32 ttl,char * ifce_addr,char * sdp_name)195 static Bool live_session_setup(LiveSession *livesess, char *ip, u16 port, u32 path_mtu, u32 ttl, char *ifce_addr, char *sdp_name)
196 {
197 RTPChannel *rtpch;
198 u32 count = gf_seng_get_stream_count(livesess->seng);
199 u32 i;
200 char *iod64 = gf_seng_get_base64_iod(livesess->seng);
201 char *sdp = gf_rtp_streamer_format_sdp_header("GPACSceneStreamer", ip, NULL, iod64);
202 if (iod64) gf_free(iod64);
203
204 #ifdef GPAC_ENABLE_COVERAGE
205 if (gf_sys_is_cov_mode()) {
206 GF_Descriptor *desc = gf_seng_get_iod(livesess->seng);
207 if (desc) gf_odf_desc_del(desc);
208 }
209 #endif
210 for (i=0; i<count; i++) {
211 u16 ESID;
212 u32 st, oti, ts;
213 const u8 *config = NULL;
214 u32 config_len;
215 gf_seng_get_stream_config(livesess->seng, i, &ESID, &config, &config_len, &st, &oti, &ts);
216
217 GF_SAFEALLOC(rtpch, RTPChannel);
218 if (!rtpch) {
219 GF_LOG(GF_LOG_ERROR, GF_LOG_APP, ("Cannot allocate rtp input handler\n"));
220 continue;
221 }
222 rtpch->timescale = ts;
223 rtpch->init_time = gf_sys_clock();
224
225 switch (st) {
226 case GF_STREAM_OD:
227 case GF_STREAM_SCENE:
228 rtpch->rtp = gf_rtp_streamer_new(st, oti, ts, ip, port, path_mtu, ttl, ifce_addr,
229 GP_RTP_PCK_SYSTEMS_CAROUSEL, config, config_len,
230 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, GF_FALSE);
231
232 if (rtpch->rtp) {
233 gf_rtp_streamer_disable_auto_rtcp(rtpch->rtp);
234 rtpch->manual_rtcp = 1;
235 }
236 break;
237 default:
238 rtpch->rtp = gf_rtp_streamer_new(st, oti, ts, ip, port, path_mtu, ttl, ifce_addr, GP_RTP_PCK_SIGNAL_RAP, config, config_len, 96, 0, 0, GF_FALSE, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, GF_FALSE);
239 break;
240 }
241 rtpch->ESID = ESID;
242 rtpch->adjust_carousel_time = 1;
243 gf_list_add(livesess->streams, rtpch);
244
245 if (!rtpch->rtp)
246 return GF_FALSE;
247
248 gf_rtp_streamer_append_sdp(rtpch->rtp, ESID, config, config_len, NULL, &sdp);
249
250 /*fetch initial config of the broadcast*/
251 gf_seng_get_stream_carousel_info(livesess->seng, ESID, &rtpch->carousel_period, &rtpch->aggregate_on_stream);
252 port += 2;
253 }
254 if (sdp) {
255 FILE *out = gf_fopen(sdp_name, "wt");
256 fprintf(out, "%s", sdp);
257 gf_fclose(out);
258 gf_free(sdp);
259 }
260 return GF_TRUE;
261 }
262
/*
 * Terminates the scene engine of the session and releases every RTP channel
 * (streamer, stored carousel and channel itself) along with the stream list.
 * The LiveSession structure itself is not freed.
 */
void live_session_shutdown(LiveSession *livesess)
{
	gf_seng_terminate(livesess->seng);

	/*no stream list: nothing else to release*/
	if (!livesess->streams) return;

	/*pop channels from the head of the list until it is empty*/
	while (gf_list_count(livesess->streams)) {
		RTPChannel *ch = gf_list_get(livesess->streams, 0);
		gf_list_rem(livesess->streams, 0);

		gf_rtp_streamer_del(ch->rtp);
		if (ch->carousel_data) gf_free(ch->carousel_data);
		gf_free(ch);
	}
	gf_list_del(livesess->streams);
}
278
279
set_broadcast_params(LiveSession * livesess,u16 esid,u32 period,u32 ts_delta,u16 aggregate_on_stream,Bool adjust_carousel_time,Bool force_rap,Bool aggregate_au,Bool discard_pending,Bool signal_rap,u32 signal_critical,Bool version_inc)280 static RTPChannel *set_broadcast_params(LiveSession *livesess, u16 esid, u32 period, u32 ts_delta, u16 aggregate_on_stream, Bool adjust_carousel_time, Bool force_rap, Bool aggregate_au, Bool discard_pending, Bool signal_rap, u32 signal_critical, Bool version_inc)
281 {
282 RTPChannel *rtpch = NULL;
283
284 /*locate our stream*/
285 if (esid) {
286 u32 i=0;
287 while ( (rtpch = gf_list_enum(livesess->streams, &i))) {
288 if (rtpch->ESID == esid) break;
289 }
290 } else {
291 rtpch = gf_list_get(livesess->streams, 0);
292 }
293
294 /*TODO - set/reset the ESID for the parsers*/
295 if (!rtpch) return NULL;
296
297 /*TODO - if discard is set, abort current carousel*/
298 if (discard_pending) {
299 }
300
301 /*remember RAP flag*/
302 rtpch->rap = signal_rap;
303 rtpch->critical = signal_critical;
304 rtpch->m2ts_vers_inc = version_inc;
305
306 rtpch->ts_delta = ts_delta;
307 rtpch->aggregate = aggregate_au;
308 rtpch->adjust_carousel_time = adjust_carousel_time;
309
310 /*change stream aggregation mode*/
311 if ((aggregate_on_stream != (u16)-1) && (rtpch->aggregate_on_stream != aggregate_on_stream)) {
312 gf_seng_enable_aggregation(livesess->seng, esid, aggregate_on_stream);
313 rtpch->aggregate_on_stream = aggregate_on_stream;
314 }
315 /*change stream aggregation mode*/
316 if ((period!=(u32)-1) && (rtpch->carousel_period != period)) {
317 rtpch->carousel_period = period;
318 rtpch->last_carousel_time = 0;
319 }
320
321 if (force_rap) {
322 livesess->force_carousel = 1;
323 }
324 return rtpch;
325 }
326
live_session(int argc,char ** argv)327 int live_session(int argc, char **argv)
328 {
329 GF_Err e;
330 u32 i;
331 char *filename = NULL;
332 char *dst = NULL;
333 const char *ifce_addr = NULL;
334 char *sdp_name = "session.sdp";
335 u16 dst_port = 7000;
336 u32 load_type=0;
337 u32 check;
338 u32 ttl = 1;
339 u32 path_mtu = 1450;
340 s32 next_time;
341 u64 last_src_modif, mod_time, runfor=0, start_time;
342 char *src_name = NULL;
343 Bool run, has_carousel, no_rap;
344 Bool udp = 0;
345 u16 sk_port=0;
346 GF_Socket *sk = NULL;
347 LiveSession livesess;
348 RTPChannel *ch;
349 char *update_buffer = NULL;
350 u32 update_buffer_size = 0;
351 u16 aggregate_on_stream;
352 Bool adjust_carousel_time, force_rap, aggregate_au, discard_pending, signal_rap, version_inc;
353 Bool update_context;
354 u32 period, ts_delta, signal_critical;
355 u16 es_id;
356 e = GF_OK;
357 aggregate_au = 1;
358 es_id = 0;
359 no_rap = 0;
360 gf_sys_init(GF_MemTrackerNone, NULL);
361
362 memset(&livesess, 0, sizeof(LiveSession));
363
364 gf_log_set_tool_level(GF_LOG_ALL, GF_LOG_INFO);
365
366 gf_sys_set_args(argc, (const char **) argv);
367
368
369 for (i=1; i<(u32) argc; i++) {
370 char *arg = argv[i];
371 if (arg[0] != '-') filename = arg;
372 else if (!strnicmp(arg, "-dst=", 5)) dst = arg+5;
373 else if (!strnicmp(arg, "-port=", 6)) dst_port = atoi(arg+6);
374 else if (!strnicmp(arg, "-sdp=", 5)) sdp_name = arg+5;
375 else if (!strnicmp(arg, "-mtu=", 5)) path_mtu = atoi(arg+5);
376 else if (!strnicmp(arg, "-ttl=", 5)) ttl = atoi(arg+5);
377 else if (!strnicmp(arg, "-no-rap", 7)) no_rap = 1;
378 else if (!strnicmp(arg, "-dims", 5)) load_type = GF_SM_LOAD_DIMS;
379 else if (!strnicmp(arg, "-src=", 5)) src_name = arg+5;
380 else if (!strnicmp(arg, "-udp=", 5)) {
381 sk_port = atoi(arg+5);
382 udp = 1;
383 }
384 else if (!strnicmp(arg, "-tcp=", 5)) {
385 sk_port = atoi(arg+5);
386 udp = 0;
387 }
388 else if (!stricmp(arg, "-run-for")) {
389 runfor = 1 + 1000 * atoi(argv[i+1]);
390 i++;
391 }
392 }
393 if (!filename) {
394 fprintf(stderr, "Missing filename\n");
395 PrintLiveUsage();
396 return 1;
397 }
398 ifce_addr = gf_opts_get_key("core", "ifce");
399
400 if (dst_port && dst) livesess.streams = gf_list_new();
401
402 livesess.seng = gf_seng_init(&livesess, filename, load_type, NULL, (load_type == GF_SM_LOAD_DIMS) ? 1 : 0);
403 if (!livesess.seng) {
404 fprintf(stderr, "Cannot create scene engine\n");
405 return 1;
406 }
407 if (livesess.streams) {
408 Bool res = live_session_setup(&livesess, dst, dst_port, path_mtu, ttl, (char *) ifce_addr, sdp_name);
409 if (!res) {
410 live_session_shutdown(&livesess);
411 if (update_buffer) gf_free(update_buffer);
412 if (sk) gf_sk_del(sk);
413 gf_sys_close();
414 return e ? 1 : 0;
415 }
416 }
417
418 has_carousel = 0;
419 last_src_modif = src_name ? gf_file_modification_time(src_name) : 0;
420
421 if (sk_port) {
422 sk = gf_sk_new(udp ? GF_SOCK_TYPE_UDP : GF_SOCK_TYPE_TCP);
423 if (udp) {
424 e = gf_sk_bind(sk, NULL, sk_port, NULL, 0, 0);
425 if (e != GF_OK) {
426 if (sk) gf_sk_del(sk);
427 sk = NULL;
428 }
429 } else {
430 }
431 }
432
433
434 for (i=0; i<(u32) argc; i++) {
435 char *arg = argv[i];
436 if (!strnicmp(arg, "-rap=", 5)) {
437 u32 id, j;
438 period = id = 0;
439 if (strchr(arg, ':')) {
440 sscanf(arg, "-rap=ESID=%u:%u", &id, &period);
441 e = gf_seng_enable_aggregation(livesess.seng, id, 1);
442 if (e) {
443 fprintf(stderr, "Cannot enable aggregation on stream %u: %s\n", id, gf_error_to_string(e));
444 goto exit;
445 }
446 } else {
447 sscanf(arg, "-rap=%u", &period);
448 }
449
450 j=0;
451 while (NULL != (ch = gf_list_enum(livesess.streams, &j))) {
452 if (!id || (ch->ESID==id))
453 ch->carousel_period = period;
454 }
455 has_carousel = 1;
456 }
457 }
458
459 i=0;
460 while (NULL != (ch = gf_list_enum(livesess.streams, &i))) {
461 if (ch->carousel_period) {
462 has_carousel = 1;
463 break;
464 }
465 }
466
467 update_context = 0;
468
469 if (has_carousel || !no_rap) {
470 livesess.carousel_generation = 1;
471 gf_seng_encode_context(livesess.seng, live_session_callback);
472 livesess.carousel_generation = 0;
473 }
474
475 live_session_send_carousel(&livesess, NULL);
476
477
478 #ifdef GPAC_ENABLE_COVERAGE
479 if (gf_sys_is_cov_mode()) {
480 aggregate_on_stream = (u16) -1;
481 adjust_carousel_time = force_rap = discard_pending = signal_rap = signal_critical = 0;
482 aggregate_au = version_inc = 1;
483 period = -1;
484 ts_delta = 0;
485 es_id = 0;
486
487 set_broadcast_params(&livesess, es_id, period, ts_delta, aggregate_on_stream, adjust_carousel_time, force_rap, aggregate_au, discard_pending, signal_rap, signal_critical, version_inc);
488 }
489 #endif
490
491 start_time = gf_sys_clock_high_res();
492 check = 10;
493 run = 1;
494 while (run) {
495 check--;
496 if (!check) {
497 check = 10;
498 if (gf_prompt_has_input()) {
499 char c = gf_prompt_get_char();
500 switch (c) {
501 case 'q':
502 run=0;
503 break;
504 case 'U':
505 case 'u':
506 {
507 char szCom[8192];
508 fprintf(stderr, "Enter command to send:\n");
509 szCom[0] = 0;
510 if (1 > scanf("%8191[^\t\n]", szCom)) {
511 fprintf(stderr, "No command entered properly, aborting.\n");
512 break;
513 }
514 /*stdin flush bug*/
515 while (getchar()!='\n') {}
516 e = gf_seng_encode_from_string(livesess.seng, 0, 0, szCom, live_session_callback);
517 if (e) fprintf(stderr, "Processing command failed: %s\n", gf_error_to_string(e));
518 e = gf_seng_aggregate_context(livesess.seng, 0);
519 if (e) fprintf(stderr, "Aggregating context failed: %s\n", gf_error_to_string(e));
520 livesess.critical = (c=='U') ? 1 : 0;
521 update_context = 1;
522 }
523 break;
524 case 'E':
525 case 'e':
526 {
527 char szCom[8192];
528 fprintf(stderr, "Enter command to send:\n");
529 szCom[0] = 0;
530 if (1 > scanf("%8191[^\t\n]", szCom)) {
531 printf("No command entered properly, aborting.\n");
532 break;
533 }
534 /*stdin flush bug*/
535 while (getchar()!='\n') {}
536 e = gf_seng_encode_from_string(livesess.seng, 0, 1, szCom, live_session_callback);
537 if (e) fprintf(stderr, "Processing command failed: %s\n", gf_error_to_string(e));
538 livesess.critical = (c=='E') ? 1 : 0;
539 e = gf_seng_aggregate_context(livesess.seng, 0);
540 if (e) fprintf(stderr, "Aggregating context failed: %s\n", gf_error_to_string(e));
541
542 }
543 break;
544
545 case 'p':
546 {
547 char rad[GF_MAX_PATH];
548 fprintf(stderr, "Enter output file name - \"std\" for stderr: ");
549 if (1 > scanf("%1023s", rad)) {
550 fprintf(stderr, "No ouput file name entered, aborting.\n");
551 break;
552 }
553 e = gf_seng_save_context(livesess.seng, !strcmp(rad, "std") ? NULL : rad);
554 fprintf(stderr, "Dump done (%s)\n", gf_error_to_string(e));
555 }
556 break;
557 case 'F':
558 update_context = 1;
559 case 'f':
560 livesess.force_carousel = 1;
561 break;
562 }
563 e = GF_OK;
564 }
565 }
566
567 /*process updates from file source*/
568 if (src_name) {
569 mod_time = gf_file_modification_time(src_name);
570 if (mod_time != last_src_modif) {
571 FILE *srcf;
572 char flag_buf[201], *flag;
573 fprintf(stderr, "Update file modified - processing\n");
574 last_src_modif = mod_time;
575
576 srcf = gf_fopen(src_name, "rt");
577 if (!srcf) continue;
578
579 /*checks if we have a broadcast config*/
580 if (!gf_fgets(flag_buf, 200, srcf))
581 flag_buf[0] = '\0';
582 gf_fclose(srcf);
583
584 aggregate_on_stream = (u16) -1;
585 adjust_carousel_time = force_rap = discard_pending = signal_rap = signal_critical = 0;
586 aggregate_au = version_inc = 1;
587 period = -1;
588 ts_delta = 0;
589 es_id = 0;
590
591 /*find our keyword*/
592 flag = strstr(flag_buf, "gpac_broadcast_config ");
593 if (flag) {
594 flag += strlen("gpac_broadcast_config ");
595 /*move to next word*/
596 while (flag[0]==' ') flag++;
597
598 while (1) {
599 char *sep = strchr(flag, ' ');
600 if (sep) sep[0] = 0;
601 if (!strnicmp(flag, "esid=", 5)) es_id = atoi(flag+5);
602 else if (!strnicmp(flag, "period=", 7)) period = atoi(flag+7);
603 else if (!strnicmp(flag, "ts=", 3)) ts_delta = atoi(flag+3);
604 else if (!strnicmp(flag, "carousel=", 9)) aggregate_on_stream = atoi(flag+9);
605 else if (!strnicmp(flag, "restamp=", 8)) adjust_carousel_time = atoi(flag+8);
606
607 else if (!strnicmp(flag, "discard=", 8)) discard_pending = atoi(flag+8);
608 else if (!strnicmp(flag, "aggregate=", 10)) aggregate_au = atoi(flag+10);
609 else if (!strnicmp(flag, "force_rap=", 10)) force_rap = atoi(flag+10);
610 else if (!strnicmp(flag, "rap=", 4)) signal_rap = atoi(flag+4);
611 else if (!strnicmp(flag, "critical=", 9)) signal_critical = atoi(flag+9);
612 else if (!strnicmp(flag, "vers_inc=", 9)) version_inc = atoi(flag+9);
613 if (sep) {
614 sep[0] = ' ';
615 flag = sep+1;
616 } else {
617 break;
618 }
619 }
620
621 set_broadcast_params(&livesess, es_id, period, ts_delta, aggregate_on_stream, adjust_carousel_time, force_rap, aggregate_au, discard_pending, signal_rap, signal_critical, version_inc);
622 }
623
624 e = gf_seng_encode_from_file(livesess.seng, es_id, aggregate_au ? 0 : 1, src_name, live_session_callback);
625 if (e) fprintf(stderr, "Processing command failed: %s\n", gf_error_to_string(e));
626 e = gf_seng_aggregate_context(livesess.seng, 0);
627
628 update_context = no_rap ? 0 : 1;
629 }
630 }
631
632 /*process updates from socket source*/
633 if (sk) {
634 u8 buffer[2049];
635 u32 bytes_read;
636 u32 update_length;
637 u32 bytes_received;
638
639
640 e = gf_sk_receive(sk, buffer, 2048, &bytes_read);
641 if (e == GF_OK) {
642 u32 hdr_length = 0;
643 u8 cmd_type = buffer[0];
644 bytes_received = 0;
645 switch (cmd_type) {
646 case 0:
647 {
648 GF_BitStream *bs = gf_bs_new(buffer, bytes_read, GF_BITSTREAM_READ);
649 gf_bs_read_u8(bs);
650 es_id = gf_bs_read_u16(bs);
651 aggregate_on_stream = gf_bs_read_u16(bs);
652 if (aggregate_on_stream==0xFFFF) aggregate_on_stream = -1;
653 adjust_carousel_time = gf_bs_read_int(bs, 1);
654 force_rap = gf_bs_read_int(bs, 1);
655 aggregate_au = gf_bs_read_int(bs, 1);
656 discard_pending = gf_bs_read_int(bs, 1);
657 signal_rap = gf_bs_read_int(bs, 1);
658 signal_critical = gf_bs_read_int(bs, 1);
659 version_inc = gf_bs_read_int(bs, 1);
660 gf_bs_read_int(bs, 1);
661 period = gf_bs_read_u16(bs);
662 if (period==0xFFFF) period = -1;
663 ts_delta = gf_bs_read_u16(bs);
664 update_length = gf_bs_read_u32(bs);
665 hdr_length = 12;
666 gf_bs_del(bs);
667 }
668
669 set_broadcast_params(&livesess, es_id, period, ts_delta, aggregate_on_stream, adjust_carousel_time, force_rap, aggregate_au, discard_pending, signal_rap, signal_critical, version_inc);
670 break;
671 default:
672 update_length = 0;
673 break;
674 }
675
676 if (update_buffer_size <= update_length) {
677 update_buffer = gf_realloc(update_buffer, update_length+1);
678 update_buffer_size = update_length+1;
679 }
680 if (update_length && (bytes_read>hdr_length) ) {
681 memcpy(update_buffer, buffer+hdr_length, bytes_read-hdr_length);
682 bytes_received = bytes_read-hdr_length;
683 }
684 while (bytes_received<update_length) {
685 e = gf_sk_receive(sk, buffer, 2048, &bytes_read);
686 switch (e) {
687 case GF_IP_NETWORK_EMPTY:
688 break;
689 case GF_OK:
690 memcpy(update_buffer+bytes_received, buffer, bytes_read);
691 bytes_received += bytes_read;
692 break;
693 default:
694 fprintf(stderr, "Error with UDP socket : %s\n", gf_error_to_string(e));
695 break;
696 }
697 }
698 update_buffer[update_length] = 0;
699
700 if (update_length) {
701 e = gf_seng_encode_from_string(livesess.seng, es_id, aggregate_au ? 0 : 1, update_buffer, live_session_callback);
702 if (e) fprintf(stderr, "Processing command failed: %s\n", gf_error_to_string(e));
703 e = gf_seng_aggregate_context(livesess.seng, 0);
704
705 update_context = 1;
706 }
707 }
708 }
709
710 if (update_context) {
711 livesess.carousel_generation=1;
712 e = gf_seng_encode_context(livesess.seng, live_session_callback );
713 livesess.carousel_generation=0;
714 update_context = 0;
715 }
716
717 if (livesess.force_carousel) {
718 live_session_send_carousel(&livesess, NULL);
719 livesess.force_carousel = 0;
720 continue;
721 }
722
723 if (runfor && (gf_sys_clock_high_res() > start_time+runfor))
724 break;
725
726
727 if (!has_carousel) {
728 gf_sleep(10);
729 continue;
730 }
731 ch = next_carousel(&livesess, (u32 *) &next_time);
732 if ((ch==NULL) || (next_time > 20)) {
733 gf_sleep(20);
734 continue;
735 }
736 if (next_time) gf_sleep(next_time);
737 live_session_send_carousel(&livesess, ch);
738 }
739
740 #ifdef GPAC_ENABLE_COVERAGE
741 if (gf_sys_is_cov_mode()) {
742 /* gf_seng_save_context(livesess.seng, NULL);
743 gf_seng_aggregate_context
744 gf_seng_encode_from_string
745 gf_seng_encode_from_file
746 */
747 }
748 #endif
749
750 exit:
751 live_session_shutdown(&livesess);
752 if (update_buffer) gf_free(update_buffer);
753 if (sk) gf_sk_del(sk);
754 gf_sys_close();
755 return e ? 1 : 0;
756 }
757
758
759 #endif /*!defined(GPAC_DISABLE_STREAMING) && !defined(GPAC_DISABLE_SENG)*/
760
761 #endif /*defined(GPAC_DISABLE_ISOM) || defined(GPAC_DISABLE_ISOM_WRITE)*/
762