1 /*
2  *			GPAC - Multimedia Framework C SDK
3  *
4  *			Authors: Jean Le Feuvre
5  *			Copyright (c) Telecom ParisTech 2017-2020
6  *					All rights reserved
7  *
8  *  This file is part of GPAC / filters sub-project
9  *
10  *  GPAC is free software; you can redistribute it and/or modify
11  *  it under the terfsess of the GNU Lesser General Public License as published by
12  *  the Free Software Foundation; either version 2, or (at your option)
13  *  any later version.
14  *
15  *  GPAC is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU Lesser General Public License for more details.
19  *
20  *  You should have received a copy of the GNU Lesser General Public
21  *  License along with this library; see the file COPYING.  If not, write to
22  *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  */
25 
26 #include "filter_session.h"
27 #include <gpac/network.h>
28 
29 #ifndef GPAC_DISABLE_3D
30 #include <gpac/modules/video_out.h>
31 #endif
32 //#define CHECK_TASK_LIST_INTEGRITY
33 
34 struct _gf_ft_mgr *gf_font_manager_new();
35 void gf_font_manager_del(struct _gf_ft_mgr *fm);
36 
37 
gf_fs_sema_io(GF_FilterSession * fsess,Bool notify,Bool main)38 static GFINLINE void gf_fs_sema_io(GF_FilterSession *fsess, Bool notify, Bool main)
39 {
40 	GF_Semaphore *sem = main ? fsess->semaphore_main : fsess->semaphore_other;
41 	if (sem) {
42 		if (notify) {
43 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler %s semaphore\n", gf_th_id(), main ? "main" : "secondary"));
44 			if ( ! gf_sema_notify(sem, 1)) {
45 				GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Cannot notify scheduler of new task, semaphore failure\n"));
46 			}
47 		} else {
48 			u32 nb_tasks;
49 			//if not main and no tasks in main list, this could be the last task to process.
50 			//if main thread is sleeping force a wake to take further actions (only the main thread decides the exit)
51 			//this also ensures that tha main thread will process tasks from secondary task lists if no
52 			//dedicated main thread tasks are present (eg no GL filters)
53 			if (!main && fsess->in_main_sem_wait && !gf_fq_count(fsess->main_thread_tasks)) {
54 				gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
55 			}
56 			nb_tasks = 1;
57 			//no active threads, count number of tasks. If no posted tasks we are likely at the end of the session, don't block, rather use a sem_wait
58 			if (!fsess->active_threads)
59 			 	nb_tasks = gf_fq_count(fsess->main_thread_tasks) + gf_fq_count(fsess->tasks);
60 
61 			//if main semaphore, keep track that we are going to sleep
62 			if (main) {
63 				fsess->in_main_sem_wait = GF_TRUE;
64 				if (!nb_tasks) {
65 					GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("No tasks scheduled, waiting on main semaphore for at most 100 ms\n"));
66 					if (gf_sema_wait_for(sem, 100)) {
67 					}
68 				} else {
69 					if (gf_sema_wait(sem)) {
70 					}
71 				}
72 				fsess->in_main_sem_wait = GF_FALSE;
73 			} else {
74 				if (!nb_tasks) {
75 					GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("No tasks scheduled, waiting on secondary semaphore for at most 100 ms\n"));
76 					if (gf_sema_wait_for(sem, 100)) {
77 					}
78 				} else {
79 					if (gf_sema_wait(sem)) {
80 					}
81 				}
82 			}
83 		}
84 	}
85 }
86 
gf_fs_add_filter_register(GF_FilterSession * fsess,const GF_FilterRegister * freg)87 void gf_fs_add_filter_register(GF_FilterSession *fsess, const GF_FilterRegister *freg)
88 {
89 	if (!freg) return;
90 
91 	if (!freg->name) {
92 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter missing name - ignoring\n"));
93 		return;
94 	}
95 	if (!freg->process) {
96 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter %s missing process function - ignoring\n", freg->name));
97 		return;
98 	}
99 	if (fsess->blacklist) {
100 		char *fname = strstr(fsess->blacklist, freg->name);
101 		if (fname) {
102 			u32 len = (u32) strlen(freg->name);
103 			if (!fname[len] || (fname[len] == fsess->sep_list)) {
104 				return;
105 			}
106 		}
107 	}
108 	gf_list_add(fsess->registry, (void *) freg);
109 
110 	if (fsess->init_done && fsess->links && gf_list_count( fsess->links)) {
111 		gf_filter_sess_build_graph(fsess, freg);
112 	}
113 }
114 
115 
116 
fs_default_event_proc(void * ptr,GF_Event * evt)117 static Bool fs_default_event_proc(void *ptr, GF_Event *evt)
118 {
119 	if (evt->type==GF_EVENT_QUIT) {
120 		GF_FilterSession *fsess = (GF_FilterSession *)ptr;
121 		gf_fs_abort(fsess, GF_FALSE);
122 	}
123 	return 0;
124 }
125 
126 GF_EXPORT
gf_fs_new(s32 nb_threads,GF_FilterSchedulerType sched_type,u32 flags,const char * blacklist)127 GF_FilterSession *gf_fs_new(s32 nb_threads, GF_FilterSchedulerType sched_type, u32 flags, const char *blacklist)
128 {
129 	const char *opt;
130 	Bool gf_sys_has_filter_global_args();
131 	Bool gf_sys_has_filter_global_meta_args();
132 
133 	u32 i, count;
134 	GF_FilterSession *fsess, *a_sess;
135 
136 	//safety check: all built-in properties shall have unique 4CCs
137 	if ( ! gf_props_4cc_check_props())
138 		return NULL;
139 
140 	GF_SAFEALLOC(fsess, GF_FilterSession);
141 	if (!fsess) {
142 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to alloc media session\n"));
143 		return NULL;
144 	}
145 	fsess->flags = flags;
146 
147 	fsess->filters = gf_list_new();
148 	fsess->main_th.fsess = fsess;
149 
150 	if ((s32) nb_threads == -1) {
151 		GF_SystemRTInfo rti;
152 		memset(&rti, 0, sizeof(GF_SystemRTInfo));
153 		if (gf_sys_get_rti(0, &rti, 0)) {
154 			nb_threads = rti.nb_cores-1;
155 		}
156 		if ((s32)nb_threads<0) {
157 			GF_LOG(GF_LOG_WARNING, GF_LOG_FILTER, ("Failed to query number of cores, disabling extra threads for session\n"));
158 			nb_threads=0;
159 		}
160 	}
161 
162 	if (sched_type==GF_FS_SCHEDULER_DIRECT) {
163 		fsess->direct_mode = GF_TRUE;
164 		nb_threads=0;
165 	}
166 	if (nb_threads && (sched_type != GF_FS_SCHEDULER_LOCK_FREE_X)) {
167 		fsess->tasks_mx = gf_mx_new("TasksList");
168 	}
169 
170 	//regardless of scheduler type, we don't use lock on the main task list
171 	fsess->tasks = gf_fq_new(fsess->tasks_mx);
172 
173 	if (nb_threads>0) {
174 		fsess->main_thread_tasks = gf_fq_new(fsess->tasks_mx);
175 		fsess->filters_mx = gf_mx_new("Filters");
176 	} else {
177 		//otherwise use the same as the global task list
178 		fsess->main_thread_tasks = fsess->tasks;
179 	}
180 
181 	if (!(flags & GF_FS_FLAG_NO_RESERVOIR)) {
182 		fsess->tasks_reservoir = gf_fq_new(fsess->tasks_mx);
183 	}
184 
185 	if (nb_threads || (sched_type==GF_FS_SCHEDULER_LOCK_FORCE) ) {
186 		fsess->semaphore_main = fsess->semaphore_other = gf_sema_new(GF_INT_MAX, 0);
187 		if (nb_threads>0)
188 			fsess->semaphore_other = gf_sema_new(GF_INT_MAX, 0);
189 
190 		//force testing of mutex queues
191 		//fsess->use_locks = GF_TRUE;
192 	}
193 	fsess->ui_event_proc = fs_default_event_proc;
194 	fsess->ui_opaque = fsess;
195 
196 	if (flags & GF_FS_FLAG_NO_MAIN_THREAD)
197 		fsess->no_main_thread = GF_TRUE;
198 
199 	if (!fsess->semaphore_main)
200 		nb_threads=0;
201 
202 	if (nb_threads) {
203 		fsess->threads = gf_list_new();
204 		if (!fsess->threads) {
205 			gf_sema_del(fsess->semaphore_main);
206 			fsess->semaphore_main=NULL;
207 			gf_sema_del(fsess->semaphore_other);
208 			fsess->semaphore_other=NULL;
209 			nb_threads=0;
210 		}
211 		fsess->use_locks = (sched_type==GF_FS_SCHEDULER_LOCK) ? GF_TRUE : GF_FALSE;
212 	} else {
213 #ifdef GPAC_MEMORY_TRACKING
214 		extern int gf_mem_track_enabled;
215 		fsess->check_allocs = gf_mem_track_enabled;
216 #endif
217 
218 	}
219 
220 	if (fsess->use_locks)
221 		fsess->props_mx = gf_mx_new("FilterSessionProps");
222 
223 	if (!(flags & GF_FS_FLAG_NO_RESERVOIR)) {
224 #if GF_PROPS_HASHTABLE_SIZE
225 		fsess->prop_maps_list_reservoir = gf_fq_new(fsess->props_mx);
226 #endif
227 		fsess->prop_maps_reservoir = gf_fq_new(fsess->props_mx);
228 		fsess->prop_maps_entry_reservoir = gf_fq_new(fsess->props_mx);
229 		fsess->prop_maps_entry_data_alloc_reservoir = gf_fq_new(fsess->props_mx);
230 		//we also use the props mutex for the this one
231 		fsess->pcks_refprops_reservoir = gf_fq_new(fsess->props_mx);
232 	}
233 
234 
235 #ifndef GPAC_DISABLE_REMOTERY
236 	sprintf(fsess->main_th.rmt_name, "FSThread0");
237 #endif
238 
239 	if (!fsess->filters || !fsess->tasks) {
240 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to alloc media session\n"));
241 		fsess->run_status = GF_OUT_OF_MEM;
242 		gf_fs_del(fsess);
243 		return NULL;
244 	}
245 
246 	if (nb_threads) {
247 		fsess->info_mx = gf_mx_new("FilterSessionInfo");
248 		fsess->ui_mx = gf_mx_new("FilterSessionUIProc");
249 	}
250 
251 	for (i=0; i<(u32) nb_threads; i++) {
252 		GF_SessionThread *sess_thread;
253 		GF_SAFEALLOC(sess_thread, GF_SessionThread);
254 		if (!sess_thread) continue;
255 #ifndef GPAC_DISABLE_REMOTERY
256 		sprintf(sess_thread->rmt_name, "FSThread%d", i+1);
257 #endif
258 		sess_thread->th = gf_th_new("MediaSessionThread");
259 		if (!sess_thread->th) {
260 			gf_free(sess_thread);
261 			continue;
262 		}
263 		sess_thread->fsess = fsess;
264 		gf_list_add(fsess->threads, sess_thread);
265 	}
266 
267 	gf_fs_set_separators(fsess, NULL);
268 
269 	fsess->registry = gf_list_new();
270 	fsess->blacklist = blacklist;
271 	a_sess = (flags & GF_FS_FLAG_LOAD_META) ? fsess : NULL;
272 	gf_fs_reg_all(fsess, a_sess);
273 
274 	//load external modules
275 	count = gf_modules_count();
276 	for (i=0; i<count; i++) {
277 		GF_FilterRegister *freg = (GF_FilterRegister *) gf_modules_load_filter(i, a_sess);
278 		if (freg) {
279 			freg->flags |= 0x80000000;
280 			gf_fs_add_filter_register(fsess, freg);
281 		}
282 	}
283 	fsess->blacklist = NULL;
284 
285 	//todo - find a way to handle events without mutex ...
286 	fsess->evt_mx = gf_mx_new("Event mutex");
287 
288 	fsess->blocking_mode = GF_FS_BLOCK_ALL;
289 	opt = gf_opts_get_key("core", "no-block");
290 	if (opt) {
291 		if (!strcmp(opt, "fanout")) {
292 			fsess->blocking_mode = GF_FS_NOBLOCK_FANOUT;
293 		}
294 		else if (!strcmp(opt, "all")) {
295 			fsess->blocking_mode = GF_FS_NOBLOCK;
296 		}
297 	}
298 	fsess->run_status = GF_EOS;
299 	fsess->nb_threads_stopped = 1+nb_threads;
300 	fsess->default_pid_buffer_max_us = 1000;
301 	fsess->decoder_pid_buffer_max_us = 1000000;
302 	fsess->default_pid_buffer_max_units = 1;
303 	fsess->max_resolve_chain_len = 6;
304 	fsess->auto_inc_nums = gf_list_new();
305 
306 	if (nb_threads)
307 		fsess->links_mx = gf_mx_new("FilterRegistryGraph");
308 	fsess->links = gf_list_new();
309 
310 #ifndef GPAC_DISABLE_3D
311 	fsess->gl_providers = gf_list_new();
312 #endif
313 
314 	if (! (fsess->flags & GF_FS_FLAG_NO_GRAPH_CACHE))
315 		gf_filter_sess_build_graph(fsess, NULL);
316 
317 	fsess->init_done = GF_TRUE;
318 
319 
320 	if (gf_sys_has_filter_global_args() || gf_sys_has_filter_global_meta_args()) {
321 		u32 nb_args = gf_sys_get_argc();
322 		for (i=0; i<nb_args; i++) {
323 			char *arg = (char *)gf_sys_get_arg(i);
324 			if (arg[0]!='-') continue;
325 			if ((arg[1]!='-') && (arg[1]!='+')) continue;
326 			char *sep = strchr(arg, '=');
327 			if (sep) sep[0] = 0;
328 			gf_fs_push_arg(fsess, arg+2, GF_FALSE, (arg[1]!='-') ? 2 : 1);
329 			if (sep) sep[0] = '=';
330 		}
331 	}
332 
333 	return fsess;
334 }
335 
gf_fs_push_arg(GF_FilterSession * session,const char * szArg,Bool was_found,u32 type)336 void gf_fs_push_arg(GF_FilterSession *session, const char *szArg, Bool was_found, u32 type)
337 {
338 	if (session->flags & GF_FS_FLAG_NO_ARG_CHECK)
339 		return;
340 
341 	if (!was_found) {
342 		Bool afound = GF_FALSE;
343 		u32 k, acount;
344 		if (!session->parsed_args) session->parsed_args = gf_list_new();
345 		acount = gf_list_count(session->parsed_args);
346 		for (k=0; k<acount; k++) {
347 			GF_FSArgItem *ai = gf_list_get(session->parsed_args, k);
348 			if (!strcmp(ai->argname, szArg)) {
349 				afound = GF_TRUE;
350 				if ((ai->type==2) && (type==2))
351 					ai->found = GF_FALSE;
352 				break;
353 			}
354 		}
355 		if (!afound) {
356 			GF_FSArgItem *ai;
357 			GF_SAFEALLOC(ai, GF_FSArgItem);
358 			if (ai) {
359 				ai->argname = gf_strdup(szArg);
360 				ai->type = type;
361 				gf_list_add(session->parsed_args, ai );
362 			}
363 		}
364 	} else {
365 		u32 k, acount;
366 		Bool found = GF_FALSE;
367 		if (!session->parsed_args) session->parsed_args = gf_list_new();
368 		acount = gf_list_count(session->parsed_args);
369 		for (k=0; k<acount; k++) {
370 			GF_FSArgItem *ai = gf_list_get(session->parsed_args, k);
371 			if (!strcmp(ai->argname, szArg)) {
372 				ai->found = GF_TRUE;
373 				found = GF_TRUE;
374 				break;
375 			}
376 		}
377 		if (!found) {
378 			GF_FSArgItem *ai;
379 			GF_SAFEALLOC(ai, GF_FSArgItem);
380 			if (ai) {
381 				ai->argname = gf_strdup(szArg);
382 				ai->type = type;
383 				ai->found = GF_TRUE;
384 				gf_list_add(session->parsed_args, ai );
385 			}
386 		}
387 	}
388 }
389 
390 
391 GF_EXPORT
gf_fs_new_defaults(u32 inflags)392 GF_FilterSession *gf_fs_new_defaults(u32 inflags)
393 {
394 	GF_FilterSession *fsess;
395 	GF_FilterSchedulerType sched_type = GF_FS_SCHEDULER_LOCK_FREE;
396 	u32 flags = 0;
397 	s32 nb_threads = gf_opts_get_int("core", "threads");
398 	const char *blacklist = gf_opts_get_key("core", "blacklist");
399 	const char *opt = gf_opts_get_key("core", "sched");
400 
401 	if (!opt) sched_type = GF_FS_SCHEDULER_LOCK_FREE;
402 	else if (!strcmp(opt, "lock")) sched_type = GF_FS_SCHEDULER_LOCK;
403 	else if (!strcmp(opt, "flock")) sched_type = GF_FS_SCHEDULER_LOCK_FORCE;
404 	else if (!strcmp(opt, "direct")) sched_type = GF_FS_SCHEDULER_DIRECT;
405 	else if (!strcmp(opt, "free")) sched_type = GF_FS_SCHEDULER_LOCK_FREE;
406 	else if (!strcmp(opt, "freex")) sched_type = GF_FS_SCHEDULER_LOCK_FREE_X;
407 	else {
408 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Unrecognized scheduler type %s\n", opt));
409 		return NULL;
410 	}
411 	if (inflags & GF_FS_FLAG_LOAD_META)
412 		flags |= GF_FS_FLAG_LOAD_META;
413 
414 	if (inflags & GF_FS_FLAG_NO_MAIN_THREAD)
415 		flags |= GF_FS_FLAG_NO_MAIN_THREAD;
416 
417 	if (inflags & GF_FS_FLAG_NO_GRAPH_CACHE)
418 		flags |= GF_FS_FLAG_NO_GRAPH_CACHE;
419 
420 	if (gf_opts_get_bool("core", "dbg-edges"))
421 		flags |= GF_FS_FLAG_PRINT_CONNECTIONS;
422 
423 	if (gf_opts_get_bool("core", "full-link"))
424 		flags |= GF_FS_FLAG_FULL_LINK;
425 
426 	if (gf_opts_get_bool("core", "no-reg"))
427 		flags |= GF_FS_FLAG_NO_REGULATION;
428 
429 	if (gf_opts_get_bool("core", "no-reassign"))
430 		flags |= GF_FS_FLAG_NO_REASSIGN;
431 
432 	if (gf_opts_get_bool("core", "no-graph-cache"))
433 		flags |= GF_FS_FLAG_NO_GRAPH_CACHE;
434 
435 	if (gf_opts_get_bool("core", "no-probe"))
436 		flags |= GF_FS_FLAG_NO_PROBE;
437 
438 	if (gf_opts_get_bool("core", "no-argchk"))
439 		flags |= GF_FS_FLAG_NO_ARG_CHECK;
440 
441 	if (gf_opts_get_bool("core", "no-reservoir"))
442 		flags |= GF_FS_FLAG_NO_RESERVOIR;
443 
444 
445 	fsess = gf_fs_new(nb_threads, sched_type, flags, blacklist);
446 	if (!fsess) return NULL;
447 
448 	gf_fs_set_max_resolution_chain_length(fsess, gf_opts_get_int("core", "max-chain") );
449 
450 	gf_fs_set_max_sleep_time(fsess, gf_opts_get_int("core", "max-sleep") );
451 
452 	opt = gf_opts_get_key("core", "seps");
453 	if (opt)
454 		gf_fs_set_separators(fsess, opt);
455 
456 	return fsess;
457 }
458 
459 
460 GF_EXPORT
gf_fs_set_separators(GF_FilterSession * session,const char * separator_set)461 GF_Err gf_fs_set_separators(GF_FilterSession *session, const char *separator_set)
462 {
463 	if (!session) return GF_BAD_PARAM;
464 	if (separator_set && (strlen(separator_set)<5)) return GF_BAD_PARAM;
465 
466 	if (separator_set) {
467 		session->sep_args = separator_set[0];
468 		session->sep_name = separator_set[1];
469 		session->sep_frag = separator_set[2];
470 		session->sep_list = separator_set[3];
471 		session->sep_neg = separator_set[4];
472 	} else {
473 		session->sep_args = ':';
474 		session->sep_name = '=';
475 		session->sep_frag = '#';
476 		session->sep_list = ',';
477 		session->sep_neg = '!';
478 	}
479 	return GF_OK;
480 }
481 
482 GF_EXPORT
gf_fs_set_max_resolution_chain_length(GF_FilterSession * session,u32 max_chain_length)483 GF_Err gf_fs_set_max_resolution_chain_length(GF_FilterSession *session, u32 max_chain_length)
484 {
485 	if (!session) return GF_BAD_PARAM;
486 	session->max_resolve_chain_len = max_chain_length;
487 	return GF_OK;
488 }
489 
490 GF_EXPORT
gf_fs_set_max_sleep_time(GF_FilterSession * session,u32 max_sleep)491 GF_Err gf_fs_set_max_sleep_time(GF_FilterSession *session, u32 max_sleep)
492 {
493 	if (!session) return GF_BAD_PARAM;
494 	session->max_sleep = max_sleep;
495 	return GF_OK;
496 }
497 
498 GF_EXPORT
gf_fs_get_max_resolution_chain_length(GF_FilterSession * session)499 u32 gf_fs_get_max_resolution_chain_length(GF_FilterSession *session)
500 {
501 	if (!session) return 0;
502 	return session->max_resolve_chain_len;
503 }
504 
gf_fs_remove_filter_register(GF_FilterSession * session,GF_FilterRegister * freg)505 void gf_fs_remove_filter_register(GF_FilterSession *session, GF_FilterRegister *freg)
506 {
507 	gf_list_del_item(session->registry, freg);
508 	gf_filter_sess_reset_graph(session, freg);
509 }
510 
511 GF_EXPORT
gf_fs_set_ui_callback(GF_FilterSession * fs,Bool (* ui_event_proc)(void * opaque,GF_Event * event),void * cbk_udta)512 void gf_fs_set_ui_callback(GF_FilterSession *fs, Bool (*ui_event_proc)(void *opaque, GF_Event *event), void *cbk_udta)
513 {
514 	if (fs) {
515 		fs->ui_event_proc = ui_event_proc;
516 		fs->ui_opaque = cbk_udta;
517 		if (!fs->ui_event_proc) {
518 			fs->ui_event_proc = fs_default_event_proc;
519 			fs->ui_opaque = fs;
520 		}
521 	}
522 }
523 
gf_propalloc_del(void * it)524 void gf_propalloc_del(void *it)
525 {
526 	GF_PropertyEntry *pe = (GF_PropertyEntry *)it;
527 	if (pe->prop.value.data.ptr) gf_free(pe->prop.value.data.ptr);
528 	gf_free(pe);
529 }
530 
531 
532 GF_EXPORT
gf_fs_enum_unmapped_options(GF_FilterSession * fsess,u32 * idx,char ** argname,u32 * argtype)533 Bool gf_fs_enum_unmapped_options(GF_FilterSession *fsess, u32 *idx, char **argname, u32 *argtype)
534 {
535 	if (!fsess || !fsess->parsed_args) return GF_FALSE;
536 	u32 i, count = gf_list_count(fsess->parsed_args);
537 
538 	for (i=*idx; i<count; i++) {
539 		GF_FSArgItem *ai = gf_list_get(fsess->parsed_args, i);
540 		(*idx)++;
541 		if (ai->found) continue;
542 		if (argname) *argname = ai->argname;
543 		if (argtype) *argtype = ai->type;
544 		return GF_TRUE;
545 	}
546 	return GF_FALSE;
547 }
548 
549 
550 GF_EXPORT
gf_fs_del(GF_FilterSession * fsess)551 void gf_fs_del(GF_FilterSession *fsess)
552 {
553 	assert(fsess);
554 
555 	gf_fs_stop(fsess);
556 	GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session destroy begin\n"));
557 
558 	if (fsess->parsed_args) {
559 		while (gf_list_count(fsess->parsed_args)) {
560 			GF_FSArgItem *ai = gf_list_pop_back(fsess->parsed_args);
561 			gf_free(ai->argname);
562 			gf_free(ai);
563 		}
564 		gf_list_del(fsess->parsed_args);
565 	}
566 
567 	//temporary until we don't introduce fsess_stop
568 	assert(fsess->run_status != GF_OK);
569 	if (fsess->filters) {
570 		u32 i, count=gf_list_count(fsess->filters);
571 		//first pass: disconnect all filters, since some may have references to property maps or packets
572 		for (i=0; i<count; i++) {
573 			u32 j;
574 			GF_Filter *filter = gf_list_get(fsess->filters, i);
575 			filter->process_th_id = 0;
576 			filter->scheduled_for_next_task = GF_TRUE;
577 
578 			if (filter->detached_pid_inst) {
579 				while (gf_list_count(filter->detached_pid_inst)) {
580 					GF_FilterPidInst *pidi = gf_list_pop_front(filter->detached_pid_inst);
581 					gf_filter_pid_inst_del(pidi);
582 				}
583 				gf_list_del(filter->detached_pid_inst);
584 				filter->detached_pid_inst = NULL;
585 			}
586 
587 			if (filter->postponed_packets) {
588 				while (gf_list_count(filter->postponed_packets)) {
589 					GF_FilterPacket *pck = gf_list_pop_front(filter->postponed_packets);
590 					gf_filter_packet_destroy(pck);
591 				}
592 				gf_list_del(filter->postponed_packets);
593 				filter->postponed_packets = NULL;
594 			}
595 			for (j=0; j<filter->num_input_pids; j++) {
596 				GF_FilterPidInst *pidi = gf_list_get(filter->input_pids, j);
597 				gf_filter_pid_inst_reset(pidi);
598 			}
599 			filter->scheduled_for_next_task = GF_FALSE;
600 		}
601 		//second pass, finalize all
602 		for (i=0; i<count; i++) {
603 			GF_Filter *filter = gf_list_get(fsess->filters, i);
604 			if (filter->freg->finalize && !filter->finalized) {
605 				filter->finalized = GF_TRUE;
606 				FSESS_CHECK_THREAD(filter)
607 				filter->freg->finalize(filter);
608 			}
609 		}
610 
611 		while (gf_list_count(fsess->filters)) {
612 			GF_Filter *filter = gf_list_pop_back(fsess->filters);
613 
614 			gf_filter_del(filter);
615 		}
616 		gf_list_del(fsess->filters);
617 		fsess->filters = NULL;
618 	}
619 
620 	if (fsess->download_manager) gf_dm_del(fsess->download_manager);
621 	if (fsess->font_manager) gf_font_manager_del(fsess->font_manager);
622 
623 	if (fsess->registry) {
624 		while (gf_list_count(fsess->registry)) {
625 			GF_FilterRegister *freg = gf_list_pop_back(fsess->registry);
626 			if (freg->register_free) freg->register_free(fsess, freg);
627 		}
628 		gf_list_del(fsess->registry);
629 	}
630 
631 	if (fsess->tasks)
632 		gf_fq_del(fsess->tasks, gf_void_del);
633 
634 	if (fsess->tasks_reservoir)
635 		gf_fq_del(fsess->tasks_reservoir, gf_void_del);
636 
637 	if (fsess->threads) {
638 		if (fsess->main_thread_tasks)
639 			gf_fq_del(fsess->main_thread_tasks, gf_void_del);
640 
641 		while (gf_list_count(fsess->threads)) {
642 			GF_SessionThread *sess_th = gf_list_pop_back(fsess->threads);
643 			gf_th_del(sess_th->th);
644 			gf_free(sess_th);
645 		}
646 		gf_list_del(fsess->threads);
647 	}
648 
649 	if (fsess->prop_maps_reservoir)
650 		gf_fq_del(fsess->prop_maps_reservoir, gf_propmap_del);
651 #if GF_PROPS_HASHTABLE_SIZE
652 	if (fsess->prop_maps_list_reservoir)
653 		gf_fq_del(fsess->prop_maps_list_reservoir, (gf_destruct_fun) gf_list_del);
654 #endif
655 	if (fsess->prop_maps_entry_reservoir)
656 		gf_fq_del(fsess->prop_maps_entry_reservoir, gf_void_del);
657 	if (fsess->prop_maps_entry_data_alloc_reservoir)
658 		gf_fq_del(fsess->prop_maps_entry_data_alloc_reservoir, gf_propalloc_del);
659 	if (fsess->pcks_refprops_reservoir)
660 		gf_fq_del(fsess->pcks_refprops_reservoir, gf_void_del);
661 
662 
663 	if (fsess->props_mx)
664 		gf_mx_del(fsess->props_mx);
665 
666 	if (fsess->info_mx)
667 		gf_mx_del(fsess->info_mx);
668 
669 	if (fsess->ui_mx)
670 		gf_mx_del(fsess->ui_mx);
671 
672 	if (fsess->semaphore_other && (fsess->semaphore_other != fsess->semaphore_main) )
673 		gf_sema_del(fsess->semaphore_other);
674 
675 	if (fsess->semaphore_main)
676 		gf_sema_del(fsess->semaphore_main);
677 
678 	if (fsess->tasks_mx)
679 		gf_mx_del(fsess->tasks_mx);
680 
681 	if (fsess->filters_mx)
682 		gf_mx_del(fsess->filters_mx);
683 
684 	if (fsess->evt_mx) gf_mx_del(fsess->evt_mx);
685 	if (fsess->event_listeners) gf_list_del(fsess->event_listeners);
686 
687 	if (fsess->links) {
688 		gf_filter_sess_reset_graph(fsess, NULL);
689 		gf_list_del(fsess->links);
690 	}
691 	if (fsess->links_mx) gf_mx_del(fsess->links_mx);
692 
693 #ifndef GPAC_DISABLE_3D
694 	gf_list_del(fsess->gl_providers);
695 	if (fsess->gl_driver) {
696 		fsess->gl_driver->Shutdown(fsess->gl_driver);
697 		gf_modules_close_interface((GF_BaseInterface *)fsess->gl_driver);
698 	}
699 #endif
700 
701 	if (fsess->auto_inc_nums) {
702 		while(gf_list_count(fsess->auto_inc_nums)) {
703 			GF_FSAutoIncNum *aint = gf_list_pop_back(fsess->auto_inc_nums);
704 			gf_free(aint);
705 		}
706 		gf_list_del(fsess->auto_inc_nums);
707 	}
708 
709 	gf_free(fsess);
710 	GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session destroyed\n"));
711 }
712 
713 GF_EXPORT
gf_fs_filters_registers_count(GF_FilterSession * fsess)714 u32 gf_fs_filters_registers_count(GF_FilterSession *fsess)
715 {
716 	return fsess ? gf_list_count(fsess->registry) : 0;
717 }
718 
719 GF_EXPORT
gf_fs_get_filter_register(GF_FilterSession * fsess,u32 idx)720 const GF_FilterRegister * gf_fs_get_filter_register(GF_FilterSession *fsess, u32 idx)
721 {
722 	return gf_list_get(fsess->registry, idx);
723 }
724 
725 #ifdef CHECK_TASK_LIST_INTEGRITY
check_task_list_enum(void * udta,void * item)726 static Bool check_task_list_enum(void *udta, void *item)
727 {
728 	assert(udta != item);
729 	return GF_FALSE;
730 }
check_task_list(GF_FilterQueue * fq,GF_FSTask * task)731 static void check_task_list(GF_FilterQueue *fq, GF_FSTask *task)
732 {
733 	if (fq) {
734 		gf_fq_enum(fq, check_task_list_enum, task);
735 	}
736 }
737 #endif
738 
739 
gf_fs_post_task_ex(GF_FilterSession * fsess,gf_fs_task_callback task_fun,GF_Filter * filter,GF_FilterPid * pid,const char * log_name,void * udta,Bool is_configure,Bool force_direct_call)740 void gf_fs_post_task_ex(GF_FilterSession *fsess, gf_fs_task_callback task_fun, GF_Filter *filter, GF_FilterPid *pid, const char *log_name, void *udta, Bool is_configure, Bool force_direct_call)
741 {
742 	GF_FSTask *task;
743 	Bool force_main_thread = GF_FALSE;
744 	Bool notified = GF_FALSE;
745 
746 	assert(fsess);
747 	assert(task_fun);
748 
749 	//only flatten calls if in main thread (we still have some broken filters using threading
750 	//that could trigger tasks
751 	if ((force_direct_call || fsess->direct_mode)
752 		&& (!filter || !filter->in_process)
753 		&& fsess->tasks_in_process
754 		&& (gf_th_id()==fsess->main_th.th_id)
755 	) {
756 		GF_FSTask atask;
757 		u64 task_time = gf_sys_clock_high_res();
758 		memset(&atask, 0, sizeof(GF_FSTask));
759 		atask.filter = filter;
760 		atask.pid = pid;
761 		atask.run_task = task_fun;
762 		atask.log_name = log_name;
763 		atask.udta = udta;
764 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread 0 task#%d %p executing Filter %s::%s (%d tasks pending)\n", fsess->main_th.nb_tasks, &atask, filter ? filter->name : "none", log_name, fsess->tasks_pending));
765 		if (filter)
766 			filter->scheduled_for_next_task = GF_TRUE;
767 		task_fun(&atask);
768 		filter = atask.filter;
769 		if (filter) {
770 			filter->time_process += gf_sys_clock_high_res() - task_time;
771 			filter->scheduled_for_next_task = GF_FALSE;
772 			filter->nb_tasks_done++;
773 		}
774 		if (!atask.requeue_request)
775 			return;
776 		//asked to requeue the task, post it
777 	}
778 	task = gf_fq_pop(fsess->tasks_reservoir);
779 
780 	if (!task) {
781 		GF_SAFEALLOC(task, GF_FSTask);
782 		if (!task) {
783 			GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("No more memory to post new task\n"));
784 			return;
785 		}
786 	}
787 	task->filter = filter;
788 	task->pid = pid;
789 	task->run_task = task_fun;
790 	task->log_name = log_name;
791 	task->udta = udta;
792 
793 	if (filter && is_configure) {
794 		if (filter->freg->flags & GF_FS_REG_CONFIGURE_MAIN_THREAD)
795 			force_main_thread = GF_TRUE;
796 	}
797 
798 	if (filter) {
799 		gf_mx_p(filter->tasks_mx);
800 		//no tasks and not scheduled
801 		if (! filter->scheduled_for_next_task && !gf_fq_count(filter->tasks)) {
802 			notified = task->notified = GF_TRUE;
803 
804 			if (!force_main_thread)
805 				force_main_thread = (filter->main_thread_forced || (filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? GF_TRUE : GF_FALSE;
806 		} else if (force_main_thread) {
807 			force_main_thread = GF_FALSE;
808 			if (filter->process_th_id && (fsess->main_th.th_id != filter->process_th_id)) {
809 				GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Cannot post task to main thread, filter is already scheduled\n"));
810 			}
811 		}
812 		if (!force_main_thread)
813 			task->blocking = (filter->freg->flags & GF_FS_REG_BLOCKING) ? GF_TRUE : GF_FALSE;
814 
815 		gf_fq_add(filter->tasks, task);
816 		gf_mx_v(filter->tasks_mx);
817 
818 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Posted task %p Filter %s::%s (%d (%d) pending, %d process tasks) on %s task list\n", gf_th_id(), task, filter->name, task->log_name, fsess->tasks_pending, gf_fq_count(filter->tasks), filter->process_task_queued, task->notified ? (force_main_thread ? "main" : "secondary") : "filter"));
819 	} else {
820 		task->notified = notified = GF_TRUE;
821 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Posted filter-less task %s (%d pending) on secondary task list\n", gf_th_id(), task->log_name, fsess->tasks_pending));
822 	}
823 
824 	//WARNING, do not use task->notified since the task may have been posted to the filter task list and may already have been swapped
825 	//with a different value !
826 	if (notified) {
827 #ifdef CHECK_TASK_LIST_INTEGRITY
828 		check_task_list(fsess->main_thread_tasks, task);
829 		check_task_list(fsess->tasks, task);
830 		check_task_list(fsess->tasks_reservoir, task);
831 #endif
832 		assert(task->run_task);
833 		if (filter) {
834 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u posting filter task, scheduled_for_next_task %d\n", gf_th_id(), filter->scheduled_for_next_task));
835 			assert(!filter->scheduled_for_next_task);
836 		}
837 
838 		//notify/count tasks posted on the main task or regular task lists
839 		safe_int_inc(&fsess->tasks_pending);
840 		if (filter && force_main_thread) {
841 			gf_fq_add(fsess->main_thread_tasks, task);
842 			gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
843 		} else {
844 			assert(task->run_task);
845 			gf_fq_add(fsess->tasks, task);
846 			gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
847 		}
848 	}
849 }
850 
gf_fs_post_task(GF_FilterSession * fsess,gf_fs_task_callback task_fun,GF_Filter * filter,GF_FilterPid * pid,const char * log_name,void * udta)851 void gf_fs_post_task(GF_FilterSession *fsess, gf_fs_task_callback task_fun, GF_Filter *filter, GF_FilterPid *pid, const char *log_name, void *udta)
852 {
853 	gf_fs_post_task_ex(fsess, task_fun, filter, pid, log_name, udta, GF_FALSE, GF_FALSE);
854 }
855 
856 GF_EXPORT
gf_fs_check_filter_register_cap(const GF_FilterRegister * f_reg,u32 incode,GF_PropertyValue * cap_input,u32 outcode,GF_PropertyValue * cap_output,Bool exact_match_only)857 Bool gf_fs_check_filter_register_cap(const GF_FilterRegister *f_reg, u32 incode, GF_PropertyValue *cap_input, u32 outcode, GF_PropertyValue *cap_output, Bool exact_match_only)
858 {
859 	u32 j;
860 	u32 has_raw_in = 0;
861 	u32 has_cid_match = 0;
862 	u32 exclude_cid_out = 0;
863 	u32 has_exclude_cid_out = 0;
864 	for (j=0; j<f_reg->nb_caps; j++) {
865 		const GF_FilterCapability *cap = &f_reg->caps[j];
866 		if (!(cap->flags & GF_CAPFLAG_IN_BUNDLE)) {
867 			//CID not excluded, raw in present and CID explicit match or not included in excluded set
868 			if (!exclude_cid_out && has_raw_in && (has_cid_match || (!exact_match_only && has_exclude_cid_out) ) ) {
869 				return GF_TRUE;
870 			}
871 
872 			if (has_raw_in != 2) has_raw_in = 0;
873 			if (has_cid_match != 2) has_cid_match = 0;
874 			if (exclude_cid_out != 2) exclude_cid_out = 0;
875 			if (has_exclude_cid_out != 2) has_exclude_cid_out = 0;
876 
877 			continue;
878 		}
879 
880 		if ( (cap->flags & GF_CAPFLAG_INPUT) && (cap->code == incode) ) {
881 			if (! (cap->flags & GF_CAPFLAG_EXCLUDED) && gf_props_equal(&cap->val, cap_input) ) {
882 				has_raw_in = (cap->flags & GF_CAPS_INPUT_STATIC) ? 2 : 1;
883 			}
884 		}
885 		if ( (cap->flags & GF_CAPFLAG_OUTPUT) && (cap->code == outcode) ) {
886 			if (! (cap->flags & GF_CAPFLAG_EXCLUDED)) {
887 				if (gf_props_equal(&cap->val, cap_output) ) {
888 					has_cid_match = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
889 				}
890 			} else {
891 				if (gf_props_equal(&cap->val, cap_output) ) {
892 					exclude_cid_out = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
893 				} else {
894 					has_exclude_cid_out = (cap->flags & GF_CAPS_OUTPUT_STATIC) ? 2 : 1;
895 				}
896 			}
897 		}
898 	}
899 	//CID not excluded, raw in present and CID explicit match or not included in excluded set
900 	if (!exclude_cid_out && has_raw_in && (has_cid_match || (!exact_match_only && has_exclude_cid_out) ) ) {
901 		return GF_TRUE;
902 	}
903 	return GF_FALSE;
904 }
gf_fs_load_encoder(GF_FilterSession * fsess,const char * args)905 static GF_Filter *gf_fs_load_encoder(GF_FilterSession *fsess, const char *args)
906 {
907 	GF_Err e;
908 	char szCodec[3];
909 	char *cid, *sep;
910 	const GF_FilterRegister *candidate = NULL;
911 	u32 codecid=0;
912 	GF_Filter *filter;
913 	u32 i, count;
914 	GF_PropertyValue cap_in, cap_out;
915 	szCodec[0] = 'c';
916 	szCodec[1] = fsess->sep_name;
917 	szCodec[2] = 0;
918 
919 	cid = args ? strstr(args, szCodec) : NULL;
920 	if (!cid) {
921 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Missing codec identifier in \"enc\" definition: %s\n", args ? args : "no arguments"));
922 		return NULL;
923 	}
924 	sep = strchr(cid, fsess->sep_args);
925 	if (sep) sep[0] = 0;
926 
927 	codecid = gf_codec_parse(cid+2);
928 	if (codecid==GF_CODECID_NONE) {
929 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Unrecognized codec identifier in \"enc\" definition: %s\n", cid));
930 		if (sep) sep[0] = fsess->sep_args;
931 		return NULL;
932 	}
933 	if (sep) sep[0] = fsess->sep_args;
934 
935 	cap_in.type = GF_PROP_UINT;
936 	cap_in.value.uint = GF_CODECID_RAW;
937 	cap_out.type = GF_PROP_UINT;
938 	cap_out.value.uint = codecid;
939 
940 	count = gf_list_count(fsess->registry);
941 	for (i=0; i<count; i++) {
942 		const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
943 
944 		if ( gf_fs_check_filter_register_cap(f_reg, GF_PROP_PID_CODECID, &cap_in, GF_PROP_PID_CODECID, &cap_out, GF_FALSE)) {
945 			if (!candidate || (candidate->priority>f_reg->priority))
946 				candidate = f_reg;
947 		}
948 	}
949 	if (!candidate) {
950 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Cannot find any filter providing encoding for %s\n", cid));
951 		return NULL;
952 	}
953 	filter = gf_filter_new(fsess, candidate, args, NULL, GF_FILTER_ARG_EXPLICIT, &e, NULL, GF_FALSE);
954 	if (!filter) {
955 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load filter %s: %s\n", candidate->name, gf_error_to_string(e) ));
956 	} else {
957 		filter->encoder_stream_type = gf_codecid_type(codecid);
958 	}
959 	return filter;
960 }
961 
962 GF_EXPORT
gf_fs_filter_exists(GF_FilterSession * fsess,const char * name)963 Bool gf_fs_filter_exists(GF_FilterSession *fsess, const char *name)
964 {
965 	u32 i, count;
966 
967 	if (!strcmp(name, "enc")) return GF_TRUE;
968 
969 	count = gf_list_count(fsess->registry);
970 	for (i=0;i<count;i++) {
971 		const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
972 		if (!strcmp(f_reg->name, name)) {
973 			return GF_TRUE;
974 		}
975 	}
976 	return GF_FALSE;
977 }
978 
979 GF_EXPORT
gf_fs_load_filter(GF_FilterSession * fsess,const char * name,GF_Err * err_code)980 GF_Filter *gf_fs_load_filter(GF_FilterSession *fsess, const char *name, GF_Err *err_code)
981 {
982 	const char *args=NULL;
983 	u32 i, len, count = gf_list_count(fsess->registry);
984 	Bool quiet = (err_code && (*err_code == GF_EOS)) ? GF_TRUE : GF_FALSE;
985 
986 	char *sep;
987 
988 	assert(fsess);
989 	assert(name);
990 	if (err_code) *err_code = GF_OK;
991 
992 	sep = strchr(name, fsess->sep_args);
993 	if (sep) {
994 		args = sep+1;
995 		len = (u32) (sep - name);
996 	} else len = (u32) strlen(name);
997 
998 	if (!len) {
999 		if (!quiet) {
1000 			GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Missing filter name in %s\n", name));
1001 		}
1002 		return NULL;
1003 	}
1004 
1005 	if (!strncmp(name, "enc", len)) {
1006 		return gf_fs_load_encoder(fsess, args);
1007 	}
1008 	/*regular filter loading*/
1009 	for (i=0;i<count;i++) {
1010 		const GF_FilterRegister *f_reg = gf_list_get(fsess->registry, i);
1011 		if ((strlen(f_reg->name)==len) && !strncmp(f_reg->name, name, len)) {
1012 			GF_Filter *filter;
1013 			GF_FilterArgType argtype = GF_FILTER_ARG_EXPLICIT;
1014 
1015 			if ((f_reg->flags & GF_FS_REG_REQUIRES_RESOLVER) && !fsess->max_resolve_chain_len) {
1016 				GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Filter %s requires graph resolver but it is disabled\n", name));
1017 				if (err_code) *err_code = GF_BAD_PARAM;
1018 				return NULL;
1019 			}
1020 
1021 			if (f_reg->flags & GF_FS_REG_ACT_AS_SOURCE) argtype = GF_FILTER_ARG_EXPLICIT_SOURCE;
1022 			filter = gf_filter_new(fsess, f_reg, args, NULL, argtype, err_code, NULL, GF_FALSE);
1023 			if (!filter) return NULL;
1024 			if (!filter->num_output_pids) {
1025 				const char *src_url = strstr(name, "src");
1026 				if (src_url && (src_url[3]==fsess->sep_name))
1027 					gf_filter_post_process_task(filter);
1028 			}
1029 			return filter;
1030 		}
1031 	}
1032 	/*check JS file*/
1033 	if (strstr(name, ".js") || strstr(name, ".jsf") || strstr(name, ".mjs") ) {
1034 		char szPath[10+GF_MAX_PATH];
1035 		if (len>GF_MAX_PATH)
1036 			return NULL;
1037 		strncpy(szPath, name, len);
1038 		szPath[len]=0;
1039 		if (gf_file_exists(szPath)) {
1040 			sprintf(szPath, "jsf%cjs%c", fsess->sep_args, fsess->sep_name);
1041 			strcat(szPath, name);
1042 			return gf_fs_load_filter(fsess, szPath, err_code);
1043 		}
1044 	}
1045 
1046 	if (!quiet) {
1047 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load filter %s: no such filter registry\n", name));
1048 	}
1049 	if (err_code) *err_code = GF_FILTER_NOT_FOUND;
1050 	return NULL;
1051 }
1052 
1053 //in mono thread mode, we cannot always sleep for the requested timeout in case there are more tasks to be processed
1054 //this defines the number of pending tasks above wich we limit sleep
1055 #define MONOTH_MIN_TASKS	2
1056 //this defines the sleep time for this case
1057 #define MONOTH_MIN_SLEEP	5
1058 
gf_fs_thread_proc(GF_SessionThread * sess_thread)1059 static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
1060 {
1061 	GF_FilterSession *fsess = sess_thread->fsess;
1062 	u32 i, th_count = fsess->threads ? gf_list_count(fsess->threads) : 0;
1063 	u32 thid =  1 + gf_list_find(fsess->threads, sess_thread);
1064 	u64 enter_time = gf_sys_clock_high_res();
1065 	Bool use_main_sema = thid ? GF_FALSE : GF_TRUE;
1066 #ifndef GPAC_DISABLE_LOG
1067 	u32 sys_thid = gf_th_id();
1068 #endif
1069 	u64 next_task_schedule_time = 0;
1070 	Bool do_regulate = (fsess->flags & GF_FS_FLAG_NO_REGULATION) ? GF_FALSE : GF_TRUE;
1071 	u32 consecutive_filter_tasks=0;
1072 	Bool force_secondary_tasks = GF_FALSE;
1073 	Bool skip_next_sema_wait = GF_FALSE;
1074 
1075 	GF_Filter *current_filter = NULL;
1076 	sess_thread->th_id = gf_th_id();
1077 
1078 #ifndef GPAC_DISABLE_REMOTERY
1079 	sess_thread->rmt_tasks=40;
1080 	gf_rmt_set_thread_name(sess_thread->rmt_name);
1081 #endif
1082 
1083 	gf_rmt_begin(fs_thread, 0);
1084 
1085 	safe_int_inc(&fsess->active_threads);
1086 
1087 	if (!thid && fsess->no_main_thread) {
1088 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc enter\n"));
1089 	}
1090 
1091 	while (1) {
1092 		Bool notified;
1093 		Bool requeue = GF_FALSE;
1094 		u64 active_start, task_time;
1095 		GF_FSTask *task=NULL;
1096 #ifdef CHECK_TASK_LIST_INTEGRITY
1097 		GF_Filter *prev_current_filter = NULL;
1098 		Bool skip_filter_task_check = GF_FALSE;
1099 #endif
1100 
1101 #ifndef GPAC_DISABLE_REMOTERY
1102 		sess_thread->rmt_tasks--;
1103 		if (!sess_thread->rmt_tasks) {
1104 			gf_rmt_end();
1105 			gf_rmt_begin(fs_thread, 0);
1106 			sess_thread->rmt_tasks=40;
1107 		}
1108 #endif
1109 
1110 		safe_int_dec(&fsess->active_threads);
1111 
1112 		if (!skip_next_sema_wait && (current_filter==NULL)) {
1113 			gf_rmt_begin(sema_wait, GF_RMT_AGGREGATE);
1114 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Waiting scheduler %s semaphore\n", sys_thid, use_main_sema ? "main" : "secondary"));
1115 			//wait for something to be done
1116 			gf_fs_sema_io(fsess, GF_FALSE, use_main_sema);
1117 			consecutive_filter_tasks = 0;
1118 			gf_rmt_end();
1119 		}
1120 		safe_int_inc(&fsess->active_threads);
1121 		skip_next_sema_wait = GF_FALSE;
1122 
1123 		active_start = gf_sys_clock_high_res();
1124 
1125 		if (current_filter==NULL) {
1126 			//main thread
1127 			if (thid==0) {
1128 				if (!force_secondary_tasks) {
1129 					task = gf_fq_pop(fsess->main_thread_tasks);
1130 				}
1131 				if (!task) {
1132 					task = gf_fq_pop(fsess->tasks);
1133 					if (task && task->blocking) {
1134 						gf_fq_add(fsess->tasks, task);
1135 						task = NULL;
1136 						gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1137 					}
1138 				}
1139 				force_secondary_tasks = GF_FALSE;
1140 			} else {
1141 				task = gf_fq_pop(fsess->tasks);
1142 			}
1143 			if (task) {
1144 				assert( task->run_task );
1145 				assert( task->notified );
1146 			}
1147 		} else {
1148 			//keep task in filter tasks list until done
1149 			task = gf_fq_head(current_filter->tasks);
1150 			if (task) {
1151 				assert( task->run_task );
1152 				assert( ! task->notified );
1153 			}
1154 		}
1155 
1156 		if (!task) {
1157 			u32 force_nb_notif = 0;
1158 			next_task_schedule_time = 0;
1159 			//no more task and EOS signal
1160 			if (fsess->run_status != GF_OK)
1161 				break;
1162 
1163 			if (!fsess->tasks_pending && fsess->main_th.has_seen_eot) {
1164 				//check all threads
1165 				Bool all_done = GF_TRUE;
1166 
1167 				for (i=0; i<th_count; i++) {
1168 					GF_SessionThread *st = gf_list_get(fsess->threads, i);
1169 					if (!st->has_seen_eot) {
1170 						all_done = GF_FALSE;
1171 						force_nb_notif++;
1172 					}
1173 				}
1174 				if (all_done)
1175 					break;
1176 			}
1177 			if (current_filter) {
1178 				current_filter->scheduled_for_next_task = GF_FALSE;
1179 				current_filter->process_th_id = 0;
1180 				assert(current_filter->in_process);
1181 				current_filter->in_process = GF_FALSE;
1182 			}
1183 			current_filter = NULL;
1184 			sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1185 
1186 
1187 			//no pending tasks and first time main task queue is empty, flush to detect if we
1188 			//are indeed done
1189 			if (!fsess->tasks_pending && !fsess->tasks_in_process && !sess_thread->has_seen_eot && !gf_fq_count(fsess->tasks)) {
1190 				//maybe last task, force a notify to check if we are truly done
1191 				sess_thread->has_seen_eot = GF_TRUE;
1192 				//not main thread and some tasks pending on main, notify only ourselves
1193 				if (thid && gf_fq_count(fsess->main_thread_tasks)) {
1194 					gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1195 				}
1196 				//main thread exit probing, send a notify to main sema (for this thread), and N for the secondary one
1197 				else {
1198 					GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler main semaphore\n", gf_th_id()));
1199 					gf_sema_notify(fsess->semaphore_main, 1);
1200 					GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler secondary semaphore %d\n", gf_th_id(), th_count));
1201 					gf_sema_notify(fsess->semaphore_other, th_count);
1202 				}
1203 			}
1204 			//this thread and the main thread are done but we still have unfinished threads, re-notify everyone
1205 			else if (!fsess->tasks_pending && fsess->main_th.has_seen_eot && force_nb_notif) {
1206 				GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler main semaphore\n", gf_th_id()));
1207 				gf_sema_notify(fsess->semaphore_main, 1);
1208 				GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u notify scheduler secondary semaphore %d\n", gf_th_id(), th_count));
1209 				gf_sema_notify(fsess->semaphore_other, th_count);
1210 			}
1211 
1212 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: no task available\n", sys_thid));
1213 
1214 			//no main thread, return
1215 			if (!thid && fsess->no_main_thread) {
1216 				GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1217 				return 0;
1218 			}
1219 
1220 			if (do_regulate) {
1221 				gf_sleep(0);
1222 			}
1223 			continue;
1224 		}
1225 #ifdef CHECK_TASK_LIST_INTEGRITY
1226 		check_task_list(fsess->main_thread_tasks, task);
1227 		check_task_list(fsess->tasks, task);
1228 #endif
1229 		if (current_filter) {
1230 			assert(current_filter==task->filter);
1231 		}
1232 		current_filter = task->filter;
1233 
1234 		//this is a crude way of scheduling the next task, we should
1235 		//1- have a way to make sure we will not repost after a time-consuming task
1236 		//2- have a way to wait for the given amount of time rather than just do a sema_wait/notify in loop
1237 		if (task->schedule_next_time) {
1238 			s64 now = gf_sys_clock_high_res();
1239 			s64 diff = task->schedule_next_time;
1240 			diff -= now;
1241 			diff /= 1000;
1242 
1243 
1244 			if (diff > 0) {
1245 				GF_FSTask *next;
1246 				s64 tdiff = diff;
1247 				s64 ndiff = 0;
1248 
1249 				//no filter, just reschedule the task
1250 				if (!current_filter) {
1251 #ifndef GPAC_DISABLE_LOG
1252 					const char *task_log_name = task->log_name;
1253 #endif
1254 					next = gf_fq_head(fsess->tasks);
1255 					next_task_schedule_time = task->schedule_next_time;
1256 					assert(task->run_task);
1257 #ifdef CHECK_TASK_LIST_INTEGRITY
1258 					check_task_list(fsess->main_thread_tasks, task);
1259 					check_task_list(fsess->tasks, task);
1260 					check_task_list(fsess->tasks_reservoir, task);
1261 #endif
1262 					//tasks without filter are currently only posted to the secondary task list
1263 					gf_fq_add(fsess->tasks, task);
1264 					if (next) {
1265 						if (next->schedule_next_time <= (u64) now) {
1266 							GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task time ready for execution\n", sys_thid, task_log_name));
1267 
1268 							skip_next_sema_wait = GF_TRUE;
1269 							continue;
1270 						}
1271 						ndiff = next->schedule_next_time;
1272 						ndiff -= now;
1273 						ndiff /= 1000;
1274 						if (ndiff<diff) {
1275 							GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s scheduled after next task %s:%s (in %d ms vs %d ms)\n", sys_thid, task_log_name, next->log_name, next->filter ? next->filter->name : "", (s32) diff, (s32) ndiff));
1276 							diff = ndiff;
1277 						}
1278 					}
1279 
1280 					if (!do_regulate) {
1281 						diff = 0;
1282 					}
1283 
1284 					if (diff && do_regulate) {
1285 						if (diff > fsess->max_sleep)
1286 							diff = fsess->max_sleep;
1287 						if (th_count==0) {
1288 							if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
1289 								diff = MONOTH_MIN_SLEEP;
1290 						}
1291 						GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, %s task scheduled after this task, sleeping for %d ms (task diff %d - next diff %d)\n", sys_thid, task_log_name, next ? "next" : "no", diff, tdiff, ndiff));
1292 						gf_sleep((u32) diff);
1293 					} else {
1294 						GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task scheduled after this task, rerun\n", sys_thid, task_log_name));
1295 					}
1296 					skip_next_sema_wait = GF_TRUE;
1297 					continue;
1298 				}
1299 
1300 				if (!task->filter->finalized) {
1301 #ifdef CHECK_TASK_LIST_INTEGRITY
1302 					next = gf_fq_head(current_filter->tasks);
1303 					assert(next == task);
1304 					check_task_list(fsess->main_thread_tasks, task);
1305 					check_task_list(fsess->tasks_reservoir, task);
1306 #endif
1307 
1308 					//next in filter should be handled before this task, move task at the end of the filter task
1309 					next = gf_fq_get(current_filter->tasks, 1);
1310 					if (next && next->schedule_next_time < task->schedule_next_time) {
1311 						if (task->notified) {
1312 							assert(fsess->tasks_pending);
1313 							safe_int_dec(&fsess->tasks_pending);
1314 							task->notified = GF_FALSE;
1315 						}
1316 						GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s reposted to filter task until task exec time is reached (%d us)\n", sys_thid, current_filter->name, task->log_name, (s32) (task->schedule_next_time - next->schedule_next_time) ));
1317 						//remove task
1318 						gf_fq_pop(current_filter->tasks);
1319 						//and queue it after the next one
1320 						gf_fq_add(current_filter->tasks, task);
1321 						//and continue with the same filter
1322 						continue;
1323 					}
1324 					//little optim here: if this is the main thread and we have other tasks pending
1325 					//check the timing of tasks in the secondary list. If a task is present with smaller time than
1326 					//the head of the main task, force a temporary swap to the secondary task list
1327 					if (!thid && task->notified && diff>10) {
1328 						next = gf_fq_head(fsess->tasks);
1329 						if (next && !next->blocking) {
1330 							u64 next_time_main = task->schedule_next_time;
1331 							u64 next_time_secondary = next->schedule_next_time;
1332 							//if we have several threads, also check the next task on the main task list
1333 							// (different from secondary tasks in multithread case)
1334 							if (th_count) {
1335 								GF_FSTask *next_main = gf_fq_head(fsess->main_thread_tasks);
1336 								if (next_main && (next_time_main > next_main->schedule_next_time))
1337 									next_time_main = next_main->schedule_next_time;
1338 							}
1339 
1340 							if (next_time_secondary<next_time_main) {
1341 								GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: forcing secondary task list on main - current task schedule time "LLU" (diff to now %d) vs next time secondary "LLU" (%s::%s)\n", sys_thid, task->schedule_next_time, (s32) diff, next_time_secondary, next->filter->freg->name, next->log_name));
1342 								diff = 0;
1343 								force_secondary_tasks = GF_TRUE;
1344 							}
1345 						}
1346 					}
1347 
1348 					//move task to main list
1349 					if (!task->notified) {
1350 						task->notified = GF_TRUE;
1351 						safe_int_inc(&fsess->tasks_pending);
1352 					}
1353 
1354 					sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1355 
1356 					if (next_task_schedule_time && (next_task_schedule_time <= task->schedule_next_time)) {
1357 						tdiff = next_task_schedule_time;
1358 						tdiff -= now;
1359 						if (tdiff < 0) tdiff=0;
1360 						if (tdiff<diff) {
1361 							GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
1362 							diff = tdiff;
1363 						} else {
1364 							GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time#2 than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
1365 
1366 						}
1367 					}
1368 
1369 					if (do_regulate && diff) {
1370 						if (diff > fsess->max_sleep)
1371 							diff = fsess->max_sleep;
1372 						if (th_count==0) {
1373 							if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
1374 								diff = MONOTH_MIN_SLEEP;
1375 						}
1376 						GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s postponed for %d ms (scheduled time "LLU" us, next task schedule "LLU" us)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, task->schedule_next_time, next_task_schedule_time));
1377 
1378 						gf_sleep((u32) diff);
1379 						active_start = gf_sys_clock_high_res();
1380 					}
1381 					diff = (s64)task->schedule_next_time;
1382 					diff -= (s64) gf_sys_clock_high_res();
1383 					if (diff > 100 ) {
1384 						Bool use_main = (current_filter->freg->flags & GF_FS_REG_MAIN_THREAD) ? GF_TRUE : GF_FALSE;
1385 						GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: releasing current filter %s, exec time due in "LLD" us\n", sys_thid, current_filter->name, diff));
1386 						current_filter->process_th_id = 0;
1387 						current_filter->in_process = GF_FALSE;
1388 						//don't touch the current filter tasks, just repost the task to the main/secondary list
1389 						assert(gf_fq_count(current_filter->tasks));
1390 						current_filter = NULL;
1391 
1392 #ifdef CHECK_TASK_LIST_INTEGRITY
1393 						check_task_list(fsess->main_thread_tasks, task);
1394 						check_task_list(fsess->tasks, task);
1395 						check_task_list(fsess->tasks_reservoir, task);
1396 						assert(task->run_task);
1397 #endif
1398 
1399 						if (use_main) {
1400 							gf_fq_add(fsess->main_thread_tasks, task);
1401 							//we are the main thread and reposting to the main task list, don't notify/wait for the sema, just retry
1402 							//we are sure to get a task from main list at next iteration
1403 							if (use_main_sema) {
1404 								skip_next_sema_wait = GF_TRUE;
1405 							} else {
1406 								gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
1407 							}
1408 						} else {
1409 							gf_fq_add(fsess->tasks, task);
1410 							//we are not the main thread and we are reposting to the secondary task list, don't notify/wait for the sema, just retry
1411 							//we are not sure to get a task from secondary list at next iteration, but the end of thread check will make
1412 							//sure we renotify secondary sema if some tasks are still pending
1413 							if (!use_main_sema) {
1414 								skip_next_sema_wait = GF_TRUE;
1415 							} else {
1416 								gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1417 							}
1418 						}
1419 						//we temporary force the main thread to fetch a task from the secondary list
1420 						//because the first main task was not yet due for execution
1421 						//it is likely that the execution of the next task will not wake up the main thread
1422 						//but we must reevaluate the previous main task timing, so we force a notification of the main sema
1423 						if (force_secondary_tasks)
1424 							gf_fs_sema_io(fsess, GF_TRUE, GF_TRUE);
1425 
1426 						continue;
1427 					}
1428 					force_secondary_tasks=GF_FALSE;
1429 				}
1430 			}
1431 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s schedule time "LLU" us reached (diff %d ms)\n", sys_thid, current_filter ? current_filter->name : "", task->log_name, task->schedule_next_time, (s32) diff));
1432 
1433 		}
1434 		next_task_schedule_time = 0;
1435 
1436 		if (current_filter) {
1437 			current_filter->scheduled_for_next_task = GF_TRUE;
1438 			assert(!current_filter->in_process);
1439 			current_filter->in_process = GF_TRUE;
1440 			current_filter->process_th_id = gf_th_id();
1441 		}
1442 
1443 		sess_thread->nb_tasks++;
1444 		sess_thread->has_seen_eot = GF_FALSE;
1445 		if (task->filter) {
1446 			assert(gf_fq_count(task->filter->tasks));
1447 		}
1448 
1449 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p executing Filter %s::%s (%d tasks pending, %d(%d) process task queued)\n", sys_thid, sess_thread->nb_tasks, task, task->filter ? task->filter->name : "none", task->log_name, fsess->tasks_pending, task->filter ? task->filter->process_task_queued : 0, task->filter ? gf_fq_count(task->filter->tasks) : 0));
1450 
1451 		safe_int_inc(& fsess->tasks_in_process );
1452 		assert( task->run_task );
1453 		task_time = gf_sys_clock_high_res();
1454 
1455 		task->can_swap = GF_FALSE;
1456 		task->requeue_request = GF_FALSE;
1457 		task->run_task(task);
1458 		requeue = task->requeue_request;
1459 
1460 		task_time = gf_sys_clock_high_res() - task_time;
1461 		safe_int_dec(& fsess->tasks_in_process );
1462 
1463 		//may now be NULL if task was a filter destruction task
1464 		current_filter = task->filter;
1465 
1466 #ifdef CHECK_TASK_LIST_INTEGRITY
1467 		prev_current_filter = task->filter;
1468 #endif
1469 
1470 		//source task was current filter, pop the filter task list
1471 		if (current_filter) {
1472 			current_filter->nb_tasks_done++;
1473 			current_filter->time_process += task_time;
1474 			consecutive_filter_tasks++;
1475 
1476 			gf_mx_p(current_filter->tasks_mx);
1477 			//if last task
1478 			if ( (gf_fq_count(current_filter->tasks)==1)
1479 				//if requeue request and stream reset pending (we must exit the filter task loop for the reset task to pe processed)
1480 				|| (requeue && current_filter->stream_reset_pending)
1481 				//or requeue request and pid swap pending (we must exit the filter task loop for the swap task to pe processed)
1482 				|| (requeue && (current_filter->swap_pidinst_src ||  current_filter->swap_pidinst_dst) )
1483 				//or requeue request and pid detach / cap negotiate pending
1484 				|| (requeue && (current_filter->out_pid_connection_pending || current_filter->detached_pid_inst || current_filter->caps_negociate) )
1485 
1486 				//or requeue request and we have been running on that filter for more than 10 times, abort
1487 				|| (requeue && (consecutive_filter_tasks>10))
1488 			) {
1489 
1490 				if (requeue) {
1491 					//filter task can be pushed back the queue of tasks
1492 					if (task->can_swap) {
1493 						GF_FSTask *next_task;
1494 
1495 						//drop task from filter task list
1496 						gf_fq_pop(current_filter->tasks);
1497 
1498 						next_task = gf_fq_head(current_filter->tasks);
1499 						//if first task was notified, swap the flag
1500 						if (next_task) {
1501 							//see note in post_task_ex for caution about this !!
1502 							next_task->notified = task->notified;
1503 							task->notified = GF_FALSE;
1504 						}
1505 						//requeue task
1506 						gf_fq_add(current_filter->tasks, task);
1507 
1508 						//ans swap task for later requeing
1509 						if (next_task) task = next_task;
1510 					}
1511 					//otherwise (can't swap) keep task first in the list
1512 
1513 					//don't reset scheduled_for_next_task flag if requeued to make sure no other task posted from
1514 					//another thread will post to main sched
1515 
1516 #ifdef CHECK_TASK_LIST_INTEGRITY
1517 					skip_filter_task_check = GF_TRUE;
1518 #endif
1519 				} else {
1520 					//no requeue, filter no longer scheduled and drop task
1521 					current_filter->scheduled_for_next_task = GF_FALSE;
1522 
1523 					//drop task from filter task list
1524 					gf_fq_pop(current_filter->tasks);
1525 				}
1526 
1527 				current_filter->process_th_id = 0;
1528 				current_filter->in_process = GF_FALSE;
1529 
1530 				//unlock once we modified in_process, otherwise this will make our assert fail
1531 				gf_mx_v(current_filter->tasks_mx);
1532 #ifdef CHECK_TASK_LIST_INTEGRITY
1533 				if (requeue && !skip_filter_task_check) check_task_list(current_filter->tasks, task);
1534 #endif
1535 				current_filter = NULL;
1536 			} else {
1537 				//drop task from filter task list
1538 				gf_fq_pop(current_filter->tasks);
1539 
1540 				//not requeued, no more tasks, deactivate filter
1541 				if (!requeue && !gf_fq_count(current_filter->tasks)) {
1542 					current_filter->process_th_id = 0;
1543 					current_filter->in_process = GF_FALSE;
1544 					current_filter->scheduled_for_next_task = GF_FALSE;
1545 					gf_mx_v(current_filter->tasks_mx);
1546 					current_filter = NULL;
1547 				} else {
1548 #ifdef CHECK_TASK_LIST_INTEGRITY
1549 					check_task_list(fsess->main_thread_tasks, task);
1550 					check_task_list(fsess->tasks, task);
1551 					check_task_list(fsess->tasks_reservoir, task);
1552 #endif
1553 
1554 					//requeue task in current filter
1555 					if (requeue)
1556 						gf_fq_add(current_filter->tasks, task);
1557 
1558 					gf_mx_v(current_filter->tasks_mx);
1559 				}
1560 			}
1561 		}
1562 		//do not touch the filter task list after this, it has to be mutex protected to ensure proper posting of tasks
1563 
1564 		notified = task->notified;
1565 		if (requeue) {
1566 			//if requeue on a filter active, use filter queue to avoid another thread grabing the task (we would have concurrent access to the filter)
1567 			if (current_filter) {
1568 				GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in filter tasks (%d pending)\n", sys_thid, task->filter->name, task->log_name, fsess->tasks_pending));
1569 				task->notified = GF_FALSE;
1570 				//keep this thread running on the current filter no signaling of semaphore
1571 			} else {
1572 				GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in %s tasks (%d pending)\n", sys_thid, task->filter ? task->filter->name : "none", task->log_name, (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? "main" : "secondary", fsess->tasks_pending));
1573 
1574 				task->notified = GF_TRUE;
1575 				safe_int_inc(&fsess->tasks_pending);
1576 
1577 #ifdef CHECK_TASK_LIST_INTEGRITY
1578 				check_task_list(fsess->main_thread_tasks, task);
1579 				check_task_list(fsess->tasks, task);
1580 				check_task_list(fsess->tasks_reservoir, task);
1581 				if (prev_current_filter && !skip_filter_task_check) check_task_list(prev_current_filter->tasks, task);
1582 #endif
1583 
1584 				//main thread
1585 				if (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) {
1586 					gf_fq_add(fsess->main_thread_tasks, task);
1587 				} else {
1588 					gf_fq_add(fsess->tasks, task);
1589 				}
1590 				gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1591 			}
1592 		} else {
1593 #ifdef CHECK_TASK_LIST_INTEGRITY
1594 			check_task_list(fsess->main_thread_tasks, task);
1595 			check_task_list(fsess->tasks, task);
1596 			check_task_list(fsess->tasks_reservoir, task);
1597 			if (prev_current_filter)
1598 				check_task_list(prev_current_filter->tasks, task);
1599 
1600 			{
1601 				gf_mx_p(fsess->filters_mx);
1602 				u32 k, c2 = gf_list_count(fsess->filters);
1603 				for (k=0; k<c2; k++) {
1604 					GF_Filter *af = gf_list_get(fsess->filters, k);
1605 					check_task_list(af->tasks, task);
1606 				}
1607 				gf_mx_v(fsess->filters_mx);
1608 			}
1609 #endif
1610 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p pushed to reservoir\n", sys_thid, sess_thread->nb_tasks, task));
1611 
1612 			if (fsess->tasks_reservoir) {
1613 				memset(task, 0, sizeof(GF_FSTask));
1614 				gf_fq_add(fsess->tasks_reservoir, task);
1615 			} else {
1616 				gf_free(task);
1617 			}
1618 		}
1619 
1620 		//decrement task counter
1621 		if (notified) {
1622 			assert(fsess->tasks_pending);
1623 			safe_int_dec(&fsess->tasks_pending);
1624 		}
1625 		if (current_filter) {
1626 			current_filter->process_th_id = 0;
1627 			current_filter->in_process = GF_FALSE;
1628 		}
1629 		//not requeuing and first time we have an empty task queue, flush to detect if we are indeed done
1630 		if (!current_filter && !fsess->tasks_pending && !sess_thread->has_seen_eot && !gf_fq_count(fsess->tasks)) {
1631 			//if not the main thread, or if main thread and task list is empty, enter end of session probing mode
1632 			if (thid || !gf_fq_count(fsess->main_thread_tasks) ) {
1633 				//maybe last task, force a notify to check if we are truly done. We only tag "session done" for the non-main
1634 				//threads, in order to enter the end-of session signaling above
1635 				if (thid) sess_thread->has_seen_eot = GF_TRUE;
1636 				gf_fs_sema_io(fsess, GF_TRUE, use_main_sema);
1637 			}
1638 		}
1639 
1640 		sess_thread->active_time += gf_sys_clock_high_res() - active_start;
1641 
1642 
1643 		//no main thread, return
1644 		if (!thid && fsess->no_main_thread && !current_filter && !fsess->pid_connect_tasks_pending) {
1645 			gf_rmt_end();
1646 			GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1647 			return 0;
1648 		}
1649 	}
1650 
1651 	gf_rmt_end();
1652 
1653 	//no main thread, return
1654 	if (!thid && fsess->no_main_thread) {
1655 		GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc exit\n"));
1656 		return 0;
1657 	}
1658 	sess_thread->run_time = gf_sys_clock_high_res() - enter_time;
1659 
1660 	safe_int_inc(&fsess->nb_threads_stopped);
1661 
1662 	if (!fsess->run_status)
1663 		fsess->run_status = GF_EOS;
1664 
1665 	// thread exit, notify the semaphores
1666 	if (fsess->semaphore_main && ! gf_sema_notify(fsess->semaphore_main, 1)) {
1667 		GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Failed to notify main semaphore, might hang up !!\n"));
1668 	}
1669 	if (fsess->semaphore_other && ! gf_sema_notify(fsess->semaphore_other, th_count)) {
1670 		GF_LOG(GF_LOG_ERROR, GF_LOG_SCHEDULER, ("Failed to notify secondary semaphore, might hang up !!\n"));
1671 	}
1672 
1673 	return 0;
1674 }
1675 
1676 
1677 GF_EXPORT
gf_fs_run(GF_FilterSession * fsess)1678 GF_Err gf_fs_run(GF_FilterSession *fsess)
1679 {
1680 	u32 i, nb_threads;
1681 	assert(fsess);
1682 
1683 	fsess->run_status = GF_OK;
1684 	fsess->main_th.has_seen_eot = GF_FALSE;
1685 	fsess->nb_threads_stopped = 0;
1686 
1687 	nb_threads = gf_list_count(fsess->threads);
1688 	for (i=0;i<nb_threads; i++) {
1689 		GF_SessionThread *sess_th = gf_list_get(fsess->threads, i);
1690 		gf_th_run(sess_th->th, (gf_thread_run) gf_fs_thread_proc, sess_th);
1691 	}
1692 	if (fsess->no_main_thread) return GF_OK;
1693 
1694 	gf_fs_thread_proc(&fsess->main_th);
1695 
1696 	//wait for all threads to be done
1697 	while (nb_threads+1 != fsess->nb_threads_stopped) {
1698 		gf_sleep(1);
1699 	}
1700 
1701 	return fsess->run_status;
1702 }
1703 
gf_fs_run_step(GF_FilterSession * fsess)1704 void gf_fs_run_step(GF_FilterSession *fsess)
1705 {
1706 	gf_fs_thread_proc(&fsess->main_th);
1707 }
1708 
1709 GF_EXPORT
gf_fs_abort(GF_FilterSession * fsess,Bool do_flush)1710 GF_Err gf_fs_abort(GF_FilterSession *fsess, Bool do_flush)
1711 {
1712 	u32 i, count;
1713 	GF_LOG(GF_LOG_INFO, GF_LOG_FILTER, ("Session abort from user, stoping sources\n"));
1714 	if (!fsess) return GF_BAD_PARAM;
1715 
1716 	if (!do_flush) {
1717 		fsess->in_final_flush = GF_TRUE;
1718 		fsess->run_status = GF_EOS;
1719 		return GF_OK;
1720 	}
1721 
1722 	gf_mx_p(fsess->filters_mx);
1723 	count = gf_list_count(fsess->filters);
1724 	//disable all sources
1725 	for (i=0; i<count; i++) {
1726 		GF_Filter *filter = gf_list_get(fsess->filters, i);
1727 		//force end of session on all sources, and on all filters connected to these sources, and dispatch end of stream on all outputs pids of these filters
1728 		//if we don't propagate on connected filters, we take the risk of not deactivating demuxers working from file
1729 		//(eg ignoring input packets)
1730 		if (!filter->num_input_pids) {
1731 			u32 j, k, l;
1732 			filter->disabled = GF_TRUE;
1733 			for (j=0; j<filter->num_output_pids; j++) {
1734 				GF_FilterPid *pid = gf_list_get(filter->output_pids, j);
1735 				gf_filter_pid_set_eos(pid);
1736 				for (k=0; k<pid->num_destinations; k++) {
1737 					GF_FilterPidInst *pidi = gf_list_get(pid->destinations, k);
1738 					pidi->filter->disabled = GF_TRUE;
1739 					for (l=0; l<pidi->filter->num_output_pids; l++) {
1740 						GF_FilterPid *opid = gf_list_get(pidi->filter->output_pids, l);
1741 						if (opid->filter->freg->process_event) {
1742 							GF_FilterEvent evt;
1743 							GF_FEVT_INIT(evt, GF_FEVT_STOP, opid);
1744 							opid->filter->freg->process_event(opid->filter, &evt);
1745 						}
1746 						gf_filter_pid_set_eos(opid);
1747 					}
1748 				}
1749 			}
1750 		}
1751 	}
1752 	fsess->in_final_flush = GF_TRUE;
1753 
1754 	gf_mx_v(fsess->filters_mx);
1755 	return GF_OK;
1756 }
1757 
1758 GF_EXPORT
gf_fs_stop(GF_FilterSession * fsess)1759 GF_Err gf_fs_stop(GF_FilterSession *fsess)
1760 {
1761 	u32 i, count = fsess->threads ? gf_list_count(fsess->threads) : 0;
1762 
1763 	GF_LOG(GF_LOG_DEBUG, GF_LOG_FILTER, ("Session stop\n"));
1764 	if (count+1 == fsess->nb_threads_stopped) {
1765 		return GF_OK;
1766 	}
1767 
1768 	if (!fsess->run_status)
1769 		fsess->run_status = GF_EOS;
1770 
1771 	for (i=0; i < count; i++) {
1772 		gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1773 	}
1774 
1775 	//wait for all threads to be done, we might still need flushing the main thread queue
1776 	while (fsess->no_main_thread) {
1777 		gf_fs_thread_proc(&fsess->main_th);
1778 		if (gf_fq_count(fsess->main_thread_tasks))
1779 			continue;
1780 
1781 		if (count && (count == fsess->nb_threads_stopped) && gf_fq_count(fsess->tasks) ) {
1782 			continue;
1783 		}
1784 		break;
1785 	}
1786 	if (fsess->no_main_thread) {
1787 		safe_int_inc(&fsess->nb_threads_stopped);
1788 		fsess->main_th.has_seen_eot = GF_TRUE;
1789 	}
1790 
1791 	while (count+1 != fsess->nb_threads_stopped) {
1792 		for (i=0; i < count; i++) {
1793 			gf_fs_sema_io(fsess, GF_TRUE, GF_FALSE);
1794 		}
1795 		gf_sleep(0);
1796 		//we may have tasks in main task list posted by other threads
1797 		if (fsess->no_main_thread) {
1798 			gf_fs_thread_proc(&fsess->main_th);
1799 			fsess->main_th.has_seen_eot = GF_TRUE;
1800 		}
1801 	}
1802 	return GF_OK;
1803 }
1804 
print_filter_name(GF_Filter * f,Bool skip_id,Bool skip_args)1805 static GFINLINE void print_filter_name(GF_Filter *f, Bool skip_id, Bool skip_args)
1806 {
1807 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->freg->name));
1808 	if (strcmp(f->name, f->freg->name)) {
1809 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" \"%s\"", f->name));
1810 	}
1811 	if (!skip_id && f->id) GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ID %s", f->id));
1812 	if (f->dynamic_filter || skip_args) return;
1813 
1814 	if (!f->src_args && !f->orig_args && !f->dst_args && !f->dynamic_source_ids) return;
1815 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ("));
1816 	if (f->src_args) {
1817 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->src_args));
1818 	}
1819 	else if (f->orig_args) {
1820 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->orig_args));
1821 	}
1822 	else if (f->dst_args) {
1823 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s", f->dst_args));
1824 	}
1825 
1826 	if (f->dynamic_source_ids) GF_LOG(GF_LOG_INFO, GF_LOG_APP, (",resolved SID:%s", f->source_ids));
1827 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, (")"));
1828 }
1829 
1830 GF_EXPORT
gf_fs_print_stats(GF_FilterSession * fsess)1831 void gf_fs_print_stats(GF_FilterSession *fsess)
1832 {
1833 	u64 run_time=0, active_time=0, nb_tasks=0, nb_filters=0;
1834 	u32 i, count;
1835 
1836 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1837 	if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
1838 
1839 	count=gf_list_count(fsess->filters);
1840 	for (i=0; i<count; i++) {
1841 		GF_Filter *f = gf_list_get(fsess->filters, i);
1842 		if (f->multi_sink_target) continue;
1843 		nb_filters++;
1844 	}
1845 
1846 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filter stats - %d filters\n", nb_filters));
1847 	for (i=0; i<count; i++) {
1848 		u32 k, ipids, opids;
1849 		GF_Filter *f = gf_list_get(fsess->filters, i);
1850 		if (f->multi_sink_target) continue;
1851 
1852 		ipids = f->num_input_pids;
1853 		opids = f->num_output_pids;
1854 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tFilter "));
1855 		print_filter_name(f, GF_FALSE, GF_FALSE);
1856 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" : %d input pids %d output pids "LLU" tasks "LLU" us process time\n", ipids, opids, f->nb_tasks_done, f->time_process));
1857 
1858 		if (ipids) {
1859 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" packets processed "LLU" bytes processed", f->nb_pck_processed, f->nb_bytes_processed));
1860 			if (f->time_process) {
1861 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec %g mbps)", (Double) f->nb_pck_processed*1000000/f->time_process, (Double) f->nb_bytes_processed*8/f->time_process));
1862 			}
1863 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1864 		}
1865 		if (opids) {
1866 			if (f->nb_hw_pck_sent) {
1867 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" hardware frames sent", f->nb_hw_pck_sent));
1868 				if (f->time_process) {
1869 					GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec)", (Double) f->nb_hw_pck_sent*1000000/f->time_process));
1870 				}
1871 
1872 			} else if (f->nb_pck_sent) {
1873 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t"LLU" packets sent "LLU" bytes sent", f->nb_pck_sent, f->nb_bytes_sent));
1874 				if (f->time_process) {
1875 					GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (%g pck/sec %g mbps)", (Double) f->nb_pck_sent*1000000/f->time_process, (Double) f->nb_bytes_sent*8/f->time_process));
1876 				}
1877 			}
1878 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1879 		}
1880 
1881 		for (k=0; k<ipids; k++) {
1882 			GF_FilterPidInst *pid = gf_list_get(f->input_pids, k);
1883 			if (!pid->pid) continue;
1884 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t* input PID %s: %d packets received\n", pid->pid->name, pid->pid->nb_pck_sent));
1885 		}
1886 #ifndef GPAC_DISABLE_LOG
1887 		for (k=0; k<opids; k++) {
1888 			GF_FilterPid *pid = gf_list_get(f->output_pids, k);
1889 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t* output PID %s: %d packets sent\n", pid->name, pid->nb_pck_sent));
1890 		}
1891 		if (f->nb_errors) {
1892 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\t\t%d errors while processing\n", f->nb_errors));
1893 		}
1894 #endif
1895 
1896 	}
1897 	if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
1898 
1899 	count=gf_list_count(fsess->threads);
1900 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Session stats - threads %d\n", 1+count));
1901 
1902 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tThread %u: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", 1, fsess->main_th.run_time, fsess->main_th.active_time, fsess->main_th.nb_tasks));
1903 
1904 	run_time+=fsess->main_th.run_time;
1905 	active_time+=fsess->main_th.active_time;
1906 	nb_tasks+=fsess->main_th.nb_tasks;
1907 
1908 	for (i=0; i<count; i++) {
1909 		GF_SessionThread *s = gf_list_get(fsess->threads, i);
1910 
1911 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\tThread %u: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", i+2, s->run_time, s->active_time, s->nb_tasks));
1912 
1913 		run_time+=s->run_time;
1914 		active_time+=s->active_time;
1915 		nb_tasks+=s->nb_tasks;
1916 	}
1917 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\nTotal: run_time "LLU" us active_time "LLU" us nb_tasks "LLU"\n", run_time, active_time, nb_tasks));
1918 }
1919 
gf_fs_print_filter_outputs(GF_Filter * f,GF_List * filters_done,u32 indent,GF_FilterPid * pid,GF_Filter * alias_for)1920 static void gf_fs_print_filter_outputs(GF_Filter *f, GF_List *filters_done, u32 indent, GF_FilterPid *pid, GF_Filter *alias_for)
1921 {
1922 	u32 i=0;
1923 
1924 	while (i<indent) {
1925 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("-"));
1926 		i++;
1927 	}
1928 
1929 	if (pid) {
1930 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("(PID %s) ", pid->name));
1931 	}
1932 	print_filter_name(f, GF_TRUE, GF_FALSE);
1933 	if (f->id) {
1934 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (ID=%s)\n", f->id));
1935 	} else {
1936 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (ptr=%p)\n", f));
1937 	}
1938 	if (gf_list_find(filters_done, f)>=0)
1939 		return;
1940 
1941 	gf_list_add(filters_done, f);
1942 	if (alias_for) {
1943 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" (<=> "));
1944 		print_filter_name(alias_for, GF_TRUE, GF_TRUE);
1945 		if (alias_for->id) {
1946 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ID=%s", alias_for->id));
1947 		} else {
1948 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" ptr=%p", alias_for));
1949 		}
1950 		GF_LOG(GF_LOG_INFO, GF_LOG_APP, (")\n"));
1951 	}
1952 
1953 	for (i=0; i<f->num_output_pids; i++) {
1954 		u32 j, k;
1955 		GF_FilterPid *pidout = gf_list_get(f->output_pids, i);
1956 		for (j=0; j<pidout->num_destinations; j++) {
1957 			GF_FilterPidInst *pidi = gf_list_get(pidout->destinations, j);
1958 			GF_Filter *alias = NULL;
1959 			for (k=0; k<gf_list_count(f->destination_filters); k++) {
1960 				alias = gf_list_get(f->destination_filters, k);
1961 				if (alias->multi_sink_target == pidi->filter)
1962 					break;
1963 				alias = NULL;
1964 			}
1965 			if (alias) {
1966 
1967 				gf_fs_print_filter_outputs(alias, filters_done, indent+1, pidout, pidi->filter);
1968 			} else {
1969 				gf_fs_print_filter_outputs(pidi->filter, filters_done, indent+1, pidout, NULL);
1970 			}
1971 		}
1972 	}
1973 
1974 }
1975 GF_EXPORT
gf_fs_print_connections(GF_FilterSession * fsess)1976 void gf_fs_print_connections(GF_FilterSession *fsess)
1977 {
1978 	u32 i, count;
1979 	Bool has_undefined=GF_FALSE;
1980 	Bool has_connected=GF_FALSE;
1981 	Bool has_unconnected=GF_FALSE;
1982 	GF_List *filters_done;
1983 	GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
1984 	if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
1985 
1986 	filters_done = gf_list_new();
1987 
1988 	count=gf_list_count(fsess->filters);
1989 	for (i=0; i<count; i++) {
1990 		GF_Filter *f = gf_list_get(fsess->filters, i);
1991 		//only dump sources
1992 		if (f->num_input_pids) continue;
1993 		if (!f->num_output_pids) continue;
1994 		if (!has_connected) {
1995 			has_connected = GF_TRUE;
1996 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters connected:\n"));
1997 		}
1998 		gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
1999 	}
2000 	for (i=0; i<count; i++) {
2001 		GF_Filter *f = gf_list_get(fsess->filters, i);
2002 		//only dump not connected ones
2003 		if (f->num_input_pids || f->num_output_pids || f->multi_sink_target) continue;
2004 		if (!has_unconnected) {
2005 			has_unconnected = GF_TRUE;
2006 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters not connected:\n"));
2007 		}
2008 		gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
2009 	}
2010 	for (i=0; i<count; i++) {
2011 		GF_Filter *f = gf_list_get(fsess->filters, i);
2012 		if (f->multi_sink_target) continue;
2013 		if (gf_list_find(filters_done, f)>=0) continue;
2014 		if (!has_undefined) {
2015 			has_undefined = GF_TRUE;
2016 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("Filters in unknown connection state:\n"));
2017 		}
2018 		gf_fs_print_filter_outputs(f, filters_done, 0, NULL, NULL);
2019 	}
2020 
2021 	if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
2022 	gf_list_del(filters_done);
2023 }
2024 
2025 
2026 GF_EXPORT
gf_fs_send_update(GF_FilterSession * fsess,const char * fid,GF_Filter * filter,const char * name,const char * val,GF_EventPropagateType propagate_mask)2027 void gf_fs_send_update(GF_FilterSession *fsess, const char *fid, GF_Filter *filter, const char *name, const char *val, GF_EventPropagateType propagate_mask)
2028 {
2029 	GF_FilterUpdate *upd;
2030 	u32 i, count;
2031 	Bool removed = GF_FALSE;
2032 	if ((!fid && !filter) || !name) return;
2033 
2034 	if (fsess->filters_mx) gf_mx_p(fsess->filters_mx);
2035 
2036 	if (!filter) {
2037 		GF_Filter *reg_filter = NULL;
2038 		count = gf_list_count(fsess->filters);
2039 		for (i=0; i<count; i++) {
2040 			filter = gf_list_get(fsess->filters, i);
2041 			if (filter->id && !strcmp(filter->id, fid)) {
2042 				break;
2043 			}
2044 			if (filter->name && !strcmp(filter->name, fid)) {
2045 				break;
2046 			}
2047 			if (!reg_filter && !strcmp(filter->freg->name, fid))
2048 				reg_filter = filter;
2049 			filter = NULL;
2050 		}
2051 		if (!filter)
2052 			filter = reg_filter;
2053 	}
2054 
2055 	if (filter && filter->multi_sink_target)
2056 		filter = filter->multi_sink_target;
2057 
2058 	removed = (!filter || filter->removed || filter->finalized) ? GF_TRUE : GF_FALSE;
2059 	if (fsess->filters_mx) gf_mx_v(fsess->filters_mx);
2060 
2061 	if (removed) return;
2062 
2063 	GF_SAFEALLOC(upd, GF_FilterUpdate);
2064 	if (!val) {
2065 		char *sep = strchr(name, fsess->sep_name);
2066 		if (sep) sep[0] = 0;
2067 		upd->name = gf_strdup(name);
2068 		upd->val = sep ? gf_strdup(sep+1) : NULL;
2069 		if (sep) sep[0] = fsess->sep_name;
2070 	} else {
2071 		upd->name = gf_strdup(name);
2072 		upd->val = gf_strdup(val);
2073 	}
2074 	upd->recursive = propagate_mask;
2075 	gf_fs_post_task(fsess, gf_filter_update_arg_task, filter, NULL, "update_arg", upd);
2076 }
2077 
probe_meta_check_builtin_format(GF_FilterSession * fsess,GF_FilterRegister * freg,const char * url,const char * mime,char * fargs)2078 static GF_FilterProbeScore probe_meta_check_builtin_format(GF_FilterSession *fsess, GF_FilterRegister *freg, const char *url, const char *mime, char *fargs)
2079 {
2080 	char szExt[100];
2081 	const char *ext = gf_file_ext_start(url);
2082 	u32 len=0, i, j, count = gf_list_count(fsess->registry);
2083 	if (ext) {
2084 		ext++;
2085 		len = (u32) strlen(ext);
2086 	}
2087 	//check in filter args if we have a format set, in which case replace URL ext by the given format
2088 	if (fargs) {
2089 		char szExtN[10];
2090 		char *ext_arg;
2091 		sprintf(szExtN, "ext%c", fsess->sep_name);
2092 		ext_arg = strstr(fargs, szExtN);
2093 		if (ext_arg) {
2094 			char *next_arg;
2095 			ext_arg+=4;
2096 			next_arg = strchr(ext_arg, fsess->sep_args);
2097 			if (next_arg) {
2098 				len = (u32) (next_arg - ext_arg);
2099 			} else {
2100 				len = (u32) strlen(ext_arg);
2101 			}
2102 			if (len>99) len=99;
2103 			strncpy(szExt, ext_arg, len);
2104 			szExt[len] = 0;
2105 			ext = szExt;
2106 		}
2107 	}
2108 
2109 	if (mime) {
2110 		if (strstr(mime, "/mp4")) return GF_FPROBE_MAYBE_SUPPORTED;
2111 	}
2112 
2113 	for (i=0; i<count; i++) {
2114 		const GF_FilterArgs *dst_arg=NULL;
2115 		GF_FilterRegister *reg = gf_list_get(fsess->registry, i);
2116 		if (reg==freg) continue;
2117 		if (reg->flags & GF_FS_REG_META) continue;
2118 
2119 		j=0;
2120 		while (reg->args) {
2121 			dst_arg = &reg->args[j];
2122 			if (!dst_arg || !dst_arg->arg_name) {
2123 				dst_arg=NULL;
2124 				break;
2125 			}
2126 			if (!strcmp(dst_arg->arg_name, "dst")) break;
2127 			dst_arg = NULL;
2128 			j++;
2129 		}
2130 		/*check prober*/
2131 		if (dst_arg) {
2132 			if (reg->probe_url) {
2133 				GF_FilterProbeScore s = reg->probe_url(url, mime);
2134 				if (s==GF_FPROBE_SUPPORTED)
2135 					return GF_FPROBE_MAYBE_SUPPORTED;
2136 			}
2137 			continue;
2138 		}
2139 
2140 		/* check muxers*/
2141 		for (j=0; j<reg->nb_caps; j++) {
2142 			char *value=NULL;
2143 			const char *pattern = NULL;
2144 			const GF_FilterCapability *cap = &reg->caps[j];
2145 			if (! (cap->flags & GF_CAPFLAG_OUTPUT) )
2146 				continue;
2147 			if (cap->flags & GF_CAPFLAG_EXCLUDED)
2148 				continue;
2149 
2150 			if (cap->code==GF_PROP_PID_FILE_EXT) {
2151 				if (ext) {
2152 					value = cap->val.value.string;
2153 					pattern = ext;
2154 				}
2155 			} else if (cap->code==GF_PROP_PID_MIME) {
2156 				if (mime) {
2157 					value = cap->val.value.string;
2158 					pattern = mime;
2159 				}
2160 			}
2161 			while (value) {
2162 				char *match = strstr(value, pattern);
2163 				if (!match) break;
2164 				if (!match[len] || match[len]=='|')
2165 					return GF_FPROBE_MAYBE_SUPPORTED;
2166 				value = match+1;
2167 			}
2168 		}
2169 	}
2170 	return GF_FPROBE_SUPPORTED;
2171 }
2172 
2173 
gf_fs_load_source_dest_internal(GF_FilterSession * fsess,const char * url,const char * user_args,const char * parent_url,GF_Err * err,GF_Filter * filter,GF_Filter * dst_filter,Bool for_source,Bool no_args_inherit,Bool * probe_only)2174 GF_Filter *gf_fs_load_source_dest_internal(GF_FilterSession *fsess, const char *url, const char *user_args, const char *parent_url, GF_Err *err, GF_Filter *filter, GF_Filter *dst_filter, Bool for_source, Bool no_args_inherit, Bool *probe_only)
2175 {
2176 	GF_FilterProbeScore score = GF_FPROBE_NOT_SUPPORTED;
2177 	GF_FilterRegister *candidate_freg=NULL;
2178 	GF_Filter *alias_for_filter = NULL;
2179 	const GF_FilterArgs *src_dst_arg=NULL;
2180 	u32 i, count, user_args_len, arg_type;
2181 	char szForceReg[20];
2182 	char szMime[50];
2183 	GF_Err e;
2184 	const char *force_freg = NULL;
2185 	char *sURL, *mime_type, *args, *sep;
2186 	char szExt[50];
2187 	Bool free_url=GF_FALSE;
2188 	memset(szExt, 0, sizeof(szExt));
2189 
2190 	if (err) *err = GF_OK;
2191 
2192 	mime_type = NULL;
2193 	//destination, extract mime from arguments
2194 	if (!for_source) {
2195 		sprintf(szMime, "%cmime=", fsess->sep_args);
2196 		mime_type = strstr(url, szMime);
2197 		if (!mime_type && user_args)
2198 			mime_type = strstr(user_args, szMime);
2199 
2200 		if (mime_type) {
2201 			strncpy(szMime, mime_type+6, 49);
2202 			szMime[49]=0;
2203 			sep = strchr(szMime, fsess->sep_args);
2204 			if (sep) sep[0] = 0;
2205 			mime_type = szMime;
2206 		}
2207 	}
2208 	sURL = NULL;
2209 	if (!url || !strncmp(url, "\\\\", 2) ) {
2210 		return NULL;
2211 	}
2212 	if (filter) {
2213 		sURL = (char *) url;
2214 	} else {
2215 		/*used by GUIs scripts to skip URL concatenation*/
2216 		if (!strncmp(url, "gpac://", 7)) sURL = gf_strdup(url+7);
2217 		/*opera-style localhost URLs*/
2218 		else if (!strncmp(url, "file://localhost", 16)) sURL = gf_strdup(url+16);
2219 		else if (parent_url) sURL = gf_url_concatenate(parent_url, url);
2220 
2221 		/*path absolute*/
2222 		if (!sURL) sURL = gf_strdup(url);
2223 		free_url=GF_TRUE;
2224 
2225 		if (!strncmp(sURL, "gpac://", 7)) {
2226 			u32 ulen = (u32) strlen(sURL+7);
2227 			memmove(sURL, sURL+7, ulen);
2228 			sURL[ulen]=0;
2229 		}
2230 
2231 		if (for_source && gf_url_is_local(sURL)) {
2232 			char *frag_par, *cgi, *ext_start;
2233 			char f_c=0;
2234 			gf_url_to_fs_path(sURL);
2235 			sep = (char *)gf_fs_path_escape_colon(fsess, sURL);
2236 			if (sep) sep[0] = 0;
2237 
2238 			ext_start = gf_file_ext_start(sURL);
2239 			if (!ext_start) ext_start = sURL;
2240 			frag_par = strchr(ext_start, '#');
2241 			cgi = strchr(ext_start, '?');
2242 			if (frag_par && cgi && (cgi<frag_par))
2243 				frag_par = cgi;
2244 
2245 			if (frag_par) {
2246 				f_c = frag_par[0];
2247 				frag_par[0] = 0;
2248 			}
2249 
2250 			if (strcmp(sURL, "null") && strcmp(sURL, "-") && strcmp(sURL, "stdin") && ! gf_file_exists(sURL)) {
2251 				if (sep) sep[0] = fsess->sep_args;
2252 				if (frag_par) frag_par[0] = f_c;
2253 
2254 				if (err) *err = GF_URL_ERROR;
2255 				gf_free(sURL);
2256 				return NULL;
2257 			}
2258 			if (frag_par) frag_par[0] = f_c;
2259 			if (sep) sep[0] = fsess->sep_args;
2260 		}
2261 	}
2262 	sep = (char *)gf_fs_path_escape_colon(fsess, sURL);
2263 
2264 	sprintf(szForceReg, "gfreg%c", fsess->sep_name);
2265 	force_freg = NULL;
2266 	if (sep) {
2267 		sep[0] = 0;
2268 		force_freg = strstr(sep+1, szForceReg);
2269 	}
2270 	if (!force_freg && user_args) {
2271 		force_freg = strstr(user_args, szForceReg);
2272 	}
2273 	if (force_freg)
2274 		force_freg += 6;
2275 
2276 restart:
2277 	//check all our registered filters
2278 	count = gf_list_count(fsess->registry);
2279 	for (i=0; i<count; i++) {
2280 		u32 j;
2281 		GF_FilterProbeScore s;
2282 		GF_FilterRegister *freg = gf_list_get(fsess->registry, i);
2283 		if (! freg->probe_url) continue;
2284 		if (force_freg && strncmp(force_freg, freg->name, strlen(freg->name))) continue;
2285 		if (! freg->args) continue;
2286 		if (filter && (gf_list_find(filter->blacklisted, freg) >=0)) continue;
2287 
2288 		j=0;
2289 		while (freg->args) {
2290 			src_dst_arg = &freg->args[j];
2291 			if (!src_dst_arg || !src_dst_arg->arg_name) {
2292 				src_dst_arg=NULL;
2293 				break;
2294 			}
2295 			if (for_source && !strcmp(src_dst_arg->arg_name, "src")) break;
2296 			else if (!for_source && !strcmp(src_dst_arg->arg_name, "dst")) break;
2297 			src_dst_arg = NULL;
2298 			j++;
2299 		}
2300 		if (!src_dst_arg)
2301 			continue;
2302 
2303 		s = freg->probe_url(sURL, mime_type);
2304 		/* destination meta filter: change GF_FPROBE_SUPPORTED to GF_FPROBE_MAYBE_SUPPORTED for internal mux formats
2305 		in order to avoid always giving the hand to the meta filter*/
2306 		if (!for_source && (s == GF_FPROBE_SUPPORTED) && (freg->flags & GF_FS_REG_META)) {
2307 			s = probe_meta_check_builtin_format(fsess, freg, sURL, mime_type, sep ? sep+1 : NULL);
2308 		}
2309 		//higher score, use this new registry
2310 		if (s > score) {
2311 			candidate_freg = freg;
2312 			score = s;
2313 		}
2314 		//same score and higher priority (0 being highest), use this new registry
2315 		else if ((s == score) && candidate_freg && (freg->priority<candidate_freg->priority) ) {
2316 			candidate_freg = freg;
2317 			score = s;
2318 		}
2319 	}
2320 	if (probe_only) {
2321 		*probe_only = candidate_freg ? GF_TRUE : GF_FALSE;
2322 		if (free_url)
2323 			gf_free(sURL);
2324 		if (err) *err = GF_OK;
2325 		return NULL;
2326 	}
2327 
2328 	if (!candidate_freg) {
2329 		if (force_freg) {
2330 			GF_LOG(GF_LOG_INFO, GF_LOG_FILTER, ("No source filter named %s found, retrying without forcing registry\n", force_freg));
2331 			force_freg = NULL;
2332 			goto restart;
2333 		}
2334 		if (free_url)
2335 			gf_free(sURL);
2336 		if (err) *err = GF_NOT_SUPPORTED;
2337 		if (filter) filter->finalized = GF_TRUE;
2338 		return NULL;
2339 	}
2340 	if (sep) sep[0] = fsess->sep_args;
2341 
2342 	user_args_len = user_args ? (u32) strlen(user_args) : 0;
2343 	args = gf_malloc(sizeof(char)*(5+strlen(sURL) + (user_args_len ? user_args_len + 8/*for potential :gpac: */  :0) ) );
2344 
2345 	sprintf(args, "%s%c", for_source ? "src" : "dst", fsess->sep_name);
2346 	strcat(args, sURL);
2347 	if (user_args_len) {
2348 		if (fsess->sep_args==':') strcat(args, ":gpac:");
2349 		else {
2350 			char szSep[2];
2351 			szSep[0] = fsess->sep_args;
2352 			szSep[1] = 0;
2353 			strcat(args, szSep);
2354 		}
2355 		strcat(args, user_args);
2356 	}
2357 
2358 	e = GF_OK;
2359 	arg_type = GF_FILTER_ARG_EXPLICIT_SINK;
2360 	if (for_source) {
2361 		if (no_args_inherit) arg_type = GF_FILTER_ARG_EXPLICIT_SOURCE_NO_DST_INHERIT;
2362 		else arg_type = GF_FILTER_ARG_EXPLICIT_SOURCE;
2363 	}
2364 
2365 	if (!for_source && candidate_freg->use_alias) {
2366 		u32 fcount = gf_list_count(fsess->filters);
2367 		for (i=0; i<fcount; i++) {
2368 			GF_Filter *f = gf_list_get(fsess->filters, i);
2369 			if (f->freg != candidate_freg) continue;
2370 			if (f->freg->use_alias(f, sURL, mime_type)) {
2371 				alias_for_filter = f;
2372 				break;
2373 			}
2374 		}
2375 	}
2376 
2377 	if (!filter) {
2378 		filter = gf_filter_new(fsess, candidate_freg, args, NULL, arg_type, err, alias_for_filter, GF_FALSE);
2379 	} else {
2380 		filter->freg = candidate_freg;
2381 		e = gf_filter_new_finalize(filter, args, arg_type);
2382 		if (err) *err = e;
2383 	}
2384 
2385 	if (free_url)
2386 		gf_free(sURL);
2387 
2388 	if (filter) {
2389 		if (filter->src_args) gf_free(filter->src_args);
2390 		filter->src_args = args;
2391 		//for link resolution
2392 		if (dst_filter && for_source)	{
2393 			if (gf_list_find(filter->destination_links, dst_filter)<0)
2394 				gf_list_add(filter->destination_links, dst_filter);
2395 			//to remember our connection target
2396 			filter->target_filter = dst_filter;
2397 		}
2398 	} else {
2399 		gf_free(args);
2400 	}
2401 
2402 	if (!e && filter && !filter->num_output_pids && for_source)
2403 		gf_filter_post_process_task(filter);
2404 
2405 	return filter;
2406 }
2407 
2408 GF_EXPORT
gf_fs_load_source(GF_FilterSession * fsess,const char * url,const char * args,const char * parent_url,GF_Err * err)2409 GF_Filter *gf_fs_load_source(GF_FilterSession *fsess, const char *url, const char *args, const char *parent_url, GF_Err *err)
2410 {
2411 	return gf_fs_load_source_dest_internal(fsess, url, args, parent_url, err, NULL, NULL, GF_TRUE, GF_FALSE, NULL);
2412 }
2413 
2414 GF_EXPORT
gf_fs_load_destination(GF_FilterSession * fsess,const char * url,const char * args,const char * parent_url,GF_Err * err)2415 GF_Filter *gf_fs_load_destination(GF_FilterSession *fsess, const char *url, const char *args, const char *parent_url, GF_Err *err)
2416 {
2417 	return gf_fs_load_source_dest_internal(fsess, url, args, parent_url, err, NULL, NULL, GF_FALSE, GF_FALSE, NULL);
2418 }
2419 
2420 
2421 GF_EXPORT
gf_filter_add_event_listener(GF_Filter * filter,GF_FSEventListener * el)2422 GF_Err gf_filter_add_event_listener(GF_Filter *filter, GF_FSEventListener *el)
2423 {
2424 	GF_Err e;
2425 	if (!filter || !filter->session || !el || !el->on_event) return GF_BAD_PARAM;
2426 	while (filter->session->in_event_listener) gf_sleep(1);
2427 	gf_mx_p(filter->session->evt_mx);
2428 	if (!filter->session->event_listeners) {
2429 		filter->session->event_listeners = gf_list_new();
2430 	}
2431 	e = gf_list_add(filter->session->event_listeners, el);
2432 	gf_mx_v(filter->session->evt_mx);
2433 	return e;
2434 }
2435 
2436 GF_EXPORT
gf_filter_remove_event_listener(GF_Filter * filter,GF_FSEventListener * el)2437 GF_Err gf_filter_remove_event_listener(GF_Filter *filter, GF_FSEventListener *el)
2438 {
2439 	if (!filter || !filter->session || !el || !filter->session->event_listeners) return GF_BAD_PARAM;
2440 
2441 	while (filter->session->in_event_listener) gf_sleep(1);
2442 	gf_mx_p(filter->session->evt_mx);
2443 	gf_list_del_item(filter->session->event_listeners, el);
2444 	if (!gf_list_count(filter->session->event_listeners)) {
2445 		gf_list_del(filter->session->event_listeners);
2446 		filter->session->event_listeners=NULL;
2447 	}
2448 	gf_mx_v(filter->session->evt_mx);
2449 	return GF_OK;
2450 }
2451 
2452 GF_EXPORT
gf_filter_forward_gf_event(GF_Filter * filter,GF_Event * evt,Bool consumed,Bool skip_user)2453 Bool gf_filter_forward_gf_event(GF_Filter *filter, GF_Event *evt, Bool consumed, Bool skip_user)
2454 {
2455 	if (!filter || !filter->session || filter->session->in_final_flush) return GF_FALSE;
2456 
2457 	if (filter->session->event_listeners) {
2458 		GF_FSEventListener *el;
2459 		u32 i=0;
2460 
2461 		gf_mx_p(filter->session->evt_mx);
2462 		filter->session->in_event_listener ++;
2463 		gf_mx_v(filter->session->evt_mx);
2464 		while ((el = gf_list_enum(filter->session->event_listeners, &i))) {
2465 			if (el->on_event(el->udta, evt, consumed)) {
2466 				filter->session->in_event_listener --;
2467 				return GF_TRUE;
2468 			}
2469 		}
2470 		filter->session->in_event_listener --;
2471 	}
2472 
2473 	if (!skip_user && !consumed && filter->session->ui_event_proc) {
2474 		Bool res;
2475 //		term->nb_calls_in_event_proc++;
2476 		res = gf_fs_ui_event(filter->session, evt);
2477 //		term->nb_calls_in_event_proc--;
2478 		return res;
2479 	}
2480 	return GF_FALSE;
2481 }
2482 
2483 GF_EXPORT
gf_filter_send_gf_event(GF_Filter * filter,GF_Event * evt)2484 Bool gf_filter_send_gf_event(GF_Filter *filter, GF_Event *evt)
2485 {
2486 	return gf_filter_forward_gf_event(filter, evt, GF_FALSE, GF_FALSE);
2487 }
2488 
2489 
gf_fs_print_jsf_connection(GF_FilterSession * session,char * filter_name,void (* print_fn)(FILE * output,GF_SysPrintArgFlags flags,const char * fmt,...))2490 static void gf_fs_print_jsf_connection(GF_FilterSession *session, char *filter_name, void (*print_fn)(FILE *output, GF_SysPrintArgFlags flags, const char *fmt, ...) )
2491 {
2492 	GF_CapsBundleStore capstore;
2493 	GF_Filter *js_filter;
2494 	const char *js_name = NULL;
2495 	GF_Err e=GF_OK;
2496 	u32 i, j, count, nb_js_caps;
2497 	GF_List *sources, *sinks;
2498 	GF_FilterRegister loaded_freg;
2499 	Bool has_output, has_input;
2500 
2501 	js_filter = gf_fs_load_filter(session, filter_name, &e);
2502 	if (!js_filter) return;
2503 
2504 	js_name = strrchr(filter_name, '/');
2505 	if (!js_name) js_name = strrchr(filter_name, '\\');
2506 	if (js_name) js_name++;
2507 	else js_name = filter_name;
2508 
2509 	nb_js_caps = gf_filter_caps_bundle_count(js_filter->forced_caps, js_filter->nb_forced_caps);
2510 
2511 	//fake a new register with only the caps set
2512 	memset(&loaded_freg, 0, sizeof(GF_FilterRegister));
2513 	loaded_freg.caps = js_filter->forced_caps;
2514 	loaded_freg.nb_caps = js_filter->nb_forced_caps;
2515 
2516 	has_output = gf_filter_has_out_caps(js_filter->forced_caps, js_filter->nb_forced_caps);
2517 	has_input = gf_filter_has_in_caps(js_filter->forced_caps, js_filter->nb_forced_caps);
2518 
2519 	memset(&capstore, 0, sizeof(GF_CapsBundleStore));
2520 	sources = gf_list_new();
2521 	sinks = gf_list_new();
2522 	//edges for JS are for the unloaded JSF (eg accept anything, output anything).
2523 	//we need to do a manual check
2524 	count = gf_list_count(session->links);
2525 	for (i=0; i<count; i++) {
2526 		u32 nb_src_caps, k, l;
2527 		Bool src_match = GF_FALSE;
2528 		Bool sink_match = GF_FALSE;
2529 		GF_FilterRegDesc *a_reg = gf_list_get(session->links, i);
2530 		if (a_reg->freg == js_filter->freg) continue;
2531 
2532 		//check which cap of this filter matches our destination
2533 		nb_src_caps = gf_filter_caps_bundle_count(a_reg->freg->caps, a_reg->freg->nb_caps);
2534 		for (k=0; k<nb_src_caps; k++) {
2535 			for (l=0; l<nb_js_caps; l++) {
2536 				s32 bundle_idx;
2537 				u32 loaded_filter_only_flags = 0;
2538 				u32 path_weight;
2539 				if (has_input && !src_match) {
2540 					path_weight = gf_filter_caps_to_caps_match(a_reg->freg, k, (const GF_FilterRegister *) &loaded_freg, NULL, &bundle_idx, l, &loaded_filter_only_flags, &capstore);
2541 					if (path_weight && (bundle_idx == l))
2542 						src_match = GF_TRUE;
2543 				}
2544 				if (has_output && !sink_match) {
2545 					loaded_filter_only_flags = 0;
2546 					path_weight = gf_filter_caps_to_caps_match(&loaded_freg, l, a_reg->freg, NULL, &bundle_idx, k, &loaded_filter_only_flags, &capstore);
2547 
2548 					if (path_weight && (bundle_idx == k))
2549 						sink_match = GF_TRUE;
2550 				}
2551 			}
2552 			if (src_match && sink_match)
2553 				break;
2554 		}
2555 		if (src_match) gf_list_add(sources, (void *) a_reg->freg);
2556 		if (sink_match) gf_list_add(sinks, (void *) a_reg->freg);
2557 	}
2558 
2559 	for (i=0; i<2; i++) {
2560 		GF_List *from = i ? sinks : sources;
2561 		char *type = i ? "sources" : "sinks";
2562 
2563 		count = gf_list_count(from);
2564 		if (!count) {
2565 			if (print_fn)
2566 				print_fn(stderr, 1, "%s: no %s\n", js_name, type);
2567 			else {
2568 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s: no %s\n", type));
2569 			}
2570 			continue;
2571 		}
2572 
2573 		if (print_fn)
2574 			print_fn(stderr, 1, "%s %s:", js_name, type);
2575 		else {
2576 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s %s:", js_name, type));
2577 		}
2578 		for (j=0; j<count; j++) {
2579 			GF_FilterRegister *a_reg = gf_list_get(from, j);
2580 			if (print_fn)
2581 				print_fn(stderr, 0, " %s", a_reg->name);
2582 			else {
2583 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", a_reg->name));
2584 			}
2585 		}
2586 		if (print_fn)
2587 			print_fn(stderr, 0, "\n");
2588 		else {
2589 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
2590 		}
2591 	}
2592 
2593 	if (capstore.bundles_cap_found) gf_free(capstore.bundles_cap_found);
2594 	if (capstore.bundles_in_ok) gf_free(capstore.bundles_in_ok);
2595 	if (capstore.bundles_in_scores) gf_free(capstore.bundles_in_scores);
2596 	gf_list_del(sources);
2597 	gf_list_del(sinks);
2598 }
2599 
2600 GF_EXPORT
gf_fs_print_all_connections(GF_FilterSession * session,char * filter_name,void (* print_fn)(FILE * output,GF_SysPrintArgFlags flags,const char * fmt,...))2601 void gf_fs_print_all_connections(GF_FilterSession *session, char *filter_name, void (*print_fn)(FILE *output, GF_SysPrintArgFlags flags, const char *fmt, ...) )
2602 {
2603 	Bool found = GF_FALSE;
2604 	GF_List *done;
2605 	u32 i, j, count;
2606 	u32 llev = gf_log_get_tool_level(GF_LOG_FILTER);
2607 
2608 	gf_log_set_tool_level(GF_LOG_FILTER, GF_LOG_INFO);
2609 	//load JS to inspect its connections
2610 	if (filter_name && strstr(filter_name, ".js")) {
2611 		gf_fs_print_jsf_connection(session, filter_name, print_fn);
2612 		gf_log_set_tool_level(GF_LOG_FILTER, llev);
2613 		return;
2614 	}
2615 	done = gf_list_new();
2616 	count = gf_list_count(session->links);
2617 
2618 	for (i=0; i<count; i++) {
2619 		const GF_FilterRegDesc *src = gf_list_get(session->links, i);
2620 		if (filter_name && strcmp(src->freg->name, filter_name))
2621 			continue;
2622 
2623 		if (!src->nb_edges) {
2624 			if (print_fn)
2625 				print_fn(stderr, 1, "%s: no sources\n", src->freg->name);
2626 			else {
2627 				GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s: no sources\n", src->freg->name));
2628 			}
2629 			continue;
2630 		}
2631 		found = GF_TRUE;
2632 		if (print_fn)
2633 			print_fn(stderr, 1, "%s sources:", src->freg->name);
2634 		else {
2635 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s sources:", src->freg->name));
2636 		}
2637 
2638 		for (j=0; j<src->nb_edges; j++) {
2639 			if (gf_list_find(done, (void *) src->edges[j].src_reg->freg->name)<0) {
2640 				if (print_fn)
2641 					print_fn(stderr, 0, " %s", src->edges[j].src_reg->freg->name);
2642 				else {
2643 					GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", src->edges[j].src_reg->freg->name));
2644 				}
2645 				gf_list_add(done, (void *) src->edges[j].src_reg->freg->name);
2646 			}
2647 		}
2648 		if (print_fn)
2649 			print_fn(stderr, 0, "\n");
2650 		else {
2651 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("\n"));
2652 		}
2653 		gf_list_reset(done);
2654 	}
2655 
2656 	if (found && filter_name) {
2657 		if (print_fn)
2658 			print_fn(stderr, 1, "%s sinks:", filter_name);
2659 		else {
2660 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, ("%s sinks:", filter_name));
2661 		}
2662 		count = gf_list_count(session->links);
2663 		for (i=0; i<count; i++) {
2664 			const GF_FilterRegDesc *src = gf_list_get(session->links, i);
2665 			if (!strcmp(src->freg->name, filter_name)) continue;
2666 
2667 			for (j=0; j<src->nb_edges; j++) {
2668 				if (strcmp(src->edges[j].src_reg->freg->name, filter_name)) continue;
2669 
2670 				if (gf_list_find(done, (void *) src->freg->name)<0) {
2671 					if (print_fn)
2672 						print_fn(stderr, 0, " %s", src->freg->name);
2673 					else {
2674 						GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" %s", src->freg->name));
2675 					}
2676 					gf_list_add(done, (void *) src->freg->name);
2677 				}
2678 			}
2679 			gf_list_reset(done);
2680 		}
2681 		if (print_fn)
2682 			print_fn(stderr, 1, " \n");
2683 		else {
2684 			GF_LOG(GF_LOG_INFO, GF_LOG_APP, (" \n"));
2685 		}
2686 	}
2687 
2688 	if (!found && filter_name) {
2689 		if (print_fn)
2690 			print_fn(stderr, 1, "%s filter not found\n", filter_name);
2691 		else {
2692 			GF_LOG(GF_LOG_ERROR, GF_LOG_APP, ("%s filter not found\n", filter_name));
2693 		}
2694 	}
2695 	gf_list_del(done);
2696 	gf_log_set_tool_level(GF_LOG_FILTER, llev);
2697 }
2698 
2699 
2700 GF_EXPORT
gf_filter_get_session_caps(GF_Filter * filter,GF_FilterSessionCaps * caps)2701 void gf_filter_get_session_caps(GF_Filter *filter, GF_FilterSessionCaps *caps)
2702 {
2703 	if (caps) {
2704 		if (filter) {
2705 			(*caps) = filter->session->caps;
2706 		} else {
2707 			memset(caps, 0, sizeof(GF_FilterSessionCaps));
2708 		}
2709 	}
2710 }
2711 
2712 GF_EXPORT
gf_filter_set_session_caps(GF_Filter * filter,GF_FilterSessionCaps * caps)2713 void gf_filter_set_session_caps(GF_Filter *filter, GF_FilterSessionCaps *caps)
2714 {
2715 	if (caps && filter) {
2716 		filter->session->caps = (*caps);
2717 		//TODO fire event
2718 	}
2719 }
2720 
2721 GF_EXPORT
gf_filter_get_sep(GF_Filter * filter,GF_FilterSessionSepType sep_type)2722 u8 gf_filter_get_sep(GF_Filter *filter, GF_FilterSessionSepType sep_type)
2723 {
2724 	switch (sep_type) {
2725 	case GF_FS_SEP_ARGS: return filter->session->sep_args;
2726 	case GF_FS_SEP_NAME: return filter->session->sep_name;
2727 	case GF_FS_SEP_FRAG: return filter->session->sep_frag;
2728 	case GF_FS_SEP_LIST: return filter->session->sep_list;
2729 	case GF_FS_SEP_NEG: return filter->session->sep_neg;
2730 	default: return 0;
2731 	}
2732 }
2733 
2734 GF_EXPORT
gf_filter_get_download_manager(GF_Filter * filter)2735 GF_DownloadManager *gf_filter_get_download_manager(GF_Filter *filter)
2736 {
2737 	GF_FilterSession *fsess;
2738 	if (!filter) return NULL;
2739 	fsess = filter->session;
2740 
2741 	if (!fsess->download_manager) {
2742 		fsess->download_manager = gf_dm_new(fsess);
2743 	}
2744 	return fsess->download_manager;
2745 }
2746 
2747 GF_EXPORT
gf_filter_get_font_manager(GF_Filter * filter)2748 struct _gf_ft_mgr *gf_filter_get_font_manager(GF_Filter *filter)
2749 {
2750 	GF_FilterSession *fsess;
2751 	if (!filter) return NULL;
2752 	fsess = filter->session;
2753 
2754 	if (!fsess->font_manager) {
2755 		fsess->font_manager = gf_font_manager_new();
2756 	}
2757 	return fsess->font_manager;
2758 }
2759 
gf_fs_cleanup_filters(GF_FilterSession * fsess)2760 void gf_fs_cleanup_filters(GF_FilterSession *fsess)
2761 {
2762 	assert(fsess->pid_connect_tasks_pending);
2763 	safe_int_dec(&fsess->pid_connect_tasks_pending);
2764 }
2765 
2766 GF_EXPORT
gf_fs_get_last_connect_error(GF_FilterSession * fs)2767 GF_Err gf_fs_get_last_connect_error(GF_FilterSession *fs)
2768 {
2769 	GF_Err e;
2770 	if (!fs) return GF_BAD_PARAM;
2771 	e = fs->last_connect_error;
2772 	fs->last_connect_error = GF_OK;
2773 	return e;
2774 }
2775 
2776 GF_EXPORT
gf_fs_get_last_process_error(GF_FilterSession * fs)2777 GF_Err gf_fs_get_last_process_error(GF_FilterSession *fs)
2778 {
2779 	GF_Err e;
2780 	if (!fs) return GF_BAD_PARAM;
2781 	e = fs->last_process_error;
2782 	fs->last_process_error = GF_OK;
2783 	return e;
2784 }
2785 
2786 typedef struct
2787 {
2788 	GF_FilterSession *fsess;
2789 	void *callback;
2790 	Bool (*task_execute) (GF_FilterSession *fsess, void *callback, u32 *reschedule_ms);
2791 	Bool (*task_execute_filter) (GF_Filter *filter, void *callback, u32 *reschedule_ms);
2792 #ifndef GPAC_DISABLE_REMOTERY
2793 	rmtU32 rmt_hash;
2794 #endif
2795 } GF_UserTask;
2796 
gf_fs_user_task(GF_FSTask * task)2797 static void gf_fs_user_task(GF_FSTask *task)
2798 {
2799 	u32 reschedule_ms=0;
2800 	GF_UserTask *utask = (GF_UserTask *)task->udta;
2801 	task->schedule_next_time = 0;
2802 
2803 #ifndef GPAC_DISABLE_REMOTERY
2804 	gf_rmt_begin_hash(task->log_name, GF_RMT_AGGREGATE, &utask->rmt_hash);
2805 #endif
2806 	if (utask->task_execute) {
2807 		task->requeue_request = utask->task_execute(utask->fsess, utask->callback, &reschedule_ms);
2808 	} else if (task->filter) {
2809 		task->requeue_request = utask->task_execute_filter(task->filter, utask->callback, &reschedule_ms);
2810 	} else {
2811 		task->requeue_request = 0;
2812 	}
2813 	gf_rmt_end();
2814 	//if no requeue request or if we are in final flush, don't re-execute
2815 	if (!task->requeue_request || utask->fsess->in_final_flush) {
2816 		gf_free(utask);
2817 		task->udta = NULL;
2818 		task->requeue_request = GF_FALSE;
2819 	} else {
2820 		task->schedule_next_time = gf_sys_clock_high_res() + 1000*reschedule_ms;
2821 	}
2822 }
2823 
2824 GF_EXPORT
gf_fs_post_user_task(GF_FilterSession * fsess,Bool (* task_execute)(GF_FilterSession * fsess,void * callback,u32 * reschedule_ms),void * udta_callback,const char * log_name)2825 GF_Err gf_fs_post_user_task(GF_FilterSession *fsess, Bool (*task_execute) (GF_FilterSession *fsess, void *callback, u32 *reschedule_ms), void *udta_callback, const char *log_name)
2826 {
2827 	GF_UserTask *utask;
2828 	if (!fsess || !task_execute) return GF_BAD_PARAM;
2829 	GF_SAFEALLOC(utask, GF_UserTask);
2830 	if (!utask) return GF_OUT_OF_MEM;
2831 	utask->fsess = fsess;
2832 	utask->callback = udta_callback;
2833 	utask->task_execute = task_execute;
2834 	gf_fs_post_task(fsess, gf_fs_user_task, NULL, NULL, log_name ? log_name : "user_task", utask);
2835 	return GF_OK;
2836 }
2837 
2838 GF_EXPORT
gf_filter_post_task(GF_Filter * filter,Bool (* task_execute)(GF_Filter * filter,void * callback,u32 * reschedule_ms),void * udta,const char * task_name)2839 GF_Err gf_filter_post_task(GF_Filter *filter, Bool (*task_execute) (GF_Filter *filter, void *callback, u32 *reschedule_ms), void *udta, const char *task_name)
2840 {
2841 	GF_UserTask *utask;
2842 	if (!filter || !task_execute) return GF_BAD_PARAM;
2843 	GF_SAFEALLOC(utask, GF_UserTask);
2844 	if (!utask) return GF_OUT_OF_MEM;
2845 	utask->callback = udta;
2846 	utask->task_execute_filter = task_execute;
2847 	utask->fsess = filter->session;
2848 	gf_fs_post_task(filter->session, gf_fs_user_task, filter, NULL, task_name ? task_name : "user_task", utask);
2849 	return GF_OK;
2850 }
2851 
2852 
2853 GF_EXPORT
gf_fs_is_last_task(GF_FilterSession * fsess)2854 Bool gf_fs_is_last_task(GF_FilterSession *fsess)
2855 {
2856 	if (!fsess) return GF_TRUE;
2857 	if (fsess->tasks_pending>1) return GF_FALSE;
2858 	if (gf_fq_count(fsess->main_thread_tasks)) return GF_FALSE;
2859 	if (gf_fq_count(fsess->tasks)) return GF_FALSE;
2860 	return GF_TRUE;
2861 }
2862 
2863 GF_EXPORT
gf_fs_mime_supported(GF_FilterSession * fsess,const char * mime)2864 Bool gf_fs_mime_supported(GF_FilterSession *fsess, const char *mime)
2865 {
2866 	u32 i, count;
2867 	//first pass on explicit mimes
2868 	count = gf_list_count(fsess->registry);
2869 	for (i=0; i<count; i++) {
2870 		u32 j;
2871 		const GF_FilterRegister *freg = gf_list_get(fsess->registry, i);
2872 		for (j=0; j<freg->nb_caps; j++) {
2873 			const GF_FilterCapability *acap = &freg->caps[j];
2874 			if (!(acap->flags & GF_CAPFLAG_INPUT)) continue;
2875 			if (acap->code == GF_PROP_PID_MIME) {
2876 				if (acap->val.value.string && strstr(acap->val.value.string, mime)) return GF_TRUE;
2877 			}
2878 		}
2879 	}
2880 	return GF_FALSE;
2881 }
2882 
2883 
2884 GF_EXPORT
gf_fs_enable_reporting(GF_FilterSession * session,Bool reporting_on)2885 void gf_fs_enable_reporting(GF_FilterSession *session, Bool reporting_on)
2886 {
2887 	if (session) session->reporting_on = reporting_on;
2888 }
2889 
2890 GF_EXPORT
gf_fs_lock_filters(GF_FilterSession * session,Bool do_lock)2891 void gf_fs_lock_filters(GF_FilterSession *session, Bool do_lock)
2892 {
2893 	if (!session || !session->filters_mx) return;
2894 	if (do_lock) gf_mx_p(session->filters_mx);
2895 	else gf_mx_v(session->filters_mx);
2896 }
2897 
2898 GF_EXPORT
gf_fs_get_filters_count(GF_FilterSession * session)2899 u32 gf_fs_get_filters_count(GF_FilterSession *session)
2900 {
2901 	return session ? gf_list_count(session->filters) : 0;
2902 }
2903 
2904 GF_EXPORT
gf_fs_get_filter_stats(GF_FilterSession * session,u32 idx,GF_FilterStats * stats)2905 GF_Err gf_fs_get_filter_stats(GF_FilterSession *session, u32 idx, GF_FilterStats *stats)
2906 {
2907 	GF_Filter *f;
2908 	u32 i;
2909 	Bool set_name=GF_FALSE;
2910 	if (!stats || !session) return GF_BAD_PARAM;
2911 	memset(stats, 0, sizeof(GF_FilterStats));
2912 	f = gf_list_get(session->filters, idx);
2913 	if (!f) return GF_BAD_PARAM;
2914 	stats->filter = f;
2915 	stats->filter_alias = f->multi_sink_target;
2916 	if (f->multi_sink_target) return GF_OK;
2917 
2918 	stats->percent = f->status_percent>10000 ? -1 : (s32) f->status_percent;
2919 	stats->status = f->status_str;
2920 	stats->nb_pck_processed = f->nb_pck_processed;
2921 	stats->nb_bytes_processed = f->nb_bytes_processed;
2922 	stats->time_process = f->time_process;
2923 	stats->nb_hw_pck_sent = f->nb_hw_pck_sent;
2924 	stats->nb_pck_sent = f->nb_pck_sent;
2925 	stats->nb_bytes_sent = f->nb_bytes_sent;
2926 	stats->nb_tasks_done = f->nb_tasks_done;
2927 	stats->nb_errors = f->nb_errors;
2928 	stats->name = f->name;
2929 	stats->reg_name = f->freg->name;
2930 	stats->filter_id = f->id;
2931 	stats->done = f->removed || f->finalized;
2932 	if (stats->name && !strcmp(stats->name, stats->reg_name)) {
2933 		set_name=GF_TRUE;
2934 	}
2935 	stats->report_updated = f->report_updated;
2936 	f->report_updated = GF_FALSE;
2937 
2938 
2939 	if (!stats->nb_pid_out && stats->nb_pid_in) stats->type = GF_FS_STATS_FILTER_RAWOUT;
2940 	else if (!stats->nb_pid_in && stats->nb_pid_out) stats->type = GF_FS_STATS_FILTER_RAWIN;
2941 
2942 	stats->nb_pid_out = f->num_output_pids;
2943 	for (i=0; i<f->num_output_pids; i++) {
2944 		GF_FilterPid *pid = gf_list_get(f->output_pids, i);
2945 		stats->nb_out_pck += pid->nb_pck_sent;
2946 		if (pid->has_seen_eos) stats->in_eos = GF_TRUE;
2947 
2948 		if (f->num_output_pids!=1) continue;
2949 
2950 		if (!stats->codecid)
2951 			stats->codecid = pid->codecid;
2952 		if (!stats->stream_type)
2953 			stats->stream_type = pid->stream_type;
2954 
2955 		//set name if PID name is not a default generated one
2956 		if (set_name && strncmp(pid->name, "PID", 3)) {
2957 			stats->name = pid->name;
2958 			set_name = GF_FALSE;
2959 		}
2960 	}
2961 	stats->nb_pid_in = f->num_input_pids;
2962 	for (i=0; i<f->num_input_pids; i++) {
2963 		GF_FilterPidInst *pidi = gf_list_get(f->input_pids, i);
2964 		stats->nb_in_pck += pidi->nb_processed;
2965 		if (pidi->is_end_of_stream) stats->in_eos = GF_TRUE;
2966 
2967 		if (pidi->is_decoder_input) stats->type = GF_FS_STATS_FILTER_DECODE;
2968 		else if (pidi->is_encoder_input) stats->type = GF_FS_STATS_FILTER_ENCODE;
2969 
2970 		if (pidi->pid->stream_type==GF_STREAM_FILE)
2971 			stats->type = GF_FS_STATS_FILTER_DEMUX;
2972 
2973 		if ((f->num_input_pids!=1) && f->num_output_pids)
2974 			continue;
2975 
2976 		if (!stats->codecid)
2977 			stats->codecid = pidi->pid->codecid;
2978 		if (!stats->stream_type)
2979 			stats->stream_type = pidi->pid->stream_type;
2980 
2981 		if (set_name) {
2982 			stats->name = pidi->pid->name;
2983 			set_name = GF_FALSE;
2984 		}
2985 	}
2986 	return GF_OK;
2987 }
2988 
gf_fs_ui_event(GF_FilterSession * session,GF_Event * uievt)2989 Bool gf_fs_ui_event(GF_FilterSession *session, GF_Event *uievt)
2990 {
2991 	Bool ret;
2992 	gf_mx_p(session->ui_mx);
2993 	ret = session->ui_event_proc(session->ui_opaque, uievt);
2994 	gf_mx_v(session->ui_mx);
2995 	return ret;
2996 }
2997 
gf_fs_check_graph_load(GF_FilterSession * fsess,Bool for_load)2998 void gf_fs_check_graph_load(GF_FilterSession *fsess, Bool for_load)
2999 {
3000 	if (for_load) {
3001 		if (!fsess->links || ! gf_list_count( fsess->links))
3002 			gf_filter_sess_build_graph(fsess, NULL);
3003 	} else {
3004 		if (fsess->flags & GF_FS_FLAG_NO_GRAPH_CACHE)
3005 			gf_filter_sess_reset_graph(fsess, NULL);
3006 	}
3007 }
3008 
3009 #ifndef GPAC_DISABLE_3D
fsess_on_event(void * cbk,GF_Event * evt)3010 static Bool fsess_on_event(void *cbk, GF_Event *evt)
3011 {
3012 	return GF_TRUE;
3013 }
3014 
gf_fs_check_gl_provider(GF_FilterSession * session)3015 GF_Err gf_fs_check_gl_provider(GF_FilterSession *session)
3016 {
3017 	GF_Event evt;
3018 	GF_Err e;
3019 	const char *sOpt;
3020 	void *os_disp_handler;
3021 
3022 	if (!session->nb_gl_filters) return GF_OK;
3023 	if (gf_list_count(session->gl_providers)) return GF_OK;
3024 
3025 	if (session->gl_driver) return GF_OK;
3026 
3027 
3028 	session->gl_driver = (GF_VideoOutput *) gf_module_load(GF_VIDEO_OUTPUT_INTERFACE, gf_opts_get_key("core", "video-output") );
3029 
3030 	if (!session->gl_driver) {
3031 		GF_LOG(GF_LOG_ERROR, GF_LOG_FILTER, ("Failed to load a video output for OpenGL context support !\n"));
3032 		return GF_IO_ERR;
3033 	}
3034 	if (!gf_opts_get_key("core", "video-output")) {
3035 		gf_opts_set_key("core", "video-output", session->gl_driver->module_name);
3036 	}
3037 	session->gl_driver->on_event = fsess_on_event;
3038 	session->gl_driver->evt_cbk_hdl = session;
3039 
3040 	os_disp_handler = NULL;
3041 	sOpt = gf_opts_get_key("Temp", "OSDisp");
3042 	if (sOpt) sscanf(sOpt, "%p", &os_disp_handler);
3043 
3044 	e = session->gl_driver->Setup(session->gl_driver, NULL, os_disp_handler, GF_TERM_INIT_HIDE);
3045 	if (e!=GF_OK) {
3046 		GF_LOG(GF_LOG_WARNING, GF_LOG_FILTER, ("Failed to setup Video Driver %s!\n", session->gl_driver->module_name));
3047 		gf_modules_close_interface((GF_BaseInterface *)session->gl_driver);
3048 		session->gl_driver = NULL;
3049 		return e;
3050 	}
3051 
3052 	//and initialize GL context
3053 	memset(&evt, 0, sizeof(GF_Event));
3054 	evt.type = GF_EVENT_VIDEO_SETUP;
3055 	evt.setup.width = 128;
3056 	evt.setup.height = 128;
3057 	evt.setup.use_opengl = GF_TRUE;
3058 	evt.setup.back_buffer = 1;
3059 	//we anyway should'nt call swapBuffer/flush on this object
3060 	evt.setup.disable_vsync = GF_TRUE;
3061 	session->gl_driver->ProcessEvent(session->gl_driver, &evt);
3062 
3063 	if (evt.setup.use_opengl) {
3064 		gf_opengl_init();
3065 	}
3066 	return GF_OK;
3067 }
3068 
gf_fs_set_gl(GF_FilterSession * session)3069 GF_Err gf_fs_set_gl(GF_FilterSession *session)
3070 {
3071 	GF_Event evt;
3072 	if (!session->gl_driver) return GF_BAD_PARAM;
3073 	memset(&evt, 0, sizeof(GF_Event));
3074 	evt.type = GF_EVENT_SET_GL;
3075 	return session->gl_driver->ProcessEvent(session->gl_driver, &evt);
3076 }
3077 #endif
3078 
3079 
3080 #ifdef FILTER_FIXME
3081 
3082 
term_find_res(GF_TermLocales * loc,char * parent,char * path,char * relocated_path,char * localized_rel_path)3083 static Bool term_find_res(GF_TermLocales *loc, char *parent, char *path, char *relocated_path, char *localized_rel_path)
3084 {
3085 	FILE *f;
3086 
3087 	if (loc->szAbsRelocatedPath) gf_free(loc->szAbsRelocatedPath);
3088 	loc->szAbsRelocatedPath = gf_url_concatenate(parent, path);
3089 	if (!loc->szAbsRelocatedPath) loc->szAbsRelocatedPath = gf_strdup(path);
3090 
3091 	f = gf_fopen(loc->szAbsRelocatedPath, "rb");
3092 	if (f) {
3093 		gf_fclose(f);
3094 		strcpy(localized_rel_path, path);
3095 		strcpy(relocated_path, loc->szAbsRelocatedPath);
3096 		return 1;
3097 	}
3098 	return 0;
3099 }
3100 
3101 /* Checks if, for a given relative path, there exists a localized version in an given folder
3102    if this is the case, it returns the absolute localized path, otherwise it returns null.
3103    if the resource was localized, the last parameter is set to the localized relative path.
3104 */
term_check_locales(void * __self,const char * locales_parent_path,const char * rel_path,char * relocated_path,char * localized_rel_path)3105 static Bool term_check_locales(void *__self, const char *locales_parent_path, const char *rel_path, char *relocated_path, char *localized_rel_path)
3106 {
3107 	char path[GF_MAX_PATH];
3108 	const char *opt;
3109 
3110 	GF_TermLocales *loc = (GF_TermLocales*)__self;
3111 
3112 	/* Checks if the rel_path argument really contains a relative path (no ':', no '/' at the beginning) */
3113 	if (strstr(rel_path, "://") || (rel_path[0]=='/') || strstr(rel_path, ":\\") || !strncmp(rel_path, "\\\\", 2)) {
3114 		return 0;
3115 	}
3116 
3117 	/*Checks if the absolute path is really absolute and points to a local file (no http or else) */
3118 	if (!locales_parent_path ||
3119 	        (locales_parent_path && (locales_parent_path[0] != '/') && strstr(locales_parent_path, "://") && strnicmp(locales_parent_path, "file://", 7))) {
3120 		return 0;
3121 	}
3122 	opt = gf_opts_get_key("core", "lang");
3123 	if (opt && (!strcmp(opt, "*") || !strcmp(opt, "un") ) ) {
3124 		opt = NULL;
3125 	}
3126 
3127 	while (opt) {
3128 		char lan[100];
3129 		char *sep;
3130 		char *sep_lang = strchr(opt, ';');
3131 		if (sep_lang) sep_lang[0] = 0;
3132 
3133 		while (strchr(" \t", opt[0]))
3134 			opt++;
3135 
3136 		strcpy(lan, opt);
3137 
3138 		if (sep_lang) {
3139 			sep_lang[0] = ';';
3140 			opt = sep_lang+1;
3141 		} else {
3142 			opt = NULL;
3143 		}
3144 
3145 		while (1) {
3146 			sep = strstr(lan, "-*");
3147 			if (!sep) break;
3148 			strncpy(sep, sep+2, strlen(sep)-2);
3149 		}
3150 
3151 		sprintf(path, "locales/%s/%s", lan, rel_path);
3152 		if (term_find_res(loc, (char *) locales_parent_path, (char *) path, relocated_path, localized_rel_path))
3153 			return 1;
3154 
3155 		/*recursively remove region (sub)tags*/
3156 		while (1) {
3157 			sep = strrchr(lan, '-');
3158 			if (!sep) break;
3159 			sep[0] = 0;
3160 			sprintf(path, "locales/%s/%s", lan, rel_path);
3161 			if (term_find_res(loc, (char *) locales_parent_path, (char *) path, relocated_path, localized_rel_path))
3162 				return 1;
3163 		}
3164 	}
3165 
3166 	if (term_find_res(loc, (char *) locales_parent_path, (char *) rel_path, relocated_path, localized_rel_path))
3167 		return 1;
3168 	/* if we did not find the localized file, both the relocated and localized strings are NULL */
3169 	strcpy(localized_rel_path, "");
3170 	strcpy(relocated_path, "");
3171 	return 0;
3172 }
3173 
3174 #endif
3175 
3176