1 /*
2 * GPAC - Multimedia Framework C SDK
3 *
4 * Authors: Jean Le Feuvre
5 * Copyright (c) Telecom ParisTech 2017-2020
6 * All rights reserved
7 *
8 * This file is part of GPAC / filters sub-project
9 *
10 * GPAC is free software; you can redistribute it and/or modify
11 * it under the terfsess of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * GPAC is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; see the file COPYING. If not, write to
22 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 *
24 */
25
26 #include "filter_session.h"
27 #include <gpac/network.h>
28
29 #ifndef GPAC_DISABLE_3D
30 #include <gpac/modules/video_out.h>
31 #endif
32 //#define CHECK_TASK_LIST_INTEGRITY
33
34 struct _gf_ft_mgr *gf_font_manager_new();
35 void gf_font_manager_del(struct _gf_ft_mgr *fm);
36
37
gf_fs_sema_io(GF_FilterSession * fsess,Bool notify,Bool main)38 static GFINLINE void gf_fs_sema_io(GF_FilterSession *fsess, Bool notify, Bool main)
39 {
40 GF_Semaphore *sem = main ? fsess->semaphore_main : fsess->semaphore_other;
41 if (sem) {
42 if (notify) {
43 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler %s semaphore\n", gf_th_id(), main ? "main" : "secondary"));
44 if ( ! gf_sema_notify(sem, 1)) {
45 GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Cannot notify scheduler of new task, semaphore failure\n"));
46 }
47 } else {
48 u32 nb_tasks;
49 //if not main and no tasks in main list, this could be the last task to process.
50 //if main thread is sleeping force a wake to take further actions (only the main thread decides the exit)
51 //this also ensures that tha main thread will process tasks from secondary task lists if no
52 //dedicated main thread tasks are present (eg no GL filters)
53 if (!main && fsess->in_main_sem_wait && !gf_fq_count(fsess->main_thread_tasks)) {
54 gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
55 }
56 nb_tasks = 1;
57 //no active threads, count number of tasks. If no posted tasks we are likely at the end of the session, don't block, rather use a sem_wait
58 if (!fsess->active_threads)
59 nb_tasks = gf_fq_count(fsess->main_thread_tasks) + gf_fq_count(fsess->tasks);
60
61 //if main semaphore, keep track that we are going to sleep
62 if (main) {
63 fsess->in_main_sem_wait = GF_TRUE;
64 if (!nb_tasks) {
65 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("No tasks scheduled, waiting on main semaphore for at most 100 ms\n"));
66 if (gf_sema_wait_for(sem, 100)) {
67 }
68 } else {
69 if (gf_sema_wait(sem)) {
70 }
71 }
72 fsess->in_main_sem_wait = GF_FALSE;
73 } else {
74 if (!nb_tasks) {
75 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("No tasks scheduled, waiting on secondary semaphore for at most 100 ms\n"));
76 if (gf_sema_wait_for(sem, 100)) {
77 }
78 } else {
79 if (gf_sema_wait(sem)) {
80 }
81 }
82 }
83 }
84 }
85 }
86
gf_fs_add_filter_register(GF_FilterSession * fsess,const GF_FilterRegister * freg)87 void gf_fs_add_filter_register(GF_FilterSession *fsess, const GF_FilterRegister *freg)
88 {
89 if (!freg) return;
90
91 if (!freg->name) {
92 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter missing name - ignoring\n"));
93 return;
94 }
95 if (!freg->process) {
96 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter %s missing process function - ignoring\n", freg->name));
97 return;
98 }
99 if (fsess->blacklist) {
100 char *fname = strstr(fsess->blacklist, freg->name);
101 if (fname) {
102 u32 len = (u32) strlen(freg->name);
103 if (!fname[len] || (fname[len] == fsess->sep_list)) {
104 return;
105 }
106 }
107 }
108 gf_list_add(fsess->registry, (void *) freg);
109
110 if (fsess->init_done && fsess->links && gf_list_count( fsess->links)) {
111 gf_filter_sess_build_graph(fsess, freg);
112 }
113 }
114
115
116
fs_default_event_proc(void * ptr,GF_Event * evt)117 static Bool fs_default_event_proc(void *ptr, GF_Event *evt)
118 {
119 if (evt->type==GF_EVENT_QUIT) {
120 GF_FilterSession *fsess = (GF_FilterSession *)ptr;
121 gf_fs_abort(fsess, GF_FALSE);
122 }
123 return 0;
124 }
125
126 GF_EXPORT
gf_fs_new(s32 nb_threads,GF_FilterSchedulerType sched_type,u32 flags,const char * blacklist)127 GF_FilterSession *gf_fs_new(s32 nb_threads, GF_FilterSchedulerType sched_type, u32 flags, const char *blacklist)
128 {
129 const char *opt;
130 Bool gf_sys_has_filter_global_args();
131 Bool gf_sys_has_filter_global_meta_args();
132
133 u32 i, count;
134 GF_FilterSession *fsess, *a_sess;
135
136 //safety check: all built-in properties shall have unique 4CCs
137 if ( ! gf_props_4cc_check_props())
138 return NULL;
139
140 GF_SAFEALLOC(fsess, GF_FilterSession);
141 if (!fsess) {
142 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to alloc media session\n"));
143 return NULL;
144 }
145 fsess->flags = flags;
146
147 fsess->filters = gf_list_new();
148 fsess->main_th.fsess = fsess;
149
150 if ((s32) nb_threads == -1) {
151 GF_SystemRTInfo rti;
152 memset(&rti, 0, sizeof(GF_SystemRTInfo));
153 if (gf_sys_get_rti(0, &rti, 0)) {
154 nb_threads = rti.nb_cores-1;
155 }
156 if ((s32)nb_threads<0) {
157 GF_LOG(GF_LOG_WARNING, GF_LOG_FILTER, ("Failed to query number of cores, disabling extra threads for session\n"));
158 nb_threads=0;
159 }
160 }
161
162 if (sched_type==GF_FS_SCHEDULER_DIRECT) {
163 fsess->direct_mode = GF_TRUE;
164 nb_threads=0;
165 }
166 if (nb_threads && (sched_type != GF_FS_SCHEDULER_LOCK_FREE_X)) {
167 fsess->tasks_mx = gf_mx_new("TasksList");
168 }
169
170 //regardless of scheduler type, we don't use lock on the main task list
171 fsess->tasks = gf_fq_new(fsess->tasks_mx);
172
173 if (nb_threads>0) {
174 fsess->main_thread_tasks = gf_fq_new(fsess->tasks_mx);
175 fsess->filters_mx = gf_mx_new("Filters");
176 } else {
177 //otherwise use the same as the global task list
178 fsess->main_thread_tasks = fsess->tasks;
179 }
180
181 if (!(flags & GF_FS_FLAG_NO_RESERVOIR)) {
182 fsess->tasks_reservoir = gf_fq_new(fsess->tasks_mx);
183 }
184
185 if (nb_threads || (sched_type==GF_FS_SCHEDULER_LOCK_FORCE) ) {
186 fsess->semaphore_main = fsess->semaphore_other = gf_sema_new(GF_INT_MAX, 0);
187 if (nb_threads>0)
188 fsess->semaphore_other = gf_sema_new(GF_INT_MAX, 0);
189
190 //force testing of mutex queues
191 //fsess->use_locks = GF_TRUE;
192 }
193 fsess->ui_event_proc = fs_default_event_proc;
194 fsess->ui_opaque = fsess;
195
196 if (flags & GF_FS_FLAG_NO_MAIN_THREAD)
197 fsess->no_main_thread = GF_TRUE;
198
199 if (!fsess->semaphore_main)
200 nb_threads=0;
201
202 if (nb_threads) {
203 fsess->threads = gf_list_new();
204 if (!fsess->threads) {
205 gf_sema_del(fsess->semaphore_main);
206 fsess->semaphore_main=NULL;
207 gf_sema_del(fsess->semaphore_other);
208 fsess->semaphore_other=NULL;
209 nb_threads=0;
210 }
211 fsess->use_locks = (sched_type==GF_FS_SCHEDULER_LOCK) ? GF_TRUE : GF_FALSE;
212 } else {
213 #ifdef GPAC_MEMORY_TRACKING
214 extern int gf_mem_track_enabled;
215 fsess->check_allocs = gf_mem_track_enabled;
216 #endif
217
218 }
219
220 if (fsess->use_locks)
221 fsess->props_mx = gf_mx_new("FilterSessionProps");
222
223 if (!(flags & GF_FS_FLAG_NO_RESERVOIR)) {
224 #if GF_PROPS_HASHTABLE_SIZE
225 fsess->prop_maps_list_reservoir = gf_fq_new(fsess->props_mx);
226 #endif
227 fsess->prop_maps_reservoir = gf_fq_new(fsess->props_mx);
228 fsess->prop_maps_entry_reservoir = gf_fq_new(fsess->props_mx);
229 fsess->prop_maps_entry_data_alloc_reservoir = gf_fq_new(fsess->props_mx);
230 //we also use the props mutex for the this one
231 fsess->pcks_refprops_reservoir = gf_fq_new(fsess->props_mx);
232 }
233
234
235 #ifndef GPAC_DISABLE_REMOTERY
236 sprintf(fsess->main_th.rmt_name, "FSThread0");
237 #endif
238
239 if (!fsess->filters || !fsess->tasks) {
240 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to alloc media session\n"));
241 fsess->run_status = GF_OUT_OF_MEM;
242 gf_fs_del(fsess);
243 return NULL;
244 }
245
246 if (nb_threads) {
247 fsess->info_mx = gf_mx_new("FilterSessionInfo");
248 fsess->ui_mx = gf_mx_new("FilterSessionUIProc");
249 }
250
251 for (i=0; i<(u32) nb_threads; i++) {
252 GF_SessionThread *sess_thread;
253 GF_SAFEALLOC(sess_thread, GF_SessionThread);
254 if (!sess_thread) continue;
255 #ifndef GPAC_DISABLE_REMOTERY
256 sprintf(sess_thread->rmt_name, "FSThread%d", i+1);
257 #endif
258 sess_thread->th = gf_th_new("MediaSessionThread");
259 if (!sess_thread->th) {
260 gf_free(sess_thread);
261 continue;
262 }
263 sess_thread->fsess = fsess;
264 gf_list_add(fsess->threads, sess_thread);
265 }
266
267 gf_fs_set_separators(fsess, NULL);
268
269 fsess->registry = gf_list_new();
270 fsess->blacklist = blacklist;
271 a_sess = (flags & GF_FS_FLAG_LOAD_META) ? fsess : NULL;
272 gf_fs_reg_all(fsess, a_sess);
273
274 //load external modules
275 count = gf_modules_count();
276 for (i=0; i<count; i++) {
277 GF_FilterRegister *freg = (GF_FilterRegister *) gf_modules_load_filter(i, a_sess);
278 if (freg) {
279 freg->flags |= 0x80000000;
280 gf_fs_add_filter_register(fsess, freg);
281 }
282 }
283 fsess->blacklist = NULL;
284
285 //todo - find a way to handle events without mutex ...
286 fsess->evt_mx = gf_mx_new("Event mutex");
287
288 fsess->blocking_mode = GF_FS_BLOCK_ALL;
289 opt = gf_opts_get_key("core", "no-block");
290 if (opt) {
291 if (!strcmp(opt, "fanout")) {
292 fsess->blocking_mode = GF_FS_NOBLOCK_FANOUT;
293 }
294 else if (!strcmp(opt, "all")) {
295 fsess->blocking_mode = GF_FS_NOBLOCK;
296 }
297 }
298 fsess->run_status = GF_EOS;
299 fsess->nb_threads_stopped = 1+nb_threads;
300 fsess->default_pid_buffer_max_us = 1000;
301 fsess->decoder_pid_buffer_max_us = 1000000;
302 fsess->default_pid_buffer_max_units = 1;
303 fsess->max_resolve_chain_len = 6;
304 fsess->auto_inc_nums = gf_list_new();
305
306 if (nb_threads)
307 fsess->links_mx = gf_mx_new("FilterRegistryGraph");
308 fsess->links = gf_list_new();
309
310 #ifndef GPAC_DISABLE_3D
311 fsess->gl_providers = gf_list_new();
312 #endif
313
314 if (! (fsess->flags & GF_FS_FLAG_NO_GRAPH_CACHE))
315 gf_filter_sess_build_graph(fsess, NULL);
316
317 fsess->init_done = GF_TRUE;
318
319
320 if (gf_sys_has_filter_global_args() || gf_sys_has_filter_global_meta_args()) {
321 u32 nb_args = gf_sys_get_argc();
322 for (i=0; i<nb_args; i++) {
323 char *arg = (char *)gf_sys_get_arg(i);
324 if (arg[0]!='-') continue;
325 if ((arg[1]!='-') && (arg[1]!='+')) continue;
326 char *sep = strchr(arg, '=');
327 if (sep) sep[0] = 0;
328 gf_fs_push_arg(fsess, arg+2, GF_FALSE, (arg[1]!='-') ? 2 : 1);
329 if (sep) sep[0] = '=';
330 }
331 }
332
333 return fsess;
334 }
335
gf_fs_push_arg(GF_FilterSession * session,const char * szArg,Bool was_found,u32 type)336 void gf_fs_push_arg(GF_FilterSession *session, const char *szArg, Bool was_found, u32 type)
337 {
338 if (session->flags & GF_FS_FLAG_NO_ARG_CHECK)
339 return;
340
341 if (!was_found) {
342 Bool afound = GF_FALSE;
343 u32 k, acount;
344 if (!session->parsed_args) session->parsed_args = gf_list_new();
345 acount = gf_list_count(session->parsed_args);
346 for (k=0; k<acount; k++) {
347 GF_FSArgItem *ai = gf_list_get(session->parsed_args, k);
348 if (!strcmp(ai->argname, szArg)) {
349 afound = GF_TRUE;
350 if ((ai->type==2) && (type==2))
351 ai->found = GF_FALSE;
352 break;
353 }
354 }
355 if (!afound) {
356 GF_FSArgItem *ai;
357 GF_SAFEALLOC(ai, GF_FSArgItem);
358 if (ai) {
359 ai->argname = gf_strdup(szArg);
360 ai->type = type;
361 gf_list_add(session->parsed_args, ai );
362 }
363 }
364 } else {
365 u32 k, acount;
366 Bool found = GF_FALSE;
367 if (!session->parsed_args) session->parsed_args = gf_list_new();
368 acount = gf_list_count(session->parsed_args);
369 for (k=0; k<acount; k++) {
370 GF_FSArgItem *ai = gf_list_get(session->parsed_args, k);
371 if (!strcmp(ai->argname, szArg)) {
372 ai->found = GF_TRUE;
373 found = GF_TRUE;
374 break;
375 }
376 }
377 if (!found) {
378 GF_FSArgItem *ai;
379 GF_SAFEALLOC(ai, GF_FSArgItem);
380 if (ai) {
381 ai->argname = gf_strdup(szArg);
382 ai->type = type;
383 ai->found = GF_TRUE;
384 gf_list_add(session->parsed_args, ai );
385 }
386 }
387 }
388 }
389
390
391 GF_EXPORT
gf_fs_new_defaults(u32 inflags)392 GF_FilterSession *gf_fs_new_defaults(u32 inflags)
393 {
394 GF_FilterSession *fsess;
395 GF_FilterSchedulerType sched_type = GF_FS_SCHEDULER_LOCK_FREE;
396 u32 flags = 0;
397 s32 nb_threads = gf_opts_get_int("core", "threads");
398 const char *blacklist = gf_opts_get_key("core", "blacklist");
399 const char *opt = gf_opts_get_key("core", "sched");
400
401 if (!opt) sched_type = GF_FS_SCHEDULER_LOCK_FREE;
402 else if (!strcmp(opt, "lock")) sched_type = GF_FS_SCHEDULER_LOCK;
403 else if (!strcmp(opt, "flock")) sched_type = GF_FS_SCHEDULER_LOCK_FORCE;
404 else if (!strcmp(opt, "direct")) sched_type = GF_FS_SCHEDULER_DIRECT;
405 else if (!strcmp(opt, "free")) sched_type = GF_FS_SCHEDULER_LOCK_FREE;
406 else if (!strcmp(opt, "freex")) sched_type = GF_FS_SCHEDULER_LOCK_FREE_X;
407 else {
408 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Unrecognized scheduler type %s\n", opt));
409 return NULL;
410 }
411 if (inflags & GF_FS_FLAG_LOAD_META)
412 flags |= GF_FS_FLAG_LOAD_META;
413
414 if (inflags & GF_FS_FLAG_NO_MAIN_THREAD)
415 flags |= GF_FS_FLAG_NO_MAIN_THREAD;
416
417 if (inflags & GF_FS_FLAG_NO_GRAPH_CACHE)
418 flags |= GF_FS_FLAG_NO_GRAPH_CACHE;
419
420 if (gf_opts_get_bool("core", "dbg-edges"))
421 flags |= GF_FS_FLAG_PRINT_CONNECTIONS;
422
423 if (gf_opts_get_bool("core", "full-link"))
424 flags |= GF_FS_FLAG_FULL_LINK;
425
426 if (gf_opts_get_bool("core", "no-reg"))
427 flags |= GF_FS_FLAG_NO_REGULATION;
428
429 if (gf_opts_get_bool("core", "no-reassign"))
430 flags |= GF_FS_FLAG_NO_REASSIGN;
431
432 if (gf_opts_get_bool("core", "no-graph-cache"))
433 flags |= GF_FS_FLAG_NO_GRAPH_CACHE;
434
435 if (gf_opts_get_bool("core", "no-probe"))
436 flags |= GF_FS_FLAG_NO_PROBE;
437
438 if (gf_opts_get_bool("core", "no-argchk"))
439 flags |= GF_FS_FLAG_NO_ARG_CHECK;
440
441 if (gf_opts_get_bool("core", "no-reservoir"))
442 flags |= GF_FS_FLAG_NO_RESERVOIR;
443
444
445 fsess = gf_fs_new(nb_threads, sched_type, flags, blacklist);
446 if (!fsess) return NULL;
447
448 gf_fs_set_max_resolution_chain_length(fsess, gf_opts_get_int("core", "max-chain") );
449
450 gf_fs_set_max_sleep_time(fsess, gf_opts_get_int("core", "max-sleep") );
451
452 opt = gf_opts_get_key("core", "seps");
453 if (opt)
454 gf_fs_set_separators(fsess, opt);
455
456 return fsess;
457 }
458
459
460 GF_EXPORT
gf_fs_set_separators(GF_FilterSession * session,const char * separator_set)461 GF_Err gf_fs_set_separators(GF_FilterSession *session, const char *separator_set)
462 {
463 if (!session) return GF_BAD_PARAM;
464 if (separator_set && (strlen(separator_set)<5)) return GF_BAD_PARAM;
465
466 if (separator_set) {
467 session->sep_args = separator_set[0];
468 session->sep_name = separator_set[1];
469 session->sep_frag = separator_set[2];
470 session->sep_list = separator_set[3];
471 session->sep_neg = separator_set[4];
472 } else {
473 session->sep_args = ':';
474 session->sep_name = '=';
475 session->sep_frag = '#';
476 session->sep_list = ',';
477 session->sep_neg = '!';
478 }
479 return GF_OK;
480 }
481
482 GF_EXPORT
gf_fs_set_max_resolution_chain_length(GF_FilterSession * session,u32 max_chain_length)483 GF_Err gf_fs_set_max_resolution_chain_length(GF_FilterSession *session, u32 max_chain_length)
484 {
485 if (!session) return GF_BAD_PARAM;
486 session->max_resolve_chain_len = max_chain_length;
487 return GF_OK;
488 }
489
490 GF_EXPORT
gf_fs_set_max_sleep_time(GF_FilterSession * session,u32 max_sleep)491 GF_Err gf_fs_set_max_sleep_time(GF_FilterSession *session, u32 max_sleep)
492 {
493 if (!session) return GF_BAD_PARAM;
494 session->max_sleep = max_sleep;
495 return GF_OK;
496 }
497
498 GF_EXPORT
gf_fs_get_max_resolution_chain_length(GF_FilterSession * session)499 u32 gf_fs_get_max_resolution_chain_length(GF_FilterSession *session)
500 {
501 if (!session) return 0;
502 return session->max_resolve_chain_len;
503 }
504
gf_fs_remove_filter_register(GF_FilterSession * session,GF_FilterRegister * freg)505 void gf_fs_remove_filter_register(GF_FilterSession *session, GF_FilterRegister *freg)
506 {
507 gf_list_del_item(session->registry, freg);
508 gf_filter_sess_reset_graph(session, freg);
509 }
510
511 GF_EXPORT
gf_fs_set_ui_callback(GF_FilterSession * fs,Bool (* ui_event_proc)(void * opaque,GF_Event * event),void * cbk_udta)512 void gf_fs_set_ui_callback(GF_FilterSession *fs, Bool (*ui_event_proc)(void *opaque, GF_Event *event), void *cbk_udta)
513 {
514 if (fs) {
515 fs->ui_event_proc = ui_event_proc;
516 fs->ui_opaque = cbk_udta;
517 if (!fs->ui_event_proc) {
518 fs->ui_event_proc = fs_default_event_proc;
519 fs->ui_opaque = fs;
520 }
521 }
522 }
523
gf_propalloc_del(void * it)524 void gf_propalloc_del(void *it)
525 {
526 GF_PropertyEntry *pe = (GF_PropertyEntry *)it;
527 if (pe->prop.value.data.ptr) gf_free(pe->prop.value.data.ptr);
528 gf_free(pe);
529 }
530
531
532 GF_EXPORT
gf_fs_enum_unmapped_options(GF_FilterSession * fsess,u32 * idx,char ** argname,u32 * argtype)533 Bool gf_fs_enum_unmapped_options(GF_FilterSession *fsess, u32 *idx, char **argname, u32 *argtype)
534 {
535 if (!fsess || !fsess->parsed_args) return GF_FALSE;
536 u32 i, count = gf_list_count(fsess->parsed_args);
537
538 for (i=*idx; i<count; i++) {
539 GF_FSArgItem *ai = gf_list_get(fsess->parsed_args, i);
540 (*idx)++;
541 if (ai->found) continue;
542 if (argname) *argname = ai->argname;
543 if (argtype) *argtype = ai->type;
544 return GF_TRUE;
545 }
546 return GF_FALSE;
547 }
548
549
550 GF_EXPORT
gf_fs_del(GF_FilterSession * fsess)551 void gf_fs_del(GF_FilterSession *fsess)
552 {
553 assert(fsess);
554
555 gf_fs_stop(fsess);
556 GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session destroy begin\n"));
557
558 if (fsess->parsed_args) {
559 while (gf_list_count(fsess->parsed_args)) {
560 GF_FSArgItem *ai = gf_list_pop_back(fsess->parsed_args);
561 gf_free(ai->argname);
562 gf_free(ai);
563 }
564 gf_list_del(fsess->parsed_args);
565 }
566
567 //temporary until we don't introduce fsess_stop
568 assert(fsess->run_status != GF_OK);
569 if (fsess->filters) {
570 u32 i, count=gf_list_count(fsess->filters);
571 //first pass: disconnect all filters, since some may have references to property maps or packets
572 for (i=0; i<count; i++) {
573 u32 j;
574 GF_Filter *filter = gf_list_get(fsess->filters, i);
575 filter->process_th_id = 0;
576 filter->scheduled_for_next_task = GF_TRUE;
577
578 if (filter->detached_pid_inst) {
579 while (gf_list_count(filter->detached_pid_inst)) {
580 GF_FilterPidInst *pidi = gf_list_pop_front(filter->detached_pid_inst);
581 gf_filter_pid_inst_del(pidi);
582 }
583 gf_list_del(filter->detached_pid_inst);
584 filter->detached_pid_inst = NULL;
585 }
586
587 if (filter->postponed_packets) {
588 while (gf_list_count(filter->postponed_packets)) {
589 GF_FilterPacket *pck = gf_list_pop_front(filter->postponed_packets);
590 gf_filter_packet_destroy(pck);
591 }
592 gf_list_del(filter->postponed_packets);
593 filter->postponed_packets = NULL;
594 }
595 for (j=0; j<filter->num_input_pids; j++) {
596 GF_FilterPidInst *pidi = gf_list_get(filter->input_pids, j);
597 gf_filter_pid_inst_reset(pidi);
598 }
599 filter->scheduled_for_next_task = GF_FALSE;
600 }
601 //second pass, finalize all
602 for (i=0; i<count; i++) {
603 GF_Filter *filter = gf_list_get(fsess->filters, i);
604 if (filter->freg->finalize && !filter->finalized) {
605 filter->finalized = GF_TRUE;
606 FSESS_CHECK_THREAD(filter)
607 filter->freg->finalize(filter);
608 }
609 }
610
611 while (gf_list_count(fsess->filters)) {
612 GF_Filter *filter = gf_list_pop_back(fsess->filters);
613
614 gf_filter_del(filter);
615 }
616 gf_list_del(fsess->filters);
617 fsess->filters = NULL;
618 }
619
620 if (fsess->download_manager) gf_dm_del(fsess->download_manager);
621 if (fsess->font_manager) gf_font_manager_del(fsess->font_manager);
622
623 if (fsess->registry) {
624 while (gf_list_count(fsess->registry)) {
625 GF_FilterRegister *freg = gf_list_pop_back(fsess->registry);
626 if (freg->register_free) freg->register_free(fsess, freg);
627 }
628 gf_list_del(fsess->registry);
629 }
630
631 if (fsess->tasks)
632 gf_fq_del(fsess->tasks, gf_void_del);
633
634 if (fsess->tasks_reservoir)
635 gf_fq_del(fsess->tasks_reservoir, gf_void_del);
636
637 if (fsess->threads) {
638 if (fsess->main_thread_tasks)
639 gf_fq_del(fsess->main_thread_tasks, gf_void_del);
640
641 while (gf_list_count(fsess->threads)) {
642 GF_SessionThread *sess_th = gf_list_pop_back(fsess->threads);
643 gf_th_del(sess_th->th);
644 gf_free(sess_th);
645 }
646 gf_list_del(fsess->threads);
647 }
648
649 if (fsess->prop_maps_reservoir)
650 gf_fq_del(fsess->prop_maps_reservoir, gf_propmap_del);
651 #if GF_PROPS_HASHTABLE_SIZE
652 if (fsess->prop_maps_list_reservoir)
653 gf_fq_del(fsess->prop_maps_list_reservoir, (gf_destruct_fun) gf_list_del);
654 #endif
655 if (fsess->prop_maps_entry_reservoir)
656 gf_fq_del(fsess->prop_maps_entry_reservoir, gf_void_del);
657 if (fsess->prop_maps_entry_data_alloc_reservoir)
658 gf_fq_del(fsess->prop_maps_entry_data_alloc_reservoir, gf_propalloc_del);
659 if (fsess->pcks_refprops_reservoir)
660 gf_fq_del(fsess->pcks_refprops_reservoir, gf_void_del);
661
662
663 if (fsess->props_mx)
664 gf_mx_del(fsess->props_mx);
665
666 if (fsess->info_mx)
667 gf_mx_del(fsess->info_mx);
668
669 if (fsess->ui_mx)
670 gf_mx_del(fsess->ui_mx);
671
672 if (fsess->semaphore_other && (fsess->semaphore_other != fsess->semaphore_main) )
673 gf_sema_del(fsess->semaphore_other);
674
675 if (fsess->semaphore_main)
676 gf_sema_del(fsess->semaphore_main);
677
678 if (fsess->tasks_mx)
679 gf_mx_del(fsess->tasks_mx);
680
681 if (fsess->filters_mx)
682 gf_mx_del(fsess->filters_mx);
683
684 if (fsess->evt_mx) gf_mx_del(fsess->evt_mx);
685 if (fsess->event_listeners) gf_list_del(fsess->event_listeners);
686
687 if (fsess->links) {
688 gf_filter_sess_reset_graph(fsess, NULL);
689 gf_list_del(fsess->links);
690 }
691 if (fsess->links_mx) gf_mx_del(fsess->links_mx);
692
693 #ifndef GPAC_DISABLE_3D
694 gf_list_del(fsess->gl_providers);
695 if (fsess->gl_driver) {
696 fsess->gl_driver->Shutdown(fsess->gl_driver);
697 gf_modules_close_interface((GF_BaseInterface *)fsess->gl_driver);
698 }
699 #endif
700
701 if (fsess->auto_inc_nums) {
702 while(gf_list_count(fsess->auto_inc_nums)) {
703 GF_FSAutoIncNum *aint = gf_list_pop_back(fsess->auto_inc_nums);
704 gf_free(aint);
705 }
706 gf_list_del(fsess->auto_inc_nums);
707 }
708
709 gf_free(fsess);
710 GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session destroyed\n"));
711 }
712
713 GF_EXPORT
gf_fs_filters_registers_count(GF_FilterSession * fsess)714 u32 gf_fs_filters_registers_count(GF_FilterSession *fsess)
715 {
716 return fsess ? gf_list_count(fsess->registry) : 0;
717 }
718
719 GF_EXPORT
gf_fs_get_filter_register(GF_FilterSession * fsess,u32 idx)720 const GF_FilterRegister * gf_fs_get_filter_register(GF_FilterSession *fsess, u32 idx)
721 {
722 return gf_list_get(fsess->registry, idx);
723 }
724
725 #ifdef CHECK_TASK_LIST_INTEGRITY
check_task_list_enum(void * udta,void * item)726 static Bool check_task_list_enum(void *udta, void *item)
727 {
728 assert(udta != item);
729 return GF_FALSE;
730 }
check_task_list(GF_FilterQueue * fq,GF_FSTask * task)731 static void check_task_list(GF_FilterQueue *fq, GF_FSTask *task)
732 {
733 if (fq) {
734 gf_fq_enum(fq, check_task_list_enum, task);
735 }
736 }
737 #endif
738
739
gf_fs_post_task_ex(GF_FilterSession * fsess,gf_fs_task_callback task_fun,GF_Filter * filter,GF_FilterPid * pid,const char * log_name,void * udta,Bool is_configure,Bool force_direct_call)740 void gf_fs_post_task_ex(GF_FilterSession *fsess, gf_fs_task_callback task_fun, GF_Filter *filter, GF_FilterPid *pid, const char *log_name, void *udta, Bool is_configure, Bool force_direct_call)
741 {
742 GF_FSTask *task;
743 Bool force_main_thread = GF_FALSE;
744 Bool notified = GF_FALSE;
745
746 assert(fsess);
747 assert(task_fun);
748
749 //only flatten calls if in main thread (we still have some broken filters using threading
750 //that could trigger tasks
751 if ((force_direct_call || fsess->direct_mode)
752 && (!filter || !filter->in_process)
753 && fsess->tasks_in_process
754 && (gf_th_id()==fsess->main_th.th_id)
755 ) {
756 GF_FSTask atask;
757 u64 task_time = gf_sys_clock_high_res();
758 memset(&atask, 0, sizeof(GF_FSTask));
759 atask.filter = filter;
760 atask.pid = pid;
761 atask.run_task = task_fun;
762 atask.log_name = log_name;
763 atask.udta = udta;
764 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread 0 task#%d %p executing Filter %s::%s (%d tasks pending)\n", fsess->main_th.nb_tasks, &atask, filter ? filter->name : "none", log_name, fsess->tasks_pending));
765 if (filter)
766 filter->scheduled_for_next_task = GF_TRUE;
767 task_fun(&atask);
768 filter = atask.filter;
769 if (filter) {
770 filter->time_process += gf_sys_clock_high_res() - task_time;
771 filter->scheduled_for_next_task = GF_FALSE;
772 filter->nb_tasks_done++;
773 }
774 if (!atask.requeue_request)
775 return;
776 //asked to requeue the task, post it
777 }
778 task = gf_fq_pop(fsess->tasks_reservoir);
779
780 if (!task) {
781 GF_SAFEALLOC(task, GF_FSTask);
782 if (!task) {
783 GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("No more memory to post new task\n"));
784 return;
785 }
786 }
787 task->filter = filter;
788 task->pid = pid;
789 task->run_task = task_fun;
790 task->log_name = log_name;
791 task->udta = udta;
792
793 if (filter && is_configure) {
794 if (filter->freg->flags & GF_FS_REG_CONFIGURE_MAIN_THREAD)
795 force_main_thread = GF_TRUE;
796 }
797
798 if (filter) {
799 gf_mx_p(filter->tasks_mx);
800 //no tasks and not scheduled
801 if (! filter->scheduled_for_next_task && !gf_fq_count(filter->tasks)) {
802 notified = task->notified = GF_TRUE;
803
804 if (!force_main_thread)
805 force_main_thread = (filter->main_thread_forced || (filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? GF_TRUE : GF_FALSE;
806 } else if (force_main_thread) {
807 force_main_thread = GF_FALSE;
808 if (filter->process_th_id && (fsess->main_th.th_id != filter->process_th_id)) {
809 GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Cannot post task to main thread, filter is already scheduled\n"));
810 }
811 }
812 if (!force_main_thread)
813 task->blocking = (filter->freg->flags & GF_FS_REG_BLOCKING) ? GF_TRUE : GF_FALSE;
814
815 gf_fq_add(filter->tasks, task);
816 gf_mx_v(filter->tasks_mx);
817
818 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Posted task %p Filter %s::%s (%d (%d) pending, %d process tasks) on %s task list\n", gf_th_id(), task, filter->name, task->log_name, fsess->tasks_pending, gf_fq_count(filter->tasks), filter->process_task_queued, task->notified ? (force_main_thread ? "main" : "secondary") : "filter"));
819 } else {
820 task->notified = notified = GF_TRUE;
821 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Posted filter-less task %s (%d pending) on secondary task list\n", gf_th_id(), task->log_name, fsess->tasks_pending));
822 }
823
824 //WARNING, do not use task->notified since the task may have been posted to the filter task list and may already have been swapped
825 //with a different value !
826 if (notified) {
827 #ifdef CHECK_TASK_LIST_INTEGRITY
828 check_task_list(fsess->main_thread_tasks, task);
829 check_task_list(fsess->tasks, task);
830 check_task_list(fsess->tasks_reservoir, task);
831 #endif
832 assert(task->run_task);
833 if (filter) {
834 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u posting filter task, scheduled_for_next_task %d\n", gf_th_id(), filter->scheduled_for_next_task));
835 assert(!filter->scheduled_for_next_task);
836 }
837
838 //notify/count tasks posted on the main task or regular task lists
839 safe_int_inc(&fsess->tasks_pending);
840 if (filter && force_main_thread) {
841 gf_fq_add(fsess->main_thread_tasks, task);
842 gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
843 } else {
844 assert(task->run_task);
845 gf_fq_add(fsess->tasks, task);
846 gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
847 }
848 }
849 }
850
gf_fs_post_task(GF_FilterSession * fsess,gf_fs_task_callback task_fun,GF_Filter * filter,GF_FilterPid * pid,const char * log_name,void * udta)851 void gf_fs_post_task(GF_FilterSession *fsess, gf_fs_task_callback task_fun, GF_Filter *filter, GF_FilterPid *pid, const char *log_name, void *udta)
852 {
853 gf_fs_post_task_ex(fsess, task_fun, filter, pid, log_name, udta, GF_FALSE, GF_FALSE);
854 }
855
856 GF_EXPORT
gf_fs_check_filter_register_cap(const GF_FilterRegister * f_reg,u32 incode,GF_PropertyValue * cap_input,u32 outcode,GF_PropertyValue * cap_output,Bool exact_match_only)857 Bool gf_fs_check_filter_register_cap(const GF_FilterRegister *f_reg, u32 incode, GF_PropertyValue *cap_input, u32 outcode, GF_PropertyValue *cap_output, Bool exact_match_only)
858 {
859 u32 j;
860 u32 has_raw_in = 0;
861 u32 has_cid_match = 0;
862 u32 exclude_cid_out = 0;
863 u32 has_exclude_cid_out = 0;
864 for (j=0; j<f_reg->nb_caps; j++) {
865 const GF_FilterCapability *cap = &f_reg->caps[j];
866 if (!(cap->flags & GF_CAPFLAG_IN_BUNDLE)) {
867 //CID not excluded, raw in present and CID explicit match or not included in excluded set
868 if (!exclude_cid_out && has_raw_in && (has_cid_match || (!exact_match_only && has_exclude_cid_out) ) ) {
869 return GF_TRUE;
870 }
871
872 if (has_raw_in != 2) has_raw_in = 0;
873 if (has_cid_match != 2) has_cid_match = 0;
874 if (exclude_cid_out != 2) exclude_cid_out = 0;
875 if (has_exclude_cid_out != 2) has_exclude_cid_out = 0;
876
877 continue;
878 }
879
880 if ( (cap->flags & GF_CAPFLAG_INPUT) && (cap->code == incode) ) {
881 if (! (cap->flags & GF_CAPFLAG_EXCLUDED) && gf_props_equal(&cap->val, cap_input) ) {
882 has_raw_in = (cap->flags & GF_CAPS_INPUT_STATIC) ? 2 : 1;
883 }
884 }
885 if ( (cap->flags & GF_CAPFLAG_OUTPUT) && (cap->code == outcode) ) {
886 if (! (cap->flags & GF_CAPFLAG_EXCLUDED)) {
887 if (gf_props_equal(&cap->val, cap_output) ) {
888 has_cid_match = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
889 }
890 } else {
891 if (gf_props_equal(&cap->val, cap_output) ) {
892 exclude_cid_out = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
893 } else {
894 has_exclude_cid_out = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
895 }
896 }
897 }
898 }
899 //CID not excluded, raw in present and CID explicit match or not included in excluded set
900 if (!exclude_cid_out && has_raw_in && (has_cid_match || (!exact_match_only && has_exclude_cid_out) ) ) {
901 return GF_TRUE;
902 }
903 return GF_FALSE;
904 }
gf_fs_load_encoder(GF_FilterSession * fsess,const char * args)905 static GF_Filter *gf_fs_load_encoder(GF_FilterSession *fsess, const char *args)
906 {
907 GF_Err e;
908 char szCodec[3];
909 char *cid, *sep;
910 const GF_FilterRegister *candidate = NULL;
911 u32 codecid=0;
912 GF_Filter *filter;
913 u32 i, count;
914 GF_PropertyValue cap_in, cap_out;
915 szCodec[0] = 'c';
916 szCodec[1] = fsess->sep_name;
917 szCodec[2] = 0;
918
919 cid = args ? strstr(args, szCodec) : NULL;
920 if (!cid) {
921 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Missing codec identifier in \"enc\" definition: %s\n", args ? args : "no arguments"));
922 return NULL;
923 }
924 sep = strchr(cid, fsess->sep_args);
925 if (sep) sep[0] = 0;
926
927 codecid = gf_codec_parse(cid+2);
928 if (codecid==GF_CODECID_NONE) {
929 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Unrecognized codec identifier in \"enc\" definition: %s\n", cid));
930 if (sep) sep[0] = fsess->sep_args;
931 return NULL;
932 }
933 if (sep) sep[0] = fsess->sep_args;
934
935 cap_in.type = GF_PROP_UINT;
936 cap_in.value.uint = GF_CODECID_RAW;
937 cap_out.type = GF_PROP_UINT;
938 cap_out.value.uint = codecid;
939
940 count = gf_list_count(fsess->registry);
941 for (i=0; i<count; i++) {
942 const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
943
944 if ( gf_fs_check_filter_register_cap(f_reg, GF_PROP_PID_CODECID, &cap_in, GF_PROP_PID_CODECID, &cap_out, GF_FALSE)) {
945 if (!candidate || (candidate->priority>f_reg->priority))
946 candidate = f_reg;
947 }
948 }
949 if (!candidate) {
950 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Cannot find any filter providing encoding for %s\n", cid));
951 return NULL;
952 }
953 filter = gf_filter_new(fsess, candidate, args, NULL, GF_FILTER_ARG_EXPLICIT, &e, NULL, GF_FALSE);
954 if (!filter) {
955 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load filter %s: %s\n", candidate->name, gf_error_to_string(e) ));
956 } else {
957 filter->encoder_stream_type = gf_codecid_type(codecid);
958 }
959 return filter;
960 }
961
962 GF_EXPORT
gf_fs_filter_exists(GF_FilterSession * fsess,const char * name)963 Bool gf_fs_filter_exists(GF_FilterSession *fsess, const char *name)
964 {
965 u32 i, count;
966
967 if (!strcmp(name, "enc")) return GF_TRUE;
968
969 count = gf_list_count(fsess->registry);
970 for (i=0;i<count;i++) {
971 const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
972 if (!strcmp(f_reg->name, name)) {
973 return GF_TRUE;
974 }
975 }
976 return GF_FALSE;
977 }
978
979 GF_EXPORT
gf_fs_load_filter(GF_FilterSession * fsess,const char * name,GF_Err * err_code)980 GF_Filter *gf_fs_load_filter(GF_FilterSession *fsess, const char *name, GF_Err *err_code)
981 {
982 const char *args=NULL;
983 u32 i, len, count = gf_list_count(fsess->registry);
984 Bool quiet = (err_code && (*err_code == GF_EOS)) ? GF_TRUE : GF_FALSE;
985
986 char *sep;
987
988 assert(fsess);
989 assert(name);
990 if (err_code) *err_code = GF_OK;
991
992 sep = strchr(name, fsess->sep_args);
993 if (sep) {
994 args = sep+1;
995 len = (u32) (sep - name);
996 } else len = (u32) strlen(name);
997
998 if (!len) {
999 if (!quiet) {
1000 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Missing filter name in %s\n", name));
1001 }
1002 return NULL;
1003 }
1004
1005 if (!strncmp(name, "enc", len)) {
1006 return gf_fs_load_encoder(fsess, args);
1007 }
1008 /*regular filter loading*/
1009 for (i=0;i<count;i++) {
1010 const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
1011 if ((strlen(f_reg->name)==len) && !strncmp(f_reg->name, name, len)) {
1012 GF_Filter *filter;
1013 GF_FilterArgType argtype = GF_FILTER_ARG_EXPLICIT;
1014
1015 if ((f_reg->flags & GF_FS_REG_REQUIRES_RESOLVER) && !fsess->max_resolve_chain_len) {
1016 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter %s requires graph resolver but it is disabled\n", name));
1017 if (err_code) *err_code = GF_BAD_PARAM;
1018 return NULL;
1019 }
1020
1021 if (f_reg->flags & GF_FS_REG_ACT_AS_SOURCE) argtype = GF_FILTER_ARG_EXPLICIT_SOURCE;
1022 filter = gf_filter_new(fsess, f_reg, args, NULL, argtype, err_code, NULL, GF_FALSE);
1023 if (!filter) return NULL;
1024 if (!filter->num_output_pids) {
1025 const char *src_url = strstr(name, "src");
1026 if (src_url && (src_url[3]==fsess->sep_name))
1027 gf_filter_post_process_task(filter);
1028 }
1029 return filter;
1030 }
1031 }
1032 /*check JS file*/
1033 if (strstr(name, ".js") || strstr(name, ".jsf") || strstr(name, ".mjs") ) {
1034 char szPath[10+GF_MAX_PATH];
1035 if (len>GF_MAX_PATH)
1036 return NULL;
1037 strncpy(szPath, name, len);
1038 szPath[len]=0;
1039 if (gf_file_exists(szPath)) {
1040 sprintf(szPath, "jsf%cjs%c", fsess->sep_args, fsess->sep_name);
1041 strcat(szPath, name);
1042 return gf_fs_load_filter(fsess, szPath, err_code);
1043 }
1044 }
1045
1046 if (!quiet) {
1047 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load filter %s: no such filter registry\n", name));
1048 }
1049 if (err_code) *err_code = GF_FILTER_NOT_FOUND;
1050 return NULL;
1051 }
1052
1053 //in mono thread mode, we cannot always sleep for the requested timeout in case there are more tasks to be processed
1054 //this defines the number of pending tasks above wich we limit sleep
1055 #define MONOTH_MIN_TASKS 2
1056 //this defines the sleep time for this case
1057 #define MONOTH_MIN_SLEEP 5
1058
gf_fs_thread_proc(GF_SessionThread * sess_thread)1059 static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
1060 {
1061 GF_FilterSession *fsess = sess_thread->fsess;
1062 u32 i, th_count = fsess->threads ? gf_list_count(fsess->threads) : 0;
1063 u32 thid = 1 + gf_list_find(fsess->threads, sess_thread);
1064 u64 enter_time = gf_sys_clock_high_res();
1065 Bool use_main_sema = thid ? GF_FALSE : GF_TRUE;
1066 #ifndef GPAC_DISABLE_LOG
1067 u32 sys_thid = gf_th_id();
1068 #endif
1069 u64 next_task_schedule_time = 0;
1070 Bool do_regulate = (fsess->flags & GF_FS_FLAG_NO_REGULATION) ? GF_FALSE : GF_TRUE;
1071 u32 consecutive_filter_tasks=0;
1072 Bool force_secondary_tasks = GF_FALSE;
1073 Bool skip_next_sema_wait = GF_FALSE;
1074
1075 GF_Filter *current_filter = NULL;
1076 sess_thread->th_id = gf_th_id();
1077
1078 #ifndef GPAC_DISABLE_REMOTERY
1079 sess_thread->rmt_tasks=40;
1080 gf_rmt_set_thread_name(sess_thread->rmt_name);
1081 #endif
1082
1083 gf_rmt_begin(fs_thread, 0);
1084
1085 safe_int_inc(&fsess->active_threads);
1086
1087 if (!thid && fsess->no_main_thread) {
1088 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc enter\n"));
1089 }
1090
1091 while (1) {
1092 Bool notified;
1093 Bool requeue = GF_FALSE;
1094 u64 active_start, task_time;
1095 GF_FSTask *task=NULL;
1096 #ifdef CHECK_TASK_LIST_INTEGRITY
1097 GF_Filter *prev_current_filter = NULL;
1098 Bool skip_filter_task_check = GF_FALSE;
1099 #endif
1100
1101 #ifndef GPAC_DISABLE_REMOTERY
1102 sess_thread->rmt_tasks--;
1103 if (!sess_thread->rmt_tasks) {
1104 gf_rmt_end();
1105 gf_rmt_begin(fs_thread, 0);
1106 sess_thread->rmt_tasks=40;
1107 }
1108 #endif
1109
1110 safe_int_dec(&fsess->active_threads);
1111
1112 if (!skip_next_sema_wait && (current_filter==NULL)) {
1113 gf_rmt_begin(sema_wait, GF_RMT_AGGREGATE);
1114 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Waiting scheduler %s semaphore\n", sys_thid, use_main_sema ? "main" : "secondary"));
1115 //wait for something to be done
1116 gf_fs_sema_io(fsess, GF_FALSE, use_main_sema);
1117 consecutive_filter_tasks = 0;
1118 gf_rmt_end();
1119 }
1120 safe_int_inc(&fsess->active_threads);
1121 skip_next_sema_wait = GF_FALSE;
1122
1123 active_start = gf_sys_clock_high_res();
1124
1125 if (current_filter==NULL) {
1126 //main thread
1127 if (thid==0) {
1128 if (!force_secondary_tasks) {
1129 task = gf_fq_pop(fsess->main_thread_tasks);
1130 }
1131 if (!task) {
1132 task = gf_fq_pop(fsess->tasks);
1133 if (task && task->blocking) {
1134 gf_fq_add(fsess->tasks, task);
1135 task = NULL;
1136 gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1137 }
1138 }
1139 force_secondary_tasks = GF_FALSE;
1140 } else {
1141 task = gf_fq_pop(fsess->tasks);
1142 }
1143 if (task) {
1144 assert( task->run_task );
1145 assert( task->notified );
1146 }
1147 } else {
1148 //keep task in filter tasks list until done
1149 task = gf_fq_head(current_filter->tasks);
1150 if (task) {
1151 assert( task->run_task );
1152 assert( ! task->notified );
1153 }
1154 }
1155
1156 if (!task) {
1157 u32 force_nb_notif = 0;
1158 next_task_schedule_time = 0;
1159 //no more task and EOS signal
1160 if (fsess->run_status != GF_OK)
1161 break;
1162
1163 if (!fsess->tasks_pending && fsess->main_th.has_seen_eot) {
1164 //check all threads
1165 Bool all_done = GF_TRUE;
1166
1167 for (i=0; i<th_count; i++) {
1168 GF_SessionThread *st = gf_list_get(fsess->threads, i);
1169 if (!st->has_seen_eot) {
1170 all_done = GF_FALSE;
1171 force_nb_notif++;
1172 }
1173 }
1174 if (all_done)
1175 break;
1176 }
1177 if (current_filter) {
1178 current_filter->scheduled_for_next_task = GF_FALSE;
1179 current_filter->process_th_id = 0;
1180 assert(current_filter->in_process);
1181 current_filter->in_process = GF_FALSE;
1182 }
1183 current_filter = NULL;
1184 sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1185
1186
1187 //no pending tasks and first time main task queue is empty, flush to detect if we
1188 //are indeed done
1189 if (!fsess->tasks_pending && !fsess->tasks_in_process && !sess_thread->has_seen_eot && !gf_fq_count(fsess->tasks)) {
1190 //maybe last task, force a notify to check if we are truly done
1191 sess_thread->has_seen_eot = GF_TRUE;
1192 //not main thread and some tasks pending on main, notify only ourselves
1193 if (thid && gf_fq_count(fsess->main_thread_tasks)) {
1194 gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1195 }
1196 //main thread exit probing, send a notify to main sema (for this thread), and N for the secondary one
1197 else {
1198 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler main semaphore\n", gf_th_id()));
1199 gf_sema_notify(fsess->semaphore_main, 1);
1200 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler secondary semaphore %d\n", gf_th_id(), th_count));
1201 gf_sema_notify(fsess->semaphore_other, th_count);
1202 }
1203 }
1204 //this thread and the main thread are done but we still have unfinished threads, re-notify everyone
1205 else if (!fsess->tasks_pending && fsess->main_th.has_seen_eot && force_nb_notif) {
1206 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler main semaphore\n", gf_th_id()));
1207 gf_sema_notify(fsess->semaphore_main, 1);
1208 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler secondary semaphore %d\n", gf_th_id(), th_count));
1209 gf_sema_notify(fsess->semaphore_other, th_count);
1210 }
1211
1212 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: no task available\n", sys_thid));
1213
1214 //no main thread, return
1215 if (!thid && fsess->no_main_thread) {
1216 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1217 return 0;
1218 }
1219
1220 if (do_regulate) {
1221 gf_sleep(0);
1222 }
1223 continue;
1224 }
1225 #ifdef CHECK_TASK_LIST_INTEGRITY
1226 check_task_list(fsess->main_thread_tasks, task);
1227 check_task_list(fsess->tasks, task);
1228 #endif
1229 if (current_filter) {
1230 assert(current_filter==task->filter);
1231 }
1232 current_filter = task->filter;
1233
1234 //this is a crude way of scheduling the next task, we should
1235 //1- have a way to make sure we will not repost after a time-consuming task
1236 //2- have a way to wait for the given amount of time rather than just do a sema_wait/notify in loop
1237 if (task->schedule_next_time) {
1238 s64 now = gf_sys_clock_high_res();
1239 s64 diff = task->schedule_next_time;
1240 diff -= now;
1241 diff /= 1000;
1242
1243
1244 if (diff > 0) {
1245 GF_FSTask *next;
1246 s64 tdiff = diff;
1247 s64 ndiff = 0;
1248
1249 //no filter, just reschedule the task
1250 if (!current_filter) {
1251 #ifndef GPAC_DISABLE_LOG
1252 const char *task_log_name = task->log_name;
1253 #endif
1254 next = gf_fq_head(fsess->tasks);
1255 next_task_schedule_time = task->schedule_next_time;
1256 assert(task->run_task);
1257 #ifdef CHECK_TASK_LIST_INTEGRITY
1258 check_task_list(fsess->main_thread_tasks, task);
1259 check_task_list(fsess->tasks, task);
1260 check_task_list(fsess->tasks_reservoir, task);
1261 #endif
1262 //tasks without filter are currently only posted to the secondary task list
1263 gf_fq_add(fsess->tasks, task);
1264 if (next) {
1265 if (next->schedule_next_time <= (u64) now) {
1266 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task time ready for execution\n", sys_thid, task_log_name));
1267
1268 skip_next_sema_wait = GF_TRUE;
1269 continue;
1270 }
1271 ndiff = next->schedule_next_time;
1272 ndiff -= now;
1273 ndiff /= 1000;
1274 if (ndiff<diff) {
1275 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s scheduled after next task %s:%s (in %d ms vs %d ms)\n", sys_thid, task_log_name, next->log_name, next->filter ? next->filter->name : "", (s32) diff, (s32) ndiff));
1276 diff = ndiff;
1277 }
1278 }
1279
1280 if (!do_regulate) {
1281 diff = 0;
1282 }
1283
1284 if (diff && do_regulate) {
1285 if (diff > fsess->max_sleep)
1286 diff = fsess->max_sleep;
1287 if (th_count==0) {
1288 if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
1289 diff = MONOTH_MIN_SLEEP;
1290 }
1291 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, %s task scheduled after this task, sleeping for %d ms (task diff %d - next diff %d)\n", sys_thid, task_log_name, next ? "next" : "no", diff, tdiff, ndiff));
1292 gf_sleep((u32) diff);
1293 } else {
1294 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task scheduled after this task, rerun\n", sys_thid, task_log_name));
1295 }
1296 skip_next_sema_wait = GF_TRUE;
1297 continue;
1298 }
1299
1300 if (!task->filter->finalized) {
1301 #ifdef CHECK_TASK_LIST_INTEGRITY
1302 next = gf_fq_head(current_filter->tasks);
1303 assert(next == task);
1304 check_task_list(fsess->main_thread_tasks, task);
1305 check_task_list(fsess->tasks_reservoir, task);
1306 #endif
1307
1308 //next in filter should be handled before this task, move task at the end of the filter task
1309 next = gf_fq_get(current_filter->tasks, 1);
1310 if (next && next->schedule_next_time < task->schedule_next_time) {
1311 if (task->notified) {
1312 assert(fsess->tasks_pending);
1313 safe_int_dec(&fsess->tasks_pending);
1314 task->notified = GF_FALSE;
1315 }
1316 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s reposted to filter task until task exec time is reached (%d us)\n", sys_thid, current_filter->name, task->log_name, (s32) (task->schedule_next_time - next->schedule_next_time) ));
1317 //remove task
1318 gf_fq_pop(current_filter->tasks);
1319 //and queue it after the next one
1320 gf_fq_add(current_filter->tasks, task);
1321 //and continue with the same filter
1322 continue;
1323 }
1324 //little optim here: if this is the main thread and we have other tasks pending
1325 //check the timing of tasks in the secondary list. If a task is present with smaller time than
1326 //the head of the main task, force a temporary swap to the secondary task list
1327 if (!thid && task->notified && diff>10) {
1328 next = gf_fq_head(fsess->tasks);
1329 if (next && !next->blocking) {
1330 u64 next_time_main = task->schedule_next_time;
1331 u64 next_time_secondary = next->schedule_next_time;
1332 //if we have several threads, also check the next task on the main task list
1333 // (different from secondary tasks in multithread case)
1334 if (th_count) {
1335 GF_FSTask *next_main = gf_fq_head(fsess->main_thread_tasks);
1336 if (next_main && (next_time_main > next_main->schedule_next_time))
1337 next_time_main = next_main->schedule_next_time;
1338 }
1339
1340 if (next_time_secondary<next_time_main) {
1341 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: forcing secondary task list on main - current task schedule time "LLU" (diff to now %d) vs next time secondary "LLU" (%s::%s)\n", sys_thid, task->schedule_next_time, (s32) diff, next_time_secondary, next->filter->freg->name, next->log_name));
1342 diff = 0;
1343 force_secondary_tasks = GF_TRUE;
1344 }
1345 }
1346 }
1347
1348 //move task to main list
1349 if (!task->notified) {
1350 task->notified = GF_TRUE;
1351 safe_int_inc(&fsess->tasks_pending);
1352 }
1353
1354 sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1355
1356 if (next_task_schedule_time && (next_task_schedule_time <= task->schedule_next_time)) {
1357 tdiff = next_task_schedule_time;
1358 tdiff -= now;
1359 if (tdiff < 0) tdiff=0;
1360 if (tdiff<diff) {
1361 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
1362 diff = tdiff;
1363 } else {
1364 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time#2 than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
1365
1366 }
1367 }
1368
1369 if (do_regulate && diff) {
1370 if (diff > fsess->max_sleep)
1371 diff = fsess->max_sleep;
1372 if (th_count==0) {
1373 if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
1374 diff = MONOTH_MIN_SLEEP;
1375 }
1376 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s postponed for %d ms (scheduled time "LLU" us, next task schedule "LLU" us)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, task->schedule_next_time, next_task_schedule_time));
1377
1378 gf_sleep((u32) diff);
1379 active_start = gf_sys_clock_high_res();
1380 }
1381 diff = (s64)task->schedule_next_time;
1382 diff -= (s64) gf_sys_clock_high_res();
1383 if (diff > 100 ) {
1384 Bool use_main = (current_filter->freg->flags & GF_FS_REG_MAIN_THREAD) ? GF_TRUE : GF_FALSE;
1385 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: releasing current filter %s, exec time due in "LLD" us\n", sys_thid, current_filter->name, diff));
1386 current_filter->process_th_id = 0;
1387 current_filter->in_process = GF_FALSE;
1388 //don't touch the current filter tasks, just repost the task to the main/secondary list
1389 assert(gf_fq_count(current_filter->tasks));
1390 current_filter = NULL;
1391
1392 #ifdef CHECK_TASK_LIST_INTEGRITY
1393 check_task_list(fsess->main_thread_tasks, task);
1394 check_task_list(fsess->tasks, task);
1395 check_task_list(fsess->tasks_reservoir, task);
1396 assert(task->run_task);
1397 #endif
1398
1399 if (use_main) {
1400 gf_fq_add(fsess->main_thread_tasks, task);
1401 //we are the main thread and reposting to the main task list, don't notify/wait for the sema, just retry
1402 //we are sure to get a task from main list at next iteration
1403 if (use_main_sema) {
1404 skip_next_sema_wait = GF_TRUE;
1405 } else {
1406 gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
1407 }
1408 } else {
1409 gf_fq_add(fsess->tasks, task);
1410 //we are not the main thread and we are reposting to the secondary task list, don't notify/wait for the sema, just retry
1411 //we are not sure to get a task from secondary list at next iteration, but the end of thread check will make
1412 //sure we renotify secondary sema if some tasks are still pending
1413 if (!use_main_sema) {
1414 skip_next_sema_wait = GF_TRUE;
1415 } else {
1416 gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1417 }
1418 }
1419 //we temporary force the main thread to fetch a task from the secondary list
1420 //because the first main task was not yet due for execution
1421 //it is likely that the execution of the next task will not wake up the main thread
1422 //but we must reevaluate the previous main task timing, so we force a notification of the main sema
1423 if (force_secondary_tasks)
1424 gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
1425
1426 continue;
1427 }
1428 force_secondary_tasks=GF_FALSE;
1429 }
1430 }
1431 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s schedule time "LLU" us reached (diff %d ms)\n", sys_thid, current_filter ? current_filter->name : "", task->log_name, task->schedule_next_time, (s32) diff));
1432
1433 }
1434 next_task_schedule_time = 0;
1435
1436 if (current_filter) {
1437 current_filter->scheduled_for_next_task = GF_TRUE;
1438 assert(!current_filter->in_process);
1439 current_filter->in_process = GF_TRUE;
1440 current_filter->process_th_id = gf_th_id();
1441 }
1442
1443 sess_thread->nb_tasks++;
1444 sess_thread->has_seen_eot = GF_FALSE;
1445 if (task->filter) {
1446 assert(gf_fq_count(task->filter->tasks));
1447 }
1448
1449 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p executing Filter %s::%s (%d tasks pending, %d(%d) process task queued)\n", sys_thid, sess_thread->nb_tasks, task, task->filter ? task->filter->name : "none", task->log_name, fsess->tasks_pending, task->filter ? task->filter->process_task_queued : 0, task->filter ? gf_fq_count(task->filter->tasks) : 0));
1450
1451 safe_int_inc(& fsess->tasks_in_process );
1452 assert( task->run_task );
1453 task_time = gf_sys_clock_high_res();
1454
1455 task->can_swap = GF_FALSE;
1456 task->requeue_request = GF_FALSE;
1457 task->run_task(task);
1458 requeue = task->requeue_request;
1459
1460 task_time = gf_sys_clock_high_res() - task_time;
1461 safe_int_dec(& fsess->tasks_in_process );
1462
1463 //may now be NULL if task was a filter destruction task
1464 current_filter = task->filter;
1465
1466 #ifdef CHECK_TASK_LIST_INTEGRITY
1467 prev_current_filter = task->filter;
1468 #endif
1469
1470 //source task was current filter, pop the filter task list
1471 if (current_filter) {
1472 current_filter->nb_tasks_done++;
1473 current_filter->time_process += task_time;
1474 consecutive_filter_tasks++;
1475
1476 gf_mx_p(current_filter->tasks_mx);
1477 //if last task
1478 if ( (gf_fq_count(current_filter->tasks)==1)
1479 //if requeue request and stream reset pending (we must exit the filter task loop for the reset task to pe processed)
1480 || (requeue && current_filter->stream_reset_pending)
1481 //or requeue request and pid swap pending (we must exit the filter task loop for the swap task to pe processed)
1482 || (requeue && (current_filter->swap_pidinst_src || current_filter->swap_pidinst_dst) )
1483 //or requeue request and pid detach / cap negotiate pending
1484 || (requeue && (current_filter->out_pid_connection_pending || current_filter->detached_pid_inst || current_filter->caps_negociate) )
1485
1486 //or requeue request and we have been running on that filter for more than 10 times, abort
1487 || (requeue && (consecutive_filter_tasks>10))
1488 ) {
1489
1490 if (requeue) {
1491 //filter task can be pushed back the queue of tasks
1492 if (task->can_swap) {
1493 GF_FSTask *next_task;
1494
1495 //drop task from filter task list
1496 gf_fq_pop(current_filter->tasks);
1497
1498 next_task = gf_fq_head(current_filter->tasks);
1499 //if first task was notified, swap the flag
1500 if (next_task) {
1501 //see note in post_task_ex for caution about this !!
1502 next_task->notified = task->notified;
1503 task->notified = GF_FALSE;
1504 }
1505 //requeue task
1506 gf_fq_add(current_filter->tasks, task);
1507
1508 //ans swap task for later requeing
1509 if (next_task) task = next_task;
1510 }
1511 //otherwise (can't swap) keep task first in the list
1512
1513 //don't reset scheduled_for_next_task flag if requeued to make sure no other task posted from
1514 //another thread will post to main sched
1515
1516 #ifdef CHECK_TASK_LIST_INTEGRITY
1517 skip_filter_task_check = GF_TRUE;
1518 #endif
1519 } else {
1520 //no requeue, filter no longer scheduled and drop task
1521 current_filter->scheduled_for_next_task = GF_FALSE;
1522
1523 //drop task from filter task list
1524 gf_fq_pop(current_filter->tasks);
1525 }
1526
1527 current_filter->process_th_id = 0;
1528 current_filter->in_process = GF_FALSE;
1529
1530 //unlock once we modified in_process, otherwise this will make our assert fail
1531 gf_mx_v(current_filter->tasks_mx);
1532 #ifdef CHECK_TASK_LIST_INTEGRITY
1533 if (requeue && !skip_filter_task_check) check_task_list(current_filter->tasks, task);
1534 #endif
1535 current_filter = NULL;
1536 } else {
1537 //drop task from filter task list
1538 gf_fq_pop(current_filter->tasks);
1539
1540 //not requeued, no more tasks, deactivate filter
1541 if (!requeue && !gf_fq_count(current_filter->tasks)) {
1542 current_filter->process_th_id = 0;
1543 current_filter->in_process = GF_FALSE;
1544 current_filter->scheduled_for_next_task = GF_FALSE;
1545 gf_mx_v(current_filter->tasks_mx);
1546 current_filter = NULL;
1547 } else {
1548 #ifdef CHECK_TASK_LIST_INTEGRITY
1549 check_task_list(fsess->main_thread_tasks, task);
1550 check_task_list(fsess->tasks, task);
1551 check_task_list(fsess->tasks_reservoir, task);
1552 #endif
1553
1554 //requeue task in current filter
1555 if (requeue)
1556 gf_fq_add(current_filter->tasks, task);
1557
1558 gf_mx_v(current_filter->tasks_mx);
1559 }
1560 }
1561 }
1562 //do not touch the filter task list after this, it has to be mutex protected to ensure proper posting of tasks
1563
1564 notified = task->notified;
1565 if (requeue) {
1566 //if requeue on a filter active, use filter queue to avoid another thread grabing the task (we would have concurrent access to the filter)
1567 if (current_filter) {
1568 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in filter tasks (%d pending)\n", sys_thid, task->filter->name, task->log_name, fsess->tasks_pending));
1569 task->notified = GF_FALSE;
1570 //keep this thread running on the current filter no signaling of semaphore
1571 } else {
1572 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in %s tasks (%d pending)\n", sys_thid, task->filter ? task->filter->name : "none", task->log_name, (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? "main" : "secondary", fsess->tasks_pending));
1573
1574 task->notified = GF_TRUE;
1575 safe_int_inc(&fsess->tasks_pending);
1576
1577 #ifdef CHECK_TASK_LIST_INTEGRITY
1578 check_task_list(fsess->main_thread_tasks, task);
1579 check_task_list(fsess->tasks, task);
1580 check_task_list(fsess->tasks_reservoir, task);
1581 if (prev_current_filter && !skip_filter_task_check) check_task_list(prev_current_filter->tasks, task);
1582 #endif
1583
1584 //main thread
1585 if (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) {
1586 gf_fq_add(fsess->main_thread_tasks, task);
1587 } else {
1588 gf_fq_add(fsess->tasks, task);
1589 }
1590 gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1591 }
1592 } else {
1593 #ifdef CHECK_TASK_LIST_INTEGRITY
1594 check_task_list(fsess->main_thread_tasks, task);
1595 check_task_list(fsess->tasks, task);
1596 check_task_list(fsess->tasks_reservoir, task);
1597 if (prev_current_filter)
1598 check_task_list(prev_current_filter->tasks, task);
1599
1600 {
1601 gf_mx_p(fsess->filters_mx);
1602 u32 k, c2 = gf_list_count(fsess->filters);
1603 for (k=0; k<c2; k++) {
1604 GF_Filter *af = gf_list_get(fsess->filters, k);
1605 check_task_list(af->tasks, task);
1606 }
1607 gf_mx_v(fsess->filters_mx);
1608 }
1609 #endif
1610 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p pushed to reservoir\n", sys_thid, sess_thread->nb_tasks, task));
1611
1612 if (fsess->tasks_reservoir) {
1613 memset(task, 0, sizeof(GF_FSTask));
1614 gf_fq_add(fsess->tasks_reservoir, task);
1615 } else {
1616 gf_free(task);
1617 }
1618 }
1619
1620 //decrement task counter
1621 if (notified) {
1622 assert(fsess->tasks_pending);
1623 safe_int_dec(&fsess->tasks_pending);
1624 }
1625 if (current_filter) {
1626 current_filter->process_th_id = 0;
1627 current_filter->in_process = GF_FALSE;
1628 }
1629 //not requeuing and first time we have an empty task queue, flush to detect if we are indeed done
1630 if (!current_filter && !fsess->tasks_pending && !sess_thread->has_seen_eot && !gf_fq_count(fsess->tasks)) {
1631 //if not the main thread, or if main thread and task list is empty, enter end of session probing mode
1632 if (thid || !gf_fq_count(fsess->main_thread_tasks) ) {
1633 //maybe last task, force a notify to check if we are truly done. We only tag "session done" for the non-main
1634 //threads, in order to enter the end-of session signaling above
1635 if (thid) sess_thread->has_seen_eot = GF_TRUE;
1636 gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1637 }
1638 }
1639
1640 sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1641
1642
1643 //no main thread, return
1644 if (!thid && fsess->no_main_thread && !current_filter && !fsess->pid_connect_tasks_pending) {
1645 gf_rmt_end();
1646 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1647 return 0;
1648 }
1649 }
1650
1651 gf_rmt_end();
1652
1653 //no main thread, return
1654 if (!thid && fsess->no_main_thread) {
1655 GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1656 return 0;
1657 }
1658 sess_thread->run_time = gf_sys_clock_high_res() - enter_time;
1659
1660 safe_int_inc(&fsess->nb_threads_stopped);
1661
1662 if (!fsess->run_status)
1663 fsess->run_status = GF_EOS;
1664
1665 // thread exit, notify the semaphores
1666 if (fsess->semaphore_main && ! gf_sema_notify(fsess->semaphore_main, 1)) {
1667 GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Failed to notify main semaphore, might hang up !!\n"));
1668 }
1669 if (fsess->semaphore_other && ! gf_sema_notify(fsess->semaphore_other, th_count)) {
1670 GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Failed to notify secondary semaphore, might hang up !!\n"));
1671 }
1672
1673 return 0;
1674 }
1675
1676
1677 GF_EXPORT
gf_fs_run(GF_FilterSession * fsess)1678 GF_Err gf_fs_run(GF_FilterSession *fsess)
1679 {
1680 u32 i, nb_threads;
1681 assert(fsess);
1682
1683 fsess->run_status = GF_OK;
1684 fsess->main_th.has_seen_eot = GF_FALSE;
1685 fsess->nb_threads_stopped = 0;
1686
1687 nb_threads = gf_list_count(fsess->threads);
1688 for (i=0;i<nb_threads; i++) {
1689 GF_SessionThread *sess_th = gf_list_get(fsess->threads, i);
1690 gf_th_run(sess_th->th, (gf_thread_run) gf_fs_thread_proc, sess_th);
1691 }
1692 if (fsess->no_main_thread) return GF_OK;
1693
1694 gf_fs_thread_proc(&fsess->main_th);
1695
1696 //wait for all threads to be done
1697 while (nb_threads+1 != fsess->nb_threads_stopped) {
1698 gf_sleep(1);
1699 }
1700
1701 return fsess->run_status;
1702 }
1703
gf_fs_run_step(GF_FilterSession * fsess)1704 void gf_fs_run_step(GF_FilterSession *fsess)
1705 {
1706 gf_fs_thread_proc(&fsess->main_th);
1707 }
1708
1709 GF_EXPORT
gf_fs_abort(GF_FilterSession * fsess,Bool do_flush)1710 GF_Err gf_fs_abort(GF_FilterSession *fsess, Bool do_flush)
1711 {
1712 u32 i, count;
1713 GF_LOG(GF_LOG_INFO, GF_LOG_FILTER, ("Session abort from user, stoping sources\n"));
1714 if (!fsess) return GF_BAD_PARAM;
1715
1716 if (!do_flush) {
1717 fsess->in_final_flush = GF_TRUE;
1718 fsess->run_status = GF_EOS;
1719 return GF_OK;
1720 }
1721
1722 gf_mx_p(fsess->filters_mx);
1723 count = gf_list_count(fsess->filters);
1724 //disable all sources
1725 for (i=0; i<count; i++) {
1726 GF_Filter *filter = gf_list_get(fsess->filters, i);
1727 //force end of session on all sources, and on all filters connected to these sources, and dispatch end of stream on all outputs pids of these filters
1728 //if we don't propagate on connected filters, we take the risk of not deactivating demuxers working from file
1729 //(eg ignoring input packets)
1730 if (!filter->num_input_pids) {
1731 u32 j, k, l;
1732 filter->disabled = GF_TRUE;
1733 for (j=0; j<filter->num_output_pids; j++) {
1734 GF_FilterPid *pid = gf_list_get(filter->output_pids, j);
1735 gf_filter_pid_set_eos(pid);
1736 for (k=0; k<pid->num_destinations; k++) {
1737 GF_FilterPidInst *pidi = gf_list_get(pid->destinations, k);
1738 pidi->filter->disabled = GF_TRUE;
1739 for (l=0; l<pidi->filter->num_output_pids; l++) {
1740 GF_FilterPid *opid = gf_list_get(pidi->filter->output_pids, l);
1741 if (opid->filter->freg->process_event) {
1742 GF_FilterEvent evt;
1743 GF_FEVT_INIT(evt, GF_FEVT_STOP, opid);
1744 opid->filter->freg->process_event(opid->filter, &evt);
1745 }
1746 gf_filter_pid_set_eos(opid);
1747 }
1748 }
1749 }
1750 }
1751 }
1752 fsess->in_final_flush = GF_TRUE;
1753
1754 gf_mx_v(fsess->filters_mx);
1755 return GF_OK;
1756 }
1757
1758 GF_EXPORT
gf_fs_stop(GF_FilterSession * fsess)1759 GF_Err gf_fs_stop(GF_FilterSession *fsess)
1760 {
1761 u32 i, count = fsess->threads ? gf_list_count(fsess->threads) : 0;
1762
1763 GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session stop\n"));
1764 if (count+1 == fsess->nb_threads_stopped) {
1765 return GF_OK;
1766 }
1767
1768 if (!fsess->run_status)
1769 fsess->run_status = GF_EOS;
1770
1771 for (i=0; i < count; i++) {
1772 gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1773 }
1774
1775 //wait for all threads to be done, we might still need flushing the main thread queue
1776 while (fsess->no_main_thread) {
1777 gf_fs_thread_proc(&fsess->main_th);
1778 if (gf_fq_count(fsess->main_thread_tasks))
1779 continue;
1780
1781 if (count && (count == fsess->nb_threads_stopped) && gf_fq_count(fsess->tasks) ) {
1782 continue;
1783 }
1784 break;
1785 }
1786 if (fsess->no_main_thread) {
1787 safe_int_inc(&fsess->nb_threads_stopped);
1788 fsess->main_th.has_seen_eot = GF_TRUE;
1789 }
1790
1791 while (count+1 != fsess->nb_threads_stopped) {
1792 for (i=0; i < count; i++) {
1793 gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1794 }
1795 gf_sleep(0);
1796 //we may have tasks in main task list posted by other threads
1797 if (fsess->no_main_thread) {
1798 gf_fs_thread_proc(&fsess->main_th);
1799 fsess->main_th.has_seen_eot = GF_TRUE;
1800 }
1801 }
1802 return GF_OK;
1803 }
1804
print_filter_name(GF_Filter * f,Bool skip_id,Bool skip_args)1805 static GFINLINE void print_filter_name(GF_Filter *f, Bool skip_id, Bool skip_args)
1806 {
1807 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->freg->name));
1808 if (strcmp(f->name, f->freg->name)) {
1809 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" \"%s\"", f->name));
1810 }
1811 if (!skip_id && f->id) GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ID %s", f->id));
1812 if (f->dynamic_filter || skip_args) return;
1813
1814 if (!f->src_args && !f->orig_args && !f->dst_args && !f->dynamic_source_ids) return;
1815 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ("));
1816 if (f->src_args) {
1817 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->src_args));
1818 }
1819 else if (f->orig_args) {
1820 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->orig_args));
1821 }
1822 else if (f->dst_args) {
1823 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->dst_args));
1824 }
1825
1826 if (f->dynamic_source_ids) GF_LOG(GF_LOG_INFO, GF_LOG_APP, (",resolved SID:%s", f->source_ids));
1827 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (")"));
1828 }
1829
1830 GF_EXPORT
gf_fs_print_stats(GF_FilterSession * fsess)1831 void gf_fs_print_stats(GF_FilterSession *fsess)
1832 {
1833 u64 run_time=0, active_time=0, nb_tasks=0, nb_filters=0;
1834 u32 i, count;
1835
1836 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1837 if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
1838
1839 count=gf_list_count(fsess->filters);
1840 for (i=0; i<count; i++) {
1841 GF_Filter *f = gf_list_get(fsess->filters, i);
1842 if (f->multi_sink_target) continue;
1843 nb_filters++;
1844 }
1845
1846 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filter stats - %d filters\n", nb_filters));
1847 for (i=0; i<count; i++) {
1848 u32 k, ipids, opids;
1849 GF_Filter *f = gf_list_get(fsess->filters, i);
1850 if (f->multi_sink_target) continue;
1851
1852 ipids = f->num_input_pids;
1853 opids = f->num_output_pids;
1854 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tFilter "));
1855 print_filter_name(f, GF_FALSE, GF_FALSE);
1856 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" : %d input pids %d output pids "LLU" tasks "LLU" us process time\n", ipids, opids, f->nb_tasks_done, f->time_process));
1857
1858 if (ipids) {
1859 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" packets processed "LLU" bytes processed", f->nb_pck_processed, f->nb_bytes_processed));
1860 if (f->time_process) {
1861 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec %g mbps)", (Double) f->nb_pck_processed*1000000/f->time_process, (Double) f->nb_bytes_processed*8/f->time_process));
1862 }
1863 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1864 }
1865 if (opids) {
1866 if (f->nb_hw_pck_sent) {
1867 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" hardware frames sent", f->nb_hw_pck_sent));
1868 if (f->time_process) {
1869 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec)", (Double) f->nb_hw_pck_sent*1000000/f->time_process));
1870 }
1871
1872 } else if (f->nb_pck_sent) {
1873 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" packets sent "LLU" bytes sent", f->nb_pck_sent, f->nb_bytes_sent));
1874 if (f->time_process) {
1875 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec %g mbps)", (Double) f->nb_pck_sent*1000000/f->time_process, (Double) f->nb_bytes_sent*8/f->time_process));
1876 }
1877 }
1878 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1879 }
1880
1881 for (k=0; k<ipids; k++) {
1882 GF_FilterPidInst *pid = gf_list_get(f->input_pids, k);
1883 if (!pid->pid) continue;
1884 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t* input PID %s: %d packets received\n", pid->pid->name, pid->pid->nb_pck_sent));
1885 }
1886 #ifndef GPAC_DISABLE_LOG
1887 for (k=0; k<opids; k++) {
1888 GF_FilterPid *pid = gf_list_get(f->output_pids, k);
1889 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t* output PID %s: %d packets sent\n", pid->name, pid->nb_pck_sent));
1890 }
1891 if (f->nb_errors) {
1892 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t%d errors while processing\n", f->nb_errors));
1893 }
1894 #endif
1895
1896 }
1897 if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
1898
1899 count=gf_list_count(fsess->threads);
1900 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Session stats - threads %d\n", 1+count));
1901
1902 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tThread %u: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", 1, fsess->main_th.run_time, fsess->main_th.active_time, fsess->main_th.nb_tasks));
1903
1904 run_time+=fsess->main_th.run_time;
1905 active_time+=fsess->main_th.active_time;
1906 nb_tasks+=fsess->main_th.nb_tasks;
1907
1908 for (i=0; i<count; i++) {
1909 GF_SessionThread *s = gf_list_get(fsess->threads, i);
1910
1911 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tThread %u: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", i+2, s->run_time, s->active_time, s->nb_tasks));
1912
1913 run_time+=s->run_time;
1914 active_time+=s->active_time;
1915 nb_tasks+=s->nb_tasks;
1916 }
1917 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\nTotal: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", run_time, active_time, nb_tasks));
1918 }
1919
gf_fs_print_filter_outputs(GF_Filter * f,GF_List * filters_done,u32 indent,GF_FilterPid * pid,GF_Filter * alias_for)1920 static void gf_fs_print_filter_outputs(GF_Filter *f, GF_List *filters_done, u32 indent, GF_FilterPid *pid, GF_Filter *alias_for)
1921 {
1922 u32 i=0;
1923
1924 while (i<indent) {
1925 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("-"));
1926 i++;
1927 }
1928
1929 if (pid) {
1930 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("(PID %s) ", pid->name));
1931 }
1932 print_filter_name(f, GF_TRUE, GF_FALSE);
1933 if (f->id) {
1934 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (ID=%s)\n", f->id));
1935 } else {
1936 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (ptr=%p)\n", f));
1937 }
1938 if (gf_list_find(filters_done, f)>=0)
1939 return;
1940
1941 gf_list_add(filters_done, f);
1942 if (alias_for) {
1943 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (<=> "));
1944 print_filter_name(alias_for, GF_TRUE, GF_TRUE);
1945 if (alias_for->id) {
1946 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ID=%s", alias_for->id));
1947 } else {
1948 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ptr=%p", alias_for));
1949 }
1950 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (")\n"));
1951 }
1952
1953 for (i=0; i<f->num_output_pids; i++) {
1954 u32 j, k;
1955 GF_FilterPid *pidout = gf_list_get(f->output_pids, i);
1956 for (j=0; j<pidout->num_destinations; j++) {
1957 GF_FilterPidInst *pidi = gf_list_get(pidout->destinations, j);
1958 GF_Filter *alias = NULL;
1959 for (k=0; k<gf_list_count(f->destination_filters); k++) {
1960 alias = gf_list_get(f->destination_filters, k);
1961 if (alias->multi_sink_target == pidi->filter)
1962 break;
1963 alias = NULL;
1964 }
1965 if (alias) {
1966
1967 gf_fs_print_filter_outputs(alias, filters_done, indent+1, pidout, pidi->filter);
1968 } else {
1969 gf_fs_print_filter_outputs(pidi->filter, filters_done, indent+1, pidout, NULL);
1970 }
1971 }
1972 }
1973
1974 }
1975 GF_EXPORT
gf_fs_print_connections(GF_FilterSession * fsess)1976 void gf_fs_print_connections(GF_FilterSession *fsess)
1977 {
1978 u32 i, count;
1979 Bool has_undefined=GF_FALSE;
1980 Bool has_connected=GF_FALSE;
1981 Bool has_unconnected=GF_FALSE;
1982 GF_List *filters_done;
1983 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1984 if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
1985
1986 filters_done = gf_list_new();
1987
1988 count=gf_list_count(fsess->filters);
1989 for (i=0; i<count; i++) {
1990 GF_Filter *f = gf_list_get(fsess->filters, i);
1991 //only dump sources
1992 if (f->num_input_pids) continue;
1993 if (!f->num_output_pids) continue;
1994 if (!has_connected) {
1995 has_connected = GF_TRUE;
1996 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters connected:\n"));
1997 }
1998 gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
1999 }
2000 for (i=0; i<count; i++) {
2001 GF_Filter *f = gf_list_get(fsess->filters, i);
2002 //only dump not connected ones
2003 if (f->num_input_pids || f->num_output_pids || f->multi_sink_target) continue;
2004 if (!has_unconnected) {
2005 has_unconnected = GF_TRUE;
2006 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters not connected:\n"));
2007 }
2008 gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
2009 }
2010 for (i=0; i<count; i++) {
2011 GF_Filter *f = gf_list_get(fsess->filters, i);
2012 if (f->multi_sink_target) continue;
2013 if (gf_list_find(filters_done, f)>=0) continue;
2014 if (!has_undefined) {
2015 has_undefined = GF_TRUE;
2016 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters in unknown connection state:\n"));
2017 }
2018 gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
2019 }
2020
2021 if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
2022 gf_list_del(filters_done);
2023 }
2024
2025
2026 GF_EXPORT
gf_fs_send_update(GF_FilterSession * fsess,const char * fid,GF_Filter * filter,const char * name,const char * val,GF_EventPropagateType propagate_mask)2027 void gf_fs_send_update(GF_FilterSession *fsess, const char *fid, GF_Filter *filter, const char *name, const char *val, GF_EventPropagateType propagate_mask)
2028 {
2029 GF_FilterUpdate *upd;
2030 u32 i, count;
2031 Bool removed = GF_FALSE;
2032 if ((!fid && !filter) || !name) return;
2033
2034 if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
2035
2036 if (!filter) {
2037 GF_Filter *reg_filter = NULL;
2038 count = gf_list_count(fsess->filters);
2039 for (i=0; i<count; i++) {
2040 filter = gf_list_get(fsess->filters, i);
2041 if (filter->id && !strcmp(filter->id, fid)) {
2042 break;
2043 }
2044 if (filter->name && !strcmp(filter->name, fid)) {
2045 break;
2046 }
2047 if (!reg_filter && !strcmp(filter->freg->name, fid))
2048 reg_filter = filter;
2049 filter = NULL;
2050 }
2051 if (!filter)
2052 filter = reg_filter;
2053 }
2054
2055 if (filter && filter->multi_sink_target)
2056 filter = filter->multi_sink_target;
2057
2058 removed = (!filter || filter->removed || filter->finalized) ? GF_TRUE : GF_FALSE;
2059 if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
2060
2061 if (removed) return;
2062
2063 GF_SAFEALLOC(upd, GF_FilterUpdate);
2064 if (!val) {
2065 char *sep = strchr(name, fsess->sep_name);
2066 if (sep) sep[0] = 0;
2067 upd->name = gf_strdup(name);
2068 upd->val = sep ? gf_strdup(sep+1) : NULL;
2069 if (sep) sep[0] = fsess->sep_name;
2070 } else {
2071 upd->name = gf_strdup(name);
2072 upd->val = gf_strdup(val);
2073 }
2074 upd->recursive = propagate_mask;
2075 gf_fs_post_task(fsess, gf_filter_update_arg_task, filter, NULL, "update_arg", upd);
2076 }
2077
probe_meta_check_builtin_format(GF_FilterSession * fsess,GF_FilterRegister * freg,const char * url,const char * mime,char * fargs)2078 static GF_FilterProbeScore probe_meta_check_builtin_format(GF_FilterSession *fsess, GF_FilterRegister *freg, const char *url, const char *mime, char *fargs)
2079 {
2080 char szExt[100];
2081 const char *ext = gf_file_ext_start(url);
2082 u32 len=0, i, j, count = gf_list_count(fsess->registry);
2083 if (ext) {
2084 ext++;
2085 len = (u32) strlen(ext);
2086 }
2087 //check in filter args if we have a format set, in which case replace URL ext by the given format
2088 if (fargs) {
2089 char szExtN[10];
2090 char *ext_arg;
2091 sprintf(szExtN, "ext%c", fsess->sep_name);
2092 ext_arg = strstr(fargs, szExtN);
2093 if (ext_arg) {
2094 char *next_arg;
2095 ext_arg+=4;
2096 next_arg = strchr(ext_arg, fsess->sep_args);
2097 if (next_arg) {
2098 len = (u32) (next_arg - ext_arg);
2099 } else {
2100 len = (u32) strlen(ext_arg);
2101 }
2102 if (len>99) len=99;
2103 strncpy(szExt, ext_arg, len);
2104 szExt[len] = 0;
2105 ext = szExt;
2106 }
2107 }
2108
2109 if (mime) {
2110 if (strstr(mime, "/mp4")) return GF_FPROBE_MAYBE_SUPPORTED;
2111 }
2112
2113 for (i=0; i<count; i++) {
2114 const GF_FilterArgs *dst_arg=NULL;
2115 GF_FilterRegister *reg = gf_list_get(fsess->registry, i);
2116 if (reg==freg) continue;
2117 if (reg->flags & GF_FS_REG_META) continue;
2118
2119 j=0;
2120 while (reg->args) {
2121 dst_arg = ®->args[j];
2122 if (!dst_arg || !dst_arg->arg_name) {
2123 dst_arg=NULL;
2124 break;
2125 }
2126 if (!strcmp(dst_arg->arg_name, "dst")) break;
2127 dst_arg = NULL;
2128 j++;
2129 }
2130 /*check prober*/
2131 if (dst_arg) {
2132 if (reg->probe_url) {
2133 GF_FilterProbeScore s = reg->probe_url(url, mime);
2134 if (s==GF_FPROBE_SUPPORTED)
2135 return GF_FPROBE_MAYBE_SUPPORTED;
2136 }
2137 continue;
2138 }
2139
2140 /* check muxers*/
2141 for (j=0; j<reg->nb_caps; j++) {
2142 char *value=NULL;
2143 const char *pattern = NULL;
2144 const GF_FilterCapability *cap = ®->caps[j];
2145 if (! (cap->flags & GF_CAPFLAG_OUTPUT) )
2146 continue;
2147 if (cap->flags & GF_CAPFLAG_EXCLUDED)
2148 continue;
2149
2150 if (cap->code==GF_PROP_PID_FILE_EXT) {
2151 if (ext) {
2152 value = cap->val.value.string;
2153 pattern = ext;
2154 }
2155 } else if (cap->code==GF_PROP_PID_MIME) {
2156 if (mime) {
2157 value = cap->val.value.string;
2158 pattern = mime;
2159 }
2160 }
2161 while (value) {
2162 char *match = strstr(value, pattern);
2163 if (!match) break;
2164 if (!match[len] || match[len]=='|')
2165 return GF_FPROBE_MAYBE_SUPPORTED;
2166 value = match+1;
2167 }
2168 }
2169 }
2170 return GF_FPROBE_SUPPORTED;
2171 }
2172
2173
gf_fs_load_source_dest_internal(GF_FilterSession * fsess,const char * url,const char * user_args,const char * parent_url,GF_Err * err,GF_Filter * filter,GF_Filter * dst_filter,Bool for_source,Bool no_args_inherit,Bool * probe_only)2174 GF_Filter *gf_fs_load_source_dest_internal(GF_FilterSession *fsess, const char *url, const char *user_args, const char *parent_url, GF_Err *err, GF_Filter *filter, GF_Filter *dst_filter, Bool for_source, Bool no_args_inherit, Bool *probe_only)
2175 {
2176 GF_FilterProbeScore score = GF_FPROBE_NOT_SUPPORTED;
2177 GF_FilterRegister *candidate_freg=NULL;
2178 GF_Filter *alias_for_filter = NULL;
2179 const GF_FilterArgs *src_dst_arg=NULL;
2180 u32 i, count, user_args_len, arg_type;
2181 char szForceReg[20];
2182 char szMime[50];
2183 GF_Err e;
2184 const char *force_freg = NULL;
2185 char *sURL, *mime_type, *args, *sep;
2186 char szExt[50];
2187 Bool free_url=GF_FALSE;
2188 memset(szExt, 0, sizeof(szExt));
2189
2190 if (err) *err = GF_OK;
2191
2192 mime_type = NULL;
2193 //destination, extract mime from arguments
2194 if (!for_source) {
2195 sprintf(szMime, "%cmime=", fsess->sep_args);
2196 mime_type = strstr(url, szMime);
2197 if (!mime_type && user_args)
2198 mime_type = strstr(user_args, szMime);
2199
2200 if (mime_type) {
2201 strncpy(szMime, mime_type+6, 49);
2202 szMime[49]=0;
2203 sep = strchr(szMime, fsess->sep_args);
2204 if (sep) sep[0] = 0;
2205 mime_type = szMime;
2206 }
2207 }
2208 sURL = NULL;
2209 if (!url || !strncmp(url, "\\\\", 2) ) {
2210 return NULL;
2211 }
2212 if (filter) {
2213 sURL = (char *) url;
2214 } else {
2215 /*used by GUIs scripts to skip URL concatenation*/
2216 if (!strncmp(url, "gpac://", 7)) sURL = gf_strdup(url+7);
2217 /*opera-style localhost URLs*/
2218 else if (!strncmp(url, "file://localhost", 16)) sURL = gf_strdup(url+16);
2219 else if (parent_url) sURL = gf_url_concatenate(parent_url, url);
2220
2221 /*path absolute*/
2222 if (!sURL) sURL = gf_strdup(url);
2223 free_url=GF_TRUE;
2224
2225 if (!strncmp(sURL, "gpac://", 7)) {
2226 u32 ulen = (u32) strlen(sURL+7);
2227 memmove(sURL, sURL+7, ulen);
2228 sURL[ulen]=0;
2229 }
2230
2231 if (for_source && gf_url_is_local(sURL)) {
2232 char *frag_par, *cgi, *ext_start;
2233 char f_c=0;
2234 gf_url_to_fs_path(sURL);
2235 sep = (char *)gf_fs_path_escape_colon(fsess, sURL);
2236 if (sep) sep[0] = 0;
2237
2238 ext_start = gf_file_ext_start(sURL);
2239 if (!ext_start) ext_start = sURL;
2240 frag_par = strchr(ext_start, '#');
2241 cgi = strchr(ext_start, '?');
2242 if (frag_par && cgi && (cgi<frag_par))
2243 frag_par = cgi;
2244
2245 if (frag_par) {
2246 f_c = frag_par[0];
2247 frag_par[0] = 0;
2248 }
2249
2250 if (strcmp(sURL, "null") && strcmp(sURL, "-") && strcmp(sURL, "stdin") && ! gf_file_exists(sURL)) {
2251 if (sep) sep[0] = fsess->sep_args;
2252 if (frag_par) frag_par[0] = f_c;
2253
2254 if (err) *err = GF_URL_ERROR;
2255 gf_free(sURL);
2256 return NULL;
2257 }
2258 if (frag_par) frag_par[0] = f_c;
2259 if (sep) sep[0] = fsess->sep_args;
2260 }
2261 }
2262 sep = (char *)gf_fs_path_escape_colon(fsess, sURL);
2263
2264 sprintf(szForceReg, "gfreg%c", fsess->sep_name);
2265 force_freg = NULL;
2266 if (sep) {
2267 sep[0] = 0;
2268 force_freg = strstr(sep+1, szForceReg);
2269 }
2270 if (!force_freg && user_args) {
2271 force_freg = strstr(user_args, szForceReg);
2272 }
2273 if (force_freg)
2274 force_freg += 6;
2275
2276 restart:
2277 //check all our registered filters
2278 count = gf_list_count(fsess->registry);
2279 for (i=0; i<count; i++) {
2280 u32 j;
2281 GF_FilterProbeScore s;
2282 GF_FilterRegister *freg = gf_list_get(fsess->registry, i);
2283 if (! freg->probe_url) continue;
2284 if (force_freg && strncmp(force_freg, freg->name, strlen(freg->name))) continue;
2285 if (! freg->args) continue;
2286 if (filter && (gf_list_find(filter->blacklisted, freg) >=0)) continue;
2287
2288 j=0;
2289 while (freg->args) {
2290 src_dst_arg = &freg->args[j];
2291 if (!src_dst_arg || !src_dst_arg->arg_name) {
2292 src_dst_arg=NULL;
2293 break;
2294 }
2295 if (for_source && !strcmp(src_dst_arg->arg_name, "src")) break;
2296 else if (!for_source && !strcmp(src_dst_arg->arg_name, "dst")) break;
2297 src_dst_arg = NULL;
2298 j++;
2299 }
2300 if (!src_dst_arg)
2301 continue;
2302
2303 s = freg->probe_url(sURL, mime_type);
2304 /* destination meta filter: change GF_FPROBE_SUPPORTED to GF_FPROBE_MAYBE_SUPPORTED for internal mux formats
2305 in order to avoid always giving the hand to the meta filter*/
2306 if (!for_source && (s == GF_FPROBE_SUPPORTED) && (freg->flags & GF_FS_REG_META)) {
2307 s = probe_meta_check_builtin_format(fsess, freg, sURL, mime_type, sep ? sep+1 : NULL);
2308 }
2309 //higher score, use this new registry
2310 if (s > score) {
2311 candidate_freg = freg;
2312 score = s;
2313 }
2314 //same score and higher priority (0 being highest), use this new registry
2315 else if ((s == score) && candidate_freg && (freg->priority<candidate_freg->priority) ) {
2316 candidate_freg = freg;
2317 score = s;
2318 }
2319 }
2320 if (probe_only) {
2321 *probe_only = candidate_freg ? GF_TRUE : GF_FALSE;
2322 if (free_url)
2323 gf_free(sURL);
2324 if (err) *err = GF_OK;
2325 return NULL;
2326 }
2327
2328 if (!candidate_freg) {
2329 if (force_freg) {
2330 GF_LOG(GF_LOG_INFO, GF_LOG_FILTER, ("No source filter named %s found, retrying without forcing registry\n", force_freg));
2331 force_freg = NULL;
2332 goto restart;
2333 }
2334 if (free_url)
2335 gf_free(sURL);
2336 if (err) *err = GF_NOT_SUPPORTED;
2337 if (filter) filter->finalized = GF_TRUE;
2338 return NULL;
2339 }
2340 if (sep) sep[0] = fsess->sep_args;
2341
2342 user_args_len = user_args ? (u32) strlen(user_args) : 0;
2343 args = gf_malloc(sizeof(char)*(5+strlen(sURL) + (user_args_len ? user_args_len + 8/*for potential :gpac: */ :0) ) );
2344
2345 sprintf(args, "%s%c", for_source ? "src" : "dst", fsess->sep_name);
2346 strcat(args, sURL);
2347 if (user_args_len) {
2348 if (fsess->sep_args==':') strcat(args, ":gpac:");
2349 else {
2350 char szSep[2];
2351 szSep[0] = fsess->sep_args;
2352 szSep[1] = 0;
2353 strcat(args, szSep);
2354 }
2355 strcat(args, user_args);
2356 }
2357
2358 e = GF_OK;
2359 arg_type = GF_FILTER_ARG_EXPLICIT_SINK;
2360 if (for_source) {
2361 if (no_args_inherit) arg_type = GF_FILTER_ARG_EXPLICIT_SOURCE_NO_DST_INHERIT;
2362 else arg_type = GF_FILTER_ARG_EXPLICIT_SOURCE;
2363 }
2364
2365 if (!for_source && candidate_freg->use_alias) {
2366 u32 fcount = gf_list_count(fsess->filters);
2367 for (i=0; i<fcount; i++) {
2368 GF_Filter *f = gf_list_get(fsess->filters, i);
2369 if (f->freg != candidate_freg) continue;
2370 if (f->freg->use_alias(f, sURL, mime_type)) {
2371 alias_for_filter = f;
2372 break;
2373 }
2374 }
2375 }
2376
2377 if (!filter) {
2378 filter = gf_filter_new(fsess, candidate_freg, args, NULL, arg_type, err, alias_for_filter, GF_FALSE);
2379 } else {
2380 filter->freg = candidate_freg;
2381 e = gf_filter_new_finalize(filter, args, arg_type);
2382 if (err) *err = e;
2383 }
2384
2385 if (free_url)
2386 gf_free(sURL);
2387
2388 if (filter) {
2389 if (filter->src_args) gf_free(filter->src_args);
2390 filter->src_args = args;
2391 //for link resolution
2392 if (dst_filter && for_source) {
2393 if (gf_list_find(filter->destination_links, dst_filter)<0)
2394 gf_list_add(filter->destination_links, dst_filter);
2395 //to remember our connection target
2396 filter->target_filter = dst_filter;
2397 }
2398 } else {
2399 gf_free(args);
2400 }
2401
2402 if (!e && filter && !filter->num_output_pids && for_source)
2403 gf_filter_post_process_task(filter);
2404
2405 return filter;
2406 }
2407
2408 GF_EXPORT
gf_fs_load_source(GF_FilterSession * fsess,const char * url,const char * args,const char * parent_url,GF_Err * err)2409 GF_Filter *gf_fs_load_source(GF_FilterSession *fsess, const char *url, const char *args, const char *parent_url, GF_Err *err)
2410 {
2411 return gf_fs_load_source_dest_internal(fsess, url, args, parent_url, err, NULL, NULL, GF_TRUE, GF_FALSE, NULL);
2412 }
2413
2414 GF_EXPORT
gf_fs_load_destination(GF_FilterSession * fsess,const char * url,const char * args,const char * parent_url,GF_Err * err)2415 GF_Filter *gf_fs_load_destination(GF_FilterSession *fsess, const char *url, const char *args, const char *parent_url, GF_Err *err)
2416 {
2417 return gf_fs_load_source_dest_internal(fsess, url, args, parent_url, err, NULL, NULL, GF_FALSE, GF_FALSE, NULL);
2418 }
2419
2420
2421 GF_EXPORT
gf_filter_add_event_listener(GF_Filter * filter,GF_FSEventListener * el)2422 GF_Err gf_filter_add_event_listener(GF_Filter *filter, GF_FSEventListener *el)
2423 {
2424 GF_Err e;
2425 if (!filter || !filter->session || !el || !el->on_event) return GF_BAD_PARAM;
2426 while (filter->session->in_event_listener) gf_sleep(1);
2427 gf_mx_p(filter->session->evt_mx);
2428 if (!filter->session->event_listeners) {
2429 filter->session->event_listeners = gf_list_new();
2430 }
2431 e = gf_list_add(filter->session->event_listeners, el);
2432 gf_mx_v(filter->session->evt_mx);
2433 return e;
2434 }
2435
2436 GF_EXPORT
gf_filter_remove_event_listener(GF_Filter * filter,GF_FSEventListener * el)2437 GF_Err gf_filter_remove_event_listener(GF_Filter *filter, GF_FSEventListener *el)
2438 {
2439 if (!filter || !filter->session || !el || !filter->session->event_listeners) return GF_BAD_PARAM;
2440
2441 while (filter->session->in_event_listener) gf_sleep(1);
2442 gf_mx_p(filter->session->evt_mx);
2443 gf_list_del_item(filter->session->event_listeners, el);
2444 if (!gf_list_count(filter->session->event_listeners)) {
2445 gf_list_del(filter->session->event_listeners);
2446 filter->session->event_listeners=NULL;
2447 }
2448 gf_mx_v(filter->session->evt_mx);
2449 return GF_OK;
2450 }
2451
2452 GF_EXPORT
gf_filter_forward_gf_event(GF_Filter * filter,GF_Event * evt,Bool consumed,Bool skip_user)2453 Bool gf_filter_forward_gf_event(GF_Filter *filter, GF_Event *evt, Bool consumed, Bool skip_user)
2454 {
2455 if (!filter || !filter->session || filter->session->in_final_flush) return GF_FALSE;
2456
2457 if (filter->session->event_listeners) {
2458 GF_FSEventListener *el;
2459 u32 i=0;
2460
2461 gf_mx_p(filter->session->evt_mx);
2462 filter->session->in_event_listener ++;
2463 gf_mx_v(filter->session->evt_mx);
2464 while ((el = gf_list_enum(filter->session->event_listeners, &i))) {
2465 if (el->on_event(el->udta, evt, consumed)) {
2466 filter->session->in_event_listener --;
2467 return GF_TRUE;
2468 }
2469 }
2470 filter->session->in_event_listener --;
2471 }
2472
2473 if (!skip_user && !consumed && filter->session->ui_event_proc) {
2474 Bool res;
2475 // term->nb_calls_in_event_proc++;
2476 res = gf_fs_ui_event(filter->session, evt);
2477 // term->nb_calls_in_event_proc--;
2478 return res;
2479 }
2480 return GF_FALSE;
2481 }
2482
2483 GF_EXPORT
gf_filter_send_gf_event(GF_Filter * filter,GF_Event * evt)2484 Bool gf_filter_send_gf_event(GF_Filter *filter, GF_Event *evt)
2485 {
2486 return gf_filter_forward_gf_event(filter, evt, GF_FALSE, GF_FALSE);
2487 }
2488
2489
gf_fs_print_jsf_connection(GF_FilterSession * session,char * filter_name,void (* print_fn)(FILE * output,GF_SysPrintArgFlags flags,const char * fmt,...))2490 static void gf_fs_print_jsf_connection(GF_FilterSession *session, char *filter_name, void (*print_fn)(FILE *output, GF_SysPrintArgFlags flags, const char *fmt, ...) )
2491 {
2492 GF_CapsBundleStore capstore;
2493 GF_Filter *js_filter;
2494 const char *js_name = NULL;
2495 GF_Err e=GF_OK;
2496 u32 i, j, count, nb_js_caps;
2497 GF_List *sources, *sinks;
2498 GF_FilterRegister loaded_freg;
2499 Bool has_output, has_input;
2500
2501 js_filter = gf_fs_load_filter(session, filter_name, &e);
2502 if (!js_filter) return;
2503
2504 js_name = strrchr(filter_name, '/');
2505 if (!js_name) js_name = strrchr(filter_name, '\\');
2506 if (js_name) js_name++;
2507 else js_name = filter_name;
2508
2509 nb_js_caps = gf_filter_caps_bundle_count(js_filter->forced_caps, js_filter->nb_forced_caps);
2510
2511 //fake a new register with only the caps set
2512 memset(&loaded_freg, 0, sizeof(GF_FilterRegister));
2513 loaded_freg.caps = js_filter->forced_caps;
2514 loaded_freg.nb_caps = js_filter->nb_forced_caps;
2515
2516 has_output = gf_filter_has_out_caps(js_filter->forced_caps, js_filter->nb_forced_caps);
2517 has_input = gf_filter_has_in_caps(js_filter->forced_caps, js_filter->nb_forced_caps);
2518
2519 memset(&capstore, 0, sizeof(GF_CapsBundleStore));
2520 sources = gf_list_new();
2521 sinks = gf_list_new();
2522 //edges for JS are for the unloaded JSF (eg accept anything, output anything).
2523 //we need to do a manual check
2524 count = gf_list_count(session->links);
2525 for (i=0; i<count; i++) {
2526 u32 nb_src_caps, k, l;
2527 Bool src_match = GF_FALSE;
2528 Bool sink_match = GF_FALSE;
2529 GF_FilterRegDesc *a_reg = gf_list_get(session->links, i);
2530 if (a_reg->freg == js_filter->freg) continue;
2531
2532 //check which cap of this filter matches our destination
2533 nb_src_caps = gf_filter_caps_bundle_count(a_reg->freg->caps, a_reg->freg->nb_caps);
2534 for (k=0; k<nb_src_caps; k++) {
2535 for (l=0; l<nb_js_caps; l++) {
2536 s32 bundle_idx;
2537 u32 loaded_filter_only_flags = 0;
2538 u32 path_weight;
2539 if (has_input && !src_match) {
2540 path_weight = gf_filter_caps_to_caps_match(a_reg->freg, k, (const GF_FilterRegister *) &loaded_freg, NULL, &bundle_idx, l, &loaded_filter_only_flags, &capstore);
2541 if (path_weight && (bundle_idx == l))
2542 src_match = GF_TRUE;
2543 }
2544 if (has_output && !sink_match) {
2545 loaded_filter_only_flags = 0;
2546 path_weight = gf_filter_caps_to_caps_match(&loaded_freg, l, a_reg->freg, NULL, &bundle_idx, k, &loaded_filter_only_flags, &capstore);
2547
2548 if (path_weight && (bundle_idx == k))
2549 sink_match = GF_TRUE;
2550 }
2551 }
2552 if (src_match && sink_match)
2553 break;
2554 }
2555 if (src_match) gf_list_add(sources, (void *) a_reg->freg);
2556 if (sink_match) gf_list_add(sinks, (void *) a_reg->freg);
2557 }
2558
2559 for (i=0; i<2; i++) {
2560 GF_List *from = i ? sinks : sources;
2561 char *type = i ? "sources" : "sinks";
2562
2563 count = gf_list_count(from);
2564 if (!count) {
2565 if (print_fn)
2566 print_fn(stderr, 1, "%s: no %s\n", js_name, type);
2567 else {
2568 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s: no %s\n", type));
2569 }
2570 continue;
2571 }
2572
2573 if (print_fn)
2574 print_fn(stderr, 1, "%s %s:", js_name, type);
2575 else {
2576 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s %s:", js_name, type));
2577 }
2578 for (j=0; j<count; j++) {
2579 GF_FilterRegister *a_reg = gf_list_get(from, j);
2580 if (print_fn)
2581 print_fn(stderr, 0, " %s", a_reg->name);
2582 else {
2583 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", a_reg->name));
2584 }
2585 }
2586 if (print_fn)
2587 print_fn(stderr, 0, "\n");
2588 else {
2589 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
2590 }
2591 }
2592
2593 if (capstore.bundles_cap_found) gf_free(capstore.bundles_cap_found);
2594 if (capstore.bundles_in_ok) gf_free(capstore.bundles_in_ok);
2595 if (capstore.bundles_in_scores) gf_free(capstore.bundles_in_scores);
2596 gf_list_del(sources);
2597 gf_list_del(sinks);
2598 }
2599
2600 GF_EXPORT
gf_fs_print_all_connections(GF_FilterSession * session,char * filter_name,void (* print_fn)(FILE * output,GF_SysPrintArgFlags flags,const char * fmt,...))2601 void gf_fs_print_all_connections(GF_FilterSession *session, char *filter_name, void (*print_fn)(FILE *output, GF_SysPrintArgFlags flags, const char *fmt, ...) )
2602 {
2603 Bool found = GF_FALSE;
2604 GF_List *done;
2605 u32 i, j, count;
2606 u32 llev = gf_log_get_tool_level(GF_LOG_FILTER);
2607
2608 gf_log_set_tool_level(GF_LOG_FILTER, GF_LOG_INFO);
2609 //load JS to inspect its connections
2610 if (filter_name && strstr(filter_name, ".js")) {
2611 gf_fs_print_jsf_connection(session, filter_name, print_fn);
2612 gf_log_set_tool_level(GF_LOG_FILTER, llev);
2613 return;
2614 }
2615 done = gf_list_new();
2616 count = gf_list_count(session->links);
2617
2618 for (i=0; i<count; i++) {
2619 const GF_FilterRegDesc *src = gf_list_get(session->links, i);
2620 if (filter_name && strcmp(src->freg->name, filter_name))
2621 continue;
2622
2623 if (!src->nb_edges) {
2624 if (print_fn)
2625 print_fn(stderr, 1, "%s: no sources\n", src->freg->name);
2626 else {
2627 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s: no sources\n", src->freg->name));
2628 }
2629 continue;
2630 }
2631 found = GF_TRUE;
2632 if (print_fn)
2633 print_fn(stderr, 1, "%s sources:", src->freg->name);
2634 else {
2635 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s sources:", src->freg->name));
2636 }
2637
2638 for (j=0; j<src->nb_edges; j++) {
2639 if (gf_list_find(done, (void *) src->edges[j].src_reg->freg->name)<0) {
2640 if (print_fn)
2641 print_fn(stderr, 0, " %s", src->edges[j].src_reg->freg->name);
2642 else {
2643 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", src->edges[j].src_reg->freg->name));
2644 }
2645 gf_list_add(done, (void *) src->edges[j].src_reg->freg->name);
2646 }
2647 }
2648 if (print_fn)
2649 print_fn(stderr, 0, "\n");
2650 else {
2651 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
2652 }
2653 gf_list_reset(done);
2654 }
2655
2656 if (found && filter_name) {
2657 if (print_fn)
2658 print_fn(stderr, 1, "%s sinks:", filter_name);
2659 else {
2660 GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s sinks:", filter_name));
2661 }
2662 count = gf_list_count(session->links);
2663 for (i=0; i<count; i++) {
2664 const GF_FilterRegDesc *src = gf_list_get(session->links, i);
2665 if (!strcmp(src->freg->name, filter_name)) continue;
2666
2667 for (j=0; j<src->nb_edges; j++) {
2668 if (strcmp(src->edges[j].src_reg->freg->name, filter_name)) continue;
2669
2670 if (gf_list_find(done, (void *) src->freg->name)<0) {
2671 if (print_fn)
2672 print_fn(stderr, 0, " %s", src->freg->name);
2673 else {
2674 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", src->freg->name));
2675 }
2676 gf_list_add(done, (void *) src->freg->name);
2677 }
2678 }
2679 gf_list_reset(done);
2680 }
2681 if (print_fn)
2682 print_fn(stderr, 1, " \n");
2683 else {
2684 GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" \n"));
2685 }
2686 }
2687
2688 if (!found && filter_name) {
2689 if (print_fn)
2690 print_fn(stderr, 1, "%s filter not found\n", filter_name);
2691 else {
2692 GF_LOG(GF_LOG_ERROR, GF_LOG_APP, ("%s filter not found\n", filter_name));
2693 }
2694 }
2695 gf_list_del(done);
2696 gf_log_set_tool_level(GF_LOG_FILTER, llev);
2697 }
2698
2699
2700 GF_EXPORT
gf_filter_get_session_caps(GF_Filter * filter,GF_FilterSessionCaps * caps)2701 void gf_filter_get_session_caps(GF_Filter *filter, GF_FilterSessionCaps *caps)
2702 {
2703 if (caps) {
2704 if (filter) {
2705 (*caps) = filter->session->caps;
2706 } else {
2707 memset(caps, 0, sizeof(GF_FilterSessionCaps));
2708 }
2709 }
2710 }
2711
2712 GF_EXPORT
gf_filter_set_session_caps(GF_Filter * filter,GF_FilterSessionCaps * caps)2713 void gf_filter_set_session_caps(GF_Filter *filter, GF_FilterSessionCaps *caps)
2714 {
2715 if (caps && filter) {
2716 filter->session->caps = (*caps);
2717 //TODO fire event
2718 }
2719 }
2720
2721 GF_EXPORT
gf_filter_get_sep(GF_Filter * filter,GF_FilterSessionSepType sep_type)2722 u8 gf_filter_get_sep(GF_Filter *filter, GF_FilterSessionSepType sep_type)
2723 {
2724 switch (sep_type) {
2725 case GF_FS_SEP_ARGS: return filter->session->sep_args;
2726 case GF_FS_SEP_NAME: return filter->session->sep_name;
2727 case GF_FS_SEP_FRAG: return filter->session->sep_frag;
2728 case GF_FS_SEP_LIST: return filter->session->sep_list;
2729 case GF_FS_SEP_NEG: return filter->session->sep_neg;
2730 default: return 0;
2731 }
2732 }
2733
2734 GF_EXPORT
gf_filter_get_download_manager(GF_Filter * filter)2735 GF_DownloadManager *gf_filter_get_download_manager(GF_Filter *filter)
2736 {
2737 GF_FilterSession *fsess;
2738 if (!filter) return NULL;
2739 fsess = filter->session;
2740
2741 if (!fsess->download_manager) {
2742 fsess->download_manager = gf_dm_new(fsess);
2743 }
2744 return fsess->download_manager;
2745 }
2746
2747 GF_EXPORT
gf_filter_get_font_manager(GF_Filter * filter)2748 struct _gf_ft_mgr *gf_filter_get_font_manager(GF_Filter *filter)
2749 {
2750 GF_FilterSession *fsess;
2751 if (!filter) return NULL;
2752 fsess = filter->session;
2753
2754 if (!fsess->font_manager) {
2755 fsess->font_manager = gf_font_manager_new();
2756 }
2757 return fsess->font_manager;
2758 }
2759
gf_fs_cleanup_filters(GF_FilterSession * fsess)2760 void gf_fs_cleanup_filters(GF_FilterSession *fsess)
2761 {
2762 assert(fsess->pid_connect_tasks_pending);
2763 safe_int_dec(&fsess->pid_connect_tasks_pending);
2764 }
2765
2766 GF_EXPORT
gf_fs_get_last_connect_error(GF_FilterSession * fs)2767 GF_Err gf_fs_get_last_connect_error(GF_FilterSession *fs)
2768 {
2769 GF_Err e;
2770 if (!fs) return GF_BAD_PARAM;
2771 e = fs->last_connect_error;
2772 fs->last_connect_error = GF_OK;
2773 return e;
2774 }
2775
2776 GF_EXPORT
gf_fs_get_last_process_error(GF_FilterSession * fs)2777 GF_Err gf_fs_get_last_process_error(GF_FilterSession *fs)
2778 {
2779 GF_Err e;
2780 if (!fs) return GF_BAD_PARAM;
2781 e = fs->last_process_error;
2782 fs->last_process_error = GF_OK;
2783 return e;
2784 }
2785
2786 typedef struct
2787 {
2788 GF_FilterSession *fsess;
2789 void *callback;
2790 Bool (*task_execute) (GF_FilterSession *fsess, void *callback, u32 *reschedule_ms);
2791 Bool (*task_execute_filter) (GF_Filter *filter, void *callback, u32 *reschedule_ms);
2792 #ifndef GPAC_DISABLE_REMOTERY
2793 rmtU32 rmt_hash;
2794 #endif
2795 } GF_UserTask;
2796
gf_fs_user_task(GF_FSTask * task)2797 static void gf_fs_user_task(GF_FSTask *task)
2798 {
2799 u32 reschedule_ms=0;
2800 GF_UserTask *utask = (GF_UserTask *)task->udta;
2801 task->schedule_next_time = 0;
2802
2803 #ifndef GPAC_DISABLE_REMOTERY
2804 gf_rmt_begin_hash(task->log_name, GF_RMT_AGGREGATE, &utask->rmt_hash);
2805 #endif
2806 if (utask->task_execute) {
2807 task->requeue_request = utask->task_execute(utask->fsess, utask->callback, &reschedule_ms);
2808 } else if (task->filter) {
2809 task->requeue_request = utask->task_execute_filter(task->filter, utask->callback, &reschedule_ms);
2810 } else {
2811 task->requeue_request = 0;
2812 }
2813 gf_rmt_end();
2814 //if no requeue request or if we are in final flush, don't re-execute
2815 if (!task->requeue_request || utask->fsess->in_final_flush) {
2816 gf_free(utask);
2817 task->udta = NULL;
2818 task->requeue_request = GF_FALSE;
2819 } else {
2820 task->schedule_next_time = gf_sys_clock_high_res() + 1000*reschedule_ms;
2821 }
2822 }
2823
2824 GF_EXPORT
gf_fs_post_user_task(GF_FilterSession * fsess,Bool (* task_execute)(GF_FilterSession * fsess,void * callback,u32 * reschedule_ms),void * udta_callback,const char * log_name)2825 GF_Err gf_fs_post_user_task(GF_FilterSession *fsess, Bool (*task_execute) (GF_FilterSession *fsess, void *callback, u32 *reschedule_ms), void *udta_callback, const char *log_name)
2826 {
2827 GF_UserTask *utask;
2828 if (!fsess || !task_execute) return GF_BAD_PARAM;
2829 GF_SAFEALLOC(utask, GF_UserTask);
2830 if (!utask) return GF_OUT_OF_MEM;
2831 utask->fsess = fsess;
2832 utask->callback = udta_callback;
2833 utask->task_execute = task_execute;
2834 gf_fs_post_task(fsess, gf_fs_user_task, NULL, NULL, log_name ? log_name : "user_task", utask);
2835 return GF_OK;
2836 }
2837
2838 GF_EXPORT
gf_filter_post_task(GF_Filter * filter,Bool (* task_execute)(GF_Filter * filter,void * callback,u32 * reschedule_ms),void * udta,const char * task_name)2839 GF_Err gf_filter_post_task(GF_Filter *filter, Bool (*task_execute) (GF_Filter *filter, void *callback, u32 *reschedule_ms), void *udta, const char *task_name)
2840 {
2841 GF_UserTask *utask;
2842 if (!filter || !task_execute) return GF_BAD_PARAM;
2843 GF_SAFEALLOC(utask, GF_UserTask);
2844 if (!utask) return GF_OUT_OF_MEM;
2845 utask->callback = udta;
2846 utask->task_execute_filter = task_execute;
2847 utask->fsess = filter->session;
2848 gf_fs_post_task(filter->session, gf_fs_user_task, filter, NULL, task_name ? task_name : "user_task", utask);
2849 return GF_OK;
2850 }
2851
2852
2853 GF_EXPORT
gf_fs_is_last_task(GF_FilterSession * fsess)2854 Bool gf_fs_is_last_task(GF_FilterSession *fsess)
2855 {
2856 if (!fsess) return GF_TRUE;
2857 if (fsess->tasks_pending>1) return GF_FALSE;
2858 if (gf_fq_count(fsess->main_thread_tasks)) return GF_FALSE;
2859 if (gf_fq_count(fsess->tasks)) return GF_FALSE;
2860 return GF_TRUE;
2861 }
2862
2863 GF_EXPORT
gf_fs_mime_supported(GF_FilterSession * fsess,const char * mime)2864 Bool gf_fs_mime_supported(GF_FilterSession *fsess, const char *mime)
2865 {
2866 u32 i, count;
2867 //first pass on explicit mimes
2868 count = gf_list_count(fsess->registry);
2869 for (i=0; i<count; i++) {
2870 u32 j;
2871 const GF_FilterRegister *freg = gf_list_get(fsess->registry, i);
2872 for (j=0; j<freg->nb_caps; j++) {
2873 const GF_FilterCapability *acap = &freg->caps[j];
2874 if (!(acap->flags & GF_CAPFLAG_INPUT)) continue;
2875 if (acap->code == GF_PROP_PID_MIME) {
2876 if (acap->val.value.string && strstr(acap->val.value.string, mime)) return GF_TRUE;
2877 }
2878 }
2879 }
2880 return GF_FALSE;
2881 }
2882
2883
2884 GF_EXPORT
gf_fs_enable_reporting(GF_FilterSession * session,Bool reporting_on)2885 void gf_fs_enable_reporting(GF_FilterSession *session, Bool reporting_on)
2886 {
2887 if (session) session->reporting_on = reporting_on;
2888 }
2889
2890 GF_EXPORT
gf_fs_lock_filters(GF_FilterSession * session,Bool do_lock)2891 void gf_fs_lock_filters(GF_FilterSession *session, Bool do_lock)
2892 {
2893 if (!session || !session->filters_mx) return;
2894 if (do_lock) gf_mx_p(session->filters_mx);
2895 else gf_mx_v(session->filters_mx);
2896 }
2897
2898 GF_EXPORT
gf_fs_get_filters_count(GF_FilterSession * session)2899 u32 gf_fs_get_filters_count(GF_FilterSession *session)
2900 {
2901 return session ? gf_list_count(session->filters) : 0;
2902 }
2903
2904 GF_EXPORT
gf_fs_get_filter_stats(GF_FilterSession * session,u32 idx,GF_FilterStats * stats)2905 GF_Err gf_fs_get_filter_stats(GF_FilterSession *session, u32 idx, GF_FilterStats *stats)
2906 {
2907 GF_Filter *f;
2908 u32 i;
2909 Bool set_name=GF_FALSE;
2910 if (!stats || !session) return GF_BAD_PARAM;
2911 memset(stats, 0, sizeof(GF_FilterStats));
2912 f = gf_list_get(session->filters, idx);
2913 if (!f) return GF_BAD_PARAM;
2914 stats->filter = f;
2915 stats->filter_alias = f->multi_sink_target;
2916 if (f->multi_sink_target) return GF_OK;
2917
2918 stats->percent = f->status_percent>10000 ? -1 : (s32) f->status_percent;
2919 stats->status = f->status_str;
2920 stats->nb_pck_processed = f->nb_pck_processed;
2921 stats->nb_bytes_processed = f->nb_bytes_processed;
2922 stats->time_process = f->time_process;
2923 stats->nb_hw_pck_sent = f->nb_hw_pck_sent;
2924 stats->nb_pck_sent = f->nb_pck_sent;
2925 stats->nb_bytes_sent = f->nb_bytes_sent;
2926 stats->nb_tasks_done = f->nb_tasks_done;
2927 stats->nb_errors = f->nb_errors;
2928 stats->name = f->name;
2929 stats->reg_name = f->freg->name;
2930 stats->filter_id = f->id;
2931 stats->done = f->removed || f->finalized;
2932 if (stats->name && !strcmp(stats->name, stats->reg_name)) {
2933 set_name=GF_TRUE;
2934 }
2935 stats->report_updated = f->report_updated;
2936 f->report_updated = GF_FALSE;
2937
2938
2939 if (!stats->nb_pid_out && stats->nb_pid_in) stats->type = GF_FS_STATS_FILTER_RAWOUT;
2940 else if (!stats->nb_pid_in && stats->nb_pid_out) stats->type = GF_FS_STATS_FILTER_RAWIN;
2941
2942 stats->nb_pid_out = f->num_output_pids;
2943 for (i=0; i<f->num_output_pids; i++) {
2944 GF_FilterPid *pid = gf_list_get(f->output_pids, i);
2945 stats->nb_out_pck += pid->nb_pck_sent;
2946 if (pid->has_seen_eos) stats->in_eos = GF_TRUE;
2947
2948 if (f->num_output_pids!=1) continue;
2949
2950 if (!stats->codecid)
2951 stats->codecid = pid->codecid;
2952 if (!stats->stream_type)
2953 stats->stream_type = pid->stream_type;
2954
2955 //set name if PID name is not a default generated one
2956 if (set_name && strncmp(pid->name, "PID", 3)) {
2957 stats->name = pid->name;
2958 set_name = GF_FALSE;
2959 }
2960 }
2961 stats->nb_pid_in = f->num_input_pids;
2962 for (i=0; i<f->num_input_pids; i++) {
2963 GF_FilterPidInst *pidi = gf_list_get(f->input_pids, i);
2964 stats->nb_in_pck += pidi->nb_processed;
2965 if (pidi->is_end_of_stream) stats->in_eos = GF_TRUE;
2966
2967 if (pidi->is_decoder_input) stats->type = GF_FS_STATS_FILTER_DECODE;
2968 else if (pidi->is_encoder_input) stats->type = GF_FS_STATS_FILTER_ENCODE;
2969
2970 if (pidi->pid->stream_type==GF_STREAM_FILE)
2971 stats->type = GF_FS_STATS_FILTER_DEMUX;
2972
2973 if ((f->num_input_pids!=1) && f->num_output_pids)
2974 continue;
2975
2976 if (!stats->codecid)
2977 stats->codecid = pidi->pid->codecid;
2978 if (!stats->stream_type)
2979 stats->stream_type = pidi->pid->stream_type;
2980
2981 if (set_name) {
2982 stats->name = pidi->pid->name;
2983 set_name = GF_FALSE;
2984 }
2985 }
2986 return GF_OK;
2987 }
2988
gf_fs_ui_event(GF_FilterSession * session,GF_Event * uievt)2989 Bool gf_fs_ui_event(GF_FilterSession *session, GF_Event *uievt)
2990 {
2991 Bool ret;
2992 gf_mx_p(session->ui_mx);
2993 ret = session->ui_event_proc(session->ui_opaque, uievt);
2994 gf_mx_v(session->ui_mx);
2995 return ret;
2996 }
2997
gf_fs_check_graph_load(GF_FilterSession * fsess,Bool for_load)2998 void gf_fs_check_graph_load(GF_FilterSession *fsess, Bool for_load)
2999 {
3000 if (for_load) {
3001 if (!fsess->links || ! gf_list_count( fsess->links))
3002 gf_filter_sess_build_graph(fsess, NULL);
3003 } else {
3004 if (fsess->flags & GF_FS_FLAG_NO_GRAPH_CACHE)
3005 gf_filter_sess_reset_graph(fsess, NULL);
3006 }
3007 }
3008
3009 #ifndef GPAC_DISABLE_3D
fsess_on_event(void * cbk,GF_Event * evt)3010 static Bool fsess_on_event(void *cbk, GF_Event *evt)
3011 {
3012 return GF_TRUE;
3013 }
3014
gf_fs_check_gl_provider(GF_FilterSession * session)3015 GF_Err gf_fs_check_gl_provider(GF_FilterSession *session)
3016 {
3017 GF_Event evt;
3018 GF_Err e;
3019 const char *sOpt;
3020 void *os_disp_handler;
3021
3022 if (!session->nb_gl_filters) return GF_OK;
3023 if (gf_list_count(session->gl_providers)) return GF_OK;
3024
3025 if (session->gl_driver) return GF_OK;
3026
3027
3028 session->gl_driver = (GF_VideoOutput *) gf_module_load(GF_VIDEO_OUTPUT_INTERFACE, gf_opts_get_key("core", "video-output") );
3029
3030 if (!session->gl_driver) {
3031 GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load a video output for OpenGL context support !\n"));
3032 return GF_IO_ERR;
3033 }
3034 if (!gf_opts_get_key("core", "video-output")) {
3035 gf_opts_set_key("core", "video-output", session->gl_driver->module_name);
3036 }
3037 session->gl_driver->on_event = fsess_on_event;
3038 session->gl_driver->evt_cbk_hdl = session;
3039
3040 os_disp_handler = NULL;
3041 sOpt = gf_opts_get_key("Temp", "OSDisp");
3042 if (sOpt) sscanf(sOpt, "%p", &os_disp_handler);
3043
3044 e = session->gl_driver->Setup(session->gl_driver, NULL, os_disp_handler, GF_TERM_INIT_HIDE);
3045 if (e!=GF_OK) {
3046 GF_LOG(GF_LOG_WARNING, GF_LOG_FILTER, ("Failed to setup Video Driver %s!\n", session->gl_driver->module_name));
3047 gf_modules_close_interface((GF_BaseInterface *)session->gl_driver);
3048 session->gl_driver = NULL;
3049 return e;
3050 }
3051
3052 //and initialize GL context
3053 memset(&evt, 0, sizeof(GF_Event));
3054 evt.type = GF_EVENT_VIDEO_SETUP;
3055 evt.setup.width = 128;
3056 evt.setup.height = 128;
3057 evt.setup.use_opengl = GF_TRUE;
3058 evt.setup.back_buffer = 1;
3059 //we anyway should'nt call swapBuffer/flush on this object
3060 evt.setup.disable_vsync = GF_TRUE;
3061 session->gl_driver->ProcessEvent(session->gl_driver, &evt);
3062
3063 if (evt.setup.use_opengl) {
3064 gf_opengl_init();
3065 }
3066 return GF_OK;
3067 }
3068
gf_fs_set_gl(GF_FilterSession * session)3069 GF_Err gf_fs_set_gl(GF_FilterSession *session)
3070 {
3071 GF_Event evt;
3072 if (!session->gl_driver) return GF_BAD_PARAM;
3073 memset(&evt, 0, sizeof(GF_Event));
3074 evt.type = GF_EVENT_SET_GL;
3075 return session->gl_driver->ProcessEvent(session->gl_driver, &evt);
3076 }
3077 #endif
3078
3079
3080 #ifdef FILTER_FIXME
3081
3082
term_find_res(GF_TermLocales * loc,char * parent,char * path,char * relocated_path,char * localized_rel_path)3083 static Bool term_find_res(GF_TermLocales *loc, char *parent, char *path, char *relocated_path, char *localized_rel_path)
3084 {
3085 FILE *f;
3086
3087 if (loc->szAbsRelocatedPath) gf_free(loc->szAbsRelocatedPath);
3088 loc->szAbsRelocatedPath = gf_url_concatenate(parent, path);
3089 if (!loc->szAbsRelocatedPath) loc->szAbsRelocatedPath = gf_strdup(path);
3090
3091 f = gf_fopen(loc->szAbsRelocatedPath, "rb");
3092 if (f) {
3093 gf_fclose(f);
3094 strcpy(localized_rel_path, path);
3095 strcpy(relocated_path, loc->szAbsRelocatedPath);
3096 return 1;
3097 }
3098 return 0;
3099 }
3100
3101 /* Checks if, for a given relative path, there exists a localized version in an given folder
3102 if this is the case, it returns the absolute localized path, otherwise it returns null.
3103 if the resource was localized, the last parameter is set to the localized relative path.
3104 */
term_check_locales(void * __self,const char * locales_parent_path,const char * rel_path,char * relocated_path,char * localized_rel_path)3105 static Bool term_check_locales(void *__self, const char *locales_parent_path, const char *rel_path, char *relocated_path, char *localized_rel_path)
3106 {
3107 char path[GF_MAX_PATH];
3108 const char *opt;
3109
3110 GF_TermLocales *loc = (GF_TermLocales*)__self;
3111
3112 /* Checks if the rel_path argument really contains a relative path (no ':', no '/' at the beginning) */
3113 if (strstr(rel_path, "://") || (rel_path[0]=='/') || strstr(rel_path, ":\\") || !strncmp(rel_path, "\\\\", 2)) {
3114 return 0;
3115 }
3116
3117 /*Checks if the absolute path is really absolute and points to a local file (no http or else) */
3118 if (!locales_parent_path ||
3119 (locales_parent_path && (locales_parent_path[0] != '/') && strstr(locales_parent_path, "://") && strnicmp(locales_parent_path, "file://", 7))) {
3120 return 0;
3121 }
3122 opt = gf_opts_get_key("core", "lang");
3123 if (opt && (!strcmp(opt, "*") || !strcmp(opt, "un") ) ) {
3124 opt = NULL;
3125 }
3126
3127 while (opt) {
3128 char lan[100];
3129 char *sep;
3130 char *sep_lang = strchr(opt, ';');
3131 if (sep_lang) sep_lang[0] = 0;
3132
3133 while (strchr(" \t", opt[0]))
3134 opt++;
3135
3136 strcpy(lan, opt);
3137
3138 if (sep_lang) {
3139 sep_lang[0] = ';';
3140 opt = sep_lang+1;
3141 } else {
3142 opt = NULL;
3143 }
3144
3145 while (1) {
3146 sep = strstr(lan, "-*");
3147 if (!sep) break;
3148 strncpy(sep, sep+2, strlen(sep)-2);
3149 }
3150
3151 sprintf(path, "locales/%s/%s", lan, rel_path);
3152 if (term_find_res(loc, (char *) locales_parent_path, (char *) path, relocated_path, localized_rel_path))
3153 return 1;
3154
3155 /*recursively remove region (sub)tags*/
3156 while (1) {
3157 sep = strrchr(lan, '-');
3158 if (!sep) break;
3159 sep[0] = 0;
3160 sprintf(path, "locales/%s/%s", lan, rel_path);
3161 if (term_find_res(loc, (char *) locales_parent_path, (char *) path, relocated_path, localized_rel_path))
3162 return 1;
3163 }
3164 }
3165
3166 if (term_find_res(loc, (char *) locales_parent_path, (char *) rel_path, relocated_path, localized_rel_path))
3167 return 1;
3168 /* if we did not find the localized file, both the relocated and localized strings are NULL */
3169 strcpy(localized_rel_path, "");
3170 strcpy(relocated_path, "");
3171 return 0;
3172 }
3173
3174 #endif
3175
3176