1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  * @see_also:
26  *
27  * What is an adaptive demuxer?
28  * Adaptive demuxers are special demuxers in the sense that they don't
29  * actually demux data received from upstream but download the data
30  * themselves.
31  *
32  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
33  * a set of fragments. The manifest describes the available media and
34  * the sequence of fragments to use. Each fragment contains a small
35  * part of the media (typically only a few seconds). It is possible for
36  * the manifest to have the same media available in different configurations
37  * (bitrates for example) so that the client can select the one that
38  * best suits its scenario (network fluctuation, hardware requirements...).
39  * It is possible to switch from one representation of the media to another
40  * during playback. That's why it is called 'adaptive', because it can be
41  * adapted to the client's needs.
42  *
43  * Architectural overview:
44  * The manifest is received by the demuxer in its sink pad and, upon receiving
45  * EOS, it parses the manifest and exposes the streams available in it. For
46  * each stream a source element will be created and will download the list
47  * of fragments one by one. Once a fragment is finished downloading, the next
48  * URI is set to the source element and it starts fetching it and pushing
49  * through the stream's pad. This implies that each stream is independent from
50  * each other as it runs on a separate thread.
51  *
52  * After downloading each fragment, the download rate of it is calculated and
53  * the demuxer has a chance to switch to a different bitrate if needed. The
54  * switch can be done by simply pushing a new caps before the next fragment
55  * when codecs are the same, or by exposing a new pad group if it needs
56  * a codec change.
57  *
58  * Extra features:
59  * - Not linked streams: Streams that are not-linked have their download threads
60  *                       interrupted to save network bandwidth. When they are
61  *                       relinked a reconfigure event is received and the
62  *                       stream is restarted.
63  *
64  * Subclasses:
65  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
66  * about the intrinsics of the subclass formats, so the subclasses are
 * responsible for maintaining the manifest data structures and stream
68  * information.
69  */
70 
71 /*
72 MT safety.
73 The following rules were observed while implementing MT safety in adaptive demux:
1. If a variable is accessed from multiple threads and at least one thread
writes to it, then all accesses need to be done from inside a critical section.
76 2. If thread A wants to join thread B then at the moment it calls gst_task_join
77 it must not hold any mutexes that thread B might take.
78 
Adaptive demux API can be called from several threads. Moreover, adaptive demux
80 starts some threads to monitor the download of fragments. In order to protect
81 accesses to shared variables (demux and streams) all the API functions that
82 can be run in different threads will need to get a mutex (manifest_lock)
83 when they start and release it when they end. Because some of those functions
84 can indirectly call other API functions (eg they can generate events or messages
85 that are processed in the same thread) the manifest_lock must be recursive.
86 
87 The manifest_lock will serialize the public API making access to shared
88 variables safe. But some of these functions will try at some moment to join
89 threads created by adaptive demux, or to change the state of src elements
90 (which will block trying to join the src element streaming thread). Because
91 of rule 2, those functions will need to release the manifest_lock during the
92 call of gst_task_join. During this time they can be interrupted by other API calls.
For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
94 is called and this will join all threads. In order to prevent interruptions
95 during such period, all the API functions will also use a second lock: api_lock.
96 This will be taken at the beginning of the function and released at the end,
97 but this time this lock will not be temporarily released during join.
98 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
99 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
100 to hold it while joining the threads or changing the src element state. The
101 api_lock will serialise all external requests to adaptive demux. In order to
102 avoid deadlocks, if a function needs to acquire both manifest and api locks,
103 the api_lock will be taken first and the manifest_lock second.
104 
105 By using the api_lock a thread is protected against other API calls. But when
106 temporarily dropping the manifest_lock, it will be vulnerable to changes from
107 threads that use only the manifest_lock and not the api_lock. These threads run
108 one of the following functions: gst_adaptive_demux_stream_download_loop,
109 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
110 that all operations during an API call are not impacted by other writes, the
111 above mentioned functions must check a cancelled flag every time they reacquire
112 the manifest_lock. If the flag is set, they must exit immediately, without
113 performing any changes on the shared data. In this way, an API call (eg seek
114 request) can set the cancel flag before releasing the manifest_lock and be sure
115 that the demux object and its streams are not changed by anybody else.
116 */
117 
118 #ifdef HAVE_CONFIG_H
119 #include "config.h"
120 #endif
121 
122 #include "gstadaptivedemux.h"
123 #include "gst/gst-i18n-plugin.h"
124 #include <gst/base/gstadapter.h>
125 
/* Debug category of this file; it is registered under the name
 * "adaptivedemux" in gst_adaptive_demux_class_init(). GST_CAT_DEFAULT selects
 * it as the category used by the logging macros below. */
GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define GST_CAT_DEFAULT adaptivedemux_debug
128 
/* Tunables of the base class.
 * DEFAULT_CONNECTION_SPEED and DEFAULT_BITRATE_LIMIT are the defaults of the
 * "connection-speed" and "bitrate-limit" properties. */
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
#define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BITRATE_LIMIT 0.8f
/* Parenthesized so that the product stays a single operand wherever the macro
 * is expanded (e.g. "x % SRC_QUEUE_MAX_BYTES" or "x / SRC_QUEUE_MAX_BYTES"). */
#define SRC_QUEUE_MAX_BYTES (20 * 1024 * 1024)  /* For safety. Large enough to hold a segment. */
#define NUM_LOOKBACK_FRAGMENTS 3
135 
/* manifest_lock helpers: the lock is a GRecMutex, so a thread that already
 * holds it may lock it again. Every lock/unlock is traced together with the
 * calling thread to help debugging lock ordering problems. */
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
#define GST_MANIFEST_LOCK(d) G_STMT_START { \
    GST_TRACE("Locking from thread %p", g_thread_self()); \
    g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
    GST_TRACE("Locked from thread %p", g_thread_self()); \
 } G_STMT_END

#define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
    GST_TRACE("Unlocking from thread %p", g_thread_self()); \
    g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
 } G_STMT_END

/* api_lock helpers (plain GMutex, not recursive).
 * The macros expand to a single expression WITHOUT a trailing semicolon, so
 * that "GST_API_LOCK (d);" is exactly one statement and can be used as the
 * unbraced body of an if/else. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))

/* segment_lock helpers (plain GMutex). */
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
155 
/* GObject property identifiers.
 * PROP_CONNECTION_SPEED and PROP_BITRATE_LIMIT are installed in
 * gst_adaptive_demux_class_init() and dispatched on in the set_property /
 * get_property handlers. */
enum
{
  PROP_0,
  PROP_CONNECTION_SPEED,
  PROP_BITRATE_LIMIT,
  PROP_LAST
};
163 
/* Private flow return value, defined one past GST_FLOW_CUSTOM_SUCCESS_2.
 * Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N.
 * NOTE(review): judging by the name it signals a stream switch; its users are
 * outside this excerpt - confirm. */
#define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
166 
/* Instance-private data of GstAdaptiveDemux. It is reachable through
 * demux->priv, which is assigned in gst_adaptive_demux_init(). */
struct _GstAdaptiveDemuxPrivate
{
  /* collects the manifest buffers received on the sink pad */
  GstAdapter *input_adapter;    /* protected by manifest_lock */
  /* accessed with g_atomic_int_get() / g_atomic_int_set() */
  gint have_manifest;           /* MT safe */

  GList *old_streams;           /* protected by manifest_lock */

  /* task running gst_adaptive_demux_updates_loop() */
  GstTask *updates_task;        /* MT safe */
  /* installed as the lock of updates_task (gst_task_set_lock) */
  GRecMutex updates_lock;
  GMutex updates_timed_lock;
  GCond updates_timed_cond;     /* protected by updates_timed_lock */
  gboolean stop_updates_task;   /* protected by updates_timed_lock */

  /* used only from updates_task, no need to protect it */
  gint update_failed_count;

  /* initialised with gst_util_seqnum_next() when the instance is created */
  guint32 segment_seqnum;       /* protected by manifest_lock */

  /* main lock used to protect adaptive demux and all its streams.
   * It serializes the adaptive demux public API.
   */
  GRecMutex manifest_lock;

  /* condition to wait for manifest updates on a live stream.
   * In order to signal the manifest_cond, the caller needs to hold both
   * manifest_lock and manifest_update_lock (taken in this order)
   */
  GCond manifest_cond;
  GMutex manifest_update_lock;

  /* Lock and condition for prerolling streams before exposing */
  GMutex preroll_lock;
  GCond preroll_cond;
  /* NOTE(review): presumably the number of streams that still have to
   * preroll; the code using it is outside this excerpt - confirm */
  gint preroll_pending;

  /* serialises external API calls; when both locks are needed it is taken
   * before manifest_lock (see the "MT safety" notes at the top of the file) */
  GMutex api_lock;

  /* Protects demux and stream segment information
   * Needed because seeks can update segment information
   * without needing to stop tasks when they just want to
   * update the segment boundaries */
  GMutex segment_lock;
};
210 
211 typedef struct _GstAdaptiveDemuxTimer
212 {
213   volatile gint ref_count;
214   GCond *cond;
215   GMutex *mutex;
216   GstClockID clock_id;
217   gboolean fired;
218 } GstAdaptiveDemuxTimer;
219 
/* Parent (GstBin) class, looked up in gst_adaptive_demux_class_init() and
 * used to chain up in finalize and change_state. */
static GstBinClass *parent_class = NULL;
/* Offset of the instance-private data: obtained from
 * g_type_add_instance_private() in gst_adaptive_demux_get_type() and adjusted
 * in gst_adaptive_demux_class_init(). */
static gint private_offset = 0;
222 
/* Forward declarations of the file-local functions: GObject/GstElement/GstBin
 * virtual method implementations, pad functions, task loops and the default
 * implementations of the subclass hooks installed in class_init. */
static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
    GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_finalize (GObject * object);
static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
    element, GstStateChange transition);

static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);

static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
    GstObject * parent, GstBuffer * buffer);
static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);

static gboolean
gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);

static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
    stream);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
    gboolean first_and_live);
static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
    GstClockTime ts, GstClockTime * final_ts);
static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
    demux, GstAdaptiveDemuxStream * stream);
static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
    demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
    demux);
static GstFlowReturn
gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);

static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
    GstEvent * event);

static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
    demux);
static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
    demux);

static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
    gboolean start_preroll_streams);
static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
    gboolean stop_updates);
static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
    demux);
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
    stream, GstFlowReturn ret, GError * err);
static GstFlowReturn
gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream, GstClockTime duration);
static gboolean
gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
    GstClockTime end_time);
static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
    GstClockTime time, GstClockID id, gpointer user_data);
static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
    * demux);
308 
309 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
310  * method to get to the padtemplates */
311 GType
gst_adaptive_demux_get_type(void)312 gst_adaptive_demux_get_type (void)
313 {
314   static volatile gsize type = 0;
315 
316   if (g_once_init_enter (&type)) {
317     GType _type;
318     static const GTypeInfo info = {
319       sizeof (GstAdaptiveDemuxClass),
320       NULL,
321       NULL,
322       (GClassInitFunc) gst_adaptive_demux_class_init,
323       NULL,
324       NULL,
325       sizeof (GstAdaptiveDemux),
326       0,
327       (GInstanceInitFunc) gst_adaptive_demux_init,
328     };
329 
330     _type = g_type_register_static (GST_TYPE_BIN,
331         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
332 
333     private_offset =
334         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
335 
336     g_once_init_leave (&type, _type);
337   }
338   return type;
339 }
340 
341 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)342 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
343 {
344   return (G_STRUCT_MEMBER_P (self, private_offset));
345 }
346 
347 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)348 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
349     const GValue * value, GParamSpec * pspec)
350 {
351   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 
353   GST_API_LOCK (demux);
354   GST_MANIFEST_LOCK (demux);
355 
356   switch (prop_id) {
357     case PROP_CONNECTION_SPEED:
358       demux->connection_speed = g_value_get_uint (value) * 1000;
359       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
360           demux->connection_speed);
361       break;
362     case PROP_BITRATE_LIMIT:
363       demux->bitrate_limit = g_value_get_float (value);
364       break;
365     default:
366       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
367       break;
368   }
369 
370   GST_MANIFEST_UNLOCK (demux);
371   GST_API_UNLOCK (demux);
372 }
373 
374 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)375 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
376     GValue * value, GParamSpec * pspec)
377 {
378   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
379 
380   GST_MANIFEST_LOCK (demux);
381 
382   switch (prop_id) {
383     case PROP_CONNECTION_SPEED:
384       g_value_set_uint (value, demux->connection_speed / 1000);
385       break;
386     case PROP_BITRATE_LIMIT:
387       g_value_set_float (value, demux->bitrate_limit);
388       break;
389     default:
390       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
391       break;
392   }
393 
394   GST_MANIFEST_UNLOCK (demux);
395 }
396 
397 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)398 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
399 {
400   GObjectClass *gobject_class;
401   GstElementClass *gstelement_class;
402   GstBinClass *gstbin_class;
403 
404   gobject_class = G_OBJECT_CLASS (klass);
405   gstelement_class = GST_ELEMENT_CLASS (klass);
406   gstbin_class = GST_BIN_CLASS (klass);
407 
408   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
409       "Base Adaptive Demux");
410 
411   parent_class = g_type_class_peek_parent (klass);
412 
413   if (private_offset != 0)
414     g_type_class_adjust_private_offset (klass, &private_offset);
415 
416   gobject_class->set_property = gst_adaptive_demux_set_property;
417   gobject_class->get_property = gst_adaptive_demux_get_property;
418   gobject_class->finalize = gst_adaptive_demux_finalize;
419 
420   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
421       g_param_spec_uint ("connection-speed", "Connection Speed",
422           "Network connection speed in kbps (0 = calculate from downloaded"
423           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
424           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425 
426   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
427   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
428       g_param_spec_float ("bitrate-limit",
429           "Bitrate limit in %",
430           "Limit of the available bitrate to use when switching to alternates.",
431           0, 1, DEFAULT_BITRATE_LIMIT,
432           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 
434   gstelement_class->change_state = gst_adaptive_demux_change_state;
435 
436   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
437 
438   klass->data_received = gst_adaptive_demux_stream_data_received_default;
439   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
440   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
441   klass->requires_periodical_playlist_update =
442       gst_adaptive_demux_requires_periodical_playlist_update_default;
443 
444 }
445 
446 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)447 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
448     GstAdaptiveDemuxClass * klass)
449 {
450   GstPadTemplate *pad_template;
451   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
452   GObjectClass *gobject_class;
453 
454   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
455 
456   demux->priv = gst_adaptive_demux_get_instance_private (demux);
457   demux->priv->input_adapter = gst_adapter_new ();
458   demux->downloader = gst_uri_downloader_new ();
459   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
460   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
461   demux->priv->segment_seqnum = gst_util_seqnum_next ();
462   demux->have_group_id = FALSE;
463   demux->group_id = G_MAXUINT;
464 
465   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
466 
467   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
468       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
469 
470   demux->realtime_clock = gst_system_clock_obtain ();
471   g_assert (demux->realtime_clock != NULL);
472   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
473   if (g_object_class_find_property (gobject_class, "clock-type")) {
474     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
475   } else {
476     GST_WARNING_OBJECT (demux,
477         "System clock does not have clock-type property");
478   }
479   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
480     demux->clock_offset = 0;
481   } else {
482     GDateTime *utc_now;
483     GstClockTime rtc_now;
484     GTimeVal gtv;
485 
486     utc_now = g_date_time_new_now_utc ();
487     rtc_now = gst_clock_get_time (demux->realtime_clock);
488     g_date_time_to_timeval (utc_now, &gtv);
489     demux->clock_offset =
490         gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
491         GST_TIME_AS_USECONDS (rtc_now);
492     g_date_time_unref (utc_now);
493   }
494   g_rec_mutex_init (&demux->priv->updates_lock);
495   demux->priv->updates_task =
496       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
497       demux, NULL);
498   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
499 
500   g_mutex_init (&demux->priv->updates_timed_lock);
501   g_cond_init (&demux->priv->updates_timed_cond);
502 
503   g_cond_init (&demux->priv->manifest_cond);
504   g_mutex_init (&demux->priv->manifest_update_lock);
505 
506   g_rec_mutex_init (&demux->priv->manifest_lock);
507   g_mutex_init (&demux->priv->api_lock);
508   g_mutex_init (&demux->priv->segment_lock);
509 
510   g_cond_init (&demux->priv->preroll_cond);
511   g_mutex_init (&demux->priv->preroll_lock);
512 
513   pad_template =
514       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
515   g_return_if_fail (pad_template != NULL);
516 
517   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
518   gst_pad_set_event_function (demux->sinkpad,
519       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
520   gst_pad_set_chain_function (demux->sinkpad,
521       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
522 
523   /* Properties */
524   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
525   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
526 
527   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
528 }
529 
530 static void
gst_adaptive_demux_finalize(GObject * object)531 gst_adaptive_demux_finalize (GObject * object)
532 {
533   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
534   GstAdaptiveDemuxPrivate *priv = demux->priv;
535 
536   GST_DEBUG_OBJECT (object, "finalize");
537 
538   g_object_unref (priv->input_adapter);
539   g_object_unref (demux->downloader);
540 
541   g_mutex_clear (&priv->updates_timed_lock);
542   g_cond_clear (&priv->updates_timed_cond);
543   g_mutex_clear (&demux->priv->manifest_update_lock);
544   g_cond_clear (&demux->priv->manifest_cond);
545   g_object_unref (priv->updates_task);
546   g_rec_mutex_clear (&priv->updates_lock);
547   g_rec_mutex_clear (&demux->priv->manifest_lock);
548   g_mutex_clear (&demux->priv->api_lock);
549   g_mutex_clear (&demux->priv->segment_lock);
550   if (demux->realtime_clock) {
551     gst_object_unref (demux->realtime_clock);
552     demux->realtime_clock = NULL;
553   }
554 
555   g_cond_clear (&demux->priv->preroll_cond);
556   g_mutex_clear (&demux->priv->preroll_lock);
557 
558   G_OBJECT_CLASS (parent_class)->finalize (object);
559 }
560 
561 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)562 gst_adaptive_demux_change_state (GstElement * element,
563     GstStateChange transition)
564 {
565   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
566   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
567 
568   switch (transition) {
569     case GST_STATE_CHANGE_PAUSED_TO_READY:
570       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
571         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
572       gst_uri_downloader_cancel (demux->downloader);
573 
574       GST_API_LOCK (demux);
575       GST_MANIFEST_LOCK (demux);
576       gst_adaptive_demux_reset (demux);
577       GST_MANIFEST_UNLOCK (demux);
578       GST_API_UNLOCK (demux);
579       break;
580     case GST_STATE_CHANGE_READY_TO_PAUSED:
581       GST_API_LOCK (demux);
582       GST_MANIFEST_LOCK (demux);
583       gst_adaptive_demux_reset (demux);
584       /* Clear "cancelled" flag in uridownloader since subclass might want to
585        * use uridownloader to fetch another manifest */
586       gst_uri_downloader_reset (demux->downloader);
587       if (g_atomic_int_get (&demux->priv->have_manifest))
588         gst_adaptive_demux_start_manifest_update_task (demux);
589       GST_MANIFEST_UNLOCK (demux);
590       GST_API_UNLOCK (demux);
591       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
592         GST_DEBUG_OBJECT (demux, "demuxer has started running");
593       break;
594     default:
595       break;
596   }
597 
598   /* this must be run without MANIFEST_LOCK taken.
599    * For PLAYING to PLAYING state changes, it will want to take a lock in
600    * src element and that lock is held while the streaming thread is running.
601    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
602    */
603   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
604 
605   return result;
606 }
607 
608 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)609 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
610     GstEvent * event)
611 {
612   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
613   gboolean ret;
614 
615   switch (event->type) {
616     case GST_EVENT_FLUSH_STOP:{
617       GST_API_LOCK (demux);
618       GST_MANIFEST_LOCK (demux);
619 
620       gst_adaptive_demux_reset (demux);
621 
622       ret = gst_pad_event_default (pad, parent, event);
623 
624       GST_MANIFEST_UNLOCK (demux);
625       GST_API_UNLOCK (demux);
626 
627       return ret;
628     }
629     case GST_EVENT_EOS:{
630       GstAdaptiveDemuxClass *demux_class;
631       GstQuery *query;
632       gboolean query_res;
633       gboolean ret = TRUE;
634       gsize available;
635       GstBuffer *manifest_buffer;
636 
637       GST_API_LOCK (demux);
638       GST_MANIFEST_LOCK (demux);
639 
640       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
641 
642       available = gst_adapter_available (demux->priv->input_adapter);
643 
644       if (available == 0) {
645         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
646         ret = gst_pad_event_default (pad, parent, event);
647 
648         GST_MANIFEST_UNLOCK (demux);
649         GST_API_UNLOCK (demux);
650 
651         return ret;
652       }
653 
654       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
655 
656       /* Need to get the URI to use it as a base to generate the fragment's
657        * uris */
658       query = gst_query_new_uri ();
659       query_res = gst_pad_peer_query (pad, query);
660       if (query_res) {
661         gchar *uri, *redirect_uri;
662         gboolean permanent;
663 
664         gst_query_parse_uri (query, &uri);
665         gst_query_parse_uri_redirection (query, &redirect_uri);
666         gst_query_parse_uri_redirection_permanent (query, &permanent);
667 
668         if (permanent && redirect_uri) {
669           demux->manifest_uri = redirect_uri;
670           demux->manifest_base_uri = NULL;
671           g_free (uri);
672         } else {
673           demux->manifest_uri = uri;
674           demux->manifest_base_uri = redirect_uri;
675         }
676 
677         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
678             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
679       } else {
680         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
681       }
682       gst_query_unref (query);
683 
684       /* Let the subclass parse the manifest */
685       manifest_buffer =
686           gst_adapter_take_buffer (demux->priv->input_adapter, available);
687       if (!demux_class->process_manifest (demux, manifest_buffer)) {
688         /* In most cases, this will happen if we set a wrong url in the
689          * source element and we have received the 404 HTML response instead of
690          * the manifest */
691         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
692             (NULL));
693         ret = FALSE;
694       } else {
695         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
696       }
697       gst_buffer_unref (manifest_buffer);
698 
699       gst_element_post_message (GST_ELEMENT_CAST (demux),
700           gst_message_new_element (GST_OBJECT_CAST (demux),
701               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
702                   "manifest-uri", G_TYPE_STRING,
703                   demux->manifest_uri, "uri", G_TYPE_STRING,
704                   demux->manifest_uri,
705                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
706                   GST_CLOCK_TIME_NONE,
707                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
708                   gst_util_get_timestamp (), NULL)));
709 
710       if (ret) {
711         /* Send duration message */
712         if (!gst_adaptive_demux_is_live (demux)) {
713           GstClockTime duration = demux_class->get_duration (demux);
714 
715           if (duration != GST_CLOCK_TIME_NONE) {
716             GST_DEBUG_OBJECT (demux,
717                 "Sending duration message : %" GST_TIME_FORMAT,
718                 GST_TIME_ARGS (duration));
719             gst_element_post_message (GST_ELEMENT (demux),
720                 gst_message_new_duration_changed (GST_OBJECT (demux)));
721           } else {
722             GST_DEBUG_OBJECT (demux,
723                 "media duration unknown, can not send the duration message");
724           }
725         }
726 
727         if (demux->next_streams) {
728           gst_adaptive_demux_prepare_streams (demux,
729               gst_adaptive_demux_is_live (demux));
730           gst_adaptive_demux_start_tasks (demux, TRUE);
731           gst_adaptive_demux_start_manifest_update_task (demux);
732         } else {
733           /* no streams */
734           GST_WARNING_OBJECT (demux, "No streams created from manifest");
735           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
736               (_("This file contains no playable streams.")),
737               ("No known stream formats found at the Manifest"));
738           ret = FALSE;
739         }
740 
741       }
742       GST_MANIFEST_UNLOCK (demux);
743       GST_API_UNLOCK (demux);
744 
745       gst_event_unref (event);
746       return ret;
747     }
748     case GST_EVENT_SEGMENT:
749       /* Swallow newsegments, we'll push our own */
750       gst_event_unref (event);
751       return TRUE;
752     default:
753       break;
754   }
755 
756   return gst_pad_event_default (pad, parent, event);
757 }
758 
759 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)760 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
761     GstBuffer * buffer)
762 {
763   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
764 
765   GST_MANIFEST_LOCK (demux);
766 
767   gst_adapter_push (demux->priv->input_adapter, buffer);
768 
769   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
770       (gint) gst_adapter_available (demux->priv->input_adapter));
771 
772   GST_MANIFEST_UNLOCK (demux);
773   return GST_FLOW_OK;
774 }
775 
776 /* must be called with manifest_lock taken */
777 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)778 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
779 {
780   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
781   GList *iter;
782   GList *old_streams;
783   GstEvent *eos;
784 
785   /* take ownership of old_streams before releasing the manifest_lock in
786    * gst_adaptive_demux_stop_tasks
787    */
788   old_streams = demux->priv->old_streams;
789   demux->priv->old_streams = NULL;
790 
791   gst_adaptive_demux_stop_tasks (demux, TRUE);
792 
793   if (klass->reset)
794     klass->reset (demux);
795 
796   eos = gst_event_new_eos ();
797   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
798     GstAdaptiveDemuxStream *stream = iter->data;
799     if (stream->pad) {
800       gst_pad_push_event (stream->pad, gst_event_ref (eos));
801       gst_pad_set_active (stream->pad, FALSE);
802 
803       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
804     }
805     gst_adaptive_demux_stream_free (stream);
806   }
807   gst_event_unref (eos);
808   g_list_free (demux->streams);
809   demux->streams = NULL;
810   if (demux->prepared_streams) {
811     g_list_free_full (demux->prepared_streams,
812         (GDestroyNotify) gst_adaptive_demux_stream_free);
813     demux->prepared_streams = NULL;
814   }
815   if (demux->next_streams) {
816     g_list_free_full (demux->next_streams,
817         (GDestroyNotify) gst_adaptive_demux_stream_free);
818     demux->next_streams = NULL;
819   }
820 
821   if (old_streams) {
822     g_list_free_full (old_streams,
823         (GDestroyNotify) gst_adaptive_demux_stream_free);
824   }
825 
826   if (demux->priv->old_streams) {
827     g_list_free_full (demux->priv->old_streams,
828         (GDestroyNotify) gst_adaptive_demux_stream_free);
829     demux->priv->old_streams = NULL;
830   }
831 
832   g_free (demux->manifest_uri);
833   g_free (demux->manifest_base_uri);
834   demux->manifest_uri = NULL;
835   demux->manifest_base_uri = NULL;
836 
837   gst_adapter_clear (demux->priv->input_adapter);
838   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
839 
840   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
841 
842   demux->have_group_id = FALSE;
843   demux->group_id = G_MAXUINT;
844   demux->priv->segment_seqnum = gst_util_seqnum_next ();
845 }
846 
847 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)848 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
849 {
850   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
851 
852   switch (GST_MESSAGE_TYPE (msg)) {
853     case GST_MESSAGE_ERROR:{
854       GList *iter;
855       GstAdaptiveDemuxStream *stream = NULL;
856       GError *err = NULL;
857       gchar *debug = NULL;
858       gchar *new_error = NULL;
859       const GstStructure *details = NULL;
860 
861       GST_MANIFEST_LOCK (demux);
862 
863       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
864         GstAdaptiveDemuxStream *cur = iter->data;
865         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
866                 GST_OBJECT_CAST (cur->src))) {
867           stream = cur;
868           break;
869         }
870       }
871       if (stream == NULL) {
872         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
873           GstAdaptiveDemuxStream *cur = iter->data;
874           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
875                   GST_OBJECT_CAST (cur->src))) {
876             stream = cur;
877             break;
878           }
879         }
880         if (stream == NULL) {
881           GST_WARNING_OBJECT (demux,
882               "Failed to locate stream for errored element");
883           break;
884         }
885       }
886 
887       gst_message_parse_error (msg, &err, &debug);
888 
889       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
890           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
891           err->message, debug);
892 
893       if (debug)
894         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
895       if (new_error) {
896         g_free (err->message);
897         err->message = new_error;
898       }
899 
900       gst_message_parse_error_details (msg, &details);
901       if (details) {
902         gst_structure_get_uint (details, "http-status-code",
903             &stream->last_status_code);
904       }
905 
906       /* error, but ask to retry */
907       gst_adaptive_demux_stream_fragment_download_finish (stream,
908           GST_FLOW_CUSTOM_ERROR, err);
909 
910       g_error_free (err);
911       g_free (debug);
912 
913       GST_MANIFEST_UNLOCK (demux);
914 
915       gst_message_unref (msg);
916       msg = NULL;
917     }
918       break;
919     default:
920       break;
921   }
922 
923   if (msg)
924     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
925 }
926 
927 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)928 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
929     gsize struct_size)
930 {
931   GST_API_LOCK (demux);
932   GST_MANIFEST_LOCK (demux);
933   demux->stream_struct_size = struct_size;
934   GST_MANIFEST_UNLOCK (demux);
935   GST_API_UNLOCK (demux);
936 }
937 
938 /* must be called with manifest_lock taken */
939 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)940 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
941     GstAdaptiveDemuxStream * stream)
942 {
943   GstPad *pad = stream->pad;
944   gchar *name = gst_pad_get_name (pad);
945   GstEvent *event;
946   gchar *stream_id;
947 
948   gst_pad_set_active (pad, TRUE);
949   stream->need_header = TRUE;
950 
951   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
952 
953   event =
954       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
955       GST_EVENT_STREAM_START, 0);
956   if (event) {
957     if (gst_event_parse_group_id (event, &demux->group_id))
958       demux->have_group_id = TRUE;
959     else
960       demux->have_group_id = FALSE;
961     gst_event_unref (event);
962   } else if (!demux->have_group_id) {
963     demux->have_group_id = TRUE;
964     demux->group_id = gst_util_group_id_next ();
965   }
966   event = gst_event_new_stream_start (stream_id);
967   if (demux->have_group_id)
968     gst_event_set_group_id (event, demux->group_id);
969 
970   gst_pad_push_event (pad, event);
971   g_free (stream_id);
972   g_free (name);
973 
974   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
975 
976   stream->discont = TRUE;
977 
978   return TRUE;
979 }
980 
981 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)982 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
983     GstAdaptiveDemuxStream * stream)
984 {
985   gboolean ret;
986   GstPad *pad = stream->pad;
987   GstCaps *caps;
988 
989   if (stream->pending_caps) {
990     gst_pad_set_caps (pad, stream->pending_caps);
991     caps = stream->pending_caps;
992     stream->pending_caps = NULL;
993   } else {
994     caps = gst_pad_get_current_caps (pad);
995   }
996 
997   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
998       GST_DEBUG_PAD_NAME (pad), caps);
999   if (caps)
1000     gst_caps_unref (caps);
1001 
1002   gst_object_ref (pad);
1003 
1004   /* Don't hold the manifest lock while exposing a pad */
1005   GST_MANIFEST_UNLOCK (demux);
1006   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1007   GST_MANIFEST_LOCK (demux);
1008 
1009   return ret;
1010 }
1011 
1012 /* must be called with manifest_lock taken */
1013 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1014 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1015     GstAdaptiveDemuxStream * stream)
1016 {
1017   GstAdaptiveDemuxClass *klass;
1018 
1019   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1020 
1021   if (klass->get_presentation_offset == NULL)
1022     return 0;
1023 
1024   return klass->get_presentation_offset (demux, stream);
1025 }
1026 
1027 /* must be called with manifest_lock taken */
1028 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1029 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1030 {
1031   GstAdaptiveDemuxClass *klass;
1032 
1033   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1034 
1035   if (klass->get_period_start_time == NULL)
1036     return 0;
1037 
1038   return klass->get_period_start_time (demux);
1039 }
1040 
1041 /* must be called with manifest_lock taken */
1042 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1043 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1044     gboolean first_and_live)
1045 {
1046   GList *iter;
1047   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1048 
1049   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1050   if (demux->prepared_streams != NULL) {
1051     /* Old streams that were never exposed, due to a seek or so */
1052     GST_FIXME_OBJECT (demux,
1053         "Preparing new streams without cleaning up old ones!");
1054     return FALSE;
1055   }
1056 
1057   demux->prepared_streams = demux->next_streams;
1058   demux->next_streams = NULL;
1059 
1060   if (!gst_adaptive_demux_is_running (demux)) {
1061     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1062     return TRUE;
1063   }
1064 
1065   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1066     GstAdaptiveDemuxStream *stream = iter->data;
1067 
1068     stream->do_block = TRUE;
1069 
1070     if (!gst_adaptive_demux_prepare_stream (demux,
1071             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1072       /* TODO act on error */
1073       GST_FIXME_OBJECT (stream->pad,
1074           "Do something on failure to expose stream");
1075     }
1076 
1077     if (first_and_live) {
1078       /* TODO we only need the first timestamp, maybe create a simple function to
1079        * get the current PTS of a fragment ? */
1080       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1081       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1082 
1083       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1084         min_pts = MIN (min_pts, stream->fragment.timestamp);
1085       } else {
1086         min_pts = stream->fragment.timestamp;
1087       }
1088     }
1089   }
1090 
1091   period_start = gst_adaptive_demux_get_period_start_time (demux);
1092 
1093   /* For live streams, the subclass is supposed to seek to the current
1094    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1095    * We now also have to seek our demuxer segment to reflect this.
1096    *
1097    * FIXME: This needs some refactoring at some point.
1098    */
1099   if (first_and_live) {
1100     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1101         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1102         GST_SEEK_TYPE_NONE, -1, NULL);
1103   }
1104 
1105   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1106     GstAdaptiveDemuxStream *stream = iter->data;
1107     GstClockTime offset;
1108 
1109     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1110     stream->segment = demux->segment;
1111 
1112     /* The demuxer segment is just built from seek events, but for each stream
1113      * we have to adjust segments according to the current period and the
1114      * stream specific presentation time offset.
1115      *
1116      * For each period, buffer timestamps start again from 0. Additionally the
1117      * buffer timestamps are shifted by the stream specific presentation time
1118      * offset, so the first buffer timestamp of a period is 0 + presentation
1119      * time offset. If the stream contains timestamps itself, this is also
1120      * supposed to be the presentation time stored inside the stream.
1121      *
1122      * The stream time over periods is supposed to be continuous, that is the
1123      * buffer timestamp 0 + presentation time offset should map to the start
1124      * time of the current period.
1125      *
1126      *
1127      * The adjustment of the stream segments as such works the following.
1128      *
1129      * If the demuxer segment start is bigger than the period start, this
1130      * means that we have to drop some media at the beginning of the current
1131      * period, e.g. because a seek into the middle of the period has
1132      * happened. The amount of media to drop is the difference between the
1133      * period start and the demuxer segment start, and as each period starts
1134      * again from 0, this difference is going to be the actual stream's
1135      * segment start. As all timestamps of the stream are shifted by the
1136      * presentation time offset, we will also have to move the segment start
1137      * by that offset.
1138      *
1139      * Likewise, the demuxer segment stop value is adjusted in the same
1140      * fashion.
1141      *
1142      * Now the running time and stream time at the stream's segment start has
1143      * to be the one that is stored inside the demuxer's segment, which means
1144      * that segment.base and segment.time have to be copied over (done just
1145      * above)
1146      *
1147      *
1148      * If the demuxer segment start is smaller than the period start time,
1149      * this means that the whole period is inside the segment. As each period
1150      * starts timestamps from 0, and additionally timestamps are shifted by
1151      * the presentation time offset, the stream's first timestamp (and as such
1152      * the stream's segment start) has to be the presentation time offset.
1153      * The stream time at the segment start is supposed to be the stream time
1154      * of the period start according to the demuxer segment, so the stream
1155      * segment's time would be set to that. The same goes for the stream
1156      * segment's base, which is supposed to be the running time of the period
1157      * start according to the demuxer's segment.
1158      *
1159      * The same logic applies for negative rates with the segment stop and
1160      * the period stop time (which gets clamped).
1161      *
1162      *
1163      * For the first case where not the complete period is inside the segment,
1164      * the segment time and base as calculated by the second case would be
1165      * equivalent.
1166      */
1167     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1168         &demux->segment);
1169     GST_DEBUG_OBJECT (demux,
1170         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1171         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1172     /* note for readers:
1173      * Since stream->segment is initially a copy of demux->segment,
1174      * only the values that need updating are modified below. */
1175     if (first_and_live) {
1176       /* If first and live, demuxer did seek to the current position already */
1177       stream->segment.start = demux->segment.start - period_start + offset;
1178       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1179         stream->segment.stop = demux->segment.stop - period_start + offset;
1180       /* FIXME : Do we need to handle negative rates for this ? */
1181       stream->segment.position = stream->segment.start;
1182     } else if (demux->segment.start > period_start) {
1183       /* seek within a period */
1184       stream->segment.start = demux->segment.start - period_start + offset;
1185       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1186         stream->segment.stop = demux->segment.stop - period_start + offset;
1187       if (stream->segment.rate >= 0)
1188         stream->segment.position = offset;
1189       else
1190         stream->segment.position = stream->segment.stop;
1191     } else {
1192       stream->segment.start = offset;
1193       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1194         stream->segment.stop = demux->segment.stop - period_start + offset;
1195       if (stream->segment.rate >= 0)
1196         stream->segment.position = offset;
1197       else
1198         stream->segment.position = stream->segment.stop;
1199       stream->segment.time =
1200           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1201           period_start);
1202       stream->segment.base =
1203           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1204           period_start);
1205     }
1206 
1207     stream->pending_segment = gst_event_new_segment (&stream->segment);
1208     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1209     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1210 
1211     GST_DEBUG_OBJECT (demux,
1212         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1213         &stream->segment, stream);
1214   }
1215 
1216   return TRUE;
1217 }
1218 
1219 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1220 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1221 {
1222   GList *iter;
1223   GList *old_streams;
1224 
1225   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1226 
1227   old_streams = demux->streams;
1228   demux->streams = demux->prepared_streams;
1229   demux->prepared_streams = NULL;
1230 
1231   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1232     GstAdaptiveDemuxStream *stream = iter->data;
1233 
1234     if (!gst_adaptive_demux_expose_stream (demux,
1235             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1236       /* TODO act on error */
1237     }
1238   }
1239   demux->priv->preroll_pending = 0;
1240 
1241   GST_MANIFEST_UNLOCK (demux);
1242   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1243   GST_MANIFEST_LOCK (demux);
1244 
1245   if (old_streams) {
1246     GstEvent *eos = gst_event_new_eos ();
1247 
1248     /* before we put streams in the demux->priv->old_streams list,
1249      * we ask the download task to stop. In this way, it will no longer be
1250      * allowed to change the demux object.
1251      */
1252     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1253       GstAdaptiveDemuxStream *stream = iter->data;
1254       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1255 
1256       GST_MANIFEST_UNLOCK (demux);
1257 
1258       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1259       gst_pad_push_event (pad, gst_event_ref (eos));
1260       gst_pad_set_active (pad, FALSE);
1261 
1262       GST_LOG_OBJECT (pad, "Removing stream");
1263       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1264       GST_MANIFEST_LOCK (demux);
1265 
1266       gst_object_unref (GST_OBJECT (pad));
1267 
1268       /* ask the download task to stop.
1269        * We will not join it now, because our thread can be one of these tasks.
1270        * We will do the joining later, from another stream download task or
1271        * from gst_adaptive_demux_stop_tasks.
1272        * We also cannot change the state of the stream->src element, because
1273        * that will wait on the streaming thread (which could be this thread)
1274        * to stop first.
1275        * Because we sent an EOS to the downstream element, the stream->src
1276        * element should detect this in its streaming task and stop.
1277        * Even if it doesn't do that, we will change its state later in
1278        * gst_adaptive_demux_stop_tasks.
1279        */
1280       GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1281       gst_task_stop (stream->download_task);
1282       g_mutex_lock (&stream->fragment_download_lock);
1283       stream->cancelled = TRUE;
1284       stream->replaced = TRUE;
1285       g_cond_signal (&stream->fragment_download_cond);
1286       g_mutex_unlock (&stream->fragment_download_lock);
1287     }
1288     gst_event_unref (eos);
1289 
1290     /* The list should be freed from another thread as we can't properly
1291      * cleanup a GstTask from itself */
1292     demux->priv->old_streams =
1293         g_list_concat (demux->priv->old_streams, old_streams);
1294   }
1295 
1296   /* Unblock after removing oldstreams */
1297   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1298     GstAdaptiveDemuxStream *stream = iter->data;
1299     stream->do_block = FALSE;
1300   }
1301 
1302   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1303 
1304   return TRUE;
1305 }
1306 
1307 /* must be called with manifest_lock taken */
1308 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1309 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1310 {
1311   GstAdaptiveDemuxStream *stream;
1312 
1313   stream = g_malloc0 (demux->stream_struct_size);
1314 
1315   /* Downloading task */
1316   g_rec_mutex_init (&stream->download_lock);
1317   stream->download_task =
1318       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1319       stream, NULL);
1320   gst_task_set_lock (stream->download_task, &stream->download_lock);
1321 
1322   stream->pad = pad;
1323   stream->demux = demux;
1324   stream->fragment_bitrates =
1325       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1326   gst_pad_set_element_private (pad, stream);
1327   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1328 
1329   g_mutex_lock (&demux->priv->preroll_lock);
1330   stream->do_block = TRUE;
1331   demux->priv->preroll_pending++;
1332   g_mutex_unlock (&demux->priv->preroll_lock);
1333 
1334   gst_pad_set_query_function (pad,
1335       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1336   gst_pad_set_event_function (pad,
1337       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1338 
1339   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1340   g_cond_init (&stream->fragment_download_cond);
1341   g_mutex_init (&stream->fragment_download_lock);
1342 
1343   demux->next_streams = g_list_append (demux->next_streams, stream);
1344 
1345   return stream;
1346 }
1347 
1348 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1349 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1350 {
1351   GList *iter;
1352 
1353   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1354     GstAdaptiveDemuxStream *stream = iter->data;
1355     if (stream->pad == pad) {
1356       return stream;
1357     }
1358   }
1359 
1360   return NULL;
1361 }
1362 
1363 /* must be called with manifest_lock taken.
1364  * It will temporarily drop the manifest_lock in order to join the task.
1365  * It will join only the old_streams (the demux->streams are joined by
1366  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1367  * called)
1368  */
1369 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1370 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1371 {
1372   GstAdaptiveDemux *demux = stream->demux;
1373   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1374 
1375   if (klass->stream_free)
1376     klass->stream_free (stream);
1377 
1378   g_clear_error (&stream->last_error);
1379   if (stream->download_task) {
1380     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1381       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1382           GST_DEBUG_PAD_NAME (stream->pad));
1383 
1384       gst_task_stop (stream->download_task);
1385 
1386       g_mutex_lock (&stream->fragment_download_lock);
1387       stream->cancelled = TRUE;
1388       g_cond_signal (&stream->fragment_download_cond);
1389       g_mutex_unlock (&stream->fragment_download_lock);
1390     }
1391     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1392 
1393     /* temporarily drop the manifest lock to join the task */
1394     GST_MANIFEST_UNLOCK (demux);
1395 
1396     gst_task_join (stream->download_task);
1397 
1398     GST_MANIFEST_LOCK (demux);
1399 
1400     GST_LOG_OBJECT (demux, "Finished");
1401     gst_object_unref (stream->download_task);
1402     g_rec_mutex_clear (&stream->download_lock);
1403     stream->download_task = NULL;
1404   }
1405 
1406   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1407 
1408   if (stream->pending_segment) {
1409     gst_event_unref (stream->pending_segment);
1410     stream->pending_segment = NULL;
1411   }
1412 
1413   if (stream->pending_events) {
1414     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1415     stream->pending_events = NULL;
1416   }
1417 
1418   if (stream->internal_pad) {
1419     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1420   }
1421 
1422   if (stream->src_srcpad) {
1423     gst_object_unref (stream->src_srcpad);
1424     stream->src_srcpad = NULL;
1425   }
1426 
1427   if (stream->src) {
1428     GstElement *src = stream->src;
1429 
1430     stream->src = NULL;
1431 
1432     GST_MANIFEST_UNLOCK (demux);
1433     gst_element_set_locked_state (src, TRUE);
1434     gst_element_set_state (src, GST_STATE_NULL);
1435     gst_bin_remove (GST_BIN_CAST (demux), src);
1436     GST_MANIFEST_LOCK (demux);
1437   }
1438 
1439   g_cond_clear (&stream->fragment_download_cond);
1440   g_mutex_clear (&stream->fragment_download_lock);
1441   g_free (stream->fragment_bitrates);
1442 
1443   if (stream->pad) {
1444     gst_object_unref (stream->pad);
1445     stream->pad = NULL;
1446   }
1447   if (stream->pending_caps)
1448     gst_caps_unref (stream->pending_caps);
1449 
1450   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1451 
1452   g_free (stream);
1453 }
1454 
1455 /* must be called with manifest_lock taken */
1456 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1457 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1458     gint64 * range_start, gint64 * range_stop)
1459 {
1460   GstAdaptiveDemuxClass *klass;
1461 
1462   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1463 
1464   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1465 
1466   return klass->get_live_seek_range (demux, range_start, range_stop);
1467 }
1468 
1469 /* must be called with manifest_lock taken */
1470 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1471 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1472     GstAdaptiveDemuxStream * stream)
1473 {
1474   gint64 range_start, range_stop;
1475   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1476     GST_LOG_OBJECT (stream->pad,
1477         "stream position %" GST_TIME_FORMAT "  live seek range %"
1478         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1479         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1480         GST_STIME_ARGS (range_stop));
1481     return (stream->segment.position >= range_start
1482         && stream->segment.position <= range_stop);
1483   }
1484 
1485   return FALSE;
1486 }
1487 
1488 /* must be called with manifest_lock taken */
1489 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1490 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1491 {
1492   GstAdaptiveDemuxClass *klass;
1493 
1494   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1495   if (gst_adaptive_demux_is_live (demux)) {
1496     return klass->get_live_seek_range != NULL;
1497   }
1498 
1499   return klass->seek != NULL;
1500 }
1501 
1502 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1503 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1504     GList * streams, gint64 period_start, GstSeekType start_type,
1505     GstSeekType stop_type)
1506 {
1507   GList *iter;
1508   for (iter = streams; iter; iter = g_list_next (iter)) {
1509     GstAdaptiveDemuxStream *stream = iter->data;
1510     GstEvent *seg_evt;
1511     GstClockTime offset;
1512 
1513     /* See comments in gst_adaptive_demux_get_period_start_time() for
1514      * an explanation of the segment modifications */
1515     stream->segment = demux->segment;
1516     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1517     stream->segment.start += offset - period_start;
1518     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1519       stream->segment.stop += offset - period_start;
1520     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1521       stream->segment.position = stream->segment.start;
1522     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1523       stream->segment.position = stream->segment.stop;
1524     seg_evt = gst_event_new_segment (&stream->segment);
1525     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1526     gst_event_replace (&stream->pending_segment, seg_evt);
1527     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1528         stream->pending_segment);
1529     gst_event_unref (seg_evt);
1530     /* Make sure the first buffer after a seek has the discont flag */
1531     stream->discont = TRUE;
1532     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1533   }
1534 }
1535 
/* IS_SNAP_SEEK: non-zero when the seek flags @f contain any of the SNAP
 * flags, GST_SEEK_FLAG_TRICKMODE_KEY_UNITS or GST_SEEK_FLAG_KEY_UNIT.
 *
 * REMOVE_SNAP_FLAGS: @f with the three SNAP flags cleared (the key-unit
 * flags are left untouched).
 *
 * The argument is parenthesized so that expressions such as 'a | b' can be
 * passed safely ('&' binds tighter than '|').
 */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE | \
                                GST_SEEK_FLAG_SNAP_AFTER | \
                                GST_SEEK_FLAG_SNAP_NEAREST | \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
                                GST_SEEK_FLAG_KEY_UNIT))
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
                                      GST_SEEK_FLAG_SNAP_AFTER | \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1544 
1545 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1546 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1547     GstEvent * event)
1548 {
1549   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1550   gdouble rate;
1551   GstFormat format;
1552   GstSeekFlags flags;
1553   GstSeekType start_type, stop_type;
1554   gint64 start, stop;
1555   guint32 seqnum;
1556   gboolean update;
1557   gboolean ret;
1558   GstSegment oldsegment;
1559   GstAdaptiveDemuxStream *stream = NULL;
1560 
1561   GST_INFO_OBJECT (demux, "Received seek event");
1562 
1563   GST_API_LOCK (demux);
1564   GST_MANIFEST_LOCK (demux);
1565 
1566   if (!gst_adaptive_demux_can_seek (demux)) {
1567     GST_MANIFEST_UNLOCK (demux);
1568     GST_API_UNLOCK (demux);
1569     gst_event_unref (event);
1570     return FALSE;
1571   }
1572 
1573   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1574       &stop_type, &stop);
1575 
1576   if (format != GST_FORMAT_TIME) {
1577     GST_MANIFEST_UNLOCK (demux);
1578     GST_API_UNLOCK (demux);
1579     GST_WARNING_OBJECT (demux,
1580         "Adaptive demuxers only support TIME-based seeking");
1581     gst_event_unref (event);
1582     return FALSE;
1583   }
1584 
1585   if (flags & GST_SEEK_FLAG_SEGMENT) {
1586     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1587     GST_MANIFEST_UNLOCK (demux);
1588     GST_API_UNLOCK (demux);
1589     gst_event_unref (event);
1590     return FALSE;
1591   }
1592 
1593   seqnum = gst_event_get_seqnum (event);
1594 
1595   if (gst_adaptive_demux_is_live (demux)) {
1596     gint64 range_start, range_stop;
1597     gboolean changed = FALSE;
1598     gboolean start_valid = TRUE, stop_valid = TRUE;
1599 
1600     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1601             &range_stop)) {
1602       GST_MANIFEST_UNLOCK (demux);
1603       GST_API_UNLOCK (demux);
1604       gst_event_unref (event);
1605       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1606       return FALSE;
1607     }
1608 
1609     GST_DEBUG_OBJECT (demux,
1610         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1611         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1612 
1613     /* Handle relative positioning for live streams (relative to the range_stop) */
1614     if (start_type == GST_SEEK_TYPE_END) {
1615       start = range_stop + start;
1616       start_type = GST_SEEK_TYPE_SET;
1617       changed = TRUE;
1618     }
1619     if (stop_type == GST_SEEK_TYPE_END) {
1620       stop = range_stop + stop;
1621       stop_type = GST_SEEK_TYPE_SET;
1622       changed = TRUE;
1623     }
1624 
1625     /* Adjust the requested start/stop position if it falls beyond the live
1626      * seek range.
1627      * The only case where we don't adjust is for the starting point of
1628      * an accurate seek (start if forward and stop if backwards)
1629      */
1630     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1631         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1632       GST_DEBUG_OBJECT (demux,
1633           "seek before live stream start, setting to range start: %"
1634           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1635       start = range_start;
1636       changed = TRUE;
1637     }
1638     /* truncate stop position also if set */
1639     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1640         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1641       GST_DEBUG_OBJECT (demux,
1642           "seek ending after live start, adjusting to: %"
1643           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1644       stop = range_stop;
1645       changed = TRUE;
1646     }
1647 
1648     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1649         (start < range_start || start > range_stop)) {
1650       GST_WARNING_OBJECT (demux,
1651           "Seek to invalid position start:%" GST_STIME_FORMAT
1652           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1653           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1654           GST_STIME_ARGS (range_stop));
1655       start_valid = FALSE;
1656     }
1657     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1658         (stop < range_start || stop > range_stop)) {
1659       GST_WARNING_OBJECT (demux,
1660           "Seek to invalid position stop:%" GST_STIME_FORMAT
1661           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1662           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1663           GST_STIME_ARGS (range_stop));
1664       stop_valid = FALSE;
1665     }
1666 
1667     /* If the seek position is still outside of the seekable range, refuse the seek */
1668     if (!start_valid || !stop_valid) {
1669       GST_MANIFEST_UNLOCK (demux);
1670       GST_API_UNLOCK (demux);
1671       gst_event_unref (event);
1672       return FALSE;
1673     }
1674 
1675     /* Re-create seek event with changed/updated values */
1676     if (changed) {
1677       gst_event_unref (event);
1678       event =
1679           gst_event_new_seek (rate, format, flags,
1680           start_type, start, stop_type, stop);
1681       gst_event_set_seqnum (event, seqnum);
1682     }
1683   }
1684 
1685   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1686 
1687   /* have a backup in case seek fails */
1688   gst_segment_copy_into (&demux->segment, &oldsegment);
1689 
1690   if (flags & GST_SEEK_FLAG_FLUSH) {
1691     GstEvent *fevent;
1692 
1693     GST_DEBUG_OBJECT (demux, "sending flush start");
1694     fevent = gst_event_new_flush_start ();
1695     gst_event_set_seqnum (fevent, seqnum);
1696     GST_MANIFEST_UNLOCK (demux);
1697     gst_adaptive_demux_push_src_event (demux, fevent);
1698     GST_MANIFEST_LOCK (demux);
1699 
1700     gst_adaptive_demux_stop_tasks (demux, FALSE);
1701   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1702       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1703 
1704     gst_adaptive_demux_stop_tasks (demux, FALSE);
1705   }
1706 
1707   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1708 
1709   /*
1710    * Handle snap seeks as follows:
1711    * 1) do the snap seeking on the stream that received
1712    *    the event
1713    * 2) use the final position on this stream to seek
1714    *    on the other streams to the same position
1715    *
1716    * We can't snap at all streams at the same time as
1717    * they might end in different positions, so just
1718    * use the one that received the event as the 'leading'
1719    * one to do the snap seek.
1720    */
1721   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1722           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1723     GstClockTime ts;
1724     GstSeekFlags stream_seek_flags = flags;
1725 
1726     /* snap-seek on the stream that received the event and then
1727      * use the resulting position to seek on all streams */
1728 
1729     if (rate >= 0) {
1730       if (start_type != GST_SEEK_TYPE_NONE)
1731         ts = start;
1732       else {
1733         ts = stream->segment.position;
1734         start_type = GST_SEEK_TYPE_SET;
1735       }
1736     } else {
1737       if (stop_type != GST_SEEK_TYPE_NONE)
1738         ts = stop;
1739       else {
1740         stop_type = GST_SEEK_TYPE_SET;
1741         ts = stream->segment.position;
1742       }
1743     }
1744 
1745     if (stream) {
1746       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1747     }
1748 
1749     /* replace event with a new one without snaping to seek on all streams */
1750     gst_event_unref (event);
1751     if (rate >= 0) {
1752       start = ts;
1753     } else {
1754       stop = ts;
1755     }
1756     event =
1757         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1758         start_type, start, stop_type, stop);
1759     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1760   }
1761   stream = NULL;
1762 
1763   gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1764       start, stop_type, stop, &update);
1765 
1766   /* FIXME - this seems unatural, do_seek() is updating base when we
1767    * only want the start/stop position to change, maybe do_seek() needs
1768    * some fixing? */
1769   if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1770               && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1771               && stop_type == GST_SEEK_TYPE_NONE))) {
1772     demux->segment.base = oldsegment.base;
1773   }
1774 
1775   GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1776 
1777   ret = demux_class->seek (demux, event);
1778 
1779   if (!ret) {
1780     /* Is there anything else we can do if it fails? */
1781     gst_segment_copy_into (&oldsegment, &demux->segment);
1782   } else {
1783     demux->priv->segment_seqnum = seqnum;
1784   }
1785   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1786 
1787   if (flags & GST_SEEK_FLAG_FLUSH) {
1788     GstEvent *fevent;
1789 
1790     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1791     fevent = gst_event_new_flush_stop (TRUE);
1792     gst_event_set_seqnum (fevent, seqnum);
1793     gst_adaptive_demux_push_src_event (demux, fevent);
1794   }
1795 
1796   if (demux->next_streams) {
1797     /* If the seek generated new streams, get them
1798      * to preroll */
1799     gst_adaptive_demux_prepare_streams (demux, FALSE);
1800     gst_adaptive_demux_start_tasks (demux, TRUE);
1801   } else {
1802     GstClockTime period_start =
1803         gst_adaptive_demux_get_period_start_time (demux);
1804 
1805     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1806     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1807         period_start, start_type, stop_type);
1808     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1809         period_start, start_type, stop_type);
1810     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1811 
1812     /* Restart the demux */
1813     gst_adaptive_demux_start_tasks (demux, FALSE);
1814   }
1815 
1816   GST_MANIFEST_UNLOCK (demux);
1817   GST_API_UNLOCK (demux);
1818   gst_event_unref (event);
1819 
1820   return ret;
1821 }
1822 
1823 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1824 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1825     GstEvent * event)
1826 {
1827   GstAdaptiveDemux *demux;
1828 
1829   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1830 
1831   /* FIXME handle events received on pads that are to be removed */
1832 
1833   switch (event->type) {
1834     case GST_EVENT_SEEK:
1835     {
1836       guint32 seqnum = gst_event_get_seqnum (event);
1837       if (seqnum == demux->priv->segment_seqnum) {
1838         GST_LOG_OBJECT (pad,
1839             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1840         gst_event_unref (event);
1841         return TRUE;
1842       }
1843       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1844     }
1845     case GST_EVENT_RECONFIGURE:{
1846       GstAdaptiveDemuxStream *stream;
1847 
1848       GST_MANIFEST_LOCK (demux);
1849       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1850 
1851       if (stream) {
1852         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1853             stream->last_ret == GST_FLOW_NOT_LINKED) {
1854           stream->last_ret = GST_FLOW_OK;
1855           stream->restart_download = TRUE;
1856           stream->need_header = TRUE;
1857           stream->discont = TRUE;
1858           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1859           gst_task_start (stream->download_task);
1860         }
1861         gst_event_unref (event);
1862         GST_MANIFEST_UNLOCK (demux);
1863         return TRUE;
1864       }
1865       GST_MANIFEST_UNLOCK (demux);
1866     }
1867       break;
1868     case GST_EVENT_LATENCY:{
1869       /* Upstream and our internal source are irrelevant
1870        * for latency, and we should not fail here to
1871        * configure the latency */
1872       gst_event_unref (event);
1873       return TRUE;
1874     }
1875     case GST_EVENT_QOS:{
1876       GstAdaptiveDemuxStream *stream;
1877 
1878       GST_MANIFEST_LOCK (demux);
1879       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1880 
1881       if (stream) {
1882         GstClockTimeDiff diff;
1883         GstClockTime timestamp;
1884 
1885         gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
1886         /* Only take into account lateness if late */
1887         if (diff > 0)
1888           stream->qos_earliest_time = timestamp + 2 * diff;
1889         else
1890           stream->qos_earliest_time = timestamp;
1891         GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1892             GST_TIME_ARGS (stream->qos_earliest_time));
1893       }
1894       GST_MANIFEST_UNLOCK (demux);
1895       break;
1896     }
1897     default:
1898       break;
1899   }
1900 
1901   return gst_pad_event_default (pad, parent, event);
1902 }
1903 
1904 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1905 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1906     GstQuery * query)
1907 {
1908   GstAdaptiveDemux *demux;
1909   GstAdaptiveDemuxClass *demux_class;
1910   gboolean ret = FALSE;
1911 
1912   if (query == NULL)
1913     return FALSE;
1914 
1915   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1916   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1917 
1918   switch (query->type) {
1919     case GST_QUERY_DURATION:{
1920       GstClockTime duration = -1;
1921       GstFormat fmt;
1922 
1923       gst_query_parse_duration (query, &fmt, NULL);
1924 
1925       if (fmt == GST_FORMAT_TIME
1926           && g_atomic_int_get (&demux->priv->have_manifest)) {
1927         duration = demux_class->get_duration (demux);
1928 
1929         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1930           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1931           ret = TRUE;
1932         }
1933       }
1934 
1935       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1936           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1937       break;
1938     }
1939     case GST_QUERY_LATENCY:{
1940       gst_query_set_latency (query, FALSE, 0, -1);
1941       ret = TRUE;
1942       break;
1943     }
1944     case GST_QUERY_SEEKING:{
1945       GstFormat fmt;
1946       gint64 stop = -1;
1947       gint64 start = 0;
1948 
1949       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1950         GST_INFO_OBJECT (demux,
1951             "Don't have manifest yet, can't answer seeking query");
1952         return FALSE;           /* can't answer without manifest */
1953       }
1954 
1955       GST_MANIFEST_LOCK (demux);
1956 
1957       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1958       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1959       if (fmt == GST_FORMAT_TIME) {
1960         GstClockTime duration;
1961         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1962 
1963         ret = TRUE;
1964         if (can_seek) {
1965           if (gst_adaptive_demux_is_live (demux)) {
1966             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1967             if (!ret) {
1968               GST_MANIFEST_UNLOCK (demux);
1969               GST_INFO_OBJECT (demux, "can't answer seeking query");
1970               return FALSE;
1971             }
1972           } else {
1973             duration = demux_class->get_duration (demux);
1974             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1975               stop = duration;
1976           }
1977         }
1978         gst_query_set_seeking (query, fmt, can_seek, start, stop);
1979         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1980             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1981             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1982       }
1983       GST_MANIFEST_UNLOCK (demux);
1984       break;
1985     }
1986     case GST_QUERY_URI:
1987 
1988       GST_MANIFEST_LOCK (demux);
1989 
1990       /* TODO HLS can answer this differently it seems */
1991       if (demux->manifest_uri) {
1992         /* FIXME: (hls) Do we answer with the variant playlist, with the current
1993          * playlist or the the uri of the last downlowaded fragment? */
1994         gst_query_set_uri (query, demux->manifest_uri);
1995         ret = TRUE;
1996       }
1997 
1998       GST_MANIFEST_UNLOCK (demux);
1999       break;
2000     default:
2001       /* Don't forward queries upstream because of the special nature of this
2002        *  "demuxer", which relies on the upstream element only to be fed
2003        *  the Manifest
2004        */
2005       break;
2006   }
2007 
2008   return ret;
2009 }
2010 
2011 /* must be called with manifest_lock taken */
2012 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2013 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2014     gboolean start_preroll_streams)
2015 {
2016   GList *iter;
2017 
2018   if (!gst_adaptive_demux_is_running (demux)) {
2019     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2020     return;
2021   }
2022 
2023   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2024 
2025   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2026 
2027   for (; iter; iter = g_list_next (iter)) {
2028     GstAdaptiveDemuxStream *stream = iter->data;
2029 
2030     if (!start_preroll_streams) {
2031       g_mutex_lock (&stream->fragment_download_lock);
2032       stream->cancelled = FALSE;
2033       stream->replaced = FALSE;
2034       g_mutex_unlock (&stream->fragment_download_lock);
2035     }
2036 
2037     stream->last_ret = GST_FLOW_OK;
2038     gst_task_start (stream->download_task);
2039   }
2040 }
2041 
2042 /* must be called with manifest_lock taken */
2043 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2044 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2045 {
2046   gst_uri_downloader_cancel (demux->downloader);
2047 
2048   gst_task_stop (demux->priv->updates_task);
2049 
2050   g_mutex_lock (&demux->priv->updates_timed_lock);
2051   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2052   demux->priv->stop_updates_task = TRUE;
2053   g_cond_signal (&demux->priv->updates_timed_cond);
2054   g_mutex_unlock (&demux->priv->updates_timed_lock);
2055 }
2056 
2057 /* must be called with manifest_lock taken */
2058 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2059 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2060 {
2061   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2062 
2063   if (gst_adaptive_demux_is_live (demux)) {
2064     gst_uri_downloader_reset (demux->downloader);
2065     g_mutex_lock (&demux->priv->updates_timed_lock);
2066     demux->priv->stop_updates_task = FALSE;
2067     g_mutex_unlock (&demux->priv->updates_timed_lock);
2068     /* Task to periodically update the manifest */
2069     if (demux_class->requires_periodical_playlist_update (demux)) {
2070       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2071       gst_task_start (demux->priv->updates_task);
2072     }
2073   }
2074 }
2075 
2076 /* must be called with manifest_lock taken
2077  * This function will temporarily release manifest_lock in order to join the
2078  * download threads.
2079  * The api_lock will still protect it against other threads trying to modify
2080  * the demux element.
2081  */
2082 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2083 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2084 {
2085   int i;
2086   GList *iter;
2087   GList *list_to_process;
2088 
2089   GST_LOG_OBJECT (demux, "Stopping tasks");
2090 
2091   if (stop_updates)
2092     gst_adaptive_demux_stop_manifest_update_task (demux);
2093 
2094   list_to_process = demux->streams;
2095   for (i = 0; i < 2; ++i) {
2096     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2097       GstAdaptiveDemuxStream *stream = iter->data;
2098 
2099       g_mutex_lock (&stream->fragment_download_lock);
2100       stream->cancelled = TRUE;
2101       gst_task_stop (stream->download_task);
2102       g_cond_signal (&stream->fragment_download_cond);
2103       g_mutex_unlock (&stream->fragment_download_lock);
2104     }
2105     list_to_process = demux->prepared_streams;
2106   }
2107 
2108   GST_MANIFEST_UNLOCK (demux);
2109   g_mutex_lock (&demux->priv->preroll_lock);
2110   g_cond_broadcast (&demux->priv->preroll_cond);
2111   g_mutex_unlock (&demux->priv->preroll_lock);
2112   GST_MANIFEST_LOCK (demux);
2113 
2114   g_mutex_lock (&demux->priv->manifest_update_lock);
2115   g_cond_broadcast (&demux->priv->manifest_cond);
2116   g_mutex_unlock (&demux->priv->manifest_update_lock);
2117 
2118   /* need to release manifest_lock before stopping the src element.
2119    * The streams were asked to cancel, so they will not make any writes to demux
2120    * object. Even if we temporarily release manifest_lock, the demux->streams
2121    * cannot change and iter cannot be invalidated.
2122    */
2123   list_to_process = demux->streams;
2124   for (i = 0; i < 2; ++i) {
2125     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2126       GstAdaptiveDemuxStream *stream = iter->data;
2127       GstElement *src = stream->src;
2128 
2129       GST_MANIFEST_UNLOCK (demux);
2130 
2131       if (src) {
2132         gst_element_set_locked_state (src, TRUE);
2133         gst_element_set_state (src, GST_STATE_READY);
2134       }
2135 
2136       /* stream->download_task value never changes, so it is safe to read it
2137        * outside critical section
2138        */
2139       gst_task_join (stream->download_task);
2140 
2141       GST_MANIFEST_LOCK (demux);
2142     }
2143     list_to_process = demux->prepared_streams;
2144   }
2145 
2146   GST_MANIFEST_UNLOCK (demux);
2147   if (stop_updates)
2148     gst_task_join (demux->priv->updates_task);
2149 
2150   GST_MANIFEST_LOCK (demux);
2151 
2152   list_to_process = demux->streams;
2153   for (i = 0; i < 2; ++i) {
2154     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2155       GstAdaptiveDemuxStream *stream = iter->data;
2156 
2157       stream->download_error_count = 0;
2158       stream->need_header = TRUE;
2159       stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2160     }
2161     list_to_process = demux->prepared_streams;
2162   }
2163 }
2164 
2165 /* must be called with manifest_lock taken */
2166 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2167 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2168 {
2169   GList *iter;
2170   gboolean ret = TRUE;
2171 
2172   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2173     GstAdaptiveDemuxStream *stream = iter->data;
2174     gst_event_ref (event);
2175     ret = ret & gst_pad_push_event (stream->pad, event);
2176   }
2177   gst_event_unref (event);
2178   return ret;
2179 }
2180 
2181 /* must be called with manifest_lock taken */
2182 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2183 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2184     GstCaps * caps)
2185 {
2186   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2187       caps);
2188   gst_caps_replace (&stream->pending_caps, caps);
2189   gst_caps_unref (caps);
2190 }
2191 
2192 /* must be called with manifest_lock taken */
2193 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2194 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2195     GstTagList * tags)
2196 {
2197   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2198       tags);
2199   if (stream->pending_tags) {
2200     gst_tag_list_unref (stream->pending_tags);
2201   }
2202   stream->pending_tags = tags;
2203 }
2204 
2205 /* must be called with manifest_lock taken */
2206 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2207 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2208     GstEvent * event)
2209 {
2210   stream->pending_events = g_list_append (stream->pending_events, event);
2211 }
2212 
2213 /* must be called with manifest_lock taken */
2214 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2215 _update_average_bitrate (GstAdaptiveDemux * demux,
2216     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2217 {
2218   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2219 
2220   stream->moving_bitrate -= stream->fragment_bitrates[index];
2221   stream->fragment_bitrates[index] = new_bitrate;
2222   stream->moving_bitrate += new_bitrate;
2223 
2224   stream->moving_index += 1;
2225 
2226   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2227     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2228   return stream->moving_bitrate / stream->moving_index;
2229 }
2230 
2231 /* must be called with manifest_lock taken */
2232 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2233 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2234     GstAdaptiveDemuxStream * stream)
2235 {
2236   guint64 average_bitrate;
2237   guint64 fragment_bitrate;
2238 
2239   if (demux->connection_speed) {
2240     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2241         demux->connection_speed / 1000);
2242     stream->current_download_rate = demux->connection_speed;
2243     return demux->connection_speed;
2244   }
2245 
2246   fragment_bitrate = stream->last_bitrate;
2247   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2248       fragment_bitrate);
2249 
2250   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2251 
2252   GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2253       fragment_bitrate);
2254   GST_INFO_OBJECT (stream,
2255       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2256       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2257 
2258   /* Conservative approach, make sure we don't upgrade too fast */
2259   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2260 
2261   stream->current_download_rate *= demux->bitrate_limit;
2262   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2263       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2264 
2265 #if 0
2266   /* Debugging code, modulate the bitrate every few fragments */
2267   {
2268     static guint ctr = 0;
2269     if (ctr % 3 == 0) {
2270       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2271       stream->current_download_rate /= 2;
2272     }
2273     ctr++;
2274   }
2275 #endif
2276 
2277   return stream->current_download_rate;
2278 }
2279 
2280 /* must be called with manifest_lock taken */
2281 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2282 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2283 {
2284   gboolean all_notlinked = TRUE;
2285   gboolean all_eos = TRUE;
2286   GList *iter;
2287 
2288   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2289     GstAdaptiveDemuxStream *stream = iter->data;
2290 
2291     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2292       all_notlinked = FALSE;
2293       if (stream->last_ret != GST_FLOW_EOS)
2294         all_eos = FALSE;
2295     }
2296 
2297     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2298         || stream->last_ret == GST_FLOW_FLUSHING) {
2299       return stream->last_ret;
2300     }
2301   }
2302   if (all_notlinked)
2303     return GST_FLOW_NOT_LINKED;
2304   else if (all_eos)
2305     return GST_FLOW_EOS;
2306   return GST_FLOW_OK;
2307 }
2308 
2309 /* Called with preroll_lock */
2310 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2311 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2312     GstAdaptiveDemuxStream * stream)
2313 {
2314   demux->priv->preroll_pending--;
2315   if (demux->priv->preroll_pending == 0) {
2316     /* That was the last one, time to release all streams
2317      * and expose them */
2318     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2319     gst_adaptive_demux_expose_streams (demux);
2320     g_cond_broadcast (&demux->priv->preroll_cond);
2321   }
2322 }
2323 
2324 /* must be called with manifest_lock taken.
2325  * Temporarily releases manifest_lock
2326  */
2327 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2328 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2329     GstBuffer * buffer)
2330 {
2331   GstAdaptiveDemux *demux = stream->demux;
2332   GstFlowReturn ret = GST_FLOW_OK;
2333   gboolean discont = FALSE;
2334   /* Pending events */
2335   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2336   GList *pending_events = NULL;
2337 
2338   /* FIXME :
2339    * This is duplicating *exactly* the same thing as what is done at the beginning
2340    * of _src_chain if starting_fragment is TRUE */
2341   if (stream->first_fragment_buffer) {
2342     GstClockTime offset =
2343         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2344     GstClockTime period_start =
2345         gst_adaptive_demux_get_period_start_time (demux);
2346 
2347     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2348     if (demux->segment.rate < 0)
2349       /* Set DISCONT flag for every first buffer in reverse playback mode
2350        * as each fragment for its own has to be reversed */
2351       discont = TRUE;
2352 
2353     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2354     if (GST_BUFFER_PTS_IS_VALID (buffer))
2355       GST_BUFFER_PTS (buffer) += offset;
2356 
2357     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2358       stream->segment.position = GST_BUFFER_PTS (buffer);
2359 
2360       /* Convert from position inside the stream's segment to the demuxer's
2361        * segment, they are not necessarily the same */
2362       if (stream->segment.position - offset + period_start >
2363           demux->segment.position)
2364         demux->segment.position =
2365             stream->segment.position - offset + period_start;
2366     }
2367     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2368 
2369     GST_LOG_OBJECT (stream->pad,
2370         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2371         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2372   } else {
2373     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2374   }
2375 
2376   if (stream->discont) {
2377     discont = TRUE;
2378     stream->discont = FALSE;
2379   }
2380 
2381   if (discont) {
2382     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2383     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2384   } else {
2385     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2386   }
2387 
2388   stream->first_fragment_buffer = FALSE;
2389 
2390   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2391   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2392   if (G_UNLIKELY (stream->pending_caps)) {
2393     pending_caps = gst_event_new_caps (stream->pending_caps);
2394     gst_caps_unref (stream->pending_caps);
2395     stream->pending_caps = NULL;
2396   }
2397 
2398   if (stream->do_block) {
2399 
2400     g_mutex_lock (&demux->priv->preroll_lock);
2401 
2402     /* If we are preroll state, set caps in here */
2403     if (pending_caps) {
2404       gst_pad_push_event (stream->pad, pending_caps);
2405       pending_caps = NULL;
2406     }
2407 
2408     gst_adaptive_demux_handle_preroll (demux, stream);
2409     GST_MANIFEST_UNLOCK (demux);
2410 
2411     while (stream->do_block && !stream->cancelled) {
2412       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2413       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2414     }
2415     if (stream->cancelled) {
2416       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2417       gst_buffer_unref (buffer);
2418       g_mutex_unlock (&demux->priv->preroll_lock);
2419       return GST_FLOW_FLUSHING;
2420     }
2421 
2422     g_mutex_unlock (&demux->priv->preroll_lock);
2423     GST_MANIFEST_LOCK (demux);
2424   }
2425 
2426   if (G_UNLIKELY (stream->pending_segment)) {
2427     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2428     pending_segment = stream->pending_segment;
2429     stream->pending_segment = NULL;
2430     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2431   }
2432   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2433     GstTagList *tags = stream->pending_tags;
2434 
2435     stream->pending_tags = NULL;
2436     stream->bitrate_changed = 0;
2437 
2438     if (stream->fragment.bitrate != 0) {
2439       if (tags)
2440         tags = gst_tag_list_make_writable (tags);
2441       else
2442         tags = gst_tag_list_new_empty ();
2443 
2444       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2445           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2446     }
2447     if (tags)
2448       pending_tags = gst_event_new_tag (tags);
2449   }
2450   if (G_UNLIKELY (stream->pending_events)) {
2451     pending_events = stream->pending_events;
2452     stream->pending_events = NULL;
2453   }
2454 
2455   GST_MANIFEST_UNLOCK (demux);
2456 
2457   /* Do not push events or buffers holding the manifest lock */
2458   if (G_UNLIKELY (pending_caps)) {
2459     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2460         pending_caps);
2461     gst_pad_push_event (stream->pad, pending_caps);
2462   }
2463   if (G_UNLIKELY (pending_segment)) {
2464     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2465         pending_segment);
2466     gst_pad_push_event (stream->pad, pending_segment);
2467   }
2468   if (G_UNLIKELY (pending_tags)) {
2469     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2470         pending_tags);
2471     gst_pad_push_event (stream->pad, pending_tags);
2472   }
2473   while (pending_events != NULL) {
2474     GstEvent *event = pending_events->data;
2475 
2476     if (!gst_pad_push_event (stream->pad, event))
2477       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2478 
2479     pending_events = g_list_delete_link (pending_events, pending_events);
2480   }
2481 
2482   /* Wait for preroll if blocking */
2483   GST_DEBUG_OBJECT (stream->pad,
2484       "About to push buffer of size %" G_GSIZE_FORMAT,
2485       gst_buffer_get_size (buffer));
2486 
2487   ret = gst_pad_push (stream->pad, buffer);
2488 
2489   GST_MANIFEST_LOCK (demux);
2490 
2491   g_mutex_lock (&stream->fragment_download_lock);
2492   if (G_UNLIKELY (stream->cancelled)) {
2493     GST_LOG_OBJECT (stream, "Stream was cancelled");
2494     ret = stream->last_ret = GST_FLOW_FLUSHING;
2495     g_mutex_unlock (&stream->fragment_download_lock);
2496     return ret;
2497   }
2498   g_mutex_unlock (&stream->fragment_download_lock);
2499 
2500   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2501       gst_flow_get_name (ret));
2502 
2503   return ret;
2504 }
2505 
2506 /* must be called with manifest_lock taken */
2507 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2508 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2509     GstAdaptiveDemuxStream * stream)
2510 {
2511   /* No need to advance, this isn't a real fragment */
2512   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2513     return GST_FLOW_OK;
2514 
2515   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2516       stream->fragment.duration);
2517 }
2518 
2519 /* must be called with manifest_lock taken.
2520  * Can temporarily release manifest_lock
2521  */
2522 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2523 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2524     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2525 {
2526   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2527 }
2528 
2529 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)2530 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2531     * demux)
2532 {
2533   return TRUE;
2534 }
2535 
2536 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2537 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2538 {
2539   GstAdaptiveDemuxStream *stream;
2540   GstAdaptiveDemux *demux;
2541   GstAdaptiveDemuxClass *klass;
2542   GstFlowReturn ret = GST_FLOW_OK;
2543 
2544   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2545   stream = gst_pad_get_element_private (pad);
2546   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2547 
2548   GST_MANIFEST_LOCK (demux);
2549 
2550   /* do not make any changes if the stream is cancelled */
2551   g_mutex_lock (&stream->fragment_download_lock);
2552   if (G_UNLIKELY (stream->cancelled)) {
2553     g_mutex_unlock (&stream->fragment_download_lock);
2554     gst_buffer_unref (buffer);
2555     ret = stream->last_ret = GST_FLOW_FLUSHING;
2556     GST_MANIFEST_UNLOCK (demux);
2557     return ret;
2558   }
2559   g_mutex_unlock (&stream->fragment_download_lock);
2560 
2561   /* starting_fragment is set to TRUE at the beginning of
2562    * _stream_download_fragment()
2563    * /!\ If there is a header/index being downloaded, then this will
2564    * be TRUE for the first one ... but FALSE for the remaining ones,
2565    * including the *actual* fragment ! */
2566   if (stream->starting_fragment) {
2567     GstClockTime offset =
2568         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2569     GstClockTime period_start =
2570         gst_adaptive_demux_get_period_start_time (demux);
2571 
2572     stream->starting_fragment = FALSE;
2573     if (klass->start_fragment) {
2574       if (!klass->start_fragment (demux, stream)) {
2575         ret = GST_FLOW_ERROR;
2576         goto error;
2577       }
2578     }
2579 
2580     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2581     if (GST_BUFFER_PTS_IS_VALID (buffer))
2582       GST_BUFFER_PTS (buffer) += offset;
2583 
2584     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2585         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2586 
2587     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2588       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2589       stream->segment.position = GST_BUFFER_PTS (buffer);
2590 
2591       /* Convert from position inside the stream's segment to the demuxer's
2592        * segment, they are not necessarily the same */
2593       if (stream->segment.position - offset + period_start >
2594           demux->segment.position)
2595         demux->segment.position =
2596             stream->segment.position - offset + period_start;
2597       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2598     }
2599 
2600   } else {
2601     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2602   }
2603 
2604   /* downloading_first_buffer is set to TRUE in download_uri() just before
2605    * activating the source (i.e. requesting a given URI)
2606    *
2607    * The difference with starting_fragment is that this will be called
2608    * for *all* first buffers (of index, and header, and fragment)
2609    *
2610    * ... to then only do something useful (in this block) for actual
2611    * fragments... */
2612   if (stream->downloading_first_buffer) {
2613     gint64 chunk_size = 0;
2614 
2615     stream->downloading_first_buffer = FALSE;
2616 
2617     if (!stream->downloading_header && !stream->downloading_index) {
2618       /* If this is the first buffer of a fragment (not the headers or index)
2619        * and we don't have a birate from the sub-class, then see if we
2620        * can work it out from the fragment size and duration */
2621       if (stream->fragment.bitrate == 0 &&
2622           stream->fragment.duration != 0 &&
2623           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2624               &chunk_size)) {
2625         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2626                 8 * GST_SECOND, stream->fragment.duration));
2627         GST_LOG_OBJECT (demux,
2628             "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2629             " = bitrate %u", chunk_size,
2630             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2631         stream->fragment.bitrate = bitrate;
2632       }
2633       if (stream->fragment.bitrate) {
2634         stream->bitrate_changed = TRUE;
2635       } else {
2636         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2637       }
2638     }
2639   }
2640 
2641   stream->download_total_bytes += gst_buffer_get_size (buffer);
2642 
2643   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2644       gst_buffer_get_size (buffer));
2645 
2646   ret = klass->data_received (demux, stream, buffer);
2647 
2648   if (ret == GST_FLOW_FLUSHING) {
2649     /* do not make any changes if the stream is cancelled */
2650     g_mutex_lock (&stream->fragment_download_lock);
2651     if (G_UNLIKELY (stream->cancelled)) {
2652       g_mutex_unlock (&stream->fragment_download_lock);
2653       GST_MANIFEST_UNLOCK (demux);
2654       return ret;
2655     }
2656     g_mutex_unlock (&stream->fragment_download_lock);
2657   }
2658 
2659   if (ret != GST_FLOW_OK) {
2660     gboolean finished = FALSE;
2661 
2662     if (ret < GST_FLOW_EOS) {
2663       GST_ELEMENT_FLOW_ERROR (demux, ret);
2664 
2665       /* TODO push this on all pads */
2666       gst_pad_push_event (stream->pad, gst_event_new_eos ());
2667     } else {
2668       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2669           gst_flow_get_name (ret));
2670     }
2671 
2672     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2673       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
2674     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2675       /* Behaves like an EOS event from upstream */
2676       stream->fragment.finished = TRUE;
2677       ret = klass->finish_fragment (demux, stream);
2678       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2679         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
2680       } else if (ret != GST_FLOW_OK) {
2681         goto error;
2682       }
2683       finished = TRUE;
2684     }
2685 
2686     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2687     if (finished)
2688       ret = GST_FLOW_EOS;
2689   }
2690 
2691 error:
2692 
2693   GST_MANIFEST_UNLOCK (demux);
2694 
2695   return ret;
2696 }
2697 
2698 /* must be called with manifest_lock taken */
2699 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)2700 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2701     stream, GstFlowReturn ret, GError * err)
2702 {
2703   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2704       gst_flow_get_name (ret), err);
2705 
2706   /* if we have an error, only replace last_ret if it was OK before to avoid
2707    * overwriting the first error we got */
2708   if (stream->last_ret == GST_FLOW_OK) {
2709     stream->last_ret = ret;
2710     if (err) {
2711       g_clear_error (&stream->last_error);
2712       stream->last_error = g_error_copy (err);
2713     }
2714   }
2715   g_mutex_lock (&stream->fragment_download_lock);
2716   stream->download_finished = TRUE;
2717   g_cond_signal (&stream->fragment_download_cond);
2718   g_mutex_unlock (&stream->fragment_download_lock);
2719 }
2720 
2721 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)2722 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2723 {
2724   GstFlowReturn ret = GST_FLOW_OK;
2725   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2726 
2727   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2728       || !klass->need_another_chunk (stream)
2729       || stream->fragment.chunk_size == 0) {
2730     stream->fragment.finished = TRUE;
2731     ret = klass->finish_fragment (stream->demux, stream);
2732   }
2733   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2734 
2735   return ret;
2736 }
2737 
2738 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2739 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2740 {
2741   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2742   GstAdaptiveDemux *demux = stream->demux;
2743 
2744   switch (GST_EVENT_TYPE (event)) {
2745     case GST_EVENT_EOS:{
2746       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2747       GST_MANIFEST_LOCK (demux);
2748 
2749       gst_adaptive_demux_eos_handling (stream);
2750 
2751       /* FIXME ?
2752        * _eos_handling() calls  fragment_download_finish() which does the
2753        * same thing as below.
2754        * Could this cause races ? */
2755       g_mutex_lock (&stream->fragment_download_lock);
2756       stream->download_finished = TRUE;
2757       g_cond_signal (&stream->fragment_download_cond);
2758       g_mutex_unlock (&stream->fragment_download_lock);
2759 
2760       GST_MANIFEST_UNLOCK (demux);
2761       break;
2762     }
2763     default:
2764       break;
2765   }
2766 
2767   gst_event_unref (event);
2768 
2769   return TRUE;
2770 }
2771 
2772 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2773 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2774 {
2775   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2776 
2777   switch (GST_QUERY_TYPE (query)) {
2778     case GST_QUERY_ALLOCATION:
2779       return FALSE;
2780       break;
2781     default:
2782       break;
2783   }
2784 
2785   return gst_pad_peer_query (stream->pad, query);
2786 }
2787 
2788 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)2789 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2790     GstAdaptiveDemuxStream * stream)
2791 {
2792   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2793 
2794   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2795     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2796     if (stream->fragment_bytes_downloaded == 0) {
2797       stream->last_latency =
2798           gst_adaptive_demux_get_monotonic_time (stream->demux) -
2799           (stream->download_start_time * GST_USECOND);
2800       GST_DEBUG_OBJECT (pad,
2801           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2802           GST_TIME_ARGS (stream->last_latency));
2803     }
2804     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2805     GST_LOG_OBJECT (pad,
2806         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2807         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2808   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2809       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2810     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2811     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2812         GST_EVENT_TYPE_NAME (ev), ev);
2813     switch (GST_EVENT_TYPE (ev)) {
2814       case GST_EVENT_SEGMENT:
2815         stream->fragment_bytes_downloaded = 0;
2816         break;
2817       case GST_EVENT_EOS:
2818       {
2819         stream->last_download_time =
2820             gst_adaptive_demux_get_monotonic_time (stream->demux) -
2821             (stream->download_start_time * GST_USECOND);
2822         stream->last_bitrate =
2823             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2824             8 * GST_SECOND, stream->last_download_time);
2825         GST_DEBUG_OBJECT (pad,
2826             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2827             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2828             stream->last_bitrate);
2829         /* Calculate bitrate since URI request */
2830       }
2831         break;
2832       default:
2833         break;
2834     }
2835   }
2836 
2837   return ret;
2838 }
2839 
2840 /* must be called with manifest_lock taken.
2841  * Can temporarily release manifest_lock
2842  */
2843 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2844 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2845     GstAdaptiveDemuxStream * stream)
2846 {
2847   gboolean ret = TRUE;
2848 
2849   /* Wait until we're cancelled or there's something for
2850    * us to download in the playlist or the playlist
2851    * became non-live */
2852   while (TRUE) {
2853     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2854 
2855     /* get the manifest_update_lock while still holding the manifest_lock.
2856      * This will prevent other threads to signal the condition (they will need
2857      * both manifest_lock and manifest_update_lock in order to signal).
2858      * It cannot deadlock because all threads always get the manifest_lock first
2859      * and manifest_update_lock second.
2860      */
2861     g_mutex_lock (&demux->priv->manifest_update_lock);
2862 
2863     GST_MANIFEST_UNLOCK (demux);
2864 
2865     g_cond_wait (&demux->priv->manifest_cond,
2866         &demux->priv->manifest_update_lock);
2867     g_mutex_unlock (&demux->priv->manifest_update_lock);
2868 
2869     GST_MANIFEST_LOCK (demux);
2870 
2871     /* check for cancelled every time we get the manifest_lock */
2872     g_mutex_lock (&stream->fragment_download_lock);
2873     if (G_UNLIKELY (stream->cancelled)) {
2874       ret = FALSE;
2875       stream->last_ret = GST_FLOW_FLUSHING;
2876       g_mutex_unlock (&stream->fragment_download_lock);
2877       break;
2878     }
2879     g_mutex_unlock (&stream->fragment_download_lock);
2880 
2881     /* Got a new fragment or not live anymore? */
2882     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2883         GST_FLOW_OK) {
2884       GST_DEBUG_OBJECT (demux, "new fragment available, "
2885           "not waiting for manifest update");
2886       ret = TRUE;
2887       break;
2888     }
2889 
2890     if (!gst_adaptive_demux_is_live (demux)) {
2891       GST_DEBUG_OBJECT (demux, "Not live anymore, "
2892           "not waiting for manifest update");
2893       ret = FALSE;
2894       break;
2895     }
2896   }
2897   GST_DEBUG_OBJECT (demux, "Retrying now");
2898   return ret;
2899 }
2900 
/* Points the download source of @stream at @uri, building a new source bin
 * when there is none yet or when the existing one cannot be re-used.
 *
 * An existing source is kept only if the protocol of @uri matches the
 * protocol of the currently configured URI and gst_uri_handler_set_uri()
 * accepts @uri. Otherwise the old source is set to NULL state and removed
 * from the demuxer, and a new one is created.
 *
 * @referer, @refresh and @allow_cache are only looked at when a new source is
 * created and its URI handler has an "extra-headers" property: @referer is
 * sent as "Referer", !@allow_cache adds "Cache-Control: no-cache", otherwise
 * @refresh adds "Cache-Control: max-age=0".
 *
 * Returns: FALSE if @uri is not valid or if an element could not be created
 * or linked, TRUE otherwise.
 *
 * Must be called with manifest_lock taken. The lock is temporarily released
 * while an old source is shut down and removed from the demuxer. */
static gboolean
gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
    const gchar * uri, const gchar * referer, gboolean refresh,
    gboolean allow_cache)
{
  GstAdaptiveDemux *demux = stream->demux;

  if (!gst_uri_is_valid (uri)) {
    GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
    return FALSE;
  }

  /* Try to re-use existing source element */
  if (stream->src != NULL) {
    gchar *old_protocol, *new_protocol;
    gchar *old_uri;

    old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
    old_protocol = gst_uri_get_protocol (old_uri);
    new_protocol = gst_uri_get_protocol (uri);

    if (!g_str_equal (old_protocol, new_protocol)) {
      /* Different protocol: the old source is discarded. stream->src is
       * cleared first so that the creation code below runs afterwards. */
      GstElement *src = stream->src;

      stream->src = NULL;
      gst_object_unref (stream->src_srcpad);
      stream->src_srcpad = NULL;
      /* the element is shut down and removed without the manifest lock */
      GST_MANIFEST_UNLOCK (demux);
      gst_element_set_locked_state (src, TRUE);
      gst_element_set_state (src, GST_STATE_NULL);
      gst_bin_remove (GST_BIN_CAST (demux), src);
      GST_MANIFEST_LOCK (demux);
      GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
    } else {
      GError *err = NULL;

      GST_DEBUG_OBJECT (demux, "Re-using old source element");
      if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
              &err)) {
        /* Same protocol but the handler refused the URI: discard the old
         * source in the same way as for a protocol change */
        GstElement *src = stream->src;

        stream->src = NULL;
        GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
            err ? err->message : "Unknown error");
        g_clear_error (&err);
        gst_object_unref (stream->src_srcpad);
        stream->src_srcpad = NULL;
        GST_MANIFEST_UNLOCK (demux);
        gst_element_set_locked_state (src, TRUE);
        gst_element_set_state (src, GST_STATE_NULL);
        gst_bin_remove (GST_BIN_CAST (demux), src);
        GST_MANIFEST_LOCK (demux);
      }
    }
    g_free (old_uri);
    g_free (old_protocol);
    g_free (new_protocol);
  }

  /* No source yet, or the old one was just discarded: build a new one */
  if (stream->src == NULL) {
    GstPad *uri_handler_src;
    GstPad *queue_sink;
    GstPad *queue_src;
    GstElement *uri_handler;
    GstElement *queue;
    GstPadLinkReturn pad_link_ret;
    GObjectClass *gobject_class;
    gchar *internal_name, *bin_name;

    /* Our src consists of a bin containing uri_handler -> queue . The
     * purpose of the queue is to allow the uri_handler to download an
     * entire fragment without blocking, so we can accurately measure the
     * download bitrate. */

    queue = gst_element_factory_make ("queue", NULL);
    if (queue == NULL)
      return FALSE;

    /* the queue is limited by byte size only; buffer count and time limits
     * are set to 0 */
    g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
    g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
    g_object_set (queue, "max-size-time", (guint64) 0, NULL);

    uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
    if (uri_handler == NULL) {
      GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
          ("Missing plugin to handle URI: '%s'", uri), (NULL));
      gst_object_unref (queue);
      return FALSE;
    }

    gobject_class = G_OBJECT_GET_CLASS (uri_handler);

    /* The properties below are optional: each one is only set when the
     * element created for this URI actually has it */
    if (g_object_class_find_property (gobject_class, "compress"))
      g_object_set (uri_handler, "compress", FALSE, NULL);
    if (g_object_class_find_property (gobject_class, "keep-alive"))
      g_object_set (uri_handler, "keep-alive", TRUE, NULL);
    if (g_object_class_find_property (gobject_class, "extra-headers")) {
      if (referer || refresh || !allow_cache) {
        GstStructure *extra_headers = gst_structure_new_empty ("headers");

        if (referer)
          gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
              NULL);

        /* disallowing the cache takes precedence over a plain refresh */
        if (!allow_cache)
          gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
              "no-cache", NULL);
        else if (refresh)
          gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
              "max-age=0", NULL);

        g_object_set (uri_handler, "extra-headers", extra_headers, NULL);

        gst_structure_free (extra_headers);
      } else {
        g_object_set (uri_handler, "extra-headers", NULL, NULL);
      }
    }

    /* Source bin creation */
    bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
    stream->src = gst_bin_new (bin_name);
    g_free (bin_name);
    if (stream->src == NULL) {
      gst_object_unref (queue);
      gst_object_unref (uri_handler);
      return FALSE;
    }

    gst_bin_add (GST_BIN_CAST (stream->src), queue);
    gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);

    uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
    queue_sink = gst_element_get_static_pad (queue, "sink");

    pad_link_ret =
        gst_pad_link_full (uri_handler_src, queue_sink,
        GST_PAD_LINK_CHECK_NOTHING);
    if (GST_PAD_LINK_FAILED (pad_link_ret)) {
      GST_WARNING_OBJECT (demux,
          "Could not link pads %s:%s to %s:%s for reason %d",
          GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
          pad_link_ret);
      g_object_unref (queue_sink);
      g_object_unref (uri_handler_src);
      /* queue and uri_handler were added to the bin above; only the bin is
       * unreffed here */
      gst_object_unref (stream->src);
      stream->src = NULL;
      return FALSE;
    }

    /* Add a downstream event and data probe */
    gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
        (GstPadProbeCallback) _uri_handler_probe, stream, NULL);

    g_object_unref (queue_sink);
    g_object_unref (uri_handler_src);
    /* expose the queue's src pad as the "src" ghost pad of the bin */
    queue_src = gst_element_get_static_pad (queue, "src");
    stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
    g_object_unref (queue_src);
    gst_element_add_pad (stream->src, stream->src_srcpad);

    /* the bin's state is locked before it is added to the demuxer */
    gst_element_set_locked_state (stream->src, TRUE);
    gst_bin_add (GST_BIN_CAST (demux), stream->src);
    /* stream->src_srcpad is fetched again from the bin; this is the reference
     * that is dropped with gst_object_unref() when the source is discarded */
    stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");

    /* set up our internal floating pad to drop all events from
     * the http src we don't care about. On the chain function
     * we just push the buffer forward */
    internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
    stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
    g_free (internal_name);
    gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
        GST_OBJECT_CAST (demux));
    GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
    gst_pad_set_element_private (stream->internal_pad, stream);
    gst_pad_set_active (stream->internal_pad, TRUE);
    gst_pad_set_chain_function (stream->internal_pad, _src_chain);
    gst_pad_set_event_function (stream->internal_pad, _src_event);
    gst_pad_set_query_function (stream->internal_pad, _src_query);

    /* NOTE(review): on failure this returns with stream->src, src_srcpad and
     * internal_pad already set while stream->uri_handler and stream->queue
     * are not assigned yet - confirm that callers cope with that state */
    if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
            GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
      GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
      return FALSE;
    }

    stream->uri_handler = uri_handler;
    stream->queue = queue;

    stream->last_status_code = 200;     /* default to OK */
  }
  return TRUE;
}
3095 
3096 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3097 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3098     gpointer user_data)
3099 {
3100   GstAdaptiveDemuxStream *stream = user_data;
3101 
3102   /* The source's src pad is IDLE so now set the state to READY */
3103   g_mutex_lock (&stream->fragment_download_lock);
3104   stream->src_at_ready = TRUE;
3105   g_cond_signal (&stream->fragment_download_cond);
3106   g_mutex_unlock (&stream->fragment_download_lock);
3107 
3108   return GST_PAD_PROBE_REMOVE;
3109 }
3110 
3111 #ifndef GST_DISABLE_GST_DEBUG
3112 static const char *
uritype(GstAdaptiveDemuxStream * s)3113 uritype (GstAdaptiveDemuxStream * s)
3114 {
3115   if (s->downloading_header)
3116     return "header";
3117   if (s->downloading_index)
3118     return "index";
3119   return "fragment";
3120 }
3121 #endif
3122 
3123 /* must be called with manifest_lock taken.
3124  * Can temporarily release manifest_lock
3125  *
3126  * Will return when URI is fully downloaded (or aborted/errored)
3127  */
3128 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3129 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3130     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3131     gint64 end, guint * http_status)
3132 {
3133   GstFlowReturn ret = GST_FLOW_OK;
3134   GST_DEBUG_OBJECT (stream->pad,
3135       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3136       uritype (stream), uri, start, end);
3137 
3138   if (http_status)
3139     *http_status = 200;         /* default to ok if no further information */
3140 
3141   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3142     ret = stream->last_ret = GST_FLOW_ERROR;
3143     return ret;
3144   }
3145 
3146   gst_element_set_locked_state (stream->src, TRUE);
3147 
3148   GST_MANIFEST_UNLOCK (demux);
3149   if (gst_element_set_state (stream->src,
3150           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3151     /* If ranges are specified, seek to it */
3152     if (start != 0 || end != -1) {
3153       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3154        * stop position */
3155       if (end != -1)
3156         end += 1;
3157       /* Send the seek event to the uri_handler, as the other pipeline elements
3158        * can't handle it when READY. */
3159       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3160                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3161                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3162 
3163         GST_MANIFEST_LOCK (demux);
3164         /* looks like the source can't handle seeks in READY */
3165         g_clear_error (&stream->last_error);
3166         stream->last_error = g_error_new (GST_CORE_ERROR,
3167             GST_CORE_ERROR_NOT_IMPLEMENTED,
3168             "Source element can't handle range requests");
3169         stream->last_ret = GST_FLOW_ERROR;
3170       } else {
3171         GST_MANIFEST_LOCK (demux);
3172       }
3173     } else {
3174       GST_MANIFEST_LOCK (demux);
3175     }
3176 
3177     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3178       stream->download_start_time =
3179           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3180 
3181       /* src element is in state READY. Before we start it, we reset
3182        * download_finished
3183        */
3184       g_mutex_lock (&stream->fragment_download_lock);
3185       stream->download_finished = FALSE;
3186       stream->downloading_first_buffer = TRUE;
3187       g_mutex_unlock (&stream->fragment_download_lock);
3188 
3189       GST_MANIFEST_UNLOCK (demux);
3190 
3191       if (!gst_element_sync_state_with_parent (stream->src)) {
3192         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3193         GST_MANIFEST_LOCK (demux);
3194         ret = stream->last_ret = GST_FLOW_ERROR;
3195         return ret;
3196       }
3197 
3198       /* wait for the fragment to be completely downloaded */
3199       GST_DEBUG_OBJECT (stream->pad,
3200           "Waiting for %s download to finish: %s", uritype (stream), uri);
3201 
3202       g_mutex_lock (&stream->fragment_download_lock);
3203       stream->src_at_ready = FALSE;
3204       if (G_UNLIKELY (stream->cancelled)) {
3205         g_mutex_unlock (&stream->fragment_download_lock);
3206         GST_MANIFEST_LOCK (demux);
3207         ret = stream->last_ret = GST_FLOW_FLUSHING;
3208         return ret;
3209       }
3210       /* download_finished is only set:
3211        * * in ::fragment_download_finish()
3212        * * if EOS is received on the _src pad
3213        * */
3214       while (!stream->cancelled && !stream->download_finished) {
3215         g_cond_wait (&stream->fragment_download_cond,
3216             &stream->fragment_download_lock);
3217       }
3218       g_mutex_unlock (&stream->fragment_download_lock);
3219 
3220       GST_DEBUG_OBJECT (stream->pad,
3221           "Finished Waiting for %s download: %s", uritype (stream), uri);
3222 
3223       GST_MANIFEST_LOCK (demux);
3224       g_mutex_lock (&stream->fragment_download_lock);
3225       if (G_UNLIKELY (stream->cancelled)) {
3226         ret = stream->last_ret = GST_FLOW_FLUSHING;
3227         g_mutex_unlock (&stream->fragment_download_lock);
3228         return ret;
3229       }
3230       g_mutex_unlock (&stream->fragment_download_lock);
3231 
3232       ret = stream->last_ret;
3233 
3234       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3235           uritype (stream), uri, stream->last_ret,
3236           gst_flow_get_name (stream->last_ret));
3237       if (stream->last_ret != GST_FLOW_OK && http_status) {
3238         *http_status = stream->last_status_code;
3239       }
3240     }
3241 
3242     /* changing src element state might try to join the streaming thread, so
3243      * we must not hold the manifest lock.
3244      */
3245     GST_MANIFEST_UNLOCK (demux);
3246   } else {
3247     GST_MANIFEST_UNLOCK (demux);
3248     if (stream->last_ret == GST_FLOW_OK)
3249       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3250     ret = GST_FLOW_CUSTOM_ERROR;
3251   }
3252 
3253   stream->src_at_ready = FALSE;
3254 
3255   gst_element_set_locked_state (stream->src, TRUE);
3256   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3257       gst_ad_stream_src_to_ready_cb, stream, NULL);
3258 
3259   g_mutex_lock (&stream->fragment_download_lock);
3260   while (!stream->src_at_ready) {
3261     g_cond_wait (&stream->fragment_download_cond,
3262         &stream->fragment_download_lock);
3263   }
3264   g_mutex_unlock (&stream->fragment_download_lock);
3265 
3266   gst_element_set_state (stream->src, GST_STATE_READY);
3267 
3268   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3269   GST_MANIFEST_LOCK (demux);
3270   g_mutex_lock (&stream->fragment_download_lock);
3271   if (G_UNLIKELY (stream->cancelled)) {
3272     ret = stream->last_ret = GST_FLOW_FLUSHING;
3273     g_mutex_unlock (&stream->fragment_download_lock);
3274     return ret;
3275   }
3276   g_mutex_unlock (&stream->fragment_download_lock);
3277 
3278   /* deactivate and reactivate our ghostpad to make it fresh for a new
3279    * stream */
3280   gst_pad_set_active (stream->internal_pad, FALSE);
3281   gst_pad_set_active (stream->internal_pad, TRUE);
3282 
3283   return ret;
3284 }
3285 
3286 /* must be called with manifest_lock taken.
3287  * Can temporarily release manifest_lock
3288  */
3289 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3290 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3291     stream)
3292 {
3293   GstAdaptiveDemux *demux = stream->demux;
3294   GstFlowReturn ret = GST_FLOW_OK;
3295 
3296   if (stream->fragment.header_uri != NULL) {
3297     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3298         G_GINT64_FORMAT, stream->fragment.header_uri,
3299         stream->fragment.header_range_start, stream->fragment.header_range_end);
3300 
3301     stream->downloading_header = TRUE;
3302     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3303         stream->fragment.header_uri, stream->fragment.header_range_start,
3304         stream->fragment.header_range_end, NULL);
3305     stream->downloading_header = FALSE;
3306   }
3307 
3308   /* check if we have an index */
3309   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3310 
3311     if (stream->fragment.index_uri != NULL) {
3312       GST_DEBUG_OBJECT (demux,
3313           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3314           stream->fragment.index_uri,
3315           stream->fragment.index_range_start, stream->fragment.index_range_end);
3316       stream->downloading_index = TRUE;
3317       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3318           stream->fragment.index_uri, stream->fragment.index_range_start,
3319           stream->fragment.index_range_end, NULL);
3320       stream->downloading_index = FALSE;
3321     }
3322   }
3323 
3324   return ret;
3325 }
3326 
3327 /* must be called with manifest_lock taken.
3328  * Can temporarily release manifest_lock
3329  */
3330 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3331 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3332 {
3333   GstAdaptiveDemux *demux = stream->demux;
3334   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3335   gchar *url = NULL;
3336   GstFlowReturn ret;
3337   gboolean retried_once = FALSE, live;
3338   guint http_status;
3339   guint last_status_code;
3340 
3341   /* FIXME :  */
3342   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3343   stream->starting_fragment = TRUE;
3344   stream->last_ret = GST_FLOW_OK;
3345   stream->first_fragment_buffer = TRUE;
3346 
3347   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3348       stream->fragment.uri ? "FRAGMENT " : "",
3349       stream->fragment.header_uri ? "HEADER " : "",
3350       stream->fragment.index_uri ? "INDEX" : "");
3351 
3352   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3353       stream->fragment.index_uri == NULL)
3354     goto no_url_error;
3355 
3356   if (stream->need_header) {
3357     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3358     if (ret != GST_FLOW_OK) {
3359       return ret;
3360     }
3361     stream->need_header = FALSE;
3362   }
3363 
3364 again:
3365   ret = GST_FLOW_OK;
3366   url = stream->fragment.uri;
3367   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3368   if (!url)
3369     return ret;
3370 
3371   stream->last_ret = GST_FLOW_OK;
3372   http_status = 200;
3373 
3374   /* Download the actual fragment, either in fragments or in one go */
3375   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3376       && stream->fragment.chunk_size != 0) {
3377     /* Handle chunk downloading */
3378     gint64 range_start, range_end, chunk_start, chunk_end;
3379     guint64 download_total_bytes;
3380     gint chunk_size = stream->fragment.chunk_size;
3381 
3382     range_start = chunk_start = stream->fragment.range_start;
3383     range_end = stream->fragment.range_end;
3384     /* HTTP ranges are inclusive for the end */
3385     if (chunk_size != -1)
3386       chunk_end = range_start + chunk_size - 1;
3387     else
3388       chunk_end = range_end;
3389 
3390     if (range_end != -1)
3391       chunk_end = MIN (chunk_end, range_end);
3392 
3393     while (!stream->fragment.finished && (chunk_start <= range_end
3394             || range_end == -1)) {
3395       download_total_bytes = stream->download_total_bytes;
3396 
3397       ret =
3398           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3399           chunk_start, chunk_end, &http_status);
3400 
3401       GST_DEBUG_OBJECT (stream->pad,
3402           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3403           http_status, gst_flow_get_name (stream->last_ret));
3404 
3405       /* Don't retry for any chunks except the first. We would have sent
3406        * data downstream already otherwise and it's difficult to recover
3407        * from that in a meaningful way */
3408       if (chunk_start > range_start)
3409         retried_once = TRUE;
3410 
3411       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3412        * downloading up to -1. We don't know the full duration.
3413        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3414       if (ret != GST_FLOW_OK && chunk_end == -1) {
3415         break;
3416       } else if (ret != GST_FLOW_OK) {
3417         chunk_end = -1;
3418         stream->last_ret = GST_FLOW_OK;
3419         continue;
3420       }
3421 
3422       if (chunk_end == -1)
3423         break;
3424 
3425       /* Short read, we're at the end now */
3426       if (stream->download_total_bytes - download_total_bytes <
3427           chunk_end + 1 - chunk_start)
3428         break;
3429 
3430       if (!klass->need_another_chunk (stream))
3431         break;
3432 
3433       /* HTTP ranges are inclusive for the end */
3434       chunk_start += chunk_size;
3435       chunk_size = stream->fragment.chunk_size;
3436       if (chunk_size != -1)
3437         chunk_end = chunk_start + chunk_size - 1;
3438       else
3439         chunk_end = range_end;
3440 
3441       if (range_end != -1)
3442         chunk_end = MIN (chunk_end, range_end);
3443     }
3444   } else {
3445     ret =
3446         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3447         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3448     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3449         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3450   }
3451   if (ret == GST_FLOW_OK)
3452     goto beach;
3453 
3454   g_mutex_lock (&stream->fragment_download_lock);
3455   if (G_UNLIKELY (stream->cancelled)) {
3456     g_mutex_unlock (&stream->fragment_download_lock);
3457     return ret;
3458   }
3459   g_mutex_unlock (&stream->fragment_download_lock);
3460 
3461   /* TODO check if we are truly stopping */
3462   if (ret != GST_FLOW_CUSTOM_ERROR)
3463     goto beach;
3464 
3465   last_status_code = stream->last_status_code;
3466   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3467       last_status_code, stream->download_error_count);
3468 
3469   live = gst_adaptive_demux_is_live (demux);
3470   if (!retried_once && ((last_status_code / 100 == 4 && live)
3471           || last_status_code / 100 == 5)) {
3472     /* 4xx/5xx */
3473     /* if current position is before available start, switch to next */
3474     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3475       goto flushing;
3476 
3477     if (live) {
3478       gint64 range_start, range_stop;
3479 
3480       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3481               &range_stop))
3482         goto flushing;
3483 
3484       if (demux->segment.position < range_start) {
3485         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3486         stream->last_ret = GST_FLOW_OK;
3487         ret = gst_adaptive_demux_eos_handling (stream);
3488         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3489             gst_flow_get_name (ret));
3490         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3491         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3492         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3493             gst_flow_get_name (ret));
3494         if (ret == GST_FLOW_OK) {
3495           retried_once = TRUE;
3496           goto again;
3497         }
3498       } else if (demux->segment.position > range_stop) {
3499         /* wait a bit to be in range, we don't have any locks at that point */
3500         gint64 wait_time =
3501             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3502         if (wait_time > 0) {
3503           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3504 
3505           GST_DEBUG_OBJECT (stream->pad,
3506               "Download waiting for %" GST_TIME_FORMAT,
3507               GST_TIME_ARGS (wait_time));
3508 
3509           GST_MANIFEST_UNLOCK (demux);
3510           g_mutex_lock (&stream->fragment_download_lock);
3511           if (G_UNLIKELY (stream->cancelled)) {
3512             g_mutex_unlock (&stream->fragment_download_lock);
3513             GST_MANIFEST_LOCK (demux);
3514             stream->last_ret = GST_FLOW_FLUSHING;
3515             goto flushing;
3516           }
3517           do {
3518             g_cond_wait_until (&stream->fragment_download_cond,
3519                 &stream->fragment_download_lock, end_time);
3520             if (G_UNLIKELY (stream->cancelled)) {
3521               g_mutex_unlock (&stream->fragment_download_lock);
3522               GST_MANIFEST_LOCK (demux);
3523               stream->last_ret = GST_FLOW_FLUSHING;
3524               goto flushing;
3525             }
3526           } while (!stream->download_finished);
3527           g_mutex_unlock (&stream->fragment_download_lock);
3528 
3529           GST_MANIFEST_LOCK (demux);
3530         }
3531       }
3532     }
3533 
3534   flushing:
3535     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3536       /* looks like there is no way of knowing when a live stream has ended
3537        * Have to assume we are falling behind and cause a manifest reload */
3538       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3539       return GST_FLOW_EOS;
3540     }
3541   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3542     /* If this is the last fragment, consider failures EOS and not actual
3543      * errors. Due to rounding errors in the durations, the last fragment
3544      * might not actually exist */
3545     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3546     return GST_FLOW_EOS;
3547   } else {
3548     /* retry once (same segment) for 5xx (server errors) */
3549     if (!retried_once) {
3550       retried_once = TRUE;
3551       /* wait a short time in case the server needs a bit to recover, we don't
3552        * care if we get woken up before end time. We can use sleep here since
3553        * we're already blocking and just want to wait some time. */
3554       g_usleep (100000);        /* a tenth of a second */
3555       goto again;
3556     }
3557   }
3558 
3559 beach:
3560   return ret;
3561 
3562 no_url_error:
3563   {
3564     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3565         (_("Failed to get fragment URL.")),
3566         ("An error happened when getting fragment URL"));
3567     gst_task_stop (stream->download_task);
3568     return GST_FLOW_ERROR;
3569   }
3570 }
3571 
/* One iteration of the download loop of a stream: works out which fragment
 * comes next, downloads it and reacts to the outcome (EOS handling, retry
 * with back-off, error reporting). It stops stream->download_task itself
 * when no further iteration should run.
 * NOTE(review): presumably installed as the function of
 * stream->download_task - confirm where the task is created.
 *
 * this function will take the manifest_lock and will keep it until the end.
 * It will release it temporarily only when going to sleep.
 * Every time it takes the manifest_lock, it will check for cancelled condition
 */
static void
gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
{
  GstAdaptiveDemux *demux = stream->demux;
  GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
  GstFlowReturn ret;
  gboolean live;

  GST_LOG_OBJECT (stream->pad, "download loop start");

  GST_MANIFEST_LOCK (demux);

  /* stream->cancelled is read under fragment_download_lock */
  g_mutex_lock (&stream->fragment_download_lock);
  if (G_UNLIKELY (stream->cancelled)) {
    stream->last_ret = GST_FLOW_FLUSHING;
    g_mutex_unlock (&stream->fragment_download_lock);
    goto cancelled;
  }
  g_mutex_unlock (&stream->fragment_download_lock);

  /* Check if we're done with our segment */
  GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
  if (demux->segment.rate > 0) {
    /* forward playback: done once the stream position reached the stop */
    if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
        && stream->segment.position >= stream->segment.stop) {
      GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
      ret = GST_FLOW_EOS;
      gst_task_stop (stream->download_task);
      goto end_of_manifest;
    }
  } else {
    /* reverse playback: done once the stream position reached the start */
    if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
        && stream->segment.position <= stream->segment.start) {
      GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
      ret = GST_FLOW_EOS;
      gst_task_stop (stream->download_task);
      goto end_of_manifest;
    }
  }
  GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);

  /* Cleanup old streams if any */
  if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
    /* detach the list first so it is no longer reachable through demux->priv
     * while its elements are being freed */
    GList *old_streams = demux->priv->old_streams;
    demux->priv->old_streams = NULL;

    GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
    g_list_free_full (old_streams,
        (GDestroyNotify) gst_adaptive_demux_stream_free);
    GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");

    /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
     * Recheck the cancelled flag.
     */
    g_mutex_lock (&stream->fragment_download_lock);
    if (G_UNLIKELY (stream->cancelled)) {
      stream->last_ret = GST_FLOW_FLUSHING;
      g_mutex_unlock (&stream->fragment_download_lock);
      goto cancelled;
    }
    g_mutex_unlock (&stream->fragment_download_lock);
  }

  /* Restarting download, figure out new position
   * FIXME : Move this to a separate function ? */
  if (G_UNLIKELY (stream->restart_download)) {
    GstEvent *seg_event;
    GstClockTime cur, ts = 0;
    gint64 pos;

    GST_DEBUG_OBJECT (stream->pad,
        "Activating stream due to reconfigure event");

    /* prefer the position reported downstream of this stream's own pad */
    if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
      ts = (GstClockTime) pos;
      GST_DEBUG_OBJECT (demux, "Downstream position: %"
          GST_TIME_FORMAT, GST_TIME_ARGS (ts));
    } else {
      /* query other pads as some faulty element in the pad's branch might
       * reject position queries. This should be better than using the
       * demux segment position that can be much ahead */
      GList *iter;

      for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
        GstAdaptiveDemuxStream *cur_stream =
            (GstAdaptiveDemuxStream *) iter->data;

        if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
                &pos)) {
          ts = (GstClockTime) pos;
          GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
              GST_TIME_FORMAT, GST_TIME_ARGS (ts));
          break;
        }
      }
    }

    GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
    cur =
        gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
        stream->segment.position);

    /* we might have already pushed this data */
    ts = MAX (ts, cur);

    GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
        "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));

    if (GST_CLOCK_TIME_IS_VALID (ts)) {
      GstClockTime offset, period_start;

      offset =
          gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
      period_start = gst_adaptive_demux_get_period_start_time (demux);

      /* TODO check return */
      /* ts is passed both as the target and as the out parameter, so the
       * seek may overwrite it */
      gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
          0, ts, &ts);

      stream->segment.position = ts - period_start + offset;
    }

    /* The stream's segment is still correct except for
     * the position, so let's send a new one with the
     * updated position */
    seg_event = gst_event_new_segment (&stream->segment);
    gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
    GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);

    GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
        GST_PTR_FORMAT, seg_event);
    gst_pad_push_event (stream->pad, seg_event);

    stream->discont = TRUE;
    stream->restart_download = FALSE;
  }

  live = gst_adaptive_demux_is_live (demux);

  /* Get information about the fragment to download */
  GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
  ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
  GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
      ret, gst_flow_get_name (ret));
  if (ret == GST_FLOW_OK) {

    /* wait for live fragments to be available */
    if (live) {
      gint64 wait_time =
          gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
      if (wait_time > 0) {
        GstClockTime end_time =
            gst_adaptive_demux_get_monotonic_time (demux) + wait_time;

        GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
            GST_TIME_ARGS (wait_time));

        /* going to sleep: drop the manifest_lock for the duration */
        GST_MANIFEST_UNLOCK (demux);

        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          g_mutex_unlock (&stream->fragment_download_lock);
          /* the cancelled path expects the manifest_lock to be held */
          GST_MANIFEST_LOCK (demux);
          stream->last_ret = GST_FLOW_FLUSHING;
          goto cancelled;
        }
        gst_adaptive_demux_wait_until (demux->realtime_clock,
            &stream->fragment_download_cond, &stream->fragment_download_lock,
            end_time);
        g_mutex_unlock (&stream->fragment_download_lock);

        GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");

        GST_MANIFEST_LOCK (demux);

        /* manifest_lock was re-taken, so check for cancellation again */
        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          stream->last_ret = GST_FLOW_FLUSHING;
          g_mutex_unlock (&stream->fragment_download_lock);
          goto cancelled;
        }
        g_mutex_unlock (&stream->fragment_download_lock);
      }
    }

    stream->last_ret = GST_FLOW_OK;

    /* remember when this attempt started; the retry back-off in the error
     * case below is computed relative to it */
    next_download = gst_adaptive_demux_get_monotonic_time (demux);
    ret = gst_adaptive_demux_stream_download_fragment (stream);

    if (ret == GST_FLOW_FLUSHING) {
      g_mutex_lock (&stream->fragment_download_lock);
      if (G_UNLIKELY (stream->cancelled)) {
        stream->last_ret = GST_FLOW_FLUSHING;
        g_mutex_unlock (&stream->fragment_download_lock);
        goto cancelled;
      }
      g_mutex_unlock (&stream->fragment_download_lock);
    }

  } else {
    stream->last_ret = ret;
  }

  /* React to the outcome of the fragment info update / download */
  switch (ret) {
    case GST_FLOW_OK:
      break;                    /* all is good, let's go */
    case GST_FLOW_EOS:
      GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");

      /* we push the EOS after releasing the object lock */
      if (gst_adaptive_demux_is_live (demux)
          && (demux->segment.rate == 1.0
              || gst_adaptive_demux_stream_in_live_seek_range (demux,
                  stream))) {
        GstAdaptiveDemuxClass *demux_class =
            GST_ADAPTIVE_DEMUX_GET_CLASS (demux);

        /* this might be a fragment download error, refresh the manifest, just in case */
        if (!demux_class->requires_periodical_playlist_update (demux)) {
          ret = gst_adaptive_demux_update_manifest (demux);
          break;
          /* Wait only if we can ensure current manifest has been expired.
           * The meaning "we have next period" *WITH* EOS is that, current
           * period has been ended but we can continue to the next period */
        } else if (!gst_adaptive_demux_has_next_period (demux) &&
            gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
          /* leave without stopping the task */
          goto end;
        }
        gst_task_stop (stream->download_task);
        if (stream->replaced) {
          goto end;
        }
      } else {
        gst_task_stop (stream->download_task);
      }

      /* Only when the combined flow is EOS as well and another period
       * follows do we move on to that period instead of sending EOS */
      if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
        if (gst_adaptive_demux_has_next_period (demux)) {
          GST_DEBUG_OBJECT (stream->pad,
              "Next period available, not sending EOS");
          gst_adaptive_demux_advance_period (demux);
          ret = GST_FLOW_OK;
        }
      }
      break;

    case GST_FLOW_NOT_LINKED:
    {
      /* note: this local shadows the function-level ret */
      GstFlowReturn ret;
      gst_task_stop (stream->download_task);

      /* only an error if the combined flow is not-linked too */
      ret = gst_adaptive_demux_combine_flows (demux);
      if (ret == GST_FLOW_NOT_LINKED) {
        GST_ELEMENT_FLOW_ERROR (demux, ret);
      }
    }
      break;

    case GST_FLOW_FLUSHING:{
      GList *iter;

      /* flushing stops the download tasks of every stream in demux->streams */
      for (iter = demux->streams; iter; iter = g_list_next (iter)) {
        GstAdaptiveDemuxStream *other;

        other = iter->data;
        gst_task_stop (other->download_task);
      }
    }
      break;

    default:
      if (ret <= GST_FLOW_ERROR) {
        gboolean is_live = gst_adaptive_demux_is_live (demux);
        GST_WARNING_OBJECT (demux, "Error while downloading fragment");
        /* give up once the error counter exceeds the maximum */
        if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
          goto download_error;
        }

        g_clear_error (&stream->last_error);

        /* First try to update the playlist for non-live playlists
         * in case the URIs have changed in the meantime. But only
         * try it the first time, after that we're going to wait a
         * a bit to not flood the server */
        if (stream->download_error_count == 1 && !is_live) {
          /* TODO hlsdemux had more options to this function (boolean and err) */

          if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
            /* Retry immediately, the playlist actually has changed */
            GST_DEBUG_OBJECT (demux, "Updated the playlist");
            goto end;
          }
        }

        /* Wait half the fragment duration before retrying */
        next_download += stream->fragment.duration / 2;

        /* going to sleep: drop the manifest_lock for the duration */
        GST_MANIFEST_UNLOCK (demux);

        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          g_mutex_unlock (&stream->fragment_download_lock);
          /* the cancelled path expects the manifest_lock to be held */
          GST_MANIFEST_LOCK (demux);
          stream->last_ret = GST_FLOW_FLUSHING;
          goto cancelled;
        }
        gst_adaptive_demux_wait_until (demux->realtime_clock,
            &stream->fragment_download_cond, &stream->fragment_download_lock,
            next_download);
        g_mutex_unlock (&stream->fragment_download_lock);

        GST_DEBUG_OBJECT (demux, "Retrying now");

        GST_MANIFEST_LOCK (demux);

        /* manifest_lock was re-taken, so check for cancellation again */
        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          stream->last_ret = GST_FLOW_FLUSHING;
          g_mutex_unlock (&stream->fragment_download_lock);
          goto cancelled;
        }
        g_mutex_unlock (&stream->fragment_download_lock);

        /* Refetch the playlist now after we waited */
        if (!is_live
            && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
          GST_DEBUG_OBJECT (demux, "Updated the playlist");
        }
        goto end;
      }
      break;
  }

end_of_manifest:
  /* Reached by falling out of the switch or directly from the segment check
   * at the top; only does something when ret is GST_FLOW_EOS */
  if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
    if (GST_OBJECT_PARENT (stream->pad) != NULL) {
      /* EOS is held back while next or prepared streams are pending */
      if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
        GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
        gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
      } else {
        GST_DEBUG_OBJECT (stream->src,
            "Stream is EOS, but we're switching fragments. Not sending.");
      }
    } else {
      GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
      goto download_error;
    }
  }

end:
  /* Single exit point: every path gets here with the manifest_lock held */
  GST_MANIFEST_UNLOCK (demux);
  GST_LOG_OBJECT (stream->pad, "download loop end");
  return;

cancelled:
  {
    GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
    goto end;
  }
download_error:
  {
    GstMessage *msg;

    /* Build the error message from the stored stream error if there is one,
     * otherwise from a generic "not found" resource error */
    if (stream->last_error) {
      gchar *debug = g_strdup_printf ("Error on stream %s:%s",
          GST_DEBUG_PAD_NAME (stream->pad));
      msg =
          gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
          debug);
      GST_ERROR_OBJECT (stream->pad, "Download error: %s",
          stream->last_error->message);
      g_free (debug);
    } else {
      GError *err =
          g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
          _("Couldn't download fragments"));
      msg =
          gst_message_new_error (GST_OBJECT_CAST (demux), err,
          "Fragment downloading has failed consecutive times");
      g_error_free (err);
      GST_ERROR_OBJECT (stream->pad,
          "Download error: Couldn't download fragments, too many failures");
    }

    gst_task_stop (stream->download_task);
    if (stream->src) {
      GstElement *src = stream->src;

      /* clear stream->src before dropping the manifest_lock; the element is
       * shut down and removed from the bin without that lock held */
      stream->src = NULL;
      GST_MANIFEST_UNLOCK (demux);
      gst_element_set_locked_state (src, TRUE);
      gst_element_set_state (src, GST_STATE_NULL);
      gst_bin_remove (GST_BIN_CAST (demux), src);
      GST_MANIFEST_LOCK (demux);
    }

    gst_element_post_message (GST_ELEMENT_CAST (demux), msg);

    goto end;
  }
}
3978 
3979 static void
gst_adaptive_demux_updates_loop(GstAdaptiveDemux * demux)3980 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3981 {
3982   GstClockTime next_update;
3983   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3984 
3985   /* Loop for updating of the playlist. This periodically checks if
3986    * the playlist is updated and does so, then signals the streaming
3987    * thread in case it can continue downloading now. */
3988 
3989   /* block until the next scheduled update or the signal to quit this thread */
3990   GST_DEBUG_OBJECT (demux, "Started updates task");
3991 
3992   GST_MANIFEST_LOCK (demux);
3993 
3994   next_update =
3995       gst_adaptive_demux_get_monotonic_time (demux) +
3996       klass->get_manifest_update_interval (demux) * GST_USECOND;
3997 
3998   /* Updating playlist only needed for live playlists */
3999   while (gst_adaptive_demux_is_live (demux)) {
4000     GstFlowReturn ret = GST_FLOW_OK;
4001 
4002     /* Wait here until we should do the next update or we're cancelled */
4003     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4004 
4005     GST_MANIFEST_UNLOCK (demux);
4006 
4007     g_mutex_lock (&demux->priv->updates_timed_lock);
4008     if (demux->priv->stop_updates_task) {
4009       g_mutex_unlock (&demux->priv->updates_timed_lock);
4010       goto quit;
4011     }
4012     gst_adaptive_demux_wait_until (demux->realtime_clock,
4013         &demux->priv->updates_timed_cond,
4014         &demux->priv->updates_timed_lock, next_update);
4015     g_mutex_unlock (&demux->priv->updates_timed_lock);
4016 
4017     g_mutex_lock (&demux->priv->updates_timed_lock);
4018     if (demux->priv->stop_updates_task) {
4019       g_mutex_unlock (&demux->priv->updates_timed_lock);
4020       goto quit;
4021     }
4022     g_mutex_unlock (&demux->priv->updates_timed_lock);
4023 
4024     GST_MANIFEST_LOCK (demux);
4025 
4026     GST_DEBUG_OBJECT (demux, "Updating playlist");
4027 
4028     ret = gst_adaptive_demux_update_manifest (demux);
4029 
4030     if (ret == GST_FLOW_EOS) {
4031     } else if (ret != GST_FLOW_OK) {
4032       /* update_failed_count is used only here, no need to protect it */
4033       demux->priv->update_failed_count++;
4034       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4035         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4036             gst_flow_get_name (ret));
4037         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4038             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4039       } else {
4040         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4041             (_("Internal data stream error.")), ("Could not update playlist"));
4042         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4043         gst_task_stop (demux->priv->updates_task);
4044         GST_MANIFEST_UNLOCK (demux);
4045         goto end;
4046       }
4047     } else {
4048       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4049       demux->priv->update_failed_count = 0;
4050       next_update =
4051           gst_adaptive_demux_get_monotonic_time (demux) +
4052           klass->get_manifest_update_interval (demux) * GST_USECOND;
4053 
4054       /* Wake up download tasks */
4055       g_mutex_lock (&demux->priv->manifest_update_lock);
4056       g_cond_broadcast (&demux->priv->manifest_cond);
4057       g_mutex_unlock (&demux->priv->manifest_update_lock);
4058     }
4059   }
4060 
4061   GST_MANIFEST_UNLOCK (demux);
4062 
4063 quit:
4064   {
4065     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4066   }
4067 
4068 end:
4069   {
4070     return;
4071   }
4072 }
4073 
4074 /* must be called with manifest_lock taken */
4075 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4076 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4077     GstEvent * event)
4078 {
4079   gboolean ret;
4080   GstPad *pad;
4081   GstAdaptiveDemux *demux = stream->demux;
4082 
4083   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4084     stream->eos = TRUE;
4085   }
4086 
4087   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4088 
4089   /* Can't push events holding the manifest lock */
4090   GST_MANIFEST_UNLOCK (demux);
4091 
4092   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4093       "Pushing event %" GST_PTR_FORMAT, event);
4094 
4095   ret = gst_pad_push_event (pad, event);
4096 
4097   gst_object_unref (pad);
4098 
4099   GST_MANIFEST_LOCK (demux);
4100 
4101   return ret;
4102 }
4103 
4104 /* must be called with manifest_lock taken */
4105 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4106 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4107 {
4108   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4109 
4110   if (klass->is_live)
4111     return klass->is_live (demux);
4112   return FALSE;
4113 }
4114 
4115 /* must be called with manifest_lock taken */
4116 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4117 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4118     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4119     GstClockTime ts, GstClockTime * final_ts)
4120 {
4121   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4122 
4123   if (klass->stream_seek)
4124     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4125   return GST_FLOW_ERROR;
4126 }
4127 
4128 /* must be called with manifest_lock taken */
4129 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4130 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4131     GstAdaptiveDemuxStream * stream)
4132 {
4133   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4134   gboolean ret = TRUE;
4135 
4136   if (klass->stream_has_next_fragment)
4137     ret = klass->stream_has_next_fragment (stream);
4138 
4139   return ret;
4140 }
4141 
4142 /* must be called with manifest_lock taken */
4143 /* Called from:
4144  *  the ::finish_fragment() handlers when an *actual* fragment is done
4145  *   */
4146 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4147 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4148     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4149 {
4150   GstFlowReturn ret;
4151 
4152   if (stream->last_ret == GST_FLOW_OK) {
4153     stream->last_ret =
4154         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4155         duration);
4156   }
4157   ret = stream->last_ret;
4158 
4159   return ret;
4160 }
4161 
4162 /* must be called with manifest_lock taken */
4163 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4164 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4165     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4166 {
4167   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4168   GstFlowReturn ret;
4169 
4170   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4171 
4172   GST_LOG_OBJECT (stream->pad,
4173       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4174       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4175 
4176   stream->download_error_count = 0;
4177   g_clear_error (&stream->last_error);
4178 
4179   /* FIXME - url has no indication of byte ranges for subsegments */
4180   /* FIXME : All those time statistics are biased, since they are calculated
4181    * *AFTER* the queue2, which might be blocking. They should ideally be
4182    * calculated *before* queue2 in the uri_handler_probe */
4183   gst_element_post_message (GST_ELEMENT_CAST (demux),
4184       gst_message_new_element (GST_OBJECT_CAST (demux),
4185           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4186               "manifest-uri", G_TYPE_STRING,
4187               demux->manifest_uri, "uri", G_TYPE_STRING,
4188               stream->fragment.uri, "fragment-start-time",
4189               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4190               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4191               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4192               stream->download_total_bytes, "fragment-download-time",
4193               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4194 
4195   /* Don't update to the end of the segment if in reverse playback */
4196   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4197   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4198     GstClockTime offset =
4199         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4200     GstClockTime period_start =
4201         gst_adaptive_demux_get_period_start_time (demux);
4202 
4203     stream->segment.position += duration;
4204 
4205     /* Convert from position inside the stream's segment to the demuxer's
4206      * segment, they are not necessarily the same */
4207     if (stream->segment.position - offset + period_start >
4208         demux->segment.position)
4209       demux->segment.position =
4210           stream->segment.position - offset + period_start;
4211   }
4212   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4213 
4214   /* When advancing with a non 1.0 rate on live streams, we need to check
4215    * the live seeking range again to make sure we can still advance to
4216    * that position */
4217   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4218     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4219       ret = GST_FLOW_EOS;
4220     else
4221       ret = klass->stream_advance_fragment (stream);
4222   } else if (gst_adaptive_demux_is_live (demux)
4223       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4224     ret = klass->stream_advance_fragment (stream);
4225   } else {
4226     ret = GST_FLOW_EOS;
4227   }
4228 
4229   stream->download_start_time =
4230       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4231 
4232   if (ret == GST_FLOW_OK) {
4233     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4234             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4235       stream->need_header = TRUE;
4236       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4237     }
4238 
4239     /* the subclass might want to switch pads */
4240     if (G_UNLIKELY (demux->next_streams)) {
4241       GList *iter;
4242       gboolean can_expose = TRUE;
4243 
4244       gst_task_stop (stream->download_task);
4245 
4246       ret = GST_FLOW_EOS;
4247 
4248       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4249         /* Only expose if all streams are now cancelled or finished downloading */
4250         GstAdaptiveDemuxStream *other = iter->data;
4251         if (other != stream) {
4252           g_mutex_lock (&other->fragment_download_lock);
4253           can_expose &= (other->cancelled == TRUE
4254               || other->download_finished == TRUE);
4255           g_mutex_unlock (&other->fragment_download_lock);
4256         }
4257       }
4258 
4259       if (can_expose) {
4260         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4261             "to do bitrate switching");
4262         gst_adaptive_demux_prepare_streams (demux, FALSE);
4263         gst_adaptive_demux_start_tasks (demux, TRUE);
4264       } else {
4265         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4266       }
4267     }
4268   }
4269 
4270   return ret;
4271 }
4272 
4273 /* must be called with manifest_lock taken */
4274 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4275 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4276     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4277 {
4278   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4279 
4280   if (klass->stream_select_bitrate)
4281     return klass->stream_select_bitrate (stream, bitrate);
4282   return FALSE;
4283 }
4284 
4285 /* must be called with manifest_lock taken */
4286 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4287 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4288     GstAdaptiveDemuxStream * stream)
4289 {
4290   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4291   GstFlowReturn ret;
4292 
4293   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4294       GST_FLOW_ERROR);
4295 
4296   /* Make sure the sub-class will update bitrate, or else
4297    * we will later */
4298   stream->fragment.bitrate = 0;
4299   stream->fragment.finished = FALSE;
4300 
4301   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4302       GST_TIME_ARGS (stream->segment.position));
4303 
4304   ret = klass->stream_update_fragment_info (stream);
4305 
4306   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4307       stream->fragment.uri);
4308   if (ret == GST_FLOW_OK) {
4309     GST_LOG_OBJECT (stream->pad,
4310         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4311         GST_TIME_ARGS (stream->fragment.timestamp),
4312         GST_TIME_ARGS (stream->fragment.duration));
4313     GST_LOG_OBJECT (stream->pad,
4314         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4315         stream->fragment.range_start, stream->fragment.range_end);
4316   }
4317 
4318   return ret;
4319 }
4320 
4321 /* must be called with manifest_lock taken */
4322 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4323 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4324     demux, GstAdaptiveDemuxStream * stream)
4325 {
4326   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4327 
4328   if (klass->stream_get_fragment_waiting_time)
4329     return klass->stream_get_fragment_waiting_time (stream);
4330   return 0;
4331 }
4332 
4333 /* must be called with manifest_lock taken */
4334 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4335 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4336 {
4337   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4338   GstFragment *download;
4339   GstBuffer *buffer;
4340   GstFlowReturn ret;
4341   GError *error = NULL;
4342 
4343   download = gst_uri_downloader_fetch_uri (demux->downloader,
4344       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4345   if (download) {
4346     g_free (demux->manifest_uri);
4347     g_free (demux->manifest_base_uri);
4348     if (download->redirect_permanent && download->redirect_uri) {
4349       demux->manifest_uri = g_strdup (download->redirect_uri);
4350       demux->manifest_base_uri = NULL;
4351     } else {
4352       demux->manifest_uri = g_strdup (download->uri);
4353       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4354     }
4355 
4356     buffer = gst_fragment_get_buffer (download);
4357     g_object_unref (download);
4358     ret = klass->update_manifest_data (demux, buffer);
4359     gst_buffer_unref (buffer);
4360     /* FIXME: Should the manifest uri vars be reverted to original
4361      * values if updating fails? */
4362   } else {
4363     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4364         error->message);
4365     ret = GST_FLOW_NOT_LINKED;
4366   }
4367   g_clear_error (&error);
4368 
4369   return ret;
4370 }
4371 
4372 /* must be called with manifest_lock taken */
4373 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4374 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4375 {
4376   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4377   GstFlowReturn ret;
4378 
4379   ret = klass->update_manifest (demux);
4380 
4381   if (ret == GST_FLOW_OK) {
4382     GstClockTime duration;
4383     /* Send an updated duration message */
4384     duration = klass->get_duration (demux);
4385     if (duration != GST_CLOCK_TIME_NONE) {
4386       GST_DEBUG_OBJECT (demux,
4387           "Sending duration message : %" GST_TIME_FORMAT,
4388           GST_TIME_ARGS (duration));
4389       gst_element_post_message (GST_ELEMENT (demux),
4390           gst_message_new_duration_changed (GST_OBJECT (demux)));
4391     } else {
4392       GST_DEBUG_OBJECT (demux,
4393           "Duration unknown, can not send the duration message");
4394     }
4395 
4396     /* If a manifest changes it's liveness or periodic updateness, we need
4397      * to start/stop the manifest update task appropriately */
4398     /* Keep this condition in sync with the one in
4399      * gst_adaptive_demux_start_manifest_update_task()
4400      */
4401     if (gst_adaptive_demux_is_live (demux) &&
4402         klass->requires_periodical_playlist_update (demux)) {
4403       gst_adaptive_demux_start_manifest_update_task (demux);
4404     } else {
4405       gst_adaptive_demux_stop_manifest_update_task (demux);
4406     }
4407   }
4408 
4409   return ret;
4410 }
4411 
4412 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4413 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4414 {
4415   g_free (f->uri);
4416   f->uri = NULL;
4417   f->range_start = 0;
4418   f->range_end = -1;
4419 
4420   g_free (f->header_uri);
4421   f->header_uri = NULL;
4422   f->header_range_start = 0;
4423   f->header_range_end = -1;
4424 
4425   g_free (f->index_uri);
4426   f->index_uri = NULL;
4427   f->index_range_start = 0;
4428   f->index_range_end = -1;
4429 
4430   f->finished = FALSE;
4431 }
4432 
4433 /* must be called with manifest_lock taken */
4434 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4435 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4436 {
4437   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4438   gboolean ret = FALSE;
4439 
4440   if (klass->has_next_period)
4441     ret = klass->has_next_period (demux);
4442   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4443   return ret;
4444 }
4445 
4446 /* must be called with manifest_lock taken */
4447 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4448 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4449 {
4450   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4451 
4452   g_return_if_fail (klass->advance_period != NULL);
4453 
4454   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4455   klass->advance_period (demux);
4456   gst_adaptive_demux_prepare_streams (demux, FALSE);
4457   gst_adaptive_demux_start_tasks (demux, TRUE);
4458 }
4459 
4460 /**
4461  * gst_adaptive_demux_get_monotonic_time:
4462  * Returns: a monotonically increasing time, using the system realtime clock
4463  */
4464 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)4465 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4466 {
4467   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4468   return gst_clock_get_time (demux->realtime_clock);
4469 }
4470 
4471 /**
4472  * gst_adaptive_demux_get_client_now_utc:
4473  * @demux: #GstAdaptiveDemux
4474  * Returns: the client's estimate of UTC
4475  *
4476  * Used to find the client's estimate of UTC, using the system realtime clock.
4477  */
4478 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)4479 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4480 {
4481   GstClockTime rtc_now;
4482   gint64 utc_now;
4483   GTimeVal gtv;
4484 
4485   rtc_now = gst_clock_get_time (demux->realtime_clock);
4486   utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4487   gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
4488   gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
4489   return g_date_time_new_from_timeval_utc (&gtv);
4490 }
4491 
4492 /**
4493  * gst_adaptive_demux_is_running
4494  * @demux: #GstAdaptiveDemux
4495  * Returns: whether the demuxer is processing data
4496  *
4497  * Returns FALSE if shutdown has started (transitioning down from
4498  * PAUSED), otherwise TRUE.
4499  */
4500 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)4501 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4502 {
4503   return g_atomic_int_get (&demux->running);
4504 }
4505 
4506 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)4507 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4508 {
4509   GstAdaptiveDemuxTimer *timer;
4510 
4511   timer = g_slice_new (GstAdaptiveDemuxTimer);
4512   timer->fired = FALSE;
4513   timer->cond = cond;
4514   timer->mutex = mutex;
4515   timer->ref_count = 1;
4516   return timer;
4517 }
4518 
4519 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)4520 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4521 {
4522   g_return_val_if_fail (timer != NULL, NULL);
4523   g_atomic_int_inc (&timer->ref_count);
4524   return timer;
4525 }
4526 
4527 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)4528 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4529 {
4530   g_return_if_fail (timer != NULL);
4531   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4532     g_slice_free (GstAdaptiveDemuxTimer, timer);
4533   }
4534 }
4535 
4536 /* gst_adaptive_demux_wait_until:
4537  * A replacement for g_cond_wait_until that uses the clock rather
4538  * than system time to control the duration of the sleep. Typically
4539  * clock is actually a #GstSystemClock, in which case this function
4540  * behaves exactly like g_cond_wait_until. Inside unit tests,
4541  * the clock is typically a #GstTestClock, which allows tests to run
4542  * in non-realtime.
4543  * This function must be called with mutex held.
4544  */
4545 static gboolean
gst_adaptive_demux_wait_until(GstClock * clock,GCond * cond,GMutex * mutex,GstClockTime end_time)4546 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4547     GstClockTime end_time)
4548 {
4549   GstAdaptiveDemuxTimer *timer;
4550   gboolean fired;
4551   GstClockReturn res;
4552 
4553   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4554     /* for an invalid time, gst_clock_id_wait_async will try to call
4555      * gst_adaptive_demux_clock_callback from the current thread.
4556      * It still holds the mutex while doing that, so it will deadlock.
4557      * g_cond_wait_until would return immediately with false, so we'll do the same.
4558      */
4559     return FALSE;
4560   }
4561   timer = gst_adaptive_demux_timer_new (cond, mutex);
4562   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4563   res =
4564       gst_clock_id_wait_async (timer->clock_id,
4565       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4566       (GDestroyNotify) gst_adaptive_demux_timer_unref);
4567   /* clock does not support asynchronously wait. Assert and return */
4568   if (res == GST_CLOCK_UNSUPPORTED) {
4569     gst_clock_id_unref (timer->clock_id);
4570     gst_adaptive_demux_timer_unref (timer);
4571     g_return_val_if_reached (TRUE);
4572   }
4573   g_assert (!timer->fired);
4574   /* the gst_adaptive_demux_clock_callback() will signal the
4575    * cond when the clock's single shot timer fires, or the cond will be
4576    * signalled by another thread that wants to cause this wait to finish
4577    * early (e.g. to terminate the waiting thread).
4578    * There is no need for a while loop here, because that logic is
4579    * implemented by the function calling gst_adaptive_demux_wait_until() */
4580   g_cond_wait (cond, mutex);
4581   fired = timer->fired;
4582   if (!fired)
4583     gst_clock_id_unschedule (timer->clock_id);
4584   gst_clock_id_unref (timer->clock_id);
4585   gst_adaptive_demux_timer_unref (timer);
4586   return !fired;
4587 }
4588 
4589 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)4590 gst_adaptive_demux_clock_callback (GstClock * clock,
4591     GstClockTime time, GstClockID id, gpointer user_data)
4592 {
4593   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4594   g_return_val_if_fail (timer != NULL, FALSE);
4595   g_mutex_lock (timer->mutex);
4596   timer->fired = TRUE;
4597   g_cond_signal (timer->cond);
4598   g_mutex_unlock (timer->mutex);
4599   return TRUE;
4600 }
4601