1 /* GStreamer
2 *
3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
25 * @see_also:
26 *
27 * What is an adaptive demuxer?
28 * Adaptive demuxers are special demuxers in the sense that they don't
29 * actually demux data received from upstream but download the data
30 * themselves.
31 *
32 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
33 * a set of fragments. The manifest describes the available media and
34 * the sequence of fragments to use. Each fragment contains a small
35 * part of the media (typically only a few seconds). It is possible for
36 * the manifest to have the same media available in different configurations
37 * (bitrates for example) so that the client can select the one that
38 * best suits its scenario (network fluctuation, hardware requirements...).
39 * It is possible to switch from one representation of the media to another
40 * during playback. That's why it is called 'adaptive', because it can be
41 * adapted to the client's needs.
42 *
43 * Architectural overview:
44 * The manifest is received by the demuxer in its sink pad and, upon receiving
45 * EOS, it parses the manifest and exposes the streams available in it. For
46 * each stream a source element will be created and will download the list
47 * of fragments one by one. Once a fragment is finished downloading, the next
48 * URI is set to the source element and it starts fetching it and pushing
49 * through the stream's pad. This implies that each stream is independent from
50 * each other as it runs on a separate thread.
51 *
52 * After downloading each fragment, the download rate of it is calculated and
53 * the demuxer has a chance to switch to a different bitrate if needed. The
54 * switch can be done by simply pushing a new caps before the next fragment
55 * when codecs are the same, or by exposing a new pad group if it needs
56 * a codec change.
57 *
58 * Extra features:
59 * - Not linked streams: Streams that are not-linked have their download threads
60 * interrupted to save network bandwidth. When they are
61 * relinked a reconfigure event is received and the
62 * stream is restarted.
63 *
64 * Subclasses:
65 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
66 * about the intrinsics of the subclass formats, so the subclasses are
67 * resposible for maintaining the manifest data structures and stream
68 * information.
69 */
70
71 /*
72 MT safety.
73 The following rules were observed while implementing MT safety in adaptive demux:
74 1. If a variable is accessed from multiple threads and at least one thread
75 writes to it, then all the accesses needs to be done from inside a critical section.
76 2. If thread A wants to join thread B then at the moment it calls gst_task_join
77 it must not hold any mutexes that thread B might take.
78
79 Adaptive demux API can be called from several threads. More, adaptive demux
80 starts some threads to monitor the download of fragments. In order to protect
81 accesses to shared variables (demux and streams) all the API functions that
82 can be run in different threads will need to get a mutex (manifest_lock)
83 when they start and release it when they end. Because some of those functions
84 can indirectly call other API functions (eg they can generate events or messages
85 that are processed in the same thread) the manifest_lock must be recursive.
86
87 The manifest_lock will serialize the public API making access to shared
88 variables safe. But some of these functions will try at some moment to join
89 threads created by adaptive demux, or to change the state of src elements
90 (which will block trying to join the src element streaming thread). Because
91 of rule 2, those functions will need to release the manifest_lock during the
92 call of gst_task_join. During this time they can be interrupted by other API calls.
93 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
94 is called and this will join all threads. In order to prevent interruptions
95 during such period, all the API functions will also use a second lock: api_lock.
96 This will be taken at the beginning of the function and released at the end,
97 but this time this lock will not be temporarily released during join.
98 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
99 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
100 to hold it while joining the threads or changing the src element state. The
101 api_lock will serialise all external requests to adaptive demux. In order to
102 avoid deadlocks, if a function needs to acquire both manifest and api locks,
103 the api_lock will be taken first and the manifest_lock second.
104
105 By using the api_lock a thread is protected against other API calls. But when
106 temporarily dropping the manifest_lock, it will be vulnerable to changes from
107 threads that use only the manifest_lock and not the api_lock. These threads run
108 one of the following functions: gst_adaptive_demux_stream_download_loop,
109 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
110 that all operations during an API call are not impacted by other writes, the
111 above mentioned functions must check a cancelled flag every time they reacquire
112 the manifest_lock. If the flag is set, they must exit immediately, without
113 performing any changes on the shared data. In this way, an API call (eg seek
114 request) can set the cancel flag before releasing the manifest_lock and be sure
115 that the demux object and its streams are not changed by anybody else.
116 */
117
118 #ifdef HAVE_CONFIG_H
119 #include "config.h"
120 #endif
121
122 #include "gstadaptivedemux.h"
123 #include "gst/gst-i18n-plugin.h"
124 #include <gst/base/gstadapter.h>
125
126 GST_DEBUG_CATEGORY (adaptivedemux_debug);
127 #define GST_CAT_DEFAULT adaptivedemux_debug
128
129 #define MAX_DOWNLOAD_ERROR_COUNT 3
130 #define DEFAULT_FAILED_COUNT 3
131 #define DEFAULT_CONNECTION_SPEED 0
132 #define DEFAULT_BITRATE_LIMIT 0.8f
133 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */
134 #define NUM_LOOKBACK_FRAGMENTS 3
135
136 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
137 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
138 GST_TRACE("Locking from thread %p", g_thread_self()); \
139 g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
140 GST_TRACE("Locked from thread %p", g_thread_self()); \
141 } G_STMT_END
142
143 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
144 GST_TRACE("Unlocking from thread %p", g_thread_self()); \
145 g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
146 } G_STMT_END
147
148 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
149 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
150 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
151
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
154 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
155
156 enum
157 {
158 PROP_0,
159 PROP_CONNECTION_SPEED,
160 PROP_BITRATE_LIMIT,
161 PROP_LAST
162 };
163
164 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
165 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
166
167 struct _GstAdaptiveDemuxPrivate
168 {
169 GstAdapter *input_adapter; /* protected by manifest_lock */
170 gint have_manifest; /* MT safe */
171
172 GList *old_streams; /* protected by manifest_lock */
173
174 GstTask *updates_task; /* MT safe */
175 GRecMutex updates_lock;
176 GMutex updates_timed_lock;
177 GCond updates_timed_cond; /* protected by updates_timed_lock */
178 gboolean stop_updates_task; /* protected by updates_timed_lock */
179
180 /* used only from updates_task, no need to protect it */
181 gint update_failed_count;
182
183 guint32 segment_seqnum; /* protected by manifest_lock */
184
185 /* main lock used to protect adaptive demux and all its streams.
186 * It serializes the adaptive demux public API.
187 */
188 GRecMutex manifest_lock;
189
190 /* condition to wait for manifest updates on a live stream.
191 * In order to signal the manifest_cond, the caller needs to hold both
192 * manifest_lock and manifest_update_lock (taken in this order)
193 */
194 GCond manifest_cond;
195 GMutex manifest_update_lock;
196
197 /* Lock and condition for prerolling streams before exposing */
198 GMutex preroll_lock;
199 GCond preroll_cond;
200 gint preroll_pending;
201
202 GMutex api_lock;
203
204 /* Protects demux and stream segment information
205 * Needed because seeks can update segment information
206 * without needing to stop tasks when they just want to
207 * update the segment boundaries */
208 GMutex segment_lock;
209 };
210
211 typedef struct _GstAdaptiveDemuxTimer
212 {
213 volatile gint ref_count;
214 GCond *cond;
215 GMutex *mutex;
216 GstClockID clock_id;
217 gboolean fired;
218 } GstAdaptiveDemuxTimer;
219
220 static GstBinClass *parent_class = NULL;
221 static gint private_offset = 0;
222
223 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
224 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
225 GstAdaptiveDemuxClass * klass);
226 static void gst_adaptive_demux_finalize (GObject * object);
227 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
228 element, GstStateChange transition);
229
230 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
231
232 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
233 GstEvent * event);
234 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
235 GstObject * parent, GstBuffer * buffer);
236 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
237 GstQuery * query);
238 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
239 GstEvent * event);
240
241 static gboolean
242 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
243
244 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
245 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
246 stream);
247 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
248 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
249 gboolean first_and_live);
250 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
251 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
252 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
253 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
254 GstClockTime ts, GstClockTime * final_ts);
255 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
256 demux, GstAdaptiveDemuxStream * stream);
257 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
258 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
259 static GstFlowReturn
260 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
261 GstAdaptiveDemuxStream * stream);
262 static gint64
263 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
264 GstAdaptiveDemuxStream * stream);
265 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
266 demux);
267 static GstFlowReturn
268 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
269 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
270 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
271
272 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
273 static GstFlowReturn
274 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
275 GstEvent * event);
276
277 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
278 demux);
279 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
280 demux);
281
282 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
283 gboolean start_preroll_streams);
284 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
285 gboolean stop_updates);
286 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
287 demux);
288 static void
289 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
290 stream, GstFlowReturn ret, GError * err);
291 static GstFlowReturn
292 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
293 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
294 static GstFlowReturn
295 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
296 GstAdaptiveDemuxStream * stream);
297 static GstFlowReturn
298 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
299 GstAdaptiveDemuxStream * stream, GstClockTime duration);
300 static gboolean
301 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
302 GstClockTime end_time);
303 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
304 GstClockTime time, GstClockID id, gpointer user_data);
305 static gboolean
306 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
307 * demux);
308
309 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
310 * method to get to the padtemplates */
311 GType
gst_adaptive_demux_get_type(void)312 gst_adaptive_demux_get_type (void)
313 {
314 static volatile gsize type = 0;
315
316 if (g_once_init_enter (&type)) {
317 GType _type;
318 static const GTypeInfo info = {
319 sizeof (GstAdaptiveDemuxClass),
320 NULL,
321 NULL,
322 (GClassInitFunc) gst_adaptive_demux_class_init,
323 NULL,
324 NULL,
325 sizeof (GstAdaptiveDemux),
326 0,
327 (GInstanceInitFunc) gst_adaptive_demux_init,
328 };
329
330 _type = g_type_register_static (GST_TYPE_BIN,
331 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
332
333 private_offset =
334 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
335
336 g_once_init_leave (&type, _type);
337 }
338 return type;
339 }
340
341 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)342 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
343 {
344 return (G_STRUCT_MEMBER_P (self, private_offset));
345 }
346
347 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)348 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
349 const GValue * value, GParamSpec * pspec)
350 {
351 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352
353 GST_API_LOCK (demux);
354 GST_MANIFEST_LOCK (demux);
355
356 switch (prop_id) {
357 case PROP_CONNECTION_SPEED:
358 demux->connection_speed = g_value_get_uint (value) * 1000;
359 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
360 demux->connection_speed);
361 break;
362 case PROP_BITRATE_LIMIT:
363 demux->bitrate_limit = g_value_get_float (value);
364 break;
365 default:
366 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
367 break;
368 }
369
370 GST_MANIFEST_UNLOCK (demux);
371 GST_API_UNLOCK (demux);
372 }
373
374 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)375 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
376 GValue * value, GParamSpec * pspec)
377 {
378 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
379
380 GST_MANIFEST_LOCK (demux);
381
382 switch (prop_id) {
383 case PROP_CONNECTION_SPEED:
384 g_value_set_uint (value, demux->connection_speed / 1000);
385 break;
386 case PROP_BITRATE_LIMIT:
387 g_value_set_float (value, demux->bitrate_limit);
388 break;
389 default:
390 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
391 break;
392 }
393
394 GST_MANIFEST_UNLOCK (demux);
395 }
396
397 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)398 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
399 {
400 GObjectClass *gobject_class;
401 GstElementClass *gstelement_class;
402 GstBinClass *gstbin_class;
403
404 gobject_class = G_OBJECT_CLASS (klass);
405 gstelement_class = GST_ELEMENT_CLASS (klass);
406 gstbin_class = GST_BIN_CLASS (klass);
407
408 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
409 "Base Adaptive Demux");
410
411 parent_class = g_type_class_peek_parent (klass);
412
413 if (private_offset != 0)
414 g_type_class_adjust_private_offset (klass, &private_offset);
415
416 gobject_class->set_property = gst_adaptive_demux_set_property;
417 gobject_class->get_property = gst_adaptive_demux_get_property;
418 gobject_class->finalize = gst_adaptive_demux_finalize;
419
420 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
421 g_param_spec_uint ("connection-speed", "Connection Speed",
422 "Network connection speed in kbps (0 = calculate from downloaded"
423 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
424 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425
426 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
427 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
428 g_param_spec_float ("bitrate-limit",
429 "Bitrate limit in %",
430 "Limit of the available bitrate to use when switching to alternates.",
431 0, 1, DEFAULT_BITRATE_LIMIT,
432 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433
434 gstelement_class->change_state = gst_adaptive_demux_change_state;
435
436 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
437
438 klass->data_received = gst_adaptive_demux_stream_data_received_default;
439 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
440 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
441 klass->requires_periodical_playlist_update =
442 gst_adaptive_demux_requires_periodical_playlist_update_default;
443
444 }
445
446 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)447 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
448 GstAdaptiveDemuxClass * klass)
449 {
450 GstPadTemplate *pad_template;
451 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
452 GObjectClass *gobject_class;
453
454 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
455
456 demux->priv = gst_adaptive_demux_get_instance_private (demux);
457 demux->priv->input_adapter = gst_adapter_new ();
458 demux->downloader = gst_uri_downloader_new ();
459 gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
460 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
461 demux->priv->segment_seqnum = gst_util_seqnum_next ();
462 demux->have_group_id = FALSE;
463 demux->group_id = G_MAXUINT;
464
465 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
466
467 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
468 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
469
470 demux->realtime_clock = gst_system_clock_obtain ();
471 g_assert (demux->realtime_clock != NULL);
472 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
473 if (g_object_class_find_property (gobject_class, "clock-type")) {
474 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
475 } else {
476 GST_WARNING_OBJECT (demux,
477 "System clock does not have clock-type property");
478 }
479 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
480 demux->clock_offset = 0;
481 } else {
482 GDateTime *utc_now;
483 GstClockTime rtc_now;
484 GTimeVal gtv;
485
486 utc_now = g_date_time_new_now_utc ();
487 rtc_now = gst_clock_get_time (demux->realtime_clock);
488 g_date_time_to_timeval (utc_now, >v);
489 demux->clock_offset =
490 gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
491 GST_TIME_AS_USECONDS (rtc_now);
492 g_date_time_unref (utc_now);
493 }
494 g_rec_mutex_init (&demux->priv->updates_lock);
495 demux->priv->updates_task =
496 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
497 demux, NULL);
498 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
499
500 g_mutex_init (&demux->priv->updates_timed_lock);
501 g_cond_init (&demux->priv->updates_timed_cond);
502
503 g_cond_init (&demux->priv->manifest_cond);
504 g_mutex_init (&demux->priv->manifest_update_lock);
505
506 g_rec_mutex_init (&demux->priv->manifest_lock);
507 g_mutex_init (&demux->priv->api_lock);
508 g_mutex_init (&demux->priv->segment_lock);
509
510 g_cond_init (&demux->priv->preroll_cond);
511 g_mutex_init (&demux->priv->preroll_lock);
512
513 pad_template =
514 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
515 g_return_if_fail (pad_template != NULL);
516
517 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
518 gst_pad_set_event_function (demux->sinkpad,
519 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
520 gst_pad_set_chain_function (demux->sinkpad,
521 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
522
523 /* Properties */
524 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
525 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
526
527 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
528 }
529
530 static void
gst_adaptive_demux_finalize(GObject * object)531 gst_adaptive_demux_finalize (GObject * object)
532 {
533 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
534 GstAdaptiveDemuxPrivate *priv = demux->priv;
535
536 GST_DEBUG_OBJECT (object, "finalize");
537
538 g_object_unref (priv->input_adapter);
539 g_object_unref (demux->downloader);
540
541 g_mutex_clear (&priv->updates_timed_lock);
542 g_cond_clear (&priv->updates_timed_cond);
543 g_mutex_clear (&demux->priv->manifest_update_lock);
544 g_cond_clear (&demux->priv->manifest_cond);
545 g_object_unref (priv->updates_task);
546 g_rec_mutex_clear (&priv->updates_lock);
547 g_rec_mutex_clear (&demux->priv->manifest_lock);
548 g_mutex_clear (&demux->priv->api_lock);
549 g_mutex_clear (&demux->priv->segment_lock);
550 if (demux->realtime_clock) {
551 gst_object_unref (demux->realtime_clock);
552 demux->realtime_clock = NULL;
553 }
554
555 g_cond_clear (&demux->priv->preroll_cond);
556 g_mutex_clear (&demux->priv->preroll_lock);
557
558 G_OBJECT_CLASS (parent_class)->finalize (object);
559 }
560
561 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)562 gst_adaptive_demux_change_state (GstElement * element,
563 GstStateChange transition)
564 {
565 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
566 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
567
568 switch (transition) {
569 case GST_STATE_CHANGE_PAUSED_TO_READY:
570 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
571 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
572 gst_uri_downloader_cancel (demux->downloader);
573
574 GST_API_LOCK (demux);
575 GST_MANIFEST_LOCK (demux);
576 gst_adaptive_demux_reset (demux);
577 GST_MANIFEST_UNLOCK (demux);
578 GST_API_UNLOCK (demux);
579 break;
580 case GST_STATE_CHANGE_READY_TO_PAUSED:
581 GST_API_LOCK (demux);
582 GST_MANIFEST_LOCK (demux);
583 gst_adaptive_demux_reset (demux);
584 /* Clear "cancelled" flag in uridownloader since subclass might want to
585 * use uridownloader to fetch another manifest */
586 gst_uri_downloader_reset (demux->downloader);
587 if (g_atomic_int_get (&demux->priv->have_manifest))
588 gst_adaptive_demux_start_manifest_update_task (demux);
589 GST_MANIFEST_UNLOCK (demux);
590 GST_API_UNLOCK (demux);
591 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
592 GST_DEBUG_OBJECT (demux, "demuxer has started running");
593 break;
594 default:
595 break;
596 }
597
598 /* this must be run without MANIFEST_LOCK taken.
599 * For PLAYING to PLAYING state changes, it will want to take a lock in
600 * src element and that lock is held while the streaming thread is running.
601 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
602 */
603 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
604
605 return result;
606 }
607
608 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)609 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
610 GstEvent * event)
611 {
612 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
613 gboolean ret;
614
615 switch (event->type) {
616 case GST_EVENT_FLUSH_STOP:{
617 GST_API_LOCK (demux);
618 GST_MANIFEST_LOCK (demux);
619
620 gst_adaptive_demux_reset (demux);
621
622 ret = gst_pad_event_default (pad, parent, event);
623
624 GST_MANIFEST_UNLOCK (demux);
625 GST_API_UNLOCK (demux);
626
627 return ret;
628 }
629 case GST_EVENT_EOS:{
630 GstAdaptiveDemuxClass *demux_class;
631 GstQuery *query;
632 gboolean query_res;
633 gboolean ret = TRUE;
634 gsize available;
635 GstBuffer *manifest_buffer;
636
637 GST_API_LOCK (demux);
638 GST_MANIFEST_LOCK (demux);
639
640 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
641
642 available = gst_adapter_available (demux->priv->input_adapter);
643
644 if (available == 0) {
645 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
646 ret = gst_pad_event_default (pad, parent, event);
647
648 GST_MANIFEST_UNLOCK (demux);
649 GST_API_UNLOCK (demux);
650
651 return ret;
652 }
653
654 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
655
656 /* Need to get the URI to use it as a base to generate the fragment's
657 * uris */
658 query = gst_query_new_uri ();
659 query_res = gst_pad_peer_query (pad, query);
660 if (query_res) {
661 gchar *uri, *redirect_uri;
662 gboolean permanent;
663
664 gst_query_parse_uri (query, &uri);
665 gst_query_parse_uri_redirection (query, &redirect_uri);
666 gst_query_parse_uri_redirection_permanent (query, &permanent);
667
668 if (permanent && redirect_uri) {
669 demux->manifest_uri = redirect_uri;
670 demux->manifest_base_uri = NULL;
671 g_free (uri);
672 } else {
673 demux->manifest_uri = uri;
674 demux->manifest_base_uri = redirect_uri;
675 }
676
677 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
678 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
679 } else {
680 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
681 }
682 gst_query_unref (query);
683
684 /* Let the subclass parse the manifest */
685 manifest_buffer =
686 gst_adapter_take_buffer (demux->priv->input_adapter, available);
687 if (!demux_class->process_manifest (demux, manifest_buffer)) {
688 /* In most cases, this will happen if we set a wrong url in the
689 * source element and we have received the 404 HTML response instead of
690 * the manifest */
691 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
692 (NULL));
693 ret = FALSE;
694 } else {
695 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
696 }
697 gst_buffer_unref (manifest_buffer);
698
699 gst_element_post_message (GST_ELEMENT_CAST (demux),
700 gst_message_new_element (GST_OBJECT_CAST (demux),
701 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
702 "manifest-uri", G_TYPE_STRING,
703 demux->manifest_uri, "uri", G_TYPE_STRING,
704 demux->manifest_uri,
705 "manifest-download-start", GST_TYPE_CLOCK_TIME,
706 GST_CLOCK_TIME_NONE,
707 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
708 gst_util_get_timestamp (), NULL)));
709
710 if (ret) {
711 /* Send duration message */
712 if (!gst_adaptive_demux_is_live (demux)) {
713 GstClockTime duration = demux_class->get_duration (demux);
714
715 if (duration != GST_CLOCK_TIME_NONE) {
716 GST_DEBUG_OBJECT (demux,
717 "Sending duration message : %" GST_TIME_FORMAT,
718 GST_TIME_ARGS (duration));
719 gst_element_post_message (GST_ELEMENT (demux),
720 gst_message_new_duration_changed (GST_OBJECT (demux)));
721 } else {
722 GST_DEBUG_OBJECT (demux,
723 "media duration unknown, can not send the duration message");
724 }
725 }
726
727 if (demux->next_streams) {
728 gst_adaptive_demux_prepare_streams (demux,
729 gst_adaptive_demux_is_live (demux));
730 gst_adaptive_demux_start_tasks (demux, TRUE);
731 gst_adaptive_demux_start_manifest_update_task (demux);
732 } else {
733 /* no streams */
734 GST_WARNING_OBJECT (demux, "No streams created from manifest");
735 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
736 (_("This file contains no playable streams.")),
737 ("No known stream formats found at the Manifest"));
738 ret = FALSE;
739 }
740
741 }
742 GST_MANIFEST_UNLOCK (demux);
743 GST_API_UNLOCK (demux);
744
745 gst_event_unref (event);
746 return ret;
747 }
748 case GST_EVENT_SEGMENT:
749 /* Swallow newsegments, we'll push our own */
750 gst_event_unref (event);
751 return TRUE;
752 default:
753 break;
754 }
755
756 return gst_pad_event_default (pad, parent, event);
757 }
758
759 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)760 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
761 GstBuffer * buffer)
762 {
763 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
764
765 GST_MANIFEST_LOCK (demux);
766
767 gst_adapter_push (demux->priv->input_adapter, buffer);
768
769 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
770 (gint) gst_adapter_available (demux->priv->input_adapter));
771
772 GST_MANIFEST_UNLOCK (demux);
773 return GST_FLOW_OK;
774 }
775
776 /* must be called with manifest_lock taken */
777 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)778 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
779 {
780 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
781 GList *iter;
782 GList *old_streams;
783 GstEvent *eos;
784
785 /* take ownership of old_streams before releasing the manifest_lock in
786 * gst_adaptive_demux_stop_tasks
787 */
788 old_streams = demux->priv->old_streams;
789 demux->priv->old_streams = NULL;
790
791 gst_adaptive_demux_stop_tasks (demux, TRUE);
792
793 if (klass->reset)
794 klass->reset (demux);
795
796 eos = gst_event_new_eos ();
797 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
798 GstAdaptiveDemuxStream *stream = iter->data;
799 if (stream->pad) {
800 gst_pad_push_event (stream->pad, gst_event_ref (eos));
801 gst_pad_set_active (stream->pad, FALSE);
802
803 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
804 }
805 gst_adaptive_demux_stream_free (stream);
806 }
807 gst_event_unref (eos);
808 g_list_free (demux->streams);
809 demux->streams = NULL;
810 if (demux->prepared_streams) {
811 g_list_free_full (demux->prepared_streams,
812 (GDestroyNotify) gst_adaptive_demux_stream_free);
813 demux->prepared_streams = NULL;
814 }
815 if (demux->next_streams) {
816 g_list_free_full (demux->next_streams,
817 (GDestroyNotify) gst_adaptive_demux_stream_free);
818 demux->next_streams = NULL;
819 }
820
821 if (old_streams) {
822 g_list_free_full (old_streams,
823 (GDestroyNotify) gst_adaptive_demux_stream_free);
824 }
825
826 if (demux->priv->old_streams) {
827 g_list_free_full (demux->priv->old_streams,
828 (GDestroyNotify) gst_adaptive_demux_stream_free);
829 demux->priv->old_streams = NULL;
830 }
831
832 g_free (demux->manifest_uri);
833 g_free (demux->manifest_base_uri);
834 demux->manifest_uri = NULL;
835 demux->manifest_base_uri = NULL;
836
837 gst_adapter_clear (demux->priv->input_adapter);
838 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
839
840 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
841
842 demux->have_group_id = FALSE;
843 demux->group_id = G_MAXUINT;
844 demux->priv->segment_seqnum = gst_util_seqnum_next ();
845 }
846
847 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)848 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
849 {
850 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
851
852 switch (GST_MESSAGE_TYPE (msg)) {
853 case GST_MESSAGE_ERROR:{
854 GList *iter;
855 GstAdaptiveDemuxStream *stream = NULL;
856 GError *err = NULL;
857 gchar *debug = NULL;
858 gchar *new_error = NULL;
859 const GstStructure *details = NULL;
860
861 GST_MANIFEST_LOCK (demux);
862
863 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
864 GstAdaptiveDemuxStream *cur = iter->data;
865 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
866 GST_OBJECT_CAST (cur->src))) {
867 stream = cur;
868 break;
869 }
870 }
871 if (stream == NULL) {
872 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
873 GstAdaptiveDemuxStream *cur = iter->data;
874 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
875 GST_OBJECT_CAST (cur->src))) {
876 stream = cur;
877 break;
878 }
879 }
880 if (stream == NULL) {
881 GST_WARNING_OBJECT (demux,
882 "Failed to locate stream for errored element");
883 break;
884 }
885 }
886
887 gst_message_parse_error (msg, &err, &debug);
888
889 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
890 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
891 err->message, debug);
892
893 if (debug)
894 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
895 if (new_error) {
896 g_free (err->message);
897 err->message = new_error;
898 }
899
900 gst_message_parse_error_details (msg, &details);
901 if (details) {
902 gst_structure_get_uint (details, "http-status-code",
903 &stream->last_status_code);
904 }
905
906 /* error, but ask to retry */
907 gst_adaptive_demux_stream_fragment_download_finish (stream,
908 GST_FLOW_CUSTOM_ERROR, err);
909
910 g_error_free (err);
911 g_free (debug);
912
913 GST_MANIFEST_UNLOCK (demux);
914
915 gst_message_unref (msg);
916 msg = NULL;
917 }
918 break;
919 default:
920 break;
921 }
922
923 if (msg)
924 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
925 }
926
927 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)928 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
929 gsize struct_size)
930 {
931 GST_API_LOCK (demux);
932 GST_MANIFEST_LOCK (demux);
933 demux->stream_struct_size = struct_size;
934 GST_MANIFEST_UNLOCK (demux);
935 GST_API_UNLOCK (demux);
936 }
937
938 /* must be called with manifest_lock taken */
939 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)940 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
941 GstAdaptiveDemuxStream * stream)
942 {
943 GstPad *pad = stream->pad;
944 gchar *name = gst_pad_get_name (pad);
945 GstEvent *event;
946 gchar *stream_id;
947
948 gst_pad_set_active (pad, TRUE);
949 stream->need_header = TRUE;
950
951 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
952
953 event =
954 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
955 GST_EVENT_STREAM_START, 0);
956 if (event) {
957 if (gst_event_parse_group_id (event, &demux->group_id))
958 demux->have_group_id = TRUE;
959 else
960 demux->have_group_id = FALSE;
961 gst_event_unref (event);
962 } else if (!demux->have_group_id) {
963 demux->have_group_id = TRUE;
964 demux->group_id = gst_util_group_id_next ();
965 }
966 event = gst_event_new_stream_start (stream_id);
967 if (demux->have_group_id)
968 gst_event_set_group_id (event, demux->group_id);
969
970 gst_pad_push_event (pad, event);
971 g_free (stream_id);
972 g_free (name);
973
974 GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
975
976 stream->discont = TRUE;
977
978 return TRUE;
979 }
980
981 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)982 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
983 GstAdaptiveDemuxStream * stream)
984 {
985 gboolean ret;
986 GstPad *pad = stream->pad;
987 GstCaps *caps;
988
989 if (stream->pending_caps) {
990 gst_pad_set_caps (pad, stream->pending_caps);
991 caps = stream->pending_caps;
992 stream->pending_caps = NULL;
993 } else {
994 caps = gst_pad_get_current_caps (pad);
995 }
996
997 GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
998 GST_DEBUG_PAD_NAME (pad), caps);
999 if (caps)
1000 gst_caps_unref (caps);
1001
1002 gst_object_ref (pad);
1003
1004 /* Don't hold the manifest lock while exposing a pad */
1005 GST_MANIFEST_UNLOCK (demux);
1006 ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1007 GST_MANIFEST_LOCK (demux);
1008
1009 return ret;
1010 }
1011
1012 /* must be called with manifest_lock taken */
1013 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1014 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1015 GstAdaptiveDemuxStream * stream)
1016 {
1017 GstAdaptiveDemuxClass *klass;
1018
1019 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1020
1021 if (klass->get_presentation_offset == NULL)
1022 return 0;
1023
1024 return klass->get_presentation_offset (demux, stream);
1025 }
1026
1027 /* must be called with manifest_lock taken */
1028 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1029 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1030 {
1031 GstAdaptiveDemuxClass *klass;
1032
1033 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1034
1035 if (klass->get_period_start_time == NULL)
1036 return 0;
1037
1038 return klass->get_period_start_time (demux);
1039 }
1040
1041 /* must be called with manifest_lock taken */
1042 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1043 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1044 gboolean first_and_live)
1045 {
1046 GList *iter;
1047 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1048
1049 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1050 if (demux->prepared_streams != NULL) {
1051 /* Old streams that were never exposed, due to a seek or so */
1052 GST_FIXME_OBJECT (demux,
1053 "Preparing new streams without cleaning up old ones!");
1054 return FALSE;
1055 }
1056
1057 demux->prepared_streams = demux->next_streams;
1058 demux->next_streams = NULL;
1059
1060 if (!gst_adaptive_demux_is_running (demux)) {
1061 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1062 return TRUE;
1063 }
1064
1065 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1066 GstAdaptiveDemuxStream *stream = iter->data;
1067
1068 stream->do_block = TRUE;
1069
1070 if (!gst_adaptive_demux_prepare_stream (demux,
1071 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1072 /* TODO act on error */
1073 GST_FIXME_OBJECT (stream->pad,
1074 "Do something on failure to expose stream");
1075 }
1076
1077 if (first_and_live) {
1078 /* TODO we only need the first timestamp, maybe create a simple function to
1079 * get the current PTS of a fragment ? */
1080 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1081 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1082
1083 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1084 min_pts = MIN (min_pts, stream->fragment.timestamp);
1085 } else {
1086 min_pts = stream->fragment.timestamp;
1087 }
1088 }
1089 }
1090
1091 period_start = gst_adaptive_demux_get_period_start_time (demux);
1092
1093 /* For live streams, the subclass is supposed to seek to the current
1094 * fragment and then tell us its timestamp in stream->fragment.timestamp.
1095 * We now also have to seek our demuxer segment to reflect this.
1096 *
1097 * FIXME: This needs some refactoring at some point.
1098 */
1099 if (first_and_live) {
1100 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1101 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1102 GST_SEEK_TYPE_NONE, -1, NULL);
1103 }
1104
1105 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1106 GstAdaptiveDemuxStream *stream = iter->data;
1107 GstClockTime offset;
1108
1109 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1110 stream->segment = demux->segment;
1111
1112 /* The demuxer segment is just built from seek events, but for each stream
1113 * we have to adjust segments according to the current period and the
1114 * stream specific presentation time offset.
1115 *
1116 * For each period, buffer timestamps start again from 0. Additionally the
1117 * buffer timestamps are shifted by the stream specific presentation time
1118 * offset, so the first buffer timestamp of a period is 0 + presentation
1119 * time offset. If the stream contains timestamps itself, this is also
1120 * supposed to be the presentation time stored inside the stream.
1121 *
1122 * The stream time over periods is supposed to be continuous, that is the
1123 * buffer timestamp 0 + presentation time offset should map to the start
1124 * time of the current period.
1125 *
1126 *
1127 * The adjustment of the stream segments as such works the following.
1128 *
1129 * If the demuxer segment start is bigger than the period start, this
1130 * means that we have to drop some media at the beginning of the current
1131 * period, e.g. because a seek into the middle of the period has
1132 * happened. The amount of media to drop is the difference between the
1133 * period start and the demuxer segment start, and as each period starts
1134 * again from 0, this difference is going to be the actual stream's
1135 * segment start. As all timestamps of the stream are shifted by the
1136 * presentation time offset, we will also have to move the segment start
1137 * by that offset.
1138 *
1139 * Likewise, the demuxer segment stop value is adjusted in the same
1140 * fashion.
1141 *
1142 * Now the running time and stream time at the stream's segment start has
1143 * to be the one that is stored inside the demuxer's segment, which means
1144 * that segment.base and segment.time have to be copied over (done just
1145 * above)
1146 *
1147 *
1148 * If the demuxer segment start is smaller than the period start time,
1149 * this means that the whole period is inside the segment. As each period
1150 * starts timestamps from 0, and additionally timestamps are shifted by
1151 * the presentation time offset, the stream's first timestamp (and as such
1152 * the stream's segment start) has to be the presentation time offset.
1153 * The stream time at the segment start is supposed to be the stream time
1154 * of the period start according to the demuxer segment, so the stream
1155 * segment's time would be set to that. The same goes for the stream
1156 * segment's base, which is supposed to be the running time of the period
1157 * start according to the demuxer's segment.
1158 *
1159 * The same logic applies for negative rates with the segment stop and
1160 * the period stop time (which gets clamped).
1161 *
1162 *
1163 * For the first case where not the complete period is inside the segment,
1164 * the segment time and base as calculated by the second case would be
1165 * equivalent.
1166 */
1167 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1168 &demux->segment);
1169 GST_DEBUG_OBJECT (demux,
1170 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1171 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1172 /* note for readers:
1173 * Since stream->segment is initially a copy of demux->segment,
1174 * only the values that need updating are modified below. */
1175 if (first_and_live) {
1176 /* If first and live, demuxer did seek to the current position already */
1177 stream->segment.start = demux->segment.start - period_start + offset;
1178 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1179 stream->segment.stop = demux->segment.stop - period_start + offset;
1180 /* FIXME : Do we need to handle negative rates for this ? */
1181 stream->segment.position = stream->segment.start;
1182 } else if (demux->segment.start > period_start) {
1183 /* seek within a period */
1184 stream->segment.start = demux->segment.start - period_start + offset;
1185 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1186 stream->segment.stop = demux->segment.stop - period_start + offset;
1187 if (stream->segment.rate >= 0)
1188 stream->segment.position = offset;
1189 else
1190 stream->segment.position = stream->segment.stop;
1191 } else {
1192 stream->segment.start = offset;
1193 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1194 stream->segment.stop = demux->segment.stop - period_start + offset;
1195 if (stream->segment.rate >= 0)
1196 stream->segment.position = offset;
1197 else
1198 stream->segment.position = stream->segment.stop;
1199 stream->segment.time =
1200 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1201 period_start);
1202 stream->segment.base =
1203 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1204 period_start);
1205 }
1206
1207 stream->pending_segment = gst_event_new_segment (&stream->segment);
1208 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1209 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1210
1211 GST_DEBUG_OBJECT (demux,
1212 "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1213 &stream->segment, stream);
1214 }
1215
1216 return TRUE;
1217 }
1218
1219 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1220 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1221 {
1222 GList *iter;
1223 GList *old_streams;
1224
1225 g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1226
1227 old_streams = demux->streams;
1228 demux->streams = demux->prepared_streams;
1229 demux->prepared_streams = NULL;
1230
1231 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1232 GstAdaptiveDemuxStream *stream = iter->data;
1233
1234 if (!gst_adaptive_demux_expose_stream (demux,
1235 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1236 /* TODO act on error */
1237 }
1238 }
1239 demux->priv->preroll_pending = 0;
1240
1241 GST_MANIFEST_UNLOCK (demux);
1242 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1243 GST_MANIFEST_LOCK (demux);
1244
1245 if (old_streams) {
1246 GstEvent *eos = gst_event_new_eos ();
1247
1248 /* before we put streams in the demux->priv->old_streams list,
1249 * we ask the download task to stop. In this way, it will no longer be
1250 * allowed to change the demux object.
1251 */
1252 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1253 GstAdaptiveDemuxStream *stream = iter->data;
1254 GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1255
1256 GST_MANIFEST_UNLOCK (demux);
1257
1258 GST_DEBUG_OBJECT (pad, "Pushing EOS");
1259 gst_pad_push_event (pad, gst_event_ref (eos));
1260 gst_pad_set_active (pad, FALSE);
1261
1262 GST_LOG_OBJECT (pad, "Removing stream");
1263 gst_element_remove_pad (GST_ELEMENT (demux), pad);
1264 GST_MANIFEST_LOCK (demux);
1265
1266 gst_object_unref (GST_OBJECT (pad));
1267
1268 /* ask the download task to stop.
1269 * We will not join it now, because our thread can be one of these tasks.
1270 * We will do the joining later, from another stream download task or
1271 * from gst_adaptive_demux_stop_tasks.
1272 * We also cannot change the state of the stream->src element, because
1273 * that will wait on the streaming thread (which could be this thread)
1274 * to stop first.
1275 * Because we sent an EOS to the downstream element, the stream->src
1276 * element should detect this in its streaming task and stop.
1277 * Even if it doesn't do that, we will change its state later in
1278 * gst_adaptive_demux_stop_tasks.
1279 */
1280 GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1281 gst_task_stop (stream->download_task);
1282 g_mutex_lock (&stream->fragment_download_lock);
1283 stream->cancelled = TRUE;
1284 stream->replaced = TRUE;
1285 g_cond_signal (&stream->fragment_download_cond);
1286 g_mutex_unlock (&stream->fragment_download_lock);
1287 }
1288 gst_event_unref (eos);
1289
1290 /* The list should be freed from another thread as we can't properly
1291 * cleanup a GstTask from itself */
1292 demux->priv->old_streams =
1293 g_list_concat (demux->priv->old_streams, old_streams);
1294 }
1295
1296 /* Unblock after removing oldstreams */
1297 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1298 GstAdaptiveDemuxStream *stream = iter->data;
1299 stream->do_block = FALSE;
1300 }
1301
1302 GST_DEBUG_OBJECT (demux, "All streams are exposed");
1303
1304 return TRUE;
1305 }
1306
1307 /* must be called with manifest_lock taken */
1308 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1309 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1310 {
1311 GstAdaptiveDemuxStream *stream;
1312
1313 stream = g_malloc0 (demux->stream_struct_size);
1314
1315 /* Downloading task */
1316 g_rec_mutex_init (&stream->download_lock);
1317 stream->download_task =
1318 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1319 stream, NULL);
1320 gst_task_set_lock (stream->download_task, &stream->download_lock);
1321
1322 stream->pad = pad;
1323 stream->demux = demux;
1324 stream->fragment_bitrates =
1325 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1326 gst_pad_set_element_private (pad, stream);
1327 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1328
1329 g_mutex_lock (&demux->priv->preroll_lock);
1330 stream->do_block = TRUE;
1331 demux->priv->preroll_pending++;
1332 g_mutex_unlock (&demux->priv->preroll_lock);
1333
1334 gst_pad_set_query_function (pad,
1335 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1336 gst_pad_set_event_function (pad,
1337 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1338
1339 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1340 g_cond_init (&stream->fragment_download_cond);
1341 g_mutex_init (&stream->fragment_download_lock);
1342
1343 demux->next_streams = g_list_append (demux->next_streams, stream);
1344
1345 return stream;
1346 }
1347
1348 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1349 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1350 {
1351 GList *iter;
1352
1353 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1354 GstAdaptiveDemuxStream *stream = iter->data;
1355 if (stream->pad == pad) {
1356 return stream;
1357 }
1358 }
1359
1360 return NULL;
1361 }
1362
1363 /* must be called with manifest_lock taken.
1364 * It will temporarily drop the manifest_lock in order to join the task.
1365 * It will join only the old_streams (the demux->streams are joined by
1366 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1367 * called)
1368 */
1369 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1370 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1371 {
1372 GstAdaptiveDemux *demux = stream->demux;
1373 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1374
1375 if (klass->stream_free)
1376 klass->stream_free (stream);
1377
1378 g_clear_error (&stream->last_error);
1379 if (stream->download_task) {
1380 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1381 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1382 GST_DEBUG_PAD_NAME (stream->pad));
1383
1384 gst_task_stop (stream->download_task);
1385
1386 g_mutex_lock (&stream->fragment_download_lock);
1387 stream->cancelled = TRUE;
1388 g_cond_signal (&stream->fragment_download_cond);
1389 g_mutex_unlock (&stream->fragment_download_lock);
1390 }
1391 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1392
1393 /* temporarily drop the manifest lock to join the task */
1394 GST_MANIFEST_UNLOCK (demux);
1395
1396 gst_task_join (stream->download_task);
1397
1398 GST_MANIFEST_LOCK (demux);
1399
1400 GST_LOG_OBJECT (demux, "Finished");
1401 gst_object_unref (stream->download_task);
1402 g_rec_mutex_clear (&stream->download_lock);
1403 stream->download_task = NULL;
1404 }
1405
1406 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1407
1408 if (stream->pending_segment) {
1409 gst_event_unref (stream->pending_segment);
1410 stream->pending_segment = NULL;
1411 }
1412
1413 if (stream->pending_events) {
1414 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1415 stream->pending_events = NULL;
1416 }
1417
1418 if (stream->internal_pad) {
1419 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1420 }
1421
1422 if (stream->src_srcpad) {
1423 gst_object_unref (stream->src_srcpad);
1424 stream->src_srcpad = NULL;
1425 }
1426
1427 if (stream->src) {
1428 GstElement *src = stream->src;
1429
1430 stream->src = NULL;
1431
1432 GST_MANIFEST_UNLOCK (demux);
1433 gst_element_set_locked_state (src, TRUE);
1434 gst_element_set_state (src, GST_STATE_NULL);
1435 gst_bin_remove (GST_BIN_CAST (demux), src);
1436 GST_MANIFEST_LOCK (demux);
1437 }
1438
1439 g_cond_clear (&stream->fragment_download_cond);
1440 g_mutex_clear (&stream->fragment_download_lock);
1441 g_free (stream->fragment_bitrates);
1442
1443 if (stream->pad) {
1444 gst_object_unref (stream->pad);
1445 stream->pad = NULL;
1446 }
1447 if (stream->pending_caps)
1448 gst_caps_unref (stream->pending_caps);
1449
1450 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1451
1452 g_free (stream);
1453 }
1454
1455 /* must be called with manifest_lock taken */
1456 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1457 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1458 gint64 * range_start, gint64 * range_stop)
1459 {
1460 GstAdaptiveDemuxClass *klass;
1461
1462 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1463
1464 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1465
1466 return klass->get_live_seek_range (demux, range_start, range_stop);
1467 }
1468
1469 /* must be called with manifest_lock taken */
1470 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1471 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1472 GstAdaptiveDemuxStream * stream)
1473 {
1474 gint64 range_start, range_stop;
1475 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1476 GST_LOG_OBJECT (stream->pad,
1477 "stream position %" GST_TIME_FORMAT " live seek range %"
1478 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1479 GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1480 GST_STIME_ARGS (range_stop));
1481 return (stream->segment.position >= range_start
1482 && stream->segment.position <= range_stop);
1483 }
1484
1485 return FALSE;
1486 }
1487
1488 /* must be called with manifest_lock taken */
1489 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1490 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1491 {
1492 GstAdaptiveDemuxClass *klass;
1493
1494 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1495 if (gst_adaptive_demux_is_live (demux)) {
1496 return klass->get_live_seek_range != NULL;
1497 }
1498
1499 return klass->seek != NULL;
1500 }
1501
1502 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1503 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1504 GList * streams, gint64 period_start, GstSeekType start_type,
1505 GstSeekType stop_type)
1506 {
1507 GList *iter;
1508 for (iter = streams; iter; iter = g_list_next (iter)) {
1509 GstAdaptiveDemuxStream *stream = iter->data;
1510 GstEvent *seg_evt;
1511 GstClockTime offset;
1512
1513 /* See comments in gst_adaptive_demux_get_period_start_time() for
1514 * an explanation of the segment modifications */
1515 stream->segment = demux->segment;
1516 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1517 stream->segment.start += offset - period_start;
1518 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1519 stream->segment.stop += offset - period_start;
1520 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1521 stream->segment.position = stream->segment.start;
1522 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1523 stream->segment.position = stream->segment.stop;
1524 seg_evt = gst_event_new_segment (&stream->segment);
1525 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1526 gst_event_replace (&stream->pending_segment, seg_evt);
1527 GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1528 stream->pending_segment);
1529 gst_event_unref (seg_evt);
1530 /* Make sure the first buffer after a seek has the discont flag */
1531 stream->discont = TRUE;
1532 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1533 }
1534 }
1535
1536 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
1537 GST_SEEK_FLAG_SNAP_AFTER | \
1538 GST_SEEK_FLAG_SNAP_NEAREST | \
1539 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1540 GST_SEEK_FLAG_KEY_UNIT))
1541 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1542 GST_SEEK_FLAG_SNAP_AFTER | \
1543 GST_SEEK_FLAG_SNAP_NEAREST))
1544
1545 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1546 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1547 GstEvent * event)
1548 {
1549 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1550 gdouble rate;
1551 GstFormat format;
1552 GstSeekFlags flags;
1553 GstSeekType start_type, stop_type;
1554 gint64 start, stop;
1555 guint32 seqnum;
1556 gboolean update;
1557 gboolean ret;
1558 GstSegment oldsegment;
1559 GstAdaptiveDemuxStream *stream = NULL;
1560
1561 GST_INFO_OBJECT (demux, "Received seek event");
1562
1563 GST_API_LOCK (demux);
1564 GST_MANIFEST_LOCK (demux);
1565
1566 if (!gst_adaptive_demux_can_seek (demux)) {
1567 GST_MANIFEST_UNLOCK (demux);
1568 GST_API_UNLOCK (demux);
1569 gst_event_unref (event);
1570 return FALSE;
1571 }
1572
1573 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1574 &stop_type, &stop);
1575
1576 if (format != GST_FORMAT_TIME) {
1577 GST_MANIFEST_UNLOCK (demux);
1578 GST_API_UNLOCK (demux);
1579 GST_WARNING_OBJECT (demux,
1580 "Adaptive demuxers only support TIME-based seeking");
1581 gst_event_unref (event);
1582 return FALSE;
1583 }
1584
1585 if (flags & GST_SEEK_FLAG_SEGMENT) {
1586 GST_FIXME_OBJECT (demux, "Handle segment seeks");
1587 GST_MANIFEST_UNLOCK (demux);
1588 GST_API_UNLOCK (demux);
1589 gst_event_unref (event);
1590 return FALSE;
1591 }
1592
1593 seqnum = gst_event_get_seqnum (event);
1594
1595 if (gst_adaptive_demux_is_live (demux)) {
1596 gint64 range_start, range_stop;
1597 gboolean changed = FALSE;
1598 gboolean start_valid = TRUE, stop_valid = TRUE;
1599
1600 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1601 &range_stop)) {
1602 GST_MANIFEST_UNLOCK (demux);
1603 GST_API_UNLOCK (demux);
1604 gst_event_unref (event);
1605 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1606 return FALSE;
1607 }
1608
1609 GST_DEBUG_OBJECT (demux,
1610 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1611 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1612
1613 /* Handle relative positioning for live streams (relative to the range_stop) */
1614 if (start_type == GST_SEEK_TYPE_END) {
1615 start = range_stop + start;
1616 start_type = GST_SEEK_TYPE_SET;
1617 changed = TRUE;
1618 }
1619 if (stop_type == GST_SEEK_TYPE_END) {
1620 stop = range_stop + stop;
1621 stop_type = GST_SEEK_TYPE_SET;
1622 changed = TRUE;
1623 }
1624
1625 /* Adjust the requested start/stop position if it falls beyond the live
1626 * seek range.
1627 * The only case where we don't adjust is for the starting point of
1628 * an accurate seek (start if forward and stop if backwards)
1629 */
1630 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1631 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1632 GST_DEBUG_OBJECT (demux,
1633 "seek before live stream start, setting to range start: %"
1634 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1635 start = range_start;
1636 changed = TRUE;
1637 }
1638 /* truncate stop position also if set */
1639 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1640 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1641 GST_DEBUG_OBJECT (demux,
1642 "seek ending after live start, adjusting to: %"
1643 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1644 stop = range_stop;
1645 changed = TRUE;
1646 }
1647
1648 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1649 (start < range_start || start > range_stop)) {
1650 GST_WARNING_OBJECT (demux,
1651 "Seek to invalid position start:%" GST_STIME_FORMAT
1652 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1653 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1654 GST_STIME_ARGS (range_stop));
1655 start_valid = FALSE;
1656 }
1657 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1658 (stop < range_start || stop > range_stop)) {
1659 GST_WARNING_OBJECT (demux,
1660 "Seek to invalid position stop:%" GST_STIME_FORMAT
1661 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1662 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1663 GST_STIME_ARGS (range_stop));
1664 stop_valid = FALSE;
1665 }
1666
1667 /* If the seek position is still outside of the seekable range, refuse the seek */
1668 if (!start_valid || !stop_valid) {
1669 GST_MANIFEST_UNLOCK (demux);
1670 GST_API_UNLOCK (demux);
1671 gst_event_unref (event);
1672 return FALSE;
1673 }
1674
1675 /* Re-create seek event with changed/updated values */
1676 if (changed) {
1677 gst_event_unref (event);
1678 event =
1679 gst_event_new_seek (rate, format, flags,
1680 start_type, start, stop_type, stop);
1681 gst_event_set_seqnum (event, seqnum);
1682 }
1683 }
1684
1685 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1686
1687 /* have a backup in case seek fails */
1688 gst_segment_copy_into (&demux->segment, &oldsegment);
1689
1690 if (flags & GST_SEEK_FLAG_FLUSH) {
1691 GstEvent *fevent;
1692
1693 GST_DEBUG_OBJECT (demux, "sending flush start");
1694 fevent = gst_event_new_flush_start ();
1695 gst_event_set_seqnum (fevent, seqnum);
1696 GST_MANIFEST_UNLOCK (demux);
1697 gst_adaptive_demux_push_src_event (demux, fevent);
1698 GST_MANIFEST_LOCK (demux);
1699
1700 gst_adaptive_demux_stop_tasks (demux, FALSE);
1701 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1702 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1703
1704 gst_adaptive_demux_stop_tasks (demux, FALSE);
1705 }
1706
1707 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1708
1709 /*
1710 * Handle snap seeks as follows:
1711 * 1) do the snap seeking on the stream that received
1712 * the event
1713 * 2) use the final position on this stream to seek
1714 * on the other streams to the same position
1715 *
1716 * We can't snap at all streams at the same time as
1717 * they might end in different positions, so just
1718 * use the one that received the event as the 'leading'
1719 * one to do the snap seek.
1720 */
1721 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1722 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1723 GstClockTime ts;
1724 GstSeekFlags stream_seek_flags = flags;
1725
1726 /* snap-seek on the stream that received the event and then
1727 * use the resulting position to seek on all streams */
1728
1729 if (rate >= 0) {
1730 if (start_type != GST_SEEK_TYPE_NONE)
1731 ts = start;
1732 else {
1733 ts = stream->segment.position;
1734 start_type = GST_SEEK_TYPE_SET;
1735 }
1736 } else {
1737 if (stop_type != GST_SEEK_TYPE_NONE)
1738 ts = stop;
1739 else {
1740 stop_type = GST_SEEK_TYPE_SET;
1741 ts = stream->segment.position;
1742 }
1743 }
1744
1745 if (stream) {
1746 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1747 }
1748
1749 /* replace event with a new one without snaping to seek on all streams */
1750 gst_event_unref (event);
1751 if (rate >= 0) {
1752 start = ts;
1753 } else {
1754 stop = ts;
1755 }
1756 event =
1757 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1758 start_type, start, stop_type, stop);
1759 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1760 }
1761 stream = NULL;
1762
1763 gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1764 start, stop_type, stop, &update);
1765
1766 /* FIXME - this seems unatural, do_seek() is updating base when we
1767 * only want the start/stop position to change, maybe do_seek() needs
1768 * some fixing? */
1769 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1770 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1771 && stop_type == GST_SEEK_TYPE_NONE))) {
1772 demux->segment.base = oldsegment.base;
1773 }
1774
1775 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1776
1777 ret = demux_class->seek (demux, event);
1778
1779 if (!ret) {
1780 /* Is there anything else we can do if it fails? */
1781 gst_segment_copy_into (&oldsegment, &demux->segment);
1782 } else {
1783 demux->priv->segment_seqnum = seqnum;
1784 }
1785 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1786
1787 if (flags & GST_SEEK_FLAG_FLUSH) {
1788 GstEvent *fevent;
1789
1790 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1791 fevent = gst_event_new_flush_stop (TRUE);
1792 gst_event_set_seqnum (fevent, seqnum);
1793 gst_adaptive_demux_push_src_event (demux, fevent);
1794 }
1795
1796 if (demux->next_streams) {
1797 /* If the seek generated new streams, get them
1798 * to preroll */
1799 gst_adaptive_demux_prepare_streams (demux, FALSE);
1800 gst_adaptive_demux_start_tasks (demux, TRUE);
1801 } else {
1802 GstClockTime period_start =
1803 gst_adaptive_demux_get_period_start_time (demux);
1804
1805 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1806 gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1807 period_start, start_type, stop_type);
1808 gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1809 period_start, start_type, stop_type);
1810 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1811
1812 /* Restart the demux */
1813 gst_adaptive_demux_start_tasks (demux, FALSE);
1814 }
1815
1816 GST_MANIFEST_UNLOCK (demux);
1817 GST_API_UNLOCK (demux);
1818 gst_event_unref (event);
1819
1820 return ret;
1821 }
1822
1823 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1824 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1825 GstEvent * event)
1826 {
1827 GstAdaptiveDemux *demux;
1828
1829 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1830
1831 /* FIXME handle events received on pads that are to be removed */
1832
1833 switch (event->type) {
1834 case GST_EVENT_SEEK:
1835 {
1836 guint32 seqnum = gst_event_get_seqnum (event);
1837 if (seqnum == demux->priv->segment_seqnum) {
1838 GST_LOG_OBJECT (pad,
1839 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1840 gst_event_unref (event);
1841 return TRUE;
1842 }
1843 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1844 }
1845 case GST_EVENT_RECONFIGURE:{
1846 GstAdaptiveDemuxStream *stream;
1847
1848 GST_MANIFEST_LOCK (demux);
1849 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1850
1851 if (stream) {
1852 if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1853 stream->last_ret == GST_FLOW_NOT_LINKED) {
1854 stream->last_ret = GST_FLOW_OK;
1855 stream->restart_download = TRUE;
1856 stream->need_header = TRUE;
1857 stream->discont = TRUE;
1858 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1859 gst_task_start (stream->download_task);
1860 }
1861 gst_event_unref (event);
1862 GST_MANIFEST_UNLOCK (demux);
1863 return TRUE;
1864 }
1865 GST_MANIFEST_UNLOCK (demux);
1866 }
1867 break;
1868 case GST_EVENT_LATENCY:{
1869 /* Upstream and our internal source are irrelevant
1870 * for latency, and we should not fail here to
1871 * configure the latency */
1872 gst_event_unref (event);
1873 return TRUE;
1874 }
1875 case GST_EVENT_QOS:{
1876 GstAdaptiveDemuxStream *stream;
1877
1878 GST_MANIFEST_LOCK (demux);
1879 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1880
1881 if (stream) {
1882 GstClockTimeDiff diff;
1883 GstClockTime timestamp;
1884
1885 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
1886 /* Only take into account lateness if late */
1887 if (diff > 0)
1888 stream->qos_earliest_time = timestamp + 2 * diff;
1889 else
1890 stream->qos_earliest_time = timestamp;
1891 GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1892 GST_TIME_ARGS (stream->qos_earliest_time));
1893 }
1894 GST_MANIFEST_UNLOCK (demux);
1895 break;
1896 }
1897 default:
1898 break;
1899 }
1900
1901 return gst_pad_event_default (pad, parent, event);
1902 }
1903
1904 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1905 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1906 GstQuery * query)
1907 {
1908 GstAdaptiveDemux *demux;
1909 GstAdaptiveDemuxClass *demux_class;
1910 gboolean ret = FALSE;
1911
1912 if (query == NULL)
1913 return FALSE;
1914
1915 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1916 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1917
1918 switch (query->type) {
1919 case GST_QUERY_DURATION:{
1920 GstClockTime duration = -1;
1921 GstFormat fmt;
1922
1923 gst_query_parse_duration (query, &fmt, NULL);
1924
1925 if (fmt == GST_FORMAT_TIME
1926 && g_atomic_int_get (&demux->priv->have_manifest)) {
1927 duration = demux_class->get_duration (demux);
1928
1929 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1930 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1931 ret = TRUE;
1932 }
1933 }
1934
1935 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1936 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1937 break;
1938 }
1939 case GST_QUERY_LATENCY:{
1940 gst_query_set_latency (query, FALSE, 0, -1);
1941 ret = TRUE;
1942 break;
1943 }
1944 case GST_QUERY_SEEKING:{
1945 GstFormat fmt;
1946 gint64 stop = -1;
1947 gint64 start = 0;
1948
1949 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1950 GST_INFO_OBJECT (demux,
1951 "Don't have manifest yet, can't answer seeking query");
1952 return FALSE; /* can't answer without manifest */
1953 }
1954
1955 GST_MANIFEST_LOCK (demux);
1956
1957 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1958 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1959 if (fmt == GST_FORMAT_TIME) {
1960 GstClockTime duration;
1961 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1962
1963 ret = TRUE;
1964 if (can_seek) {
1965 if (gst_adaptive_demux_is_live (demux)) {
1966 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1967 if (!ret) {
1968 GST_MANIFEST_UNLOCK (demux);
1969 GST_INFO_OBJECT (demux, "can't answer seeking query");
1970 return FALSE;
1971 }
1972 } else {
1973 duration = demux_class->get_duration (demux);
1974 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1975 stop = duration;
1976 }
1977 }
1978 gst_query_set_seeking (query, fmt, can_seek, start, stop);
1979 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1980 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1981 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1982 }
1983 GST_MANIFEST_UNLOCK (demux);
1984 break;
1985 }
1986 case GST_QUERY_URI:
1987
1988 GST_MANIFEST_LOCK (demux);
1989
1990 /* TODO HLS can answer this differently it seems */
1991 if (demux->manifest_uri) {
1992 /* FIXME: (hls) Do we answer with the variant playlist, with the current
1993 * playlist or the the uri of the last downlowaded fragment? */
1994 gst_query_set_uri (query, demux->manifest_uri);
1995 ret = TRUE;
1996 }
1997
1998 GST_MANIFEST_UNLOCK (demux);
1999 break;
2000 default:
2001 /* Don't forward queries upstream because of the special nature of this
2002 * "demuxer", which relies on the upstream element only to be fed
2003 * the Manifest
2004 */
2005 break;
2006 }
2007
2008 return ret;
2009 }
2010
2011 /* must be called with manifest_lock taken */
2012 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2013 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2014 gboolean start_preroll_streams)
2015 {
2016 GList *iter;
2017
2018 if (!gst_adaptive_demux_is_running (demux)) {
2019 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2020 return;
2021 }
2022
2023 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2024
2025 iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2026
2027 for (; iter; iter = g_list_next (iter)) {
2028 GstAdaptiveDemuxStream *stream = iter->data;
2029
2030 if (!start_preroll_streams) {
2031 g_mutex_lock (&stream->fragment_download_lock);
2032 stream->cancelled = FALSE;
2033 stream->replaced = FALSE;
2034 g_mutex_unlock (&stream->fragment_download_lock);
2035 }
2036
2037 stream->last_ret = GST_FLOW_OK;
2038 gst_task_start (stream->download_task);
2039 }
2040 }
2041
2042 /* must be called with manifest_lock taken */
2043 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2044 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2045 {
2046 gst_uri_downloader_cancel (demux->downloader);
2047
2048 gst_task_stop (demux->priv->updates_task);
2049
2050 g_mutex_lock (&demux->priv->updates_timed_lock);
2051 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2052 demux->priv->stop_updates_task = TRUE;
2053 g_cond_signal (&demux->priv->updates_timed_cond);
2054 g_mutex_unlock (&demux->priv->updates_timed_lock);
2055 }
2056
2057 /* must be called with manifest_lock taken */
2058 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2059 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2060 {
2061 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2062
2063 if (gst_adaptive_demux_is_live (demux)) {
2064 gst_uri_downloader_reset (demux->downloader);
2065 g_mutex_lock (&demux->priv->updates_timed_lock);
2066 demux->priv->stop_updates_task = FALSE;
2067 g_mutex_unlock (&demux->priv->updates_timed_lock);
2068 /* Task to periodically update the manifest */
2069 if (demux_class->requires_periodical_playlist_update (demux)) {
2070 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2071 gst_task_start (demux->priv->updates_task);
2072 }
2073 }
2074 }
2075
2076 /* must be called with manifest_lock taken
2077 * This function will temporarily release manifest_lock in order to join the
2078 * download threads.
2079 * The api_lock will still protect it against other threads trying to modify
2080 * the demux element.
2081 */
2082 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2083 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2084 {
2085 int i;
2086 GList *iter;
2087 GList *list_to_process;
2088
2089 GST_LOG_OBJECT (demux, "Stopping tasks");
2090
2091 if (stop_updates)
2092 gst_adaptive_demux_stop_manifest_update_task (demux);
2093
2094 list_to_process = demux->streams;
2095 for (i = 0; i < 2; ++i) {
2096 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2097 GstAdaptiveDemuxStream *stream = iter->data;
2098
2099 g_mutex_lock (&stream->fragment_download_lock);
2100 stream->cancelled = TRUE;
2101 gst_task_stop (stream->download_task);
2102 g_cond_signal (&stream->fragment_download_cond);
2103 g_mutex_unlock (&stream->fragment_download_lock);
2104 }
2105 list_to_process = demux->prepared_streams;
2106 }
2107
2108 GST_MANIFEST_UNLOCK (demux);
2109 g_mutex_lock (&demux->priv->preroll_lock);
2110 g_cond_broadcast (&demux->priv->preroll_cond);
2111 g_mutex_unlock (&demux->priv->preroll_lock);
2112 GST_MANIFEST_LOCK (demux);
2113
2114 g_mutex_lock (&demux->priv->manifest_update_lock);
2115 g_cond_broadcast (&demux->priv->manifest_cond);
2116 g_mutex_unlock (&demux->priv->manifest_update_lock);
2117
2118 /* need to release manifest_lock before stopping the src element.
2119 * The streams were asked to cancel, so they will not make any writes to demux
2120 * object. Even if we temporarily release manifest_lock, the demux->streams
2121 * cannot change and iter cannot be invalidated.
2122 */
2123 list_to_process = demux->streams;
2124 for (i = 0; i < 2; ++i) {
2125 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2126 GstAdaptiveDemuxStream *stream = iter->data;
2127 GstElement *src = stream->src;
2128
2129 GST_MANIFEST_UNLOCK (demux);
2130
2131 if (src) {
2132 gst_element_set_locked_state (src, TRUE);
2133 gst_element_set_state (src, GST_STATE_READY);
2134 }
2135
2136 /* stream->download_task value never changes, so it is safe to read it
2137 * outside critical section
2138 */
2139 gst_task_join (stream->download_task);
2140
2141 GST_MANIFEST_LOCK (demux);
2142 }
2143 list_to_process = demux->prepared_streams;
2144 }
2145
2146 GST_MANIFEST_UNLOCK (demux);
2147 if (stop_updates)
2148 gst_task_join (demux->priv->updates_task);
2149
2150 GST_MANIFEST_LOCK (demux);
2151
2152 list_to_process = demux->streams;
2153 for (i = 0; i < 2; ++i) {
2154 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2155 GstAdaptiveDemuxStream *stream = iter->data;
2156
2157 stream->download_error_count = 0;
2158 stream->need_header = TRUE;
2159 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2160 }
2161 list_to_process = demux->prepared_streams;
2162 }
2163 }
2164
2165 /* must be called with manifest_lock taken */
2166 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2167 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2168 {
2169 GList *iter;
2170 gboolean ret = TRUE;
2171
2172 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2173 GstAdaptiveDemuxStream *stream = iter->data;
2174 gst_event_ref (event);
2175 ret = ret & gst_pad_push_event (stream->pad, event);
2176 }
2177 gst_event_unref (event);
2178 return ret;
2179 }
2180
2181 /* must be called with manifest_lock taken */
2182 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2183 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2184 GstCaps * caps)
2185 {
2186 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2187 caps);
2188 gst_caps_replace (&stream->pending_caps, caps);
2189 gst_caps_unref (caps);
2190 }
2191
2192 /* must be called with manifest_lock taken */
2193 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2194 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2195 GstTagList * tags)
2196 {
2197 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2198 tags);
2199 if (stream->pending_tags) {
2200 gst_tag_list_unref (stream->pending_tags);
2201 }
2202 stream->pending_tags = tags;
2203 }
2204
2205 /* must be called with manifest_lock taken */
2206 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2207 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2208 GstEvent * event)
2209 {
2210 stream->pending_events = g_list_append (stream->pending_events, event);
2211 }
2212
2213 /* must be called with manifest_lock taken */
2214 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2215 _update_average_bitrate (GstAdaptiveDemux * demux,
2216 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2217 {
2218 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2219
2220 stream->moving_bitrate -= stream->fragment_bitrates[index];
2221 stream->fragment_bitrates[index] = new_bitrate;
2222 stream->moving_bitrate += new_bitrate;
2223
2224 stream->moving_index += 1;
2225
2226 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2227 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2228 return stream->moving_bitrate / stream->moving_index;
2229 }
2230
2231 /* must be called with manifest_lock taken */
2232 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2233 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2234 GstAdaptiveDemuxStream * stream)
2235 {
2236 guint64 average_bitrate;
2237 guint64 fragment_bitrate;
2238
2239 if (demux->connection_speed) {
2240 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2241 demux->connection_speed / 1000);
2242 stream->current_download_rate = demux->connection_speed;
2243 return demux->connection_speed;
2244 }
2245
2246 fragment_bitrate = stream->last_bitrate;
2247 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2248 fragment_bitrate);
2249
2250 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2251
2252 GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2253 fragment_bitrate);
2254 GST_INFO_OBJECT (stream,
2255 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2256 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2257
2258 /* Conservative approach, make sure we don't upgrade too fast */
2259 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2260
2261 stream->current_download_rate *= demux->bitrate_limit;
2262 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2263 G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2264
2265 #if 0
2266 /* Debugging code, modulate the bitrate every few fragments */
2267 {
2268 static guint ctr = 0;
2269 if (ctr % 3 == 0) {
2270 GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2271 stream->current_download_rate /= 2;
2272 }
2273 ctr++;
2274 }
2275 #endif
2276
2277 return stream->current_download_rate;
2278 }
2279
2280 /* must be called with manifest_lock taken */
2281 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2282 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2283 {
2284 gboolean all_notlinked = TRUE;
2285 gboolean all_eos = TRUE;
2286 GList *iter;
2287
2288 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2289 GstAdaptiveDemuxStream *stream = iter->data;
2290
2291 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2292 all_notlinked = FALSE;
2293 if (stream->last_ret != GST_FLOW_EOS)
2294 all_eos = FALSE;
2295 }
2296
2297 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2298 || stream->last_ret == GST_FLOW_FLUSHING) {
2299 return stream->last_ret;
2300 }
2301 }
2302 if (all_notlinked)
2303 return GST_FLOW_NOT_LINKED;
2304 else if (all_eos)
2305 return GST_FLOW_EOS;
2306 return GST_FLOW_OK;
2307 }
2308
2309 /* Called with preroll_lock */
2310 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2311 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2312 GstAdaptiveDemuxStream * stream)
2313 {
2314 demux->priv->preroll_pending--;
2315 if (demux->priv->preroll_pending == 0) {
2316 /* That was the last one, time to release all streams
2317 * and expose them */
2318 GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2319 gst_adaptive_demux_expose_streams (demux);
2320 g_cond_broadcast (&demux->priv->preroll_cond);
2321 }
2322 }
2323
2324 /* must be called with manifest_lock taken.
2325 * Temporarily releases manifest_lock
2326 */
2327 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2328 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2329 GstBuffer * buffer)
2330 {
2331 GstAdaptiveDemux *demux = stream->demux;
2332 GstFlowReturn ret = GST_FLOW_OK;
2333 gboolean discont = FALSE;
2334 /* Pending events */
2335 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2336 GList *pending_events = NULL;
2337
2338 /* FIXME :
2339 * This is duplicating *exactly* the same thing as what is done at the beginning
2340 * of _src_chain if starting_fragment is TRUE */
2341 if (stream->first_fragment_buffer) {
2342 GstClockTime offset =
2343 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2344 GstClockTime period_start =
2345 gst_adaptive_demux_get_period_start_time (demux);
2346
2347 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2348 if (demux->segment.rate < 0)
2349 /* Set DISCONT flag for every first buffer in reverse playback mode
2350 * as each fragment for its own has to be reversed */
2351 discont = TRUE;
2352
2353 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2354 if (GST_BUFFER_PTS_IS_VALID (buffer))
2355 GST_BUFFER_PTS (buffer) += offset;
2356
2357 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2358 stream->segment.position = GST_BUFFER_PTS (buffer);
2359
2360 /* Convert from position inside the stream's segment to the demuxer's
2361 * segment, they are not necessarily the same */
2362 if (stream->segment.position - offset + period_start >
2363 demux->segment.position)
2364 demux->segment.position =
2365 stream->segment.position - offset + period_start;
2366 }
2367 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2368
2369 GST_LOG_OBJECT (stream->pad,
2370 "Going to push buffer with PTS %" GST_TIME_FORMAT,
2371 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2372 } else {
2373 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2374 }
2375
2376 if (stream->discont) {
2377 discont = TRUE;
2378 stream->discont = FALSE;
2379 }
2380
2381 if (discont) {
2382 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2383 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2384 } else {
2385 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2386 }
2387
2388 stream->first_fragment_buffer = FALSE;
2389
2390 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2391 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2392 if (G_UNLIKELY (stream->pending_caps)) {
2393 pending_caps = gst_event_new_caps (stream->pending_caps);
2394 gst_caps_unref (stream->pending_caps);
2395 stream->pending_caps = NULL;
2396 }
2397
2398 if (stream->do_block) {
2399
2400 g_mutex_lock (&demux->priv->preroll_lock);
2401
2402 /* If we are preroll state, set caps in here */
2403 if (pending_caps) {
2404 gst_pad_push_event (stream->pad, pending_caps);
2405 pending_caps = NULL;
2406 }
2407
2408 gst_adaptive_demux_handle_preroll (demux, stream);
2409 GST_MANIFEST_UNLOCK (demux);
2410
2411 while (stream->do_block && !stream->cancelled) {
2412 GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2413 g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2414 }
2415 if (stream->cancelled) {
2416 GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2417 gst_buffer_unref (buffer);
2418 g_mutex_unlock (&demux->priv->preroll_lock);
2419 return GST_FLOW_FLUSHING;
2420 }
2421
2422 g_mutex_unlock (&demux->priv->preroll_lock);
2423 GST_MANIFEST_LOCK (demux);
2424 }
2425
2426 if (G_UNLIKELY (stream->pending_segment)) {
2427 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2428 pending_segment = stream->pending_segment;
2429 stream->pending_segment = NULL;
2430 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2431 }
2432 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2433 GstTagList *tags = stream->pending_tags;
2434
2435 stream->pending_tags = NULL;
2436 stream->bitrate_changed = 0;
2437
2438 if (stream->fragment.bitrate != 0) {
2439 if (tags)
2440 tags = gst_tag_list_make_writable (tags);
2441 else
2442 tags = gst_tag_list_new_empty ();
2443
2444 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2445 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2446 }
2447 if (tags)
2448 pending_tags = gst_event_new_tag (tags);
2449 }
2450 if (G_UNLIKELY (stream->pending_events)) {
2451 pending_events = stream->pending_events;
2452 stream->pending_events = NULL;
2453 }
2454
2455 GST_MANIFEST_UNLOCK (demux);
2456
2457 /* Do not push events or buffers holding the manifest lock */
2458 if (G_UNLIKELY (pending_caps)) {
2459 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2460 pending_caps);
2461 gst_pad_push_event (stream->pad, pending_caps);
2462 }
2463 if (G_UNLIKELY (pending_segment)) {
2464 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2465 pending_segment);
2466 gst_pad_push_event (stream->pad, pending_segment);
2467 }
2468 if (G_UNLIKELY (pending_tags)) {
2469 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2470 pending_tags);
2471 gst_pad_push_event (stream->pad, pending_tags);
2472 }
2473 while (pending_events != NULL) {
2474 GstEvent *event = pending_events->data;
2475
2476 if (!gst_pad_push_event (stream->pad, event))
2477 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2478
2479 pending_events = g_list_delete_link (pending_events, pending_events);
2480 }
2481
2482 /* Wait for preroll if blocking */
2483 GST_DEBUG_OBJECT (stream->pad,
2484 "About to push buffer of size %" G_GSIZE_FORMAT,
2485 gst_buffer_get_size (buffer));
2486
2487 ret = gst_pad_push (stream->pad, buffer);
2488
2489 GST_MANIFEST_LOCK (demux);
2490
2491 g_mutex_lock (&stream->fragment_download_lock);
2492 if (G_UNLIKELY (stream->cancelled)) {
2493 GST_LOG_OBJECT (stream, "Stream was cancelled");
2494 ret = stream->last_ret = GST_FLOW_FLUSHING;
2495 g_mutex_unlock (&stream->fragment_download_lock);
2496 return ret;
2497 }
2498 g_mutex_unlock (&stream->fragment_download_lock);
2499
2500 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2501 gst_flow_get_name (ret));
2502
2503 return ret;
2504 }
2505
2506 /* must be called with manifest_lock taken */
2507 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2508 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2509 GstAdaptiveDemuxStream * stream)
2510 {
2511 /* No need to advance, this isn't a real fragment */
2512 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2513 return GST_FLOW_OK;
2514
2515 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2516 stream->fragment.duration);
2517 }
2518
2519 /* must be called with manifest_lock taken.
2520 * Can temporarily release manifest_lock
2521 */
2522 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2523 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2524 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2525 {
2526 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2527 }
2528
2529 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)2530 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2531 * demux)
2532 {
2533 return TRUE;
2534 }
2535
2536 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2537 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2538 {
2539 GstAdaptiveDemuxStream *stream;
2540 GstAdaptiveDemux *demux;
2541 GstAdaptiveDemuxClass *klass;
2542 GstFlowReturn ret = GST_FLOW_OK;
2543
2544 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2545 stream = gst_pad_get_element_private (pad);
2546 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2547
2548 GST_MANIFEST_LOCK (demux);
2549
2550 /* do not make any changes if the stream is cancelled */
2551 g_mutex_lock (&stream->fragment_download_lock);
2552 if (G_UNLIKELY (stream->cancelled)) {
2553 g_mutex_unlock (&stream->fragment_download_lock);
2554 gst_buffer_unref (buffer);
2555 ret = stream->last_ret = GST_FLOW_FLUSHING;
2556 GST_MANIFEST_UNLOCK (demux);
2557 return ret;
2558 }
2559 g_mutex_unlock (&stream->fragment_download_lock);
2560
2561 /* starting_fragment is set to TRUE at the beginning of
2562 * _stream_download_fragment()
2563 * /!\ If there is a header/index being downloaded, then this will
2564 * be TRUE for the first one ... but FALSE for the remaining ones,
2565 * including the *actual* fragment ! */
2566 if (stream->starting_fragment) {
2567 GstClockTime offset =
2568 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2569 GstClockTime period_start =
2570 gst_adaptive_demux_get_period_start_time (demux);
2571
2572 stream->starting_fragment = FALSE;
2573 if (klass->start_fragment) {
2574 if (!klass->start_fragment (demux, stream)) {
2575 ret = GST_FLOW_ERROR;
2576 goto error;
2577 }
2578 }
2579
2580 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2581 if (GST_BUFFER_PTS_IS_VALID (buffer))
2582 GST_BUFFER_PTS (buffer) += offset;
2583
2584 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2585 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2586
2587 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2588 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2589 stream->segment.position = GST_BUFFER_PTS (buffer);
2590
2591 /* Convert from position inside the stream's segment to the demuxer's
2592 * segment, they are not necessarily the same */
2593 if (stream->segment.position - offset + period_start >
2594 demux->segment.position)
2595 demux->segment.position =
2596 stream->segment.position - offset + period_start;
2597 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2598 }
2599
2600 } else {
2601 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2602 }
2603
2604 /* downloading_first_buffer is set to TRUE in download_uri() just before
2605 * activating the source (i.e. requesting a given URI)
2606 *
2607 * The difference with starting_fragment is that this will be called
2608 * for *all* first buffers (of index, and header, and fragment)
2609 *
2610 * ... to then only do something useful (in this block) for actual
2611 * fragments... */
2612 if (stream->downloading_first_buffer) {
2613 gint64 chunk_size = 0;
2614
2615 stream->downloading_first_buffer = FALSE;
2616
2617 if (!stream->downloading_header && !stream->downloading_index) {
2618 /* If this is the first buffer of a fragment (not the headers or index)
2619 * and we don't have a birate from the sub-class, then see if we
2620 * can work it out from the fragment size and duration */
2621 if (stream->fragment.bitrate == 0 &&
2622 stream->fragment.duration != 0 &&
2623 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2624 &chunk_size)) {
2625 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2626 8 * GST_SECOND, stream->fragment.duration));
2627 GST_LOG_OBJECT (demux,
2628 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2629 " = bitrate %u", chunk_size,
2630 GST_TIME_ARGS (stream->fragment.duration), bitrate);
2631 stream->fragment.bitrate = bitrate;
2632 }
2633 if (stream->fragment.bitrate) {
2634 stream->bitrate_changed = TRUE;
2635 } else {
2636 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2637 }
2638 }
2639 }
2640
2641 stream->download_total_bytes += gst_buffer_get_size (buffer);
2642
2643 GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2644 gst_buffer_get_size (buffer));
2645
2646 ret = klass->data_received (demux, stream, buffer);
2647
2648 if (ret == GST_FLOW_FLUSHING) {
2649 /* do not make any changes if the stream is cancelled */
2650 g_mutex_lock (&stream->fragment_download_lock);
2651 if (G_UNLIKELY (stream->cancelled)) {
2652 g_mutex_unlock (&stream->fragment_download_lock);
2653 GST_MANIFEST_UNLOCK (demux);
2654 return ret;
2655 }
2656 g_mutex_unlock (&stream->fragment_download_lock);
2657 }
2658
2659 if (ret != GST_FLOW_OK) {
2660 gboolean finished = FALSE;
2661
2662 if (ret < GST_FLOW_EOS) {
2663 GST_ELEMENT_FLOW_ERROR (demux, ret);
2664
2665 /* TODO push this on all pads */
2666 gst_pad_push_event (stream->pad, gst_event_new_eos ());
2667 } else {
2668 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2669 gst_flow_get_name (ret));
2670 }
2671
2672 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2673 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2674 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2675 /* Behaves like an EOS event from upstream */
2676 stream->fragment.finished = TRUE;
2677 ret = klass->finish_fragment (demux, stream);
2678 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2679 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2680 } else if (ret != GST_FLOW_OK) {
2681 goto error;
2682 }
2683 finished = TRUE;
2684 }
2685
2686 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2687 if (finished)
2688 ret = GST_FLOW_EOS;
2689 }
2690
2691 error:
2692
2693 GST_MANIFEST_UNLOCK (demux);
2694
2695 return ret;
2696 }
2697
2698 /* must be called with manifest_lock taken */
2699 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)2700 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2701 stream, GstFlowReturn ret, GError * err)
2702 {
2703 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2704 gst_flow_get_name (ret), err);
2705
2706 /* if we have an error, only replace last_ret if it was OK before to avoid
2707 * overwriting the first error we got */
2708 if (stream->last_ret == GST_FLOW_OK) {
2709 stream->last_ret = ret;
2710 if (err) {
2711 g_clear_error (&stream->last_error);
2712 stream->last_error = g_error_copy (err);
2713 }
2714 }
2715 g_mutex_lock (&stream->fragment_download_lock);
2716 stream->download_finished = TRUE;
2717 g_cond_signal (&stream->fragment_download_cond);
2718 g_mutex_unlock (&stream->fragment_download_lock);
2719 }
2720
2721 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)2722 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2723 {
2724 GstFlowReturn ret = GST_FLOW_OK;
2725 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2726
2727 if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2728 || !klass->need_another_chunk (stream)
2729 || stream->fragment.chunk_size == 0) {
2730 stream->fragment.finished = TRUE;
2731 ret = klass->finish_fragment (stream->demux, stream);
2732 }
2733 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2734
2735 return ret;
2736 }
2737
2738 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2739 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2740 {
2741 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2742 GstAdaptiveDemux *demux = stream->demux;
2743
2744 switch (GST_EVENT_TYPE (event)) {
2745 case GST_EVENT_EOS:{
2746 GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2747 GST_MANIFEST_LOCK (demux);
2748
2749 gst_adaptive_demux_eos_handling (stream);
2750
2751 /* FIXME ?
2752 * _eos_handling() calls fragment_download_finish() which does the
2753 * same thing as below.
2754 * Could this cause races ? */
2755 g_mutex_lock (&stream->fragment_download_lock);
2756 stream->download_finished = TRUE;
2757 g_cond_signal (&stream->fragment_download_cond);
2758 g_mutex_unlock (&stream->fragment_download_lock);
2759
2760 GST_MANIFEST_UNLOCK (demux);
2761 break;
2762 }
2763 default:
2764 break;
2765 }
2766
2767 gst_event_unref (event);
2768
2769 return TRUE;
2770 }
2771
2772 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2773 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2774 {
2775 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2776
2777 switch (GST_QUERY_TYPE (query)) {
2778 case GST_QUERY_ALLOCATION:
2779 return FALSE;
2780 break;
2781 default:
2782 break;
2783 }
2784
2785 return gst_pad_peer_query (stream->pad, query);
2786 }
2787
2788 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)2789 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2790 GstAdaptiveDemuxStream * stream)
2791 {
2792 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2793
2794 if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2795 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2796 if (stream->fragment_bytes_downloaded == 0) {
2797 stream->last_latency =
2798 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2799 (stream->download_start_time * GST_USECOND);
2800 GST_DEBUG_OBJECT (pad,
2801 "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2802 GST_TIME_ARGS (stream->last_latency));
2803 }
2804 stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2805 GST_LOG_OBJECT (pad,
2806 "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2807 gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2808 } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2809 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2810 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2811 GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2812 GST_EVENT_TYPE_NAME (ev), ev);
2813 switch (GST_EVENT_TYPE (ev)) {
2814 case GST_EVENT_SEGMENT:
2815 stream->fragment_bytes_downloaded = 0;
2816 break;
2817 case GST_EVENT_EOS:
2818 {
2819 stream->last_download_time =
2820 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2821 (stream->download_start_time * GST_USECOND);
2822 stream->last_bitrate =
2823 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2824 8 * GST_SECOND, stream->last_download_time);
2825 GST_DEBUG_OBJECT (pad,
2826 "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2827 G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2828 stream->last_bitrate);
2829 /* Calculate bitrate since URI request */
2830 }
2831 break;
2832 default:
2833 break;
2834 }
2835 }
2836
2837 return ret;
2838 }
2839
2840 /* must be called with manifest_lock taken.
2841 * Can temporarily release manifest_lock
2842 */
2843 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2844 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2845 GstAdaptiveDemuxStream * stream)
2846 {
2847 gboolean ret = TRUE;
2848
2849 /* Wait until we're cancelled or there's something for
2850 * us to download in the playlist or the playlist
2851 * became non-live */
2852 while (TRUE) {
2853 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2854
2855 /* get the manifest_update_lock while still holding the manifest_lock.
2856 * This will prevent other threads to signal the condition (they will need
2857 * both manifest_lock and manifest_update_lock in order to signal).
2858 * It cannot deadlock because all threads always get the manifest_lock first
2859 * and manifest_update_lock second.
2860 */
2861 g_mutex_lock (&demux->priv->manifest_update_lock);
2862
2863 GST_MANIFEST_UNLOCK (demux);
2864
2865 g_cond_wait (&demux->priv->manifest_cond,
2866 &demux->priv->manifest_update_lock);
2867 g_mutex_unlock (&demux->priv->manifest_update_lock);
2868
2869 GST_MANIFEST_LOCK (demux);
2870
2871 /* check for cancelled every time we get the manifest_lock */
2872 g_mutex_lock (&stream->fragment_download_lock);
2873 if (G_UNLIKELY (stream->cancelled)) {
2874 ret = FALSE;
2875 stream->last_ret = GST_FLOW_FLUSHING;
2876 g_mutex_unlock (&stream->fragment_download_lock);
2877 break;
2878 }
2879 g_mutex_unlock (&stream->fragment_download_lock);
2880
2881 /* Got a new fragment or not live anymore? */
2882 if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2883 GST_FLOW_OK) {
2884 GST_DEBUG_OBJECT (demux, "new fragment available, "
2885 "not waiting for manifest update");
2886 ret = TRUE;
2887 break;
2888 }
2889
2890 if (!gst_adaptive_demux_is_live (demux)) {
2891 GST_DEBUG_OBJECT (demux, "Not live anymore, "
2892 "not waiting for manifest update");
2893 ret = FALSE;
2894 break;
2895 }
2896 }
2897 GST_DEBUG_OBJECT (demux, "Retrying now");
2898 return ret;
2899 }
2900
2901 /* must be called with manifest_lock taken */
2902 static gboolean
gst_adaptive_demux_stream_update_source(GstAdaptiveDemuxStream * stream,const gchar * uri,const gchar * referer,gboolean refresh,gboolean allow_cache)2903 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2904 const gchar * uri, const gchar * referer, gboolean refresh,
2905 gboolean allow_cache)
2906 {
2907 GstAdaptiveDemux *demux = stream->demux;
2908
2909 if (!gst_uri_is_valid (uri)) {
2910 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2911 return FALSE;
2912 }
2913
2914 /* Try to re-use existing source element */
2915 if (stream->src != NULL) {
2916 gchar *old_protocol, *new_protocol;
2917 gchar *old_uri;
2918
2919 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2920 old_protocol = gst_uri_get_protocol (old_uri);
2921 new_protocol = gst_uri_get_protocol (uri);
2922
2923 if (!g_str_equal (old_protocol, new_protocol)) {
2924 GstElement *src = stream->src;
2925
2926 stream->src = NULL;
2927 gst_object_unref (stream->src_srcpad);
2928 stream->src_srcpad = NULL;
2929 GST_MANIFEST_UNLOCK (demux);
2930 gst_element_set_locked_state (src, TRUE);
2931 gst_element_set_state (src, GST_STATE_NULL);
2932 gst_bin_remove (GST_BIN_CAST (demux), src);
2933 GST_MANIFEST_LOCK (demux);
2934 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2935 } else {
2936 GError *err = NULL;
2937
2938 GST_DEBUG_OBJECT (demux, "Re-using old source element");
2939 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2940 &err)) {
2941 GstElement *src = stream->src;
2942
2943 stream->src = NULL;
2944 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2945 err ? err->message : "Unknown error");
2946 g_clear_error (&err);
2947 gst_object_unref (stream->src_srcpad);
2948 stream->src_srcpad = NULL;
2949 GST_MANIFEST_UNLOCK (demux);
2950 gst_element_set_locked_state (src, TRUE);
2951 gst_element_set_state (src, GST_STATE_NULL);
2952 gst_bin_remove (GST_BIN_CAST (demux), src);
2953 GST_MANIFEST_LOCK (demux);
2954 }
2955 }
2956 g_free (old_uri);
2957 g_free (old_protocol);
2958 g_free (new_protocol);
2959 }
2960
2961 if (stream->src == NULL) {
2962 GstPad *uri_handler_src;
2963 GstPad *queue_sink;
2964 GstPad *queue_src;
2965 GstElement *uri_handler;
2966 GstElement *queue;
2967 GstPadLinkReturn pad_link_ret;
2968 GObjectClass *gobject_class;
2969 gchar *internal_name, *bin_name;
2970
2971 /* Our src consists of a bin containing uri_handler -> queue . The
2972 * purpose of the queue is to allow the uri_handler to download an
2973 * entire fragment without blocking, so we can accurately measure the
2974 * download bitrate. */
2975
2976 queue = gst_element_factory_make ("queue", NULL);
2977 if (queue == NULL)
2978 return FALSE;
2979
2980 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
2981 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
2982 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
2983
2984 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
2985 if (uri_handler == NULL) {
2986 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
2987 ("Missing plugin to handle URI: '%s'", uri), (NULL));
2988 gst_object_unref (queue);
2989 return FALSE;
2990 }
2991
2992 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
2993
2994 if (g_object_class_find_property (gobject_class, "compress"))
2995 g_object_set (uri_handler, "compress", FALSE, NULL);
2996 if (g_object_class_find_property (gobject_class, "keep-alive"))
2997 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
2998 if (g_object_class_find_property (gobject_class, "extra-headers")) {
2999 if (referer || refresh || !allow_cache) {
3000 GstStructure *extra_headers = gst_structure_new_empty ("headers");
3001
3002 if (referer)
3003 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3004 NULL);
3005
3006 if (!allow_cache)
3007 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3008 "no-cache", NULL);
3009 else if (refresh)
3010 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3011 "max-age=0", NULL);
3012
3013 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3014
3015 gst_structure_free (extra_headers);
3016 } else {
3017 g_object_set (uri_handler, "extra-headers", NULL, NULL);
3018 }
3019 }
3020
3021 /* Source bin creation */
3022 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3023 stream->src = gst_bin_new (bin_name);
3024 g_free (bin_name);
3025 if (stream->src == NULL) {
3026 gst_object_unref (queue);
3027 gst_object_unref (uri_handler);
3028 return FALSE;
3029 }
3030
3031 gst_bin_add (GST_BIN_CAST (stream->src), queue);
3032 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3033
3034 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3035 queue_sink = gst_element_get_static_pad (queue, "sink");
3036
3037 pad_link_ret =
3038 gst_pad_link_full (uri_handler_src, queue_sink,
3039 GST_PAD_LINK_CHECK_NOTHING);
3040 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3041 GST_WARNING_OBJECT (demux,
3042 "Could not link pads %s:%s to %s:%s for reason %d",
3043 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3044 pad_link_ret);
3045 g_object_unref (queue_sink);
3046 g_object_unref (uri_handler_src);
3047 gst_object_unref (stream->src);
3048 stream->src = NULL;
3049 return FALSE;
3050 }
3051
3052 /* Add a downstream event and data probe */
3053 gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3054 (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3055
3056 g_object_unref (queue_sink);
3057 g_object_unref (uri_handler_src);
3058 queue_src = gst_element_get_static_pad (queue, "src");
3059 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3060 g_object_unref (queue_src);
3061 gst_element_add_pad (stream->src, stream->src_srcpad);
3062
3063 gst_element_set_locked_state (stream->src, TRUE);
3064 gst_bin_add (GST_BIN_CAST (demux), stream->src);
3065 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3066
3067 /* set up our internal floating pad to drop all events from
3068 * the http src we don't care about. On the chain function
3069 * we just push the buffer forward */
3070 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3071 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3072 g_free (internal_name);
3073 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3074 GST_OBJECT_CAST (demux));
3075 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3076 gst_pad_set_element_private (stream->internal_pad, stream);
3077 gst_pad_set_active (stream->internal_pad, TRUE);
3078 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3079 gst_pad_set_event_function (stream->internal_pad, _src_event);
3080 gst_pad_set_query_function (stream->internal_pad, _src_query);
3081
3082 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3083 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3084 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3085 return FALSE;
3086 }
3087
3088 stream->uri_handler = uri_handler;
3089 stream->queue = queue;
3090
3091 stream->last_status_code = 200; /* default to OK */
3092 }
3093 return TRUE;
3094 }
3095
3096 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3097 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3098 gpointer user_data)
3099 {
3100 GstAdaptiveDemuxStream *stream = user_data;
3101
3102 /* The source's src pad is IDLE so now set the state to READY */
3103 g_mutex_lock (&stream->fragment_download_lock);
3104 stream->src_at_ready = TRUE;
3105 g_cond_signal (&stream->fragment_download_cond);
3106 g_mutex_unlock (&stream->fragment_download_lock);
3107
3108 return GST_PAD_PROBE_REMOVE;
3109 }
3110
3111 #ifndef GST_DISABLE_GST_DEBUG
3112 static const char *
uritype(GstAdaptiveDemuxStream * s)3113 uritype (GstAdaptiveDemuxStream * s)
3114 {
3115 if (s->downloading_header)
3116 return "header";
3117 if (s->downloading_index)
3118 return "index";
3119 return "fragment";
3120 }
3121 #endif
3122
3123 /* must be called with manifest_lock taken.
3124 * Can temporarily release manifest_lock
3125 *
3126 * Will return when URI is fully downloaded (or aborted/errored)
3127 */
3128 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3129 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3130 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3131 gint64 end, guint * http_status)
3132 {
3133 GstFlowReturn ret = GST_FLOW_OK;
3134 GST_DEBUG_OBJECT (stream->pad,
3135 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3136 uritype (stream), uri, start, end);
3137
3138 if (http_status)
3139 *http_status = 200; /* default to ok if no further information */
3140
3141 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3142 ret = stream->last_ret = GST_FLOW_ERROR;
3143 return ret;
3144 }
3145
3146 gst_element_set_locked_state (stream->src, TRUE);
3147
3148 GST_MANIFEST_UNLOCK (demux);
3149 if (gst_element_set_state (stream->src,
3150 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3151 /* If ranges are specified, seek to it */
3152 if (start != 0 || end != -1) {
3153 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3154 * stop position */
3155 if (end != -1)
3156 end += 1;
3157 /* Send the seek event to the uri_handler, as the other pipeline elements
3158 * can't handle it when READY. */
3159 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3160 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3161 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3162
3163 GST_MANIFEST_LOCK (demux);
3164 /* looks like the source can't handle seeks in READY */
3165 g_clear_error (&stream->last_error);
3166 stream->last_error = g_error_new (GST_CORE_ERROR,
3167 GST_CORE_ERROR_NOT_IMPLEMENTED,
3168 "Source element can't handle range requests");
3169 stream->last_ret = GST_FLOW_ERROR;
3170 } else {
3171 GST_MANIFEST_LOCK (demux);
3172 }
3173 } else {
3174 GST_MANIFEST_LOCK (demux);
3175 }
3176
3177 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3178 stream->download_start_time =
3179 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3180
3181 /* src element is in state READY. Before we start it, we reset
3182 * download_finished
3183 */
3184 g_mutex_lock (&stream->fragment_download_lock);
3185 stream->download_finished = FALSE;
3186 stream->downloading_first_buffer = TRUE;
3187 g_mutex_unlock (&stream->fragment_download_lock);
3188
3189 GST_MANIFEST_UNLOCK (demux);
3190
3191 if (!gst_element_sync_state_with_parent (stream->src)) {
3192 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3193 GST_MANIFEST_LOCK (demux);
3194 ret = stream->last_ret = GST_FLOW_ERROR;
3195 return ret;
3196 }
3197
3198 /* wait for the fragment to be completely downloaded */
3199 GST_DEBUG_OBJECT (stream->pad,
3200 "Waiting for %s download to finish: %s", uritype (stream), uri);
3201
3202 g_mutex_lock (&stream->fragment_download_lock);
3203 stream->src_at_ready = FALSE;
3204 if (G_UNLIKELY (stream->cancelled)) {
3205 g_mutex_unlock (&stream->fragment_download_lock);
3206 GST_MANIFEST_LOCK (demux);
3207 ret = stream->last_ret = GST_FLOW_FLUSHING;
3208 return ret;
3209 }
3210 /* download_finished is only set:
3211 * * in ::fragment_download_finish()
3212 * * if EOS is received on the _src pad
3213 * */
3214 while (!stream->cancelled && !stream->download_finished) {
3215 g_cond_wait (&stream->fragment_download_cond,
3216 &stream->fragment_download_lock);
3217 }
3218 g_mutex_unlock (&stream->fragment_download_lock);
3219
3220 GST_DEBUG_OBJECT (stream->pad,
3221 "Finished Waiting for %s download: %s", uritype (stream), uri);
3222
3223 GST_MANIFEST_LOCK (demux);
3224 g_mutex_lock (&stream->fragment_download_lock);
3225 if (G_UNLIKELY (stream->cancelled)) {
3226 ret = stream->last_ret = GST_FLOW_FLUSHING;
3227 g_mutex_unlock (&stream->fragment_download_lock);
3228 return ret;
3229 }
3230 g_mutex_unlock (&stream->fragment_download_lock);
3231
3232 ret = stream->last_ret;
3233
3234 GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3235 uritype (stream), uri, stream->last_ret,
3236 gst_flow_get_name (stream->last_ret));
3237 if (stream->last_ret != GST_FLOW_OK && http_status) {
3238 *http_status = stream->last_status_code;
3239 }
3240 }
3241
3242 /* changing src element state might try to join the streaming thread, so
3243 * we must not hold the manifest lock.
3244 */
3245 GST_MANIFEST_UNLOCK (demux);
3246 } else {
3247 GST_MANIFEST_UNLOCK (demux);
3248 if (stream->last_ret == GST_FLOW_OK)
3249 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3250 ret = GST_FLOW_CUSTOM_ERROR;
3251 }
3252
3253 stream->src_at_ready = FALSE;
3254
3255 gst_element_set_locked_state (stream->src, TRUE);
3256 gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3257 gst_ad_stream_src_to_ready_cb, stream, NULL);
3258
3259 g_mutex_lock (&stream->fragment_download_lock);
3260 while (!stream->src_at_ready) {
3261 g_cond_wait (&stream->fragment_download_cond,
3262 &stream->fragment_download_lock);
3263 }
3264 g_mutex_unlock (&stream->fragment_download_lock);
3265
3266 gst_element_set_state (stream->src, GST_STATE_READY);
3267
3268 /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3269 GST_MANIFEST_LOCK (demux);
3270 g_mutex_lock (&stream->fragment_download_lock);
3271 if (G_UNLIKELY (stream->cancelled)) {
3272 ret = stream->last_ret = GST_FLOW_FLUSHING;
3273 g_mutex_unlock (&stream->fragment_download_lock);
3274 return ret;
3275 }
3276 g_mutex_unlock (&stream->fragment_download_lock);
3277
3278 /* deactivate and reactivate our ghostpad to make it fresh for a new
3279 * stream */
3280 gst_pad_set_active (stream->internal_pad, FALSE);
3281 gst_pad_set_active (stream->internal_pad, TRUE);
3282
3283 return ret;
3284 }
3285
3286 /* must be called with manifest_lock taken.
3287 * Can temporarily release manifest_lock
3288 */
3289 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3290 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3291 stream)
3292 {
3293 GstAdaptiveDemux *demux = stream->demux;
3294 GstFlowReturn ret = GST_FLOW_OK;
3295
3296 if (stream->fragment.header_uri != NULL) {
3297 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3298 G_GINT64_FORMAT, stream->fragment.header_uri,
3299 stream->fragment.header_range_start, stream->fragment.header_range_end);
3300
3301 stream->downloading_header = TRUE;
3302 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3303 stream->fragment.header_uri, stream->fragment.header_range_start,
3304 stream->fragment.header_range_end, NULL);
3305 stream->downloading_header = FALSE;
3306 }
3307
3308 /* check if we have an index */
3309 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
3310
3311 if (stream->fragment.index_uri != NULL) {
3312 GST_DEBUG_OBJECT (demux,
3313 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3314 stream->fragment.index_uri,
3315 stream->fragment.index_range_start, stream->fragment.index_range_end);
3316 stream->downloading_index = TRUE;
3317 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3318 stream->fragment.index_uri, stream->fragment.index_range_start,
3319 stream->fragment.index_range_end, NULL);
3320 stream->downloading_index = FALSE;
3321 }
3322 }
3323
3324 return ret;
3325 }
3326
3327 /* must be called with manifest_lock taken.
3328 * Can temporarily release manifest_lock
3329 */
3330 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3331 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3332 {
3333 GstAdaptiveDemux *demux = stream->demux;
3334 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3335 gchar *url = NULL;
3336 GstFlowReturn ret;
3337 gboolean retried_once = FALSE, live;
3338 guint http_status;
3339 guint last_status_code;
3340
3341 /* FIXME : */
3342 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3343 stream->starting_fragment = TRUE;
3344 stream->last_ret = GST_FLOW_OK;
3345 stream->first_fragment_buffer = TRUE;
3346
3347 GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3348 stream->fragment.uri ? "FRAGMENT " : "",
3349 stream->fragment.header_uri ? "HEADER " : "",
3350 stream->fragment.index_uri ? "INDEX" : "");
3351
3352 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3353 stream->fragment.index_uri == NULL)
3354 goto no_url_error;
3355
3356 if (stream->need_header) {
3357 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3358 if (ret != GST_FLOW_OK) {
3359 return ret;
3360 }
3361 stream->need_header = FALSE;
3362 }
3363
3364 again:
3365 ret = GST_FLOW_OK;
3366 url = stream->fragment.uri;
3367 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3368 if (!url)
3369 return ret;
3370
3371 stream->last_ret = GST_FLOW_OK;
3372 http_status = 200;
3373
3374 /* Download the actual fragment, either in fragments or in one go */
3375 if (klass->need_another_chunk && klass->need_another_chunk (stream)
3376 && stream->fragment.chunk_size != 0) {
3377 /* Handle chunk downloading */
3378 gint64 range_start, range_end, chunk_start, chunk_end;
3379 guint64 download_total_bytes;
3380 gint chunk_size = stream->fragment.chunk_size;
3381
3382 range_start = chunk_start = stream->fragment.range_start;
3383 range_end = stream->fragment.range_end;
3384 /* HTTP ranges are inclusive for the end */
3385 if (chunk_size != -1)
3386 chunk_end = range_start + chunk_size - 1;
3387 else
3388 chunk_end = range_end;
3389
3390 if (range_end != -1)
3391 chunk_end = MIN (chunk_end, range_end);
3392
3393 while (!stream->fragment.finished && (chunk_start <= range_end
3394 || range_end == -1)) {
3395 download_total_bytes = stream->download_total_bytes;
3396
3397 ret =
3398 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3399 chunk_start, chunk_end, &http_status);
3400
3401 GST_DEBUG_OBJECT (stream->pad,
3402 "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3403 http_status, gst_flow_get_name (stream->last_ret));
3404
3405 /* Don't retry for any chunks except the first. We would have sent
3406 * data downstream already otherwise and it's difficult to recover
3407 * from that in a meaningful way */
3408 if (chunk_start > range_start)
3409 retried_once = TRUE;
3410
3411 /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3412 * downloading up to -1. We don't know the full duration.
3413 * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3414 if (ret != GST_FLOW_OK && chunk_end == -1) {
3415 break;
3416 } else if (ret != GST_FLOW_OK) {
3417 chunk_end = -1;
3418 stream->last_ret = GST_FLOW_OK;
3419 continue;
3420 }
3421
3422 if (chunk_end == -1)
3423 break;
3424
3425 /* Short read, we're at the end now */
3426 if (stream->download_total_bytes - download_total_bytes <
3427 chunk_end + 1 - chunk_start)
3428 break;
3429
3430 if (!klass->need_another_chunk (stream))
3431 break;
3432
3433 /* HTTP ranges are inclusive for the end */
3434 chunk_start += chunk_size;
3435 chunk_size = stream->fragment.chunk_size;
3436 if (chunk_size != -1)
3437 chunk_end = chunk_start + chunk_size - 1;
3438 else
3439 chunk_end = range_end;
3440
3441 if (range_end != -1)
3442 chunk_end = MIN (chunk_end, range_end);
3443 }
3444 } else {
3445 ret =
3446 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3447 stream->fragment.range_start, stream->fragment.range_end, &http_status);
3448 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3449 stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3450 }
3451 if (ret == GST_FLOW_OK)
3452 goto beach;
3453
3454 g_mutex_lock (&stream->fragment_download_lock);
3455 if (G_UNLIKELY (stream->cancelled)) {
3456 g_mutex_unlock (&stream->fragment_download_lock);
3457 return ret;
3458 }
3459 g_mutex_unlock (&stream->fragment_download_lock);
3460
3461 /* TODO check if we are truly stopping */
3462 if (ret != GST_FLOW_CUSTOM_ERROR)
3463 goto beach;
3464
3465 last_status_code = stream->last_status_code;
3466 GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3467 last_status_code, stream->download_error_count);
3468
3469 live = gst_adaptive_demux_is_live (demux);
3470 if (!retried_once && ((last_status_code / 100 == 4 && live)
3471 || last_status_code / 100 == 5)) {
3472 /* 4xx/5xx */
3473 /* if current position is before available start, switch to next */
3474 if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3475 goto flushing;
3476
3477 if (live) {
3478 gint64 range_start, range_stop;
3479
3480 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3481 &range_stop))
3482 goto flushing;
3483
3484 if (demux->segment.position < range_start) {
3485 GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3486 stream->last_ret = GST_FLOW_OK;
3487 ret = gst_adaptive_demux_eos_handling (stream);
3488 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3489 gst_flow_get_name (ret));
3490 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3491 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3492 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3493 gst_flow_get_name (ret));
3494 if (ret == GST_FLOW_OK) {
3495 retried_once = TRUE;
3496 goto again;
3497 }
3498 } else if (demux->segment.position > range_stop) {
3499 /* wait a bit to be in range, we don't have any locks at that point */
3500 gint64 wait_time =
3501 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3502 if (wait_time > 0) {
3503 gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3504
3505 GST_DEBUG_OBJECT (stream->pad,
3506 "Download waiting for %" GST_TIME_FORMAT,
3507 GST_TIME_ARGS (wait_time));
3508
3509 GST_MANIFEST_UNLOCK (demux);
3510 g_mutex_lock (&stream->fragment_download_lock);
3511 if (G_UNLIKELY (stream->cancelled)) {
3512 g_mutex_unlock (&stream->fragment_download_lock);
3513 GST_MANIFEST_LOCK (demux);
3514 stream->last_ret = GST_FLOW_FLUSHING;
3515 goto flushing;
3516 }
3517 do {
3518 g_cond_wait_until (&stream->fragment_download_cond,
3519 &stream->fragment_download_lock, end_time);
3520 if (G_UNLIKELY (stream->cancelled)) {
3521 g_mutex_unlock (&stream->fragment_download_lock);
3522 GST_MANIFEST_LOCK (demux);
3523 stream->last_ret = GST_FLOW_FLUSHING;
3524 goto flushing;
3525 }
3526 } while (!stream->download_finished);
3527 g_mutex_unlock (&stream->fragment_download_lock);
3528
3529 GST_MANIFEST_LOCK (demux);
3530 }
3531 }
3532 }
3533
3534 flushing:
3535 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3536 /* looks like there is no way of knowing when a live stream has ended
3537 * Have to assume we are falling behind and cause a manifest reload */
3538 GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3539 return GST_FLOW_EOS;
3540 }
3541 } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3542 /* If this is the last fragment, consider failures EOS and not actual
3543 * errors. Due to rounding errors in the durations, the last fragment
3544 * might not actually exist */
3545 GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3546 return GST_FLOW_EOS;
3547 } else {
3548 /* retry once (same segment) for 5xx (server errors) */
3549 if (!retried_once) {
3550 retried_once = TRUE;
3551 /* wait a short time in case the server needs a bit to recover, we don't
3552 * care if we get woken up before end time. We can use sleep here since
3553 * we're already blocking and just want to wait some time. */
3554 g_usleep (100000); /* a tenth of a second */
3555 goto again;
3556 }
3557 }
3558
3559 beach:
3560 return ret;
3561
3562 no_url_error:
3563 {
3564 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3565 (_("Failed to get fragment URL.")),
3566 ("An error happened when getting fragment URL"));
3567 gst_task_stop (stream->download_task);
3568 return GST_FLOW_ERROR;
3569 }
3570 }
3571
3572 /* this function will take the manifest_lock and will keep it until the end.
3573 * It will release it temporarily only when going to sleep.
3574 * Every time it takes the manifest_lock, it will check for cancelled condition
3575 */
3576 static void
gst_adaptive_demux_stream_download_loop(GstAdaptiveDemuxStream * stream)3577 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3578 {
3579 GstAdaptiveDemux *demux = stream->demux;
3580 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3581 GstFlowReturn ret;
3582 gboolean live;
3583
3584 GST_LOG_OBJECT (stream->pad, "download loop start");
3585
3586 GST_MANIFEST_LOCK (demux);
3587
3588 g_mutex_lock (&stream->fragment_download_lock);
3589 if (G_UNLIKELY (stream->cancelled)) {
3590 stream->last_ret = GST_FLOW_FLUSHING;
3591 g_mutex_unlock (&stream->fragment_download_lock);
3592 goto cancelled;
3593 }
3594 g_mutex_unlock (&stream->fragment_download_lock);
3595
3596 /* Check if we're done with our segment */
3597 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3598 if (demux->segment.rate > 0) {
3599 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3600 && stream->segment.position >= stream->segment.stop) {
3601 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3602 ret = GST_FLOW_EOS;
3603 gst_task_stop (stream->download_task);
3604 goto end_of_manifest;
3605 }
3606 } else {
3607 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3608 && stream->segment.position <= stream->segment.start) {
3609 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3610 ret = GST_FLOW_EOS;
3611 gst_task_stop (stream->download_task);
3612 goto end_of_manifest;
3613 }
3614 }
3615 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3616
3617 /* Cleanup old streams if any */
3618 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3619 GList *old_streams = demux->priv->old_streams;
3620 demux->priv->old_streams = NULL;
3621
3622 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3623 g_list_free_full (old_streams,
3624 (GDestroyNotify) gst_adaptive_demux_stream_free);
3625 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3626
3627 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3628 * Recheck the cancelled flag.
3629 */
3630 g_mutex_lock (&stream->fragment_download_lock);
3631 if (G_UNLIKELY (stream->cancelled)) {
3632 stream->last_ret = GST_FLOW_FLUSHING;
3633 g_mutex_unlock (&stream->fragment_download_lock);
3634 goto cancelled;
3635 }
3636 g_mutex_unlock (&stream->fragment_download_lock);
3637 }
3638
3639 /* Restarting download, figure out new position
3640 * FIXME : Move this to a separate function ? */
3641 if (G_UNLIKELY (stream->restart_download)) {
3642 GstEvent *seg_event;
3643 GstClockTime cur, ts = 0;
3644 gint64 pos;
3645
3646 GST_DEBUG_OBJECT (stream->pad,
3647 "Activating stream due to reconfigure event");
3648
3649 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3650 ts = (GstClockTime) pos;
3651 GST_DEBUG_OBJECT (demux, "Downstream position: %"
3652 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3653 } else {
3654 /* query other pads as some faulty element in the pad's branch might
3655 * reject position queries. This should be better than using the
3656 * demux segment position that can be much ahead */
3657 GList *iter;
3658
3659 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3660 GstAdaptiveDemuxStream *cur_stream =
3661 (GstAdaptiveDemuxStream *) iter->data;
3662
3663 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3664 &pos)) {
3665 ts = (GstClockTime) pos;
3666 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3667 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3668 break;
3669 }
3670 }
3671 }
3672
3673 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3674 cur =
3675 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3676 stream->segment.position);
3677
3678 /* we might have already pushed this data */
3679 ts = MAX (ts, cur);
3680
3681 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3682 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3683
3684 if (GST_CLOCK_TIME_IS_VALID (ts)) {
3685 GstClockTime offset, period_start;
3686
3687 offset =
3688 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3689 period_start = gst_adaptive_demux_get_period_start_time (demux);
3690
3691 /* TODO check return */
3692 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3693 0, ts, &ts);
3694
3695 stream->segment.position = ts - period_start + offset;
3696 }
3697
3698 /* The stream's segment is still correct except for
3699 * the position, so let's send a new one with the
3700 * updated position */
3701 seg_event = gst_event_new_segment (&stream->segment);
3702 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3703 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3704
3705 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3706 GST_PTR_FORMAT, seg_event);
3707 gst_pad_push_event (stream->pad, seg_event);
3708
3709 stream->discont = TRUE;
3710 stream->restart_download = FALSE;
3711 }
3712
3713 live = gst_adaptive_demux_is_live (demux);
3714
3715 /* Get information about the fragment to download */
3716 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3717 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3718 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3719 ret, gst_flow_get_name (ret));
3720 if (ret == GST_FLOW_OK) {
3721
3722 /* wait for live fragments to be available */
3723 if (live) {
3724 gint64 wait_time =
3725 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3726 if (wait_time > 0) {
3727 GstClockTime end_time =
3728 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3729
3730 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3731 GST_TIME_ARGS (wait_time));
3732
3733 GST_MANIFEST_UNLOCK (demux);
3734
3735 g_mutex_lock (&stream->fragment_download_lock);
3736 if (G_UNLIKELY (stream->cancelled)) {
3737 g_mutex_unlock (&stream->fragment_download_lock);
3738 GST_MANIFEST_LOCK (demux);
3739 stream->last_ret = GST_FLOW_FLUSHING;
3740 goto cancelled;
3741 }
3742 gst_adaptive_demux_wait_until (demux->realtime_clock,
3743 &stream->fragment_download_cond, &stream->fragment_download_lock,
3744 end_time);
3745 g_mutex_unlock (&stream->fragment_download_lock);
3746
3747 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3748
3749 GST_MANIFEST_LOCK (demux);
3750
3751 g_mutex_lock (&stream->fragment_download_lock);
3752 if (G_UNLIKELY (stream->cancelled)) {
3753 stream->last_ret = GST_FLOW_FLUSHING;
3754 g_mutex_unlock (&stream->fragment_download_lock);
3755 goto cancelled;
3756 }
3757 g_mutex_unlock (&stream->fragment_download_lock);
3758 }
3759 }
3760
3761 stream->last_ret = GST_FLOW_OK;
3762
3763 next_download = gst_adaptive_demux_get_monotonic_time (demux);
3764 ret = gst_adaptive_demux_stream_download_fragment (stream);
3765
3766 if (ret == GST_FLOW_FLUSHING) {
3767 g_mutex_lock (&stream->fragment_download_lock);
3768 if (G_UNLIKELY (stream->cancelled)) {
3769 stream->last_ret = GST_FLOW_FLUSHING;
3770 g_mutex_unlock (&stream->fragment_download_lock);
3771 goto cancelled;
3772 }
3773 g_mutex_unlock (&stream->fragment_download_lock);
3774 }
3775
3776 } else {
3777 stream->last_ret = ret;
3778 }
3779
3780 switch (ret) {
3781 case GST_FLOW_OK:
3782 break; /* all is good, let's go */
3783 case GST_FLOW_EOS:
3784 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3785
3786 /* we push the EOS after releasing the object lock */
3787 if (gst_adaptive_demux_is_live (demux)
3788 && (demux->segment.rate == 1.0
3789 || gst_adaptive_demux_stream_in_live_seek_range (demux,
3790 stream))) {
3791 GstAdaptiveDemuxClass *demux_class =
3792 GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3793
3794 /* this might be a fragment download error, refresh the manifest, just in case */
3795 if (!demux_class->requires_periodical_playlist_update (demux)) {
3796 ret = gst_adaptive_demux_update_manifest (demux);
3797 break;
3798 /* Wait only if we can ensure current manifest has been expired.
3799 * The meaning "we have next period" *WITH* EOS is that, current
3800 * period has been ended but we can continue to the next period */
3801 } else if (!gst_adaptive_demux_has_next_period (demux) &&
3802 gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3803 goto end;
3804 }
3805 gst_task_stop (stream->download_task);
3806 if (stream->replaced) {
3807 goto end;
3808 }
3809 } else {
3810 gst_task_stop (stream->download_task);
3811 }
3812
3813 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3814 if (gst_adaptive_demux_has_next_period (demux)) {
3815 GST_DEBUG_OBJECT (stream->pad,
3816 "Next period available, not sending EOS");
3817 gst_adaptive_demux_advance_period (demux);
3818 ret = GST_FLOW_OK;
3819 }
3820 }
3821 break;
3822
3823 case GST_FLOW_NOT_LINKED:
3824 {
3825 GstFlowReturn ret;
3826 gst_task_stop (stream->download_task);
3827
3828 ret = gst_adaptive_demux_combine_flows (demux);
3829 if (ret == GST_FLOW_NOT_LINKED) {
3830 GST_ELEMENT_FLOW_ERROR (demux, ret);
3831 }
3832 }
3833 break;
3834
3835 case GST_FLOW_FLUSHING:{
3836 GList *iter;
3837
3838 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3839 GstAdaptiveDemuxStream *other;
3840
3841 other = iter->data;
3842 gst_task_stop (other->download_task);
3843 }
3844 }
3845 break;
3846
3847 default:
3848 if (ret <= GST_FLOW_ERROR) {
3849 gboolean is_live = gst_adaptive_demux_is_live (demux);
3850 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3851 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3852 goto download_error;
3853 }
3854
3855 g_clear_error (&stream->last_error);
3856
3857 /* First try to update the playlist for non-live playlists
3858 * in case the URIs have changed in the meantime. But only
3859 * try it the first time, after that we're going to wait a
3860 * a bit to not flood the server */
3861 if (stream->download_error_count == 1 && !is_live) {
3862 /* TODO hlsdemux had more options to this function (boolean and err) */
3863
3864 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3865 /* Retry immediately, the playlist actually has changed */
3866 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3867 goto end;
3868 }
3869 }
3870
3871 /* Wait half the fragment duration before retrying */
3872 next_download += stream->fragment.duration / 2;
3873
3874 GST_MANIFEST_UNLOCK (demux);
3875
3876 g_mutex_lock (&stream->fragment_download_lock);
3877 if (G_UNLIKELY (stream->cancelled)) {
3878 g_mutex_unlock (&stream->fragment_download_lock);
3879 GST_MANIFEST_LOCK (demux);
3880 stream->last_ret = GST_FLOW_FLUSHING;
3881 goto cancelled;
3882 }
3883 gst_adaptive_demux_wait_until (demux->realtime_clock,
3884 &stream->fragment_download_cond, &stream->fragment_download_lock,
3885 next_download);
3886 g_mutex_unlock (&stream->fragment_download_lock);
3887
3888 GST_DEBUG_OBJECT (demux, "Retrying now");
3889
3890 GST_MANIFEST_LOCK (demux);
3891
3892 g_mutex_lock (&stream->fragment_download_lock);
3893 if (G_UNLIKELY (stream->cancelled)) {
3894 stream->last_ret = GST_FLOW_FLUSHING;
3895 g_mutex_unlock (&stream->fragment_download_lock);
3896 goto cancelled;
3897 }
3898 g_mutex_unlock (&stream->fragment_download_lock);
3899
3900 /* Refetch the playlist now after we waited */
3901 if (!is_live
3902 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3903 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3904 }
3905 goto end;
3906 }
3907 break;
3908 }
3909
3910 end_of_manifest:
3911 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3912 if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3913 if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3914 GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3915 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3916 } else {
3917 GST_DEBUG_OBJECT (stream->src,
3918 "Stream is EOS, but we're switching fragments. Not sending.");
3919 }
3920 } else {
3921 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3922 goto download_error;
3923 }
3924 }
3925
3926 end:
3927 GST_MANIFEST_UNLOCK (demux);
3928 GST_LOG_OBJECT (stream->pad, "download loop end");
3929 return;
3930
3931 cancelled:
3932 {
3933 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3934 goto end;
3935 }
3936 download_error:
3937 {
3938 GstMessage *msg;
3939
3940 if (stream->last_error) {
3941 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3942 GST_DEBUG_PAD_NAME (stream->pad));
3943 msg =
3944 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3945 debug);
3946 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3947 stream->last_error->message);
3948 g_free (debug);
3949 } else {
3950 GError *err =
3951 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3952 _("Couldn't download fragments"));
3953 msg =
3954 gst_message_new_error (GST_OBJECT_CAST (demux), err,
3955 "Fragment downloading has failed consecutive times");
3956 g_error_free (err);
3957 GST_ERROR_OBJECT (stream->pad,
3958 "Download error: Couldn't download fragments, too many failures");
3959 }
3960
3961 gst_task_stop (stream->download_task);
3962 if (stream->src) {
3963 GstElement *src = stream->src;
3964
3965 stream->src = NULL;
3966 GST_MANIFEST_UNLOCK (demux);
3967 gst_element_set_locked_state (src, TRUE);
3968 gst_element_set_state (src, GST_STATE_NULL);
3969 gst_bin_remove (GST_BIN_CAST (demux), src);
3970 GST_MANIFEST_LOCK (demux);
3971 }
3972
3973 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3974
3975 goto end;
3976 }
3977 }
3978
3979 static void
gst_adaptive_demux_updates_loop(GstAdaptiveDemux * demux)3980 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3981 {
3982 GstClockTime next_update;
3983 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3984
3985 /* Loop for updating of the playlist. This periodically checks if
3986 * the playlist is updated and does so, then signals the streaming
3987 * thread in case it can continue downloading now. */
3988
3989 /* block until the next scheduled update or the signal to quit this thread */
3990 GST_DEBUG_OBJECT (demux, "Started updates task");
3991
3992 GST_MANIFEST_LOCK (demux);
3993
3994 next_update =
3995 gst_adaptive_demux_get_monotonic_time (demux) +
3996 klass->get_manifest_update_interval (demux) * GST_USECOND;
3997
3998 /* Updating playlist only needed for live playlists */
3999 while (gst_adaptive_demux_is_live (demux)) {
4000 GstFlowReturn ret = GST_FLOW_OK;
4001
4002 /* Wait here until we should do the next update or we're cancelled */
4003 GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4004
4005 GST_MANIFEST_UNLOCK (demux);
4006
4007 g_mutex_lock (&demux->priv->updates_timed_lock);
4008 if (demux->priv->stop_updates_task) {
4009 g_mutex_unlock (&demux->priv->updates_timed_lock);
4010 goto quit;
4011 }
4012 gst_adaptive_demux_wait_until (demux->realtime_clock,
4013 &demux->priv->updates_timed_cond,
4014 &demux->priv->updates_timed_lock, next_update);
4015 g_mutex_unlock (&demux->priv->updates_timed_lock);
4016
4017 g_mutex_lock (&demux->priv->updates_timed_lock);
4018 if (demux->priv->stop_updates_task) {
4019 g_mutex_unlock (&demux->priv->updates_timed_lock);
4020 goto quit;
4021 }
4022 g_mutex_unlock (&demux->priv->updates_timed_lock);
4023
4024 GST_MANIFEST_LOCK (demux);
4025
4026 GST_DEBUG_OBJECT (demux, "Updating playlist");
4027
4028 ret = gst_adaptive_demux_update_manifest (demux);
4029
4030 if (ret == GST_FLOW_EOS) {
4031 } else if (ret != GST_FLOW_OK) {
4032 /* update_failed_count is used only here, no need to protect it */
4033 demux->priv->update_failed_count++;
4034 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4035 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4036 gst_flow_get_name (ret));
4037 next_update = gst_adaptive_demux_get_monotonic_time (demux)
4038 + klass->get_manifest_update_interval (demux) * GST_USECOND;
4039 } else {
4040 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4041 (_("Internal data stream error.")), ("Could not update playlist"));
4042 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4043 gst_task_stop (demux->priv->updates_task);
4044 GST_MANIFEST_UNLOCK (demux);
4045 goto end;
4046 }
4047 } else {
4048 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4049 demux->priv->update_failed_count = 0;
4050 next_update =
4051 gst_adaptive_demux_get_monotonic_time (demux) +
4052 klass->get_manifest_update_interval (demux) * GST_USECOND;
4053
4054 /* Wake up download tasks */
4055 g_mutex_lock (&demux->priv->manifest_update_lock);
4056 g_cond_broadcast (&demux->priv->manifest_cond);
4057 g_mutex_unlock (&demux->priv->manifest_update_lock);
4058 }
4059 }
4060
4061 GST_MANIFEST_UNLOCK (demux);
4062
4063 quit:
4064 {
4065 GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4066 }
4067
4068 end:
4069 {
4070 return;
4071 }
4072 }
4073
4074 /* must be called with manifest_lock taken */
4075 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4076 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4077 GstEvent * event)
4078 {
4079 gboolean ret;
4080 GstPad *pad;
4081 GstAdaptiveDemux *demux = stream->demux;
4082
4083 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4084 stream->eos = TRUE;
4085 }
4086
4087 pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4088
4089 /* Can't push events holding the manifest lock */
4090 GST_MANIFEST_UNLOCK (demux);
4091
4092 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4093 "Pushing event %" GST_PTR_FORMAT, event);
4094
4095 ret = gst_pad_push_event (pad, event);
4096
4097 gst_object_unref (pad);
4098
4099 GST_MANIFEST_LOCK (demux);
4100
4101 return ret;
4102 }
4103
4104 /* must be called with manifest_lock taken */
4105 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4106 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4107 {
4108 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4109
4110 if (klass->is_live)
4111 return klass->is_live (demux);
4112 return FALSE;
4113 }
4114
4115 /* must be called with manifest_lock taken */
4116 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4117 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4118 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4119 GstClockTime ts, GstClockTime * final_ts)
4120 {
4121 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4122
4123 if (klass->stream_seek)
4124 return klass->stream_seek (stream, forward, flags, ts, final_ts);
4125 return GST_FLOW_ERROR;
4126 }
4127
4128 /* must be called with manifest_lock taken */
4129 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4130 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4131 GstAdaptiveDemuxStream * stream)
4132 {
4133 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4134 gboolean ret = TRUE;
4135
4136 if (klass->stream_has_next_fragment)
4137 ret = klass->stream_has_next_fragment (stream);
4138
4139 return ret;
4140 }
4141
4142 /* must be called with manifest_lock taken */
4143 /* Called from:
4144 * the ::finish_fragment() handlers when an *actual* fragment is done
4145 * */
4146 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4147 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4148 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4149 {
4150 GstFlowReturn ret;
4151
4152 if (stream->last_ret == GST_FLOW_OK) {
4153 stream->last_ret =
4154 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4155 duration);
4156 }
4157 ret = stream->last_ret;
4158
4159 return ret;
4160 }
4161
4162 /* must be called with manifest_lock taken */
4163 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4164 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4165 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4166 {
4167 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4168 GstFlowReturn ret;
4169
4170 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4171
4172 GST_LOG_OBJECT (stream->pad,
4173 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4174 GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4175
4176 stream->download_error_count = 0;
4177 g_clear_error (&stream->last_error);
4178
4179 /* FIXME - url has no indication of byte ranges for subsegments */
4180 /* FIXME : All those time statistics are biased, since they are calculated
4181 * *AFTER* the queue2, which might be blocking. They should ideally be
4182 * calculated *before* queue2 in the uri_handler_probe */
4183 gst_element_post_message (GST_ELEMENT_CAST (demux),
4184 gst_message_new_element (GST_OBJECT_CAST (demux),
4185 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4186 "manifest-uri", G_TYPE_STRING,
4187 demux->manifest_uri, "uri", G_TYPE_STRING,
4188 stream->fragment.uri, "fragment-start-time",
4189 GST_TYPE_CLOCK_TIME, stream->download_start_time,
4190 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4191 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4192 stream->download_total_bytes, "fragment-download-time",
4193 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4194
4195 /* Don't update to the end of the segment if in reverse playback */
4196 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4197 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4198 GstClockTime offset =
4199 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4200 GstClockTime period_start =
4201 gst_adaptive_demux_get_period_start_time (demux);
4202
4203 stream->segment.position += duration;
4204
4205 /* Convert from position inside the stream's segment to the demuxer's
4206 * segment, they are not necessarily the same */
4207 if (stream->segment.position - offset + period_start >
4208 demux->segment.position)
4209 demux->segment.position =
4210 stream->segment.position - offset + period_start;
4211 }
4212 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4213
4214 /* When advancing with a non 1.0 rate on live streams, we need to check
4215 * the live seeking range again to make sure we can still advance to
4216 * that position */
4217 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4218 if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4219 ret = GST_FLOW_EOS;
4220 else
4221 ret = klass->stream_advance_fragment (stream);
4222 } else if (gst_adaptive_demux_is_live (demux)
4223 || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4224 ret = klass->stream_advance_fragment (stream);
4225 } else {
4226 ret = GST_FLOW_EOS;
4227 }
4228
4229 stream->download_start_time =
4230 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4231
4232 if (ret == GST_FLOW_OK) {
4233 if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4234 gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4235 stream->need_header = TRUE;
4236 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4237 }
4238
4239 /* the subclass might want to switch pads */
4240 if (G_UNLIKELY (demux->next_streams)) {
4241 GList *iter;
4242 gboolean can_expose = TRUE;
4243
4244 gst_task_stop (stream->download_task);
4245
4246 ret = GST_FLOW_EOS;
4247
4248 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4249 /* Only expose if all streams are now cancelled or finished downloading */
4250 GstAdaptiveDemuxStream *other = iter->data;
4251 if (other != stream) {
4252 g_mutex_lock (&other->fragment_download_lock);
4253 can_expose &= (other->cancelled == TRUE
4254 || other->download_finished == TRUE);
4255 g_mutex_unlock (&other->fragment_download_lock);
4256 }
4257 }
4258
4259 if (can_expose) {
4260 GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4261 "to do bitrate switching");
4262 gst_adaptive_demux_prepare_streams (demux, FALSE);
4263 gst_adaptive_demux_start_tasks (demux, TRUE);
4264 } else {
4265 GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4266 }
4267 }
4268 }
4269
4270 return ret;
4271 }
4272
4273 /* must be called with manifest_lock taken */
4274 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4275 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4276 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4277 {
4278 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4279
4280 if (klass->stream_select_bitrate)
4281 return klass->stream_select_bitrate (stream, bitrate);
4282 return FALSE;
4283 }
4284
4285 /* must be called with manifest_lock taken */
4286 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4287 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4288 GstAdaptiveDemuxStream * stream)
4289 {
4290 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4291 GstFlowReturn ret;
4292
4293 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4294 GST_FLOW_ERROR);
4295
4296 /* Make sure the sub-class will update bitrate, or else
4297 * we will later */
4298 stream->fragment.bitrate = 0;
4299 stream->fragment.finished = FALSE;
4300
4301 GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4302 GST_TIME_ARGS (stream->segment.position));
4303
4304 ret = klass->stream_update_fragment_info (stream);
4305
4306 GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4307 stream->fragment.uri);
4308 if (ret == GST_FLOW_OK) {
4309 GST_LOG_OBJECT (stream->pad,
4310 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4311 GST_TIME_ARGS (stream->fragment.timestamp),
4312 GST_TIME_ARGS (stream->fragment.duration));
4313 GST_LOG_OBJECT (stream->pad,
4314 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4315 stream->fragment.range_start, stream->fragment.range_end);
4316 }
4317
4318 return ret;
4319 }
4320
4321 /* must be called with manifest_lock taken */
4322 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4323 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4324 demux, GstAdaptiveDemuxStream * stream)
4325 {
4326 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4327
4328 if (klass->stream_get_fragment_waiting_time)
4329 return klass->stream_get_fragment_waiting_time (stream);
4330 return 0;
4331 }
4332
4333 /* must be called with manifest_lock taken */
4334 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4335 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4336 {
4337 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4338 GstFragment *download;
4339 GstBuffer *buffer;
4340 GstFlowReturn ret;
4341 GError *error = NULL;
4342
4343 download = gst_uri_downloader_fetch_uri (demux->downloader,
4344 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4345 if (download) {
4346 g_free (demux->manifest_uri);
4347 g_free (demux->manifest_base_uri);
4348 if (download->redirect_permanent && download->redirect_uri) {
4349 demux->manifest_uri = g_strdup (download->redirect_uri);
4350 demux->manifest_base_uri = NULL;
4351 } else {
4352 demux->manifest_uri = g_strdup (download->uri);
4353 demux->manifest_base_uri = g_strdup (download->redirect_uri);
4354 }
4355
4356 buffer = gst_fragment_get_buffer (download);
4357 g_object_unref (download);
4358 ret = klass->update_manifest_data (demux, buffer);
4359 gst_buffer_unref (buffer);
4360 /* FIXME: Should the manifest uri vars be reverted to original
4361 * values if updating fails? */
4362 } else {
4363 GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4364 error->message);
4365 ret = GST_FLOW_NOT_LINKED;
4366 }
4367 g_clear_error (&error);
4368
4369 return ret;
4370 }
4371
4372 /* must be called with manifest_lock taken */
4373 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4374 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4375 {
4376 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4377 GstFlowReturn ret;
4378
4379 ret = klass->update_manifest (demux);
4380
4381 if (ret == GST_FLOW_OK) {
4382 GstClockTime duration;
4383 /* Send an updated duration message */
4384 duration = klass->get_duration (demux);
4385 if (duration != GST_CLOCK_TIME_NONE) {
4386 GST_DEBUG_OBJECT (demux,
4387 "Sending duration message : %" GST_TIME_FORMAT,
4388 GST_TIME_ARGS (duration));
4389 gst_element_post_message (GST_ELEMENT (demux),
4390 gst_message_new_duration_changed (GST_OBJECT (demux)));
4391 } else {
4392 GST_DEBUG_OBJECT (demux,
4393 "Duration unknown, can not send the duration message");
4394 }
4395
4396 /* If a manifest changes it's liveness or periodic updateness, we need
4397 * to start/stop the manifest update task appropriately */
4398 /* Keep this condition in sync with the one in
4399 * gst_adaptive_demux_start_manifest_update_task()
4400 */
4401 if (gst_adaptive_demux_is_live (demux) &&
4402 klass->requires_periodical_playlist_update (demux)) {
4403 gst_adaptive_demux_start_manifest_update_task (demux);
4404 } else {
4405 gst_adaptive_demux_stop_manifest_update_task (demux);
4406 }
4407 }
4408
4409 return ret;
4410 }
4411
4412 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4413 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4414 {
4415 g_free (f->uri);
4416 f->uri = NULL;
4417 f->range_start = 0;
4418 f->range_end = -1;
4419
4420 g_free (f->header_uri);
4421 f->header_uri = NULL;
4422 f->header_range_start = 0;
4423 f->header_range_end = -1;
4424
4425 g_free (f->index_uri);
4426 f->index_uri = NULL;
4427 f->index_range_start = 0;
4428 f->index_range_end = -1;
4429
4430 f->finished = FALSE;
4431 }
4432
4433 /* must be called with manifest_lock taken */
4434 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4435 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4436 {
4437 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4438 gboolean ret = FALSE;
4439
4440 if (klass->has_next_period)
4441 ret = klass->has_next_period (demux);
4442 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4443 return ret;
4444 }
4445
4446 /* must be called with manifest_lock taken */
4447 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4448 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4449 {
4450 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4451
4452 g_return_if_fail (klass->advance_period != NULL);
4453
4454 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4455 klass->advance_period (demux);
4456 gst_adaptive_demux_prepare_streams (demux, FALSE);
4457 gst_adaptive_demux_start_tasks (demux, TRUE);
4458 }
4459
4460 /**
4461 * gst_adaptive_demux_get_monotonic_time:
4462 * Returns: a monotonically increasing time, using the system realtime clock
4463 */
4464 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)4465 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4466 {
4467 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4468 return gst_clock_get_time (demux->realtime_clock);
4469 }
4470
4471 /**
4472 * gst_adaptive_demux_get_client_now_utc:
4473 * @demux: #GstAdaptiveDemux
4474 * Returns: the client's estimate of UTC
4475 *
4476 * Used to find the client's estimate of UTC, using the system realtime clock.
4477 */
4478 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)4479 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4480 {
4481 GstClockTime rtc_now;
4482 gint64 utc_now;
4483 GTimeVal gtv;
4484
4485 rtc_now = gst_clock_get_time (demux->realtime_clock);
4486 utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4487 gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
4488 gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
4489 return g_date_time_new_from_timeval_utc (>v);
4490 }
4491
4492 /**
4493 * gst_adaptive_demux_is_running
4494 * @demux: #GstAdaptiveDemux
4495 * Returns: whether the demuxer is processing data
4496 *
4497 * Returns FALSE if shutdown has started (transitioning down from
4498 * PAUSED), otherwise TRUE.
4499 */
4500 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)4501 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4502 {
4503 return g_atomic_int_get (&demux->running);
4504 }
4505
4506 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)4507 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4508 {
4509 GstAdaptiveDemuxTimer *timer;
4510
4511 timer = g_slice_new (GstAdaptiveDemuxTimer);
4512 timer->fired = FALSE;
4513 timer->cond = cond;
4514 timer->mutex = mutex;
4515 timer->ref_count = 1;
4516 return timer;
4517 }
4518
4519 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)4520 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4521 {
4522 g_return_val_if_fail (timer != NULL, NULL);
4523 g_atomic_int_inc (&timer->ref_count);
4524 return timer;
4525 }
4526
4527 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)4528 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4529 {
4530 g_return_if_fail (timer != NULL);
4531 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4532 g_slice_free (GstAdaptiveDemuxTimer, timer);
4533 }
4534 }
4535
4536 /* gst_adaptive_demux_wait_until:
4537 * A replacement for g_cond_wait_until that uses the clock rather
4538 * than system time to control the duration of the sleep. Typically
4539 * clock is actually a #GstSystemClock, in which case this function
4540 * behaves exactly like g_cond_wait_until. Inside unit tests,
4541 * the clock is typically a #GstTestClock, which allows tests to run
4542 * in non-realtime.
4543 * This function must be called with mutex held.
4544 */
4545 static gboolean
gst_adaptive_demux_wait_until(GstClock * clock,GCond * cond,GMutex * mutex,GstClockTime end_time)4546 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4547 GstClockTime end_time)
4548 {
4549 GstAdaptiveDemuxTimer *timer;
4550 gboolean fired;
4551 GstClockReturn res;
4552
4553 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4554 /* for an invalid time, gst_clock_id_wait_async will try to call
4555 * gst_adaptive_demux_clock_callback from the current thread.
4556 * It still holds the mutex while doing that, so it will deadlock.
4557 * g_cond_wait_until would return immediately with false, so we'll do the same.
4558 */
4559 return FALSE;
4560 }
4561 timer = gst_adaptive_demux_timer_new (cond, mutex);
4562 timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4563 res =
4564 gst_clock_id_wait_async (timer->clock_id,
4565 gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4566 (GDestroyNotify) gst_adaptive_demux_timer_unref);
4567 /* clock does not support asynchronously wait. Assert and return */
4568 if (res == GST_CLOCK_UNSUPPORTED) {
4569 gst_clock_id_unref (timer->clock_id);
4570 gst_adaptive_demux_timer_unref (timer);
4571 g_return_val_if_reached (TRUE);
4572 }
4573 g_assert (!timer->fired);
4574 /* the gst_adaptive_demux_clock_callback() will signal the
4575 * cond when the clock's single shot timer fires, or the cond will be
4576 * signalled by another thread that wants to cause this wait to finish
4577 * early (e.g. to terminate the waiting thread).
4578 * There is no need for a while loop here, because that logic is
4579 * implemented by the function calling gst_adaptive_demux_wait_until() */
4580 g_cond_wait (cond, mutex);
4581 fired = timer->fired;
4582 if (!fired)
4583 gst_clock_id_unschedule (timer->clock_id);
4584 gst_clock_id_unref (timer->clock_id);
4585 gst_adaptive_demux_timer_unref (timer);
4586 return !fired;
4587 }
4588
4589 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)4590 gst_adaptive_demux_clock_callback (GstClock * clock,
4591 GstClockTime time, GstClockID id, gpointer user_data)
4592 {
4593 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4594 g_return_val_if_fail (timer != NULL, FALSE);
4595 g_mutex_lock (timer->mutex);
4596 timer->fired = TRUE;
4597 g_cond_signal (timer->cond);
4598 g_mutex_unlock (timer->mutex);
4599 return TRUE;
4600 }
4601