1 /* GStreamer
2  * Copyright (C) 2001 Wim Taymans <wim.taymans@gmail.com>
3  *               2004-2008 Edward Hervey <bilboed@bilboed.com>
4  *               2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
5  *               2014 Thibault Saunier <tsaunier@gnome.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include "nle.h"
28 
29 /**
30  * SECTION:element-nlecomposition
31  *
32  * A NleComposition contains NleObjects such as NleSources and NleOperations,
33  * and connects them dynamically to create a composition timeline.
34  */
35 
36 static GstStaticPadTemplate nle_composition_src_template =
37 GST_STATIC_PAD_TEMPLATE ("src",
38     GST_PAD_SRC,
39     GST_PAD_ALWAYS,
40     GST_STATIC_CAPS_ANY);
41 
42 GST_DEBUG_CATEGORY_STATIC (nlecomposition_debug);
43 #define GST_CAT_DEFAULT nlecomposition_debug
44 
45 #define _do_init              \
46   GST_DEBUG_CATEGORY_INIT (nlecomposition_debug,"nlecomposition", GST_DEBUG_FG_BLUE | GST_DEBUG_BOLD, "GNonLin Composition");
47 #define nle_composition_parent_class parent_class
48 
/* Property identifiers of NleComposition itself. */
enum
{
  PROP_0 = 0,
  PROP_DEACTIVATED_ELEMENTS_STATE,
  PROP_LAST
};

/* Properties from NleObject.  NLEOBJECT_PROP_LAST sizes the
 * nleobject_properties array. */
enum
{
  NLEOBJECT_PROP_START = 0,
  NLEOBJECT_PROP_STOP,
  NLEOBJECT_PROP_DURATION,
  NLEOBJECT_PROP_LAST
};

/* Signal identifiers.  LAST_SIGNAL sizes the _signals array. */
enum
{
  COMMIT_SIGNAL = 0,
  COMMITED_SIGNAL,
  LAST_SIGNAL
};
71 
/* Reason why the stack of the composition gets updated.  The numeric value
 * is used as an index into UPDATE_PIPELINE_REASONS when posting messages
 * and logging, so the order of the enumerators matters. */
typedef enum
{
  COMP_UPDATE_STACK_INITIALIZE = 0,
  COMP_UPDATE_STACK_ON_COMMIT = 1,
  COMP_UPDATE_STACK_ON_EOS = 2,
  COMP_UPDATE_STACK_ON_SEEK = 3,
  COMP_UPDATE_STACK_NONE = 4
} NleUpdateStackReason;
80 
/* Human-readable name of each NleUpdateStackReason, indexed by the enum
 * value.  There must be one entry per enumerator, COMP_UPDATE_STACK_NONE
 * included, so that UPDATE_PIPELINE_REASONS[reason] never reads past the
 * end of the table. */
static const char *UPDATE_PIPELINE_REASONS[] = {
  "Initialize", "Commit", "EOS", "Seek", "None"
};
84 
85 typedef struct
86 {
87   NleComposition *comp;
88   GstEvent *event;
89 } SeekData;
90 
91 typedef struct
92 {
93   NleComposition *comp;
94   NleObject *object;
95 } ChildIOData;
96 
97 typedef struct
98 {
99   NleComposition *comp;
100   gint32 seqnum;
101 
102   NleUpdateStackReason reason;
103 } UpdateCompositionData;
104 
105 typedef struct _Action
106 {
107   GCClosure closure;
108   gint priority;
109 } Action;
110 
111 struct _NleCompositionPrivate
112 {
113   gboolean dispose_has_run;
114 
115   /*
116      Sorted List of NleObjects , ThreadSafe
117      objects_start : sorted by start-time then priority
118      objects_stop : sorted by stop-time then priority
119      objects_hash : contains all controlled objects
120 
121      Those list should be manipulated exclusively in the main context
122      or while the task is totally stopped.
123    */
124   GList *objects_start;
125   GList *objects_stop;
126   GHashTable *objects_hash;
127 
128   /* List of NleObject to be inserted or removed from the composition on the
129    * next commit */
130   GHashTable *pending_io;
131 
132   gulong ghosteventprobe;
133 
134   /* current stack, list of NleObject* */
135   GNode *current;
136 
137   /* List of NleObject whose start/duration will be the same as the composition */
138   GList *expandables;
139 
140   /* currently configured stack seek start/stop time.
141    * In forward playback:
142    *   - current_stack_start: The start of the current stack or the start value
143    *     of the seek if the stack has been seeked 'in the middle'
144    *   - current_stack_stop: The stop time of the current stack
145    *
146    * Reconstruct pipeline ONLY if seeking outside of those values
147    * FIXME : current_stack_start isn't always the earliest time before which the
148    * timeline doesn't need to be modified
149    */
150   GstClockTime current_stack_start;
151   GstClockTime current_stack_stop;
152 
153   /* Seek segment handler */
154   /* Represents the current segment that is being played,
155    * In forwards playback (logic is the same but swapping start and
156    * stop in backward playback):
157    *  - segment->start: start of the current segment being played,
158    *    at each stack change it will advance to the newly configured
159    *    stack start.
160    *  - segment->stop is the final stop of the segment being played.
161    *    if a seek with a stop time happened, it will be the stop time
162    *    otherwise, it will be the composition duration.
163    */
164   GstSegment *segment;
165 
166   /* Segment representing the last seek. Simply initialized
167    * segment if no seek occured. */
168   GstSegment *seek_segment;
169   guint64 next_base_time;
170 
171   /*
172      OUR sync_handler on the child_bus
173      We are called before nle_object_sync_handler
174    */
175   GstPadEventFunction nle_event_pad_func;
176   gboolean send_stream_start;
177 
178   /* Protect the actions list */
179   GMutex actions_lock;
180   GCond actions_cond;
181   GList *actions;
182   Action *current_action;
183 
184   gboolean running;
185   gboolean initialized;
186 
187   GstElement *current_bin;
188 
189   gboolean seeking_itself;
190   gint real_eos_seqnum;
191   gint next_eos_seqnum;
192   guint32 flush_seqnum;
193 
194   /* 0 means that we already received the right caps or segment */
195   gint seqnum_to_restart_task;
196   gboolean waiting_serialized_query_or_buffer;
197 
198   gboolean tearing_down_stack;
199   gboolean suppress_child_error;
200 
201   NleUpdateStackReason updating_reason;
202 };
203 
204 #define ACTION_CALLBACK(__action) (((GCClosure*) (__action))->callback)
205 
206 static guint _signals[LAST_SIGNAL] = { 0 };
207 
208 static GParamSpec *nleobject_properties[NLEOBJECT_PROP_LAST];
209 
210 G_DEFINE_TYPE_WITH_CODE (NleComposition, nle_composition, NLE_TYPE_OBJECT,
211     G_ADD_PRIVATE (NleComposition)
212     _do_init);
213 
214 #define OBJECT_IN_ACTIVE_SEGMENT(comp,element)      \
215   ((NLE_OBJECT_START(element) < comp->priv->current_stack_stop) &&  \
216    (NLE_OBJECT_STOP(element) >= comp->priv->current_stack_start))
217 
218 static void nle_composition_dispose (GObject * object);
219 static void nle_composition_finalize (GObject * object);
220 static void nle_composition_reset (NleComposition * comp);
221 
222 static gboolean nle_composition_add_object (GstBin * bin, GstElement * element);
223 
224 static gboolean
225 nle_composition_remove_object (GstBin * bin, GstElement * element);
226 
227 static GstStateChangeReturn
228 nle_composition_change_state (GstElement * element, GstStateChange transition);
229 
230 static inline void nle_composition_reset_target_pad (NleComposition * comp);
231 
232 static gboolean
233 seek_handling (NleComposition * comp, gint32 seqnum,
234     NleUpdateStackReason update_stack_reason);
235 static gint objects_start_compare (NleObject * a, NleObject * b);
236 static gint objects_stop_compare (NleObject * a, NleObject * b);
237 static GstClockTime get_current_position (NleComposition * comp);
238 
239 static gboolean update_pipeline (NleComposition * comp,
240     GstClockTime currenttime, gint32 seqnum,
241     NleUpdateStackReason update_stack_reason);
242 static gboolean nle_composition_commit_func (NleObject * object,
243     gboolean recurse);
244 static void update_start_stop_duration (NleComposition * comp);
245 
246 static gboolean
247 nle_composition_event_handler (GstPad * ghostpad, GstObject * parent,
248     GstEvent * event);
249 static void _relink_single_node (NleComposition * comp, GNode * node,
250     GstEvent * toplevel_seek);
251 static void _update_pipeline_func (NleComposition * comp,
252     UpdateCompositionData * ucompo);
253 static void _commit_func (NleComposition * comp,
254     UpdateCompositionData * ucompo);
255 static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial,
256     gboolean updatestoponly);
257 static gboolean _nle_composition_add_object (NleComposition * comp,
258     NleObject * object);
259 static gboolean _nle_composition_remove_object (NleComposition * comp,
260     NleObject * object);
261 static void _deactivate_stack (NleComposition * comp,
262     gboolean flush_downstream);
263 static gboolean _set_real_eos_seqnum_from_seek (NleComposition * comp,
264     GstEvent * event);
265 static void _emit_commited_signal_func (NleComposition * comp, gpointer udata);
266 static void _restart_task (NleComposition * comp);
267 static void
268 _add_action (NleComposition * comp, GCallback func, gpointer data,
269     gint priority);
270 static gboolean
271 _is_ready_to_restart_task (NleComposition * comp, GstEvent * event);
272 
273 
/* All macros below parenthesize their argument so that they stay correct
 * when given an arbitrary expression and not just a plain identifier. */

/* COMP_REAL_START: actual position to start current playback at, i.e. the
 * later of the segment start and the composition start. */
#define COMP_REAL_START(comp)                                                  \
  (MAX ((comp)->priv->segment->start, NLE_OBJECT_START (comp)))

/* COMP_REAL_STOP: the earlier of the segment stop and the composition stop,
 * or the composition stop when the segment has no valid stop. */
#define COMP_REAL_STOP(comp)                                                   \
  (GST_CLOCK_TIME_IS_VALID ((comp)->priv->segment->stop) ?                     \
   (MIN ((comp)->priv->segment->stop, NLE_OBJECT_STOP (comp))) :               \
   NLE_OBJECT_STOP (comp))

/* Takes the mutex protecting the actions list, logging before and after. */
#define ACTIONS_LOCK(comp) G_STMT_START {                           \
  GST_LOG_OBJECT (comp, "Getting ACTIONS_LOCK in thread %p",        \
        g_thread_self());                                           \
  g_mutex_lock(&((NleComposition*)(comp))->priv->actions_lock);     \
  GST_LOG_OBJECT (comp, "Got ACTIONS_LOCK in thread %p",            \
        g_thread_self());                                           \
} G_STMT_END

/* Releases the mutex protecting the actions list. */
#define ACTIONS_UNLOCK(comp) G_STMT_START {                         \
  g_mutex_unlock(&((NleComposition*)(comp))->priv->actions_lock);   \
  GST_LOG_OBJECT (comp, "Unlocked ACTIONS_LOCK in thread %p",       \
        g_thread_self());                                           \
} G_STMT_END

/* Blocks on actions_cond; must be used with the actions lock held, which
 * g_cond_wait() releases while waiting and re-takes before returning. */
#define WAIT_FOR_AN_ACTION(comp) G_STMT_START {                     \
  GST_LOG_OBJECT (comp, "Waiting for an action in thread %p",       \
        g_thread_self());                                           \
  g_cond_wait(&((NleComposition*)(comp))->priv->actions_cond,       \
      &((NleComposition*)(comp))->priv->actions_lock);              \
  GST_LOG_OBJECT (comp, "Done WAITING for an action in thread %p",  \
        g_thread_self());                                           \
} G_STMT_END

/* Wakes up one thread blocked in WAIT_FOR_AN_ACTION. */
#define SIGNAL_NEW_ACTION(comp) G_STMT_START {                      \
  GST_LOG_OBJECT (comp, "Signalling new action from thread %p",     \
        g_thread_self());                                           \
  g_cond_signal(&((NleComposition*)(comp))->priv->actions_cond);    \
} G_STMT_END

/* Recursive lock handed to the action task (see _start_task). */
#define GET_TASK_LOCK(comp)    (&(NLE_COMPOSITION(comp)->task_rec_lock))
313 
314 static inline gboolean
_have_to_flush_downstream(NleUpdateStackReason update_reason)315 _have_to_flush_downstream (NleUpdateStackReason update_reason)
316 {
317   if (update_reason == COMP_UPDATE_STACK_ON_COMMIT ||
318       update_reason == COMP_UPDATE_STACK_ON_SEEK ||
319       update_reason == COMP_UPDATE_STACK_INITIALIZE)
320     return TRUE;
321 
322   return FALSE;
323 }
324 
325 static void
_assert_proper_thread(NleComposition * comp)326 _assert_proper_thread (NleComposition * comp)
327 {
328   if (comp->task && gst_task_get_state (comp->task) != GST_TASK_STOPPED &&
329       g_thread_self () != comp->task->thread) {
330     g_warning ("Trying to touch children in a thread different from"
331         " its dedicated thread!");
332   }
333 }
334 
335 static void
_remove_actions_for_type(NleComposition * comp,GCallback callback)336 _remove_actions_for_type (NleComposition * comp, GCallback callback)
337 {
338   GList *tmp;
339 
340   ACTIONS_LOCK (comp);
341 
342   GST_LOG_OBJECT (comp, "finding action[callback=%s], action count = %d",
343       GST_DEBUG_FUNCPTR_NAME (callback), g_list_length (comp->priv->actions));
344   tmp = g_list_first (comp->priv->actions);
345   while (tmp != NULL) {
346     Action *act = tmp->data;
347     GList *removed = NULL;
348 
349     if (ACTION_CALLBACK (act) == callback) {
350       GST_LOG_OBJECT (comp, "remove action for callback %s",
351           GST_DEBUG_FUNCPTR_NAME (callback));
352       removed = tmp;
353       g_closure_unref ((GClosure *) act);
354       comp->priv->actions = g_list_remove_link (comp->priv->actions, removed);
355     }
356 
357     tmp = g_list_next (tmp);
358     if (removed)
359       g_list_free (removed);
360   }
361 
362   ACTIONS_UNLOCK (comp);
363 }
364 
365 static void
_execute_actions(NleComposition * comp)366 _execute_actions (NleComposition * comp)
367 {
368   NleCompositionPrivate *priv = comp->priv;
369 
370   ACTIONS_LOCK (comp);
371   if (priv->running == FALSE) {
372     GST_DEBUG_OBJECT (comp, "Not running anymore");
373 
374     ACTIONS_UNLOCK (comp);
375     return;
376   }
377 
378   if (priv->actions == NULL)
379     WAIT_FOR_AN_ACTION (comp);
380 
381   if (comp->priv->running == FALSE) {
382     GST_INFO_OBJECT (comp, "Done waiting but not running anymore");
383 
384     ACTIONS_UNLOCK (comp);
385     return;
386   }
387 
388   if (priv->actions) {
389     GValue params[1] = { G_VALUE_INIT };
390     GList *lact;
391 
392     GST_LOG_OBJECT (comp, "scheduled actions [%d]",
393         g_list_length (priv->actions));
394 
395     g_value_init (&params[0], G_TYPE_OBJECT);
396     g_value_set_object (&params[0], comp);
397 
398     lact = g_list_first (priv->actions);
399     priv->actions = g_list_remove_link (priv->actions, lact);
400     priv->current_action = lact->data;
401     ACTIONS_UNLOCK (comp);
402 
403     GST_INFO_OBJECT (comp, "Invoking %p:%s",
404         lact->data, GST_DEBUG_FUNCPTR_NAME ((ACTION_CALLBACK (lact->data))));
405     g_closure_invoke (lact->data, NULL, 1, params, NULL);
406     g_value_unset (&params[0]);
407 
408     ACTIONS_LOCK (comp);
409     g_closure_unref (lact->data);
410     g_list_free (lact);
411     priv->current_action = NULL;
412     ACTIONS_UNLOCK (comp);
413 
414     GST_LOG_OBJECT (comp, "remaining actions [%d]",
415         g_list_length (priv->actions));
416   } else {
417     ACTIONS_UNLOCK (comp);
418   }
419 }
420 
421 static void
_start_task(NleComposition * comp)422 _start_task (NleComposition * comp)
423 {
424   GstTask *task;
425 
426   ACTIONS_LOCK (comp);
427   comp->priv->running = TRUE;
428   ACTIONS_UNLOCK (comp);
429 
430   GST_OBJECT_LOCK (comp);
431 
432   task = comp->task;
433   if (task == NULL) {
434     gchar *taskname =
435         g_strdup_printf ("%s_update_management", GST_OBJECT_NAME (comp));
436 
437     task = gst_task_new ((GstTaskFunction) _execute_actions, comp, NULL);
438     gst_object_set_name (GST_OBJECT_CAST (task), taskname);
439     gst_task_set_lock (task, GET_TASK_LOCK (comp));
440     GST_DEBUG_OBJECT (comp, "created task %p", task);
441     comp->task = task;
442     g_free (taskname);
443   }
444 
445   gst_task_set_state (task, GST_TASK_STARTED);
446   GST_OBJECT_UNLOCK (comp);
447 }
448 
449 static gboolean
_stop_task(NleComposition * comp)450 _stop_task (NleComposition * comp)
451 {
452   gboolean res = TRUE;
453   GstTask *task;
454 
455   GST_INFO_OBJECT (comp, "Stoping children management task");
456 
457   ACTIONS_LOCK (comp);
458   comp->priv->running = FALSE;
459 
460   /*  Make sure we do not stay blocked trying to execute an action */
461   SIGNAL_NEW_ACTION (comp);
462   ACTIONS_UNLOCK (comp);
463 
464   GST_DEBUG_OBJECT (comp, "stop task");
465 
466   GST_OBJECT_LOCK (comp);
467   task = comp->task;
468   if (task == NULL)
469     goto no_task;
470   comp->task = NULL;
471   res = gst_task_set_state (task, GST_TASK_STOPPED);
472   GST_OBJECT_UNLOCK (comp);
473 
474   if (!gst_task_join (task))
475     goto join_failed;
476 
477   gst_object_unref (task);
478 
479   return res;
480 
481 no_task:
482   {
483     GST_OBJECT_UNLOCK (comp);
484 
485     /* this is not an error */
486     return TRUE;
487   }
488 join_failed:
489   {
490     /* this is bad, possibly the application tried to join the task from
491      * the task's thread. We install the task again so that it will be stopped
492      * again from the right thread next time hopefully. */
493     GST_OBJECT_LOCK (comp);
494     GST_DEBUG_OBJECT (comp, "join failed");
495     /* we can only install this task if there was no other task */
496     if (comp->task == NULL)
497       comp->task = task;
498     GST_OBJECT_UNLOCK (comp);
499 
500     return FALSE;
501   }
502 
503   return res;
504 }
505 
506 static void
_post_start_composition_update(NleComposition * comp,gint32 seqnum,NleUpdateStackReason reason)507 _post_start_composition_update (NleComposition * comp,
508     gint32 seqnum, NleUpdateStackReason reason)
509 {
510   GstMessage *msg;
511 
512   msg = gst_message_new_element (GST_OBJECT (comp),
513       gst_structure_new ("NleCompositionStartUpdate",
514           "reason", G_TYPE_STRING, UPDATE_PIPELINE_REASONS[reason], NULL));
515 
516   gst_message_set_seqnum (msg, seqnum);
517   gst_element_post_message (GST_ELEMENT (comp), msg);
518 }
519 
520 static void
_post_start_composition_update_done(NleComposition * comp,gint32 seqnum,NleUpdateStackReason reason)521 _post_start_composition_update_done (NleComposition * comp,
522     gint32 seqnum, NleUpdateStackReason reason)
523 {
524   GstMessage *msg = gst_message_new_element (GST_OBJECT (comp),
525       gst_structure_new ("NleCompositionUpdateDone",
526           "reason", G_TYPE_STRING, UPDATE_PIPELINE_REASONS[reason],
527           NULL));
528 
529   gst_message_set_seqnum (msg, seqnum);
530   gst_element_post_message (GST_ELEMENT (comp), msg);
531 }
532 
533 static void
_seek_pipeline_func(NleComposition * comp,SeekData * seekd)534 _seek_pipeline_func (NleComposition * comp, SeekData * seekd)
535 {
536   gdouble rate;
537   GstFormat format;
538   GstSeekFlags flags;
539   GstSeekType cur_type, stop_type;
540   gint64 cur, stop;
541   NleCompositionPrivate *priv = comp->priv;
542 
543   gst_event_parse_seek (seekd->event, &rate, &format, &flags,
544       &cur_type, &cur, &stop_type, &stop);
545 
546   GST_DEBUG_OBJECT (seekd->comp,
547       "start:%" GST_TIME_FORMAT " -- stop:%" GST_TIME_FORMAT "  flags:%d",
548       GST_TIME_ARGS (cur), GST_TIME_ARGS (stop), flags);
549 
550   gst_segment_do_seek (priv->segment,
551       rate, format, flags, cur_type, cur, stop_type, stop, NULL);
552   gst_segment_do_seek (priv->seek_segment,
553       rate, format, flags, cur_type, cur, stop_type, stop, NULL);
554 
555   GST_DEBUG_OBJECT (seekd->comp, "Segment now has flags:%d",
556       priv->segment->flags);
557 
558   /* FIXME: The idea was to avoid seeking on a stack if we know we will endup
559    * passed the end, but then we loose the flush, wich leads to hangs. Long
560    * term, we should just flush the stack instead to avoid the double seek. */
561 #if 0
562   if (priv->segment->start >= NLE_OBJECT_STOP (seekd->comp)) {
563     GST_INFO_OBJECT (seekd->comp,
564         "Start %" GST_TIME_FORMAT " > comp->stop: %" GST_TIME_FORMAT
565         " Not seeking", GST_TIME_ARGS (priv->segment->start),
566         GST_TIME_ARGS (NLE_OBJECT_STOP (seekd->comp)));
567     GST_FIXME_OBJECT (seekd->comp, "HANDLE error async!");
568     return;
569   }
570 #endif
571 
572   _post_start_composition_update (seekd->comp,
573       gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
574 
575   /* crop the segment start/stop values */
576   /* Only crop segment start value if we don't have a default object */
577   if (priv->expandables == NULL)
578     priv->segment->start =
579         MAX (priv->segment->start, NLE_OBJECT_START (seekd->comp));
580   priv->segment->stop =
581       MIN (priv->segment->stop, NLE_OBJECT_STOP (seekd->comp));
582 
583   priv->next_base_time = 0;
584 
585   seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event),
586       COMP_UPDATE_STACK_ON_SEEK);
587 
588   _post_start_composition_update_done (seekd->comp,
589       gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
590 }
591 
592 /*  Must be called with OBJECTS_LOCK taken */
593 static void
_process_pending_entries(NleComposition * comp)594 _process_pending_entries (NleComposition * comp)
595 {
596   NleObject *object;
597   GHashTableIter iter;
598   gboolean deactivated_stack = FALSE;
599 
600   NleCompositionPrivate *priv = comp->priv;
601 
602   g_hash_table_iter_init (&iter, priv->pending_io);
603   while (g_hash_table_iter_next (&iter, (gpointer *) & object, NULL)) {
604     if (g_hash_table_contains (priv->objects_hash, object)) {
605 
606       if (GST_OBJECT_PARENT (object) == GST_OBJECT_CAST (priv->current_bin) &&
607           deactivated_stack == FALSE) {
608         deactivated_stack = TRUE;
609 
610         _deactivate_stack (comp, TRUE);
611       }
612 
613       _nle_composition_remove_object (comp, object);
614     } else {
615       /* take a new ref on object as the current one will be released when
616        * object is removed from pending_io */
617       _nle_composition_add_object (comp, gst_object_ref (object));
618     }
619   }
620 
621   g_hash_table_remove_all (priv->pending_io);
622 }
623 
624 
625 static inline gboolean
_commit_values(NleComposition * comp)626 _commit_values (NleComposition * comp)
627 {
628   GList *tmp;
629   gboolean commited = FALSE;
630   NleCompositionPrivate *priv = comp->priv;
631 
632   for (tmp = priv->objects_start; tmp; tmp = tmp->next) {
633     if (nle_object_commit (tmp->data, TRUE))
634       commited = TRUE;
635   }
636 
637   GST_DEBUG_OBJECT (comp, "Linking up commit vmethod");
638   commited |= NLE_OBJECT_CLASS (parent_class)->commit (NLE_OBJECT (comp), TRUE);
639 
640   return commited;
641 }
642 
643 static gboolean
_commit_all_values(NleComposition * comp)644 _commit_all_values (NleComposition * comp)
645 {
646   NleCompositionPrivate *priv = comp->priv;
647 
648   priv->next_base_time = 0;
649 
650   _process_pending_entries (comp);
651 
652   if (_commit_values (comp) == FALSE) {
653 
654     return FALSE;;
655   }
656 
657   /* The topology of the composition might have changed, update the lists */
658   priv->objects_start = g_list_sort
659       (priv->objects_start, (GCompareFunc) objects_start_compare);
660   priv->objects_stop = g_list_sort
661       (priv->objects_stop, (GCompareFunc) objects_stop_compare);
662 
663   return TRUE;
664 }
665 
666 static gboolean
_initialize_stack_func(NleComposition * comp,UpdateCompositionData * ucompo)667 _initialize_stack_func (NleComposition * comp, UpdateCompositionData * ucompo)
668 {
669   NleCompositionPrivate *priv = comp->priv;
670 
671 
672   _post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
673 
674   _commit_all_values (comp);
675   update_start_stop_duration (comp);
676   comp->priv->next_base_time = 0;
677   /* set ghostpad target */
678   if (!(update_pipeline (comp, COMP_REAL_START (comp),
679               ucompo->seqnum, COMP_UPDATE_STACK_INITIALIZE))) {
680     GST_FIXME_OBJECT (comp, "PLEASE signal state change failure ASYNC");
681   }
682 
683   _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
684   priv->initialized = TRUE;
685 
686   return G_SOURCE_REMOVE;
687 }
688 
689 static void
_remove_object_func(NleComposition * comp,ChildIOData * childio)690 _remove_object_func (NleComposition * comp, ChildIOData * childio)
691 {
692   NleObject *object = childio->object;
693 
694   NleCompositionPrivate *priv = comp->priv;
695   NleObject *in_pending_io;
696 
697   in_pending_io = g_hash_table_lookup (priv->pending_io, object);
698 
699   if (!g_hash_table_contains (priv->objects_hash, object)) {
700     if (in_pending_io) {
701       GST_INFO_OBJECT (comp, "Object %" GST_PTR_FORMAT " was marked"
702           " for addition, removing it from the addition list", object);
703 
704       g_hash_table_remove (priv->pending_io, object);
705       return;
706     }
707 
708     GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
709         " not in the composition", object);
710 
711     return;
712   }
713 
714   if (in_pending_io) {
715     GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
716         " for removal", object);
717 
718     return;
719   }
720 
721   g_hash_table_add (priv->pending_io, gst_object_ref (object));
722 
723   return;
724 }
725 
726 static void
_add_remove_object_action(NleComposition * comp,NleObject * object)727 _add_remove_object_action (NleComposition * comp, NleObject * object)
728 {
729   ChildIOData *childio = g_slice_new0 (ChildIOData);
730 
731   GST_DEBUG_OBJECT (comp, "Adding Action");
732 
733   childio->comp = comp;
734   childio->object = object;
735 
736   _add_action (comp, G_CALLBACK (_remove_object_func),
737       childio, G_PRIORITY_DEFAULT);
738 }
739 
740 static void
_add_object_func(NleComposition * comp,ChildIOData * childio)741 _add_object_func (NleComposition * comp, ChildIOData * childio)
742 {
743   NleObject *object = childio->object;
744   NleCompositionPrivate *priv = comp->priv;
745   NleObject *in_pending_io;
746 
747   in_pending_io = g_hash_table_lookup (priv->pending_io, object);
748 
749   if (g_hash_table_contains (priv->objects_hash, object)) {
750 
751     if (in_pending_io) {
752       GST_INFO_OBJECT (comp, "Object already in but marked in pendings"
753           " removing from pendings");
754       g_hash_table_remove (priv->pending_io, object);
755 
756       return;
757     }
758     GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
759         " already in the composition", object);
760 
761     return;
762   }
763 
764   if (in_pending_io) {
765     GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
766         " for addition", object);
767 
768     return;
769   }
770 
771   /* current reference is hold by the action and will be released with it,
772    * so take a new one */
773   g_hash_table_add (priv->pending_io, gst_object_ref (object));
774 }
775 
776 static void
_add_add_object_action(NleComposition * comp,NleObject * object)777 _add_add_object_action (NleComposition * comp, NleObject * object)
778 {
779   ChildIOData *childio = g_slice_new0 (ChildIOData);
780 
781   GST_DEBUG_OBJECT (comp, "Adding Action");
782 
783   childio->comp = comp;
784   childio->object = object;
785 
786   _add_action (comp, G_CALLBACK (_add_object_func), childio,
787       G_PRIORITY_DEFAULT);
788 }
789 
790 static void
_free_action(gpointer udata,Action * action)791 _free_action (gpointer udata, Action * action)
792 {
793   GST_LOG ("freeing %p action for %s", action,
794       GST_DEBUG_FUNCPTR_NAME (ACTION_CALLBACK (action)));
795   if (ACTION_CALLBACK (action) == _seek_pipeline_func) {
796     SeekData *seekd = (SeekData *) udata;
797 
798     gst_event_unref (seekd->event);
799     g_slice_free (SeekData, seekd);
800   } else if (ACTION_CALLBACK (action) == _add_object_func) {
801     ChildIOData *iodata = (ChildIOData *) udata;
802 
803     gst_object_unref (iodata->object);
804     g_slice_free (ChildIOData, iodata);
805   } else if (ACTION_CALLBACK (action) == _remove_object_func) {
806     g_slice_free (ChildIOData, udata);
807   } else if (ACTION_CALLBACK (action) == _update_pipeline_func ||
808       ACTION_CALLBACK (action) == _commit_func ||
809       ACTION_CALLBACK (action) == _initialize_stack_func) {
810     g_slice_free (UpdateCompositionData, udata);
811   }
812 }
813 
814 static void
_add_action_locked(NleComposition * comp,GCallback func,gpointer data,gint priority)815 _add_action_locked (NleComposition * comp, GCallback func,
816     gpointer data, gint priority)
817 {
818   Action *action;
819   NleCompositionPrivate *priv = comp->priv;
820 
821   action = (Action *) g_closure_new_simple (sizeof (Action), data);
822   g_closure_add_finalize_notifier ((GClosure *) action, data,
823       (GClosureNotify) _free_action);
824   ACTION_CALLBACK (action) = func;
825 
826   action->priority = priority;
827   g_closure_set_marshal ((GClosure *) action, g_cclosure_marshal_VOID__VOID);
828 
829   GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s",
830       action, GST_DEBUG_FUNCPTR_NAME (func));
831 
832   if (func == G_CALLBACK (_emit_commited_signal_func))
833     priv->actions = g_list_prepend (priv->actions, action);
834   else
835     priv->actions = g_list_append (priv->actions, action);
836 
837   GST_LOG_OBJECT (comp, "the number of remaining actions: %d",
838       g_list_length (priv->actions));
839 
840   SIGNAL_NEW_ACTION (comp);
841 }
842 
843 static void
_add_action(NleComposition * comp,GCallback func,gpointer data,gint priority)844 _add_action (NleComposition * comp, GCallback func,
845     gpointer data, gint priority)
846 {
847   ACTIONS_LOCK (comp);
848   _add_action_locked (comp, func, data, priority);
849   ACTIONS_UNLOCK (comp);
850 }
851 
852 static void
_add_seek_action(NleComposition * comp,GstEvent * event)853 _add_seek_action (NleComposition * comp, GstEvent * event)
854 {
855   SeekData *seekd;
856   GList *tmp;
857   guint32 seqnum = gst_event_get_seqnum (event);
858 
859   ACTIONS_LOCK (comp);
860   /* Check if this is our current seqnum */
861   if (seqnum == comp->priv->next_eos_seqnum) {
862     GST_DEBUG_OBJECT (comp, "Not adding Action, same seqnum as previous seek");
863     ACTIONS_UNLOCK (comp);
864     return;
865   }
866 
867   /* Check if this seqnum is already queued up but not handled yet */
868   for (tmp = comp->priv->actions; tmp != NULL; tmp = tmp->next) {
869     Action *act = tmp->data;
870 
871     if (ACTION_CALLBACK (act) == G_CALLBACK (_seek_pipeline_func)) {
872       SeekData *tmp_data = ((GClosure *) act)->data;
873 
874       if (gst_event_get_seqnum (tmp_data->event) == seqnum) {
875         GST_DEBUG_OBJECT (comp,
876             "Not adding Action, same seqnum as previous seek");
877         ACTIONS_UNLOCK (comp);
878         return;
879       }
880     }
881   }
882 
883   /* Check if this seqnum is currently being handled */
884   if (comp->priv->current_action) {
885     Action *act = comp->priv->current_action;
886     if (ACTION_CALLBACK (act) == G_CALLBACK (_seek_pipeline_func)) {
887       SeekData *tmp_data = ((GClosure *) act)->data;
888 
889       if (gst_event_get_seqnum (tmp_data->event) == seqnum) {
890         GST_DEBUG_OBJECT (comp,
891             "Not adding Action, same seqnum as previous seek");
892         ACTIONS_UNLOCK (comp);
893         return;
894       }
895     }
896   }
897 
898   GST_DEBUG_OBJECT (comp, "Adding Action");
899 
900   seekd = g_slice_new0 (SeekData);
901   seekd->comp = comp;
902   seekd->event = event;
903 
904   comp->priv->next_eos_seqnum = 0;
905   comp->priv->real_eos_seqnum = 0;
906   _add_action_locked (comp, G_CALLBACK (_seek_pipeline_func), seekd,
907       G_PRIORITY_DEFAULT);
908 
909   ACTIONS_UNLOCK (comp);
910 }
911 
912 static void
_remove_update_actions(NleComposition * comp)913 _remove_update_actions (NleComposition * comp)
914 {
915   _remove_actions_for_type (comp, G_CALLBACK (_update_pipeline_func));
916 }
917 
918 static void
_remove_seek_actions(NleComposition * comp)919 _remove_seek_actions (NleComposition * comp)
920 {
921   _remove_actions_for_type (comp, G_CALLBACK (_seek_pipeline_func));
922 }
923 
924 static void
_add_update_compo_action(NleComposition * comp,GCallback callback,NleUpdateStackReason reason)925 _add_update_compo_action (NleComposition * comp,
926     GCallback callback, NleUpdateStackReason reason)
927 {
928   UpdateCompositionData *ucompo = g_slice_new0 (UpdateCompositionData);
929 
930   ucompo->comp = comp;
931   ucompo->reason = reason;
932   ucompo->seqnum = gst_util_seqnum_next ();
933 
934   GST_INFO_OBJECT (comp, "Updating because: %s -- Setting seqnum: %i",
935       UPDATE_PIPELINE_REASONS[reason], ucompo->seqnum);
936 
937   _add_action (comp, callback, ucompo, G_PRIORITY_DEFAULT);
938 }
939 
940 static void
nle_composition_handle_message(GstBin * bin,GstMessage * message)941 nle_composition_handle_message (GstBin * bin, GstMessage * message)
942 {
943   NleComposition *comp = (NleComposition *) bin;
944   NleCompositionPrivate *priv = comp->priv;
945 
946   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR &&
947       (priv->tearing_down_stack || priv->suppress_child_error)) {
948     GST_FIXME_OBJECT (comp, "Dropping %" GST_PTR_FORMAT " message from "
949         " %" GST_PTR_FORMAT " tearing down: %d, suppressing error: %d",
950         message, GST_MESSAGE_SRC (message), priv->tearing_down_stack,
951         priv->suppress_child_error);
952     goto drop;
953   } else if (comp->priv->tearing_down_stack) {
954     GST_DEBUG_OBJECT (comp, "Dropping message %" GST_PTR_FORMAT " from "
955         "object being teared down to READY!", message);
956     goto drop;
957   }
958 
959   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
960 
961   return;
962 
963 drop:
964   gst_message_unref (message);
965 
966   return;
967 }
968 
969 static void
nle_composition_class_init(NleCompositionClass * klass)970 nle_composition_class_init (NleCompositionClass * klass)
971 {
972   GObjectClass *gobject_class;
973   GstElementClass *gstelement_class;
974   GstBinClass *gstbin_class;
975   NleObjectClass *nleobject_class;
976 
977   gobject_class = (GObjectClass *) klass;
978   gstelement_class = (GstElementClass *) klass;
979   gstbin_class = (GstBinClass *) klass;
980   nleobject_class = (NleObjectClass *) klass;
981 
982   gst_element_class_set_static_metadata (gstelement_class,
983       "GNonLin Composition", "Filter/Editor", "Combines NLE objects",
984       "Wim Taymans <wim.taymans@gmail.com>, Edward Hervey <bilboed@bilboed.com>,"
985       " Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>,"
986       " Thibault Saunier <tsaunier@gnome.org>");
987 
988   gobject_class->dispose = GST_DEBUG_FUNCPTR (nle_composition_dispose);
989   gobject_class->finalize = GST_DEBUG_FUNCPTR (nle_composition_finalize);
990 
991   gstelement_class->change_state = nle_composition_change_state;
992 
993   gstbin_class->add_element = GST_DEBUG_FUNCPTR (nle_composition_add_object);
994   gstbin_class->remove_element =
995       GST_DEBUG_FUNCPTR (nle_composition_remove_object);
996   gstbin_class->handle_message =
997       GST_DEBUG_FUNCPTR (nle_composition_handle_message);
998 
999   gst_element_class_add_static_pad_template (gstelement_class,
1000       &nle_composition_src_template);
1001 
1002   /* Get the paramspec of the NleObject klass so we can do
1003    * fast notifies */
1004   nleobject_properties[NLEOBJECT_PROP_START] =
1005       g_object_class_find_property (gobject_class, "start");
1006   nleobject_properties[NLEOBJECT_PROP_STOP] =
1007       g_object_class_find_property (gobject_class, "stop");
1008   nleobject_properties[NLEOBJECT_PROP_DURATION] =
1009       g_object_class_find_property (gobject_class, "duration");
1010 
1011   _signals[COMMITED_SIGNAL] =
1012       g_signal_new ("commited", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
1013       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
1014       G_TYPE_BOOLEAN);
1015 
1016   GST_DEBUG_REGISTER_FUNCPTR (_seek_pipeline_func);
1017   GST_DEBUG_REGISTER_FUNCPTR (_remove_object_func);
1018   GST_DEBUG_REGISTER_FUNCPTR (_add_object_func);
1019   GST_DEBUG_REGISTER_FUNCPTR (_update_pipeline_func);
1020   GST_DEBUG_REGISTER_FUNCPTR (_commit_func);
1021   GST_DEBUG_REGISTER_FUNCPTR (_emit_commited_signal_func);
1022   GST_DEBUG_REGISTER_FUNCPTR (_initialize_stack_func);
1023 
1024   /* Just be useless, so the compiler does not warn us
1025    * about our uselessness */
1026   nleobject_class->commit = nle_composition_commit_func;
1027 
1028 }
1029 
/* Instance initialiser for NleComposition.
 *
 * Sets the NLE_OBJECT_SOURCE and NLE_OBJECT_COMPOSITION flags, sets up the
 * private state (segments, hash tables, locks), adds the internal
 * "current-bin" child, resets the composition and installs
 * nle_composition_event_handler() as the source pad event function.
 */
static void
nle_composition_init (NleComposition * comp)
{
  NleCompositionPrivate *priv;

  GST_OBJECT_FLAG_SET (comp, NLE_OBJECT_SOURCE);
  GST_OBJECT_FLAG_SET (comp, NLE_OBJECT_COMPOSITION);

  priv = nle_composition_get_instance_private (comp);
  priv->objects_start = NULL;
  priv->objects_stop = NULL;

  priv->segment = gst_segment_new ();
  priv->seek_segment = gst_segment_new ();

  g_rec_mutex_init (&comp->task_rec_lock);

  priv->objects_hash = g_hash_table_new (g_direct_hash, g_direct_equal);

  g_mutex_init (&priv->actions_lock);
  g_cond_init (&priv->actions_cond);

  /* Keys are released with gst_object_unref(); values have no destroy
   * notify */
  priv->pending_io = g_hash_table_new_full (g_direct_hash, g_direct_equal,
      gst_object_unref, NULL);

  /* Publish the private struct before the calls below:
   * nle_composition_reset() dereferences comp->priv */
  comp->priv = priv;

  priv->current_bin = gst_bin_new ("current-bin");
  gst_bin_add (GST_BIN (comp), priv->current_bin);

  nle_composition_reset (comp);

  /* Remember the event function currently set on the source pad;
   * nle_composition_event_handler() chains up to it */
  priv->nle_event_pad_func = GST_PAD_EVENTFUNC (NLE_OBJECT_SRC (comp));
  gst_pad_set_event_function (NLE_OBJECT_SRC (comp),
      GST_DEBUG_FUNCPTR (nle_composition_event_handler));
}
1066 
1067 static void
_remove_each_nleobj(gpointer data,gpointer udata)1068 _remove_each_nleobj (gpointer data, gpointer udata)
1069 {
1070   NleComposition *comp = NLE_COMPOSITION (udata);
1071   NleObject *nleobj = NLE_OBJECT (data);
1072 
1073   _nle_composition_remove_object (NLE_COMPOSITION (comp), NLE_OBJECT (nleobj));
1074 }
1075 
1076 static void
_remove_each_action(gpointer data)1077 _remove_each_action (gpointer data)
1078 {
1079   Action *action = (Action *) (data);
1080 
1081   GST_LOG ("remove action %p for %s", action,
1082       GST_DEBUG_FUNCPTR_NAME (ACTION_CALLBACK (action)));
1083   g_closure_invalidate ((GClosure *) action);
1084   g_closure_unref ((GClosure *) action);
1085 }
1086 
1087 static void
nle_composition_dispose(GObject * object)1088 nle_composition_dispose (GObject * object)
1089 {
1090   NleComposition *comp = NLE_COMPOSITION (object);
1091   NleCompositionPrivate *priv = comp->priv;
1092 
1093   if (priv->dispose_has_run)
1094     return;
1095 
1096   priv->dispose_has_run = TRUE;
1097 
1098   g_list_foreach (priv->objects_start, _remove_each_nleobj, comp);
1099   g_list_free (priv->objects_start);
1100 
1101   g_list_foreach (priv->expandables, _remove_each_nleobj, comp);
1102   g_list_free (priv->expandables);
1103 
1104   g_list_foreach (priv->objects_stop, _remove_each_nleobj, comp);
1105   g_list_free (priv->objects_stop);
1106 
1107   g_list_free_full (priv->actions, (GDestroyNotify) _remove_each_action);
1108 
1109   nle_composition_reset_target_pad (comp);
1110 
1111   if (priv->pending_io) {
1112     g_hash_table_unref (priv->pending_io);
1113   }
1114 
1115   G_OBJECT_CLASS (parent_class)->dispose (object);
1116 }
1117 
1118 static void
nle_composition_finalize(GObject * object)1119 nle_composition_finalize (GObject * object)
1120 {
1121   NleComposition *comp = NLE_COMPOSITION (object);
1122   NleCompositionPrivate *priv = comp->priv;
1123 
1124   _assert_proper_thread (comp);
1125 
1126   if (priv->current) {
1127     g_node_destroy (priv->current);
1128     priv->current = NULL;
1129   }
1130 
1131   g_hash_table_destroy (priv->objects_hash);
1132 
1133   gst_segment_free (priv->segment);
1134   gst_segment_free (priv->seek_segment);
1135 
1136   g_rec_mutex_clear (&comp->task_rec_lock);
1137 
1138   g_mutex_clear (&priv->actions_lock);
1139   g_cond_clear (&priv->actions_cond);
1140 
1141   G_OBJECT_CLASS (parent_class)->finalize (object);
1142 }
1143 
1144 /* signal_duration_change
1145  * Creates a new GST_MESSAGE_DURATION_CHANGED with the currently configured
1146  * composition duration and sends that on the bus.
1147  */
1148 static inline void
signal_duration_change(NleComposition * comp)1149 signal_duration_change (NleComposition * comp)
1150 {
1151   gst_element_post_message (GST_ELEMENT_CAST (comp),
1152       gst_message_new_duration_changed (GST_OBJECT_CAST (comp)));
1153 }
1154 
1155 static gboolean
_remove_child(GValue * item,GValue * ret G_GNUC_UNUSED,GstBin * bin)1156 _remove_child (GValue * item, GValue * ret G_GNUC_UNUSED, GstBin * bin)
1157 {
1158   GstElement *child = g_value_get_object (item);
1159 
1160   if (NLE_IS_OPERATION (child))
1161     nle_operation_hard_cleanup (NLE_OPERATION (child));
1162 
1163 
1164   gst_bin_remove (bin, child);
1165 
1166   return TRUE;
1167 }
1168 
1169 static void
_empty_bin(GstBin * bin)1170 _empty_bin (GstBin * bin)
1171 {
1172   GstIterator *children;
1173 
1174   children = gst_bin_iterate_elements (bin);
1175 
1176   while (G_UNLIKELY (gst_iterator_fold (children,
1177               (GstIteratorFoldFunction) _remove_child, NULL,
1178               bin) == GST_ITERATOR_RESYNC)) {
1179     gst_iterator_resync (children);
1180   }
1181 
1182   gst_iterator_free (children);
1183 }
1184 
1185 static void
nle_composition_reset(NleComposition * comp)1186 nle_composition_reset (NleComposition * comp)
1187 {
1188   NleCompositionPrivate *priv = comp->priv;
1189 
1190   GST_DEBUG_OBJECT (comp, "resetting");
1191 
1192   _assert_proper_thread (comp);
1193 
1194   priv->current_stack_start = GST_CLOCK_TIME_NONE;
1195   priv->current_stack_stop = GST_CLOCK_TIME_NONE;
1196   priv->next_base_time = 0;
1197 
1198   gst_segment_init (priv->segment, GST_FORMAT_TIME);
1199   gst_segment_init (priv->seek_segment, GST_FORMAT_TIME);
1200 
1201   if (priv->current)
1202     g_node_destroy (priv->current);
1203   priv->current = NULL;
1204 
1205   nle_composition_reset_target_pad (comp);
1206 
1207   priv->initialized = FALSE;
1208   priv->send_stream_start = TRUE;
1209   priv->real_eos_seqnum = 0;
1210   priv->next_eos_seqnum = 0;
1211   priv->flush_seqnum = 0;
1212 
1213   _empty_bin (GST_BIN_CAST (priv->current_bin));
1214 
1215   GST_DEBUG_OBJECT (comp, "Composition now resetted");
1216 }
1217 
/* Pad probe callback, installed by nle_composition_ghost_pad_set_target()
 * on the ghost pad target, for downstream events (including flushes),
 * downstream data and downstream queries.
 *
 * Buffers and serialized queries are always let through; they only restart
 * the task when priv->waiting_serialized_query_or_buffer is set. Events are
 * filtered or rewritten per type:
 *  - FLUSH_START/FLUSH_STOP: dropped unless their seqnum equals
 *    priv->flush_seqnum; a matching FLUSH_STOP is replaced by a new one.
 *  - STREAM_START: forwarded only while priv->send_stream_start is TRUE.
 *  - SEGMENT: replaced by a copy whose base is priv->next_base_time.
 *  - TAG: always dropped.
 *  - EOS: forwarded only when its seqnum equals priv->real_eos_seqnum,
 *    otherwise dropped (possibly scheduling a stack update).
 *
 * Returns GST_PAD_PROBE_OK to let the item through, GST_PAD_PROBE_DROP to
 * discard it.
 */
static GstPadProbeReturn
ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
    GstPadProbeInfo * info, NleComposition * comp)
{
  GstPadProbeReturn retval = GST_PAD_PROBE_OK;
  NleCompositionPrivate *priv = comp->priv;
  GstEvent *event;

  /* Data flow (buffer or serialized query): never filtered here */
  if (GST_IS_BUFFER (info->data) ||
      (GST_IS_QUERY (info->data) && GST_QUERY_IS_SERIALIZED (info->data))) {
    if (priv->waiting_serialized_query_or_buffer) {
      GST_INFO_OBJECT (comp, "update_pipeline DONE");
      _restart_task (comp);
    }

    return GST_PAD_PROBE_OK;
  }

  event = GST_PAD_PROBE_INFO_EVENT (info);

  GST_DEBUG_OBJECT (comp, "event: %s", GST_EVENT_TYPE_NAME (event));

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_STOP:
      if (_is_ready_to_restart_task (comp, event))
        _restart_task (comp);

      if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) {
        GST_INFO_OBJECT (comp, "Dropping flush stop %d -- %d",
            gst_event_get_seqnum (event), priv->seqnum_to_restart_task);
        retval = GST_PAD_PROBE_DROP;
      } else {
        GST_INFO_OBJECT (comp, "Forwarding our flush stop with seqnum %i",
            comp->priv->flush_seqnum);
        /* Swap the event carried by the probe info for a new flush-stop
         * (reset_time TRUE) with the same seqnum, then clear flush_seqnum */
        gst_event_unref (event);
        event = gst_event_new_flush_stop (TRUE);
        GST_PAD_PROBE_INFO_DATA (info) = event;
        gst_event_set_seqnum (event, comp->priv->flush_seqnum);
        comp->priv->flush_seqnum = 0;
      }
      break;
    case GST_EVENT_FLUSH_START:
      if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) {
        GST_INFO_OBJECT (comp, "Dropping flush start");
        retval = GST_PAD_PROBE_DROP;
      } else {
        GST_INFO_OBJECT (comp, "Forwarding our flush start with seqnum %i",
            comp->priv->flush_seqnum);
      }
      break;
    case GST_EVENT_STREAM_START:
      /* Atomically flip send_stream_start TRUE -> FALSE: only the event
       * that wins the exchange is forwarded */
      if (g_atomic_int_compare_and_exchange (&priv->send_stream_start, TRUE,
              FALSE)) {
        /* FIXME: Do we want to create a new stream ID here? */
        GST_DEBUG_OBJECT (comp, "forward stream-start %p", event);
      } else {
        GST_DEBUG_OBJECT (comp, "dropping stream-start %p", event);
        retval = GST_PAD_PROBE_DROP;
      }
      break;
    case GST_EVENT_SEGMENT:
    {
      guint64 rstart, rstop;
      const GstSegment *segment;
      GstSegment copy;
      GstEvent *event2;
      /* next_base_time */

      if (_is_ready_to_restart_task (comp, event))
        _restart_task (comp);

      gst_event_parse_segment (event, &segment);
      gst_segment_copy_into (segment, &copy);

      /* Running-time span covered by the incoming segment */
      rstart =
          gst_segment_to_running_time (segment, GST_FORMAT_TIME,
          segment->start);
      rstop =
          gst_segment_to_running_time (segment, GST_FORMAT_TIME, segment->stop);
      copy.base = comp->priv->next_base_time;
      GST_DEBUG_OBJECT (comp,
          "Updating base time to %" GST_TIME_FORMAT ", next:%" GST_TIME_FORMAT,
          GST_TIME_ARGS (comp->priv->next_base_time),
          GST_TIME_ARGS (comp->priv->next_base_time + rstop - rstart));
      /* The next segment's base continues where this one ends */
      comp->priv->next_base_time += rstop - rstart;

      /* Replace the event by the rebased copy, keeping the seqnum */
      event2 = gst_event_new_segment (&copy);
      GST_EVENT_SEQNUM (event2) = GST_EVENT_SEQNUM (event);

      GST_PAD_PROBE_INFO_DATA (info) = event2;
      gst_event_unref (event);
    }
      break;
    case GST_EVENT_TAG:
      GST_DEBUG_OBJECT (comp, "Dropping tag: %" GST_PTR_FORMAT, info->data);
      retval = GST_PAD_PROBE_DROP;
      break;
    case GST_EVENT_EOS:
    {
      gint seqnum = gst_event_get_seqnum (event);

      GST_INFO_OBJECT (comp, "Got EOS, last EOS seqnum id : %i current "
          "seq num is: %i", comp->priv->real_eos_seqnum, seqnum);

      if (_is_ready_to_restart_task (comp, event)) {
        GST_INFO_OBJECT (comp, "We got an EOS right after seeing the right"
            " segment, restarting task");

        _restart_task (comp);
      }

      /* If real_eos_seqnum equals this seqnum, it is atomically replaced
       * by 1 and the EOS goes downstream */
      if (g_atomic_int_compare_and_exchange (&comp->priv->real_eos_seqnum,
              seqnum, 1)) {

        GST_INFO_OBJECT (comp, "Got EOS for real, seq ID is %i, fowarding it",
            seqnum);

        return GST_PAD_PROBE_OK;
      }

      /* Any other EOS is dropped; when it carries next_eos_seqnum it
       * also schedules a stack update */
      if (priv->next_eos_seqnum == seqnum)
        _add_update_compo_action (comp, G_CALLBACK (_update_pipeline_func),
            COMP_UPDATE_STACK_ON_EOS);
      else
        GST_INFO_OBJECT (comp,
            "Got an EOS but it seqnum %i != next eos seqnum %i", seqnum,
            priv->next_eos_seqnum);

      retval = GST_PAD_PROBE_DROP;
    }
      break;
    default:
      break;
  }

  return retval;
}
1355 
1356 static gint
priority_comp(NleObject * a,NleObject * b)1357 priority_comp (NleObject * a, NleObject * b)
1358 {
1359   if (a->priority < b->priority)
1360     return -1;
1361 
1362   if (a->priority > b->priority)
1363     return 1;
1364 
1365   return 0;
1366 }
1367 
1368 static inline gboolean
have_to_update_pipeline(NleComposition * comp,NleUpdateStackReason update_stack_reason)1369 have_to_update_pipeline (NleComposition * comp,
1370     NleUpdateStackReason update_stack_reason)
1371 {
1372   NleCompositionPrivate *priv = comp->priv;
1373 
1374   if (update_stack_reason == COMP_UPDATE_STACK_ON_EOS)
1375     return TRUE;
1376 
1377   GST_DEBUG_OBJECT (comp,
1378       "segment[%" GST_TIME_FORMAT "--%" GST_TIME_FORMAT "] current[%"
1379       GST_TIME_FORMAT "--%" GST_TIME_FORMAT "]",
1380       GST_TIME_ARGS (priv->segment->start),
1381       GST_TIME_ARGS (priv->segment->stop),
1382       GST_TIME_ARGS (priv->current_stack_start),
1383       GST_TIME_ARGS (priv->current_stack_stop));
1384 
1385   if (priv->segment->start < priv->current_stack_start)
1386     return TRUE;
1387 
1388   if (priv->segment->start >= priv->current_stack_stop)
1389     return TRUE;
1390 
1391   return FALSE;
1392 }
1393 
1394 static gboolean
nle_composition_commit_func(NleObject * object,gboolean recurse)1395 nle_composition_commit_func (NleObject * object, gboolean recurse)
1396 {
1397   _add_update_compo_action (NLE_COMPOSITION (object),
1398       G_CALLBACK (_commit_func), COMP_UPDATE_STACK_ON_COMMIT);
1399 
1400   return TRUE;
1401 }
1402 
1403 /*
1404  * get_new_seek_event:
1405  *
1406  * Returns a seek event for the currently configured segment
1407  * and start/stop values
1408  *
1409  * The GstSegment and current_stack_start|stop must have been configured
1410  * before calling this function.
1411  */
1412 static GstEvent *
get_new_seek_event(NleComposition * comp,gboolean initial,gboolean updatestoponly)1413 get_new_seek_event (NleComposition * comp, gboolean initial,
1414     gboolean updatestoponly)
1415 {
1416   GstSeekFlags flags = GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH;
1417   gint64 start, stop;
1418   GstSeekType starttype = GST_SEEK_TYPE_SET;
1419   NleCompositionPrivate *priv = comp->priv;
1420 
1421   GST_DEBUG_OBJECT (comp, "initial:%d", initial);
1422   /* remove the seek flag */
1423   if (!initial)
1424     flags |= (GstSeekFlags) priv->segment->flags;
1425 
1426   GST_DEBUG_OBJECT (comp,
1427       "private->segment->start:%" GST_TIME_FORMAT " current_stack_start%"
1428       GST_TIME_FORMAT, GST_TIME_ARGS (priv->segment->start),
1429       GST_TIME_ARGS (priv->current_stack_start));
1430 
1431   GST_DEBUG_OBJECT (comp,
1432       "private->segment->stop:%" GST_TIME_FORMAT " current_stack_stop%"
1433       GST_TIME_FORMAT, GST_TIME_ARGS (priv->segment->stop),
1434       GST_TIME_ARGS (priv->current_stack_stop));
1435 
1436   start = GST_CLOCK_TIME_IS_VALID (priv->segment->start)
1437       ? MAX (priv->segment->start, priv->current_stack_start)
1438       : priv->current_stack_start;
1439   stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop)
1440       ? MIN (priv->segment->stop, priv->current_stack_stop)
1441       : priv->current_stack_stop;
1442 
1443   if (updatestoponly) {
1444     starttype = GST_SEEK_TYPE_NONE;
1445     start = GST_CLOCK_TIME_NONE;
1446   }
1447 
1448   GST_DEBUG_OBJECT (comp,
1449       "Created new seek event. Flags:%d, start:%" GST_TIME_FORMAT ", stop:%"
1450       GST_TIME_FORMAT ", rate:%lf", flags, GST_TIME_ARGS (start),
1451       GST_TIME_ARGS (stop), priv->segment->rate);
1452 
1453   return gst_event_new_seek (priv->segment->rate,
1454       priv->segment->format, flags, starttype, start, GST_SEEK_TYPE_SET, stop);
1455 }
1456 
1457 /* OBJECTS LOCK must be taken when calling this ! */
1458 static GstClockTime
get_current_position(NleComposition * comp)1459 get_current_position (NleComposition * comp)
1460 {
1461   GstPad *pad;
1462   NleObject *obj;
1463   NleCompositionPrivate *priv = comp->priv;
1464   gboolean res;
1465   gint64 value = GST_CLOCK_TIME_NONE;
1466   GstObject *parent, *tmp;
1467 
1468   GstPad *peer;
1469 
1470   parent = gst_object_get_parent (GST_OBJECT (comp));
1471   while ((tmp = parent)) {
1472     if (NLE_IS_COMPOSITION (parent)) {
1473       GstClockTime parent_position =
1474           get_current_position (NLE_COMPOSITION (parent));
1475 
1476       if (parent_position > NLE_OBJECT_STOP (comp)
1477           || parent_position < NLE_OBJECT_START (comp)) {
1478         GST_INFO_OBJECT (comp,
1479             "Global position outside of subcomposition, returning TIME_NONE");
1480 
1481         return GST_CLOCK_TIME_NONE;
1482       }
1483 
1484       value =
1485           parent_position - NLE_OBJECT_START (comp) + NLE_OBJECT_INPOINT (comp);
1486     }
1487 
1488     if (GST_IS_PIPELINE (parent)) {
1489       if (gst_element_query_position (GST_ELEMENT (parent), GST_FORMAT_TIME,
1490               &value)) {
1491 
1492         gst_object_unref (parent);
1493         return value;
1494       }
1495     }
1496 
1497 
1498     parent = gst_object_get_parent (GST_OBJECT (parent));
1499     gst_object_unref (tmp);
1500   }
1501 
1502   /* Try querying position downstream */
1503   peer = gst_pad_get_peer (NLE_OBJECT (comp)->srcpad);
1504   if (peer) {
1505     res = gst_pad_query_position (peer, GST_FORMAT_TIME, &value);
1506     gst_object_unref (peer);
1507 
1508     if (res) {
1509       GST_DEBUG_OBJECT (comp,
1510           "Successfully got downstream position %" GST_TIME_FORMAT,
1511           GST_TIME_ARGS ((guint64) value));
1512       goto beach;
1513     }
1514   }
1515 
1516   GST_DEBUG_OBJECT (comp, "Downstream position query failed");
1517 
1518   /* resetting format/value */
1519   value = GST_CLOCK_TIME_NONE;
1520 
1521   /* If downstream fails , try within the current stack */
1522   if (!priv->current) {
1523     GST_DEBUG_OBJECT (comp, "No current stack, can't send query");
1524     goto beach;
1525   }
1526 
1527   obj = (NleObject *) priv->current->data;
1528 
1529   pad = NLE_OBJECT_SRC (obj);
1530   res = gst_pad_query_position (pad, GST_FORMAT_TIME, &value);
1531 
1532   if (G_UNLIKELY (res == FALSE)) {
1533     GST_WARNING_OBJECT (comp, "query position failed");
1534     value = GST_CLOCK_TIME_NONE;
1535   } else {
1536     GST_LOG_OBJECT (comp, "Query returned %" GST_TIME_FORMAT,
1537         GST_TIME_ARGS ((guint64) value));
1538   }
1539 
1540 beach:
1541 
1542   if (!GST_CLOCK_TIME_IS_VALID (value)) {
1543     if (GST_CLOCK_TIME_IS_VALID (comp->priv->current_stack_start)) {
1544       value = comp->priv->current_stack_start;
1545     } else {
1546       GST_INFO_OBJECT (comp, "Current position is unknown, " "setting it to 0");
1547 
1548       value = 0;
1549     }
1550   }
1551 
1552   return (guint64) value;
1553 }
1554 
1555 /* WITH OBJECTS LOCK TAKEN */
1556 static gboolean
update_base_time(GNode * node,GstClockTime * timestamp)1557 update_base_time (GNode * node, GstClockTime * timestamp)
1558 {
1559   if (NLE_IS_OPERATION (node->data))
1560     nle_operation_update_base_time (NLE_OPERATION (node->data), *timestamp);
1561 
1562   return FALSE;
1563 }
1564 
1565 /* WITH OBJECTS LOCK TAKEN */
1566 static void
update_operations_base_time(NleComposition * comp,gboolean reverse)1567 update_operations_base_time (NleComposition * comp, gboolean reverse)
1568 {
1569   GstClockTime timestamp;
1570 
1571   if (reverse)
1572     timestamp = comp->priv->segment->stop;
1573   else
1574     timestamp = comp->priv->segment->start;
1575 
1576   g_node_traverse (comp->priv->current, G_IN_ORDER, G_TRAVERSE_ALL, -1,
1577       (GNodeTraverseFunc) update_base_time, &timestamp);
1578 }
1579 
1580 /* WITH OBJECTS LOCK TAKEN */
1581 static gboolean
_seek_current_stack(NleComposition * comp,GstEvent * event,gboolean flush_downstream)1582 _seek_current_stack (NleComposition * comp, GstEvent * event,
1583     gboolean flush_downstream)
1584 {
1585   gboolean res;
1586   NleCompositionPrivate *priv = comp->priv;
1587   GstPad *peer = gst_pad_get_peer (NLE_OBJECT_SRC (comp));
1588 
1589   GST_INFO_OBJECT (comp, "Seeking itself %" GST_PTR_FORMAT, event);
1590 
1591   if (!peer) {
1592     GST_ERROR_OBJECT (comp, "Can't seek because no pad available - "
1593         "no children in the composition ready to be used, the duration is 0, "
1594         "or not committed yet");
1595     return FALSE;
1596   }
1597 
1598   if (flush_downstream) {
1599     priv->flush_seqnum = gst_event_get_seqnum (event);
1600     GST_INFO_OBJECT (comp, "sending flushes downstream with seqnum %d",
1601         priv->flush_seqnum);
1602   }
1603 
1604   priv->seeking_itself = TRUE;
1605   res = gst_pad_push_event (peer, event);
1606   priv->seeking_itself = FALSE;
1607   gst_object_unref (peer);
1608 
1609   GST_DEBUG_OBJECT (comp, "Done seeking");
1610 
1611   return res;
1612 }
1613 
1614 /*
1615   Figures out if pipeline needs updating.
1616   Updates it and sends the seek event.
1617   Sends flush events downstream if needed.
1618   can be called by user_seek or segment_done
1619 
1620   update_stack_reason: The reason for which we need to handle 'seek'
1621 */
1622 
1623 static gboolean
seek_handling(NleComposition * comp,gint32 seqnum,NleUpdateStackReason update_stack_reason)1624 seek_handling (NleComposition * comp, gint32 seqnum,
1625     NleUpdateStackReason update_stack_reason)
1626 {
1627   GST_DEBUG_OBJECT (comp, "Seek handling update pipeline reason: %s",
1628       UPDATE_PIPELINE_REASONS[update_stack_reason]);
1629 
1630   if (have_to_update_pipeline (comp, update_stack_reason)) {
1631     if (comp->priv->segment->rate >= 0.0)
1632       update_pipeline (comp, comp->priv->segment->start, seqnum,
1633           update_stack_reason);
1634     else
1635       update_pipeline (comp, comp->priv->segment->stop, seqnum,
1636           update_stack_reason);
1637   } else {
1638     GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE);
1639 
1640     gst_event_set_seqnum (toplevel_seek, seqnum);
1641     _set_real_eos_seqnum_from_seek (comp, toplevel_seek);
1642 
1643     _remove_update_actions (comp);
1644     update_operations_base_time (comp, !(comp->priv->segment->rate >= 0.0));
1645     _seek_current_stack (comp, toplevel_seek,
1646         _have_to_flush_downstream (update_stack_reason));
1647   }
1648 
1649   return TRUE;
1650 }
1651 
/* Event function installed on the composition source pad by
 * nle_composition_init().
 *
 *  - SEEK: queued through _add_seek_action() and reported as handled (TRUE)
 *    unless the seek is recognised as coming from the composition itself
 *    (see the condition below); in that case it is forwarded.
 *  - QOS: when priv->seek_segment->start is valid, the event is either
 *    discarded or replaced by a new one with a translated timestamp.
 *  - Everything that reaches the end of the switch is handed to the event
 *    function saved in priv->nle_event_pad_func.
 *
 * Returns: TRUE for queued seeks and discarded QoS events, otherwise the
 * result of priv->nle_event_pad_func.
 */
static gboolean
nle_composition_event_handler (GstPad * ghostpad, GstObject * parent,
    GstEvent * event)
{
  NleComposition *comp = (NleComposition *) parent;
  NleCompositionPrivate *priv = comp->priv;
  gboolean res = TRUE;

  GST_DEBUG_OBJECT (comp, "event type:%s", GST_EVENT_TYPE_NAME (event));

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_SEEK:
    {
      /* Queue up a seek action if this seek event does not come from
       * ourselves. Due to a possible race condition around the
       * seeking_itself flag, we also check if the seek comes from
       * our task thread. The seeking_itself flag only works as an
       * optimization */
      GST_OBJECT_LOCK (comp);
      if (!priv->seeking_itself || (comp->task
              && gst_task_get_state (comp->task) != GST_TASK_STOPPED
              && g_thread_self () != comp->task->thread)) {
        GST_OBJECT_UNLOCK (comp);
        /* The event is handed over to the queued action */
        _add_seek_action (comp, event);
        event = NULL;
        GST_FIXME_OBJECT (comp, "HANDLE seeking errors!");

        return TRUE;
      }
      GST_OBJECT_UNLOCK (comp);
      break;
    }
    case GST_EVENT_QOS:
    {
      gdouble prop;
      GstQOSType qostype;
      GstClockTimeDiff diff;
      GstClockTime timestamp;

      gst_event_parse_qos (event, &qostype, &prop, &diff, &timestamp);

      GST_DEBUG_OBJECT (comp,
          "timestamp:%" GST_TIME_FORMAT " segment.start:%" GST_TIME_FORMAT
          " segment.stop:%" GST_TIME_FORMAT " current_stack_start%"
          GST_TIME_FORMAT " current_stack_stop:%" GST_TIME_FORMAT,
          GST_TIME_ARGS (timestamp),
          GST_TIME_ARGS (priv->seek_segment->start),
          GST_TIME_ARGS (priv->seek_segment->stop),
          GST_TIME_ARGS (priv->current_stack_start),
          GST_TIME_ARGS (priv->current_stack_stop));

      /* The problem with QoS events is the following:
       * At each new internal segment (i.e. when we re-arrange our internal
       * elements) we send flushing seeks to those elements (to properly
       * configure their playback range) but don't let the FLUSH events get
       * downstream.
       *
       * The problem is that the QoS running timestamps we receive from
       * downstream will not have taken into account those flush.
       *
       * What we need to do is to translate to our internal running timestamps
       * which for each configured segment starts at 0 for those elements.
       *
       * The generic algorithm for the incoming running timestamp translation
       * is therefore:
       *     (original_seek_time : original seek position received from upstream)
       *     (current_segment_start : Start position of the currently configured
       *                              timeline segment)
       *
       *     difference = original_seek_time - current_segment_start
       *     new_qos_position = upstream_qos_position - difference
       *
       * The new_qos_position is only valid when:
       *    * it applies to the current segment (difference > 0)
       *    * The QoS difference + timestamp is greater than the difference
       *
       */

      if (GST_CLOCK_TIME_IS_VALID (priv->seek_segment->start)) {
        GstClockTimeDiff curdiff;

        /* We'll either create a new event or discard it */
        gst_event_unref (event);

        /* Distance between the seek segment and the current stack,
         * measured from the end when playing in reverse */
        if (priv->segment->rate < 0.0)
          curdiff = priv->seek_segment->stop - priv->current_stack_stop;
        else
          curdiff = priv->current_stack_start - priv->seek_segment->start;
        GST_DEBUG ("curdiff %" GST_TIME_FORMAT, GST_TIME_ARGS (curdiff));
        if ((curdiff != 0) && ((timestamp < curdiff)
                || (curdiff > timestamp + diff))) {
          GST_DEBUG_OBJECT (comp,
              "QoS event outside of current segment, discarding");
          /* The QoS timestamp is before the currently set-up pipeline */
          goto beach;
        }

        /* Subtract the amount of running time we've already outputted
         * until the currently configured pipeline from the QoS timestamp.*/
        timestamp -= curdiff;
        GST_DEBUG_OBJECT (comp,
            "Creating new QoS event with timestamp %" GST_TIME_FORMAT,
            GST_TIME_ARGS (timestamp));
        event = gst_event_new_qos (qostype, prop, diff, timestamp);
      }
      break;
    }
    default:
      break;
  }

  /* res is still TRUE on every path that reaches this point */
  if (res) {
    GST_DEBUG_OBJECT (comp, "About to call nle_event_pad_func: %p",
        priv->nle_event_pad_func);
    res = priv->nle_event_pad_func (NLE_OBJECT (comp)->srcpad, parent, event);
    GST_DEBUG_OBJECT (comp, "Done calling nle_event_pad_func() %d", res);
  }

beach:
  return res;
}
1773 
1774 static inline void
nle_composition_reset_target_pad(NleComposition * comp)1775 nle_composition_reset_target_pad (NleComposition * comp)
1776 {
1777   NleCompositionPrivate *priv = comp->priv;
1778 
1779   GST_DEBUG_OBJECT (comp, "Removing ghostpad");
1780 
1781   if (priv->ghosteventprobe) {
1782     GstPad *target;
1783 
1784     target = gst_ghost_pad_get_target ((GstGhostPad *) NLE_OBJECT_SRC (comp));
1785     if (target)
1786       gst_pad_remove_probe (target, priv->ghosteventprobe);
1787     priv->ghosteventprobe = 0;
1788   }
1789 
1790   nle_object_ghost_pad_set_target (NLE_OBJECT (comp),
1791       NLE_OBJECT_SRC (comp), NULL);
1792   priv->send_stream_start = TRUE;
1793 }
1794 
1795 /* nle_composition_ghost_pad_set_target:
1796  * target: The target #GstPad. The refcount will be decremented (given to the ghostpad).
1797  */
1798 static void
nle_composition_ghost_pad_set_target(NleComposition * comp,GstPad * target)1799 nle_composition_ghost_pad_set_target (NleComposition * comp, GstPad * target)
1800 {
1801   GstPad *ptarget;
1802   NleCompositionPrivate *priv = comp->priv;
1803 
1804   if (target)
1805     GST_DEBUG_OBJECT (comp, "target:%s:%s", GST_DEBUG_PAD_NAME (target));
1806   else
1807     GST_DEBUG_OBJECT (comp, "Removing target");
1808 
1809 
1810   ptarget =
1811       gst_ghost_pad_get_target (GST_GHOST_PAD (NLE_OBJECT (comp)->srcpad));
1812   if (ptarget) {
1813     gst_object_unref (ptarget);
1814 
1815     if (ptarget == target) {
1816       GST_DEBUG_OBJECT (comp,
1817           "Target of srcpad is the same as existing one, not changing");
1818       return;
1819     }
1820   }
1821 
1822   /* Actually set the target */
1823   nle_object_ghost_pad_set_target ((NleObject *) comp,
1824       NLE_OBJECT (comp)->srcpad, target);
1825 
1826   if (target && (priv->ghosteventprobe == 0)) {
1827     priv->ghosteventprobe =
1828         gst_pad_add_probe (target,
1829         GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
1830         GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
1831         GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1832         (GstPadProbeCallback) ghost_event_probe_handler, comp, NULL);
1833     GST_DEBUG_OBJECT (comp, "added event probe %lu", priv->ghosteventprobe);
1834   }
1835 }
1836 
1837 static void
refine_start_stop_in_region_above_priority(NleComposition * composition,GstClockTime timestamp,GstClockTime start,GstClockTime stop,GstClockTime * rstart,GstClockTime * rstop,guint32 priority)1838 refine_start_stop_in_region_above_priority (NleComposition * composition,
1839     GstClockTime timestamp, GstClockTime start,
1840     GstClockTime stop,
1841     GstClockTime * rstart, GstClockTime * rstop, guint32 priority)
1842 {
1843   GList *tmp;
1844   NleObject *object;
1845   GstClockTime nstart = start, nstop = stop;
1846 
1847   GST_DEBUG_OBJECT (composition,
1848       "timestamp:%" GST_TIME_FORMAT " start: %" GST_TIME_FORMAT " stop: %"
1849       GST_TIME_FORMAT " priority:%u", GST_TIME_ARGS (timestamp),
1850       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), priority);
1851 
1852   for (tmp = composition->priv->objects_start; tmp; tmp = tmp->next) {
1853     object = (NleObject *) tmp->data;
1854 
1855     GST_LOG_OBJECT (object, "START %" GST_TIME_FORMAT "--%" GST_TIME_FORMAT,
1856         GST_TIME_ARGS (object->start), GST_TIME_ARGS (object->stop));
1857 
1858     if ((object->priority >= priority) || (!NLE_OBJECT_ACTIVE (object)))
1859       continue;
1860 
1861     if (object->start <= timestamp)
1862       continue;
1863 
1864     if (object->start >= nstop)
1865       continue;
1866 
1867     nstop = object->start;
1868 
1869     GST_DEBUG_OBJECT (composition,
1870         "START Found %s [prio:%u] at %" GST_TIME_FORMAT,
1871         GST_OBJECT_NAME (object), object->priority,
1872         GST_TIME_ARGS (object->start));
1873 
1874     break;
1875   }
1876 
1877   for (tmp = composition->priv->objects_stop; tmp; tmp = tmp->next) {
1878     object = (NleObject *) tmp->data;
1879 
1880     GST_LOG_OBJECT (object, "STOP %" GST_TIME_FORMAT "--%" GST_TIME_FORMAT,
1881         GST_TIME_ARGS (object->start), GST_TIME_ARGS (object->stop));
1882 
1883     if ((object->priority >= priority) || (!NLE_OBJECT_ACTIVE (object)))
1884       continue;
1885 
1886     if (object->stop >= timestamp)
1887       continue;
1888 
1889     if (object->stop <= nstart)
1890       continue;
1891 
1892     nstart = object->stop;
1893 
1894     GST_DEBUG_OBJECT (composition,
1895         "STOP Found %s [prio:%u] at %" GST_TIME_FORMAT,
1896         GST_OBJECT_NAME (object), object->priority,
1897         GST_TIME_ARGS (object->start));
1898 
1899     break;
1900   }
1901 
1902   if (*rstart)
1903     *rstart = nstart;
1904 
1905   if (*rstop)
1906     *rstop = nstop;
1907 }
1908 
1909 
1910 /*
1911  * Converts a sorted list to a tree
1912  * Recursive
1913  *
1914  * stack will be set to the next item to use in the parent.
1915  * If operations number of sinks is limited, it will only use that number.
1916  */
1917 
1918 static GNode *
convert_list_to_tree(GList ** stack,GstClockTime * start,GstClockTime * stop,guint32 * highprio)1919 convert_list_to_tree (GList ** stack, GstClockTime * start,
1920     GstClockTime * stop, guint32 * highprio)
1921 {
1922   GNode *ret;
1923   guint nbsinks;
1924   gboolean limit;
1925   GList *tmp;
1926   NleObject *object;
1927 
1928   if (!stack || !*stack)
1929     return NULL;
1930 
1931   object = (NleObject *) (*stack)->data;
1932 
1933   GST_DEBUG ("object:%s , *start:%" GST_TIME_FORMAT ", *stop:%"
1934       GST_TIME_FORMAT " highprio:%d",
1935       GST_ELEMENT_NAME (object), GST_TIME_ARGS (*start),
1936       GST_TIME_ARGS (*stop), *highprio);
1937 
1938   /* update earliest stop */
1939   if (GST_CLOCK_TIME_IS_VALID (*stop)) {
1940     if (GST_CLOCK_TIME_IS_VALID (object->stop) && (*stop > object->stop))
1941       *stop = object->stop;
1942   } else {
1943     *stop = object->stop;
1944   }
1945 
1946   if (GST_CLOCK_TIME_IS_VALID (*start)) {
1947     if (GST_CLOCK_TIME_IS_VALID (object->start) && (*start < object->start))
1948       *start = object->start;
1949   } else {
1950     *start = object->start;
1951   }
1952 
1953   if (NLE_OBJECT_IS_SOURCE (object)) {
1954     *stack = g_list_next (*stack);
1955 
1956     /* update highest priority.
1957      * We do this here, since it's only used with sources (leafs of the tree) */
1958     if (object->priority > *highprio)
1959       *highprio = object->priority;
1960 
1961     ret = g_node_new (object);
1962 
1963     goto beach;
1964   } else {
1965     /* NleOperation */
1966     NleOperation *oper = (NleOperation *) object;
1967 
1968     GST_LOG_OBJECT (oper, "operation, num_sinks:%d", oper->num_sinks);
1969 
1970     ret = g_node_new (object);
1971     limit = (oper->dynamicsinks == FALSE);
1972     nbsinks = oper->num_sinks;
1973 
1974     /* FIXME : if num_sinks == -1 : request the proper number of pads */
1975     for (tmp = g_list_next (*stack); tmp && (!limit || nbsinks);) {
1976       g_node_append (ret, convert_list_to_tree (&tmp, start, stop, highprio));
1977       if (limit)
1978         nbsinks--;
1979     }
1980 
1981     *stack = tmp;
1982   }
1983 
1984 beach:
1985   GST_DEBUG_OBJECT (object,
1986       "*start:%" GST_TIME_FORMAT " *stop:%" GST_TIME_FORMAT
1987       " priority:%u", GST_TIME_ARGS (*start), GST_TIME_ARGS (*stop), *highprio);
1988 
1989   return ret;
1990 }
1991 
1992 /*
1993  * get_stack_list:
1994  * @comp: The #NleComposition
1995  * @timestamp: The #GstClockTime to look at
1996  * @priority: The priority level to start looking from
1997  * @activeonly: Only look for active elements if TRUE
1998  * @start: The biggest start time of the objects in the stack
1999  * @stop: The smallest stop time of the objects in the stack
2000  * @highprio: The highest priority in the stack
2001  *
2002  * Not MT-safe, you should take the objects lock before calling it.
2003  * Returns: A tree of #GNode sorted in priority order, corresponding
2004  * to the given search arguments. The returned value can be #NULL.
2005  *
2006  * WITH OBJECTS LOCK TAKEN
2007  */
2008 static GNode *
get_stack_list(NleComposition * comp,GstClockTime timestamp,guint32 priority,gboolean activeonly,GstClockTime * start,GstClockTime * stop,guint * highprio)2009 get_stack_list (NleComposition * comp, GstClockTime timestamp,
2010     guint32 priority, gboolean activeonly, GstClockTime * start,
2011     GstClockTime * stop, guint * highprio)
2012 {
2013   GList *tmp;
2014   GList *stack = NULL;
2015   GNode *ret = NULL;
2016   GstClockTime nstart = GST_CLOCK_TIME_NONE;
2017   GstClockTime nstop = GST_CLOCK_TIME_NONE;
2018   GstClockTime first_out_of_stack = GST_CLOCK_TIME_NONE;
2019   guint32 highest = 0;
2020   gboolean reverse = (comp->priv->segment->rate < 0.0);
2021 
2022   GST_DEBUG_OBJECT (comp,
2023       "timestamp:%" GST_TIME_FORMAT ", priority:%u, activeonly:%d",
2024       GST_TIME_ARGS (timestamp), priority, activeonly);
2025 
2026   GST_LOG ("objects_start:%p objects_stop:%p", comp->priv->objects_start,
2027       comp->priv->objects_stop);
2028 
2029   if (reverse) {
2030     for (tmp = comp->priv->objects_stop; tmp; tmp = g_list_next (tmp)) {
2031       NleObject *object = (NleObject *) tmp->data;
2032 
2033       GST_LOG_OBJECT (object,
2034           "start: %" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT " , duration:%"
2035           GST_TIME_FORMAT ", priority:%u, active:%d",
2036           GST_TIME_ARGS (object->start), GST_TIME_ARGS (object->stop),
2037           GST_TIME_ARGS (object->duration), object->priority, object->active);
2038 
2039       if (object->stop >= timestamp) {
2040         if ((object->start < timestamp) &&
2041             (object->priority >= priority) &&
2042             ((!activeonly) || (NLE_OBJECT_ACTIVE (object)))) {
2043           GST_LOG_OBJECT (comp, "adding %s: sorted to the stack",
2044               GST_OBJECT_NAME (object));
2045           stack = g_list_insert_sorted (stack, object,
2046               (GCompareFunc) priority_comp);
2047           if (NLE_IS_OPERATION (object))
2048             nle_operation_update_base_time (NLE_OPERATION (object), timestamp);
2049         }
2050       } else {
2051         GST_LOG_OBJECT (comp, "too far, stopping iteration");
2052         first_out_of_stack = object->stop;
2053         break;
2054       }
2055     }
2056   } else {
2057     for (tmp = comp->priv->objects_start; tmp; tmp = g_list_next (tmp)) {
2058       NleObject *object = (NleObject *) tmp->data;
2059 
2060       GST_LOG_OBJECT (object,
2061           "start: %" GST_TIME_FORMAT " , stop:%" GST_TIME_FORMAT " , duration:%"
2062           GST_TIME_FORMAT ", priority:%u", GST_TIME_ARGS (object->start),
2063           GST_TIME_ARGS (object->stop), GST_TIME_ARGS (object->duration),
2064           object->priority);
2065 
2066       if (object->start <= timestamp) {
2067         if ((object->stop > timestamp) &&
2068             (object->priority >= priority) &&
2069             ((!activeonly) || (NLE_OBJECT_ACTIVE (object)))) {
2070           GST_LOG_OBJECT (comp, "adding %s: sorted to the stack",
2071               GST_OBJECT_NAME (object));
2072           stack = g_list_insert_sorted (stack, object,
2073               (GCompareFunc) priority_comp);
2074           if (NLE_IS_OPERATION (object))
2075             nle_operation_update_base_time (NLE_OPERATION (object), timestamp);
2076         }
2077       } else {
2078         GST_LOG_OBJECT (comp, "too far, stopping iteration");
2079         first_out_of_stack = object->start;
2080         break;
2081       }
2082     }
2083   }
2084 
2085   /* Insert the expandables */
2086   if (G_LIKELY (timestamp < NLE_OBJECT_STOP (comp)))
2087     for (tmp = comp->priv->expandables; tmp; tmp = tmp->next) {
2088       GST_DEBUG_OBJECT (comp, "Adding expandable %s sorted to the list",
2089           GST_OBJECT_NAME (tmp->data));
2090       stack = g_list_insert_sorted (stack, tmp->data,
2091           (GCompareFunc) priority_comp);
2092       if (NLE_IS_OPERATION (tmp->data))
2093         nle_operation_update_base_time (NLE_OPERATION (tmp->data), timestamp);
2094     }
2095 
2096   /* convert that list to a stack */
2097   tmp = stack;
2098   ret = convert_list_to_tree (&tmp, &nstart, &nstop, &highest);
2099   if (GST_CLOCK_TIME_IS_VALID (first_out_of_stack)) {
2100     if (reverse && nstart < first_out_of_stack)
2101       nstart = first_out_of_stack;
2102     else if (!reverse && nstop > first_out_of_stack)
2103       nstop = first_out_of_stack;
2104   }
2105 
2106   GST_DEBUG ("nstart:%" GST_TIME_FORMAT ", nstop:%" GST_TIME_FORMAT,
2107       GST_TIME_ARGS (nstart), GST_TIME_ARGS (nstop));
2108 
2109   if (*stop)
2110     *stop = nstop;
2111   if (*start)
2112     *start = nstart;
2113   if (highprio)
2114     *highprio = highest;
2115 
2116   g_list_free (stack);
2117 
2118   return ret;
2119 }
2120 
2121 /*
2122  * get_clean_toplevel_stack:
2123  * @comp: The #NleComposition
2124  * @timestamp: The #GstClockTime to look at
2125  * @stop_time: Pointer to a #GstClockTime for min stop time of returned stack
2126  * @start_time: Pointer to a #GstClockTime for greatest start time of returned stack
2127  *
2128  * Returns: The new current stack for the given #NleComposition and @timestamp.
2129  *
2130  * WITH OBJECTS LOCK TAKEN
2131  */
2132 static GNode *
get_clean_toplevel_stack(NleComposition * comp,GstClockTime * timestamp,GstClockTime * start_time,GstClockTime * stop_time)2133 get_clean_toplevel_stack (NleComposition * comp, GstClockTime * timestamp,
2134     GstClockTime * start_time, GstClockTime * stop_time)
2135 {
2136   GNode *stack = NULL;
2137   GstClockTime start = G_MAXUINT64;
2138   GstClockTime stop = G_MAXUINT64;
2139   guint highprio;
2140   gboolean reverse = (comp->priv->segment->rate < 0.0);
2141 
2142   GST_DEBUG_OBJECT (comp, "timestamp:%" GST_TIME_FORMAT,
2143       GST_TIME_ARGS (*timestamp));
2144   GST_DEBUG ("start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT,
2145       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2146 
2147   stack = get_stack_list (comp, *timestamp, 0, TRUE, &start, &stop, &highprio);
2148 
2149   if (!stack &&
2150       ((reverse && (*timestamp > COMP_REAL_START (comp))) ||
2151           (!reverse && (*timestamp < COMP_REAL_STOP (comp))))) {
2152     GST_ELEMENT_ERROR (comp, STREAM, WRONG_TYPE,
2153         ("Gaps ( at %" GST_TIME_FORMAT
2154             ") in the stream is not supported, the application is responsible"
2155             " for filling them", GST_TIME_ARGS (*timestamp)),
2156         ("Gap in the composition this should never"
2157             "append, make sure to fill them"));
2158 
2159     return NULL;
2160   }
2161 
2162   GST_DEBUG ("start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT,
2163       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2164 
2165   if (stack) {
2166     guint32 top_priority = NLE_OBJECT_PRIORITY (stack->data);
2167 
2168     /* Figure out if there's anything blocking us with smaller priority */
2169     refine_start_stop_in_region_above_priority (comp, *timestamp, start,
2170         stop, &start, &stop, (highprio == 0) ? top_priority : highprio);
2171   }
2172 
2173   if (*stop_time) {
2174     if (stack)
2175       *stop_time = stop;
2176     else
2177       *stop_time = 0;
2178   }
2179 
2180   if (*start_time) {
2181     if (stack)
2182       *start_time = start;
2183     else
2184       *start_time = 0;
2185   }
2186 
2187   GST_DEBUG_OBJECT (comp,
2188       "Returning timestamp:%" GST_TIME_FORMAT " , start_time:%"
2189       GST_TIME_FORMAT " , stop_time:%" GST_TIME_FORMAT,
2190       GST_TIME_ARGS (*timestamp), GST_TIME_ARGS (*start_time),
2191       GST_TIME_ARGS (*stop_time));
2192 
2193   return stack;
2194 }
2195 
2196 static GstPadProbeReturn
_drop_all_cb(GstPad * pad G_GNUC_UNUSED,GstPadProbeInfo * info,NleComposition * comp)2197 _drop_all_cb (GstPad * pad G_GNUC_UNUSED,
2198     GstPadProbeInfo * info, NleComposition * comp)
2199 {
2200   return GST_PAD_PROBE_DROP;
2201 }
2202 
2203 /*  Must be called with OBJECTS_LOCK taken */
2204 static void
_set_current_bin_to_ready(NleComposition * comp,gboolean flush_downstream)2205 _set_current_bin_to_ready (NleComposition * comp, gboolean flush_downstream)
2206 {
2207   gint probe_id = -1;
2208   GstPad *ptarget = NULL;
2209   NleCompositionPrivate *priv = comp->priv;
2210   GstEvent *flush_event;
2211 
2212   comp->priv->tearing_down_stack = TRUE;
2213   if (flush_downstream) {
2214     ptarget = gst_ghost_pad_get_target (GST_GHOST_PAD (NLE_OBJECT_SRC (comp)));
2215     if (ptarget) {
2216 
2217       /* Make sure that between the flush_start/flush_stop
2218        * and the time we set the current_bin to READY, no
2219        * buffer can ever get prerolled which would lead to
2220        * a deadlock */
2221       probe_id = gst_pad_add_probe (ptarget,
2222           GST_PAD_PROBE_TYPE_DATA_BOTH | GST_PAD_PROBE_TYPE_EVENT_BOTH,
2223           (GstPadProbeCallback) _drop_all_cb, comp, NULL);
2224 
2225       GST_DEBUG_OBJECT (comp, "added event probe %lu", priv->ghosteventprobe);
2226 
2227       flush_event = gst_event_new_flush_start ();
2228       priv->flush_seqnum = gst_event_get_seqnum (flush_event);
2229       GST_INFO_OBJECT (comp, "sending flushes downstream with seqnum %d",
2230           priv->flush_seqnum);
2231       gst_pad_push_event (ptarget, flush_event);
2232 
2233     }
2234 
2235   }
2236 
2237   gst_element_set_locked_state (priv->current_bin, TRUE);
2238   gst_element_set_state (priv->current_bin, GST_STATE_READY);
2239 
2240   if (ptarget) {
2241     if (flush_downstream) {
2242       flush_event = gst_event_new_flush_stop (TRUE);
2243       gst_event_set_seqnum (flush_event, priv->flush_seqnum);
2244 
2245       /* Force ad activation so that the event can actually travel.
2246        * Not doing that would lead to the event being discarded.
2247        */
2248       gst_pad_set_active (ptarget, TRUE);
2249       gst_pad_push_event (ptarget, flush_event);
2250       gst_pad_set_active (ptarget, FALSE);
2251     }
2252 
2253     gst_pad_remove_probe (ptarget, probe_id);
2254     gst_object_unref (ptarget);
2255   }
2256 
2257   comp->priv->tearing_down_stack = FALSE;
2258 }
2259 
2260 static void
_emit_commited_signal_func(NleComposition * comp,gpointer udata)2261 _emit_commited_signal_func (NleComposition * comp, gpointer udata)
2262 {
2263   GST_INFO_OBJECT (comp, "Emiting COMMITED now that the stack " "is ready");
2264 
2265   g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
2266 }
2267 
2268 static void
_restart_task(NleComposition * comp)2269 _restart_task (NleComposition * comp)
2270 {
2271   GST_INFO_OBJECT (comp, "Restarting task! after %s DONE",
2272       UPDATE_PIPELINE_REASONS[comp->priv->updating_reason]);
2273 
2274   if (comp->priv->updating_reason == COMP_UPDATE_STACK_ON_COMMIT)
2275     _add_action (comp, G_CALLBACK (_emit_commited_signal_func), comp,
2276         G_PRIORITY_HIGH);
2277 
2278   comp->priv->seqnum_to_restart_task = 0;
2279   comp->priv->waiting_serialized_query_or_buffer = FALSE;
2280 
2281   comp->priv->updating_reason = COMP_UPDATE_STACK_NONE;
2282   GST_OBJECT_LOCK (comp);
2283   if (comp->task)
2284     gst_task_start (comp->task);
2285   GST_OBJECT_UNLOCK (comp);
2286 }
2287 
2288 static gboolean
_is_ready_to_restart_task(NleComposition * comp,GstEvent * event)2289 _is_ready_to_restart_task (NleComposition * comp, GstEvent * event)
2290 {
2291   NleCompositionPrivate *priv = comp->priv;
2292   gint seqnum = gst_event_get_seqnum (event);
2293 
2294 
2295   if (comp->priv->seqnum_to_restart_task == seqnum) {
2296     gchar *name = g_strdup_printf ("%s-new-stack__%" GST_TIME_FORMAT "--%"
2297         GST_TIME_FORMAT "", GST_OBJECT_NAME (comp),
2298         GST_TIME_ARGS (comp->priv->current_stack_start),
2299         GST_TIME_ARGS (comp->priv->current_stack_stop));
2300 
2301     GST_INFO_OBJECT (comp, "Got %s with proper seqnum"
2302         " done with stack reconfiguration %" GST_PTR_FORMAT,
2303         GST_EVENT_TYPE_NAME (event), event);
2304 
2305     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (comp),
2306         GST_DEBUG_GRAPH_SHOW_ALL, name);
2307     g_free (name);
2308 
2309     if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2310       GST_INFO_OBJECT (comp, "update_pipeline DONE");
2311       return TRUE;
2312     }
2313 
2314     priv->waiting_serialized_query_or_buffer = TRUE;
2315     return FALSE;
2316 
2317   } else if (comp->priv->seqnum_to_restart_task) {
2318     GST_INFO_OBJECT (comp, "WARNING: %s seqnum %i != wanted %i",
2319         GST_EVENT_TYPE_NAME (event), seqnum,
2320         comp->priv->seqnum_to_restart_task);
2321   }
2322 
2323   return FALSE;
2324 }
2325 
2326 static void
_commit_func(NleComposition * comp,UpdateCompositionData * ucompo)2327 _commit_func (NleComposition * comp, UpdateCompositionData * ucompo)
2328 {
2329   GstClockTime curpos;
2330   NleCompositionPrivate *priv = comp->priv;
2331 
2332   _post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
2333 
2334   /* Get current so that it represent the duration it was
2335    * before commiting children */
2336   curpos = get_current_position (comp);
2337 
2338   if (!_commit_all_values (comp)) {
2339     GST_DEBUG_OBJECT (comp, "Nothing to commit, leaving");
2340 
2341     g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
2342     _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
2343 
2344     return;
2345   }
2346 
2347   if (priv->initialized == FALSE) {
2348     GST_DEBUG_OBJECT (comp, "Not initialized yet, just updating values");
2349 
2350     update_start_stop_duration (comp);
2351 
2352     g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
2353 
2354   } else {
2355     gboolean reverse;
2356 
2357     /* And update the pipeline at current position if needed */
2358     update_start_stop_duration (comp);
2359 
2360     reverse = (priv->segment->rate < 0.0);
2361     if (!reverse) {
2362       GST_DEBUG_OBJECT (comp,
2363           "Setting segment->start to curpos:%" GST_TIME_FORMAT,
2364           GST_TIME_ARGS (curpos));
2365       priv->segment->start = curpos;
2366     } else {
2367       GST_DEBUG_OBJECT (comp,
2368           "Setting segment->stop to curpos:%" GST_TIME_FORMAT,
2369           GST_TIME_ARGS (curpos));
2370       priv->segment->stop = curpos;
2371     }
2372     update_pipeline (comp, curpos, ucompo->seqnum, COMP_UPDATE_STACK_ON_COMMIT);
2373 
2374     if (!priv->current) {
2375       GST_INFO_OBJECT (comp, "No new stack set, we can go and keep acting on"
2376           " our children");
2377 
2378       g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
2379     }
2380   }
2381 
2382   _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
2383 }
2384 
2385 static void
_update_pipeline_func(NleComposition * comp,UpdateCompositionData * ucompo)2386 _update_pipeline_func (NleComposition * comp, UpdateCompositionData * ucompo)
2387 {
2388   gboolean reverse;
2389   NleCompositionPrivate *priv = comp->priv;
2390 
2391   _post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
2392 
2393   /* Set up a non-initial seek on current_stack_stop */
2394   reverse = (priv->segment->rate < 0.0);
2395   if (!reverse) {
2396     GST_DEBUG_OBJECT (comp,
2397         "Setting segment->start to current_stack_stop:%" GST_TIME_FORMAT,
2398         GST_TIME_ARGS (priv->current_stack_stop));
2399     priv->segment->start = priv->current_stack_stop;
2400   } else {
2401     GST_DEBUG_OBJECT (comp,
2402         "Setting segment->stop to current_stack_start:%" GST_TIME_FORMAT,
2403         GST_TIME_ARGS (priv->current_stack_start));
2404     priv->segment->stop = priv->current_stack_start;
2405   }
2406 
2407   seek_handling (comp, ucompo->seqnum, COMP_UPDATE_STACK_ON_EOS);
2408 
2409   /* Post segment done if last seek was a segment seek */
2410   if (!priv->current && (priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
2411     gint64 epos;
2412 
2413     if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop))
2414       epos = (MIN (priv->segment->stop, NLE_OBJECT_STOP (comp)));
2415     else
2416       epos = NLE_OBJECT_STOP (comp);
2417 
2418     GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT,
2419         GST_TIME_ARGS (epos));
2420     gst_element_post_message (GST_ELEMENT_CAST (comp),
2421         gst_message_new_segment_done (GST_OBJECT (comp),
2422             priv->segment->format, epos));
2423     gst_pad_push_event (NLE_OBJECT (comp)->srcpad,
2424         gst_event_new_segment_done (priv->segment->format, epos));
2425   }
2426 
2427   _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
2428 }
2429 
2430 static GstStateChangeReturn
nle_composition_change_state(GstElement * element,GstStateChange transition)2431 nle_composition_change_state (GstElement * element, GstStateChange transition)
2432 {
2433   GstStateChangeReturn res;
2434   NleComposition *comp = (NleComposition *) element;
2435 
2436   GST_DEBUG_OBJECT (comp, "%s => %s",
2437       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
2438       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
2439 
2440   switch (transition) {
2441     case GST_STATE_CHANGE_NULL_TO_READY:
2442       _start_task (comp);
2443       break;
2444     case GST_STATE_CHANGE_READY_TO_PAUSED:
2445       /* state-lock all elements */
2446       GST_DEBUG_OBJECT (comp,
2447           "Setting all children to READY and locking their state");
2448 
2449       _add_update_compo_action (comp, G_CALLBACK (_initialize_stack_func),
2450           COMP_UPDATE_STACK_INITIALIZE);
2451       break;
2452     case GST_STATE_CHANGE_PAUSED_TO_READY:
2453       _stop_task (comp);
2454 
2455       _remove_update_actions (comp);
2456       _remove_seek_actions (comp);
2457       _deactivate_stack (comp, TRUE);
2458       comp->priv->tearing_down_stack = TRUE;
2459       break;
2460     case GST_STATE_CHANGE_READY_TO_NULL:
2461       _stop_task (comp);
2462 
2463       _remove_update_actions (comp);
2464       _remove_seek_actions (comp);
2465       comp->priv->tearing_down_stack = TRUE;
2466       break;
2467     default:
2468       break;
2469   }
2470 
2471   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2472 
2473   if (res == GST_STATE_CHANGE_FAILURE) {
2474     GST_ERROR_OBJECT (comp, "state change failure %s => %s",
2475         gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
2476         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
2477 
2478     comp->priv->tearing_down_stack = TRUE;
2479     _stop_task (comp);
2480     nle_composition_reset (comp);
2481     gst_element_set_state (comp->priv->current_bin, GST_STATE_NULL);
2482     comp->priv->tearing_down_stack = FALSE;
2483 
2484     return res;
2485   }
2486 
2487   switch (transition) {
2488     case GST_STATE_CHANGE_PAUSED_TO_READY:
2489       comp->priv->tearing_down_stack = FALSE;
2490       nle_composition_reset (comp);
2491 
2492       /* In READY we are still able to process actions. */
2493       _start_task (comp);
2494       break;
2495     case GST_STATE_CHANGE_READY_TO_NULL:
2496       gst_element_set_state (comp->priv->current_bin, GST_STATE_NULL);
2497       comp->priv->tearing_down_stack = FALSE;
2498       break;
2499     default:
2500       break;
2501   }
2502 
2503   return res;
2504 }
2505 
2506 static gint
objects_start_compare(NleObject * a,NleObject * b)2507 objects_start_compare (NleObject * a, NleObject * b)
2508 {
2509   if (a->start == b->start) {
2510     if (a->priority < b->priority)
2511       return -1;
2512     if (a->priority > b->priority)
2513       return 1;
2514     return 0;
2515   }
2516   if (a->start < b->start)
2517     return -1;
2518   if (a->start > b->start)
2519     return 1;
2520   return 0;
2521 }
2522 
2523 static gint
objects_stop_compare(NleObject * a,NleObject * b)2524 objects_stop_compare (NleObject * a, NleObject * b)
2525 {
2526   if (a->stop == b->stop) {
2527     if (a->priority < b->priority)
2528       return -1;
2529     if (a->priority > b->priority)
2530       return 1;
2531     return 0;
2532   }
2533   if (b->stop < a->stop)
2534     return -1;
2535   if (b->stop > a->stop)
2536     return 1;
2537   return 0;
2538 }
2539 
2540 /* WITH OBJECTS LOCK TAKEN */
2541 static void
update_start_stop_duration(NleComposition * comp)2542 update_start_stop_duration (NleComposition * comp)
2543 {
2544   NleObject *obj;
2545   NleObject *cobj = (NleObject *) comp;
2546   NleCompositionPrivate *priv = comp->priv;
2547 
2548   _assert_proper_thread (comp);
2549 
2550   if (!priv->objects_start) {
2551     GST_INFO_OBJECT (comp, "no objects, resetting everything to 0");
2552 
2553     if (cobj->start) {
2554       cobj->start = cobj->pending_start = 0;
2555       g_object_notify_by_pspec (G_OBJECT (cobj),
2556           nleobject_properties[NLEOBJECT_PROP_START]);
2557     }
2558 
2559     if (cobj->duration) {
2560       cobj->pending_duration = cobj->duration = 0;
2561       g_object_notify_by_pspec (G_OBJECT (cobj),
2562           nleobject_properties[NLEOBJECT_PROP_DURATION]);
2563       signal_duration_change (comp);
2564     }
2565 
2566     if (cobj->stop) {
2567       cobj->stop = 0;
2568       g_object_notify_by_pspec (G_OBJECT (cobj),
2569           nleobject_properties[NLEOBJECT_PROP_STOP]);
2570     }
2571 
2572     return;
2573   }
2574 
2575   /* If we have a default object, the start position is 0 */
2576   if (priv->expandables) {
2577     GST_INFO_OBJECT (cobj,
2578         "Setting start to 0 because we have a default object");
2579 
2580     if (cobj->start != 0) {
2581       cobj->pending_start = cobj->start = 0;
2582       g_object_notify_by_pspec (G_OBJECT (cobj),
2583           nleobject_properties[NLEOBJECT_PROP_START]);
2584     }
2585 
2586   } else {
2587 
2588     /* Else it's the first object's start value */
2589     obj = (NleObject *) priv->objects_start->data;
2590 
2591     if (obj->start != cobj->start) {
2592       GST_INFO_OBJECT (obj, "setting start from %s to %" GST_TIME_FORMAT,
2593           GST_OBJECT_NAME (obj), GST_TIME_ARGS (obj->start));
2594       cobj->pending_start = cobj->start = obj->start;
2595       g_object_notify_by_pspec (G_OBJECT (cobj),
2596           nleobject_properties[NLEOBJECT_PROP_START]);
2597     }
2598 
2599   }
2600 
2601   obj = (NleObject *) priv->objects_stop->data;
2602 
2603   if (obj->stop != cobj->stop) {
2604     GST_INFO_OBJECT (obj, "setting stop from %s to %" GST_TIME_FORMAT,
2605         GST_OBJECT_NAME (obj), GST_TIME_ARGS (obj->stop));
2606 
2607     if (priv->expandables) {
2608       GList *tmp;
2609 
2610       GST_INFO_OBJECT (comp, "RE-setting all expandables duration and commit");
2611       for (tmp = priv->expandables; tmp; tmp = tmp->next) {
2612         g_object_set (tmp->data, "duration", obj->stop, NULL);
2613         nle_object_commit (NLE_OBJECT (tmp->data), FALSE);
2614       }
2615     }
2616 
2617     priv->segment->stop = obj->stop;
2618     cobj->stop = obj->stop;
2619     g_object_notify_by_pspec (G_OBJECT (cobj),
2620         nleobject_properties[NLEOBJECT_PROP_STOP]);
2621   }
2622 
2623   if ((cobj->stop - cobj->start) != cobj->duration) {
2624     cobj->pending_duration = cobj->duration = cobj->stop - cobj->start;
2625     g_object_notify_by_pspec (G_OBJECT (cobj),
2626         nleobject_properties[NLEOBJECT_PROP_DURATION]);
2627     signal_duration_change (comp);
2628   }
2629 
2630   GST_INFO_OBJECT (comp,
2631       "start:%" GST_TIME_FORMAT
2632       " stop:%" GST_TIME_FORMAT
2633       " duration:%" GST_TIME_FORMAT,
2634       GST_TIME_ARGS (cobj->start),
2635       GST_TIME_ARGS (cobj->stop), GST_TIME_ARGS (cobj->duration));
2636 }
2637 
2638 static void
_link_to_parent(NleComposition * comp,NleObject * newobj,NleObject * newparent)2639 _link_to_parent (NleComposition * comp, NleObject * newobj,
2640     NleObject * newparent)
2641 {
2642   GstPad *sinkpad;
2643 
2644   /* relink to new parent in required order */
2645   GST_LOG_OBJECT (comp, "Linking %s and %s",
2646       GST_ELEMENT_NAME (GST_ELEMENT (newobj)),
2647       GST_ELEMENT_NAME (GST_ELEMENT (newparent)));
2648 
2649   sinkpad = get_unlinked_sink_ghost_pad ((NleOperation *) newparent);
2650 
2651   if (G_UNLIKELY (sinkpad == NULL)) {
2652     GST_WARNING_OBJECT (comp,
2653         "Couldn't find an unlinked sinkpad from %s",
2654         GST_ELEMENT_NAME (newparent));
2655   } else {
2656     if (G_UNLIKELY (gst_pad_link_full (NLE_OBJECT_SRC (newobj), sinkpad,
2657                 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK)) {
2658       GST_WARNING_OBJECT (comp, "Failed to link pads %s:%s - %s:%s",
2659           GST_DEBUG_PAD_NAME (NLE_OBJECT_SRC (newobj)),
2660           GST_DEBUG_PAD_NAME (sinkpad));
2661     }
2662     gst_object_unref (sinkpad);
2663   }
2664 }
2665 
2666 static void
_relink_children_recursively(NleComposition * comp,NleObject * newobj,GNode * node,GstEvent * toplevel_seek)2667 _relink_children_recursively (NleComposition * comp,
2668     NleObject * newobj, GNode * node, GstEvent * toplevel_seek)
2669 {
2670   GNode *child;
2671   guint nbchildren = g_node_n_children (node);
2672   NleOperation *oper = (NleOperation *) newobj;
2673 
2674   GST_INFO_OBJECT (newobj, "is a %s operation, analyzing the %d children",
2675       oper->dynamicsinks ? "dynamic" : "regular", nbchildren);
2676   /* Update the operation's number of sinks, that will make it have the proper
2677    * number of sink pads to connect the children to. */
2678   if (oper->dynamicsinks)
2679     g_object_set (G_OBJECT (newobj), "sinks", nbchildren, NULL);
2680 
2681   for (child = node->children; child; child = child->next)
2682     _relink_single_node (comp, child, toplevel_seek);
2683 
2684   if (G_UNLIKELY (nbchildren < oper->num_sinks))
2685     GST_ELEMENT_ERROR (comp, STREAM, FAILED,
2686         ("The NleComposition structure is not valid"),
2687         ("%" GST_PTR_FORMAT
2688             " Not enough sinkpads to link all objects to the operation ! "
2689             "%d / %d, current toplevel seek %" GST_PTR_FORMAT,
2690             oper, oper->num_sinks, nbchildren, toplevel_seek));
2691 
2692   if (G_UNLIKELY (nbchildren == 0)) {
2693     GST_ELEMENT_ERROR (comp, STREAM, FAILED,
2694         ("The NleComposition structure is not valid"),
2695         ("Operation %" GST_PTR_FORMAT
2696             " has no child objects to be connected to "
2697             "current toplevel seek: %" GST_PTR_FORMAT, oper, toplevel_seek));
2698   }
2699   /* Make sure we have enough sinkpads */
2700 }
2701 
2702 /*
2703  * recursive depth-first relink stack function on new stack
2704  *
2705  * _ relink nodes with changed parent/order
2706  * _ links new nodes with parents
2707  * _ unblocks available source pads (except for toplevel)
2708  *
2709  * WITH OBJECTS LOCK TAKEN
2710  */
2711 static void
_relink_single_node(NleComposition * comp,GNode * node,GstEvent * toplevel_seek)2712 _relink_single_node (NleComposition * comp, GNode * node,
2713     GstEvent * toplevel_seek)
2714 {
2715   NleObject *newobj;
2716   NleObject *newparent;
2717   GNode *node_it;
2718   GstPad *srcpad = NULL, *sinkpad = NULL;
2719   GstEvent *translated_seek;
2720 
2721   if (G_UNLIKELY (!node))
2722     return;
2723 
2724   newparent = G_NODE_IS_ROOT (node) ? NULL : (NleObject *) node->parent->data;
2725   newobj = (NleObject *) node->data;
2726 
2727   GST_DEBUG_OBJECT (comp, "newobj:%s",
2728       GST_ELEMENT_NAME ((GstElement *) newobj));
2729 
2730   newobj->recursive_media_duration_factor = 1.0f;
2731   for (node_it = node; node_it != NULL; node_it = node_it->parent) {
2732     NleObject *object = (NleObject *) node_it->data;
2733     newobj->recursive_media_duration_factor *= object->media_duration_factor;
2734   }
2735 
2736   srcpad = NLE_OBJECT_SRC (newobj);
2737 
2738   gst_bin_add (GST_BIN (comp->priv->current_bin), GST_ELEMENT (newobj));
2739   gst_element_sync_state_with_parent (GST_ELEMENT_CAST (newobj));
2740 
2741   translated_seek = nle_object_translate_incoming_seek (newobj,
2742       gst_event_ref (toplevel_seek));
2743 
2744   gst_element_send_event (GST_ELEMENT (newobj), translated_seek);
2745 
2746   /* link to parent if needed.  */
2747   if (newparent) {
2748     _link_to_parent (comp, newobj, newparent);
2749 
2750     /* If there's an operation, inform it about priority changes */
2751     sinkpad = gst_pad_get_peer (srcpad);
2752     nle_operation_signal_input_priority_changed ((NleOperation *)
2753         newparent, sinkpad, newobj->priority);
2754     gst_object_unref (sinkpad);
2755   }
2756 
2757   /* Handle children */
2758   if (NLE_IS_OPERATION (newobj))
2759     _relink_children_recursively (comp, newobj, node, toplevel_seek);
2760 
2761   GST_LOG_OBJECT (comp, "done with object %s",
2762       GST_ELEMENT_NAME (GST_ELEMENT (newobj)));
2763 }
2764 
2765 
2766 
2767 /*
2768  * compare_relink_stack:
2769  * @comp: The #NleComposition
2770  * @stack: The new stack
2771  * @modify: TRUE if the timeline has changed and needs downstream flushes.
2772  *
2773  * Compares the given stack to the current one and relinks it if needed.
2774  *
2775  * WITH OBJECTS LOCK TAKEN
2776  *
2777  * Returns: The #GList of #NleObject no longer used
2778  */
2779 
2780 static void
_deactivate_stack(NleComposition * comp,gboolean flush_downstream)2781 _deactivate_stack (NleComposition * comp, gboolean flush_downstream)
2782 {
2783   GstPad *ptarget;
2784 
2785   GST_INFO_OBJECT (comp, "Deactivating current stack (flushing downstream: %d)",
2786       flush_downstream);
2787   _set_current_bin_to_ready (comp, flush_downstream);
2788 
2789   ptarget = gst_ghost_pad_get_target (GST_GHOST_PAD (NLE_OBJECT_SRC (comp)));
2790   _empty_bin (GST_BIN_CAST (comp->priv->current_bin));
2791 
2792   if (comp->priv->ghosteventprobe) {
2793     GST_INFO_OBJECT (comp, "Removing old ghost pad probe");
2794 
2795     gst_pad_remove_probe (ptarget, comp->priv->ghosteventprobe);
2796     comp->priv->ghosteventprobe = 0;
2797   }
2798 
2799   if (ptarget)
2800     gst_object_unref (ptarget);
2801 
2802   GST_INFO_OBJECT (comp, "Stack desctivated");
2803 
2804 /*   priv->current = NULL;
2805  */
2806 }
2807 
2808 static void
_relink_new_stack(NleComposition * comp,GNode * stack,GstEvent * toplevel_seek)2809 _relink_new_stack (NleComposition * comp, GNode * stack,
2810     GstEvent * toplevel_seek)
2811 {
2812   _relink_single_node (comp, stack, toplevel_seek);
2813 
2814   gst_event_unref (toplevel_seek);
2815 }
2816 
2817 /* static void
2818  * unlock_activate_stack (NleComposition * comp, GNode * node, GstState state)
2819  * {
2820  *   GNode *child;
2821  *
2822  *   GST_LOG_OBJECT (comp, "object:%s",
2823  *       GST_ELEMENT_NAME ((GstElement *) (node->data)));
2824  *
2825  *   gst_element_set_locked_state ((GstElement *) (node->data), FALSE);
2826  *   gst_element_set_state (GST_ELEMENT (node->data), state);
2827  *
2828  *   for (child = node->children; child; child = child->next)
2829  *     unlock_activate_stack (comp, child, state);
2830  * }
2831  */
2832 
2833 static gboolean
are_same_stacks(GNode * stack1,GNode * stack2)2834 are_same_stacks (GNode * stack1, GNode * stack2)
2835 {
2836   gboolean res = FALSE;
2837 
2838   /* TODO : FIXME : we should also compare start/inpoint */
2839   /* stacks are not equal if one of them is NULL but not the other */
2840   if ((!stack1 && stack2) || (stack1 && !stack2))
2841     goto beach;
2842 
2843   if (stack1 && stack2) {
2844     GNode *child1, *child2;
2845 
2846     /* if they don't contain the same source, not equal */
2847     if (!(stack1->data == stack2->data))
2848       goto beach;
2849 
2850     /* if they don't have the same number of children, not equal */
2851     if (!(g_node_n_children (stack1) == g_node_n_children (stack2)))
2852       goto beach;
2853 
2854     child1 = stack1->children;
2855     child2 = stack2->children;
2856     while (child1 && child2) {
2857       if (!(are_same_stacks (child1, child2)))
2858         goto beach;
2859       child1 = g_node_next_sibling (child1);
2860       child2 = g_node_next_sibling (child2);
2861     }
2862 
2863     /* if there's a difference in child number, stacks are not equal */
2864     if (child1 || child2)
2865       goto beach;
2866   }
2867 
2868   /* if stack1 AND stack2 are NULL, then they're equal (both empty) */
2869   res = TRUE;
2870 
2871 beach:
2872   GST_LOG ("Stacks are equal : %d", res);
2873 
2874   return res;
2875 }
2876 
2877 static inline gboolean
_activate_new_stack(NleComposition * comp)2878 _activate_new_stack (NleComposition * comp)
2879 {
2880   GstPad *pad;
2881   GstElement *topelement;
2882 
2883   NleCompositionPrivate *priv = comp->priv;
2884 
2885   if (!priv->current) {
2886     if ((!priv->objects_start)) {
2887       nle_composition_reset_target_pad (comp);
2888       priv->current_stack_start = 0;
2889       priv->current_stack_stop = GST_CLOCK_TIME_NONE;
2890     }
2891 
2892     GST_DEBUG_OBJECT (comp, "Nothing else in the composition"
2893         ", update 'worked'");
2894     goto resync_state;
2895   }
2896 
2897   /* The stack is entirely ready, send seek out synchronously */
2898   topelement = GST_ELEMENT (priv->current->data);
2899   /* Get toplevel object source pad */
2900   pad = NLE_OBJECT_SRC (topelement);
2901 
2902   GST_INFO_OBJECT (comp,
2903       "We have a valid toplevel element pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2904 
2905   nle_composition_ghost_pad_set_target (comp, pad);
2906 
2907   GST_DEBUG_OBJECT (comp, "New stack activated!");
2908 
2909 resync_state:
2910   gst_element_set_locked_state (priv->current_bin, FALSE);
2911 
2912   GST_DEBUG ("going back to parent state");
2913   priv->suppress_child_error = TRUE;
2914   if (!gst_element_sync_state_with_parent (priv->current_bin)) {
2915     gst_element_set_locked_state (priv->current_bin, TRUE);
2916     gst_element_set_state (priv->current_bin, GST_STATE_NULL);
2917     priv->suppress_child_error = FALSE;
2918 
2919     GST_ELEMENT_ERROR (comp, CORE, STATE_CHANGE, (NULL),
2920         ("Could not sync %" GST_PTR_FORMAT " state with parent",
2921             priv->current_bin));
2922     return FALSE;
2923   }
2924 
2925   priv->suppress_child_error = FALSE;
2926   GST_DEBUG ("gone back to parent state");
2927 
2928   return TRUE;
2929 }
2930 
2931 /* WITH OBJECTS LOCK TAKEN */
2932 static gboolean
_set_real_eos_seqnum_from_seek(NleComposition * comp,GstEvent * event)2933 _set_real_eos_seqnum_from_seek (NleComposition * comp, GstEvent * event)
2934 {
2935   GList *tmp;
2936 
2937   gboolean should_check_objects = FALSE;
2938   NleCompositionPrivate *priv = comp->priv;
2939   gboolean reverse = (priv->segment->rate < 0);
2940   gint stack_seqnum = gst_event_get_seqnum (event);
2941 
2942   if (reverse && GST_CLOCK_TIME_IS_VALID (priv->current_stack_start))
2943     should_check_objects = TRUE;
2944   else if (!reverse && GST_CLOCK_TIME_IS_VALID (priv->current_stack_stop))
2945     should_check_objects = TRUE;
2946 
2947   if (should_check_objects) {
2948     for (tmp = priv->objects_stop; tmp; tmp = g_list_next (tmp)) {
2949       NleObject *object = (NleObject *) tmp->data;
2950 
2951       if (!NLE_IS_SOURCE (object))
2952         continue;
2953 
2954       if ((!reverse && priv->current_stack_stop < object->stop) ||
2955           (reverse && priv->current_stack_start > object->start)) {
2956         priv->next_eos_seqnum = stack_seqnum;
2957         g_atomic_int_set (&priv->real_eos_seqnum, 0);
2958         return FALSE;
2959       }
2960     }
2961   }
2962 
2963   priv->next_eos_seqnum = stack_seqnum;
2964   g_atomic_int_set (&priv->real_eos_seqnum, stack_seqnum);
2965 
2966   return TRUE;
2967 }
2968 
2969 #ifndef GST_DISABLE_GST_DEBUG
2970 static gboolean
_print_stack(GNode * node,gpointer res)2971 _print_stack (GNode * node, gpointer res)
2972 {
2973   NleObject *obj = NLE_OBJECT (node->data);
2974 
2975   g_string_append_printf ((GString *) res,
2976       "%*s [s=%" GST_TIME_FORMAT " - d=%" GST_TIME_FORMAT "] prio=%d\n",
2977       g_node_depth (node) * 4, GST_OBJECT_NAME (obj),
2978       GST_TIME_ARGS (NLE_OBJECT_START (obj)),
2979       GST_TIME_ARGS (NLE_OBJECT_STOP (obj)), obj->priority);
2980 
2981   return FALSE;
2982 }
2983 #endif
2984 
2985 static void
_dump_stack(NleComposition * comp,GNode * stack)2986 _dump_stack (NleComposition * comp, GNode * stack)
2987 {
2988 #ifndef GST_DISABLE_GST_DEBUG
2989   GString *res;
2990 
2991   if (!stack)
2992     return;
2993 
2994   if (gst_debug_category_get_threshold (nlecomposition_debug) < GST_LEVEL_INFO)
2995     return;
2996 
2997   res = g_string_new (NULL);
2998   g_string_append_printf (res, " ====> dumping stack [%" GST_TIME_FORMAT " - %"
2999       GST_TIME_FORMAT "]:\n",
3000       GST_TIME_ARGS (comp->priv->current_stack_start),
3001       GST_TIME_ARGS (comp->priv->current_stack_stop));
3002   g_node_traverse (stack, G_LEVEL_ORDER, G_TRAVERSE_ALL, -1, _print_stack, res);
3003 
3004   GST_INFO_OBJECT (comp, "%s", res->str);
3005   g_string_free (res, TRUE);
3006 #endif
3007 }
3008 
3009 /*
3010  * update_pipeline:
3011  * @comp: The #NleComposition
3012  * @currenttime: The #GstClockTime to update at, can be GST_CLOCK_TIME_NONE.
3013  * @update_reason: Reason why we are updating the pipeline
3014  *
3015  * Updates the internal pipeline and properties. If @currenttime is
3016  * GST_CLOCK_TIME_NONE, it will not modify the current pipeline
3017  *
3018  * Returns: FALSE if there was an error updating the pipeline.
3019  *
3020  * WITH OBJECTS LOCK TAKEN
3021  */
3022 static gboolean
update_pipeline(NleComposition * comp,GstClockTime currenttime,gint32 seqnum,NleUpdateStackReason update_reason)3023 update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
3024     NleUpdateStackReason update_reason)
3025 {
3026 
3027   GstEvent *toplevel_seek;
3028 
3029   GNode *stack = NULL;
3030   gboolean samestack = FALSE;
3031   gboolean updatestoponly = FALSE;
3032   GstState state = GST_STATE (comp);
3033   NleCompositionPrivate *priv = comp->priv;
3034   GstClockTime new_stop = GST_CLOCK_TIME_NONE;
3035   GstClockTime new_start = GST_CLOCK_TIME_NONE;
3036   GstClockTime duration = NLE_OBJECT (comp)->duration - 1;
3037 
3038   GstState nextstate = (GST_STATE_NEXT (comp) == GST_STATE_VOID_PENDING) ?
3039       GST_STATE (comp) : GST_STATE_NEXT (comp);
3040 
3041   _assert_proper_thread (comp);
3042 
3043   if (currenttime >= duration) {
3044     currenttime = duration;
3045     priv->segment->start = GST_CLOCK_TIME_NONE;
3046     priv->segment->stop = GST_CLOCK_TIME_NONE;
3047   }
3048 
3049   GST_INFO_OBJECT (comp,
3050       "currenttime:%" GST_TIME_FORMAT
3051       " Reason: %s, Seqnum: %i", GST_TIME_ARGS (currenttime),
3052       UPDATE_PIPELINE_REASONS[update_reason], seqnum);
3053 
3054   if (!GST_CLOCK_TIME_IS_VALID (currenttime))
3055     return FALSE;
3056 
3057   if (state == GST_STATE_NULL && nextstate == GST_STATE_NULL) {
3058     GST_DEBUG_OBJECT (comp, "STATE_NULL: not updating pipeline");
3059     return FALSE;
3060   }
3061 
3062   GST_DEBUG_OBJECT (comp,
3063       "now really updating the pipeline, current-state:%s",
3064       gst_element_state_get_name (state));
3065 
3066   /* Get new stack and compare it to current one */
3067   stack = get_clean_toplevel_stack (comp, &currenttime, &new_start, &new_stop);
3068   samestack = are_same_stacks (priv->current, stack);
3069 
3070   /* set new current_stack_start/stop (the current zone over which the new stack
3071    * is valid) */
3072   if (priv->segment->rate >= 0.0) {
3073     priv->current_stack_start = currenttime;
3074     priv->current_stack_stop = new_stop;
3075   } else {
3076     priv->current_stack_start = new_start;
3077     priv->current_stack_stop = currenttime;
3078   }
3079 
3080 # if 0
3081   /* FIXME -- We should be ablt to use updatestoponly in that case,
3082    * but it simply does not work! Not using it leads to same
3083    * behaviour, but less optimized */
3084 
3085   gboolean startchanged, stopchanged;
3086 
3087   if (priv->segment->rate >= 0.0) {
3088     startchanged = priv->current_stack_start != currenttime;
3089     stopchanged = priv->current_stack_stop != new_stop;
3090   } else {
3091     startchanged = priv->current_stack_start != new_start;
3092     stopchanged = priv->current_stack_stop != currenttime;
3093   }
3094 
3095   if (samestack) {
3096     if (startchanged || stopchanged) {
3097       /* Update seek events need to be flushing if not in PLAYING,
3098        * else we will encounter deadlocks. */
3099       updatestoponly = (state == GST_STATE_PLAYING) ? FALSE : TRUE;
3100     }
3101   }
3102 #endif
3103 
3104   toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly);
3105   gst_event_set_seqnum (toplevel_seek, seqnum);
3106   _set_real_eos_seqnum_from_seek (comp, toplevel_seek);
3107 
3108   _remove_update_actions (comp);
3109 
3110   /* If stacks are different, unlink/relink objects */
3111   if (!samestack) {
3112     _dump_stack (comp, stack);
3113     _deactivate_stack (comp, _have_to_flush_downstream (update_reason));
3114     _relink_new_stack (comp, stack, toplevel_seek);
3115   }
3116 
3117   /* Unlock all elements in new stack */
3118   GST_INFO_OBJECT (comp, "Setting current stack [%" GST_TIME_FORMAT " - %"
3119       GST_TIME_FORMAT "]", GST_TIME_ARGS (priv->current_stack_start),
3120       GST_TIME_ARGS (priv->current_stack_stop));
3121 
3122   if (priv->current)
3123     g_node_destroy (priv->current);
3124 
3125   priv->current = stack;
3126 
3127   if (priv->current) {
3128 
3129     GST_INFO_OBJECT (comp, "New stack set and ready to run, probing src pad"
3130         " and stopping children thread until we are actually ready with"
3131         " that new stack");
3132 
3133     comp->priv->updating_reason = update_reason;
3134     comp->priv->seqnum_to_restart_task = seqnum;
3135 
3136     GST_OBJECT_LOCK (comp);
3137     if (comp->task == NULL) {
3138       GST_INFO_OBJECT (comp,
3139           "No task set, it must have been stopped, returning");
3140       GST_OBJECT_UNLOCK (comp);
3141       return FALSE;
3142     }
3143 
3144     gst_task_pause (comp->task);
3145     GST_OBJECT_UNLOCK (comp);
3146   }
3147 
3148   /* Activate stack */
3149   if (!samestack)
3150     return _activate_new_stack (comp);
3151   else
3152     return _seek_current_stack (comp, toplevel_seek,
3153         _have_to_flush_downstream (update_reason));
3154 }
3155 
3156 static gboolean
nle_composition_add_object(GstBin * bin,GstElement * element)3157 nle_composition_add_object (GstBin * bin, GstElement * element)
3158 {
3159   NleObject *object;
3160   NleComposition *comp = (NleComposition *) bin;
3161 
3162   if (element == comp->priv->current_bin) {
3163     GST_INFO_OBJECT (comp, "Adding internal bin");
3164     return GST_BIN_CLASS (parent_class)->add_element (bin, element);
3165   }
3166 
3167   g_return_val_if_fail (NLE_IS_OBJECT (element), FALSE);
3168 
3169   object = NLE_OBJECT (element);
3170   gst_object_ref_sink (object);
3171 
3172   object->in_composition = TRUE;
3173   _add_add_object_action (comp, object);
3174 
3175   return TRUE;
3176 }
3177 
3178 static gboolean
_nle_composition_add_object(NleComposition * comp,NleObject * object)3179 _nle_composition_add_object (NleComposition * comp, NleObject * object)
3180 {
3181   gboolean ret = TRUE;
3182   NleCompositionPrivate *priv = comp->priv;
3183 
3184   GST_DEBUG_OBJECT (comp, "element %s", GST_OBJECT_NAME (object));
3185   GST_DEBUG_OBJECT (object, "%" GST_TIME_FORMAT "--%" GST_TIME_FORMAT,
3186       GST_TIME_ARGS (NLE_OBJECT_START (object)),
3187       GST_TIME_ARGS (NLE_OBJECT_STOP (object)));
3188 
3189   if ((NLE_OBJECT_IS_EXPANDABLE (object)) &&
3190       g_list_find (priv->expandables, object)) {
3191     GST_WARNING_OBJECT (comp,
3192         "We already have an expandable, remove it before adding new one");
3193     ret = FALSE;
3194 
3195     goto chiringuito;
3196   }
3197 
3198   nle_object_set_caps (object, NLE_OBJECT (comp)->caps);
3199   nle_object_set_commit_needed (NLE_OBJECT (comp));
3200 
3201   if (!ret) {
3202     GST_WARNING_OBJECT (comp, "couldn't add object");
3203     goto chiringuito;
3204   }
3205 
3206   /* lock state of child ! */
3207   GST_LOG_OBJECT (comp, "Locking state of %s", GST_ELEMENT_NAME (object));
3208 
3209   if (NLE_OBJECT_IS_EXPANDABLE (object)) {
3210     /* Only react on non-default objects properties */
3211     g_object_set (object,
3212         "start", (GstClockTime) 0,
3213         "inpoint", (GstClockTime) 0,
3214         "duration", (GstClockTimeDiff) NLE_OBJECT_STOP (comp), NULL);
3215 
3216     GST_INFO_OBJECT (object, "Used as expandable, commiting now");
3217     nle_object_commit (NLE_OBJECT (object), FALSE);
3218   }
3219 
3220   /* ...and add it to the hash table */
3221   g_hash_table_add (priv->objects_hash, object);
3222 
3223   /* Set the caps of the composition on the NleObject it handles */
3224   if (G_UNLIKELY (!gst_caps_is_any (((NleObject *) comp)->caps)))
3225     nle_object_set_caps ((NleObject *) object, ((NleObject *) comp)->caps);
3226 
3227   /* Special case for default source. */
3228   if (NLE_OBJECT_IS_EXPANDABLE (object)) {
3229     /* It doesn't get added to objects_start and objects_stop. */
3230     priv->expandables = g_list_prepend (priv->expandables, object);
3231     goto beach;
3232   }
3233 
3234   /* add it sorted to the objects list */
3235   priv->objects_start = g_list_insert_sorted
3236       (priv->objects_start, object, (GCompareFunc) objects_start_compare);
3237 
3238   if (priv->objects_start)
3239     GST_LOG_OBJECT (comp,
3240         "Head of objects_start is now %s [%" GST_TIME_FORMAT "--%"
3241         GST_TIME_FORMAT "]",
3242         GST_OBJECT_NAME (priv->objects_start->data),
3243         GST_TIME_ARGS (NLE_OBJECT_START (priv->objects_start->data)),
3244         GST_TIME_ARGS (NLE_OBJECT_STOP (priv->objects_start->data)));
3245 
3246   priv->objects_stop = g_list_insert_sorted
3247       (priv->objects_stop, object, (GCompareFunc) objects_stop_compare);
3248 
3249   /* Now the object is ready to be commited and then used */
3250 
3251 beach:
3252   return ret;
3253 
3254 chiringuito:
3255   {
3256     update_start_stop_duration (comp);
3257     goto beach;
3258   }
3259 }
3260 
3261 static gboolean
nle_composition_remove_object(GstBin * bin,GstElement * element)3262 nle_composition_remove_object (GstBin * bin, GstElement * element)
3263 {
3264   NleObject *object;
3265   NleComposition *comp = (NleComposition *) bin;
3266 
3267   if (element == comp->priv->current_bin) {
3268     GST_INFO_OBJECT (comp, "Removing internal bin");
3269     return GST_BIN_CLASS (parent_class)->remove_element (bin, element);
3270   }
3271 
3272   g_return_val_if_fail (NLE_IS_OBJECT (element), FALSE);
3273 
3274   object = NLE_OBJECT (element);
3275 
3276   object->in_composition = FALSE;
3277   _add_remove_object_action (comp, object);
3278 
3279   return TRUE;
3280 }
3281 
3282 static gboolean
_nle_composition_remove_object(NleComposition * comp,NleObject * object)3283 _nle_composition_remove_object (NleComposition * comp, NleObject * object)
3284 {
3285   NleCompositionPrivate *priv = comp->priv;
3286 
3287   GST_DEBUG_OBJECT (comp, "removing object %s", GST_OBJECT_NAME (object));
3288 
3289   if (!g_hash_table_contains (priv->objects_hash, object)) {
3290     GST_INFO_OBJECT (comp, "object was not in composition");
3291     return FALSE;
3292   }
3293 
3294   gst_element_set_locked_state (GST_ELEMENT (object), FALSE);
3295   gst_element_set_state (GST_ELEMENT (object), GST_STATE_NULL);
3296 
3297   /* handle default source */
3298   if (NLE_OBJECT_IS_EXPANDABLE (object)) {
3299     /* Find it in the list */
3300     priv->expandables = g_list_remove (priv->expandables, object);
3301   } else {
3302     /* remove it from the objects list and resort the lists */
3303     priv->objects_start = g_list_remove (priv->objects_start, object);
3304     priv->objects_stop = g_list_remove (priv->objects_stop, object);
3305     GST_LOG_OBJECT (object, "Removed from the objects start/stop list");
3306   }
3307 
3308   if (priv->current && NLE_OBJECT (priv->current->data) == NLE_OBJECT (object))
3309     nle_composition_reset_target_pad (comp);
3310 
3311   g_hash_table_remove (priv->objects_hash, object);
3312 
3313   GST_LOG_OBJECT (object, "Done removing from the composition, now updating");
3314 
3315   /* Make it possible to reuse the same object later */
3316   nle_object_reset (NLE_OBJECT (object));
3317   gst_object_unref (object);
3318 
3319   return TRUE;
3320 }
3321