1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @title: GstAppSrc
23  * @short_description: Easy way for applications to inject buffers into a
24  *     pipeline
25  * @see_also: #GstBaseSrc, appsink
26  *
27  * The appsrc element can be used by applications to insert data into a
28  * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29  * external API functions.
30  *
31  * appsrc can be used by linking with the libgstapp library to access the
32  * methods directly or by using the appsrc action signals.
33  *
34  * Before operating appsrc, the caps property must be set to fixed caps
35  * describing the format of the data that will be pushed with appsrc. An
36  * exception to this is when pushing buffers with unknown caps, in which case no
37  * caps should be set. This is typically true of file-like sources that push raw
38  * byte buffers. If you don't want to explicitly set the caps, you can use
39  * gst_app_src_push_sample. This method gets the caps associated with the
40  * sample and sets them on the appsrc replacing any previously set caps (if
41  * different from sample's caps).
42  *
43  * The main way of handing data to the appsrc element is by calling the
44  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
 * This will put the buffer onto a queue from which appsrc will read in its
46  * streaming thread. It is important to note that data transport will not happen
47  * from the thread that performed the push-buffer call.
48  *
49  * The "max-bytes" property controls how much data can be queued in appsrc
50  * before appsrc considers the queue full. A filled internal queue will always
51  * signal the "enough-data" signal, which signals the application that it should
52  * stop pushing data into appsrc. The "block" property will cause appsrc to
 * block the push-buffer method until free space becomes available again.
54  *
55  * When the internal queue is running out of data, the "need-data" signal is
56  * emitted, which signals the application that it should start pushing more data
57  * into appsrc.
58  *
59  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
60  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
61  * "random-access". The signal argument will contain the new desired position in
62  * the stream expressed in the unit set with the "format" property. After
63  * receiving the seek-data signal, the application should push-buffers from the
64  * new position.
65  *
66  * These signals allow the application to operate the appsrc in two different
67  * ways:
68  *
69  * The push mode, in which the application repeatedly calls the push-buffer/push-sample
70  * method with a new buffer/sample. Optionally, the queue size in the appsrc
71  * can be controlled with the enough-data and need-data signals by respectively
72  * stopping/starting the push-buffer/push-sample calls. This is a typical
73  * mode of operation for the stream-type "stream" and "seekable". Use this
74  * mode when implementing various network protocols or hardware devices.
75  *
76  * The pull mode, in which the need-data signal triggers the next push-buffer call.
77  * This mode is typically used in the "random-access" stream-type. Use this
 * mode for file access or other randomly accessible sources. In this mode, a
79  * buffer of exactly the amount of bytes given by the need-data signal should be
80  * pushed into appsrc.
81  *
82  * In all modes, the size property on appsrc should contain the total stream
83  * size in bytes. Setting this property is mandatory in the random-access mode.
84  * For the stream and seekable modes, setting this property is optional but
85  * recommended.
86  *
87  * When the application has finished pushing data into appsrc, it should call
88  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
89  * this call, no more buffers can be pushed into appsrc until a flushing seek
90  * occurs or the state of the appsrc has gone through READY.
91  */
92 
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96 
97 #include <gst/gst.h>
98 #include <gst/base/base.h>
99 
100 #include <string.h>
101 
102 #include "gstappsrc.h"
103 
/* Bit flags describing which thread, if any, is currently waiting on the
 * other one. The value is kept in GstAppSrcPrivate.wait_status. */
typedef enum
{
  NOONE_WAITING = 0,
  STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
  APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;
110 
/* Private instance state of GstAppSrc. */
struct _GstAppSrcPrivate
{
  /* condition variable and mutex used around the queue and the
   * flushing/started flags */
  GCond cond;
  GMutex mutex;
  /* queued mini objects (caps are among the items stored here) */
  GstQueueArray *queue;
  GstAppSrcWaitStatus wait_status;

  /* both caps references are released in dispose under the object lock */
  GstCaps *last_caps;
  GstCaps *current_caps;

  /* configured values, mostly backing the element properties */
  gint64 size;                  /* "size" property, -1 if unknown */
  GstClockTime duration;        /* "duration" property */
  GstAppStreamType stream_type; /* "stream-type" property */
  guint64 max_bytes;            /* "max-bytes" property, 0 = unlimited */
  GstFormat format;             /* "format" property */
  gboolean block;               /* "block" property */
  gchar *uri;                   /* freed in finalize */

  /* runtime state */
  gboolean flushing;            /* TRUE between unlock and unlock_stop */
  gboolean started;             /* set to TRUE when the source is started */
  gboolean is_eos;
  guint64 queued_bytes;         /* reset to 0 whenever the queue is flushed */
  guint64 offset;               /* set to -1 on start to force a first seek */
  GstAppStreamType current_type;

  guint64 min_latency;
  guint64 max_latency;
  gboolean emit_signals;        /* "emit-signals" property */
  guint min_percent;            /* "min-percent" property */

  /* application callbacks; notify is invoked with user_data on dispose */
  GstAppSrcCallbacks callbacks;
  gpointer user_data;
  GDestroyNotify notify;
};
145 
/* Debug category used by the GST_*_OBJECT logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug

/* Indices into gst_app_src_signals[]. */
enum
{
  /* signals */
  SIGNAL_NEED_DATA,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* actions */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  LAST_SIGNAL
};
164 
/* Default values of the element properties; they are used both when the
 * properties are installed and when a new instance is initialised. */
#define DEFAULT_PROP_SIZE          -1
#define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES     200000
#define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK         FALSE
#define DEFAULT_PROP_IS_LIVE       FALSE
#define DEFAULT_PROP_MIN_LATENCY   -1
#define DEFAULT_PROP_MAX_LATENCY   -1
#define DEFAULT_PROP_EMIT_SIGNALS  TRUE
#define DEFAULT_PROP_MIN_PERCENT   0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
#define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
177 
/* GObject property identifiers dispatched on in set_property/get_property. */
enum
{
  PROP_0,
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,
  PROP_MAX_BYTES,
  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  PROP_CURRENT_LEVEL_BYTES,
  PROP_DURATION,
  PROP_LAST
};
196 
/* Always-present "src" pad template accepting any caps. */
static GstStaticPadTemplate gst_app_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
202 
/* GstURIHandler interface initialiser */
static void gst_app_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);

/* GObject vfuncs */
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);

static void gst_app_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_app_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* GstElement vfunc */
static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);

static void gst_app_src_set_latencies (GstAppSrc * appsrc,
    gboolean do_min, guint64 min, gboolean do_max, guint64 max);

/* GstBaseSrc vfuncs */
static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
    GstCaps * filter);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
    guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);

/* default handlers of the action signals */
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
    GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
    GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
    GstSample * sample);

/* signal ids, filled in by class_init and indexed by the SIGNAL_* enum */
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
242 
/* Register GstAppSrc as a GstBaseSrc subclass with private instance data and
 * the GstURIHandler interface; the parent class pointer is exposed under the
 * short name parent_class. */
#define gst_app_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
    G_ADD_PRIVATE (GstAppSrc)
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
247 
248 static void
gst_app_src_class_init(GstAppSrcClass * klass)249 gst_app_src_class_init (GstAppSrcClass * klass)
250 {
251   GObjectClass *gobject_class = (GObjectClass *) klass;
252   GstElementClass *element_class = (GstElementClass *) klass;
253   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
254 
255   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
256 
257   gobject_class->dispose = gst_app_src_dispose;
258   gobject_class->finalize = gst_app_src_finalize;
259 
260   gobject_class->set_property = gst_app_src_set_property;
261   gobject_class->get_property = gst_app_src_get_property;
262 
263   /**
264    * GstAppSrc::caps:
265    *
266    * The GstCaps that will negotiated downstream and will be put
267    * on outgoing buffers.
268    */
269   g_object_class_install_property (gobject_class, PROP_CAPS,
270       g_param_spec_boxed ("caps", "Caps",
271           "The allowed caps for the src pad", GST_TYPE_CAPS,
272           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273   /**
274    * GstAppSrc::format:
275    *
276    * The format to use for segment events. When the source is producing
277    * timestamped buffers this property should be set to GST_FORMAT_TIME.
278    */
279   g_object_class_install_property (gobject_class, PROP_FORMAT,
280       g_param_spec_enum ("format", "Format",
281           "The format of the segment events and seek", GST_TYPE_FORMAT,
282           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283   /**
284    * GstAppSrc::size:
285    *
286    * The total size in bytes of the data stream. If the total size is known, it
287    * is recommended to configure it with this property.
288    */
289   g_object_class_install_property (gobject_class, PROP_SIZE,
290       g_param_spec_int64 ("size", "Size",
291           "The size of the data stream in bytes (-1 if unknown)",
292           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
293           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
294   /**
295    * GstAppSrc::stream-type:
296    *
297    * The type of stream that this source is producing.  For seekable streams the
298    * application should connect to the seek-data signal.
299    */
300   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
301       g_param_spec_enum ("stream-type", "Stream Type",
302           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
303           DEFAULT_PROP_STREAM_TYPE,
304           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305   /**
306    * GstAppSrc::max-bytes:
307    *
308    * The maximum amount of bytes that can be queued internally.
309    * After the maximum amount of bytes are queued, appsrc will emit the
310    * "enough-data" signal.
311    */
312   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
313       g_param_spec_uint64 ("max-bytes", "Max bytes",
314           "The maximum number of bytes to queue internally (0 = unlimited)",
315           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
316           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
317   /**
318    * GstAppSrc::block:
319    *
320    * When max-bytes are queued and after the enough-data signal has been emitted,
321    * block any further push-buffer calls until the amount of queued bytes drops
322    * below the max-bytes limit.
323    */
324   g_object_class_install_property (gobject_class, PROP_BLOCK,
325       g_param_spec_boolean ("block", "Block",
326           "Block push-buffer when max-bytes are queued",
327           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 
329   /**
330    * GstAppSrc::is-live:
331    *
332    * Instruct the source to behave like a live source. This includes that it
333    * will only push out buffers in the PLAYING state.
334    */
335   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
336       g_param_spec_boolean ("is-live", "Is Live",
337           "Whether to act as a live source",
338           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339   /**
340    * GstAppSrc::min-latency:
341    *
342    * The minimum latency of the source. A value of -1 will use the default
343    * latency calculations of #GstBaseSrc.
344    */
345   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
346       g_param_spec_int64 ("min-latency", "Min Latency",
347           "The minimum latency (-1 = default)",
348           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350   /**
351    * GstAppSrc::max-latency:
352    *
353    * The maximum latency of the source. A value of -1 means an unlimited amout
354    * of latency.
355    */
356   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
357       g_param_spec_int64 ("max-latency", "Max Latency",
358           "The maximum latency (-1 = unlimited)",
359           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
360           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361 
362   /**
363    * GstAppSrc::emit-signals:
364    *
365    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
366    * This option is by default enabled for backwards compatibility reasons but
367    * can disabled when needed because signal emission is expensive.
368    */
369   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
370       g_param_spec_boolean ("emit-signals", "Emit signals",
371           "Emit need-data, enough-data and seek-data signals",
372           DEFAULT_PROP_EMIT_SIGNALS,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374 
375   /**
376    * GstAppSrc::min-percent:
377    *
378    * Make appsrc emit the "need-data" signal when the amount of bytes in the
379    * queue drops below this percentage of max-bytes.
380    */
381   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
382       g_param_spec_uint ("min-percent", "Min Percent",
383           "Emit need-data when queued bytes drops below this percent of max-bytes",
384           0, 100, DEFAULT_PROP_MIN_PERCENT,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 
387   /**
388    * GstAppSrc::current-level-bytes:
389    *
390    * The number of currently queued bytes inside appsrc.
391    *
392    * Since: 1.2
393    */
394   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
395       g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
396           "The number of currently queued bytes",
397           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
398           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
399 
400   /**
401    * GstAppSrc::duration:
402    *
403    * The total duration in nanoseconds of the data stream. If the total duration is known, it
404    * is recommended to configure it with this property.
405    *
406    * Since: 1.10
407    */
408   g_object_class_install_property (gobject_class, PROP_DURATION,
409       g_param_spec_uint64 ("duration", "Duration",
410           "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
411           0, G_MAXUINT64, DEFAULT_PROP_DURATION,
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413 
414   /**
415    * GstAppSrc::need-data:
416    * @appsrc: the appsrc element that emitted the signal
417    * @length: the amount of bytes needed.
418    *
419    * Signal that the source needs more data. In the callback or from another
420    * thread you should call push-buffer or end-of-stream.
421    *
422    * @length is just a hint and when it is set to -1, any number of bytes can be
423    * pushed into @appsrc.
424    *
425    * You can call push-buffer multiple times until the enough-data signal is
426    * fired.
427    */
428   gst_app_src_signals[SIGNAL_NEED_DATA] =
429       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
430       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
431       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
432 
433   /**
434    * GstAppSrc::enough-data:
435    * @appsrc: the appsrc element that emitted the signal
436    *
437    * Signal that the source has enough data. It is recommended that the
438    * application stops calling push-buffer until the need-data signal is
439    * emitted again to avoid excessive buffer queueing.
440    */
441   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
442       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
443       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
444       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
445 
446   /**
447    * GstAppSrc::seek-data:
448    * @appsrc: the appsrc element that emitted the signal
449    * @offset: the offset to seek to
450    *
451    * Seek to the given offset. The next push-buffer should produce buffers from
452    * the new @offset.
453    * This callback is only called for seekable stream types.
454    *
455    * Returns: %TRUE if the seek succeeded.
456    */
457   gst_app_src_signals[SIGNAL_SEEK_DATA] =
458       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
459       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
460       NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
461 
462    /**
463     * GstAppSrc::push-buffer:
464     * @appsrc: the appsrc
465     * @buffer: a buffer to push
466     *
467     * Adds a buffer to the queue of buffers that the appsrc element will
468     * push to its source pad. This function does not take ownership of the
469     * buffer so the buffer needs to be unreffed after calling this function.
470     *
471     * When the block property is TRUE, this function can block until free space
472     * becomes available in the queue.
473     */
474   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
475       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
476       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
477           push_buffer), NULL, NULL, NULL,
478       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
479 
480    /**
481     * GstAppSrc::push-buffer-list:
482     * @appsrc: the appsrc
483     * @buffer_list: a buffer list to push
484     *
485     * Adds a buffer list to the queue of buffers and buffer lists that the
486     * appsrc element will push to its source pad. This function does not take
487     * ownership of the buffer list so the buffer list needs to be unreffed
488     * after calling this function.
489     *
490     * When the block property is TRUE, this function can block until free space
491     * becomes available in the queue.
492     *
493     * Since: 1.14
494     */
495   gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
496       g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
497       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
498           push_buffer_list), NULL, NULL, NULL,
499       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
500 
501   /**
502     * GstAppSrc::push-sample:
503     * @appsrc: the appsrc
504     * @sample: a sample from which extract buffer to push
505     *
506     * Extract a buffer from the provided sample and adds the extracted buffer
507     * to the queue of buffers that the appsrc element will
508     * push to its source pad. This function set the appsrc caps based on the caps
509     * in the sample and reset the caps if they change.
510     * Only the caps and the buffer of the provided sample are used and not
511     * for example the segment in the sample.
512     * This function does not take ownership of the
513     * sample so the sample needs to be unreffed after calling this function.
514     *
515     * When the block property is TRUE, this function can block until free space
516     * becomes available in the queue.
517     *
518     * Since: 1.6
519     *
520     */
521   gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
522       g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
523       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
524           push_sample), NULL, NULL, NULL,
525       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
526 
527 
528    /**
529     * GstAppSrc::end-of-stream:
530     * @appsrc: the appsrc
531     *
532     * Notify @appsrc that no more buffer are available.
533     */
534   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
535       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
536       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
537           end_of_stream), NULL, NULL, NULL,
538       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
539 
540   gst_element_class_set_static_metadata (element_class, "AppSrc",
541       "Generic/Source", "Allow the application to feed buffers to a pipeline",
542       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
543 
544   gst_element_class_add_static_pad_template (element_class,
545       &gst_app_src_template);
546 
547   element_class->send_event = gst_app_src_send_event;
548 
549   basesrc_class->negotiate = gst_app_src_negotiate;
550   basesrc_class->get_caps = gst_app_src_internal_get_caps;
551   basesrc_class->create = gst_app_src_create;
552   basesrc_class->start = gst_app_src_start;
553   basesrc_class->stop = gst_app_src_stop;
554   basesrc_class->unlock = gst_app_src_unlock;
555   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
556   basesrc_class->do_seek = gst_app_src_do_seek;
557   basesrc_class->is_seekable = gst_app_src_is_seekable;
558   basesrc_class->get_size = gst_app_src_do_get_size;
559   basesrc_class->query = gst_app_src_query;
560   basesrc_class->event = gst_app_src_event;
561 
562   klass->push_buffer = gst_app_src_push_buffer_action;
563   klass->push_buffer_list = gst_app_src_push_buffer_list_action;
564   klass->push_sample = gst_app_src_push_sample_action;
565   klass->end_of_stream = gst_app_src_end_of_stream;
566 }
567 
568 static void
gst_app_src_init(GstAppSrc * appsrc)569 gst_app_src_init (GstAppSrc * appsrc)
570 {
571   GstAppSrcPrivate *priv;
572 
573   priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
574 
575   g_mutex_init (&priv->mutex);
576   g_cond_init (&priv->cond);
577   priv->queue = gst_queue_array_new (16);
578   priv->wait_status = NOONE_WAITING;
579 
580   priv->size = DEFAULT_PROP_SIZE;
581   priv->duration = DEFAULT_PROP_DURATION;
582   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
583   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
584   priv->format = DEFAULT_PROP_FORMAT;
585   priv->block = DEFAULT_PROP_BLOCK;
586   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
587   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
588   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
589   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
590 
591   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
592 }
593 
594 /* Must be called with priv->mutex */
595 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)596 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
597 {
598   GstMiniObject *obj;
599   GstAppSrcPrivate *priv = src->priv;
600   GstCaps *requeue_caps = NULL;
601 
602   while (!gst_queue_array_is_empty (priv->queue)) {
603     obj = gst_queue_array_pop_head (priv->queue);
604     if (obj) {
605       if (GST_IS_CAPS (obj) && retain_last_caps) {
606         gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
607       }
608       gst_mini_object_unref (obj);
609     }
610   }
611 
612   if (requeue_caps) {
613     gst_queue_array_push_tail (priv->queue, requeue_caps);
614   }
615 
616   priv->queued_bytes = 0;
617 }
618 
619 static void
gst_app_src_dispose(GObject * obj)620 gst_app_src_dispose (GObject * obj)
621 {
622   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
623   GstAppSrcPrivate *priv = appsrc->priv;
624 
625   GST_OBJECT_LOCK (appsrc);
626   if (priv->current_caps) {
627     gst_caps_unref (priv->current_caps);
628     priv->current_caps = NULL;
629   }
630   if (priv->last_caps) {
631     gst_caps_unref (priv->last_caps);
632     priv->last_caps = NULL;
633   }
634   if (priv->notify) {
635     priv->notify (priv->user_data);
636   }
637   priv->user_data = NULL;
638   priv->notify = NULL;
639 
640   GST_OBJECT_UNLOCK (appsrc);
641 
642   g_mutex_lock (&priv->mutex);
643   gst_app_src_flush_queued (appsrc, FALSE);
644   g_mutex_unlock (&priv->mutex);
645 
646   G_OBJECT_CLASS (parent_class)->dispose (obj);
647 }
648 
649 static void
gst_app_src_finalize(GObject * obj)650 gst_app_src_finalize (GObject * obj)
651 {
652   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
653   GstAppSrcPrivate *priv = appsrc->priv;
654 
655   g_mutex_clear (&priv->mutex);
656   g_cond_clear (&priv->cond);
657   gst_queue_array_free (priv->queue);
658 
659   g_free (priv->uri);
660 
661   G_OBJECT_CLASS (parent_class)->finalize (obj);
662 }
663 
664 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)665 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
666 {
667   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
668   GstCaps *caps;
669 
670   GST_OBJECT_LOCK (appsrc);
671   if ((caps = appsrc->priv->current_caps))
672     gst_caps_ref (caps);
673   GST_OBJECT_UNLOCK (appsrc);
674 
675   if (filter) {
676     if (caps) {
677       GstCaps *intersection =
678           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
679       gst_caps_unref (caps);
680       caps = intersection;
681     } else {
682       caps = gst_caps_ref (filter);
683     }
684   }
685 
686   GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
687   return caps;
688 }
689 
690 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)691 gst_app_src_set_property (GObject * object, guint prop_id,
692     const GValue * value, GParamSpec * pspec)
693 {
694   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
695   GstAppSrcPrivate *priv = appsrc->priv;
696 
697   switch (prop_id) {
698     case PROP_CAPS:
699       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
700       break;
701     case PROP_SIZE:
702       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
703       break;
704     case PROP_STREAM_TYPE:
705       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
706       break;
707     case PROP_MAX_BYTES:
708       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
709       break;
710     case PROP_FORMAT:
711       priv->format = g_value_get_enum (value);
712       break;
713     case PROP_BLOCK:
714       priv->block = g_value_get_boolean (value);
715       break;
716     case PROP_IS_LIVE:
717       gst_base_src_set_live (GST_BASE_SRC (appsrc),
718           g_value_get_boolean (value));
719       break;
720     case PROP_MIN_LATENCY:
721       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
722           FALSE, -1);
723       break;
724     case PROP_MAX_LATENCY:
725       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
726           g_value_get_int64 (value));
727       break;
728     case PROP_EMIT_SIGNALS:
729       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
730       break;
731     case PROP_MIN_PERCENT:
732       priv->min_percent = g_value_get_uint (value);
733       break;
734     case PROP_DURATION:
735       gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
736       break;
737     default:
738       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
739       break;
740   }
741 }
742 
743 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)744 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
745     GParamSpec * pspec)
746 {
747   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
748   GstAppSrcPrivate *priv = appsrc->priv;
749 
750   switch (prop_id) {
751     case PROP_CAPS:
752       g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
753       break;
754     case PROP_SIZE:
755       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
756       break;
757     case PROP_STREAM_TYPE:
758       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
759       break;
760     case PROP_MAX_BYTES:
761       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
762       break;
763     case PROP_FORMAT:
764       g_value_set_enum (value, priv->format);
765       break;
766     case PROP_BLOCK:
767       g_value_set_boolean (value, priv->block);
768       break;
769     case PROP_IS_LIVE:
770       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
771       break;
772     case PROP_MIN_LATENCY:
773     {
774       guint64 min = 0;
775 
776       gst_app_src_get_latency (appsrc, &min, NULL);
777       g_value_set_int64 (value, min);
778       break;
779     }
780     case PROP_MAX_LATENCY:
781     {
782       guint64 max = 0;
783 
784       gst_app_src_get_latency (appsrc, NULL, &max);
785       g_value_set_int64 (value, max);
786       break;
787     }
788     case PROP_EMIT_SIGNALS:
789       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
790       break;
791     case PROP_MIN_PERCENT:
792       g_value_set_uint (value, priv->min_percent);
793       break;
794     case PROP_CURRENT_LEVEL_BYTES:
795       g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
796       break;
797     case PROP_DURATION:
798       g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
799       break;
800     default:
801       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
802       break;
803   }
804 }
805 
806 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)807 gst_app_src_send_event (GstElement * element, GstEvent * event)
808 {
809   GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
810   GstAppSrcPrivate *priv = appsrc->priv;
811 
812   switch (GST_EVENT_TYPE (event)) {
813     case GST_EVENT_FLUSH_STOP:
814       g_mutex_lock (&priv->mutex);
815       gst_app_src_flush_queued (appsrc, TRUE);
816       g_mutex_unlock (&priv->mutex);
817       break;
818     default:
819       break;
820   }
821 
822   return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
823           event), FALSE);
824 }
825 
826 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)827 gst_app_src_unlock (GstBaseSrc * bsrc)
828 {
829   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
830   GstAppSrcPrivate *priv = appsrc->priv;
831 
832   g_mutex_lock (&priv->mutex);
833   GST_DEBUG_OBJECT (appsrc, "unlock start");
834   priv->flushing = TRUE;
835   g_cond_broadcast (&priv->cond);
836   g_mutex_unlock (&priv->mutex);
837 
838   return TRUE;
839 }
840 
841 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)842 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
843 {
844   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
845   GstAppSrcPrivate *priv = appsrc->priv;
846 
847   g_mutex_lock (&priv->mutex);
848   GST_DEBUG_OBJECT (appsrc, "unlock stop");
849   priv->flushing = FALSE;
850   g_cond_broadcast (&priv->cond);
851   g_mutex_unlock (&priv->mutex);
852 
853   return TRUE;
854 }
855 
856 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)857 gst_app_src_start (GstBaseSrc * bsrc)
858 {
859   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
860   GstAppSrcPrivate *priv = appsrc->priv;
861 
862   g_mutex_lock (&priv->mutex);
863   GST_DEBUG_OBJECT (appsrc, "starting");
864   priv->started = TRUE;
865   /* set the offset to -1 so that we always do a first seek. This is only used
866    * in random-access mode. */
867   priv->offset = -1;
868   priv->flushing = FALSE;
869   g_mutex_unlock (&priv->mutex);
870 
871   gst_base_src_set_format (bsrc, priv->format);
872 
873   return TRUE;
874 }
875 
876 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)877 gst_app_src_stop (GstBaseSrc * bsrc)
878 {
879   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
880   GstAppSrcPrivate *priv = appsrc->priv;
881 
882   g_mutex_lock (&priv->mutex);
883   GST_DEBUG_OBJECT (appsrc, "stopping");
884   priv->is_eos = FALSE;
885   priv->flushing = TRUE;
886   priv->started = FALSE;
887   gst_app_src_flush_queued (appsrc, TRUE);
888   g_cond_broadcast (&priv->cond);
889   g_mutex_unlock (&priv->mutex);
890 
891   return TRUE;
892 }
893 
894 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)895 gst_app_src_is_seekable (GstBaseSrc * src)
896 {
897   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
898   GstAppSrcPrivate *priv = appsrc->priv;
899   gboolean res = FALSE;
900 
901   switch (priv->stream_type) {
902     case GST_APP_STREAM_TYPE_STREAM:
903       break;
904     case GST_APP_STREAM_TYPE_SEEKABLE:
905     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
906       res = TRUE;
907       break;
908   }
909   return res;
910 }
911 
912 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)913 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
914 {
915   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
916 
917   *size = gst_app_src_get_size (appsrc);
918 
919   return TRUE;
920 }
921 
922 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)923 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
924 {
925   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
926   GstAppSrcPrivate *priv = appsrc->priv;
927   gboolean res;
928 
929   switch (GST_QUERY_TYPE (query)) {
930     case GST_QUERY_LATENCY:
931     {
932       GstClockTime min, max;
933       gboolean live;
934 
935       /* Query the parent class for the defaults */
936       res = gst_base_src_query_latency (src, &live, &min, &max);
937 
938       /* overwrite with our values when we need to */
939       g_mutex_lock (&priv->mutex);
940       if (priv->min_latency != -1) {
941         min = priv->min_latency;
942         max = priv->max_latency;
943       }
944       g_mutex_unlock (&priv->mutex);
945 
946       gst_query_set_latency (query, live, min, max);
947       break;
948     }
949     case GST_QUERY_SCHEDULING:
950     {
951       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
952       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
953 
954       switch (priv->stream_type) {
955         case GST_APP_STREAM_TYPE_STREAM:
956         case GST_APP_STREAM_TYPE_SEEKABLE:
957           break;
958         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
959           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
960           break;
961       }
962       res = TRUE;
963       break;
964     }
965     case GST_QUERY_DURATION:
966     {
967       GstFormat format;
968       gst_query_parse_duration (query, &format, NULL);
969       if (format == GST_FORMAT_BYTES) {
970         gst_query_set_duration (query, format, priv->size);
971         res = TRUE;
972       } else if (format == GST_FORMAT_TIME) {
973         if (priv->duration != GST_CLOCK_TIME_NONE) {
974           gst_query_set_duration (query, format, priv->duration);
975           res = TRUE;
976         } else {
977           res = FALSE;
978         }
979       } else {
980         res = FALSE;
981       }
982       break;
983     }
984     default:
985       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
986       break;
987   }
988 
989   return res;
990 }
991 
992 /* will be called in push mode */
993 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)994 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
995 {
996   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
997   GstAppSrcPrivate *priv = appsrc->priv;
998   gint64 desired_position;
999   gboolean res = FALSE;
1000 
1001   desired_position = segment->position;
1002 
1003   /* no need to try to seek in streaming mode */
1004   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1005     return TRUE;
1006 
1007   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1008       desired_position, gst_format_get_name (segment->format));
1009 
1010   if (priv->callbacks.seek_data)
1011     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
1012   else {
1013     gboolean emit;
1014 
1015     g_mutex_lock (&priv->mutex);
1016     emit = priv->emit_signals;
1017     g_mutex_unlock (&priv->mutex);
1018 
1019     if (emit)
1020       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1021           desired_position, &res);
1022   }
1023 
1024   if (res) {
1025     GST_DEBUG_OBJECT (appsrc, "flushing queue");
1026     g_mutex_lock (&priv->mutex);
1027     gst_app_src_flush_queued (appsrc, TRUE);
1028     g_mutex_unlock (&priv->mutex);
1029     priv->is_eos = FALSE;
1030   } else {
1031     GST_WARNING_OBJECT (appsrc, "seek failed");
1032   }
1033 
1034   return res;
1035 }
1036 
1037 /* must be called with the appsrc mutex */
1038 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1039 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1040 {
1041   gboolean res = FALSE;
1042   gboolean emit;
1043   GstAppSrcPrivate *priv = appsrc->priv;
1044 
1045   emit = priv->emit_signals;
1046   g_mutex_unlock (&priv->mutex);
1047 
1048   GST_DEBUG_OBJECT (appsrc,
1049       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1050       priv->offset, offset);
1051 
1052   if (priv->callbacks.seek_data)
1053     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
1054   else if (emit)
1055     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1056         offset, &res);
1057 
1058   g_mutex_lock (&priv->mutex);
1059 
1060   return res;
1061 }
1062 
1063 /* must be called with the appsrc mutex. After this call things can be
1064  * flushing */
1065 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1066 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1067 {
1068   gboolean emit;
1069   GstAppSrcPrivate *priv = appsrc->priv;
1070 
1071   emit = priv->emit_signals;
1072   g_mutex_unlock (&priv->mutex);
1073 
1074   /* we have no data, we need some. We fire the signal with the size hint. */
1075   if (priv->callbacks.need_data)
1076     priv->callbacks.need_data (appsrc, size, priv->user_data);
1077   else if (emit)
1078     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1079         NULL);
1080 
1081   g_mutex_lock (&priv->mutex);
1082   /* we can be flushing now because we released the lock */
1083 }
1084 
1085 /* must be called with the appsrc mutex */
1086 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1087 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1088 {
1089   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1090   GstAppSrcPrivate *priv = appsrc->priv;
1091   gboolean result;
1092   GstCaps *caps;
1093 
1094   GST_OBJECT_LOCK (basesrc);
1095   caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1096   GST_OBJECT_UNLOCK (basesrc);
1097 
1098   /* Avoid deadlock by unlocking mutex
1099    * otherwise we get deadlock between this and stream lock */
1100   g_mutex_unlock (&priv->mutex);
1101   if (caps) {
1102     result = gst_base_src_set_caps (basesrc, caps);
1103     gst_caps_unref (caps);
1104   } else {
1105     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1106   }
1107   g_mutex_lock (&priv->mutex);
1108 
1109   return result;
1110 }
1111 
1112 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1113 gst_app_src_negotiate (GstBaseSrc * basesrc)
1114 {
1115   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1116   GstAppSrcPrivate *priv = appsrc->priv;
1117   gboolean result;
1118 
1119   g_mutex_lock (&priv->mutex);
1120   result = gst_app_src_do_negotiate (basesrc);
1121   g_mutex_unlock (&priv->mutex);
1122   return result;
1123 }
1124 
1125 static GstFlowReturn
gst_app_src_create(GstBaseSrc * bsrc,guint64 offset,guint size,GstBuffer ** buf)1126 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1127     GstBuffer ** buf)
1128 {
1129   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1130   GstAppSrcPrivate *priv = appsrc->priv;
1131   GstFlowReturn ret;
1132 
1133   GST_OBJECT_LOCK (appsrc);
1134   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1135           bsrc->segment.format == GST_FORMAT_BYTES)) {
1136     GST_DEBUG_OBJECT (appsrc,
1137         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1138         bsrc->segment.duration, priv->size);
1139     bsrc->segment.duration = priv->size;
1140     GST_OBJECT_UNLOCK (appsrc);
1141 
1142     gst_element_post_message (GST_ELEMENT (appsrc),
1143         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1144   } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1145           bsrc->segment.format == GST_FORMAT_TIME)) {
1146     GST_DEBUG_OBJECT (appsrc,
1147         "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1148         GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1149     bsrc->segment.duration = priv->duration;
1150     GST_OBJECT_UNLOCK (appsrc);
1151 
1152     gst_element_post_message (GST_ELEMENT (appsrc),
1153         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1154   } else {
1155     GST_OBJECT_UNLOCK (appsrc);
1156   }
1157 
1158   g_mutex_lock (&priv->mutex);
1159   /* check flushing first */
1160   if (G_UNLIKELY (priv->flushing))
1161     goto flushing;
1162 
1163   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1164     /* if we are dealing with a random-access stream, issue a seek if the offset
1165      * changed. */
1166     if (G_UNLIKELY (priv->offset != offset)) {
1167       gboolean res;
1168 
1169       /* do the seek */
1170       res = gst_app_src_emit_seek (appsrc, offset);
1171 
1172       if (G_UNLIKELY (!res))
1173         /* failing to seek is fatal */
1174         goto seek_error;
1175 
1176       priv->offset = offset;
1177       priv->is_eos = FALSE;
1178     }
1179   }
1180 
1181   while (TRUE) {
1182     /* return data as long as we have some */
1183     if (!gst_queue_array_is_empty (priv->queue)) {
1184       guint buf_size;
1185       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1186 
1187       if (GST_IS_CAPS (obj)) {
1188         GstCaps *next_caps = GST_CAPS (obj);
1189         gboolean caps_changed = TRUE;
1190 
1191         if (next_caps && priv->current_caps)
1192           caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1193         else
1194           caps_changed = (next_caps != priv->current_caps);
1195 
1196         gst_caps_replace (&priv->current_caps, next_caps);
1197 
1198         if (next_caps) {
1199           gst_caps_unref (next_caps);
1200         }
1201 
1202         if (caps_changed)
1203           gst_app_src_do_negotiate (bsrc);
1204 
1205         /* Lock has released so now may need
1206          *- flushing
1207          *- new caps change
1208          *- check queue has data */
1209         if (G_UNLIKELY (priv->flushing))
1210           goto flushing;
1211 
1212         /* Continue checks caps and queue */
1213         continue;
1214       }
1215 
1216       if (GST_IS_BUFFER (obj)) {
1217         *buf = GST_BUFFER (obj);
1218         buf_size = gst_buffer_get_size (*buf);
1219         GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
1220       } else {
1221         GstBufferList *buffer_list;
1222 
1223         g_assert (GST_IS_BUFFER_LIST (obj));
1224 
1225         buffer_list = GST_BUFFER_LIST (obj);
1226 
1227         buf_size = gst_buffer_list_calculate_size (buffer_list);
1228 
1229         GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
1230             buffer_list, buf_size, gst_buffer_list_length (buffer_list));
1231 
1232         gst_base_src_submit_buffer_list (bsrc, buffer_list);
1233         *buf = NULL;
1234       }
1235 
1236       priv->queued_bytes -= buf_size;
1237 
1238       /* only update the offset when in random_access mode */
1239       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1240         priv->offset += buf_size;
1241 
1242       /* signal that we removed an item */
1243       if ((priv->wait_status & APP_WAITING))
1244         g_cond_broadcast (&priv->cond);
1245 
1246       /* see if we go lower than the min-percent */
1247       if (priv->min_percent && priv->max_bytes) {
1248         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1249           /* ignore flushing state, we got a buffer and we will return it now.
1250            * Errors will be handled in the next round */
1251           gst_app_src_emit_need_data (appsrc, size);
1252       }
1253       ret = GST_FLOW_OK;
1254       break;
1255     } else {
1256       gst_app_src_emit_need_data (appsrc, size);
1257 
1258       /* we can be flushing now because we released the lock above */
1259       if (G_UNLIKELY (priv->flushing))
1260         goto flushing;
1261 
1262       /* if we have a buffer now, continue the loop and try to return it. In
1263        * random-access mode (where a buffer is normally pushed in the above
1264        * signal) we can still be empty because the pushed buffer got flushed or
1265        * when the application pushes the requested buffer later, we support both
1266        * possibilities. */
1267       if (!gst_queue_array_is_empty (priv->queue))
1268         continue;
1269 
1270       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1271     }
1272 
1273     /* check EOS */
1274     if (G_UNLIKELY (priv->is_eos))
1275       goto eos;
1276 
1277     /* nothing to return, wait a while for new data or flushing. */
1278     priv->wait_status |= STREAM_WAITING;
1279     g_cond_wait (&priv->cond, &priv->mutex);
1280     priv->wait_status &= ~STREAM_WAITING;
1281   }
1282   g_mutex_unlock (&priv->mutex);
1283   return ret;
1284 
1285   /* ERRORS */
1286 flushing:
1287   {
1288     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1289     g_mutex_unlock (&priv->mutex);
1290     return GST_FLOW_FLUSHING;
1291   }
1292 eos:
1293   {
1294     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1295     g_mutex_unlock (&priv->mutex);
1296     return GST_FLOW_EOS;
1297   }
1298 seek_error:
1299   {
1300     g_mutex_unlock (&priv->mutex);
1301     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1302         GST_ERROR_SYSTEM);
1303     return GST_FLOW_ERROR;
1304   }
1305 }
1306 
1307 /* external API */
1308 
1309 /**
1310  * gst_app_src_set_caps:
1311  * @appsrc: a #GstAppSrc
1312  * @caps: caps to set
1313  *
1314  * Set the capabilities on the appsrc element.  This function takes
1315  * a copy of the caps structure. After calling this method, the source will
1316  * only produce caps that match @caps. @caps must be fixed and the caps on the
1317  * buffers must match the caps or left NULL.
1318  */
1319 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1320 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1321 {
1322   GstAppSrcPrivate *priv;
1323   gboolean caps_changed;
1324 
1325   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1326 
1327   priv = appsrc->priv;
1328 
1329   g_mutex_lock (&priv->mutex);
1330 
1331   GST_OBJECT_LOCK (appsrc);
1332   if (caps && priv->last_caps)
1333     caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1334   else
1335     caps_changed = (caps != priv->last_caps);
1336 
1337   if (caps_changed) {
1338     GstCaps *new_caps;
1339     gpointer t;
1340 
1341     new_caps = caps ? gst_caps_copy (caps) : NULL;
1342     GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1343 
1344     while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1345       gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1346     }
1347     gst_queue_array_push_tail (priv->queue, new_caps);
1348     gst_caps_replace (&priv->last_caps, new_caps);
1349   }
1350 
1351   GST_OBJECT_UNLOCK (appsrc);
1352 
1353   g_mutex_unlock (&priv->mutex);
1354 }
1355 
1356 /**
1357  * gst_app_src_get_caps:
1358  * @appsrc: a #GstAppSrc
1359  *
1360  * Get the configured caps on @appsrc.
1361  *
1362  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1363  */
1364 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1365 gst_app_src_get_caps (GstAppSrc * appsrc)
1366 {
1367 
1368   GstCaps *caps;
1369 
1370   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1371 
1372   GST_OBJECT_LOCK (appsrc);
1373   if ((caps = appsrc->priv->last_caps))
1374     gst_caps_ref (caps);
1375   GST_OBJECT_UNLOCK (appsrc);
1376 
1377   return caps;
1378 
1379 }
1380 
1381 /**
1382  * gst_app_src_set_size:
1383  * @appsrc: a #GstAppSrc
1384  * @size: the size to set
1385  *
1386  * Set the size of the stream in bytes. A value of -1 means that the size is
1387  * not known.
1388  */
1389 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1390 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1391 {
1392   GstAppSrcPrivate *priv;
1393 
1394   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1395 
1396   priv = appsrc->priv;
1397 
1398   GST_OBJECT_LOCK (appsrc);
1399   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1400   priv->size = size;
1401   GST_OBJECT_UNLOCK (appsrc);
1402 }
1403 
1404 /**
1405  * gst_app_src_get_size:
1406  * @appsrc: a #GstAppSrc
1407  *
1408  * Get the size of the stream in bytes. A value of -1 means that the size is
1409  * not known.
1410  *
1411  * Returns: the size of the stream previously set with gst_app_src_set_size();
1412  */
1413 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1414 gst_app_src_get_size (GstAppSrc * appsrc)
1415 {
1416   gint64 size;
1417   GstAppSrcPrivate *priv;
1418 
1419   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1420 
1421   priv = appsrc->priv;
1422 
1423   GST_OBJECT_LOCK (appsrc);
1424   size = priv->size;
1425   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1426   GST_OBJECT_UNLOCK (appsrc);
1427 
1428   return size;
1429 }
1430 
1431 /**
1432  * gst_app_src_set_duration:
1433  * @appsrc: a #GstAppSrc
1434  * @duration: the duration to set
1435  *
1436  * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1437  * not known.
1438  *
1439  * Since: 1.10
1440  */
1441 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1442 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1443 {
1444   GstAppSrcPrivate *priv;
1445 
1446   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1447 
1448   priv = appsrc->priv;
1449 
1450   GST_OBJECT_LOCK (appsrc);
1451   GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1452       GST_TIME_ARGS (duration));
1453   priv->duration = duration;
1454   GST_OBJECT_UNLOCK (appsrc);
1455 }
1456 
1457 /**
1458  * gst_app_src_get_duration:
1459  * @appsrc: a #GstAppSrc
1460  *
1461  * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1462  * not known.
1463  *
1464  * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1465  *
1466  * Since: 1.10
1467  */
1468 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)1469 gst_app_src_get_duration (GstAppSrc * appsrc)
1470 {
1471   GstClockTime duration;
1472   GstAppSrcPrivate *priv;
1473 
1474   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1475 
1476   priv = appsrc->priv;
1477 
1478   GST_OBJECT_LOCK (appsrc);
1479   duration = priv->duration;
1480   GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1481       GST_TIME_ARGS (duration));
1482   GST_OBJECT_UNLOCK (appsrc);
1483 
1484   return duration;
1485 }
1486 
1487 /**
1488  * gst_app_src_set_stream_type:
1489  * @appsrc: a #GstAppSrc
1490  * @type: the new state
1491  *
1492  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1493  * be connected to.
1494  *
1495  * A stream_type stream
1496  */
1497 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)1498 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1499 {
1500   GstAppSrcPrivate *priv;
1501 
1502   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1503 
1504   priv = appsrc->priv;
1505 
1506   GST_OBJECT_LOCK (appsrc);
1507   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1508   priv->stream_type = type;
1509   GST_OBJECT_UNLOCK (appsrc);
1510 }
1511 
1512 /**
1513  * gst_app_src_get_stream_type:
1514  * @appsrc: a #GstAppSrc
1515  *
1516  * Get the stream type. Control the stream type of @appsrc
1517  * with gst_app_src_set_stream_type().
1518  *
1519  * Returns: the stream type.
1520  */
1521 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)1522 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1523 {
1524   gboolean stream_type;
1525   GstAppSrcPrivate *priv;
1526 
1527   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1528 
1529   priv = appsrc->priv;
1530 
1531   GST_OBJECT_LOCK (appsrc);
1532   stream_type = priv->stream_type;
1533   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1534   GST_OBJECT_UNLOCK (appsrc);
1535 
1536   return stream_type;
1537 }
1538 
1539 /**
1540  * gst_app_src_set_max_bytes:
1541  * @appsrc: a #GstAppSrc
1542  * @max: the maximum number of bytes to queue
1543  *
1544  * Set the maximum amount of bytes that can be queued in @appsrc.
1545  * After the maximum amount of bytes are queued, @appsrc will emit the
1546  * "enough-data" signal.
1547  */
1548 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)1549 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1550 {
1551   GstAppSrcPrivate *priv;
1552 
1553   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1554 
1555   priv = appsrc->priv;
1556 
1557   g_mutex_lock (&priv->mutex);
1558   if (max != priv->max_bytes) {
1559     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1560     priv->max_bytes = max;
1561     /* signal the change */
1562     g_cond_broadcast (&priv->cond);
1563   }
1564   g_mutex_unlock (&priv->mutex);
1565 }
1566 
1567 /**
1568  * gst_app_src_get_max_bytes:
1569  * @appsrc: a #GstAppSrc
1570  *
1571  * Get the maximum amount of bytes that can be queued in @appsrc.
1572  *
1573  * Returns: The maximum amount of bytes that can be queued.
1574  */
1575 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)1576 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1577 {
1578   guint64 result;
1579   GstAppSrcPrivate *priv;
1580 
1581   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1582 
1583   priv = appsrc->priv;
1584 
1585   g_mutex_lock (&priv->mutex);
1586   result = priv->max_bytes;
1587   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1588   g_mutex_unlock (&priv->mutex);
1589 
1590   return result;
1591 }
1592 
1593 /**
1594  * gst_app_src_get_current_level_bytes:
1595  * @appsrc: a #GstAppSrc
1596  *
1597  * Get the number of currently queued bytes inside @appsrc.
1598  *
1599  * Returns: The number of currently queued bytes.
1600  *
1601  * Since: 1.2
1602  */
1603 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)1604 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
1605 {
1606   gint64 queued;
1607   GstAppSrcPrivate *priv;
1608 
1609   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1610 
1611   priv = appsrc->priv;
1612 
1613   GST_OBJECT_LOCK (appsrc);
1614   queued = priv->queued_bytes;
1615   GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
1616       queued);
1617   GST_OBJECT_UNLOCK (appsrc);
1618 
1619   return queued;
1620 }
1621 
1622 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)1623 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1624     gboolean do_max, guint64 max)
1625 {
1626   GstAppSrcPrivate *priv = appsrc->priv;
1627   gboolean changed = FALSE;
1628 
1629   g_mutex_lock (&priv->mutex);
1630   if (do_min && priv->min_latency != min) {
1631     priv->min_latency = min;
1632     changed = TRUE;
1633   }
1634   if (do_max && priv->max_latency != max) {
1635     priv->max_latency = max;
1636     changed = TRUE;
1637   }
1638   g_mutex_unlock (&priv->mutex);
1639 
1640   if (changed) {
1641     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1642     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1643         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1644   }
1645 }
1646 
1647 /**
1648  * gst_app_src_set_latency:
1649  * @appsrc: a #GstAppSrc
1650  * @min: the min latency
1651  * @max: the max latency
1652  *
1653  * Configure the @min and @max latency in @src. If @min is set to -1, the
1654  * default latency calculations for pseudo-live sources will be used.
1655  */
1656 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)1657 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1658 {
1659   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1660 }
1661 
1662 /**
1663  * gst_app_src_get_latency:
1664  * @appsrc: a #GstAppSrc
1665  * @min: (out): the min latency
1666  * @max: (out): the max latency
1667  *
1668  * Retrieve the min and max latencies in @min and @max respectively.
1669  */
1670 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)1671 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1672 {
1673   GstAppSrcPrivate *priv;
1674 
1675   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1676 
1677   priv = appsrc->priv;
1678 
1679   g_mutex_lock (&priv->mutex);
1680   if (min)
1681     *min = priv->min_latency;
1682   if (max)
1683     *max = priv->max_latency;
1684   g_mutex_unlock (&priv->mutex);
1685 }
1686 
1687 /**
1688  * gst_app_src_set_emit_signals:
1689  * @appsrc: a #GstAppSrc
1690  * @emit: the new state
1691  *
1692  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1693  * by default disabled because signal emission is expensive and unneeded when
1694  * the application prefers to operate in pull mode.
1695  */
1696 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)1697 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1698 {
1699   GstAppSrcPrivate *priv;
1700 
1701   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1702 
1703   priv = appsrc->priv;
1704 
1705   g_mutex_lock (&priv->mutex);
1706   priv->emit_signals = emit;
1707   g_mutex_unlock (&priv->mutex);
1708 }
1709 
1710 /**
1711  * gst_app_src_get_emit_signals:
1712  * @appsrc: a #GstAppSrc
1713  *
1714  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1715  *
1716  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1717  * signals.
1718  */
1719 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)1720 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1721 {
1722   gboolean result;
1723   GstAppSrcPrivate *priv;
1724 
1725   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1726 
1727   priv = appsrc->priv;
1728 
1729   g_mutex_lock (&priv->mutex);
1730   result = priv->emit_signals;
1731   g_mutex_unlock (&priv->mutex);
1732 
1733   return result;
1734 }
1735 
1736 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)1737 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
1738     GstBufferList * buflist, gboolean steal_ref)
1739 {
1740   gboolean first = TRUE;
1741   GstAppSrcPrivate *priv;
1742 
1743   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1744 
1745   priv = appsrc->priv;
1746 
1747   if (buffer != NULL)
1748     g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1749   else
1750     g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
1751 
1752   if (buflist != NULL) {
1753     if (gst_buffer_list_length (buflist) == 0)
1754       return GST_FLOW_OK;
1755 
1756     buffer = gst_buffer_list_get (buflist, 0);
1757   }
1758 
1759   if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
1760       GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
1761       gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
1762     GstClock *clock;
1763 
1764     clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
1765     if (clock) {
1766       GstClockTime now;
1767       GstClockTime base_time =
1768           gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
1769 
1770       now = gst_clock_get_time (clock);
1771       if (now > base_time)
1772         now -= base_time;
1773       else
1774         now = 0;
1775       gst_object_unref (clock);
1776 
1777       if (buflist == NULL) {
1778         if (!steal_ref) {
1779           buffer = gst_buffer_copy (buffer);
1780           steal_ref = TRUE;
1781         } else {
1782           buffer = gst_buffer_make_writable (buffer);
1783         }
1784       } else {
1785         if (!steal_ref) {
1786           buflist = gst_buffer_list_copy (buflist);
1787           steal_ref = TRUE;
1788         } else {
1789           buflist = gst_buffer_list_make_writable (buflist);
1790         }
1791         buffer = gst_buffer_list_get_writable (buflist, 0);
1792       }
1793 
1794       GST_BUFFER_PTS (buffer) = now;
1795       GST_BUFFER_DTS (buffer) = now;
1796     } else {
1797       GST_WARNING_OBJECT (appsrc,
1798           "do-timestamp=TRUE but buffers are provided before "
1799           "reaching the PLAYING state and having a clock. Timestamps will "
1800           "not be accurate!");
1801     }
1802   }
1803 
1804   g_mutex_lock (&priv->mutex);
1805 
1806   while (TRUE) {
1807     /* can't accept buffers when we are flushing or EOS */
1808     if (priv->flushing)
1809       goto flushing;
1810 
1811     if (priv->is_eos)
1812       goto eos;
1813 
1814     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1815       GST_DEBUG_OBJECT (appsrc,
1816           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1817           priv->queued_bytes, priv->max_bytes);
1818 
1819       if (first) {
1820         gboolean emit;
1821 
1822         emit = priv->emit_signals;
1823         /* only signal on the first push */
1824         g_mutex_unlock (&priv->mutex);
1825 
1826         if (priv->callbacks.enough_data)
1827           priv->callbacks.enough_data (appsrc, priv->user_data);
1828         else if (emit)
1829           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1830               NULL);
1831 
1832         g_mutex_lock (&priv->mutex);
1833         /* continue to check for flushing/eos after releasing the lock */
1834         first = FALSE;
1835         continue;
1836       }
1837       if (priv->block) {
1838         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1839         /* we are filled, wait until a buffer gets popped or when we
1840          * flush. */
1841         priv->wait_status |= APP_WAITING;
1842         g_cond_wait (&priv->cond, &priv->mutex);
1843         priv->wait_status &= ~APP_WAITING;
1844       } else {
1845         /* no need to wait for free space, we just pump more data into the
1846          * queue hoping that the caller reacts to the enough-data signal and
1847          * stops pushing buffers. */
1848         break;
1849       }
1850     } else
1851       break;
1852   }
1853 
1854   if (buflist != NULL) {
1855     GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
1856     if (!steal_ref)
1857       gst_buffer_list_ref (buflist);
1858     gst_queue_array_push_tail (priv->queue, buflist);
1859     priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
1860   } else {
1861     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1862     if (!steal_ref)
1863       gst_buffer_ref (buffer);
1864     gst_queue_array_push_tail (priv->queue, buffer);
1865     priv->queued_bytes += gst_buffer_get_size (buffer);
1866   }
1867 
1868   if ((priv->wait_status & STREAM_WAITING))
1869     g_cond_broadcast (&priv->cond);
1870 
1871   g_mutex_unlock (&priv->mutex);
1872 
1873   return GST_FLOW_OK;
1874 
1875   /* ERRORS */
1876 flushing:
1877   {
1878     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1879     if (steal_ref)
1880       gst_buffer_unref (buffer);
1881     g_mutex_unlock (&priv->mutex);
1882     return GST_FLOW_FLUSHING;
1883   }
1884 eos:
1885   {
1886     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1887     if (steal_ref)
1888       gst_buffer_unref (buffer);
1889     g_mutex_unlock (&priv->mutex);
1890     return GST_FLOW_EOS;
1891   }
1892 }
1893 
1894 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)1895 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1896     gboolean steal_ref)
1897 {
1898   return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
1899 }
1900 
1901 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)1902 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
1903 {
1904   GstBufferList *buffer_list;
1905   GstBuffer *buffer;
1906   GstCaps *caps;
1907 
1908   g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
1909 
1910   caps = gst_sample_get_caps (sample);
1911   if (caps != NULL) {
1912     gst_app_src_set_caps (appsrc, caps);
1913   } else {
1914     GST_WARNING_OBJECT (appsrc, "received sample without caps");
1915   }
1916 
1917   buffer = gst_sample_get_buffer (sample);
1918   if (buffer != NULL)
1919     return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1920 
1921   buffer_list = gst_sample_get_buffer_list (sample);
1922   if (buffer_list != NULL)
1923     return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
1924 
1925   GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
1926   return GST_FLOW_OK;
1927 }
1928 
1929 /**
1930  * gst_app_src_push_buffer:
1931  * @appsrc: a #GstAppSrc
1932  * @buffer: (transfer full): a #GstBuffer to push
1933  *
1934  * Adds a buffer to the queue of buffers that the appsrc element will
1935  * push to its source pad.  This function takes ownership of the buffer.
1936  *
1937  * When the block property is TRUE, this function can block until free
1938  * space becomes available in the queue.
1939  *
1940  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1941  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1942  * #GST_FLOW_EOS when EOS occured.
1943  */
1944 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)1945 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1946 {
1947   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1948 }
1949 
1950 /**
1951  * gst_app_src_push_buffer_list:
1952  * @appsrc: a #GstAppSrc
1953  * @buffer_list: (transfer full): a #GstBufferList to push
1954  *
1955  * Adds a buffer list to the queue of buffers and buffer lists that the
1956  * appsrc element will push to its source pad.  This function takes ownership
1957  * of @buffer_list.
1958  *
1959  * When the block property is TRUE, this function can block until free
1960  * space becomes available in the queue.
1961  *
1962  * Returns: #GST_FLOW_OK when the buffer list was successfuly queued.
1963  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1964  * #GST_FLOW_EOS when EOS occured.
1965  *
1966  * Since: 1.14
1967  */
1968 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)1969 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
1970 {
1971   return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
1972 }
1973 
1974 /**
1975  * gst_app_src_push_sample:
1976  * @appsrc: a #GstAppSrc
1977  * @sample: (transfer none): a #GstSample from which buffer and caps may be
1978  * extracted
1979  *
1980  * Extract a buffer from the provided sample and adds it to the queue of
1981  * buffers that the appsrc element will push to its source pad. Any
1982  * previous caps that were set on appsrc will be replaced by the caps
1983  * associated with the sample if not equal.
1984  *
1985  * This function does not take ownership of the
1986  * sample so the sample needs to be unreffed after calling this function.
1987  *
1988  * When the block property is TRUE, this function can block until free
1989  * space becomes available in the queue.
1990  *
1991  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1992  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1993  * #GST_FLOW_EOS when EOS occured.
1994  *
1995  * Since: 1.6
1996  *
1997  */
1998 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)1999 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2000 {
2001   return gst_app_src_push_sample_internal (appsrc, sample);
2002 }
2003 
2004 /* push a buffer without stealing the ref of the buffer. This is used for the
2005  * action signal. */
2006 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2007 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2008 {
2009   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2010 }
2011 
2012 /* push a buffer list without stealing the ref of the buffer list. This is
2013  * used for the action signal. */
2014 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2015 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2016     GstBufferList * buffer_list)
2017 {
2018   return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2019 }
2020 
2021 /* push a sample without stealing the ref. This is used for the
2022  * action signal. */
2023 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2024 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2025 {
2026   return gst_app_src_push_sample_internal (appsrc, sample);
2027 }
2028 
2029 /**
2030  * gst_app_src_end_of_stream:
2031  * @appsrc: a #GstAppSrc
2032  *
2033  * Indicates to the appsrc element that the last buffer queued in the
2034  * element is the last buffer of the stream.
2035  *
2036  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
2037  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2038  */
2039 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2040 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2041 {
2042   GstAppSrcPrivate *priv;
2043 
2044   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2045 
2046   priv = appsrc->priv;
2047 
2048   g_mutex_lock (&priv->mutex);
2049   /* can't accept buffers when we are flushing. We can accept them when we are
2050    * EOS although it will not do anything. */
2051   if (priv->flushing)
2052     goto flushing;
2053 
2054   GST_DEBUG_OBJECT (appsrc, "sending EOS");
2055   priv->is_eos = TRUE;
2056   g_cond_broadcast (&priv->cond);
2057   g_mutex_unlock (&priv->mutex);
2058 
2059   return GST_FLOW_OK;
2060 
2061   /* ERRORS */
2062 flushing:
2063   {
2064     g_mutex_unlock (&priv->mutex);
2065     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2066     return GST_FLOW_FLUSHING;
2067   }
2068 }
2069 
2070 /**
2071  * gst_app_src_set_callbacks: (skip)
2072  * @appsrc: a #GstAppSrc
2073  * @callbacks: the callbacks
2074  * @user_data: a user_data argument for the callbacks
2075  * @notify: a destroy notify function
2076  *
2077  * Set callbacks which will be executed when data is needed, enough data has
2078  * been collected or when a seek should be performed.
2079  * This is an alternative to using the signals, it has lower overhead and is thus
2080  * less expensive, but also less flexible.
2081  *
2082  * If callbacks are installed, no signals will be emitted for performance
2083  * reasons.
2084  */
2085 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2086 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2087     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2088 {
2089   GDestroyNotify old_notify;
2090   GstAppSrcPrivate *priv;
2091 
2092   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2093   g_return_if_fail (callbacks != NULL);
2094 
2095   priv = appsrc->priv;
2096 
2097   GST_OBJECT_LOCK (appsrc);
2098   old_notify = priv->notify;
2099 
2100   if (old_notify) {
2101     gpointer old_data;
2102 
2103     old_data = priv->user_data;
2104 
2105     priv->user_data = NULL;
2106     priv->notify = NULL;
2107     GST_OBJECT_UNLOCK (appsrc);
2108 
2109     old_notify (old_data);
2110 
2111     GST_OBJECT_LOCK (appsrc);
2112   }
2113   priv->callbacks = *callbacks;
2114   priv->user_data = user_data;
2115   priv->notify = notify;
2116   GST_OBJECT_UNLOCK (appsrc);
2117 }
2118 
2119 /*** GSTURIHANDLER INTERFACE *************************************************/
2120 
2121 static GstURIType
gst_app_src_uri_get_type(GType type)2122 gst_app_src_uri_get_type (GType type)
2123 {
2124   return GST_URI_SRC;
2125 }
2126 
2127 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)2128 gst_app_src_uri_get_protocols (GType type)
2129 {
2130   static const gchar *protocols[] = { "appsrc", NULL };
2131 
2132   return protocols;
2133 }
2134 
2135 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)2136 gst_app_src_uri_get_uri (GstURIHandler * handler)
2137 {
2138   GstAppSrc *appsrc = GST_APP_SRC (handler);
2139 
2140   return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
2141 }
2142 
2143 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)2144 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
2145     GError ** error)
2146 {
2147   GstAppSrc *appsrc = GST_APP_SRC (handler);
2148 
2149   g_free (appsrc->priv->uri);
2150   appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
2151 
2152   return TRUE;
2153 }
2154 
2155 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)2156 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
2157 {
2158   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
2159 
2160   iface->get_type = gst_app_src_uri_get_type;
2161   iface->get_protocols = gst_app_src_uri_get_protocols;
2162   iface->get_uri = gst_app_src_uri_get_uri;
2163   iface->set_uri = gst_app_src_uri_set_uri;
2164 }
2165 
2166 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)2167 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
2168 {
2169   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
2170   GstAppSrcPrivate *priv = appsrc->priv;
2171 
2172   switch (GST_EVENT_TYPE (event)) {
2173     case GST_EVENT_FLUSH_STOP:
2174       g_mutex_lock (&priv->mutex);
2175       priv->is_eos = FALSE;
2176       g_mutex_unlock (&priv->mutex);
2177       break;
2178     default:
2179       break;
2180   }
2181 
2182   return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
2183 }
2184